diff options
25 files changed, 351 insertions, 228 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt index 5d9a7cfc..47a2d22a 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -29,7 +29,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameter import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD -import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator import org.onap.ves.VesEventOuterClass.VesEvent import reactor.core.publisher.Flux import java.io.InputStream diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt index 8fb1b2ef..bff7709d 100644 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt @@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameter import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.VALID -import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator import org.onap.ves.VesEventOuterClass.CommonEventHeader import org.onap.ves.VesEventOuterClass.VesEvent import reactor.core.publisher.Flux diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt index 290ef72c..56825221 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt @@ -23,11 +23,10 @@ import arrow.core.Either import arrow.core.Left import arrow.core.Right import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monadError.monadError -import arrow.typeclasses.binding +import org.reactivestreams.Publisher import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import reactor.core.publisher.toMono import kotlin.system.exitProcess /** @@ -62,6 +61,9 @@ fun <T> Mono<T>.asIo() = IO.async<T> { callback -> }) } +fun <T> Publisher<T>.then(callback: () -> Unit): Mono<Unit> = + toMono().then(Mono.fromCallable(callback)) + fun <T> Flux<IO<T>>.evaluateIo(): Flux<T> = flatMap { io -> io.attempt().unsafeRunSync().fold( diff --git a/sources/hv-collector-ves-message-generator/pom.xml b/sources/hv-collector-ves-message-generator/pom.xml index 29e32f46..e676dfa9 100644 --- a/sources/hv-collector-ves-message-generator/pom.xml +++ b/sources/hv-collector-ves-message-generator/pom.xml @@ -74,6 +74,10 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.onap.dcaegen2.services.sdk</groupId> + <artifactId>hvvesclient-producer-api</artifactId> + </dependency> + <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java-util</artifactId> </dependency> diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt index 82b79c0c..a7187166 100644 --- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt +++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt @@ -19,20 +19,36 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.api +import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.INVALID_WIRE_FRAME +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableWireFrameVersion +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion import org.onap.ves.VesEventOuterClass /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -abstract class MessageParameters(val amount: Long = -1) +sealed class MessageParameters /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since February 2019 */ class WireFrameParameters(val messageType: WireFrameType, - amount: Long = -1) : MessageParameters(amount) + val amount: Long = -1) : MessageParameters() { + + val wireFrameVersion: WireFrameVersion + get() = ImmutableWireFrameVersion.builder().let { + if (messageType == INVALID_WIRE_FRAME) + it.major(UNSUPPORTED_MAJOR_VERSION) + else + it + }.build() + + companion object { + private const val UNSUPPORTED_MAJOR_VERSION: Short = 2 + } +} /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -40,4 +56,4 @@ class WireFrameParameters(val messageType: WireFrameType, */ class VesEventParameters(val commonEventHeader: VesEventOuterClass.CommonEventHeader, val messageType: VesEventType, - amount: Long = -1) : MessageParameters(amount) + val amount: Long = -1) : MessageParameters() diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt index aa473796..613f9bd1 100644 --- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt +++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt @@ -19,9 +19,9 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.factory -import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.PayloadGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe.WireFrameGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.generators.RawMessageGenerator /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -30,5 +30,5 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe.WireF class MessageGeneratorFactory(private val maxPayloadSizeBytes: Int) { fun createVesEventGenerator() = VesEventGenerator(PayloadGenerator(), maxPayloadSizeBytes) - fun createWireFrameGenerator() = WireFrameGenerator() + fun createWireFrameGenerator() = RawMessageGenerator() } diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/MessageGenerator.kt index 5f8638f0..5682cc4c 100644 --- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt +++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/MessageGenerator.kt @@ -17,8 +17,9 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.ves.message.generator.api +package org.onap.dcae.collectors.veshv.ves.message.generator.generators +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import reactor.core.publisher.Flux import reactor.core.publisher.Mono diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/RawMessageGenerator.kt index ad45bc5c..9f20bd29 100644 --- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGenerator.kt +++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/RawMessageGenerator.kt @@ -17,12 +17,9 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe +package org.onap.dcae.collectors.veshv.ves.message.generator.generators -import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.domain.PayloadContentType -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import io.netty.buffer.Unpooled import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.INVALID_GPB_DATA @@ -30,37 +27,29 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.IN import org.onap.ves.VesEventOuterClass.VesEvent import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import java.nio.ByteBuffer import java.nio.charset.Charset /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since February 2019 */ -class WireFrameGenerator : MessageGenerator<WireFrameParameters, WireFrameMessage>() { +class RawMessageGenerator : MessageGenerator<WireFrameParameters, ByteBuffer>() { - override fun createMessageFlux(parameters: WireFrameParameters): Flux<WireFrameMessage> = + override fun createMessageFlux(parameters: WireFrameParameters): Flux<ByteBuffer> = parameters.run { Mono .fromCallable { createMessage(messageType) } .let { repeatMessage(it, amount) } } - private fun createMessage(messageType: WireFrameType): WireFrameMessage = + private fun createMessage(messageType: WireFrameType): ByteBuffer = when (messageType) { - INVALID_WIRE_FRAME -> { - val payload = ByteData(VesEvent.getDefaultInstance().toByteArray()) - WireFrameMessage( - payload, - UNSUPPORTED_VERSION, - UNSUPPORTED_VERSION, - PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, - payload.size()) - } - INVALID_GPB_DATA -> - WireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset())) + INVALID_WIRE_FRAME -> wrap(VesEvent.getDefaultInstance().toByteArray()) + INVALID_GPB_DATA -> wrap("invalid vesEvent".toByteArray(Charset.defaultCharset())) } - companion object { - private const val UNSUPPORTED_VERSION: Short = 2 - } + private fun wrap(bytes: ByteArray) = Unpooled.wrappedBuffer(bytes).nioBuffer() + + } diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/VesEventGenerator.kt index 7abd6054..a6669e7d 100644 --- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGenerator.kt +++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/VesEventGenerator.kt @@ -17,15 +17,15 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent +package org.onap.dcae.collectors.veshv.ves.message.generator.generators import com.google.protobuf.ByteString -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.TOO_BIG_PAYLOAD import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.VALID +import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator import org.onap.ves.VesEventOuterClass.CommonEventHeader import org.onap.ves.VesEventOuterClass.VesEvent import reactor.core.publisher.Flux diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParser.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt index 05938924..f0ae4607 100644 --- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParser.kt +++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent +package org.onap.dcae.collectors.veshv.ves.message.generator.impl import arrow.core.Option import com.google.protobuf.util.JsonFormat @@ -29,7 +29,7 @@ import javax.json.JsonObject * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since July 2018 */ -class CommonEventHeaderParser { +internal class CommonEventHeaderParser { fun parse(json: JsonObject): Option<CommonEventHeader> = Option.fromNullable( CommonEventHeader.newBuilder() .apply { JsonFormat.parser().merge(json.toString(), this) } diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt index 174a01fd..7d6087c2 100644 --- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt +++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt @@ -33,7 +33,6 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.Com import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.Companion.isWireFrameType -import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.CommonEventHeaderParser import javax.json.JsonArray import javax.json.JsonObject import javax.json.JsonValue diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt index ed521054..5891e7bc 100644 --- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGenerator.kt +++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent +package org.onap.dcae.collectors.veshv.ves.message.generator.impl import com.google.protobuf.ByteString import java.util.* diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGeneratorTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/raw/RawMessageGeneratorTest.kt index f8c84c39..8c8c8357 100644 --- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGeneratorTest.kt +++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/raw/RawMessageGeneratorTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2010 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe +package org.onap.dcae.collectors.veshv.ves.message.generator.impl.raw import com.google.protobuf.InvalidProtocolBufferException import org.assertj.core.api.Assertions @@ -25,13 +25,13 @@ import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType +import org.onap.dcae.collectors.veshv.ves.message.generator.generators.RawMessageGenerator import org.onap.ves.VesEventOuterClass import reactor.test.test -import kotlin.test.assertTrue +import java.nio.ByteBuffer +import java.nio.charset.Charset /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -40,7 +40,7 @@ import kotlin.test.assertTrue object WireFrameGeneratorTest : Spek({ val maxPayloadSizeBytes = 1024 - val cut = WireFrameGenerator() + val cut = RawMessageGenerator() on("message type requesting invalid GPB data ") { it("should createVesEventGenerator flux of messages with invalid payload") { @@ -50,32 +50,17 @@ object WireFrameGeneratorTest : Spek({ )) .test() .assertNext { - assertTrue(it.validate().isRight()) - assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes) - Assertions.assertThatExceptionOfType(InvalidProtocolBufferException::class.java) - .isThrownBy { extractCommonEventHeader(it.payload) } - } - .verifyComplete() - } - } + val decodedBytes = it.array().toString(Charset.defaultCharset()) + assertThat(decodedBytes).isEqualTo("invalid vesEvent") + assertThat(it.capacity()).isLessThan(maxPayloadSizeBytes) - on("message type requesting invalid wire frame ") { - it("should createVesEventGenerator flux of messages with invalid version") { - cut - .createMessageFlux(WireFrameParameters( - WireFrameType.INVALID_WIRE_FRAME, 1 - )) - .test() - .assertNext { - assertTrue(it.validate().isLeft()) - assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes) - assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR) + Assertions.assertThatExceptionOfType(InvalidProtocolBufferException::class.java) + .isThrownBy { extractCommonEventHeader(it) } } .verifyComplete() } } - }) -fun extractCommonEventHeader(bytes: ByteData): VesEventOuterClass.CommonEventHeader = - VesEventOuterClass.VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader +private fun extractCommonEventHeader(bytes: ByteBuffer): VesEventOuterClass.CommonEventHeader = + VesEventOuterClass.VesEvent.parseFrom(bytes).commonEventHeader diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParserTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParserTest.kt index 04222d1e..09635afd 100644 --- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParserTest.kt +++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParserTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.domain.VesEventDomain.STATE_CHANGE import org.onap.dcae.collectors.veshv.tests.utils.commonHeader +import org.onap.dcae.collectors.veshv.ves.message.generator.impl.CommonEventHeaderParser import org.onap.ves.VesEventOuterClass.CommonEventHeader import java.io.ByteArrayInputStream import javax.json.Json diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGeneratorTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGeneratorTest.kt index 2d77bb9f..4558bb1a 100644 --- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGeneratorTest.kt +++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGeneratorTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator object PayloadGeneratorTest : Spek({ diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGeneratorTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGeneratorTest.kt index 2f13c52e..fa99bfb6 100644 --- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGeneratorTest.kt +++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGeneratorTest.kt @@ -28,8 +28,10 @@ import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.tests.utils.commonHeader +import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType +import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator import reactor.test.test /** diff --git a/sources/hv-collector-xnf-simulator/pom.xml b/sources/hv-collector-xnf-simulator/pom.xml index a8134100..69ca53b2 100644 --- a/sources/hv-collector-xnf-simulator/pom.xml +++ b/sources/hv-collector-xnf-simulator/pom.xml @@ -110,10 +110,6 @@ <artifactId>hvvesclient-producer-impl</artifactId> </dependency> <dependency> - <groupId>org.onap.dcaegen2.services.sdk</groupId> - <artifactId>hvvesclient-producer-api</artifactId> - </dependency> - <dependency> <groupId>${project.parent.groupId}</groupId> <artifactId>hv-collector-test-utils</artifactId> <version>${project.parent.version}</version> diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 4dfdb845..812afe19 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -26,12 +26,15 @@ import arrow.core.fix import arrow.effects.IO import arrow.instances.either.monad.monad import arrow.typeclasses.binding -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient -import org.onap.dcae.collectors.veshv.ves.message.generator.api.* +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory +import org.onap.dcae.collectors.veshv.utils.arrow.asIo +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory -import org.onap.ves.VesEventOuterClass.VesEvent -import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import reactor.core.publisher.toFlux import java.io.InputStream import javax.json.Json @@ -42,18 +45,18 @@ import javax.json.JsonArray * @since August 2018 */ class XnfSimulator( - private val vesClient: VesHvClient, + private val clientFactory: ClientFactory, private val generatorFactory: MessageGeneratorFactory, private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { + private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() } + private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() } + fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> = Either.monad<ParsingError>().binding { - val json = parseJsonArray(messageParameters).bind() - messageParametersParser.parse(json).bind() - .toFlux() - .flatMap(::generateMessages) - .let { vesClient.sendIo(it) } + val parameters = messageParametersParser.parse(json).bind() + simulationFrom(parameters) }.fix() private fun parseJsonArray(jsonStream: InputStream): Either<ParsingError, JsonArray> = @@ -61,18 +64,23 @@ class XnfSimulator( .toEither() .mapLeft { ParsingError("Failed to parse JSON", Some(it)) } - private fun generateMessages(parameters: MessageParameters): Flux<WireFrameMessage> = + private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = parameters + .toFlux() + .map(::simulate) + .then(Mono.just(Unit)) + .asIo() + + private fun simulate(parameters: MessageParameters): Mono<Unit> = when (parameters) { - is VesEventParameters -> generatorFactory - .createVesEventGenerator() - .createMessageFlux(parameters) - .map(::encodeToWireFrame) - is WireFrameParameters -> generatorFactory - .createWireFrameGenerator() - .createMessageFlux(parameters) - else -> throw IllegalStateException("Invalid parameters type") + is VesEventParameters -> { + val messages = vesEventGenerator.createMessageFlux(parameters) + val client = clientFactory.create() + client.sendVesEvents(messages) + } + is WireFrameParameters -> { + val messages = wireFrameGenerator.createMessageFlux(parameters) + val client = clientFactory.create(parameters.wireFrameVersion) + client.sendRawPayload(messages) + } } - - private fun encodeToWireFrame(event: VesEvent): WireFrameMessage = - WireFrameMessage(event.toByteArray()) } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt new file mode 100644 index 00000000..afc157c4 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt @@ -0,0 +1,49 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters + +import org.onap.dcae.collectors.veshv.utils.arrow.then +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType +import org.onap.ves.VesEventOuterClass.VesEvent +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import java.nio.ByteBuffer + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +class HvVesClient(private val producer: HvVesProducer) { + + fun sendVesEvents(messages: Flux<VesEvent>): Mono<Unit> = + producer.send(messages) + .then { logger.info { "Ves Events have been sent" } } + + + fun sendRawPayload(messages: Flux<ByteBuffer>): Mono<Unit> = + producer.sendRaw(messages, PayloadType.UNDEFINED) + .then { logger.info { "Raw messages have been sent" } } + + companion object { + private val logger = Logger(HvVesClient::class) + } +} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt deleted file mode 100644 index eba8ed88..00000000 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt +++ /dev/null @@ -1,106 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters - -import arrow.core.Option -import arrow.core.getOrElse -import io.netty.handler.ssl.SslContext -import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration -import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration -import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory -import org.onap.dcae.collectors.veshv.utils.arrow.asIo -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.reactivestreams.Publisher -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.core.publisher.ReplayProcessor -import reactor.netty.NettyOutbound -import reactor.netty.tcp.TcpClient -import reactor.util.concurrent.Queues.XS_BUFFER_SIZE - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since June 2018 - */ -class VesHvClient(private val configuration: SimulatorConfiguration) { - - private val client: TcpClient = TcpClient.create() - .addressSupplier { configuration.hvVesAddress } - .configureSsl() - - private fun TcpClient.configureSsl() = - createSslContext(configuration.security) - .map { sslContext -> this.secure(sslContext) } - .getOrElse { this } - - fun sendIo(messages: Flux<WireFrameMessage>) = - sendRx(messages).then(Mono.just(Unit)).asIo() - - private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> { - val complete = ReplayProcessor.create<Void>(1) - client - .handle { _, output -> handler(complete, messages, output) } - .connect() - .doOnError { - logger.info { "Failed to connect to VesHvCollector on ${configuration.hvVesAddress}" } - } - .subscribe { - logger.info { "Connected to VesHvCollector on ${configuration.hvVesAddress}" } - } - return complete.then() - } - - private fun handler(complete: ReplayProcessor<Void>, - messages: Flux<WireFrameMessage>, - nettyOutbound: NettyOutbound): Publisher<Void> { - - val allocator = nettyOutbound.alloc() - val encoder = WireFrameEncoder(allocator) - val frames = messages - .map(encoder::encode) - .window(XS_BUFFER_SIZE) - - return nettyOutbound - .logConnectionClosed() - .options { it.flushOnBoundary() } - .sendGroups(frames) - .then { - logger.info { "Messages have been sent" } - complete.onComplete() - } - .then() - } - - private fun createSslContext(config: SecurityConfiguration): Option<SslContext> = - SslContextFactory().createClientContext(config) - - private fun NettyOutbound.logConnectionClosed() = - withConnection { conn -> - conn.onDispose { - logger.info { "Connection to ${conn.address()} has been closed" } - } - } - - companion object { - private val logger = Logger(VesHvClient::class) - } -} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt new file mode 100644 index 00000000..1db66f11 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config + +import io.vavr.collection.Set +import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import java.net.InetSocketAddress + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since February 2019 + */ +data class ClientConfiguration(val collectorAddresses: Set<InetSocketAddress>, + val security: SecurityConfiguration) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt new file mode 100644 index 00000000..a91fccd4 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt @@ -0,0 +1,51 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory + +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since February 2019 + */ +class ClientFactory(configuration: ClientConfiguration) { + + private val partialConfig = ImmutableProducerOptions + .builder() + .collectorAddresses(configuration.collectorAddresses) + .let { producerOptions -> + configuration.security.keys.fold( + { producerOptions }, + { producerOptions.securityKeys(it) }) + } + + fun create(wireFrameVersion: WireFrameVersion): HvVesClient = + buildClient(partialConfig.wireFrameVersion(wireFrameVersion)) + + + fun create(): HvVesClient = buildClient(partialConfig) + + private fun buildClient(config: ImmutableProducerOptions.Builder) = + HvVesClient(HvVesProducerFactory.create(config.build())) +} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index ef627304..366c7e66 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -23,15 +23,17 @@ import arrow.effects.IO import arrow.effects.fix import arrow.effects.instances.io.monad.monad import arrow.typeclasses.binding +import io.vavr.collection.HashSet import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfHealthCheckServer import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried @@ -65,8 +67,9 @@ private fun startServers(config: SimulatorConfiguration): IO<RatpackServer> = IO.monad().binding { logger.info { "Using configuration: $config" } XnfHealthCheckServer().startServer(config).bind() + val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.security) val xnfSimulator = XnfSimulator( - VesHvClient(config), + ClientFactory(clientConfig), MessageGeneratorFactory(config.maxPayloadSizeBytes) ) XnfApiServer(xnfSimulator, OngoingSimulations()) diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt new file mode 100644 index 00000000..daf30617 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt @@ -0,0 +1,69 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.eq +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer +import org.onap.ves.VesEventOuterClass +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import java.nio.ByteBuffer + + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since February 2019 + */ +internal class HvVesClientTest : Spek({ + describe("HvVesClient") { + val hvVesProducer: HvVesProducer = mock() + val cut = HvVesClient(hvVesProducer) + + describe("handling ves events stream") { + + val vesEvents = Flux.empty<VesEventOuterClass.VesEvent>() + whenever(hvVesProducer.send(any())).thenReturn(Mono.empty()) + cut.sendVesEvents(vesEvents) + + it("should perform sending operation") { + verify(hvVesProducer).send(vesEvents) + } + } + + describe("handling raw message stream") { + + val rawMessages = Flux.empty<ByteBuffer>() + whenever(hvVesProducer.sendRaw(any(), any())).thenReturn(Mono.empty()) + cut.sendRawPayload(rawMessages) + + it("should perform sending operation") { + verify(hvVesProducer).sendRaw(eq(rawMessages), any()) + } + } + } +})
\ No newline at end of file diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt index 192725b9..123f12ae 100644 --- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -21,18 +21,28 @@ package org.onap.dcae.collectors.veshv.main import arrow.core.Left import arrow.core.None +import arrow.core.Right import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory +import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator +import org.onap.ves.VesEventOuterClass +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import java.io.ByteArrayInputStream /** @@ -41,15 +51,15 @@ import java.io.ByteArrayInputStream */ internal class XnfSimulatorTest : Spek({ lateinit var cut: XnfSimulator - lateinit var vesClient: VesHvClient + lateinit var clientFactory: ClientFactory lateinit var messageParametersParser: MessageParametersParser lateinit var generatorFactory: MessageGeneratorFactory beforeEachTest { - vesClient = mock() + clientFactory = mock() messageParametersParser = mock() generatorFactory = mock() - cut = XnfSimulator(vesClient, generatorFactory, messageParametersParser) + cut = XnfSimulator(clientFactory, generatorFactory, messageParametersParser) } describe("startSimulation") { @@ -89,22 +99,34 @@ internal class XnfSimulatorTest : Spek({ assertThat(result).left().isEqualTo(cause) } - // TODO uncomment and fix this test after introducing HvVesProducer from onap SDK in XnfSimulator -// it("should return generated messages") { -// // given -// val json = "[true]".byteInputStream() -// val messageParams = listOf<MessageParameters>() -// val generatedMessages = Flux.empty<WireFrameMessage>() -// val sendingIo = IO {} -// whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams)) -// whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages) -// whenever(vesClient.sendIo(generatedMessages)).thenReturn(sendingIo) -// -// // when -// val result = cut.startSimulation(json) -// -// // then -// assertThat(result).right().isSameAs(sendingIo) -// } + it("should return generated ves messages") { + // given + val vesEventGenerator: VesEventGenerator = mock() + val vesClient: HvVesClient = mock() + + val json = "[true]".byteInputStream() + + val vesEventParams = VesEventParameters( + CommonEventHeader.getDefaultInstance(), + VesEventType.VALID, + 1 + ) + val messageParams = listOf(vesEventParams) + + val generatedMessages = Flux.empty<VesEventOuterClass.VesEvent>() + + + whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams)) + whenever(generatorFactory.createVesEventGenerator()).thenReturn(vesEventGenerator) + whenever(vesEventGenerator.createMessageFlux(vesEventParams)).thenReturn(generatedMessages) + whenever(clientFactory.create()).thenReturn(vesClient) + whenever(vesClient.sendVesEvents(generatedMessages)).thenReturn(Mono.just(Unit)) + + // when + cut.startSimulation(json).map { it.unsafeRunSync() } + + // then + verify(vesClient).sendVesEvents(generatedMessages) + } } })
\ No newline at end of file |