From bacba429e2dd6b3048da7e75800f5ad200952599 Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Tue, 19 Feb 2019 18:06:33 +0100 Subject: Use sdk/hvves-producer in hvves/xnf-simulator Change-Id: I8f493b0edd2cbaef136a22d914ad24198bb63a7f Signed-off-by: Jakub Dudycz Issue-ID: DCAEGEN2-1253 --- .../dcaeapp/impl/MessageStreamValidation.kt | 2 +- .../dcaeapp/impl/MessageStreamValidationTest.kt | 2 +- .../dcae/collectors/veshv/utils/arrow/effects.kt | 8 +- sources/hv-collector-ves-message-generator/pom.xml | 4 + .../ves/message/generator/api/MessageGenerator.kt | 44 --------- .../ves/message/generator/api/MessageParameters.kt | 22 ++++- .../generator/factory/MessageGeneratorFactory.kt | 8 +- .../generator/generators/MessageGenerator.kt | 45 +++++++++ .../generator/generators/RawMessageGenerator.kt | 55 +++++++++++ .../generator/generators/VesEventGenerator.kt | 72 ++++++++++++++ .../generator/impl/CommonEventHeaderParser.kt | 49 ++++++++++ .../generator/impl/MessageParametersParserImpl.kt | 1 - .../ves/message/generator/impl/PayloadGenerator.kt | 46 +++++++++ .../impl/vesevent/CommonEventHeaderParser.kt | 49 ---------- .../generator/impl/vesevent/PayloadGenerator.kt | 46 --------- .../generator/impl/vesevent/VesEventGenerator.kt | 72 -------------- .../generator/impl/wireframe/WireFrameGenerator.kt | 66 ------------- .../generator/impl/raw/RawMessageGeneratorTest.kt | 66 +++++++++++++ .../impl/vesevent/CommonEventHeaderParserTest.kt | 3 +- .../impl/vesevent/PayloadGeneratorTest.kt | 3 +- .../impl/vesevent/VesEventGeneratorTest.kt | 2 + .../impl/wireframe/WireFrameGeneratorTest.kt | 81 ---------------- sources/hv-collector-xnf-simulator/pom.xml | 4 - .../veshv/simulators/xnf/impl/XnfSimulator.kt | 54 ++++++----- .../simulators/xnf/impl/adapters/HvVesClient.kt | 49 ++++++++++ .../simulators/xnf/impl/adapters/VesHvClient.kt | 106 --------------------- .../xnf/impl/config/ClientConfiguration.kt | 31 ++++++ .../simulators/xnf/impl/factory/ClientFactory.kt | 51 ++++++++++ .../dcae/collectors/veshv/simulators/xnf/main.kt | 7 +- .../dcae/collectors/veshv/main/HvVesClientTest.kt | 69 ++++++++++++++ .../dcae/collectors/veshv/main/XnfSimulatorTest.kt | 64 +++++++++---- 31 files changed, 652 insertions(+), 529 deletions(-) delete mode 100644 sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt create mode 100644 sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/MessageGenerator.kt create mode 100644 sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/RawMessageGenerator.kt create mode 100644 sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/VesEventGenerator.kt create mode 100644 sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt create mode 100644 sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt delete mode 100644 sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParser.kt delete mode 100644 sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGenerator.kt delete mode 100644 sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGenerator.kt delete mode 100644 sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGenerator.kt create mode 100644 sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/raw/RawMessageGeneratorTest.kt delete mode 100644 sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGeneratorTest.kt create mode 100644 sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt delete mode 100644 sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt create mode 100644 sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt create mode 100644 sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt create mode 100644 sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt index 5d9a7cfc..47a2d22a 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -29,7 +29,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameter import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD -import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator import org.onap.ves.VesEventOuterClass.VesEvent import reactor.core.publisher.Flux import java.io.InputStream diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt index 8fb1b2ef..bff7709d 100644 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt @@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameter import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.VALID -import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator import org.onap.ves.VesEventOuterClass.CommonEventHeader import org.onap.ves.VesEventOuterClass.VesEvent import reactor.core.publisher.Flux diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt index 290ef72c..56825221 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt @@ -23,11 +23,10 @@ import arrow.core.Either import arrow.core.Left import arrow.core.Right import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monadError.monadError -import arrow.typeclasses.binding +import org.reactivestreams.Publisher import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import reactor.core.publisher.toMono import kotlin.system.exitProcess /** @@ -62,6 +61,9 @@ fun Mono.asIo() = IO.async { callback -> }) } +fun Publisher.then(callback: () -> Unit): Mono = + toMono().then(Mono.fromCallable(callback)) + fun Flux>.evaluateIo(): Flux = flatMap { io -> io.attempt().unsafeRunSync().fold( diff --git a/sources/hv-collector-ves-message-generator/pom.xml b/sources/hv-collector-ves-message-generator/pom.xml index 29e32f46..e676dfa9 100644 --- a/sources/hv-collector-ves-message-generator/pom.xml +++ b/sources/hv-collector-ves-message-generator/pom.xml @@ -73,6 +73,10 @@ ${project.parent.version} test + + org.onap.dcaegen2.services.sdk + hvvesclient-producer-api + com.google.protobuf protobuf-java-util diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt deleted file mode 100644 index 5f8638f0..00000000 --- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.ves.message.generator.api - -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono - -/** - * @author Piotr Jaszczyk - * @since June 2018 - */ -abstract class MessageGenerator { - abstract fun createMessageFlux(parameters: K): Flux - - protected fun repeatMessage(message: Mono, amount: Long): Flux = when { - amount < 0 -> repeatForever(message) - amount == 0L -> emptyMessageStream() - else -> repeatNTimes(message, amount) - } - - private fun repeatForever(message: Mono) = message.repeat() - - private fun emptyMessageStream() = Flux.empty() - - private fun repeatNTimes(message: Mono, amount: Long) = message.repeat(amount - 1) -} - diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt index 82b79c0c..a7187166 100644 --- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt +++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt @@ -19,20 +19,36 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.api +import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.INVALID_WIRE_FRAME +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableWireFrameVersion +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion import org.onap.ves.VesEventOuterClass /** * @author Jakub Dudycz * @since June 2018 */ -abstract class MessageParameters(val amount: Long = -1) +sealed class MessageParameters /** * @author Jakub Dudycz * @since February 2019 */ class WireFrameParameters(val messageType: WireFrameType, - amount: Long = -1) : MessageParameters(amount) + val amount: Long = -1) : MessageParameters() { + + val wireFrameVersion: WireFrameVersion + get() = ImmutableWireFrameVersion.builder().let { + if (messageType == INVALID_WIRE_FRAME) + it.major(UNSUPPORTED_MAJOR_VERSION) + else + it + }.build() + + companion object { + private const val UNSUPPORTED_MAJOR_VERSION: Short = 2 + } +} /** * @author Jakub Dudycz @@ -40,4 +56,4 @@ class WireFrameParameters(val messageType: WireFrameType, */ class VesEventParameters(val commonEventHeader: VesEventOuterClass.CommonEventHeader, val messageType: VesEventType, - amount: Long = -1) : MessageParameters(amount) + val amount: Long = -1) : MessageParameters() diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt index aa473796..613f9bd1 100644 --- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt +++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt @@ -19,9 +19,9 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.factory -import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.PayloadGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe.WireFrameGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.generators.RawMessageGenerator /** * @author Piotr Jaszczyk @@ -30,5 +30,5 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe.WireF class MessageGeneratorFactory(private val maxPayloadSizeBytes: Int) { fun createVesEventGenerator() = VesEventGenerator(PayloadGenerator(), maxPayloadSizeBytes) - fun createWireFrameGenerator() = WireFrameGenerator() + fun createWireFrameGenerator() = RawMessageGenerator() } diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/MessageGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/MessageGenerator.kt new file mode 100644 index 00000000..5682cc4c --- /dev/null +++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/MessageGenerator.kt @@ -0,0 +1,45 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.ves.message.generator.generators + +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +abstract class MessageGenerator { + abstract fun createMessageFlux(parameters: K): Flux + + protected fun repeatMessage(message: Mono, amount: Long): Flux = when { + amount < 0 -> repeatForever(message) + amount == 0L -> emptyMessageStream() + else -> repeatNTimes(message, amount) + } + + private fun repeatForever(message: Mono) = message.repeat() + + private fun emptyMessageStream() = Flux.empty() + + private fun repeatNTimes(message: Mono, amount: Long) = message.repeat(amount - 1) +} + diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/RawMessageGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/RawMessageGenerator.kt new file mode 100644 index 00000000..9f20bd29 --- /dev/null +++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/RawMessageGenerator.kt @@ -0,0 +1,55 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.ves.message.generator.generators + +import io.netty.buffer.Unpooled +import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType +import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.INVALID_GPB_DATA +import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.INVALID_WIRE_FRAME +import org.onap.ves.VesEventOuterClass.VesEvent +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import java.nio.ByteBuffer +import java.nio.charset.Charset + +/** + * @author Jakub Dudycz + * @since February 2019 + */ +class RawMessageGenerator : MessageGenerator() { + + override fun createMessageFlux(parameters: WireFrameParameters): Flux = + parameters.run { + Mono + .fromCallable { createMessage(messageType) } + .let { repeatMessage(it, amount) } + } + + private fun createMessage(messageType: WireFrameType): ByteBuffer = + when (messageType) { + INVALID_WIRE_FRAME -> wrap(VesEvent.getDefaultInstance().toByteArray()) + INVALID_GPB_DATA -> wrap("invalid vesEvent".toByteArray(Charset.defaultCharset())) + } + + private fun wrap(bytes: ByteArray) = Unpooled.wrappedBuffer(bytes).nioBuffer() + + +} diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/VesEventGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/VesEventGenerator.kt new file mode 100644 index 00000000..a6669e7d --- /dev/null +++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/VesEventGenerator.kt @@ -0,0 +1,72 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.ves.message.generator.generators + +import com.google.protobuf.ByteString +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.TOO_BIG_PAYLOAD +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.VALID +import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import org.onap.ves.VesEventOuterClass.VesEvent +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +/** + * @author Jakub Dudycz + * @since June 2018 + */ +class VesEventGenerator internal constructor( + private val payloadGenerator: PayloadGenerator, + private val maxPayloadSizeBytes: Int +) : MessageGenerator() { + + override fun createMessageFlux(parameters: VesEventParameters): Flux = + parameters.run { + Mono + .fromCallable { createMessage(commonEventHeader, messageType) } + .let { repeatMessage(it, amount) } + } + + private fun createMessage(commonEventHeader: CommonEventHeader, messageType: VesEventType): VesEvent = + when (messageType) { + VALID -> vesEvent(commonEventHeader, payloadGenerator.generatePayload()) + TOO_BIG_PAYLOAD -> vesEvent(commonEventHeader, oversizedPayload()) + FIXED_PAYLOAD -> vesEvent(commonEventHeader, fixedPayload()) + } + + private fun vesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent = + VesEvent.newBuilder() + .setCommonEventHeader(commonEventHeader) + .setEventFields(payload) + .build() + + private fun oversizedPayload(): ByteString = + payloadGenerator.generateRawPayload(maxPayloadSizeBytes + 1) + + private fun fixedPayload(): ByteString = + payloadGenerator.generateRawPayload(FIXED_PAYLOAD_SIZE) + + companion object { + const val FIXED_PAYLOAD_SIZE = 100 + } +} diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt new file mode 100644 index 00000000..f0ae4607 --- /dev/null +++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt @@ -0,0 +1,49 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.ves.message.generator.impl + +import arrow.core.Option +import com.google.protobuf.util.JsonFormat +import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import javax.json.JsonObject + +/** + * @author Jakub Dudycz + * @since July 2018 + */ +internal class CommonEventHeaderParser { + fun parse(json: JsonObject): Option = Option.fromNullable( + CommonEventHeader.newBuilder() + .apply { JsonFormat.parser().merge(json.toString(), this) } + .build() + .takeUnless { !isValid(it) } + ) + + + private fun isValid(header: CommonEventHeader): Boolean = + allMandatoryFieldsArePresent(header) + + + private fun allMandatoryFieldsArePresent(header: CommonEventHeader) = + headerRequiredFieldDescriptors + .all { fieldDescriptor -> header.hasField(fieldDescriptor) } + +} diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt index 174a01fd..7d6087c2 100644 --- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt +++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt @@ -33,7 +33,6 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.Com import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.Companion.isWireFrameType -import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.CommonEventHeaderParser import javax.json.JsonArray import javax.json.JsonObject import javax.json.JsonValue diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt new file mode 100644 index 00000000..5891e7bc --- /dev/null +++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt @@ -0,0 +1,46 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.ves.message.generator.impl + +import com.google.protobuf.ByteString +import java.util.* +import kotlin.streams.asSequence + +internal class PayloadGenerator { + + private val randomGenerator = Random() + + fun generateRawPayload(size: Int): ByteString = + ByteString.copyFrom(ByteArray(size)) + + fun generatePayload(numOfCountMeasurements: Long = 2): ByteString = + ByteString.copyFrom( + randomGenerator + .ints(numOfCountMeasurements, MIN_BYTE_VALUE, MAX_BYTE_VALUE) + .asSequence() + .toString() + .toByteArray() + ) + + companion object { + private const val MIN_BYTE_VALUE = 0 + private const val MAX_BYTE_VALUE = 256 + } +} diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParser.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParser.kt deleted file mode 100644 index 05938924..00000000 --- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParser.kt +++ /dev/null @@ -1,49 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent - -import arrow.core.Option -import com.google.protobuf.util.JsonFormat -import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors -import org.onap.ves.VesEventOuterClass.CommonEventHeader -import javax.json.JsonObject - -/** - * @author Jakub Dudycz - * @since July 2018 - */ -class CommonEventHeaderParser { - fun parse(json: JsonObject): Option = Option.fromNullable( - CommonEventHeader.newBuilder() - .apply { JsonFormat.parser().merge(json.toString(), this) } - .build() - .takeUnless { !isValid(it) } - ) - - - private fun isValid(header: CommonEventHeader): Boolean = - allMandatoryFieldsArePresent(header) - - - private fun allMandatoryFieldsArePresent(header: CommonEventHeader) = - headerRequiredFieldDescriptors - .all { fieldDescriptor -> header.hasField(fieldDescriptor) } - -} diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGenerator.kt deleted file mode 100644 index ed521054..00000000 --- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGenerator.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent - -import com.google.protobuf.ByteString -import java.util.* -import kotlin.streams.asSequence - -internal class PayloadGenerator { - - private val randomGenerator = Random() - - fun generateRawPayload(size: Int): ByteString = - ByteString.copyFrom(ByteArray(size)) - - fun generatePayload(numOfCountMeasurements: Long = 2): ByteString = - ByteString.copyFrom( - randomGenerator - .ints(numOfCountMeasurements, MIN_BYTE_VALUE, MAX_BYTE_VALUE) - .asSequence() - .toString() - .toByteArray() - ) - - companion object { - private const val MIN_BYTE_VALUE = 0 - private const val MAX_BYTE_VALUE = 256 - } -} diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGenerator.kt deleted file mode 100644 index 7abd6054..00000000 --- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGenerator.kt +++ /dev/null @@ -1,72 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent - -import com.google.protobuf.ByteString -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters -import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType -import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD -import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.TOO_BIG_PAYLOAD -import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.VALID -import org.onap.ves.VesEventOuterClass.CommonEventHeader -import org.onap.ves.VesEventOuterClass.VesEvent -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono - -/** - * @author Jakub Dudycz - * @since June 2018 - */ -class VesEventGenerator internal constructor( - private val payloadGenerator: PayloadGenerator, - private val maxPayloadSizeBytes: Int -) : MessageGenerator() { - - override fun createMessageFlux(parameters: VesEventParameters): Flux = - parameters.run { - Mono - .fromCallable { createMessage(commonEventHeader, messageType) } - .let { repeatMessage(it, amount) } - } - - private fun createMessage(commonEventHeader: CommonEventHeader, messageType: VesEventType): VesEvent = - when (messageType) { - VALID -> vesEvent(commonEventHeader, payloadGenerator.generatePayload()) - TOO_BIG_PAYLOAD -> vesEvent(commonEventHeader, oversizedPayload()) - FIXED_PAYLOAD -> vesEvent(commonEventHeader, fixedPayload()) - } - - private fun vesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent = - VesEvent.newBuilder() - .setCommonEventHeader(commonEventHeader) - .setEventFields(payload) - .build() - - private fun oversizedPayload(): ByteString = - payloadGenerator.generateRawPayload(maxPayloadSizeBytes + 1) - - private fun fixedPayload(): ByteString = - payloadGenerator.generateRawPayload(FIXED_PAYLOAD_SIZE) - - companion object { - const val FIXED_PAYLOAD_SIZE = 100 - } -} diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGenerator.kt deleted file mode 100644 index ad45bc5c..00000000 --- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGenerator.kt +++ /dev/null @@ -1,66 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe - -import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.domain.PayloadContentType -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters -import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType -import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.INVALID_GPB_DATA -import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.INVALID_WIRE_FRAME -import org.onap.ves.VesEventOuterClass.VesEvent -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import java.nio.charset.Charset - -/** - * @author Jakub Dudycz - * @since February 2019 - */ -class WireFrameGenerator : MessageGenerator() { - - override fun createMessageFlux(parameters: WireFrameParameters): Flux = - parameters.run { - Mono - .fromCallable { createMessage(messageType) } - .let { repeatMessage(it, amount) } - } - - private fun createMessage(messageType: WireFrameType): WireFrameMessage = - when (messageType) { - INVALID_WIRE_FRAME -> { - val payload = ByteData(VesEvent.getDefaultInstance().toByteArray()) - WireFrameMessage( - payload, - UNSUPPORTED_VERSION, - UNSUPPORTED_VERSION, - PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, - payload.size()) - } - INVALID_GPB_DATA -> - WireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset())) - } - - companion object { - private const val UNSUPPORTED_VERSION: Short = 2 - } -} diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/raw/RawMessageGeneratorTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/raw/RawMessageGeneratorTest.kt new file mode 100644 index 00000000..8c8c8357 --- /dev/null +++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/raw/RawMessageGeneratorTest.kt @@ -0,0 +1,66 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.ves.message.generator.impl.raw + +import com.google.protobuf.InvalidProtocolBufferException +import org.assertj.core.api.Assertions +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType +import org.onap.dcae.collectors.veshv.ves.message.generator.generators.RawMessageGenerator +import org.onap.ves.VesEventOuterClass +import reactor.test.test +import java.nio.ByteBuffer +import java.nio.charset.Charset + +/** + * @author Jakub Dudycz + * @since February 2019 + */ +object WireFrameGeneratorTest : Spek({ + + val maxPayloadSizeBytes = 1024 + val cut = RawMessageGenerator() + + on("message type requesting invalid GPB data ") { + it("should createVesEventGenerator flux of messages with invalid payload") { + cut + .createMessageFlux(WireFrameParameters( + WireFrameType.INVALID_GPB_DATA, 1 + )) + .test() + .assertNext { + val decodedBytes = it.array().toString(Charset.defaultCharset()) + assertThat(decodedBytes).isEqualTo("invalid vesEvent") + assertThat(it.capacity()).isLessThan(maxPayloadSizeBytes) + + Assertions.assertThatExceptionOfType(InvalidProtocolBufferException::class.java) + .isThrownBy { extractCommonEventHeader(it) } + } + .verifyComplete() + } + } +}) + +private fun extractCommonEventHeader(bytes: ByteBuffer): VesEventOuterClass.CommonEventHeader = + VesEventOuterClass.VesEvent.parseFrom(bytes).commonEventHeader diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParserTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParserTest.kt index 04222d1e..09635afd 100644 --- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParserTest.kt +++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParserTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.domain.VesEventDomain.STATE_CHANGE import org.onap.dcae.collectors.veshv.tests.utils.commonHeader +import org.onap.dcae.collectors.veshv.ves.message.generator.impl.CommonEventHeaderParser import org.onap.ves.VesEventOuterClass.CommonEventHeader import java.io.ByteArrayInputStream import javax.json.Json diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGeneratorTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGeneratorTest.kt index 2d77bb9f..4558bb1a 100644 --- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGeneratorTest.kt +++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGeneratorTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator object PayloadGeneratorTest : Spek({ diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGeneratorTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGeneratorTest.kt index 2f13c52e..fa99bfb6 100644 --- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGeneratorTest.kt +++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGeneratorTest.kt @@ -28,8 +28,10 @@ import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.tests.utils.commonHeader +import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType +import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator import reactor.test.test /** diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGeneratorTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGeneratorTest.kt deleted file mode 100644 index f8c84c39..00000000 --- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGeneratorTest.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2010 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe - -import com.google.protobuf.InvalidProtocolBufferException -import org.assertj.core.api.Assertions -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters -import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType -import org.onap.ves.VesEventOuterClass -import reactor.test.test -import kotlin.test.assertTrue - -/** - * @author Jakub Dudycz - * @since February 2019 - */ -object WireFrameGeneratorTest : Spek({ - - val maxPayloadSizeBytes = 1024 - val cut = WireFrameGenerator() - - on("message type requesting invalid GPB data ") { - it("should createVesEventGenerator flux of messages with invalid payload") { - cut - .createMessageFlux(WireFrameParameters( - WireFrameType.INVALID_GPB_DATA, 1 - )) - .test() - .assertNext { - assertTrue(it.validate().isRight()) - assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes) - Assertions.assertThatExceptionOfType(InvalidProtocolBufferException::class.java) - .isThrownBy { extractCommonEventHeader(it.payload) } - } - .verifyComplete() - } - } - - on("message type requesting invalid wire frame ") { - it("should createVesEventGenerator flux of messages with invalid version") { - cut - .createMessageFlux(WireFrameParameters( - WireFrameType.INVALID_WIRE_FRAME, 1 - )) - .test() - .assertNext { - assertTrue(it.validate().isLeft()) - assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes) - assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR) - } - .verifyComplete() - } - } - -}) - -fun extractCommonEventHeader(bytes: ByteData): VesEventOuterClass.CommonEventHeader = - VesEventOuterClass.VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader diff --git a/sources/hv-collector-xnf-simulator/pom.xml b/sources/hv-collector-xnf-simulator/pom.xml index a8134100..69ca53b2 100644 --- a/sources/hv-collector-xnf-simulator/pom.xml +++ b/sources/hv-collector-xnf-simulator/pom.xml @@ -109,10 +109,6 @@ org.onap.dcaegen2.services.sdk hvvesclient-producer-impl - - org.onap.dcaegen2.services.sdk - hvvesclient-producer-api - ${project.parent.groupId} hv-collector-test-utils diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 4dfdb845..812afe19 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -26,12 +26,15 @@ import arrow.core.fix import arrow.effects.IO import arrow.instances.either.monad.monad import arrow.typeclasses.binding -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient -import org.onap.dcae.collectors.veshv.ves.message.generator.api.* +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory +import org.onap.dcae.collectors.veshv.utils.arrow.asIo +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory -import org.onap.ves.VesEventOuterClass.VesEvent -import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import reactor.core.publisher.toFlux import java.io.InputStream import javax.json.Json @@ -42,18 +45,18 @@ import javax.json.JsonArray * @since August 2018 */ class XnfSimulator( - private val vesClient: VesHvClient, + private val clientFactory: ClientFactory, private val generatorFactory: MessageGeneratorFactory, private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { + private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() } + private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() } + fun startSimulation(messageParameters: InputStream): Either> = Either.monad().binding { - val json = parseJsonArray(messageParameters).bind() - messageParametersParser.parse(json).bind() - .toFlux() - .flatMap(::generateMessages) - .let { vesClient.sendIo(it) } + val parameters = messageParametersParser.parse(json).bind() + simulationFrom(parameters) }.fix() private fun parseJsonArray(jsonStream: InputStream): Either = @@ -61,18 +64,23 @@ class XnfSimulator( .toEither() .mapLeft { ParsingError("Failed to parse JSON", Some(it)) } - private fun generateMessages(parameters: MessageParameters): Flux = + private fun simulationFrom(parameters: List): IO = parameters + .toFlux() + .map(::simulate) + .then(Mono.just(Unit)) + .asIo() + + private fun simulate(parameters: MessageParameters): Mono = when (parameters) { - is VesEventParameters -> generatorFactory - .createVesEventGenerator() - .createMessageFlux(parameters) - .map(::encodeToWireFrame) - is WireFrameParameters -> generatorFactory - .createWireFrameGenerator() - .createMessageFlux(parameters) - else -> throw IllegalStateException("Invalid parameters type") + is VesEventParameters -> { + val messages = vesEventGenerator.createMessageFlux(parameters) + val client = clientFactory.create() + client.sendVesEvents(messages) + } + is WireFrameParameters -> { + val messages = wireFrameGenerator.createMessageFlux(parameters) + val client = clientFactory.create(parameters.wireFrameVersion) + client.sendRawPayload(messages) + } } - - private fun encodeToWireFrame(event: VesEvent): WireFrameMessage = - WireFrameMessage(event.toByteArray()) } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt new file mode 100644 index 00000000..afc157c4 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt @@ -0,0 +1,49 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters + +import org.onap.dcae.collectors.veshv.utils.arrow.then +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType +import org.onap.ves.VesEventOuterClass.VesEvent +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import java.nio.ByteBuffer + +/** + * @author Jakub Dudycz + * @since June 2018 + */ +class HvVesClient(private val producer: HvVesProducer) { + + fun sendVesEvents(messages: Flux): Mono = + producer.send(messages) + .then { logger.info { "Ves Events have been sent" } } + + + fun sendRawPayload(messages: Flux): Mono = + producer.sendRaw(messages, PayloadType.UNDEFINED) + .then { logger.info { "Raw messages have been sent" } } + + companion object { + private val logger = Logger(HvVesClient::class) + } +} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt deleted file mode 100644 index eba8ed88..00000000 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt +++ /dev/null @@ -1,106 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters - -import arrow.core.Option -import arrow.core.getOrElse -import io.netty.handler.ssl.SslContext -import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration -import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration -import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory -import org.onap.dcae.collectors.veshv.utils.arrow.asIo -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.reactivestreams.Publisher -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.core.publisher.ReplayProcessor -import reactor.netty.NettyOutbound -import reactor.netty.tcp.TcpClient -import reactor.util.concurrent.Queues.XS_BUFFER_SIZE - -/** - * @author Jakub Dudycz - * @since June 2018 - */ -class VesHvClient(private val configuration: SimulatorConfiguration) { - - private val client: TcpClient = TcpClient.create() - .addressSupplier { configuration.hvVesAddress } - .configureSsl() - - private fun TcpClient.configureSsl() = - createSslContext(configuration.security) - .map { sslContext -> this.secure(sslContext) } - .getOrElse { this } - - fun sendIo(messages: Flux) = - sendRx(messages).then(Mono.just(Unit)).asIo() - - private fun sendRx(messages: Flux): Mono { - val complete = ReplayProcessor.create(1) - client - .handle { _, output -> handler(complete, messages, output) } - .connect() - .doOnError { - logger.info { "Failed to connect to VesHvCollector on ${configuration.hvVesAddress}" } - } - .subscribe { - logger.info { "Connected to VesHvCollector on ${configuration.hvVesAddress}" } - } - return complete.then() - } - - private fun handler(complete: ReplayProcessor, - messages: Flux, - nettyOutbound: NettyOutbound): Publisher { - - val allocator = nettyOutbound.alloc() - val encoder = WireFrameEncoder(allocator) - val frames = messages - .map(encoder::encode) - .window(XS_BUFFER_SIZE) - - return nettyOutbound - .logConnectionClosed() - .options { it.flushOnBoundary() } - .sendGroups(frames) - .then { - logger.info { "Messages have been sent" } - complete.onComplete() - } - .then() - } - - private fun createSslContext(config: SecurityConfiguration): Option = - SslContextFactory().createClientContext(config) - - private fun NettyOutbound.logConnectionClosed() = - withConnection { conn -> - conn.onDispose { - logger.info { "Connection to ${conn.address()} has been closed" } - } - } - - companion object { - private val logger = Logger(VesHvClient::class) - } -} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt new file mode 100644 index 00000000..1db66f11 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config + +import io.vavr.collection.Set +import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import java.net.InetSocketAddress + +/** + * @author Jakub Dudycz + * @since February 2019 + */ +data class ClientConfiguration(val collectorAddresses: Set, + val security: SecurityConfiguration) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt new file mode 100644 index 00000000..a91fccd4 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt @@ -0,0 +1,51 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory + +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion + +/** + * @author Jakub Dudycz + * @since February 2019 + */ +class ClientFactory(configuration: ClientConfiguration) { + + private val partialConfig = ImmutableProducerOptions + .builder() + .collectorAddresses(configuration.collectorAddresses) + .let { producerOptions -> + configuration.security.keys.fold( + { producerOptions }, + { producerOptions.securityKeys(it) }) + } + + fun create(wireFrameVersion: WireFrameVersion): HvVesClient = + buildClient(partialConfig.wireFrameVersion(wireFrameVersion)) + + + fun create(): HvVesClient = buildClient(partialConfig) + + private fun buildClient(config: ImmutableProducerOptions.Builder) = + HvVesClient(HvVesProducerFactory.create(config.build())) +} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index ef627304..366c7e66 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -23,15 +23,17 @@ import arrow.effects.IO import arrow.effects.fix import arrow.effects.instances.io.monad.monad import arrow.typeclasses.binding +import io.vavr.collection.HashSet import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfHealthCheckServer import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried @@ -65,8 +67,9 @@ private fun startServers(config: SimulatorConfiguration): IO = IO.monad().binding { logger.info { "Using configuration: $config" } XnfHealthCheckServer().startServer(config).bind() + val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.security) val xnfSimulator = XnfSimulator( - VesHvClient(config), + ClientFactory(clientConfig), MessageGeneratorFactory(config.maxPayloadSizeBytes) ) XnfApiServer(xnfSimulator, OngoingSimulations()) diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt new file mode 100644 index 00000000..daf30617 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt @@ -0,0 +1,69 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.eq +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer +import org.onap.ves.VesEventOuterClass +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import java.nio.ByteBuffer + + +/** + * @author Jakub Dudycz + * @since February 2019 + */ +internal class HvVesClientTest : Spek({ + describe("HvVesClient") { + val hvVesProducer: HvVesProducer = mock() + val cut = HvVesClient(hvVesProducer) + + describe("handling ves events stream") { + + val vesEvents = Flux.empty() + whenever(hvVesProducer.send(any())).thenReturn(Mono.empty()) + cut.sendVesEvents(vesEvents) + + it("should perform sending operation") { + verify(hvVesProducer).send(vesEvents) + } + } + + describe("handling raw message stream") { + + val rawMessages = Flux.empty() + whenever(hvVesProducer.sendRaw(any(), any())).thenReturn(Mono.empty()) + cut.sendRawPayload(rawMessages) + + it("should perform sending operation") { + verify(hvVesProducer).sendRaw(eq(rawMessages), any()) + } + } + } +}) \ No newline at end of file diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt index 192725b9..123f12ae 100644 --- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -21,18 +21,28 @@ package org.onap.dcae.collectors.veshv.main import arrow.core.Left import arrow.core.None +import arrow.core.Right import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory +import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator +import org.onap.ves.VesEventOuterClass +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import java.io.ByteArrayInputStream /** @@ -41,15 +51,15 @@ import java.io.ByteArrayInputStream */ internal class XnfSimulatorTest : Spek({ lateinit var cut: XnfSimulator - lateinit var vesClient: VesHvClient + lateinit var clientFactory: ClientFactory lateinit var messageParametersParser: MessageParametersParser lateinit var generatorFactory: MessageGeneratorFactory beforeEachTest { - vesClient = mock() + clientFactory = mock() messageParametersParser = mock() generatorFactory = mock() - cut = XnfSimulator(vesClient, generatorFactory, messageParametersParser) + cut = XnfSimulator(clientFactory, generatorFactory, messageParametersParser) } describe("startSimulation") { @@ -89,22 +99,34 @@ internal class XnfSimulatorTest : Spek({ assertThat(result).left().isEqualTo(cause) } - // TODO uncomment and fix this test after introducing HvVesProducer from onap SDK in XnfSimulator -// it("should return generated messages") { -// // given -// val json = "[true]".byteInputStream() -// val messageParams = listOf() -// val generatedMessages = Flux.empty() -// val sendingIo = IO {} -// whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams)) -// whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages) -// whenever(vesClient.sendIo(generatedMessages)).thenReturn(sendingIo) -// -// // when -// val result = cut.startSimulation(json) -// -// // then -// assertThat(result).right().isSameAs(sendingIo) -// } + it("should return generated ves messages") { + // given + val vesEventGenerator: VesEventGenerator = mock() + val vesClient: HvVesClient = mock() + + val json = "[true]".byteInputStream() + + val vesEventParams = VesEventParameters( + CommonEventHeader.getDefaultInstance(), + VesEventType.VALID, + 1 + ) + val messageParams = listOf(vesEventParams) + + val generatedMessages = Flux.empty() + + + whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams)) + whenever(generatorFactory.createVesEventGenerator()).thenReturn(vesEventGenerator) + whenever(vesEventGenerator.createMessageFlux(vesEventParams)).thenReturn(generatedMessages) + whenever(clientFactory.create()).thenReturn(vesClient) + whenever(vesClient.sendVesEvents(generatedMessages)).thenReturn(Mono.just(Unit)) + + // when + cut.startSimulation(json).map { it.unsafeRunSync() } + + // then + verify(vesClient).sendVesEvents(generatedMessages) + } } }) \ No newline at end of file -- cgit 1.2.3-korg