diff options
34 files changed, 178 insertions, 330 deletions
@@ -51,11 +51,11 @@ <properties> <kotlin.version>1.3.21</kotlin.version> - <arrow.version>0.8.0</arrow.version> + <arrow.version>0.9.0</arrow.version> <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version> <build-helper-maven-plugin.version>1.7</build-helper-maven-plugin.version> <jacoco.version>0.8.2</jacoco.version> - <detekt.version>1.0.0-RC11</detekt.version> + <detekt.version>1.0.0-RC14</detekt.version> <sdk.version>1.1.4-SNAPSHOT</sdk.version> <!-- Protocol buffers --> @@ -408,13 +408,10 @@ </dependency> <dependency> <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-stdlib-jdk8</artifactId> - <version>${kotlin.version}</version> - </dependency> - <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-reflect</artifactId> + <artifactId>kotlin-bom</artifactId> <version>${kotlin.version}</version> + <type>pom</type> + <scope>import</scope> </dependency> <dependency> <groupId>org.jetbrains.kotlin</groupId> @@ -423,12 +420,6 @@ </dependency> <dependency> <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-script-runtime</artifactId> - <version>${kotlin.version}</version> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.jetbrains.kotlin</groupId> <artifactId>kotlin-script-util</artifactId> <version>${kotlin.version}</version> <scope>runtime</scope> @@ -436,7 +427,7 @@ <dependency> <groupId>org.jetbrains.kotlinx</groupId> <artifactId>kotlinx-coroutines-core</artifactId> - <version>1.0.0</version> + <version>1.1.1</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> @@ -445,47 +436,27 @@ </dependency> <dependency> <groupId>io.arrow-kt</groupId> - <artifactId>arrow-core</artifactId> + <artifactId>arrow-core-data</artifactId> <version>${arrow.version}</version> - <exclusions> - <exclusion> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-stdlib</artifactId> - </exclusion> - <exclusion> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-stdlib-jdk7</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>io.arrow-kt</groupId> - <artifactId>arrow-syntax</artifactId> + <artifactId>arrow-core-extensions</artifactId> <version>${arrow.version}</version> </dependency> <dependency> <groupId>io.arrow-kt</groupId> - <artifactId>arrow-instances-core</artifactId> + <artifactId>arrow-extras-data</artifactId> <version>${arrow.version}</version> </dependency> <dependency> <groupId>io.arrow-kt</groupId> - <artifactId>arrow-instances-data</artifactId> - <version>${arrow.version}</version> - </dependency> - <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects</artifactId> - <version>${arrow.version}</version> - </dependency> - <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects-instances</artifactId> + <artifactId>arrow-syntax</artifactId> <version>${arrow.version}</version> </dependency> <dependency> <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects-reactor</artifactId> + <artifactId>arrow-typeclasses</artifactId> <version>${arrow.version}</version> </dependency> <dependency> @@ -596,7 +567,7 @@ <dependency> <groupId>com.nhaarman.mockitokotlin2</groupId> <artifactId>mockito-kotlin</artifactId> - <version>2.0.0</version> + <version>2.1.0</version> <scope>test</scope> </dependency> <dependency> diff --git a/sources/hv-collector-commandline/pom.xml b/sources/hv-collector-commandline/pom.xml index 078a3cb5..a2ab34fa 100644 --- a/sources/hv-collector-commandline/pom.xml +++ b/sources/hv-collector-commandline/pom.xml @@ -37,20 +37,11 @@ <artifactId>commons-cli</artifactId> </dependency> <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-reflect</artifactId> - </dependency> - <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <scope>test</scope> </dependency> <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-test</artifactId> - <scope>test</scope> - </dependency> - <dependency> <groupId>org.jetbrains.spek</groupId> <artifactId>spek-api</artifactId> <scope>test</scope> diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt index d8c83ea0..4656d46b 100644 --- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt +++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt @@ -36,7 +36,7 @@ abstract class ArgBasedConfiguration<T>(private val parser: CommandLineParser) { Try { parseArgumentsArray(args) } .toEither() .mapLeft { WrongArgumentError(it, cmdLineOptionsList) } - .map(this::getConfiguration) + .map(::getConfiguration) .flatMap { it.toEither { WrongArgumentError( diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/WrongArgumentError.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/WrongArgumentError.kt index f3749b35..3e4814f8 100644 --- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/WrongArgumentError.kt +++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/WrongArgumentError.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.commandline -import arrow.core.Option import org.apache.commons.cli.HelpFormatter import org.apache.commons.cli.Options @@ -53,17 +52,15 @@ data class WrongArgumentError( private fun getOptions() = cmdLineOptionsList.map { it.option }.fold(Options(), Options::addOption) companion object { - fun generateRequiredParametersNote(cmdLineOptionsList: List<CommandLineOption>): String { - val requiredParams = Option.fromNullable(cmdLineOptionsList.filter { it.required } - .takeUnless { it.isEmpty() }) - return requiredParams.fold( - { "" }, - { - it.map { commandLineOption -> commandLineOption.option.opt } + fun generateRequiredParametersNote(cmdLineOptionsList: List<CommandLineOption>): String = + cmdLineOptionsList.filter { it.required }.let { requiredParams -> + if (requiredParams.isEmpty()) + "" + else + requiredParams.map { commandLineOption -> commandLineOption.option.opt } .joinToString(prefix = "Required parameters: ", separator = ", ") - } - ) - } + } + } } diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt index 48cac69a..6d8ba3ff 100644 --- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt +++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt @@ -21,10 +21,10 @@ package org.onap.dcae.collectors.veshv.commandline import arrow.core.Option import arrow.core.getOrElse -import arrow.effects.IO import arrow.syntax.function.curried import org.apache.commons.cli.CommandLine -import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure +import org.onap.dcae.collectors.veshv.utils.process.ExitCode +import org.onap.dcae.collectors.veshv.utils.process.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.fromNullablesChain /** @@ -34,10 +34,11 @@ import org.onap.dcae.collectors.veshv.utils.arrow.fromNullablesChain val handleWrongArgumentErrorCurried = ::handleWrongArgumentError.curried() -fun handleWrongArgumentError(programName: String, err: WrongArgumentError): IO<Unit> = IO { +fun handleWrongArgumentError(programName: String, err: WrongArgumentError): ExitCode { err.printMessage() err.printHelp(programName) -}.flatMap { ExitFailure(2).io() } + return ExitFailure(2) +} fun CommandLine.longValue(cmdLineOpt: CommandLineOption, default: Long): Long = longValue(cmdLineOpt).getOrElse { default } diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml index e15592f3..e1e35d8b 100644 --- a/sources/hv-collector-core/pom.xml +++ b/sources/hv-collector-core/pom.xml @@ -71,11 +71,6 @@ </dependency> <dependency> <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-health-check</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> <artifactId>hv-collector-utils</artifactId> <version>${project.parent.version}</version> </dependency> @@ -90,24 +85,19 @@ <version>${project.parent.version}</version> <scope>test</scope> </dependency> - <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-reflect</artifactId> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-core-data</artifactId> </dependency> <dependency> <groupId>io.arrow-kt</groupId> - <artifactId>arrow-core</artifactId> + <artifactId>arrow-extras-data</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> - <groupId>io.projectreactor.addons</groupId> - <artifactId>reactor-extra</artifactId> - </dependency> - <dependency> <groupId>io.projectreactor.netty</groupId> <artifactId>reactor-netty</artifactId> </dependency> @@ -115,10 +105,6 @@ <groupId>io.projectreactor.kafka</groupId> <artifactId>reactor-kafka</artifactId> </dependency> - <dependency> - <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId> - <artifactId>cbs-client</artifactId> - </dependency> </dependencies> </project> diff --git a/sources/hv-collector-ct/pom.xml b/sources/hv-collector-ct/pom.xml index 86103d0e..c8461973 100644 --- a/sources/hv-collector-ct/pom.xml +++ b/sources/hv-collector-ct/pom.xml @@ -67,7 +67,7 @@ </dependency> <dependency> <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-xnf-simulator</artifactId> + <artifactId>hv-collector-ves-message-generator</artifactId> <version>${project.parent.version}</version> </dependency> <dependency> diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index f1b1ba2d..a32d4454 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.tests.fakes -import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage diff --git a/sources/hv-collector-dcae-app-simulator/pom.xml b/sources/hv-collector-dcae-app-simulator/pom.xml index 832a5785..8bedc886 100644 --- a/sources/hv-collector-dcae-app-simulator/pom.xml +++ b/sources/hv-collector-dcae-app-simulator/pom.xml @@ -77,11 +77,6 @@ <dependencies> <dependency> <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-domain</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> <artifactId>hv-collector-commandline</artifactId> <version>${project.parent.version}</version> </dependency> @@ -97,30 +92,10 @@ <scope>test</scope> </dependency> <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects</artifactId> - </dependency> - <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects-instances</artifactId> - </dependency> - <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects-reactor</artifactId> - </dependency> - <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-syntax</artifactId> - </dependency> - <dependency> <groupId>io.projectreactor.kafka</groupId> <artifactId>reactor-kafka</artifactId> </dependency> <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java-util</artifactId> - </dependency> - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <optional>true</optional> diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt index 28866f36..93c12d25 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -22,9 +22,7 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl import arrow.core.getOrElse import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.publisher.Mono import java.io.InputStream -import java.lang.IllegalArgumentException import java.util.concurrent.atomic.AtomicReference /** diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt index 5d2977e4..f3fd56bb 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -19,11 +19,15 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters -import arrow.effects.IO import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator import org.onap.dcae.collectors.veshv.utils.NettyServerHandle import org.onap.dcae.collectors.veshv.utils.ServerHandle -import org.onap.dcae.collectors.veshv.utils.http.* +import org.onap.dcae.collectors.veshv.utils.http.HttpConstants +import org.onap.dcae.collectors.veshv.utils.http.HttpStatus +import org.onap.dcae.collectors.veshv.utils.http.Response +import org.onap.dcae.collectors.veshv.utils.http.Responses +import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors +import org.onap.dcae.collectors.veshv.utils.http.sendOrError import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Mono import reactor.netty.http.server.HttpServer @@ -50,14 +54,15 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { ) } - fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<ServerHandle> = - IO { + fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): Mono<ServerHandle> = + Mono.defer { simulator.listenToTopics(kafkaTopics) HttpServer.create() .host(socketAddress.hostName) .port(socketAddress.port) .route(::setRoutes) - .let { NettyServerHandle(it.bindNow()) } + .bind() + .map { NettyServerHandle(it) } } private fun setRoutes(route: HttpServerRoutes) { @@ -66,7 +71,7 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { req .receive().aggregate().asString() .flatMap { - res.sendOrError{ simulator.listenToTopics(it) } + res.sendOrError { simulator.listenToTopics(it) } } } .delete("/messages") { _, res -> diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index 4ad92712..7f4e62bb 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp -import arrow.effects.IO import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator @@ -27,35 +26,29 @@ import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValid import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration -import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure -import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.process.ExitCode +import org.onap.dcae.collectors.veshv.utils.process.ExitSuccess import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp" private val logger = Logger(PACKAGE_NAME) const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" -fun main(args: Array<String>) = +fun main(args: Array<String>): Unit = ArgDcaeAppSimConfiguration().parse(args) - .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map(::startApp) - .unsafeRunEitherSync( - { ex -> - logger.withError { log("Failed to start a server", ex) } - ExitFailure(1) - }, - { - logger.info { "Started DCAE-APP Simulator API server" } - } - ) + .fold(handleWrongArgumentErrorCurried(PROGRAM_NAME), ::startApp) + .let(ExitCode::doExit) -private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> { - logger.info { "Using configuration: $config" } + +private fun startApp(config: DcaeAppSimConfiguration): ExitSuccess { + logger.info { "Starting DCAE-APP Simulator API server with configuration: $config" } val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers) val generatorFactory = MessageGeneratorFactory(config.maxPayloadSizeBytes) val messageStreamValidation = MessageStreamValidation(generatorFactory.createVesEventGenerator()) - return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) + DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) .start(config.apiAddress, config.kafkaTopics) .flatMap { it.await() } + .block() + return ExitSuccess } diff --git a/sources/hv-collector-domain/pom.xml b/sources/hv-collector-domain/pom.xml index 63fee2d9..40e7c936 100644 --- a/sources/hv-collector-domain/pom.xml +++ b/sources/hv-collector-domain/pom.xml @@ -71,7 +71,7 @@ </dependency> <dependency> <groupId>io.arrow-kt</groupId> - <artifactId>arrow-core</artifactId> + <artifactId>arrow-core-data</artifactId> </dependency> <dependency> <groupId>org.assertj</groupId> diff --git a/sources/hv-collector-health-check/pom.xml b/sources/hv-collector-health-check/pom.xml index 68915939..90ec958d 100644 --- a/sources/hv-collector-health-check/pom.xml +++ b/sources/hv-collector-health-check/pom.xml @@ -49,15 +49,10 @@ <artifactId>reactor-netty</artifactId> </dependency> <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects</artifactId> - </dependency> - <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <scope>test</scope> </dependency> - <dependency> <groupId>org.jetbrains.kotlin</groupId> <artifactId>kotlin-test</artifactId> diff --git a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt index fb5bb9a2..cff8160c 100644 --- a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt +++ b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.healthcheck.factory -import arrow.effects.IO import io.netty.handler.codec.http.HttpResponseStatus import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState @@ -45,9 +44,9 @@ class HealthCheckApiServer(private val healthState: HealthState, private val healthDescription: AtomicReference<HealthDescription> = AtomicReference(HealthDescription.STARTING) - fun start(): IO<ServerHandle> = IO { + fun start(): Mono<ServerHandle> = Mono.defer { healthState().subscribe(healthDescription::set) - val ctx = HttpServer.create() + HttpServer.create() .tcpConfiguration { it.addressSupplier { listenAddress } .doOnUnbound { logClose() } @@ -57,7 +56,9 @@ class HealthCheckApiServer(private val healthState: HealthState, routes.get("/health/alive", ::livenessHandler) routes.get("/monitoring/prometheus", ::monitoringHandler) } - NettyServerHandle(ctx.bindNow()) + .bind() + .map { NettyServerHandle(it) } + } private fun readinessHandler(_req: HttpServerRequest, resp: HttpServerResponse) = diff --git a/sources/hv-collector-main/pom.xml b/sources/hv-collector-main/pom.xml index 57f21a66..3fe8932f 100644 --- a/sources/hv-collector-main/pom.xml +++ b/sources/hv-collector-main/pom.xml @@ -93,7 +93,7 @@ </dependency> <dependency> <groupId>io.arrow-kt</groupId> - <artifactId>arrow-core</artifactId> + <artifactId>arrow-core-data</artifactId> </dependency> <dependency> <groupId>io.arrow-kt</groupId> diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 8b0a38bb..dfb388d8 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -50,7 +50,7 @@ fun main(args: Array<String>) { } } - HealthCheckServer.start(configurationModule.healthCheckPort(args)) + HealthCheckServer.start(configurationModule.healthCheckPort(args)).block() configurationModule .hvVesConfigurationUpdates(args, configStateListener, ServiceContext::mdc) .publishOn(Schedulers.single(Schedulers.elastic())) diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt index 9b58dcc9..c970e5c8 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt @@ -24,7 +24,6 @@ import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle -import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.logging.Logger import java.net.InetSocketAddress @@ -39,8 +38,7 @@ object HealthCheckServer { fun start(port: Int) = createHealthCheckServer(port) .start() - .then(::logServerStarted) - .unsafeRunSync() + .doOnSuccess(::logServerStarted) private fun createHealthCheckServer(listenPort: Int) = HealthCheckApiServer( diff --git a/sources/hv-collector-ssl/pom.xml b/sources/hv-collector-ssl/pom.xml index a4bc7c7f..0ba609e5 100644 --- a/sources/hv-collector-ssl/pom.xml +++ b/sources/hv-collector-ssl/pom.xml @@ -78,11 +78,7 @@ <dependency> <groupId>io.arrow-kt</groupId> - <artifactId>arrow-core</artifactId> - </dependency> - <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-syntax</artifactId> + <artifactId>arrow-core-data</artifactId> </dependency> <dependency> <groupId>org.slf4j</groupId> diff --git a/sources/hv-collector-utils/pom.xml b/sources/hv-collector-utils/pom.xml index e85b8ee4..5053cf00 100644 --- a/sources/hv-collector-utils/pom.xml +++ b/sources/hv-collector-utils/pom.xml @@ -65,26 +65,22 @@ <artifactId>kotlin-reflect</artifactId> </dependency> <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-instances-data</artifactId> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-stdlib-jdk8</artifactId> </dependency> <dependency> <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects</artifactId> + <artifactId>arrow-typeclasses</artifactId> </dependency> <dependency> <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects-instances</artifactId> + <artifactId>arrow-core-extensions</artifactId> </dependency> <dependency> <groupId>io.arrow-kt</groupId> <artifactId>arrow-syntax</artifactId> </dependency> <dependency> - <groupId>org.jetbrains.kotlinx</groupId> - <artifactId>kotlinx-coroutines-core</artifactId> - </dependency> - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <optional>true</optional> diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt index 47b3d559..cfed7f32 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt @@ -23,16 +23,11 @@ import arrow.core.Either import arrow.core.ForOption import arrow.core.Option import arrow.core.Try +import arrow.core.extensions.option.monad.monad import arrow.core.fix import arrow.core.identity -import arrow.effects.ForIO -import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monad.monad -import arrow.instances.option.monad.monad import arrow.syntax.collections.firstOption import arrow.typeclasses.MonadContinuation -import arrow.typeclasses.binding import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.util.concurrent.atomic.AtomicReference @@ -47,11 +42,6 @@ object OptionUtils { : Option<A> = Option.monad().binding(c).fix() } -object IOUtils { - fun <A> binding(c: suspend MonadContinuation<ForIO, *>.() -> A) - : IO<A> = IO.monad().binding(c).fix() -} - fun <A> Either<A, A>.flatten() = fold(::identity, ::identity) fun <B> Either<Throwable, B>.rightOrThrow() = fold({ throw it }, ::identity) diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt index 9023528e..ac39100d 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt @@ -28,7 +28,8 @@ sealed class Marker(internal val slf4jMarker: org.slf4j.Marker, val mdc: Map<Str object Entry : Marker(ENTRY) object Exit : Marker(EXIT) - class Invoke(id: UUID = UUID.randomUUID(), timestamp: Instant = Instant.now()) : Marker(INVOKE, mdc(id, timestamp)) { + class Invoke(id: UUID = UUID.randomUUID(), timestamp: Instant = Instant.now()) : + Marker(INVOKE, mdc(id, timestamp)) { companion object { private fun mdc(id: UUID, timestamp: Instant) = mapOf( OnapMdc.INVOCATION_ID to id.toString(), diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/process/process.kt index 56825221..58859462 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/process/process.kt @@ -17,16 +17,8 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.utils.arrow +package org.onap.dcae.collectors.veshv.utils.process -import arrow.core.Either -import arrow.core.Left -import arrow.core.Right -import arrow.effects.IO -import org.reactivestreams.Publisher -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.core.publisher.toMono import kotlin.system.exitProcess /** @@ -37,9 +29,8 @@ import kotlin.system.exitProcess sealed class ExitCode { abstract val code: Int - fun io() = IO { - exitProcess(code) - } + fun doExit(): Nothing = exitProcess(code) + } object ExitSuccess : ExitCode() { @@ -47,33 +38,3 @@ object ExitSuccess : ExitCode() { } data class ExitFailure(override val code: Int) : ExitCode() - -inline fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) = - flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() }) - -fun IO<Any>.unit() = map { Unit } - -fun <T> Mono<T>.asIo() = IO.async<T> { callback -> - subscribe({ - callback(Right(it)) - }, { - callback(Left(it)) - }) -} - -fun <T> Publisher<T>.then(callback: () -> Unit): Mono<Unit> = - toMono().then(Mono.fromCallable(callback)) - -fun <T> Flux<IO<T>>.evaluateIo(): Flux<T> = - flatMap { io -> - io.attempt().unsafeRunSync().fold( - { Flux.error<T>(it) }, - { Flux.just<T>(it) } - ) - } - -inline fun <T> IO<T>.then(crossinline block: (T) -> Unit): IO<T> = - map { - block(it) - it - } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt new file mode 100644 index 00000000..ceccbcba --- /dev/null +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt @@ -0,0 +1,33 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +package org.onap.dcae.collectors.veshv.utils.rx + +import org.reactivestreams.Publisher +import reactor.core.publisher.Mono +import reactor.core.publisher.toMono + +fun <T> Publisher<T>.then(callback: () -> Unit): Mono<Unit> = + toMono().then(Mono.fromCallable(callback)) diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt index 670ab4ac..728d62bb 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.utils -import arrow.effects.IO import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Mono import reactor.netty.DisposableServer @@ -29,7 +28,7 @@ import reactor.netty.DisposableServer * @since August 2018 */ abstract class ServerHandle(val host: String, val port: Int) : Closeable { - abstract fun await(): IO<Unit> + abstract fun await(): Mono<Void> } /** @@ -58,8 +57,10 @@ class NettyServerHandle(private val ctx: DisposableServer, } } - override fun await() = IO<Unit> { - ctx.channel().closeFuture().sync() + override fun await(): Mono<Void> = Mono.create { callback -> + ctx.channel().closeFuture().addListener { + callback.success() + } } companion object { diff --git a/sources/hv-collector-xnf-simulator/pom.xml b/sources/hv-collector-xnf-simulator/pom.xml index c17d29f6..a9ac0bc8 100644 --- a/sources/hv-collector-xnf-simulator/pom.xml +++ b/sources/hv-collector-xnf-simulator/pom.xml @@ -78,11 +78,6 @@ <dependencies> <dependency> <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-domain</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> <artifactId>hv-collector-ssl</artifactId> <version>${project.parent.version}</version> </dependency> @@ -112,18 +107,6 @@ <scope>test</scope> </dependency> <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects</artifactId> - </dependency> - <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects-instances</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.kotlinx</groupId> - <artifactId>kotlinx-coroutines-core</artifactId> - </dependency> - <dependency> <groupId>org.jetbrains.kotlin</groupId> <artifactId>kotlin-stdlib-jdk8</artifactId> </dependency> @@ -136,18 +119,6 @@ <artifactId>logback-classic</artifactId> <scope>runtime</scope> </dependency> - <!-- See comment in main pom - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-transport-native-epoll</artifactId> - <classifier>${os.detected.classifier}</classifier> - </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-tcnative-boringssl-static</artifactId> - <classifier>${os.detected.classifier}</classifier> - </dependency> - --> <dependency> <groupId>org.glassfish</groupId> <artifactId>javax.json</artifactId> diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 93c43173..49d6a470 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -22,13 +22,10 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import arrow.core.Either import arrow.core.Some import arrow.core.Try +import arrow.core.extensions.either.monad.monad import arrow.core.fix -import arrow.effects.IO -import arrow.instances.either.monad.monad -import arrow.typeclasses.binding import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory -import org.onap.dcae.collectors.veshv.utils.arrow.asIo import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError @@ -60,7 +57,7 @@ class XnfSimulator( private val defaultHvVesClient by lazy { clientFactory.create() } - fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> = + fun startSimulation(messageParameters: InputStream): Either<ParsingError, Mono<Void>> = Either.monad<ParsingError>().binding { val json = parseJsonArray(messageParameters).bind() val parameters = messageParametersParser.parse(json).bind() @@ -73,7 +70,7 @@ class XnfSimulator( .mapLeft { ParsingError("Failed to parse JSON", Some(it)) } - private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = + private fun simulationFrom(parameters: List<MessageParameters>): Mono<Void> = parameters .map(::asClientToMessages) .groupMessagesByClients() @@ -81,8 +78,7 @@ class XnfSimulator( .toList() .toFlux() .map(::simulate) - .then(Mono.just(Unit)) - .asIo() + .then() private fun <M> List<Pair<HvVesClient, M>>.groupMessagesByClients() = groupBy({ it.first }, { it.second }) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt index 19579431..e50f1e7a 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt @@ -19,14 +19,13 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters -import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.rx.then import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicLong /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt index fb2c532f..e68dd443 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt @@ -20,7 +20,6 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters import arrow.core.Either -import arrow.effects.IO import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator import org.onap.dcae.collectors.veshv.utils.NettyServerHandle @@ -48,13 +47,14 @@ internal class XnfApiServer( private val xnfSimulator: XnfSimulator, private val ongoingSimulations: OngoingSimulations) { - fun start(socketAddress: InetSocketAddress): IO<ServerHandle> = IO { - HttpServer.create() - .host(socketAddress.hostName) - .port(socketAddress.port) - .route(::setRoutes) - .let { NettyServerHandle(it.bindNow()) } - } + fun start(socketAddress: InetSocketAddress): Mono<ServerHandle> = + HttpServer.create() + .host(socketAddress.hostName) + .port(socketAddress.port) + .route(::setRoutes) + .bind() + .map { NettyServerHandle(it) } + private fun setRoutes(route: HttpServerRoutes) { route diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt index 5e1c979c..a2501ebb 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt @@ -32,7 +32,7 @@ import reactor.core.publisher.Mono internal class XnfHealthCheckServer { fun startServer(config: SimulatorConfiguration) = createHealthCheckServer(config) .start() - .map { logger.info(serverStartedMessage(it)); it } + .doOnNext { logger.info(serverStartedMessage(it)) } private fun createHealthCheckServer(config: SimulatorConfiguration): HealthCheckApiServer { val monitoring = object : PrometheusMetricsProvider { diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt index fb71b2cd..3f43ebe0 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt @@ -19,44 +19,43 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.impl -import arrow.effects.IO -import kotlinx.coroutines.asCoroutineDispatcher import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.BUSY import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.IDLE import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono +import reactor.core.scheduler.Scheduler +import reactor.core.scheduler.Schedulers import java.util.* import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.Executor -import java.util.concurrent.Executors /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool(), +class OngoingSimulations(private val scheduler: Scheduler = Schedulers.elastic(), private val healthState: HealthState = HealthState.INSTANCE) { - private val asyncSimulationContext = executor.asCoroutineDispatcher() private val simulations = ConcurrentHashMap<UUID, Status>() - fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID { + fun startAsynchronousSimulation(simulationIo: Mono<Void>): UUID { val id = UUID.randomUUID() simulations[id] = StatusOngoing updateHealthState() - simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result -> - result.fold( - { err -> - logger.withWarn { log("Error", err) } - simulations[id] = StatusFailure(err) - }, - { - logger.info { "Finished sending messages" } - simulations[id] = StatusSuccess - } - ).also { updateHealthState() } - } + simulationIo + .publishOn(scheduler) + .doOnSuccess { + logger.info { "Finished sending messages" } + simulations[id] = StatusSuccess + } + .doOnError { err -> + logger.withWarn { log("Error", err) } + simulations[id] = StatusFailure(err) + } + .doFinally { updateHealthState() } + .subscribe() + return id } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index a1042f3e..4fcb1809 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -19,8 +19,8 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf -import arrow.effects.IO import io.vavr.collection.HashSet +import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentError import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState @@ -32,9 +32,8 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulator import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory -import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure -import org.onap.dcae.collectors.veshv.utils.arrow.IOUtils.binding -import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync +import org.onap.dcae.collectors.veshv.utils.process.ExitCode +import org.onap.dcae.collectors.veshv.utils.process.ExitSuccess import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory @@ -46,36 +45,29 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) - .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map(::startServers) - .unsafeRunEitherSync( - { ex -> - logger.withError { log("Failed to start a server", ex) } - ExitFailure(1) - }, - { - logger.info { "Stopping xNF Simulator API server" } - } - ) +fun main(args: Array<String>): Unit = + ArgXnfSimulatorConfiguration().parse(args) + .fold(handleWrongArgumentErrorCurried(PROGRAM_NAME), ::startServers) + .let(ExitCode::doExit) -private fun startServers(config: SimulatorConfiguration): IO<Unit> = - binding { - logger.info { "Using configuration: $config" } +private fun startServers(config: SimulatorConfiguration): ExitCode { + logger.info { "Using configuration: $config" } - XnfHealthCheckServer().startServer(config).bind() + XnfHealthCheckServer().startServer(config).block() - val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider) - val xnfSimulator = XnfSimulator( - ClientFactory(clientConfig), - MessageGeneratorFactory(config.maxPayloadSizeBytes) - ) - val xnfApiServerHandler = XnfApiServer(xnfSimulator, OngoingSimulations()) - .start(config.listenAddress).bind() + val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider) + val xnfSimulator = XnfSimulator( + ClientFactory(clientConfig), + MessageGeneratorFactory(config.maxPayloadSizeBytes) + ) + val xnfApiServerHandler = XnfApiServer(xnfSimulator, OngoingSimulations()) + .start(config.listenAddress) + .block() - logger.info { "Started xNF Simulator API server" } - HealthState.INSTANCE.changeState(HealthDescription.IDLE) + logger.info { "Started xNF Simulator API server" } + HealthState.INSTANCE.changeState(HealthDescription.IDLE) - xnfApiServerHandler.await().bind() - } + xnfApiServerHandler.await().block() + return ExitSuccess +} diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt index 113c3c42..325d3bb5 100644 --- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.main -import arrow.effects.IO import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe @@ -32,16 +31,17 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusNotFound import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusOngoing import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusSuccess import org.onap.dcae.collectors.veshv.tests.utils.waitUntilSucceeds +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers import java.util.* -import java.util.concurrent.Executors /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since September 2018 */ internal class OngoingSimulationsTest : Spek({ - val executor = Executors.newSingleThreadExecutor() - val cut = OngoingSimulations(executor) + val scheduler = Schedulers.single() + val cut = OngoingSimulations(scheduler) describe("simulations repository") { given("not existing task task id") { @@ -121,19 +121,22 @@ internal class OngoingSimulationsTest : Spek({ } afterGroup { - executor.shutdown() + scheduler.dispose() } } afterEachTest { cut.clear() } }) -private fun neverendingTask() = IO.async<Unit> { } +private fun neverendingTask() = Mono.never<Void>() -private fun succesfulTask(): IO<Unit> = IO { println("great success!") } +private fun succesfulTask(): Mono<Void> = Mono.empty<Void>() + .doOnSuccess { + println("great success") + } -private fun failingTask(): Pair<RuntimeException, IO<Unit>> { +private fun failingTask(): Pair<RuntimeException, Mono<Void>> { val cause = RuntimeException("facepalm") - val task = IO.raiseError<Unit>(cause) + val task = Mono.error<Void>(cause) return Pair(cause, task) } diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt index 29281cdc..ea0628c1 100644 --- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -126,7 +126,7 @@ internal class XnfSimulatorTest : Spek({ whenever(vesClient.sendRawPayload(any(), eq(PayloadType.PROTOBUF))).thenReturn(Mono.just(Unit)) // when - cut.startSimulation(json).map { it.unsafeRunSync() } + cut.startSimulation(json).map { it.block() } // then verify(vesClient).sendRawPayload(any(), eq(PayloadType.PROTOBUF)) |