summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt')
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt48
1 files changed, 38 insertions, 10 deletions
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index 53a8826c..93c43173 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -26,6 +26,7 @@ import arrow.core.fix
import arrow.effects.IO
import arrow.instances.either.monad.monad
import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
import org.onap.dcae.collectors.veshv.utils.arrow.asIo
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
@@ -34,9 +35,14 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
+import org.onap.ves.VesEventOuterClass
+import reactor.core.Disposable
+import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.toFlux
import java.io.InputStream
+import java.nio.ByteBuffer
import javax.json.Json
import javax.json.JsonArray
@@ -52,6 +58,8 @@ class XnfSimulator(
private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() }
private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() }
+ private val defaultHvVesClient by lazy { clientFactory.create() }
+
fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
Either.monad<ParsingError>().binding {
val json = parseJsonArray(messageParameters).bind()
@@ -64,23 +72,43 @@ class XnfSimulator(
.toEither()
.mapLeft { ParsingError("Failed to parse JSON", Some(it)) }
- private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = parameters
- .toFlux()
- .flatMap(::simulate)
- .then(Mono.just(Unit))
- .asIo()
- private fun simulate(parameters: MessageParameters): Mono<Unit> =
+ private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> =
+ parameters
+ .map(::asClientToMessages)
+ .groupMessagesByClients()
+ .flattenValuesToFlux()
+ .toList()
+ .toFlux()
+ .map(::simulate)
+ .then(Mono.just(Unit))
+ .asIo()
+
+ private fun <M> List<Pair<HvVesClient, M>>.groupMessagesByClients() =
+ groupBy({ it.first }, { it.second })
+
+ private fun <K> Map<K, List<Flux<ByteBuffer>>>.flattenValuesToFlux(): Map<K, Flux<ByteBuffer>> =
+ mapValues { Flux.concat(it.value) }
+
+ private fun asClientToMessages(parameters: MessageParameters) =
when (parameters) {
is VesEventParameters -> {
- val messages = vesEventGenerator.createMessageFlux(parameters)
- val client = clientFactory.create()
- client.sendVesEvents(messages)
+ val messages = vesEventGenerator
+ .createMessageFlux(parameters)
+ .map(VesEventOuterClass.VesEvent::toByteBuffer)
+ Pair(defaultHvVesClient, messages)
}
is WireFrameParameters -> {
val messages = wireFrameGenerator.createMessageFlux(parameters)
val client = clientFactory.create(parameters.wireFrameVersion)
- client.sendRawPayload(messages)
+ Pair(client, messages)
}
}
+
+ private fun simulate(pair: Pair<HvVesClient, Flux<ByteBuffer>>): Disposable =
+ pair.first
+ .sendRawPayload(pair.second, PayloadType.PROTOBUF)
+ .subscribe()
}
+
+internal fun VesEventOuterClass.VesEvent.toByteBuffer() = toByteString().asReadOnlyByteBuffer()