summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-xnf-simulator
diff options
context:
space:
mode:
authorJakub Dudycz <jakub.dudycz@nokia.com>2019-02-19 18:06:33 +0100
committerJakub Dudycz <jakub.dudycz@nokia.com>2019-02-21 12:54:54 +0100
commitbacba429e2dd6b3048da7e75800f5ad200952599 (patch)
treef62d89972415e8e8da51fd042b980048efea0ecd /sources/hv-collector-xnf-simulator
parent82b27ff5bccc925fe03d05f259cf881fafc8a1ce (diff)
Use sdk/hvves-producer in hvves/xnf-simulator
Change-Id: I8f493b0edd2cbaef136a22d914ad24198bb63a7f Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com> Issue-ID: DCAEGEN2-1253
Diffstat (limited to 'sources/hv-collector-xnf-simulator')
-rw-r--r--sources/hv-collector-xnf-simulator/pom.xml4
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt54
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt49
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt106
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt31
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt51
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt7
-rw-r--r--sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt69
-rw-r--r--sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt64
9 files changed, 279 insertions, 156 deletions
diff --git a/sources/hv-collector-xnf-simulator/pom.xml b/sources/hv-collector-xnf-simulator/pom.xml
index a8134100..69ca53b2 100644
--- a/sources/hv-collector-xnf-simulator/pom.xml
+++ b/sources/hv-collector-xnf-simulator/pom.xml
@@ -110,10 +110,6 @@
<artifactId>hvvesclient-producer-impl</artifactId>
</dependency>
<dependency>
- <groupId>org.onap.dcaegen2.services.sdk</groupId>
- <artifactId>hvvesclient-producer-api</artifactId>
- </dependency>
- <dependency>
<groupId>${project.parent.groupId}</groupId>
<artifactId>hv-collector-test-utils</artifactId>
<version>${project.parent.version}</version>
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index 4dfdb845..812afe19 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -26,12 +26,15 @@ import arrow.core.fix
import arrow.effects.IO
import arrow.instances.either.monad.monad
import arrow.typeclasses.binding
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.*
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
+import org.onap.dcae.collectors.veshv.utils.arrow.asIo
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
-import org.onap.ves.VesEventOuterClass.VesEvent
-import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import reactor.core.publisher.toFlux
import java.io.InputStream
import javax.json.Json
@@ -42,18 +45,18 @@ import javax.json.JsonArray
* @since August 2018
*/
class XnfSimulator(
- private val vesClient: VesHvClient,
+ private val clientFactory: ClientFactory,
private val generatorFactory: MessageGeneratorFactory,
private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
+ private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() }
+ private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() }
+
fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
Either.monad<ParsingError>().binding {
-
val json = parseJsonArray(messageParameters).bind()
- messageParametersParser.parse(json).bind()
- .toFlux()
- .flatMap(::generateMessages)
- .let { vesClient.sendIo(it) }
+ val parameters = messageParametersParser.parse(json).bind()
+ simulationFrom(parameters)
}.fix()
private fun parseJsonArray(jsonStream: InputStream): Either<ParsingError, JsonArray> =
@@ -61,18 +64,23 @@ class XnfSimulator(
.toEither()
.mapLeft { ParsingError("Failed to parse JSON", Some(it)) }
- private fun generateMessages(parameters: MessageParameters): Flux<WireFrameMessage> =
+ private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = parameters
+ .toFlux()
+ .map(::simulate)
+ .then(Mono.just(Unit))
+ .asIo()
+
+ private fun simulate(parameters: MessageParameters): Mono<Unit> =
when (parameters) {
- is VesEventParameters -> generatorFactory
- .createVesEventGenerator()
- .createMessageFlux(parameters)
- .map(::encodeToWireFrame)
- is WireFrameParameters -> generatorFactory
- .createWireFrameGenerator()
- .createMessageFlux(parameters)
- else -> throw IllegalStateException("Invalid parameters type")
+ is VesEventParameters -> {
+ val messages = vesEventGenerator.createMessageFlux(parameters)
+ val client = clientFactory.create()
+ client.sendVesEvents(messages)
+ }
+ is WireFrameParameters -> {
+ val messages = wireFrameGenerator.createMessageFlux(parameters)
+ val client = clientFactory.create(parameters.wireFrameVersion)
+ client.sendRawPayload(messages)
+ }
}
-
- private fun encodeToWireFrame(event: VesEvent): WireFrameMessage =
- WireFrameMessage(event.toByteArray())
}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
new file mode 100644
index 00000000..afc157c4
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
@@ -0,0 +1,49 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
+
+import org.onap.dcae.collectors.veshv.utils.arrow.then
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
+import org.onap.ves.VesEventOuterClass.VesEvent
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import java.nio.ByteBuffer
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+class HvVesClient(private val producer: HvVesProducer) {
+
+ fun sendVesEvents(messages: Flux<VesEvent>): Mono<Unit> =
+ producer.send(messages)
+ .then { logger.info { "Ves Events have been sent" } }
+
+
+ fun sendRawPayload(messages: Flux<ByteBuffer>): Mono<Unit> =
+ producer.sendRaw(messages, PayloadType.UNDEFINED)
+ .then { logger.info { "Raw messages have been sent" } }
+
+ companion object {
+ private val logger = Logger(HvVesClient::class)
+ }
+}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
deleted file mode 100644
index eba8ed88..00000000
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
-
-import arrow.core.Option
-import arrow.core.getOrElse
-import io.netty.handler.ssl.SslContext
-import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
-import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
-import org.onap.dcae.collectors.veshv.utils.arrow.asIo
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.reactivestreams.Publisher
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.core.publisher.ReplayProcessor
-import reactor.netty.NettyOutbound
-import reactor.netty.tcp.TcpClient
-import reactor.util.concurrent.Queues.XS_BUFFER_SIZE
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since June 2018
- */
-class VesHvClient(private val configuration: SimulatorConfiguration) {
-
- private val client: TcpClient = TcpClient.create()
- .addressSupplier { configuration.hvVesAddress }
- .configureSsl()
-
- private fun TcpClient.configureSsl() =
- createSslContext(configuration.security)
- .map { sslContext -> this.secure(sslContext) }
- .getOrElse { this }
-
- fun sendIo(messages: Flux<WireFrameMessage>) =
- sendRx(messages).then(Mono.just(Unit)).asIo()
-
- private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> {
- val complete = ReplayProcessor.create<Void>(1)
- client
- .handle { _, output -> handler(complete, messages, output) }
- .connect()
- .doOnError {
- logger.info { "Failed to connect to VesHvCollector on ${configuration.hvVesAddress}" }
- }
- .subscribe {
- logger.info { "Connected to VesHvCollector on ${configuration.hvVesAddress}" }
- }
- return complete.then()
- }
-
- private fun handler(complete: ReplayProcessor<Void>,
- messages: Flux<WireFrameMessage>,
- nettyOutbound: NettyOutbound): Publisher<Void> {
-
- val allocator = nettyOutbound.alloc()
- val encoder = WireFrameEncoder(allocator)
- val frames = messages
- .map(encoder::encode)
- .window(XS_BUFFER_SIZE)
-
- return nettyOutbound
- .logConnectionClosed()
- .options { it.flushOnBoundary() }
- .sendGroups(frames)
- .then {
- logger.info { "Messages have been sent" }
- complete.onComplete()
- }
- .then()
- }
-
- private fun createSslContext(config: SecurityConfiguration): Option<SslContext> =
- SslContextFactory().createClientContext(config)
-
- private fun NettyOutbound.logConnectionClosed() =
- withConnection { conn ->
- conn.onDispose {
- logger.info { "Connection to ${conn.address()} has been closed" }
- }
- }
-
- companion object {
- private val logger = Logger(VesHvClient::class)
- }
-}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt
new file mode 100644
index 00000000..1db66f11
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt
@@ -0,0 +1,31 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config
+
+import io.vavr.collection.Set
+import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import java.net.InetSocketAddress
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since February 2019
+ */
+data class ClientConfiguration(val collectorAddresses: Set<InetSocketAddress>,
+ val security: SecurityConfiguration)
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt
new file mode 100644
index 00000000..a91fccd4
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory
+
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since February 2019
+ */
+class ClientFactory(configuration: ClientConfiguration) {
+
+ private val partialConfig = ImmutableProducerOptions
+ .builder()
+ .collectorAddresses(configuration.collectorAddresses)
+ .let { producerOptions ->
+ configuration.security.keys.fold(
+ { producerOptions },
+ { producerOptions.securityKeys(it) })
+ }
+
+ fun create(wireFrameVersion: WireFrameVersion): HvVesClient =
+ buildClient(partialConfig.wireFrameVersion(wireFrameVersion))
+
+
+ fun create(): HvVesClient = buildClient(partialConfig)
+
+ private fun buildClient(config: ImmutableProducerOptions.Builder) =
+ HvVesClient(HvVesProducerFactory.create(config.build()))
+}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index ef627304..366c7e66 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -23,15 +23,17 @@ import arrow.effects.IO
import arrow.effects.fix
import arrow.effects.instances.io.monad.monad
import arrow.typeclasses.binding
+import io.vavr.collection.HashSet
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfHealthCheckServer
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
@@ -65,8 +67,9 @@ private fun startServers(config: SimulatorConfiguration): IO<RatpackServer> =
IO.monad().binding {
logger.info { "Using configuration: $config" }
XnfHealthCheckServer().startServer(config).bind()
+ val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.security)
val xnfSimulator = XnfSimulator(
- VesHvClient(config),
+ ClientFactory(clientConfig),
MessageGeneratorFactory(config.maxPayloadSizeBytes)
)
XnfApiServer(xnfSimulator, OngoingSimulations())
diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt
new file mode 100644
index 00000000..daf30617
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt
@@ -0,0 +1,69 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.eq
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.whenever
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer
+import org.onap.ves.VesEventOuterClass
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import java.nio.ByteBuffer
+
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since February 2019
+ */
+internal class HvVesClientTest : Spek({
+ describe("HvVesClient") {
+ val hvVesProducer: HvVesProducer = mock()
+ val cut = HvVesClient(hvVesProducer)
+
+ describe("handling ves events stream") {
+
+ val vesEvents = Flux.empty<VesEventOuterClass.VesEvent>()
+ whenever(hvVesProducer.send(any())).thenReturn(Mono.empty())
+ cut.sendVesEvents(vesEvents)
+
+ it("should perform sending operation") {
+ verify(hvVesProducer).send(vesEvents)
+ }
+ }
+
+ describe("handling raw message stream") {
+
+ val rawMessages = Flux.empty<ByteBuffer>()
+ whenever(hvVesProducer.sendRaw(any(), any())).thenReturn(Mono.empty())
+ cut.sendRawPayload(rawMessages)
+
+ it("should perform sending operation") {
+ verify(hvVesProducer).sendRaw(eq(rawMessages), any())
+ }
+ }
+ }
+}) \ No newline at end of file
diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
index 192725b9..123f12ae 100644
--- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
+++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
@@ -21,18 +21,28 @@ package org.onap.dcae.collectors.veshv.main
import arrow.core.Left
import arrow.core.None
+import arrow.core.Right
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
import com.nhaarman.mockitokotlin2.whenever
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
+import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator
+import org.onap.ves.VesEventOuterClass
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import java.io.ByteArrayInputStream
/**
@@ -41,15 +51,15 @@ import java.io.ByteArrayInputStream
*/
internal class XnfSimulatorTest : Spek({
lateinit var cut: XnfSimulator
- lateinit var vesClient: VesHvClient
+ lateinit var clientFactory: ClientFactory
lateinit var messageParametersParser: MessageParametersParser
lateinit var generatorFactory: MessageGeneratorFactory
beforeEachTest {
- vesClient = mock()
+ clientFactory = mock()
messageParametersParser = mock()
generatorFactory = mock()
- cut = XnfSimulator(vesClient, generatorFactory, messageParametersParser)
+ cut = XnfSimulator(clientFactory, generatorFactory, messageParametersParser)
}
describe("startSimulation") {
@@ -89,22 +99,34 @@ internal class XnfSimulatorTest : Spek({
assertThat(result).left().isEqualTo(cause)
}
- // TODO uncomment and fix this test after introducing HvVesProducer from onap SDK in XnfSimulator
-// it("should return generated messages") {
-// // given
-// val json = "[true]".byteInputStream()
-// val messageParams = listOf<MessageParameters>()
-// val generatedMessages = Flux.empty<WireFrameMessage>()
-// val sendingIo = IO {}
-// whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams))
-// whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages)
-// whenever(vesClient.sendIo(generatedMessages)).thenReturn(sendingIo)
-//
-// // when
-// val result = cut.startSimulation(json)
-//
-// // then
-// assertThat(result).right().isSameAs(sendingIo)
-// }
+ it("should return generated ves messages") {
+ // given
+ val vesEventGenerator: VesEventGenerator = mock()
+ val vesClient: HvVesClient = mock()
+
+ val json = "[true]".byteInputStream()
+
+ val vesEventParams = VesEventParameters(
+ CommonEventHeader.getDefaultInstance(),
+ VesEventType.VALID,
+ 1
+ )
+ val messageParams = listOf(vesEventParams)
+
+ val generatedMessages = Flux.empty<VesEventOuterClass.VesEvent>()
+
+
+ whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams))
+ whenever(generatorFactory.createVesEventGenerator()).thenReturn(vesEventGenerator)
+ whenever(vesEventGenerator.createMessageFlux(vesEventParams)).thenReturn(generatedMessages)
+ whenever(clientFactory.create()).thenReturn(vesClient)
+ whenever(vesClient.sendVesEvents(generatedMessages)).thenReturn(Mono.just(Unit))
+
+ // when
+ cut.startSimulation(json).map { it.unsafeRunSync() }
+
+ // then
+ verify(vesClient).sendVesEvents(generatedMessages)
+ }
}
}) \ No newline at end of file