summaryrefslogtreecommitdiffstats
path: root/sources
diff options
context:
space:
mode:
Diffstat (limited to 'sources')
-rw-r--r--sources/hv-collector-commandline/pom.xml9
-rw-r--r--sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt2
-rw-r--r--sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/WrongArgumentError.kt19
-rw-r--r--sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt9
-rw-r--r--sources/hv-collector-core/pom.xml20
-rw-r--r--sources/hv-collector-ct/pom.xml2
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt1
-rw-r--r--sources/hv-collector-dcae-app-simulator/pom.xml25
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt2
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt17
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt29
-rw-r--r--sources/hv-collector-domain/pom.xml2
-rw-r--r--sources/hv-collector-health-check/pom.xml5
-rw-r--r--sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt9
-rw-r--r--sources/hv-collector-main/pom.xml2
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt2
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt4
-rw-r--r--sources/hv-collector-ssl/pom.xml6
-rw-r--r--sources/hv-collector-utils/pom.xml12
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt12
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt3
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/process/process.kt (renamed from sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt)45
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt33
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt9
-rw-r--r--sources/hv-collector-xnf-simulator/pom.xml29
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt12
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt3
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt16
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt2
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt37
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt54
-rw-r--r--sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt21
-rw-r--r--sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt2
33 files changed, 166 insertions, 289 deletions
diff --git a/sources/hv-collector-commandline/pom.xml b/sources/hv-collector-commandline/pom.xml
index 078a3cb5..a2ab34fa 100644
--- a/sources/hv-collector-commandline/pom.xml
+++ b/sources/hv-collector-commandline/pom.xml
@@ -37,20 +37,11 @@
<artifactId>commons-cli</artifactId>
</dependency>
<dependency>
- <groupId>org.jetbrains.kotlin</groupId>
- <artifactId>kotlin-reflect</artifactId>
- </dependency>
- <dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.jetbrains.kotlin</groupId>
- <artifactId>kotlin-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.jetbrains.spek</groupId>
<artifactId>spek-api</artifactId>
<scope>test</scope>
diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt
index d8c83ea0..4656d46b 100644
--- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt
+++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt
@@ -36,7 +36,7 @@ abstract class ArgBasedConfiguration<T>(private val parser: CommandLineParser) {
Try { parseArgumentsArray(args) }
.toEither()
.mapLeft { WrongArgumentError(it, cmdLineOptionsList) }
- .map(this::getConfiguration)
+ .map(::getConfiguration)
.flatMap {
it.toEither {
WrongArgumentError(
diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/WrongArgumentError.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/WrongArgumentError.kt
index f3749b35..3e4814f8 100644
--- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/WrongArgumentError.kt
+++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/WrongArgumentError.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.commandline
-import arrow.core.Option
import org.apache.commons.cli.HelpFormatter
import org.apache.commons.cli.Options
@@ -53,17 +52,15 @@ data class WrongArgumentError(
private fun getOptions() = cmdLineOptionsList.map { it.option }.fold(Options(), Options::addOption)
companion object {
- fun generateRequiredParametersNote(cmdLineOptionsList: List<CommandLineOption>): String {
- val requiredParams = Option.fromNullable(cmdLineOptionsList.filter { it.required }
- .takeUnless { it.isEmpty() })
- return requiredParams.fold(
- { "" },
- {
- it.map { commandLineOption -> commandLineOption.option.opt }
+ fun generateRequiredParametersNote(cmdLineOptionsList: List<CommandLineOption>): String =
+ cmdLineOptionsList.filter { it.required }.let { requiredParams ->
+ if (requiredParams.isEmpty())
+ ""
+ else
+ requiredParams.map { commandLineOption -> commandLineOption.option.opt }
.joinToString(prefix = "Required parameters: ", separator = ", ")
- }
- )
- }
+ }
+
}
}
diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt
index 48cac69a..6d8ba3ff 100644
--- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt
+++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt
@@ -21,10 +21,10 @@ package org.onap.dcae.collectors.veshv.commandline
import arrow.core.Option
import arrow.core.getOrElse
-import arrow.effects.IO
import arrow.syntax.function.curried
import org.apache.commons.cli.CommandLine
-import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
+import org.onap.dcae.collectors.veshv.utils.process.ExitCode
+import org.onap.dcae.collectors.veshv.utils.process.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.fromNullablesChain
/**
@@ -34,10 +34,11 @@ import org.onap.dcae.collectors.veshv.utils.arrow.fromNullablesChain
val handleWrongArgumentErrorCurried = ::handleWrongArgumentError.curried()
-fun handleWrongArgumentError(programName: String, err: WrongArgumentError): IO<Unit> = IO {
+fun handleWrongArgumentError(programName: String, err: WrongArgumentError): ExitCode {
err.printMessage()
err.printHelp(programName)
-}.flatMap { ExitFailure(2).io() }
+ return ExitFailure(2)
+}
fun CommandLine.longValue(cmdLineOpt: CommandLineOption, default: Long): Long =
longValue(cmdLineOpt).getOrElse { default }
diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml
index e15592f3..e1e35d8b 100644
--- a/sources/hv-collector-core/pom.xml
+++ b/sources/hv-collector-core/pom.xml
@@ -71,11 +71,6 @@
</dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-health-check</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.parent.groupId}</groupId>
<artifactId>hv-collector-utils</artifactId>
<version>${project.parent.version}</version>
</dependency>
@@ -90,24 +85,19 @@
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
-
<dependency>
- <groupId>org.jetbrains.kotlin</groupId>
- <artifactId>kotlin-reflect</artifactId>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-core-data</artifactId>
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
- <artifactId>arrow-core</artifactId>
+ <artifactId>arrow-extras-data</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
- <groupId>io.projectreactor.addons</groupId>
- <artifactId>reactor-extra</artifactId>
- </dependency>
- <dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
@@ -115,10 +105,6 @@
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
- <dependency>
- <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
- <artifactId>cbs-client</artifactId>
- </dependency>
</dependencies>
</project>
diff --git a/sources/hv-collector-ct/pom.xml b/sources/hv-collector-ct/pom.xml
index 86103d0e..c8461973 100644
--- a/sources/hv-collector-ct/pom.xml
+++ b/sources/hv-collector-ct/pom.xml
@@ -67,7 +67,7 @@
</dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-xnf-simulator</artifactId>
+ <artifactId>hv-collector-ves-message-generator</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index f1b1ba2d..a32d4454 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.tests.fakes
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
diff --git a/sources/hv-collector-dcae-app-simulator/pom.xml b/sources/hv-collector-dcae-app-simulator/pom.xml
index 832a5785..8bedc886 100644
--- a/sources/hv-collector-dcae-app-simulator/pom.xml
+++ b/sources/hv-collector-dcae-app-simulator/pom.xml
@@ -77,11 +77,6 @@
<dependencies>
<dependency>
<groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-domain</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.parent.groupId}</groupId>
<artifactId>hv-collector-commandline</artifactId>
<version>${project.parent.version}</version>
</dependency>
@@ -97,30 +92,10 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects</artifactId>
- </dependency>
- <dependency>
- <groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects-instances</artifactId>
- </dependency>
- <dependency>
- <groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects-reactor</artifactId>
- </dependency>
- <dependency>
- <groupId>io.arrow-kt</groupId>
- <artifactId>arrow-syntax</artifactId>
- </dependency>
- <dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
<dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java-util</artifactId>
- </dependency>
- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<optional>true</optional>
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
index 28866f36..93c12d25 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
@@ -22,9 +22,7 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
import arrow.core.getOrElse
import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.Mono
import java.io.InputStream
-import java.lang.IllegalArgumentException
import java.util.concurrent.atomic.AtomicReference
/**
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
index 5d2977e4..f3fd56bb 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
@@ -19,11 +19,15 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import org.onap.dcae.collectors.veshv.utils.http.*
+import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
+import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
+import org.onap.dcae.collectors.veshv.utils.http.Response
+import org.onap.dcae.collectors.veshv.utils.http.Responses
+import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
+import org.onap.dcae.collectors.veshv.utils.http.sendOrError
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Mono
import reactor.netty.http.server.HttpServer
@@ -50,14 +54,15 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
)
}
- fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<ServerHandle> =
- IO {
+ fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): Mono<ServerHandle> =
+ Mono.defer {
simulator.listenToTopics(kafkaTopics)
HttpServer.create()
.host(socketAddress.hostName)
.port(socketAddress.port)
.route(::setRoutes)
- .let { NettyServerHandle(it.bindNow()) }
+ .bind()
+ .map { NettyServerHandle(it) }
}
private fun setRoutes(route: HttpServerRoutes) {
@@ -66,7 +71,7 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
req
.receive().aggregate().asString()
.flatMap {
- res.sendOrError{ simulator.listenToTopics(it) }
+ res.sendOrError { simulator.listenToTopics(it) }
}
}
.delete("/messages") { _, res ->
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
index 4ad92712..7f4e62bb 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
@@ -27,35 +26,29 @@ import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValid
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
-import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
-import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.process.ExitCode
+import org.onap.dcae.collectors.veshv.utils.process.ExitSuccess
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp"
private val logger = Logger(PACKAGE_NAME)
const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
-fun main(args: Array<String>) =
+fun main(args: Array<String>): Unit =
ArgDcaeAppSimConfiguration().parse(args)
- .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map(::startApp)
- .unsafeRunEitherSync(
- { ex ->
- logger.withError { log("Failed to start a server", ex) }
- ExitFailure(1)
- },
- {
- logger.info { "Started DCAE-APP Simulator API server" }
- }
- )
+ .fold(handleWrongArgumentErrorCurried(PROGRAM_NAME), ::startApp)
+ .let(ExitCode::doExit)
-private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
- logger.info { "Using configuration: $config" }
+
+private fun startApp(config: DcaeAppSimConfiguration): ExitSuccess {
+ logger.info { "Starting DCAE-APP Simulator API server with configuration: $config" }
val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers)
val generatorFactory = MessageGeneratorFactory(config.maxPayloadSizeBytes)
val messageStreamValidation = MessageStreamValidation(generatorFactory.createVesEventGenerator())
- return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
+ DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
.start(config.apiAddress, config.kafkaTopics)
.flatMap { it.await() }
+ .block()
+ return ExitSuccess
}
diff --git a/sources/hv-collector-domain/pom.xml b/sources/hv-collector-domain/pom.xml
index 63fee2d9..40e7c936 100644
--- a/sources/hv-collector-domain/pom.xml
+++ b/sources/hv-collector-domain/pom.xml
@@ -71,7 +71,7 @@
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
- <artifactId>arrow-core</artifactId>
+ <artifactId>arrow-core-data</artifactId>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
diff --git a/sources/hv-collector-health-check/pom.xml b/sources/hv-collector-health-check/pom.xml
index 68915939..90ec958d 100644
--- a/sources/hv-collector-health-check/pom.xml
+++ b/sources/hv-collector-health-check/pom.xml
@@ -49,15 +49,10 @@
<artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
- <groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects</artifactId>
- </dependency>
- <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
-
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-test</artifactId>
diff --git a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
index fb5bb9a2..cff8160c 100644
--- a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
+++ b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.healthcheck.factory
-import arrow.effects.IO
import io.netty.handler.codec.http.HttpResponseStatus
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
@@ -45,9 +44,9 @@ class HealthCheckApiServer(private val healthState: HealthState,
private val healthDescription: AtomicReference<HealthDescription> = AtomicReference(HealthDescription.STARTING)
- fun start(): IO<ServerHandle> = IO {
+ fun start(): Mono<ServerHandle> = Mono.defer {
healthState().subscribe(healthDescription::set)
- val ctx = HttpServer.create()
+ HttpServer.create()
.tcpConfiguration {
it.addressSupplier { listenAddress }
.doOnUnbound { logClose() }
@@ -57,7 +56,9 @@ class HealthCheckApiServer(private val healthState: HealthState,
routes.get("/health/alive", ::livenessHandler)
routes.get("/monitoring/prometheus", ::monitoringHandler)
}
- NettyServerHandle(ctx.bindNow())
+ .bind()
+ .map { NettyServerHandle(it) }
+
}
private fun readinessHandler(_req: HttpServerRequest, resp: HttpServerResponse) =
diff --git a/sources/hv-collector-main/pom.xml b/sources/hv-collector-main/pom.xml
index 57f21a66..3fe8932f 100644
--- a/sources/hv-collector-main/pom.xml
+++ b/sources/hv-collector-main/pom.xml
@@ -93,7 +93,7 @@
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
- <artifactId>arrow-core</artifactId>
+ <artifactId>arrow-core-data</artifactId>
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 8b0a38bb..dfb388d8 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -50,7 +50,7 @@ fun main(args: Array<String>) {
}
}
- HealthCheckServer.start(configurationModule.healthCheckPort(args))
+ HealthCheckServer.start(configurationModule.healthCheckPort(args)).block()
configurationModule
.hvVesConfigurationUpdates(args, configStateListener, ServiceContext::mdc)
.publishOn(Schedulers.single(Schedulers.elastic()))
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
index 9b58dcc9..c970e5c8 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
@@ -24,7 +24,6 @@ import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import org.onap.dcae.collectors.veshv.utils.arrow.then
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.net.InetSocketAddress
@@ -39,8 +38,7 @@ object HealthCheckServer {
fun start(port: Int) =
createHealthCheckServer(port)
.start()
- .then(::logServerStarted)
- .unsafeRunSync()
+ .doOnSuccess(::logServerStarted)
private fun createHealthCheckServer(listenPort: Int) =
HealthCheckApiServer(
diff --git a/sources/hv-collector-ssl/pom.xml b/sources/hv-collector-ssl/pom.xml
index a4bc7c7f..0ba609e5 100644
--- a/sources/hv-collector-ssl/pom.xml
+++ b/sources/hv-collector-ssl/pom.xml
@@ -78,11 +78,7 @@
<dependency>
<groupId>io.arrow-kt</groupId>
- <artifactId>arrow-core</artifactId>
- </dependency>
- <dependency>
- <groupId>io.arrow-kt</groupId>
- <artifactId>arrow-syntax</artifactId>
+ <artifactId>arrow-core-data</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
diff --git a/sources/hv-collector-utils/pom.xml b/sources/hv-collector-utils/pom.xml
index e85b8ee4..5053cf00 100644
--- a/sources/hv-collector-utils/pom.xml
+++ b/sources/hv-collector-utils/pom.xml
@@ -65,26 +65,22 @@
<artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
- <groupId>io.arrow-kt</groupId>
- <artifactId>arrow-instances-data</artifactId>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects</artifactId>
+ <artifactId>arrow-typeclasses</artifactId>
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects-instances</artifactId>
+ <artifactId>arrow-core-extensions</artifactId>
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
<artifactId>arrow-syntax</artifactId>
</dependency>
<dependency>
- <groupId>org.jetbrains.kotlinx</groupId>
- <artifactId>kotlinx-coroutines-core</artifactId>
- </dependency>
- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<optional>true</optional>
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
index 47b3d559..cfed7f32 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
@@ -23,16 +23,11 @@ import arrow.core.Either
import arrow.core.ForOption
import arrow.core.Option
import arrow.core.Try
+import arrow.core.extensions.option.monad.monad
import arrow.core.fix
import arrow.core.identity
-import arrow.effects.ForIO
-import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monad.monad
-import arrow.instances.option.monad.monad
import arrow.syntax.collections.firstOption
import arrow.typeclasses.MonadContinuation
-import arrow.typeclasses.binding
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicReference
@@ -47,11 +42,6 @@ object OptionUtils {
: Option<A> = Option.monad().binding(c).fix()
}
-object IOUtils {
- fun <A> binding(c: suspend MonadContinuation<ForIO, *>.() -> A)
- : IO<A> = IO.monad().binding(c).fix()
-}
-
fun <A> Either<A, A>.flatten() = fold(::identity, ::identity)
fun <B> Either<Throwable, B>.rightOrThrow() = fold({ throw it }, ::identity)
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt
index 9023528e..ac39100d 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt
@@ -28,7 +28,8 @@ sealed class Marker(internal val slf4jMarker: org.slf4j.Marker, val mdc: Map<Str
object Entry : Marker(ENTRY)
object Exit : Marker(EXIT)
- class Invoke(id: UUID = UUID.randomUUID(), timestamp: Instant = Instant.now()) : Marker(INVOKE, mdc(id, timestamp)) {
+ class Invoke(id: UUID = UUID.randomUUID(), timestamp: Instant = Instant.now()) :
+ Marker(INVOKE, mdc(id, timestamp)) {
companion object {
private fun mdc(id: UUID, timestamp: Instant) = mapOf(
OnapMdc.INVOCATION_ID to id.toString(),
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/process/process.kt
index 56825221..58859462 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/process/process.kt
@@ -17,16 +17,8 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.utils.arrow
+package org.onap.dcae.collectors.veshv.utils.process
-import arrow.core.Either
-import arrow.core.Left
-import arrow.core.Right
-import arrow.effects.IO
-import org.reactivestreams.Publisher
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.core.publisher.toMono
import kotlin.system.exitProcess
/**
@@ -37,9 +29,8 @@ import kotlin.system.exitProcess
sealed class ExitCode {
abstract val code: Int
- fun io() = IO {
- exitProcess(code)
- }
+ fun doExit(): Nothing = exitProcess(code)
+
}
object ExitSuccess : ExitCode() {
@@ -47,33 +38,3 @@ object ExitSuccess : ExitCode() {
}
data class ExitFailure(override val code: Int) : ExitCode()
-
-inline fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) =
- flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() })
-
-fun IO<Any>.unit() = map { Unit }
-
-fun <T> Mono<T>.asIo() = IO.async<T> { callback ->
- subscribe({
- callback(Right(it))
- }, {
- callback(Left(it))
- })
-}
-
-fun <T> Publisher<T>.then(callback: () -> Unit): Mono<Unit> =
- toMono().then(Mono.fromCallable(callback))
-
-fun <T> Flux<IO<T>>.evaluateIo(): Flux<T> =
- flatMap { io ->
- io.attempt().unsafeRunSync().fold(
- { Flux.error<T>(it) },
- { Flux.just<T>(it) }
- )
- }
-
-inline fun <T> IO<T>.then(crossinline block: (T) -> Unit): IO<T> =
- map {
- block(it)
- it
- }
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt
new file mode 100644
index 00000000..ceccbcba
--- /dev/null
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt
@@ -0,0 +1,33 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+package org.onap.dcae.collectors.veshv.utils.rx
+
+import org.reactivestreams.Publisher
+import reactor.core.publisher.Mono
+import reactor.core.publisher.toMono
+
+fun <T> Publisher<T>.then(callback: () -> Unit): Mono<Unit> =
+ toMono().then(Mono.fromCallable(callback))
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
index 670ab4ac..728d62bb 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.utils
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Mono
import reactor.netty.DisposableServer
@@ -29,7 +28,7 @@ import reactor.netty.DisposableServer
* @since August 2018
*/
abstract class ServerHandle(val host: String, val port: Int) : Closeable {
- abstract fun await(): IO<Unit>
+ abstract fun await(): Mono<Void>
}
/**
@@ -58,8 +57,10 @@ class NettyServerHandle(private val ctx: DisposableServer,
}
}
- override fun await() = IO<Unit> {
- ctx.channel().closeFuture().sync()
+ override fun await(): Mono<Void> = Mono.create { callback ->
+ ctx.channel().closeFuture().addListener {
+ callback.success()
+ }
}
companion object {
diff --git a/sources/hv-collector-xnf-simulator/pom.xml b/sources/hv-collector-xnf-simulator/pom.xml
index c17d29f6..a9ac0bc8 100644
--- a/sources/hv-collector-xnf-simulator/pom.xml
+++ b/sources/hv-collector-xnf-simulator/pom.xml
@@ -78,11 +78,6 @@
<dependencies>
<dependency>
<groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-domain</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.parent.groupId}</groupId>
<artifactId>hv-collector-ssl</artifactId>
<version>${project.parent.version}</version>
</dependency>
@@ -112,18 +107,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects</artifactId>
- </dependency>
- <dependency>
- <groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects-instances</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.kotlinx</groupId>
- <artifactId>kotlinx-coroutines-core</artifactId>
- </dependency>
- <dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
@@ -136,18 +119,6 @@
<artifactId>logback-classic</artifactId>
<scope>runtime</scope>
</dependency>
- <!-- See comment in main pom
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-transport-native-epoll</artifactId>
- <classifier>${os.detected.classifier}</classifier>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-tcnative-boringssl-static</artifactId>
- <classifier>${os.detected.classifier}</classifier>
- </dependency>
- -->
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.json</artifactId>
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index 93c43173..49d6a470 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -22,13 +22,10 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import arrow.core.Either
import arrow.core.Some
import arrow.core.Try
+import arrow.core.extensions.either.monad.monad
import arrow.core.fix
-import arrow.effects.IO
-import arrow.instances.either.monad.monad
-import arrow.typeclasses.binding
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
-import org.onap.dcae.collectors.veshv.utils.arrow.asIo
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
@@ -60,7 +57,7 @@ class XnfSimulator(
private val defaultHvVesClient by lazy { clientFactory.create() }
- fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
+ fun startSimulation(messageParameters: InputStream): Either<ParsingError, Mono<Void>> =
Either.monad<ParsingError>().binding {
val json = parseJsonArray(messageParameters).bind()
val parameters = messageParametersParser.parse(json).bind()
@@ -73,7 +70,7 @@ class XnfSimulator(
.mapLeft { ParsingError("Failed to parse JSON", Some(it)) }
- private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> =
+ private fun simulationFrom(parameters: List<MessageParameters>): Mono<Void> =
parameters
.map(::asClientToMessages)
.groupMessagesByClients()
@@ -81,8 +78,7 @@ class XnfSimulator(
.toList()
.toFlux()
.map(::simulate)
- .then(Mono.just(Unit))
- .asIo()
+ .then()
private fun <M> List<Pair<HvVesClient, M>>.groupMessagesByClients() =
groupBy({ it.first }, { it.second })
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
index 19579431..e50f1e7a 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
@@ -19,14 +19,13 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
-import org.onap.dcae.collectors.veshv.utils.arrow.then
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.rx.then
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicLong
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
index fb2c532f..e68dd443 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
@@ -20,7 +20,6 @@
package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
import arrow.core.Either
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
@@ -48,13 +47,14 @@ internal class XnfApiServer(
private val xnfSimulator: XnfSimulator,
private val ongoingSimulations: OngoingSimulations) {
- fun start(socketAddress: InetSocketAddress): IO<ServerHandle> = IO {
- HttpServer.create()
- .host(socketAddress.hostName)
- .port(socketAddress.port)
- .route(::setRoutes)
- .let { NettyServerHandle(it.bindNow()) }
- }
+ fun start(socketAddress: InetSocketAddress): Mono<ServerHandle> =
+ HttpServer.create()
+ .host(socketAddress.hostName)
+ .port(socketAddress.port)
+ .route(::setRoutes)
+ .bind()
+ .map { NettyServerHandle(it) }
+
private fun setRoutes(route: HttpServerRoutes) {
route
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt
index 5e1c979c..a2501ebb 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt
@@ -32,7 +32,7 @@ import reactor.core.publisher.Mono
internal class XnfHealthCheckServer {
fun startServer(config: SimulatorConfiguration) = createHealthCheckServer(config)
.start()
- .map { logger.info(serverStartedMessage(it)); it }
+ .doOnNext { logger.info(serverStartedMessage(it)) }
private fun createHealthCheckServer(config: SimulatorConfiguration): HealthCheckApiServer {
val monitoring = object : PrometheusMetricsProvider {
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
index fb71b2cd..3f43ebe0 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
@@ -19,44 +19,43 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
-import arrow.effects.IO
-import kotlinx.coroutines.asCoroutineDispatcher
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.BUSY
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.IDLE
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
+import reactor.core.scheduler.Scheduler
+import reactor.core.scheduler.Schedulers
import java.util.*
import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.Executor
-import java.util.concurrent.Executors
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool(),
+class OngoingSimulations(private val scheduler: Scheduler = Schedulers.elastic(),
private val healthState: HealthState = HealthState.INSTANCE) {
- private val asyncSimulationContext = executor.asCoroutineDispatcher()
private val simulations = ConcurrentHashMap<UUID, Status>()
- fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID {
+ fun startAsynchronousSimulation(simulationIo: Mono<Void>): UUID {
val id = UUID.randomUUID()
simulations[id] = StatusOngoing
updateHealthState()
- simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result ->
- result.fold(
- { err ->
- logger.withWarn { log("Error", err) }
- simulations[id] = StatusFailure(err)
- },
- {
- logger.info { "Finished sending messages" }
- simulations[id] = StatusSuccess
- }
- ).also { updateHealthState() }
- }
+ simulationIo
+ .publishOn(scheduler)
+ .doOnSuccess {
+ logger.info { "Finished sending messages" }
+ simulations[id] = StatusSuccess
+ }
+ .doOnError { err ->
+ logger.withWarn { log("Error", err) }
+ simulations[id] = StatusFailure(err)
+ }
+ .doFinally { updateHealthState() }
+ .subscribe()
+
return id
}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index a1042f3e..4fcb1809 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -19,8 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf
-import arrow.effects.IO
import io.vavr.collection.HashSet
+import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentError
import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
@@ -32,9 +32,8 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulator
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
-import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
-import org.onap.dcae.collectors.veshv.utils.arrow.IOUtils.binding
-import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
+import org.onap.dcae.collectors.veshv.utils.process.ExitCode
+import org.onap.dcae.collectors.veshv.utils.process.ExitSuccess
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
@@ -46,36 +45,29 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
- .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map(::startServers)
- .unsafeRunEitherSync(
- { ex ->
- logger.withError { log("Failed to start a server", ex) }
- ExitFailure(1)
- },
- {
- logger.info { "Stopping xNF Simulator API server" }
- }
- )
+fun main(args: Array<String>): Unit =
+ ArgXnfSimulatorConfiguration().parse(args)
+ .fold(handleWrongArgumentErrorCurried(PROGRAM_NAME), ::startServers)
+ .let(ExitCode::doExit)
-private fun startServers(config: SimulatorConfiguration): IO<Unit> =
- binding {
- logger.info { "Using configuration: $config" }
+private fun startServers(config: SimulatorConfiguration): ExitCode {
+ logger.info { "Using configuration: $config" }
- XnfHealthCheckServer().startServer(config).bind()
+ XnfHealthCheckServer().startServer(config).block()
- val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider)
- val xnfSimulator = XnfSimulator(
- ClientFactory(clientConfig),
- MessageGeneratorFactory(config.maxPayloadSizeBytes)
- )
- val xnfApiServerHandler = XnfApiServer(xnfSimulator, OngoingSimulations())
- .start(config.listenAddress).bind()
+ val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider)
+ val xnfSimulator = XnfSimulator(
+ ClientFactory(clientConfig),
+ MessageGeneratorFactory(config.maxPayloadSizeBytes)
+ )
+ val xnfApiServerHandler = XnfApiServer(xnfSimulator, OngoingSimulations())
+ .start(config.listenAddress)
+ .block()
- logger.info { "Started xNF Simulator API server" }
- HealthState.INSTANCE.changeState(HealthDescription.IDLE)
+ logger.info { "Started xNF Simulator API server" }
+ HealthState.INSTANCE.changeState(HealthDescription.IDLE)
- xnfApiServerHandler.await().bind()
- }
+ xnfApiServerHandler.await().block()
+ return ExitSuccess
+}
diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt
index 113c3c42..325d3bb5 100644
--- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt
+++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.main
-import arrow.effects.IO
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
@@ -32,16 +31,17 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusNotFound
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusOngoing
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusSuccess
import org.onap.dcae.collectors.veshv.tests.utils.waitUntilSucceeds
+import reactor.core.publisher.Mono
+import reactor.core.scheduler.Schedulers
import java.util.*
-import java.util.concurrent.Executors
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since September 2018
*/
internal class OngoingSimulationsTest : Spek({
- val executor = Executors.newSingleThreadExecutor()
- val cut = OngoingSimulations(executor)
+ val scheduler = Schedulers.single()
+ val cut = OngoingSimulations(scheduler)
describe("simulations repository") {
given("not existing task task id") {
@@ -121,19 +121,22 @@ internal class OngoingSimulationsTest : Spek({
}
afterGroup {
- executor.shutdown()
+ scheduler.dispose()
}
}
afterEachTest { cut.clear() }
})
-private fun neverendingTask() = IO.async<Unit> { }
+private fun neverendingTask() = Mono.never<Void>()
-private fun succesfulTask(): IO<Unit> = IO { println("great success!") }
+private fun succesfulTask(): Mono<Void> = Mono.empty<Void>()
+ .doOnSuccess {
+ println("great success")
+ }
-private fun failingTask(): Pair<RuntimeException, IO<Unit>> {
+private fun failingTask(): Pair<RuntimeException, Mono<Void>> {
val cause = RuntimeException("facepalm")
- val task = IO.raiseError<Unit>(cause)
+ val task = Mono.error<Void>(cause)
return Pair(cause, task)
}
diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
index 29281cdc..ea0628c1 100644
--- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
+++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
@@ -126,7 +126,7 @@ internal class XnfSimulatorTest : Spek({
whenever(vesClient.sendRawPayload(any(), eq(PayloadType.PROTOBUF))).thenReturn(Mono.just(Unit))
// when
- cut.startSimulation(json).map { it.unsafeRunSync() }
+ cut.startSimulation(json).map { it.block() }
// then
verify(vesClient).sendRawPayload(any(), eq(PayloadType.PROTOBUF))