From c775e8677cdbf69f2b1c1390d225329c658c0ee2 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Wed, 3 Apr 2019 15:07:22 +0200 Subject: Get rid of arrow-effects usage Also clean-up dependencies + use Kotlin BOM to force single kotlin-stdlib on classpath. Issue-ID: DCAEGEN2-1392 Change-Id: I447c4686707de81f35f7734255ce0b13c997c4a4 Signed-off-by: Piotr Jaszczyk --- .../veshv/simulators/xnf/impl/XnfSimulator.kt | 12 ++--- .../simulators/xnf/impl/adapters/HvVesClient.kt | 3 +- .../simulators/xnf/impl/adapters/XnfApiServer.kt | 16 +++---- .../xnf/impl/adapters/XnfHealthCheckServer.kt | 2 +- .../veshv/simulators/xnf/impl/simulations.kt | 37 ++++++++------- .../dcae/collectors/veshv/simulators/xnf/main.kt | 54 +++++++++------------- .../veshv/main/OngoingSimulationsTest.kt | 21 +++++---- .../dcae/collectors/veshv/main/XnfSimulatorTest.kt | 2 +- 8 files changed, 68 insertions(+), 79 deletions(-) (limited to 'sources/hv-collector-xnf-simulator/src') diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 93c43173..49d6a470 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -22,13 +22,10 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import arrow.core.Either import arrow.core.Some import arrow.core.Try +import arrow.core.extensions.either.monad.monad import arrow.core.fix -import arrow.effects.IO -import arrow.instances.either.monad.monad -import arrow.typeclasses.binding import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory -import org.onap.dcae.collectors.veshv.utils.arrow.asIo import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError @@ -60,7 +57,7 @@ class XnfSimulator( private val defaultHvVesClient by lazy { clientFactory.create() } - fun startSimulation(messageParameters: InputStream): Either> = + fun startSimulation(messageParameters: InputStream): Either> = Either.monad().binding { val json = parseJsonArray(messageParameters).bind() val parameters = messageParametersParser.parse(json).bind() @@ -73,7 +70,7 @@ class XnfSimulator( .mapLeft { ParsingError("Failed to parse JSON", Some(it)) } - private fun simulationFrom(parameters: List): IO = + private fun simulationFrom(parameters: List): Mono = parameters .map(::asClientToMessages) .groupMessagesByClients() @@ -81,8 +78,7 @@ class XnfSimulator( .toList() .toFlux() .map(::simulate) - .then(Mono.just(Unit)) - .asIo() + .then() private fun List>.groupMessagesByClients() = groupBy({ it.first }, { it.second }) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt index 19579431..e50f1e7a 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt @@ -19,14 +19,13 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters -import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.rx.then import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicLong /** * @author Jakub Dudycz diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt index fb2c532f..e68dd443 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt @@ -20,7 +20,6 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters import arrow.core.Either -import arrow.effects.IO import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator import org.onap.dcae.collectors.veshv.utils.NettyServerHandle @@ -48,13 +47,14 @@ internal class XnfApiServer( private val xnfSimulator: XnfSimulator, private val ongoingSimulations: OngoingSimulations) { - fun start(socketAddress: InetSocketAddress): IO = IO { - HttpServer.create() - .host(socketAddress.hostName) - .port(socketAddress.port) - .route(::setRoutes) - .let { NettyServerHandle(it.bindNow()) } - } + fun start(socketAddress: InetSocketAddress): Mono = + HttpServer.create() + .host(socketAddress.hostName) + .port(socketAddress.port) + .route(::setRoutes) + .bind() + .map { NettyServerHandle(it) } + private fun setRoutes(route: HttpServerRoutes) { route diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt index 5e1c979c..a2501ebb 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt @@ -32,7 +32,7 @@ import reactor.core.publisher.Mono internal class XnfHealthCheckServer { fun startServer(config: SimulatorConfiguration) = createHealthCheckServer(config) .start() - .map { logger.info(serverStartedMessage(it)); it } + .doOnNext { logger.info(serverStartedMessage(it)) } private fun createHealthCheckServer(config: SimulatorConfiguration): HealthCheckApiServer { val monitoring = object : PrometheusMetricsProvider { diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt index fb71b2cd..3f43ebe0 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt @@ -19,44 +19,43 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.impl -import arrow.effects.IO -import kotlinx.coroutines.asCoroutineDispatcher import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.BUSY import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.IDLE import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono +import reactor.core.scheduler.Scheduler +import reactor.core.scheduler.Schedulers import java.util.* import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.Executor -import java.util.concurrent.Executors /** * @author Piotr Jaszczyk * @since August 2018 */ -class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool(), +class OngoingSimulations(private val scheduler: Scheduler = Schedulers.elastic(), private val healthState: HealthState = HealthState.INSTANCE) { - private val asyncSimulationContext = executor.asCoroutineDispatcher() private val simulations = ConcurrentHashMap() - fun startAsynchronousSimulation(simulationIo: IO): UUID { + fun startAsynchronousSimulation(simulationIo: Mono): UUID { val id = UUID.randomUUID() simulations[id] = StatusOngoing updateHealthState() - simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result -> - result.fold( - { err -> - logger.withWarn { log("Error", err) } - simulations[id] = StatusFailure(err) - }, - { - logger.info { "Finished sending messages" } - simulations[id] = StatusSuccess - } - ).also { updateHealthState() } - } + simulationIo + .publishOn(scheduler) + .doOnSuccess { + logger.info { "Finished sending messages" } + simulations[id] = StatusSuccess + } + .doOnError { err -> + logger.withWarn { log("Error", err) } + simulations[id] = StatusFailure(err) + } + .doFinally { updateHealthState() } + .subscribe() + return id } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index a1042f3e..4fcb1809 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -19,8 +19,8 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf -import arrow.effects.IO import io.vavr.collection.HashSet +import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentError import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState @@ -32,9 +32,8 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulator import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory -import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure -import org.onap.dcae.collectors.veshv.utils.arrow.IOUtils.binding -import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync +import org.onap.dcae.collectors.veshv.utils.process.ExitCode +import org.onap.dcae.collectors.veshv.utils.process.ExitSuccess import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory @@ -46,36 +45,29 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" * @author Jakub Dudycz * @since June 2018 */ -fun main(args: Array) = ArgXnfSimulatorConfiguration().parse(args) - .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map(::startServers) - .unsafeRunEitherSync( - { ex -> - logger.withError { log("Failed to start a server", ex) } - ExitFailure(1) - }, - { - logger.info { "Stopping xNF Simulator API server" } - } - ) +fun main(args: Array): Unit = + ArgXnfSimulatorConfiguration().parse(args) + .fold(handleWrongArgumentErrorCurried(PROGRAM_NAME), ::startServers) + .let(ExitCode::doExit) -private fun startServers(config: SimulatorConfiguration): IO = - binding { - logger.info { "Using configuration: $config" } +private fun startServers(config: SimulatorConfiguration): ExitCode { + logger.info { "Using configuration: $config" } - XnfHealthCheckServer().startServer(config).bind() + XnfHealthCheckServer().startServer(config).block() - val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider) - val xnfSimulator = XnfSimulator( - ClientFactory(clientConfig), - MessageGeneratorFactory(config.maxPayloadSizeBytes) - ) - val xnfApiServerHandler = XnfApiServer(xnfSimulator, OngoingSimulations()) - .start(config.listenAddress).bind() + val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider) + val xnfSimulator = XnfSimulator( + ClientFactory(clientConfig), + MessageGeneratorFactory(config.maxPayloadSizeBytes) + ) + val xnfApiServerHandler = XnfApiServer(xnfSimulator, OngoingSimulations()) + .start(config.listenAddress) + .block() - logger.info { "Started xNF Simulator API server" } - HealthState.INSTANCE.changeState(HealthDescription.IDLE) + logger.info { "Started xNF Simulator API server" } + HealthState.INSTANCE.changeState(HealthDescription.IDLE) - xnfApiServerHandler.await().bind() - } + xnfApiServerHandler.await().block() + return ExitSuccess +} diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt index 113c3c42..325d3bb5 100644 --- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.main -import arrow.effects.IO import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe @@ -32,16 +31,17 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusNotFound import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusOngoing import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusSuccess import org.onap.dcae.collectors.veshv.tests.utils.waitUntilSucceeds +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers import java.util.* -import java.util.concurrent.Executors /** * @author Piotr Jaszczyk * @since September 2018 */ internal class OngoingSimulationsTest : Spek({ - val executor = Executors.newSingleThreadExecutor() - val cut = OngoingSimulations(executor) + val scheduler = Schedulers.single() + val cut = OngoingSimulations(scheduler) describe("simulations repository") { given("not existing task task id") { @@ -121,19 +121,22 @@ internal class OngoingSimulationsTest : Spek({ } afterGroup { - executor.shutdown() + scheduler.dispose() } } afterEachTest { cut.clear() } }) -private fun neverendingTask() = IO.async { } +private fun neverendingTask() = Mono.never() -private fun succesfulTask(): IO = IO { println("great success!") } +private fun succesfulTask(): Mono = Mono.empty() + .doOnSuccess { + println("great success") + } -private fun failingTask(): Pair> { +private fun failingTask(): Pair> { val cause = RuntimeException("facepalm") - val task = IO.raiseError(cause) + val task = Mono.error(cause) return Pair(cause, task) } diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt index 29281cdc..ea0628c1 100644 --- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -126,7 +126,7 @@ internal class XnfSimulatorTest : Spek({ whenever(vesClient.sendRawPayload(any(), eq(PayloadType.PROTOBUF))).thenReturn(Mono.just(Unit)) // when - cut.startSimulation(json).map { it.unsafeRunSync() } + cut.startSimulation(json).map { it.block() } // then verify(vesClient).sendRawPayload(any(), eq(PayloadType.PROTOBUF)) -- cgit 1.2.3-korg