diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-04-03 15:07:22 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-04-04 12:57:15 +0200 |
commit | c775e8677cdbf69f2b1c1390d225329c658c0ee2 (patch) | |
tree | 707456ffe0817ed2d29d9a62ccf98b9d267283ba /sources/hv-collector-xnf-simulator/src | |
parent | c7a3e0738abf581640059587dbb81790339340c9 (diff) |
Get rid of arrow-effects usage
Also clean-up dependencies + use Kotlin BOM to force single
kotlin-stdlib on classpath.
Issue-ID: DCAEGEN2-1392
Change-Id: I447c4686707de81f35f7734255ce0b13c997c4a4
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-xnf-simulator/src')
8 files changed, 68 insertions, 79 deletions
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 93c43173..49d6a470 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -22,13 +22,10 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import arrow.core.Either import arrow.core.Some import arrow.core.Try +import arrow.core.extensions.either.monad.monad import arrow.core.fix -import arrow.effects.IO -import arrow.instances.either.monad.monad -import arrow.typeclasses.binding import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory -import org.onap.dcae.collectors.veshv.utils.arrow.asIo import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError @@ -60,7 +57,7 @@ class XnfSimulator( private val defaultHvVesClient by lazy { clientFactory.create() } - fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> = + fun startSimulation(messageParameters: InputStream): Either<ParsingError, Mono<Void>> = Either.monad<ParsingError>().binding { val json = parseJsonArray(messageParameters).bind() val parameters = messageParametersParser.parse(json).bind() @@ -73,7 +70,7 @@ class XnfSimulator( .mapLeft { ParsingError("Failed to parse JSON", Some(it)) } - private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = + private fun simulationFrom(parameters: List<MessageParameters>): Mono<Void> = parameters .map(::asClientToMessages) .groupMessagesByClients() @@ -81,8 +78,7 @@ class XnfSimulator( .toList() .toFlux() .map(::simulate) - .then(Mono.just(Unit)) - .asIo() + .then() private fun <M> List<Pair<HvVesClient, M>>.groupMessagesByClients() = groupBy({ it.first }, { it.second }) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt index 19579431..e50f1e7a 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt @@ -19,14 +19,13 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters -import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.rx.then import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicLong /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt index fb2c532f..e68dd443 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt @@ -20,7 +20,6 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters import arrow.core.Either -import arrow.effects.IO import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator import org.onap.dcae.collectors.veshv.utils.NettyServerHandle @@ -48,13 +47,14 @@ internal class XnfApiServer( private val xnfSimulator: XnfSimulator, private val ongoingSimulations: OngoingSimulations) { - fun start(socketAddress: InetSocketAddress): IO<ServerHandle> = IO { - HttpServer.create() - .host(socketAddress.hostName) - .port(socketAddress.port) - .route(::setRoutes) - .let { NettyServerHandle(it.bindNow()) } - } + fun start(socketAddress: InetSocketAddress): Mono<ServerHandle> = + HttpServer.create() + .host(socketAddress.hostName) + .port(socketAddress.port) + .route(::setRoutes) + .bind() + .map { NettyServerHandle(it) } + private fun setRoutes(route: HttpServerRoutes) { route diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt index 5e1c979c..a2501ebb 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt @@ -32,7 +32,7 @@ import reactor.core.publisher.Mono internal class XnfHealthCheckServer { fun startServer(config: SimulatorConfiguration) = createHealthCheckServer(config) .start() - .map { logger.info(serverStartedMessage(it)); it } + .doOnNext { logger.info(serverStartedMessage(it)) } private fun createHealthCheckServer(config: SimulatorConfiguration): HealthCheckApiServer { val monitoring = object : PrometheusMetricsProvider { diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt index fb71b2cd..3f43ebe0 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt @@ -19,44 +19,43 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.impl -import arrow.effects.IO -import kotlinx.coroutines.asCoroutineDispatcher import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.BUSY import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.IDLE import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono +import reactor.core.scheduler.Scheduler +import reactor.core.scheduler.Schedulers import java.util.* import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.Executor -import java.util.concurrent.Executors /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool(), +class OngoingSimulations(private val scheduler: Scheduler = Schedulers.elastic(), private val healthState: HealthState = HealthState.INSTANCE) { - private val asyncSimulationContext = executor.asCoroutineDispatcher() private val simulations = ConcurrentHashMap<UUID, Status>() - fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID { + fun startAsynchronousSimulation(simulationIo: Mono<Void>): UUID { val id = UUID.randomUUID() simulations[id] = StatusOngoing updateHealthState() - simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result -> - result.fold( - { err -> - logger.withWarn { log("Error", err) } - simulations[id] = StatusFailure(err) - }, - { - logger.info { "Finished sending messages" } - simulations[id] = StatusSuccess - } - ).also { updateHealthState() } - } + simulationIo + .publishOn(scheduler) + .doOnSuccess { + logger.info { "Finished sending messages" } + simulations[id] = StatusSuccess + } + .doOnError { err -> + logger.withWarn { log("Error", err) } + simulations[id] = StatusFailure(err) + } + .doFinally { updateHealthState() } + .subscribe() + return id } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index a1042f3e..4fcb1809 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -19,8 +19,8 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf -import arrow.effects.IO import io.vavr.collection.HashSet +import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentError import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState @@ -32,9 +32,8 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulator import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory -import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure -import org.onap.dcae.collectors.veshv.utils.arrow.IOUtils.binding -import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync +import org.onap.dcae.collectors.veshv.utils.process.ExitCode +import org.onap.dcae.collectors.veshv.utils.process.ExitSuccess import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory @@ -46,36 +45,29 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) - .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map(::startServers) - .unsafeRunEitherSync( - { ex -> - logger.withError { log("Failed to start a server", ex) } - ExitFailure(1) - }, - { - logger.info { "Stopping xNF Simulator API server" } - } - ) +fun main(args: Array<String>): Unit = + ArgXnfSimulatorConfiguration().parse(args) + .fold(handleWrongArgumentErrorCurried(PROGRAM_NAME), ::startServers) + .let(ExitCode::doExit) -private fun startServers(config: SimulatorConfiguration): IO<Unit> = - binding { - logger.info { "Using configuration: $config" } +private fun startServers(config: SimulatorConfiguration): ExitCode { + logger.info { "Using configuration: $config" } - XnfHealthCheckServer().startServer(config).bind() + XnfHealthCheckServer().startServer(config).block() - val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider) - val xnfSimulator = XnfSimulator( - ClientFactory(clientConfig), - MessageGeneratorFactory(config.maxPayloadSizeBytes) - ) - val xnfApiServerHandler = XnfApiServer(xnfSimulator, OngoingSimulations()) - .start(config.listenAddress).bind() + val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider) + val xnfSimulator = XnfSimulator( + ClientFactory(clientConfig), + MessageGeneratorFactory(config.maxPayloadSizeBytes) + ) + val xnfApiServerHandler = XnfApiServer(xnfSimulator, OngoingSimulations()) + .start(config.listenAddress) + .block() - logger.info { "Started xNF Simulator API server" } - HealthState.INSTANCE.changeState(HealthDescription.IDLE) + logger.info { "Started xNF Simulator API server" } + HealthState.INSTANCE.changeState(HealthDescription.IDLE) - xnfApiServerHandler.await().bind() - } + xnfApiServerHandler.await().block() + return ExitSuccess +} diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt index 113c3c42..325d3bb5 100644 --- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.main -import arrow.effects.IO import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe @@ -32,16 +31,17 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusNotFound import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusOngoing import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusSuccess import org.onap.dcae.collectors.veshv.tests.utils.waitUntilSucceeds +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers import java.util.* -import java.util.concurrent.Executors /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since September 2018 */ internal class OngoingSimulationsTest : Spek({ - val executor = Executors.newSingleThreadExecutor() - val cut = OngoingSimulations(executor) + val scheduler = Schedulers.single() + val cut = OngoingSimulations(scheduler) describe("simulations repository") { given("not existing task task id") { @@ -121,19 +121,22 @@ internal class OngoingSimulationsTest : Spek({ } afterGroup { - executor.shutdown() + scheduler.dispose() } } afterEachTest { cut.clear() } }) -private fun neverendingTask() = IO.async<Unit> { } +private fun neverendingTask() = Mono.never<Void>() -private fun succesfulTask(): IO<Unit> = IO { println("great success!") } +private fun succesfulTask(): Mono<Void> = Mono.empty<Void>() + .doOnSuccess { + println("great success") + } -private fun failingTask(): Pair<RuntimeException, IO<Unit>> { +private fun failingTask(): Pair<RuntimeException, Mono<Void>> { val cause = RuntimeException("facepalm") - val task = IO.raiseError<Unit>(cause) + val task = Mono.error<Void>(cause) return Pair(cause, task) } diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt index 29281cdc..ea0628c1 100644 --- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -126,7 +126,7 @@ internal class XnfSimulatorTest : Spek({ whenever(vesClient.sendRawPayload(any(), eq(PayloadType.PROTOBUF))).thenReturn(Mono.just(Unit)) // when - cut.startSimulation(json).map { it.unsafeRunSync() } + cut.startSimulation(json).map { it.block() } // then verify(vesClient).sendRawPayload(any(), eq(PayloadType.PROTOBUF)) |