diff options
Diffstat (limited to 'sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt')
-rw-r--r-- | sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt | 45 |
1 files changed, 33 insertions, 12 deletions
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index ee4734ae..4dfdb845 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,12 +26,16 @@ import arrow.core.fix import arrow.effects.IO import arrow.instances.either.monad.monad import arrow.typeclasses.binding +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser -import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import org.onap.dcae.collectors.veshv.ves.message.generator.api.* +import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory +import org.onap.ves.VesEventOuterClass.VesEvent +import reactor.core.publisher.Flux +import reactor.core.publisher.toFlux import java.io.InputStream import javax.json.Json +import javax.json.JsonArray /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -39,19 +43,36 @@ import javax.json.Json */ class XnfSimulator( private val vesClient: VesHvClient, - private val messageGenerator: MessageGenerator, + private val generatorFactory: MessageGeneratorFactory, private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> = Either.monad<ParsingError>().binding { + val json = parseJsonArray(messageParameters).bind() - val parsed = messageParametersParser.parse(json).bind() - val generatedMessages = messageGenerator.createMessageFlux(parsed) - vesClient.sendIo(generatedMessages) + messageParametersParser.parse(json).bind() + .toFlux() + .flatMap(::generateMessages) + .let { vesClient.sendIo(it) } }.fix() - private fun parseJsonArray(jsonStream: InputStream) = - Try { - Json.createReader(jsonStream).readArray() - }.toEither().mapLeft { ParsingError("failed to parse JSON", Some(it)) } + private fun parseJsonArray(jsonStream: InputStream): Either<ParsingError, JsonArray> = + Try { Json.createReader(jsonStream).readArray() } + .toEither() + .mapLeft { ParsingError("Failed to parse JSON", Some(it)) } + + private fun generateMessages(parameters: MessageParameters): Flux<WireFrameMessage> = + when (parameters) { + is VesEventParameters -> generatorFactory + .createVesEventGenerator() + .createMessageFlux(parameters) + .map(::encodeToWireFrame) + is WireFrameParameters -> generatorFactory + .createWireFrameGenerator() + .createMessageFlux(parameters) + else -> throw IllegalStateException("Invalid parameters type") + } + + private fun encodeToWireFrame(event: VesEvent): WireFrameMessage = + WireFrameMessage(event.toByteArray()) } |