aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt')
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt45
1 files changed, 33 insertions, 12 deletions
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index ee4734ae..4dfdb845 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,12 +26,16 @@ import arrow.core.fix
import arrow.effects.IO
import arrow.instances.either.monad.monad
import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.*
+import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
+import org.onap.ves.VesEventOuterClass.VesEvent
+import reactor.core.publisher.Flux
+import reactor.core.publisher.toFlux
import java.io.InputStream
import javax.json.Json
+import javax.json.JsonArray
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -39,19 +43,36 @@ import javax.json.Json
*/
class XnfSimulator(
private val vesClient: VesHvClient,
- private val messageGenerator: MessageGenerator,
+ private val generatorFactory: MessageGeneratorFactory,
private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
Either.monad<ParsingError>().binding {
+
val json = parseJsonArray(messageParameters).bind()
- val parsed = messageParametersParser.parse(json).bind()
- val generatedMessages = messageGenerator.createMessageFlux(parsed)
- vesClient.sendIo(generatedMessages)
+ messageParametersParser.parse(json).bind()
+ .toFlux()
+ .flatMap(::generateMessages)
+ .let { vesClient.sendIo(it) }
}.fix()
- private fun parseJsonArray(jsonStream: InputStream) =
- Try {
- Json.createReader(jsonStream).readArray()
- }.toEither().mapLeft { ParsingError("failed to parse JSON", Some(it)) }
+ private fun parseJsonArray(jsonStream: InputStream): Either<ParsingError, JsonArray> =
+ Try { Json.createReader(jsonStream).readArray() }
+ .toEither()
+ .mapLeft { ParsingError("Failed to parse JSON", Some(it)) }
+
+ private fun generateMessages(parameters: MessageParameters): Flux<WireFrameMessage> =
+ when (parameters) {
+ is VesEventParameters -> generatorFactory
+ .createVesEventGenerator()
+ .createMessageFlux(parameters)
+ .map(::encodeToWireFrame)
+ is WireFrameParameters -> generatorFactory
+ .createWireFrameGenerator()
+ .createMessageFlux(parameters)
+ else -> throw IllegalStateException("Invalid parameters type")
+ }
+
+ private fun encodeToWireFrame(event: VesEvent): WireFrameMessage =
+ WireFrameMessage(event.toByteArray())
}