From bacba429e2dd6b3048da7e75800f5ad200952599 Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Tue, 19 Feb 2019 18:06:33 +0100 Subject: Use sdk/hvves-producer in hvves/xnf-simulator Change-Id: I8f493b0edd2cbaef136a22d914ad24198bb63a7f Signed-off-by: Jakub Dudycz Issue-ID: DCAEGEN2-1253 --- sources/hv-collector-xnf-simulator/pom.xml | 4 - .../veshv/simulators/xnf/impl/XnfSimulator.kt | 54 ++++++----- .../simulators/xnf/impl/adapters/HvVesClient.kt | 49 ++++++++++ .../simulators/xnf/impl/adapters/VesHvClient.kt | 106 --------------------- .../xnf/impl/config/ClientConfiguration.kt | 31 ++++++ .../simulators/xnf/impl/factory/ClientFactory.kt | 51 ++++++++++ .../dcae/collectors/veshv/simulators/xnf/main.kt | 7 +- .../dcae/collectors/veshv/main/HvVesClientTest.kt | 69 ++++++++++++++ .../dcae/collectors/veshv/main/XnfSimulatorTest.kt | 64 +++++++++---- 9 files changed, 279 insertions(+), 156 deletions(-) create mode 100644 sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt delete mode 100644 sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt create mode 100644 sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt create mode 100644 sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt create mode 100644 sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt (limited to 'sources/hv-collector-xnf-simulator') diff --git a/sources/hv-collector-xnf-simulator/pom.xml b/sources/hv-collector-xnf-simulator/pom.xml index a8134100..69ca53b2 100644 --- a/sources/hv-collector-xnf-simulator/pom.xml +++ b/sources/hv-collector-xnf-simulator/pom.xml @@ -109,10 +109,6 @@ org.onap.dcaegen2.services.sdk hvvesclient-producer-impl - - org.onap.dcaegen2.services.sdk - hvvesclient-producer-api - ${project.parent.groupId} hv-collector-test-utils diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 4dfdb845..812afe19 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -26,12 +26,15 @@ import arrow.core.fix import arrow.effects.IO import arrow.instances.either.monad.monad import arrow.typeclasses.binding -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient -import org.onap.dcae.collectors.veshv.ves.message.generator.api.* +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory +import org.onap.dcae.collectors.veshv.utils.arrow.asIo +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory -import org.onap.ves.VesEventOuterClass.VesEvent -import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import reactor.core.publisher.toFlux import java.io.InputStream import javax.json.Json @@ -42,18 +45,18 @@ import javax.json.JsonArray * @since August 2018 */ class XnfSimulator( - private val vesClient: VesHvClient, + private val clientFactory: ClientFactory, private val generatorFactory: MessageGeneratorFactory, private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { + private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() } + private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() } + fun startSimulation(messageParameters: InputStream): Either> = Either.monad().binding { - val json = parseJsonArray(messageParameters).bind() - messageParametersParser.parse(json).bind() - .toFlux() - .flatMap(::generateMessages) - .let { vesClient.sendIo(it) } + val parameters = messageParametersParser.parse(json).bind() + simulationFrom(parameters) }.fix() private fun parseJsonArray(jsonStream: InputStream): Either = @@ -61,18 +64,23 @@ class XnfSimulator( .toEither() .mapLeft { ParsingError("Failed to parse JSON", Some(it)) } - private fun generateMessages(parameters: MessageParameters): Flux = + private fun simulationFrom(parameters: List): IO = parameters + .toFlux() + .map(::simulate) + .then(Mono.just(Unit)) + .asIo() + + private fun simulate(parameters: MessageParameters): Mono = when (parameters) { - is VesEventParameters -> generatorFactory - .createVesEventGenerator() - .createMessageFlux(parameters) - .map(::encodeToWireFrame) - is WireFrameParameters -> generatorFactory - .createWireFrameGenerator() - .createMessageFlux(parameters) - else -> throw IllegalStateException("Invalid parameters type") + is VesEventParameters -> { + val messages = vesEventGenerator.createMessageFlux(parameters) + val client = clientFactory.create() + client.sendVesEvents(messages) + } + is WireFrameParameters -> { + val messages = wireFrameGenerator.createMessageFlux(parameters) + val client = clientFactory.create(parameters.wireFrameVersion) + client.sendRawPayload(messages) + } } - - private fun encodeToWireFrame(event: VesEvent): WireFrameMessage = - WireFrameMessage(event.toByteArray()) } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt new file mode 100644 index 00000000..afc157c4 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt @@ -0,0 +1,49 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters + +import org.onap.dcae.collectors.veshv.utils.arrow.then +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType +import org.onap.ves.VesEventOuterClass.VesEvent +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import java.nio.ByteBuffer + +/** + * @author Jakub Dudycz + * @since June 2018 + */ +class HvVesClient(private val producer: HvVesProducer) { + + fun sendVesEvents(messages: Flux): Mono = + producer.send(messages) + .then { logger.info { "Ves Events have been sent" } } + + + fun sendRawPayload(messages: Flux): Mono = + producer.sendRaw(messages, PayloadType.UNDEFINED) + .then { logger.info { "Raw messages have been sent" } } + + companion object { + private val logger = Logger(HvVesClient::class) + } +} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt deleted file mode 100644 index eba8ed88..00000000 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt +++ /dev/null @@ -1,106 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters - -import arrow.core.Option -import arrow.core.getOrElse -import io.netty.handler.ssl.SslContext -import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration -import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration -import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory -import org.onap.dcae.collectors.veshv.utils.arrow.asIo -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.reactivestreams.Publisher -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.core.publisher.ReplayProcessor -import reactor.netty.NettyOutbound -import reactor.netty.tcp.TcpClient -import reactor.util.concurrent.Queues.XS_BUFFER_SIZE - -/** - * @author Jakub Dudycz - * @since June 2018 - */ -class VesHvClient(private val configuration: SimulatorConfiguration) { - - private val client: TcpClient = TcpClient.create() - .addressSupplier { configuration.hvVesAddress } - .configureSsl() - - private fun TcpClient.configureSsl() = - createSslContext(configuration.security) - .map { sslContext -> this.secure(sslContext) } - .getOrElse { this } - - fun sendIo(messages: Flux) = - sendRx(messages).then(Mono.just(Unit)).asIo() - - private fun sendRx(messages: Flux): Mono { - val complete = ReplayProcessor.create(1) - client - .handle { _, output -> handler(complete, messages, output) } - .connect() - .doOnError { - logger.info { "Failed to connect to VesHvCollector on ${configuration.hvVesAddress}" } - } - .subscribe { - logger.info { "Connected to VesHvCollector on ${configuration.hvVesAddress}" } - } - return complete.then() - } - - private fun handler(complete: ReplayProcessor, - messages: Flux, - nettyOutbound: NettyOutbound): Publisher { - - val allocator = nettyOutbound.alloc() - val encoder = WireFrameEncoder(allocator) - val frames = messages - .map(encoder::encode) - .window(XS_BUFFER_SIZE) - - return nettyOutbound - .logConnectionClosed() - .options { it.flushOnBoundary() } - .sendGroups(frames) - .then { - logger.info { "Messages have been sent" } - complete.onComplete() - } - .then() - } - - private fun createSslContext(config: SecurityConfiguration): Option = - SslContextFactory().createClientContext(config) - - private fun NettyOutbound.logConnectionClosed() = - withConnection { conn -> - conn.onDispose { - logger.info { "Connection to ${conn.address()} has been closed" } - } - } - - companion object { - private val logger = Logger(VesHvClient::class) - } -} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt new file mode 100644 index 00000000..1db66f11 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config + +import io.vavr.collection.Set +import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import java.net.InetSocketAddress + +/** + * @author Jakub Dudycz + * @since February 2019 + */ +data class ClientConfiguration(val collectorAddresses: Set, + val security: SecurityConfiguration) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt new file mode 100644 index 00000000..a91fccd4 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt @@ -0,0 +1,51 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory + +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion + +/** + * @author Jakub Dudycz + * @since February 2019 + */ +class ClientFactory(configuration: ClientConfiguration) { + + private val partialConfig = ImmutableProducerOptions + .builder() + .collectorAddresses(configuration.collectorAddresses) + .let { producerOptions -> + configuration.security.keys.fold( + { producerOptions }, + { producerOptions.securityKeys(it) }) + } + + fun create(wireFrameVersion: WireFrameVersion): HvVesClient = + buildClient(partialConfig.wireFrameVersion(wireFrameVersion)) + + + fun create(): HvVesClient = buildClient(partialConfig) + + private fun buildClient(config: ImmutableProducerOptions.Builder) = + HvVesClient(HvVesProducerFactory.create(config.build())) +} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index ef627304..366c7e66 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -23,15 +23,17 @@ import arrow.effects.IO import arrow.effects.fix import arrow.effects.instances.io.monad.monad import arrow.typeclasses.binding +import io.vavr.collection.HashSet import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfHealthCheckServer import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried @@ -65,8 +67,9 @@ private fun startServers(config: SimulatorConfiguration): IO = IO.monad().binding { logger.info { "Using configuration: $config" } XnfHealthCheckServer().startServer(config).bind() + val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.security) val xnfSimulator = XnfSimulator( - VesHvClient(config), + ClientFactory(clientConfig), MessageGeneratorFactory(config.maxPayloadSizeBytes) ) XnfApiServer(xnfSimulator, OngoingSimulations()) diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt new file mode 100644 index 00000000..daf30617 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt @@ -0,0 +1,69 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.eq +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer +import org.onap.ves.VesEventOuterClass +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import java.nio.ByteBuffer + + +/** + * @author Jakub Dudycz + * @since February 2019 + */ +internal class HvVesClientTest : Spek({ + describe("HvVesClient") { + val hvVesProducer: HvVesProducer = mock() + val cut = HvVesClient(hvVesProducer) + + describe("handling ves events stream") { + + val vesEvents = Flux.empty() + whenever(hvVesProducer.send(any())).thenReturn(Mono.empty()) + cut.sendVesEvents(vesEvents) + + it("should perform sending operation") { + verify(hvVesProducer).send(vesEvents) + } + } + + describe("handling raw message stream") { + + val rawMessages = Flux.empty() + whenever(hvVesProducer.sendRaw(any(), any())).thenReturn(Mono.empty()) + cut.sendRawPayload(rawMessages) + + it("should perform sending operation") { + verify(hvVesProducer).sendRaw(eq(rawMessages), any()) + } + } + } +}) \ No newline at end of file diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt index 192725b9..123f12ae 100644 --- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -21,18 +21,28 @@ package org.onap.dcae.collectors.veshv.main import arrow.core.Left import arrow.core.None +import arrow.core.Right import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory +import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator +import org.onap.ves.VesEventOuterClass +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import java.io.ByteArrayInputStream /** @@ -41,15 +51,15 @@ import java.io.ByteArrayInputStream */ internal class XnfSimulatorTest : Spek({ lateinit var cut: XnfSimulator - lateinit var vesClient: VesHvClient + lateinit var clientFactory: ClientFactory lateinit var messageParametersParser: MessageParametersParser lateinit var generatorFactory: MessageGeneratorFactory beforeEachTest { - vesClient = mock() + clientFactory = mock() messageParametersParser = mock() generatorFactory = mock() - cut = XnfSimulator(vesClient, generatorFactory, messageParametersParser) + cut = XnfSimulator(clientFactory, generatorFactory, messageParametersParser) } describe("startSimulation") { @@ -89,22 +99,34 @@ internal class XnfSimulatorTest : Spek({ assertThat(result).left().isEqualTo(cause) } - // TODO uncomment and fix this test after introducing HvVesProducer from onap SDK in XnfSimulator -// it("should return generated messages") { -// // given -// val json = "[true]".byteInputStream() -// val messageParams = listOf() -// val generatedMessages = Flux.empty() -// val sendingIo = IO {} -// whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams)) -// whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages) -// whenever(vesClient.sendIo(generatedMessages)).thenReturn(sendingIo) -// -// // when -// val result = cut.startSimulation(json) -// -// // then -// assertThat(result).right().isSameAs(sendingIo) -// } + it("should return generated ves messages") { + // given + val vesEventGenerator: VesEventGenerator = mock() + val vesClient: HvVesClient = mock() + + val json = "[true]".byteInputStream() + + val vesEventParams = VesEventParameters( + CommonEventHeader.getDefaultInstance(), + VesEventType.VALID, + 1 + ) + val messageParams = listOf(vesEventParams) + + val generatedMessages = Flux.empty() + + + whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams)) + whenever(generatorFactory.createVesEventGenerator()).thenReturn(vesEventGenerator) + whenever(vesEventGenerator.createMessageFlux(vesEventParams)).thenReturn(generatedMessages) + whenever(clientFactory.create()).thenReturn(vesClient) + whenever(vesClient.sendVesEvents(generatedMessages)).thenReturn(Mono.just(Unit)) + + // when + cut.startSimulation(json).map { it.unsafeRunSync() } + + // then + verify(vesClient).sendVesEvents(generatedMessages) + } } }) \ No newline at end of file -- cgit 1.2.3-korg