summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-xnf-simulator/src
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-xnf-simulator/src')
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt12
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt3
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt16
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt2
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt37
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt54
-rw-r--r--sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt21
-rw-r--r--sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt2
8 files changed, 68 insertions, 79 deletions
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index 93c43173..49d6a470 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -22,13 +22,10 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import arrow.core.Either
import arrow.core.Some
import arrow.core.Try
+import arrow.core.extensions.either.monad.monad
import arrow.core.fix
-import arrow.effects.IO
-import arrow.instances.either.monad.monad
-import arrow.typeclasses.binding
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
-import org.onap.dcae.collectors.veshv.utils.arrow.asIo
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
@@ -60,7 +57,7 @@ class XnfSimulator(
private val defaultHvVesClient by lazy { clientFactory.create() }
- fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
+ fun startSimulation(messageParameters: InputStream): Either<ParsingError, Mono<Void>> =
Either.monad<ParsingError>().binding {
val json = parseJsonArray(messageParameters).bind()
val parameters = messageParametersParser.parse(json).bind()
@@ -73,7 +70,7 @@ class XnfSimulator(
.mapLeft { ParsingError("Failed to parse JSON", Some(it)) }
- private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> =
+ private fun simulationFrom(parameters: List<MessageParameters>): Mono<Void> =
parameters
.map(::asClientToMessages)
.groupMessagesByClients()
@@ -81,8 +78,7 @@ class XnfSimulator(
.toList()
.toFlux()
.map(::simulate)
- .then(Mono.just(Unit))
- .asIo()
+ .then()
private fun <M> List<Pair<HvVesClient, M>>.groupMessagesByClients() =
groupBy({ it.first }, { it.second })
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
index 19579431..e50f1e7a 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
@@ -19,14 +19,13 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
-import org.onap.dcae.collectors.veshv.utils.arrow.then
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.rx.then
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicLong
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
index fb2c532f..e68dd443 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
@@ -20,7 +20,6 @@
package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
import arrow.core.Either
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
@@ -48,13 +47,14 @@ internal class XnfApiServer(
private val xnfSimulator: XnfSimulator,
private val ongoingSimulations: OngoingSimulations) {
- fun start(socketAddress: InetSocketAddress): IO<ServerHandle> = IO {
- HttpServer.create()
- .host(socketAddress.hostName)
- .port(socketAddress.port)
- .route(::setRoutes)
- .let { NettyServerHandle(it.bindNow()) }
- }
+ fun start(socketAddress: InetSocketAddress): Mono<ServerHandle> =
+ HttpServer.create()
+ .host(socketAddress.hostName)
+ .port(socketAddress.port)
+ .route(::setRoutes)
+ .bind()
+ .map { NettyServerHandle(it) }
+
private fun setRoutes(route: HttpServerRoutes) {
route
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt
index 5e1c979c..a2501ebb 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt
@@ -32,7 +32,7 @@ import reactor.core.publisher.Mono
internal class XnfHealthCheckServer {
fun startServer(config: SimulatorConfiguration) = createHealthCheckServer(config)
.start()
- .map { logger.info(serverStartedMessage(it)); it }
+ .doOnNext { logger.info(serverStartedMessage(it)) }
private fun createHealthCheckServer(config: SimulatorConfiguration): HealthCheckApiServer {
val monitoring = object : PrometheusMetricsProvider {
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
index fb71b2cd..3f43ebe0 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
@@ -19,44 +19,43 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
-import arrow.effects.IO
-import kotlinx.coroutines.asCoroutineDispatcher
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.BUSY
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.IDLE
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
+import reactor.core.scheduler.Scheduler
+import reactor.core.scheduler.Schedulers
import java.util.*
import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.Executor
-import java.util.concurrent.Executors
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool(),
+class OngoingSimulations(private val scheduler: Scheduler = Schedulers.elastic(),
private val healthState: HealthState = HealthState.INSTANCE) {
- private val asyncSimulationContext = executor.asCoroutineDispatcher()
private val simulations = ConcurrentHashMap<UUID, Status>()
- fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID {
+ fun startAsynchronousSimulation(simulationIo: Mono<Void>): UUID {
val id = UUID.randomUUID()
simulations[id] = StatusOngoing
updateHealthState()
- simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result ->
- result.fold(
- { err ->
- logger.withWarn { log("Error", err) }
- simulations[id] = StatusFailure(err)
- },
- {
- logger.info { "Finished sending messages" }
- simulations[id] = StatusSuccess
- }
- ).also { updateHealthState() }
- }
+ simulationIo
+ .publishOn(scheduler)
+ .doOnSuccess {
+ logger.info { "Finished sending messages" }
+ simulations[id] = StatusSuccess
+ }
+ .doOnError { err ->
+ logger.withWarn { log("Error", err) }
+ simulations[id] = StatusFailure(err)
+ }
+ .doFinally { updateHealthState() }
+ .subscribe()
+
return id
}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index a1042f3e..4fcb1809 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -19,8 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf
-import arrow.effects.IO
import io.vavr.collection.HashSet
+import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentError
import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
@@ -32,9 +32,8 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulator
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
-import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
-import org.onap.dcae.collectors.veshv.utils.arrow.IOUtils.binding
-import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
+import org.onap.dcae.collectors.veshv.utils.process.ExitCode
+import org.onap.dcae.collectors.veshv.utils.process.ExitSuccess
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
@@ -46,36 +45,29 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
- .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map(::startServers)
- .unsafeRunEitherSync(
- { ex ->
- logger.withError { log("Failed to start a server", ex) }
- ExitFailure(1)
- },
- {
- logger.info { "Stopping xNF Simulator API server" }
- }
- )
+fun main(args: Array<String>): Unit =
+ ArgXnfSimulatorConfiguration().parse(args)
+ .fold(handleWrongArgumentErrorCurried(PROGRAM_NAME), ::startServers)
+ .let(ExitCode::doExit)
-private fun startServers(config: SimulatorConfiguration): IO<Unit> =
- binding {
- logger.info { "Using configuration: $config" }
+private fun startServers(config: SimulatorConfiguration): ExitCode {
+ logger.info { "Using configuration: $config" }
- XnfHealthCheckServer().startServer(config).bind()
+ XnfHealthCheckServer().startServer(config).block()
- val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider)
- val xnfSimulator = XnfSimulator(
- ClientFactory(clientConfig),
- MessageGeneratorFactory(config.maxPayloadSizeBytes)
- )
- val xnfApiServerHandler = XnfApiServer(xnfSimulator, OngoingSimulations())
- .start(config.listenAddress).bind()
+ val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider)
+ val xnfSimulator = XnfSimulator(
+ ClientFactory(clientConfig),
+ MessageGeneratorFactory(config.maxPayloadSizeBytes)
+ )
+ val xnfApiServerHandler = XnfApiServer(xnfSimulator, OngoingSimulations())
+ .start(config.listenAddress)
+ .block()
- logger.info { "Started xNF Simulator API server" }
- HealthState.INSTANCE.changeState(HealthDescription.IDLE)
+ logger.info { "Started xNF Simulator API server" }
+ HealthState.INSTANCE.changeState(HealthDescription.IDLE)
- xnfApiServerHandler.await().bind()
- }
+ xnfApiServerHandler.await().block()
+ return ExitSuccess
+}
diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt
index 113c3c42..325d3bb5 100644
--- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt
+++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.main
-import arrow.effects.IO
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
@@ -32,16 +31,17 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusNotFound
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusOngoing
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusSuccess
import org.onap.dcae.collectors.veshv.tests.utils.waitUntilSucceeds
+import reactor.core.publisher.Mono
+import reactor.core.scheduler.Schedulers
import java.util.*
-import java.util.concurrent.Executors
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since September 2018
*/
internal class OngoingSimulationsTest : Spek({
- val executor = Executors.newSingleThreadExecutor()
- val cut = OngoingSimulations(executor)
+ val scheduler = Schedulers.single()
+ val cut = OngoingSimulations(scheduler)
describe("simulations repository") {
given("not existing task task id") {
@@ -121,19 +121,22 @@ internal class OngoingSimulationsTest : Spek({
}
afterGroup {
- executor.shutdown()
+ scheduler.dispose()
}
}
afterEachTest { cut.clear() }
})
-private fun neverendingTask() = IO.async<Unit> { }
+private fun neverendingTask() = Mono.never<Void>()
-private fun succesfulTask(): IO<Unit> = IO { println("great success!") }
+private fun succesfulTask(): Mono<Void> = Mono.empty<Void>()
+ .doOnSuccess {
+ println("great success")
+ }
-private fun failingTask(): Pair<RuntimeException, IO<Unit>> {
+private fun failingTask(): Pair<RuntimeException, Mono<Void>> {
val cause = RuntimeException("facepalm")
- val task = IO.raiseError<Unit>(cause)
+ val task = Mono.error<Void>(cause)
return Pair(cause, task)
}
diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
index 29281cdc..ea0628c1 100644
--- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
+++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
@@ -126,7 +126,7 @@ internal class XnfSimulatorTest : Spek({
whenever(vesClient.sendRawPayload(any(), eq(PayloadType.PROTOBUF))).thenReturn(Mono.just(Unit))
// when
- cut.startSimulation(json).map { it.unsafeRunSync() }
+ cut.startSimulation(json).map { it.block() }
// then
verify(vesClient).sendRawPayload(any(), eq(PayloadType.PROTOBUF))