summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-xnf-simulator/src/main
diff options
context:
space:
mode:
authorJakub Dudycz <jakub.dudycz@nokia.com>2019-02-04 15:20:14 +0100
committerJakub Dudycz <jakub.dudycz@nokia.com>2019-02-15 15:09:48 +0100
commitdf17f466577b97a12fac39b64b5d113f32b82f2e (patch)
tree0a8999e593c90f97ed1b4f45b6e8adbbc110a787 /sources/hv-collector-xnf-simulator/src/main
parente7204cbcf6af61856330cffc541b6f5c78476a09 (diff)
Generate VesEvents in hv-ves/message-generator
- Split message generator on two specialized generators for VesEvent and WireFrame related message types - Refactor whole message-generator module Change-Id: I1266b549a9a4d27213d03e8921298deab2dacb59 Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com> Issue-ID: DCAEGEN2-1162
Diffstat (limited to 'sources/hv-collector-xnf-simulator/src/main')
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt45
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt7
2 files changed, 37 insertions, 15 deletions
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index ee4734ae..4dfdb845 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,12 +26,16 @@ import arrow.core.fix
import arrow.effects.IO
import arrow.instances.either.monad.monad
import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.*
+import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
+import org.onap.ves.VesEventOuterClass.VesEvent
+import reactor.core.publisher.Flux
+import reactor.core.publisher.toFlux
import java.io.InputStream
import javax.json.Json
+import javax.json.JsonArray
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -39,19 +43,36 @@ import javax.json.Json
*/
class XnfSimulator(
private val vesClient: VesHvClient,
- private val messageGenerator: MessageGenerator,
+ private val generatorFactory: MessageGeneratorFactory,
private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
Either.monad<ParsingError>().binding {
+
val json = parseJsonArray(messageParameters).bind()
- val parsed = messageParametersParser.parse(json).bind()
- val generatedMessages = messageGenerator.createMessageFlux(parsed)
- vesClient.sendIo(generatedMessages)
+ messageParametersParser.parse(json).bind()
+ .toFlux()
+ .flatMap(::generateMessages)
+ .let { vesClient.sendIo(it) }
}.fix()
- private fun parseJsonArray(jsonStream: InputStream) =
- Try {
- Json.createReader(jsonStream).readArray()
- }.toEither().mapLeft { ParsingError("failed to parse JSON", Some(it)) }
+ private fun parseJsonArray(jsonStream: InputStream): Either<ParsingError, JsonArray> =
+ Try { Json.createReader(jsonStream).readArray() }
+ .toEither()
+ .mapLeft { ParsingError("Failed to parse JSON", Some(it)) }
+
+ private fun generateMessages(parameters: MessageParameters): Flux<WireFrameMessage> =
+ when (parameters) {
+ is VesEventParameters -> generatorFactory
+ .createVesEventGenerator()
+ .createMessageFlux(parameters)
+ .map(::encodeToWireFrame)
+ is WireFrameParameters -> generatorFactory
+ .createWireFrameGenerator()
+ .createMessageFlux(parameters)
+ else -> throw IllegalStateException("Invalid parameters type")
+ }
+
+ private fun encodeToWireFrame(event: VesEvent): WireFrameMessage =
+ WireFrameMessage(event.toByteArray())
}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index 308c6864..ef627304 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -27,10 +27,10 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfHealthCheckServer
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
@@ -67,7 +67,8 @@ private fun startServers(config: SimulatorConfiguration): IO<RatpackServer> =
XnfHealthCheckServer().startServer(config).bind()
val xnfSimulator = XnfSimulator(
VesHvClient(config),
- MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
+ MessageGeneratorFactory(config.maxPayloadSizeBytes)
+ )
XnfApiServer(xnfSimulator, OngoingSimulations())
.start(config.listenAddress)
.bind()