aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-xnf-simulator
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-29 13:24:59 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-09-06 07:43:30 +0200
commitd6f5bfa934b9aa0571e853fc5432ab84eceb9db1 (patch)
tree2422451223385dba0a6b8f5714762a57cf6c002a /hv-collector-xnf-simulator
parent199edc49a418ab015ad3a54a5750f1a3f485b7e7 (diff)
Improve coverage of xNF simulator
Also refactor to make it possible. Change-Id: I6da6d3f33e57c524a7e353ecebd3e045d8ceed2a Issue-ID: DCAEGEN2-739 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'hv-collector-xnf-simulator')
-rw-r--r--hv-collector-xnf-simulator/pom.xml4
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt102
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt122
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt115
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt95
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt (renamed from hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgXnfSimulatorConfiguration.kt)2
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt (renamed from hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/SimulatorConfiguration.kt)4
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt76
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt15
-rw-r--r--hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt107
-rw-r--r--hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt114
-rw-r--r--hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt6
12 files changed, 557 insertions, 205 deletions
diff --git a/hv-collector-xnf-simulator/pom.xml b/hv-collector-xnf-simulator/pom.xml
index d44e2511..cfe1dc14 100644
--- a/hv-collector-xnf-simulator/pom.xml
+++ b/hv-collector-xnf-simulator/pom.xml
@@ -106,6 +106,10 @@
<artifactId>arrow-effects</artifactId>
</dependency>
<dependency>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>kotlinx-coroutines-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
deleted file mode 100644
index 02e6ee72..00000000
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.xnf.impl
-
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser.Companion.INSTANCE
-import ratpack.handling.Chain
-import ratpack.handling.Context
-import ratpack.server.RatpackServer
-import ratpack.server.ServerConfig
-import reactor.core.scheduler.Schedulers
-import javax.json.Json
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since June 2018
- */
-internal class HttpServer(private val vesClient: XnfSimulator,
- private val messageParametersParser: MessageParametersParser = INSTANCE) {
-
- fun start(port: Int): IO<RatpackServer> = IO {
- RatpackServer.start { server ->
- server.serverConfig(ServerConfig.embedded().port(port))
- .handlers(this::configureHandlers)
- }
- }
-
- private fun configureHandlers(chain: Chain) {
- chain
- .post("simulator/sync") { ctx ->
- ctx.request.body
- .map { Json.createReader(it.inputStream).readArray() }
- .map { messageParametersParser.parse(it) }
- .map { MessageGenerator.INSTANCE.createMessageFlux(it) }
- .map { vesClient.sendIo(it) }
- .map { it.unsafeRunSync() }
- .onError { handleException(it, ctx) }
- .then { sendAcceptedResponse(ctx) }
- }
- .post("simulator/async") { ctx ->
- ctx.request.body
- .map { Json.createReader(it.inputStream).readArray() }
- .map { messageParametersParser.parse(it) }
- .map { MessageGenerator.INSTANCE.createMessageFlux(it) }
- .map { vesClient.sendRx(it) }
- .map { it.subscribeOn(Schedulers.elastic()).subscribe() }
- .onError { handleException(it, ctx) }
- .then { sendAcceptedResponse(ctx) }
- }
- .get("healthcheck") { ctx ->
- ctx.response.status(STATUS_OK).send()
- }
- }
-
- private fun sendAcceptedResponse(ctx: Context) {
- ctx.response
- .status(STATUS_OK)
- .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
- .add("response", "Request accepted")
- .build()
- .toString())
- }
-
- private fun handleException(t: Throwable, ctx: Context) {
- logger.warn("Failed to process the request - ${t.localizedMessage}")
- logger.debug("Exception thrown when processing the request", t)
- ctx.response
- .status(STATUS_BAD_REQUEST)
- .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
- .add("response", "Request was not accepted")
- .add("exception", t.localizedMessage)
- .build()
- .toString())
- }
-
- companion object {
- private val logger = Logger(HttpServer::class)
- const val STATUS_OK = 200
- const val STATUS_BAD_REQUEST = 400
- const val CONTENT_TYPE_APPLICATION_JSON = "application/json"
- }
-}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index e8a474d0..558bd1c1 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -19,98 +19,40 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+import arrow.core.Either
+import arrow.core.Some
+import arrow.core.Try
+import arrow.core.fix
+import arrow.core.flatMap
+import arrow.core.monad
import arrow.effects.IO
-import io.netty.handler.ssl.ClientAuth
-import io.netty.handler.ssl.SslContext
-import io.netty.handler.ssl.SslContextBuilder
-import io.netty.handler.ssl.SslProvider
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.reactivestreams.Publisher
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.core.publisher.ReplayProcessor
-import reactor.ipc.netty.NettyOutbound
-import reactor.ipc.netty.tcp.TcpClient
-
+import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import java.io.InputStream
+import javax.json.Json
/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since June 2018
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
*/
-internal class XnfSimulator(private val configuration: SimulatorConfiguration) {
-
- private val client: TcpClient = TcpClient.builder()
- .options { opts ->
- opts.host(configuration.vesHost)
- .port(configuration.vesPort)
- .sslContext(createSslContext(configuration.security))
- }
- .build()
-
- fun sendIo(messages: Flux<PayloadWireFrameMessage>) = IO<Unit> {
- sendRx(messages).block()
- }
-
- fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> {
- val complete = ReplayProcessor.create<Void>(1)
- client
- .newHandler { _, output -> handler(complete, messages, output) }
- .doOnError {
- logger.info("Failed to connect to VesHvCollector on " +
- "${configuration.vesHost}:${configuration.vesPort}")
- }
- .subscribe {
- logger.info("Connected to VesHvCollector on " +
- "${configuration.vesHost}:${configuration.vesPort}")
- }
- return complete.then()
- }
-
- private fun handler(complete: ReplayProcessor<Void>,
- messages: Flux<PayloadWireFrameMessage>,
- nettyOutbound: NettyOutbound): Publisher<Void> {
-
- val allocator = nettyOutbound.alloc()
- val encoder = WireFrameEncoder(allocator)
- val frames = messages
- .map(encoder::encode)
- .window(MAX_BATCH_SIZE)
-
- return nettyOutbound
- .logConnectionClosed()
- .options { it.flushOnBoundary() }
- .sendGroups(frames)
- .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt())))
- .then {
- logger.info("Messages have been sent")
- complete.onComplete()
- }
- .then()
- }
-
- private fun createSslContext(config: SecurityConfiguration): SslContext =
- SslContextBuilder.forClient()
- .keyManager(config.cert.toFile(), config.privateKey.toFile())
- .trustManager(config.trustedCert.toFile())
- .sslProvider(SslProvider.OPENSSL)
- .clientAuth(ClientAuth.REQUIRE)
- .build()
-
- private fun NettyOutbound.logConnectionClosed(): NettyOutbound {
- context().onClose {
- logger.info { "Connection to ${context().address()} has been closed" }
- }
- return this
- }
-
- companion object {
- private val logger = Logger(XnfSimulator::class)
- private const val MAX_BATCH_SIZE = 128
- private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE
- }
+class XnfSimulator(
+ private val vesClient: VesHvClient,
+ private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE,
+ private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) {
+
+ fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
+ Either.monad<ParsingError>().binding {
+ val json = parseJsonArray(messageParameters).bind()
+ val parsed = messageParametersParser.parse(json).bind()
+ val generatedMessages = messageGenerator.createMessageFlux(parsed)
+ vesClient.sendIo(generatedMessages)
+ }.fix()
+
+ private fun parseJsonArray(jsonStream: InputStream) =
+ Try {
+ Json.createReader(jsonStream).readArray()
+ }.toEither().mapLeft { ParsingError("failed to parse JSON", Some(it)) }
}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
new file mode 100644
index 00000000..22e47d75
--- /dev/null
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
@@ -0,0 +1,115 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
+
+import io.netty.handler.ssl.ClientAuth
+import io.netty.handler.ssl.SslContext
+import io.netty.handler.ssl.SslContextBuilder
+import io.netty.handler.ssl.SslProvider
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
+import org.onap.dcae.collectors.veshv.utils.arrow.asIo
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.reactivestreams.Publisher
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.core.publisher.ReplayProcessor
+import reactor.ipc.netty.NettyOutbound
+import reactor.ipc.netty.tcp.TcpClient
+
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+class VesHvClient(private val configuration: SimulatorConfiguration) {
+
+ private val client: TcpClient = TcpClient.builder()
+ .options { opts ->
+ opts.host(configuration.vesHost)
+ .port(configuration.vesPort)
+ .sslContext(createSslContext(configuration.security))
+ }
+ .build()
+
+ fun sendIo(messages: Flux<PayloadWireFrameMessage>) =
+ sendRx(messages).then(Mono.just(Unit)).asIo()
+
+ private fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> {
+ val complete = ReplayProcessor.create<Void>(1)
+ client
+ .newHandler { _, output -> handler(complete, messages, output) }
+ .doOnError {
+ logger.info("Failed to connect to VesHvCollector on " +
+ "${configuration.vesHost}:${configuration.vesPort}")
+ }
+ .subscribe {
+ logger.info("Connected to VesHvCollector on " +
+ "${configuration.vesHost}:${configuration.vesPort}")
+ }
+ return complete.then()
+ }
+
+ private fun handler(complete: ReplayProcessor<Void>,
+ messages: Flux<PayloadWireFrameMessage>,
+ nettyOutbound: NettyOutbound): Publisher<Void> {
+
+ val allocator = nettyOutbound.alloc()
+ val encoder = WireFrameEncoder(allocator)
+ val frames = messages
+ .map(encoder::encode)
+ .window(MAX_BATCH_SIZE)
+
+ return nettyOutbound
+ .logConnectionClosed()
+ .options { it.flushOnBoundary() }
+ .sendGroups(frames)
+ .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt())))
+ .then {
+ logger.info("Messages have been sent")
+ complete.onComplete()
+ }
+ .then()
+ }
+
+ private fun createSslContext(config: SecurityConfiguration): SslContext =
+ SslContextBuilder.forClient()
+ .keyManager(config.cert.toFile(), config.privateKey.toFile())
+ .trustManager(config.trustedCert.toFile())
+ .sslProvider(SslProvider.OPENSSL)
+ .clientAuth(ClientAuth.REQUIRE)
+ .build()
+
+ private fun NettyOutbound.logConnectionClosed(): NettyOutbound {
+ context().onClose {
+ logger.info { "Connection to ${context().address()} has been closed" }
+ }
+ return this
+ }
+
+ companion object {
+ private val logger = Logger(VesHvClient::class)
+ private const val MAX_BATCH_SIZE = 128
+ private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE
+ }
+}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
new file mode 100644
index 00000000..54ead6f7
--- /dev/null
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
@@ -0,0 +1,95 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
+
+import arrow.core.Either
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.Status
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.utils.http.Content
+import org.onap.dcae.collectors.veshv.utils.http.ContentType
+import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
+import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
+import org.onap.dcae.collectors.veshv.utils.http.Response
+import org.onap.dcae.collectors.veshv.utils.http.Responses
+import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
+import org.onap.dcae.collectors.veshv.utils.http.sendEitherErrorOrResponse
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import ratpack.handling.Chain
+import ratpack.handling.Context
+import ratpack.http.TypedData
+import ratpack.server.RatpackServer
+import ratpack.server.ServerConfig
+import java.util.*
+import javax.json.Json
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+internal class XnfApiServer(
+ private val xnfSimulator: XnfSimulator,
+ private val ongoingSimulations: OngoingSimulations) {
+
+ fun start(port: Int): IO<RatpackServer> = IO {
+ RatpackServer.start { server ->
+ server.serverConfig(ServerConfig.embedded().port(port))
+ .handlers(this::configureHandlers)
+ }
+ }
+
+ private fun configureHandlers(chain: Chain) {
+ chain
+ .post("simulator", ::startSimulationHandler)
+ .post("simulator/async", ::startSimulationHandler)
+ .get("simulator/:id", ::simulatorStatusHandler)
+ .get("healthcheck") { ctx ->
+ logger.info("Checking health")
+ ctx.response.status(HttpConstants.STATUS_OK).send()
+ }
+ }
+
+ private fun startSimulationHandler(ctx: Context) {
+ logger.info("Starting asynchronous scenario")
+ ctx.request.body.then { body ->
+ val id = startSimulation(body)
+ ctx.response.sendEitherErrorOrResponse(id)
+ }
+ }
+
+ private fun startSimulation(body: TypedData): Either<ParsingError, Response> {
+ return xnfSimulator.startSimulation(body.inputStream)
+ .map(ongoingSimulations::startAsynchronousSimulation)
+ .map(Responses::acceptedResponse)
+ }
+
+ private fun simulatorStatusHandler(ctx: Context) {
+ val id = UUID.fromString(ctx.pathTokens["id"])
+ val status = ongoingSimulations.status(id)
+ val response = Responses.statusResponse(status.toString(), status.message)
+ ctx.response.sendAndHandleErrors(IO.just(response))
+ }
+
+ companion object {
+ private val logger = Logger(XnfApiServer::class)
+ }
+}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgXnfSimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
index 999d0327..56d6212a 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgXnfSimulatorConfiguration.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.simulators.xnf.config
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config
import arrow.core.ForOption
import arrow.core.Option
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/SimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
index 708ffd13..9b6ef209 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/SimulatorConfiguration.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.simulators.xnf.config
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
@@ -25,7 +25,7 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-internal data class SimulatorConfiguration(
+data class SimulatorConfiguration(
val listenPort: Int,
val vesHost: String,
val vesPort: Int,
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
new file mode 100644
index 00000000..95bb4897
--- /dev/null
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
@@ -0,0 +1,76 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+
+import arrow.effects.IO
+import kotlinx.coroutines.experimental.asCoroutineDispatcher
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.util.*
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.Executor
+import java.util.concurrent.Executors
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) {
+ private val asyncSimulationContext = executor.asCoroutineDispatcher()
+ private val simulations = ConcurrentHashMap<UUID, Status>()
+
+ fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID {
+ val id = UUID.randomUUID()
+ simulations[id] = StatusOngoing
+
+ simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result ->
+ result.fold(
+ { err ->
+ logger.warn("Error", err)
+ simulations[id] = StatusFailure(err)
+ },
+ {
+ logger.info("Finished sending messages")
+ simulations[id] = StatusSuccess
+ }
+ )
+ }
+ return id
+ }
+
+ fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound)
+
+ internal fun clear() {
+ simulations.clear()
+ }
+
+ companion object {
+ private val logger = Logger(XnfApiServer::class)
+ }
+}
+
+sealed class Status(val message: String) {
+ override fun toString() = this::class.simpleName ?: "null"
+}
+
+object StatusNotFound : Status("not found")
+object StatusOngoing : Status("ongoing")
+object StatusSuccess : Status("success")
+data class StatusFailure(val cause: Throwable) : Status("Error ${cause.message}")
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index fa6d626b..c9e900ac 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -19,12 +19,14 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
-import org.onap.dcae.collectors.veshv.utils.arrow.void
+import org.onap.dcae.collectors.veshv.utils.arrow.unit
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -38,11 +40,10 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
*/
fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
.mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map {config ->
- XnfSimulator(config)
- .let { HttpServer(it) }
+ .map { config ->
+ XnfApiServer(XnfSimulator(VesHvClient(config)), OngoingSimulations())
.start(config.listenPort)
- .void()
+ .unit()
}
.unsafeRunEitherSync(
{ ex ->
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt
new file mode 100644
index 00000000..70d8ba83
--- /dev/null
+++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt
@@ -0,0 +1,107 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.effects.IO
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusFailure
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusNotFound
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusOngoing
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusSuccess
+import org.onap.dcae.collectors.veshv.tests.utils.waitUntilSucceeds
+import java.time.Duration
+import java.util.*
+import java.util.concurrent.Executors
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+internal class OngoingSimulationsTest : Spek({
+ val executor = Executors.newSingleThreadExecutor()
+ val cut = OngoingSimulations(executor)
+
+ describe("simulations repository") {
+ given("not existing task task id") {
+ val id = UUID.randomUUID()
+
+ on("status") {
+ val result = cut.status(id)
+
+ it("should have 'not found' status") {
+ assertThat(result).isEqualTo(StatusNotFound)
+ }
+ }
+ }
+
+ given("never ending task") {
+ val task = IO.async<Unit> { }
+
+ on("startAsynchronousSimulation") {
+ val result = cut.startAsynchronousSimulation(task)
+
+ it("should have ongoing status") {
+ assertThat(cut.status(result)).isEqualTo(StatusOngoing)
+ }
+ }
+ }
+
+ given("failing task") {
+ val cause = RuntimeException("facepalm")
+ val task = IO.raiseError<Unit>(cause)
+
+ on("startAsynchronousSimulation") {
+ val result = cut.startAsynchronousSimulation(task)
+
+ it("should have failing status") {
+ waitUntilSucceeds {
+ assertThat(cut.status(result)).isEqualTo(StatusFailure(cause))
+ }
+ }
+ }
+ }
+
+ given("successful task") {
+ val task = IO { println("great success!") }
+
+ on("startAsynchronousSimulation") {
+ val result = cut.startAsynchronousSimulation(task)
+
+ it("should have successful status") {
+ waitUntilSucceeds {
+ assertThat(cut.status(result)).isEqualTo(StatusSuccess)
+ }
+ }
+ }
+ }
+
+ afterGroup {
+ executor.shutdown()
+ }
+ }
+
+ afterEachTest { cut.clear() }
+})
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
new file mode 100644
index 00000000..80f39579
--- /dev/null
+++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
@@ -0,0 +1,114 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.core.Left
+import arrow.core.None
+import arrow.core.Right
+import arrow.effects.IO
+import com.nhaarman.mockito_kotlin.any
+import com.nhaarman.mockito_kotlin.mock
+import com.nhaarman.mockito_kotlin.whenever
+import com.sun.xml.internal.messaging.saaj.util.ByteInputStream
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
+import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import reactor.core.publisher.Flux
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+internal class XnfSimulatorTest : Spek({
+ lateinit var cut: XnfSimulator
+ lateinit var vesClient: VesHvClient
+ lateinit var messageParametersParser: MessageParametersParser
+ lateinit var messageGenerator: MessageGenerator
+
+ beforeEachTest {
+ vesClient = mock()
+ messageParametersParser = mock()
+ messageGenerator = mock()
+ cut = XnfSimulator(vesClient, messageParametersParser, messageGenerator)
+ }
+
+ describe("startSimulation") {
+ it("should fail when empty input stream") {
+ // given
+ val emptyInputStream = ByteInputStream()
+
+ // when
+ val result = cut.startSimulation(emptyInputStream)
+
+ // then
+ assertThat(result).isLeft()
+ }
+
+ it("should fail when invalid JSON") {
+ // given
+ val invalidJson = "invalid json".byteInputStream()
+
+ // when
+ val result = cut.startSimulation(invalidJson)
+
+ // then
+ assertThat(result).isLeft()
+ }
+
+ it("should fail when JSON syntax is valid but content is invalid") {
+ // given
+ val json = "[1,2,3]".byteInputStream()
+ val cause = ParsingError("epic fail", None)
+ whenever(messageParametersParser.parse(any())).thenReturn(
+ Left(cause))
+
+ // when
+ val result = cut.startSimulation(json)
+
+ // then
+ assertThat(result).left().isEqualTo(cause)
+ }
+
+ it("should return generated messages") {
+ // given
+ val json = "[true]".byteInputStream()
+ val messageParams = listOf<MessageParameters>()
+ val generatedMessages = Flux.empty<PayloadWireFrameMessage>()
+ val sendingIo = IO {}
+ whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams))
+ whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages)
+ whenever(vesClient.sendIo(generatedMessages)).thenReturn(sendingIo)
+
+ // when
+ val result = cut.startSimulation(json)
+
+ // then
+ assertThat(result).right().isSameAs(sendingIo)
+ }
+ }
+})
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt
index 8749dc5b..69caf727 100644
--- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt
+++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt
@@ -26,9 +26,9 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration.DefaultValues
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration.DefaultValues
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure
import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess
import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentError