diff options
author | Jakub Dudycz <jakub.dudycz@nokia.com> | 2019-02-04 15:20:14 +0100 |
---|---|---|
committer | Jakub Dudycz <jakub.dudycz@nokia.com> | 2019-02-15 15:09:48 +0100 |
commit | df17f466577b97a12fac39b64b5d113f32b82f2e (patch) | |
tree | 0a8999e593c90f97ed1b4f45b6e8adbbc110a787 /sources/hv-collector-xnf-simulator | |
parent | e7204cbcf6af61856330cffc541b6f5c78476a09 (diff) |
Generate VesEvents in hv-ves/message-generator
- Split message generator on two specialized generators
for VesEvent and WireFrame related message types
- Refactor whole message-generator module
Change-Id: I1266b549a9a4d27213d03e8921298deab2dacb59
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Issue-ID: DCAEGEN2-1162
Diffstat (limited to 'sources/hv-collector-xnf-simulator')
3 files changed, 60 insertions, 42 deletions
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index ee4734ae..4dfdb845 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,12 +26,16 @@ import arrow.core.fix import arrow.effects.IO import arrow.instances.either.monad.monad import arrow.typeclasses.binding +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser -import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import org.onap.dcae.collectors.veshv.ves.message.generator.api.* +import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory +import org.onap.ves.VesEventOuterClass.VesEvent +import reactor.core.publisher.Flux +import reactor.core.publisher.toFlux import java.io.InputStream import javax.json.Json +import javax.json.JsonArray /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -39,19 +43,36 @@ import javax.json.Json */ class XnfSimulator( private val vesClient: VesHvClient, - private val messageGenerator: MessageGenerator, + private val generatorFactory: MessageGeneratorFactory, private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> = Either.monad<ParsingError>().binding { + val json = parseJsonArray(messageParameters).bind() - val parsed = messageParametersParser.parse(json).bind() - val generatedMessages = messageGenerator.createMessageFlux(parsed) - vesClient.sendIo(generatedMessages) + messageParametersParser.parse(json).bind() + .toFlux() + .flatMap(::generateMessages) + .let { vesClient.sendIo(it) } }.fix() - private fun parseJsonArray(jsonStream: InputStream) = - Try { - Json.createReader(jsonStream).readArray() - }.toEither().mapLeft { ParsingError("failed to parse JSON", Some(it)) } + private fun parseJsonArray(jsonStream: InputStream): Either<ParsingError, JsonArray> = + Try { Json.createReader(jsonStream).readArray() } + .toEither() + .mapLeft { ParsingError("Failed to parse JSON", Some(it)) } + + private fun generateMessages(parameters: MessageParameters): Flux<WireFrameMessage> = + when (parameters) { + is VesEventParameters -> generatorFactory + .createVesEventGenerator() + .createMessageFlux(parameters) + .map(::encodeToWireFrame) + is WireFrameParameters -> generatorFactory + .createWireFrameGenerator() + .createMessageFlux(parameters) + else -> throw IllegalStateException("Invalid parameters type") + } + + private fun encodeToWireFrame(event: VesEvent): WireFrameMessage = + WireFrameMessage(event.toByteArray()) } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index 308c6864..ef627304 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -27,10 +27,10 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfHealthCheckServer import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync @@ -67,7 +67,8 @@ private fun startServers(config: SimulatorConfiguration): IO<RatpackServer> = XnfHealthCheckServer().startServer(config).bind() val xnfSimulator = XnfSimulator( VesHvClient(config), - MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) + MessageGeneratorFactory(config.maxPayloadSizeBytes) + ) XnfApiServer(xnfSimulator, OngoingSimulations()) .start(config.listenAddress) .bind() diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt index 95510e77..192725b9 100644 --- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,23 +21,18 @@ package org.onap.dcae.collectors.veshv.main import arrow.core.Left import arrow.core.None -import arrow.core.Right -import arrow.effects.IO import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.whenever import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError -import reactor.core.publisher.Flux +import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory import java.io.ByteArrayInputStream /** @@ -48,13 +43,13 @@ internal class XnfSimulatorTest : Spek({ lateinit var cut: XnfSimulator lateinit var vesClient: VesHvClient lateinit var messageParametersParser: MessageParametersParser - lateinit var messageGenerator: MessageGenerator + lateinit var generatorFactory: MessageGeneratorFactory beforeEachTest { vesClient = mock() messageParametersParser = mock() - messageGenerator = mock() - cut = XnfSimulator(vesClient, messageGenerator, messageParametersParser) + generatorFactory = mock() + cut = XnfSimulator(vesClient, generatorFactory, messageParametersParser) } describe("startSimulation") { @@ -94,21 +89,22 @@ internal class XnfSimulatorTest : Spek({ assertThat(result).left().isEqualTo(cause) } - it("should return generated messages") { - // given - val json = "[true]".byteInputStream() - val messageParams = listOf<MessageParameters>() - val generatedMessages = Flux.empty<WireFrameMessage>() - val sendingIo = IO {} - whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams)) - whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages) - whenever(vesClient.sendIo(generatedMessages)).thenReturn(sendingIo) - - // when - val result = cut.startSimulation(json) - - // then - assertThat(result).right().isSameAs(sendingIo) - } + // TODO uncomment and fix this test after introducing HvVesProducer from onap SDK in XnfSimulator +// it("should return generated messages") { +// // given +// val json = "[true]".byteInputStream() +// val messageParams = listOf<MessageParameters>() +// val generatedMessages = Flux.empty<WireFrameMessage>() +// val sendingIo = IO {} +// whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams)) +// whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages) +// whenever(vesClient.sendIo(generatedMessages)).thenReturn(sendingIo) +// +// // when +// val result = cut.startSimulation(json) +// +// // then +// assertThat(result).right().isSameAs(sendingIo) +// } } -}) +})
\ No newline at end of file |