diff options
Diffstat (limited to 'sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt')
-rw-r--r-- | sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt | 48 |
1 files changed, 38 insertions, 10 deletions
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 53a8826c..93c43173 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -26,6 +26,7 @@ import arrow.core.fix import arrow.effects.IO import arrow.instances.either.monad.monad import arrow.typeclasses.binding +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory import org.onap.dcae.collectors.veshv.utils.arrow.asIo import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters @@ -34,9 +35,14 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType +import org.onap.ves.VesEventOuterClass +import reactor.core.Disposable +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.toFlux import java.io.InputStream +import java.nio.ByteBuffer import javax.json.Json import javax.json.JsonArray @@ -52,6 +58,8 @@ class XnfSimulator( private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() } private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() } + private val defaultHvVesClient by lazy { clientFactory.create() } + fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> = Either.monad<ParsingError>().binding { val json = parseJsonArray(messageParameters).bind() @@ -64,23 +72,43 @@ class XnfSimulator( .toEither() .mapLeft { ParsingError("Failed to parse JSON", Some(it)) } - private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = parameters - .toFlux() - .flatMap(::simulate) - .then(Mono.just(Unit)) - .asIo() - private fun simulate(parameters: MessageParameters): Mono<Unit> = + private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = + parameters + .map(::asClientToMessages) + .groupMessagesByClients() + .flattenValuesToFlux() + .toList() + .toFlux() + .map(::simulate) + .then(Mono.just(Unit)) + .asIo() + + private fun <M> List<Pair<HvVesClient, M>>.groupMessagesByClients() = + groupBy({ it.first }, { it.second }) + + private fun <K> Map<K, List<Flux<ByteBuffer>>>.flattenValuesToFlux(): Map<K, Flux<ByteBuffer>> = + mapValues { Flux.concat(it.value) } + + private fun asClientToMessages(parameters: MessageParameters) = when (parameters) { is VesEventParameters -> { - val messages = vesEventGenerator.createMessageFlux(parameters) - val client = clientFactory.create() - client.sendVesEvents(messages) + val messages = vesEventGenerator + .createMessageFlux(parameters) + .map(VesEventOuterClass.VesEvent::toByteBuffer) + Pair(defaultHvVesClient, messages) } is WireFrameParameters -> { val messages = wireFrameGenerator.createMessageFlux(parameters) val client = clientFactory.create(parameters.wireFrameVersion) - client.sendRawPayload(messages) + Pair(client, messages) } } + + private fun simulate(pair: Pair<HvVesClient, Flux<ByteBuffer>>): Disposable = + pair.first + .sendRawPayload(pair.second, PayloadType.PROTOBUF) + .subscribe() } + +internal fun VesEventOuterClass.VesEvent.toByteBuffer() = toByteString().asReadOnlyByteBuffer() |