diff options
Diffstat (limited to 'sources/hv-collector-xnf-simulator')
9 files changed, 78 insertions, 60 deletions
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 53a8826c..93c43173 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -26,6 +26,7 @@ import arrow.core.fix import arrow.effects.IO import arrow.instances.either.monad.monad import arrow.typeclasses.binding +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory import org.onap.dcae.collectors.veshv.utils.arrow.asIo import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters @@ -34,9 +35,14 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType +import org.onap.ves.VesEventOuterClass +import reactor.core.Disposable +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.toFlux import java.io.InputStream +import java.nio.ByteBuffer import javax.json.Json import javax.json.JsonArray @@ -52,6 +58,8 @@ class XnfSimulator( private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() } private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() } + private val defaultHvVesClient by lazy { clientFactory.create() } + fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> = Either.monad<ParsingError>().binding { val json = parseJsonArray(messageParameters).bind() @@ -64,23 +72,43 @@ class XnfSimulator( .toEither() .mapLeft { ParsingError("Failed to parse JSON", Some(it)) } - private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = parameters - .toFlux() - .flatMap(::simulate) - .then(Mono.just(Unit)) - .asIo() - private fun simulate(parameters: MessageParameters): Mono<Unit> = + private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = + parameters + .map(::asClientToMessages) + .groupMessagesByClients() + .flattenValuesToFlux() + .toList() + .toFlux() + .map(::simulate) + .then(Mono.just(Unit)) + .asIo() + + private fun <M> List<Pair<HvVesClient, M>>.groupMessagesByClients() = + groupBy({ it.first }, { it.second }) + + private fun <K> Map<K, List<Flux<ByteBuffer>>>.flattenValuesToFlux(): Map<K, Flux<ByteBuffer>> = + mapValues { Flux.concat(it.value) } + + private fun asClientToMessages(parameters: MessageParameters) = when (parameters) { is VesEventParameters -> { - val messages = vesEventGenerator.createMessageFlux(parameters) - val client = clientFactory.create() - client.sendVesEvents(messages) + val messages = vesEventGenerator + .createMessageFlux(parameters) + .map(VesEventOuterClass.VesEvent::toByteBuffer) + Pair(defaultHvVesClient, messages) } is WireFrameParameters -> { val messages = wireFrameGenerator.createMessageFlux(parameters) val client = clientFactory.create(parameters.wireFrameVersion) - client.sendRawPayload(messages) + Pair(client, messages) } } + + private fun simulate(pair: Pair<HvVesClient, Flux<ByteBuffer>>): Disposable = + pair.first + .sendRawPayload(pair.second, PayloadType.PROTOBUF) + .subscribe() } + +internal fun VesEventOuterClass.VesEvent.toByteBuffer() = toByteString().asReadOnlyByteBuffer() diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt index afc157c4..19579431 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt @@ -23,10 +23,10 @@ import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType -import org.onap.ves.VesEventOuterClass.VesEvent import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicLong /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -34,15 +34,11 @@ import java.nio.ByteBuffer */ class HvVesClient(private val producer: HvVesProducer) { - fun sendVesEvents(messages: Flux<VesEvent>): Mono<Unit> = - producer.send(messages) - .then { logger.info { "Ves Events have been sent" } } + fun sendRawPayload(messages: Flux<ByteBuffer>, payloadType: PayloadType = PayloadType.UNDEFINED): Mono<Unit> = + producer.sendRaw(messages, payloadType) + .then { logger.info { "Producer sent raw messages with payload type ${payloadType}" } } - fun sendRawPayload(messages: Flux<ByteBuffer>): Mono<Unit> = - producer.sendRaw(messages, PayloadType.UNDEFINED) - .then { logger.info { "Raw messages have been sent" } } - companion object { private val logger = Logger(HvVesClient::class) } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt index b5751a3f..0891e499 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt @@ -26,19 +26,19 @@ import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration +import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfigurationProvider import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.HEALTH_CHECK_API_PORT -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT import org.onap.dcae.collectors.veshv.utils.commandline.intValue import org.onap.dcae.collectors.veshv.utils.commandline.stringValue import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -71,14 +71,14 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES) - val security = createSecurityConfiguration(cmdLine) - .doOnFailure { ex -> - logger.withError { - log("Could not read security keys", ex) + val security = createSecurityConfigurationProvider(cmdLine) + .doOnFailure { ex -> + logger.withError { + log("Could not read security keys", ex) + } } - } - .toOption() - .bind() + .toOption() + .bind() SimulatorConfiguration( InetSocketAddress(listenPort), diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt index 1db66f11..e9fecd66 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt @@ -28,4 +28,4 @@ import java.net.InetSocketAddress * @since February 2019 */ data class ClientConfiguration(val collectorAddresses: Set<InetSocketAddress>, - val security: SecurityConfiguration) + val securityProvider: () -> SecurityConfiguration) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt index 5a0e73c7..0021ed82 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt @@ -31,4 +31,4 @@ data class SimulatorConfiguration( val healthCheckApiListenAddress: InetSocketAddress, val hvVesAddress: InetSocketAddress, val maxPayloadSizeBytes: Int, - val security: SecurityConfiguration) + val securityProvider: () -> SecurityConfiguration) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt index a91fccd4..72a1165e 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt @@ -29,23 +29,25 @@ import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since February 2019 */ -class ClientFactory(configuration: ClientConfiguration) { +class ClientFactory(private val configuration: ClientConfiguration) { - private val partialConfig = ImmutableProducerOptions + fun create() = hvVesClient(partialConfiguration().build()) + + fun create(wireFrameVersion: WireFrameVersion) = hvVesClient( + partialConfiguration() + .wireFrameVersion(wireFrameVersion) + .build()) + + private fun partialConfiguration() = ImmutableProducerOptions .builder() .collectorAddresses(configuration.collectorAddresses) - .let { producerOptions -> - configuration.security.keys.fold( - { producerOptions }, - { producerOptions.securityKeys(it) }) + .let { options -> + configuration.securityProvider().keys.fold( + { options }, + { options.securityKeys(it) }) } - fun create(wireFrameVersion: WireFrameVersion): HvVesClient = - buildClient(partialConfig.wireFrameVersion(wireFrameVersion)) - - - fun create(): HvVesClient = buildClient(partialConfig) + private fun hvVesClient(producerOptions: ImmutableProducerOptions) = + HvVesClient(HvVesProducerFactory.create(producerOptions)) - private fun buildClient(config: ImmutableProducerOptions.Builder) = - HvVesClient(HvVesProducerFactory.create(config.build())) } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index 366c7e66..baa231c5 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -67,7 +67,7 @@ private fun startServers(config: SimulatorConfiguration): IO<RatpackServer> = IO.monad().binding { logger.info { "Using configuration: $config" } XnfHealthCheckServer().startServer(config).bind() - val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.security) + val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider) val xnfSimulator = XnfSimulator( ClientFactory(clientConfig), MessageGeneratorFactory(config.maxPayloadSizeBytes) diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt index daf30617..14061532 100644 --- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt @@ -44,17 +44,6 @@ internal class HvVesClientTest : Spek({ val hvVesProducer: HvVesProducer = mock() val cut = HvVesClient(hvVesProducer) - describe("handling ves events stream") { - - val vesEvents = Flux.empty<VesEventOuterClass.VesEvent>() - whenever(hvVesProducer.send(any())).thenReturn(Mono.empty()) - cut.sendVesEvents(vesEvents) - - it("should perform sending operation") { - verify(hvVesProducer).send(vesEvents) - } - } - describe("handling raw message stream") { val rawMessages = Flux.empty<ByteBuffer>() diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt index 123f12ae..29281cdc 100644 --- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -23,6 +23,7 @@ import arrow.core.Left import arrow.core.None import arrow.core.Right import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.eq import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever @@ -39,6 +40,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParamete import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType import org.onap.ves.VesEventOuterClass import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.core.publisher.Flux @@ -120,13 +122,14 @@ internal class XnfSimulatorTest : Spek({ whenever(generatorFactory.createVesEventGenerator()).thenReturn(vesEventGenerator) whenever(vesEventGenerator.createMessageFlux(vesEventParams)).thenReturn(generatedMessages) whenever(clientFactory.create()).thenReturn(vesClient) - whenever(vesClient.sendVesEvents(generatedMessages)).thenReturn(Mono.just(Unit)) + + whenever(vesClient.sendRawPayload(any(), eq(PayloadType.PROTOBUF))).thenReturn(Mono.just(Unit)) // when cut.startSimulation(json).map { it.unsafeRunSync() } // then - verify(vesClient).sendVesEvents(generatedMessages) + verify(vesClient).sendRawPayload(any(), eq(PayloadType.PROTOBUF)) } } })
\ No newline at end of file |