From f4a58fbdbcaaba92a4daae0e2807536c3da4c857 Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Wed, 18 Jul 2018 14:33:10 +0200 Subject: Support scenarios for continuous streaming test Added support for below scenarios -too big payloads -invalid wire frames -invalid GPB data -unsupported domains Changed input json for xnf simulator endpoint Closes ONAP-500 Change-Id: I19e84a76cef501e274ea8152f3c33c95dddcaac9 Signed-off-by: Jakub Dudycz Issue-ID: DCAEGEN2-601 --- .../impl/impl/MessageGeneratorImplTest.kt | 167 ++++++++++++++++----- 1 file changed, 127 insertions(+), 40 deletions(-) (limited to 'hv-collector-ves-message-generator/src/test') diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt index 07027173..fb144616 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt @@ -19,66 +19,153 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl -import com.google.protobuf.ByteString +import com.google.protobuf.InvalidProtocolBufferException +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters -import org.onap.ves.VesEventV5 -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority.MEDIUM +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import org.onap.ves.VesEventV5.VesEvent +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.* import reactor.test.test -const val SAMPLE_START_EPOCH: Long = 120034455 -const val SAMPLE_LAST_EPOCH: Long = 120034455 - /** * @author Jakub Dudycz * @since June 2018 */ object MessageGeneratorImplTest : Spek({ describe("message factory") { - val generator = MessageGenerator.INSTANCE + given("single message parameters") { + on("messages amount not specified in parameters") { + it("should create infinite flux") { + val limit = 1000L + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID))) + .take(limit) + .test() + .expectNextCount(limit) + .verifyComplete() + } + } + on("messages amount specified in parameters") { + it("should create message flux of specified size") { + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 5))) + .test() + .expectNextCount(5) + .verifyComplete() + } + } + on("message type requesting valid message") { + it("should create flux of valid messages with given domain") { + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 1))) + .test() + .assertNext { + assertThat(it.isValid()).isTrue() + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS) + } + .verifyComplete() + } + } + on("message type requesting too big payload") { + it("should create flux of messages with given domain and payload exceeding threshold") { - given("only common header") { - it("should return infinite flux") { - val limit = 1000L - generator.createMessageFlux(getSampleMessageParameters()).take(limit).test() - .expectNextCount(limit) - .verifyComplete() + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.TOO_BIG_PAYLOAD, 1))) + .test() + .assertNext { + assertThat(it.isValid()).isTrue() + assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS) + } + .verifyComplete() + } + } + on("message type requesting unsupported domain") { + it("should create flux of messages with domain other than HVRANMEAS") { + + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.UNSUPPORTED_DOMAIN, 1))) + .test() + .assertNext { + assertThat(it.isValid()).isTrue() + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isNotEqualTo(HVRANMEAS) + } + .verifyComplete() + } + } + on("message type requesting invalid GPB data ") { + it("should create flux of messages with invalid payload") { + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_GPB_DATA, 1))) + .test() + .assertNext { + assertThat(it.isValid()).isTrue() + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThatExceptionOfType(InvalidProtocolBufferException::class.java) + .isThrownBy { extractCommonEventHeader(it.payload) } + } + .verifyComplete() + } + } + on("message type requesting invalid wire frame ") { + it("should create flux of messages with invalid version") { + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_WIRE_FRAME, 1))) + .test() + .assertNext { + assertThat(it.isValid()).isFalse() + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS) + assertThat(it.version).isNotEqualTo(PayloadWireFrameMessage.SUPPORTED_VERSION) + } + .verifyComplete() + } } } - given("common header and messages amount") { - it("should return message flux of specified size") { - generator.createMessageFlux((getSampleMessageParameters(5))).test() - .expectNextCount(5) + given("list of message parameters") { + it("should create concatenated flux of messages") { + val singleFluxSize = 5L + val messageParameters = listOf( + MessageParameters(HVRANMEAS, MessageType.VALID, singleFluxSize), + MessageParameters(FAULT, MessageType.TOO_BIG_PAYLOAD, singleFluxSize), + MessageParameters(HEARTBEAT, MessageType.VALID, singleFluxSize) + ) + generator.createMessageFlux(messageParameters) + .test() + .assertNext { + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS) + } + .expectNextCount(singleFluxSize - 1) + .assertNext { + assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT) + } + .expectNextCount(singleFluxSize - 1) + .assertNext { + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT) + } + .expectNextCount(singleFluxSize - 1) .verifyComplete() } } } }) -fun getSampleMessageParameters(amount: Long = -1): MessageParameters { - val commonHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder() - .setVersion("sample-version") - .setDomain(HVRANMEAS) - .setSequence(1) - .setPriority(MEDIUM) - .setEventId("sample-event-id") - .setEventName("sample-event-name") - .setEventType("sample-event-type") - .setStartEpochMicrosec(SAMPLE_START_EPOCH) - .setLastEpochMicrosec(SAMPLE_LAST_EPOCH) - .setNfNamingCode("sample-nf-naming-code") - .setNfcNamingCode("sample-nfc-naming-code") - .setReportingEntityId("sample-reporting-entity-id") - .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name")) - .setSourceId(ByteString.copyFromUtf8("sample-source-id")) - .setSourceName("sample-source-name") - .build() - - return MessageParameters(commonHeader, amount) -} +fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader { + return VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader +} \ No newline at end of file -- cgit 1.2.3-korg