diff options
Diffstat (limited to 'hv-collector-ves-message-generator/src')
6 files changed, 247 insertions, 80 deletions
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt index e52db848..7407f692 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt @@ -20,20 +20,16 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.api import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage -import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator -import org.onap.ves.VesEventV5 import reactor.core.publisher.Flux -import javax.json.JsonObject /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ interface MessageGenerator { - fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage> - fun parseCommonHeader(json: JsonObject): VesEventV5.VesEvent.CommonEventHeader + fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<PayloadWireFrameMessage> companion object { val INSTANCE: MessageGenerator by lazy { diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/config/MessageParameters.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt index 7e80cc66..cc00f5ac 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/config/MessageParameters.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt @@ -17,12 +17,14 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.ves.message.generator.config +package org.onap.dcae.collectors.veshv.ves.message.generator.api -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -data class MessageParameters(val commonEventHeader: CommonEventHeader, val amount: Long = -1) +data class MessageParameters(val domain: Domain, + val messageType: MessageType, + val amount: Long = -1) diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt new file mode 100644 index 00000000..e34ed6d6 --- /dev/null +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt @@ -0,0 +1,32 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.ves.message.generator.api + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since July 2018 + */ +enum class MessageType { + VALID, + TOO_BIG_PAYLOAD, + UNSUPPORTED_DOMAIN, + INVALID_WIRE_FRAME, + INVALID_GPB_DATA +} diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt index b2f73894..dca573dc 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt @@ -20,14 +20,23 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.impl import com.google.protobuf.ByteString +import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.PayloadContentType import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.* +import org.onap.ves.HVRanMeasFieldsV5 +import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields +import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload import org.onap.ves.VesEventV5.VesEvent import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.OTHER import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import javax.json.JsonObject +import java.nio.charset.Charset /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -35,41 +44,78 @@ import javax.json.JsonObject */ class MessageGeneratorImpl internal constructor(private val payloadGenerator: PayloadGenerator) : MessageGenerator { - override fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage> = - Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let { - if (messageParameters.amount < 0) - it.repeat() - else - it.repeat(messageParameters.amount) - } + override fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<PayloadWireFrameMessage> = Flux + .fromIterable(messageParameters) + .flatMap { createMessageFlux(it) } - override fun parseCommonHeader(json: JsonObject): CommonEventHeader = CommonEventHeader.newBuilder() - .setVersion(json.getString("version")) - .setDomain(CommonEventHeader.Domain.forNumber(json.getInt("domain"))) - .setSequence(json.getInt("sequence")) - .setPriority(CommonEventHeader.Priority.forNumber(json.getInt("priority"))) - .setEventId(json.getString("eventId")) - .setEventName(json.getString("eventName")) - .setEventType(json.getString("eventType")) - .setStartEpochMicrosec(json.getJsonNumber("startEpochMicrosec").longValue()) - .setLastEpochMicrosec(json.getJsonNumber("lastEpochMicrosec").longValue()) - .setNfNamingCode(json.getString("nfNamingCode")) - .setNfcNamingCode(json.getString("nfcNamingCode")) - .setReportingEntityId(json.getString("reportingEntityId")) - .setReportingEntityName(ByteString.copyFromUtf8(json.getString("reportingEntityName"))) - .setSourceId(ByteString.copyFromUtf8(json.getString("sourceId"))) - .setSourceName(json.getString("sourceName")) - .build() + private fun createMessageFlux(parameters: MessageParameters): Flux<PayloadWireFrameMessage> = + Mono.fromCallable { createMessage(parameters.domain, parameters.messageType) } + .let { + if (parameters.amount < 0) + it.repeat() + else + it.repeat(parameters.amount) + } + private fun createMessage(domain: Domain, messageType: MessageType): PayloadWireFrameMessage = + when (messageType) { + VALID -> + PayloadWireFrameMessage(vesEvent(domain, payloadGenerator.generatePayload())) + TOO_BIG_PAYLOAD -> + PayloadWireFrameMessage(vesEvent(domain, oversizedPayload())) + UNSUPPORTED_DOMAIN -> + PayloadWireFrameMessage(vesEvent(OTHER, payloadGenerator.generatePayload())) + INVALID_WIRE_FRAME -> { + val payload = ByteData(vesEvent(domain, payloadGenerator.generatePayload())) + PayloadWireFrameMessage( + payload, + UNSUPPORTED_VERSION, + PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payload.size()) + } + INVALID_GPB_DATA -> + PayloadWireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset())) + } - private fun createMessage(commonHeader: CommonEventHeader): PayloadWireFrameMessage = - PayloadWireFrameMessage(vesMessageBytes(commonHeader)) + private fun vesEvent(domain: Domain, hvRanMeasPayload: HVRanMeasPayload): ByteArray { + return vesEvent(domain, hvRanMeasPayload.toByteString()) + } + private fun vesEvent(domain: Domain, hvRanMeasPayload: ByteString): ByteArray { + return createVesEvent(createCommonHeader(domain), hvRanMeasPayload).toByteArray() + } - private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray = + private fun createVesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent = VesEvent.newBuilder() - .setCommonEventHeader(commonHeader) - .setHvRanMeasFields(payloadGenerator.generatePayload().toByteString()) + .setCommonEventHeader(commonEventHeader) + .setHvRanMeasFields(payload) .build() - .toByteArray() + + private fun oversizedPayload() = + payloadGenerator.generateRawPayload(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE + 1) + + + private fun createCommonHeader(domain: Domain): CommonEventHeader = CommonEventHeader.newBuilder() + .setVersion("sample-version") + .setDomain(domain) + .setSequence(1) + .setPriority(CommonEventHeader.Priority.NORMAL) + .setEventId("sample-event-id") + .setEventName("sample-event-name") + .setEventType("sample-event-type") + .setStartEpochMicrosec(SAMPLE_START_EPOCH) + .setLastEpochMicrosec(SAMPLE_LAST_EPOCH) + .setNfNamingCode("sample-nf-naming-code") + .setNfcNamingCode("sample-nfc-naming-code") + .setReportingEntityId("sample-reporting-entity-id") + .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name")) + .setSourceId(ByteString.copyFromUtf8("sample-source-id")) + .setSourceName("sample-source-name") + .build() + + companion object { + private const val UNSUPPORTED_VERSION: Short = 2 + private const val SAMPLE_START_EPOCH = 120034455L + private const val SAMPLE_LAST_EPOCH = 120034455L + } } diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt index 66f34e9e..c85ce035 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.impl +import com.google.protobuf.ByteString import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject.HVRanMeas @@ -28,6 +29,9 @@ internal class PayloadGenerator { private val randomGenerator = Random() + fun generateRawPayload(size: Int): ByteString = + ByteString.copyFrom(ByteArray(size)) + fun generatePayload(numOfCountPerMeas: Long = 2, numOfMeasPerObject: Int = 2): HVRanMeasPayload { val pmObject = generatePmObject(numOfCountPerMeas, numOfMeasPerObject) return HVRanMeasPayload.newBuilder() diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt index 07027173..fb144616 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt @@ -19,66 +19,153 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl -import com.google.protobuf.ByteString +import com.google.protobuf.InvalidProtocolBufferException +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters -import org.onap.ves.VesEventV5 -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority.MEDIUM +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import org.onap.ves.VesEventV5.VesEvent +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.* import reactor.test.test -const val SAMPLE_START_EPOCH: Long = 120034455 -const val SAMPLE_LAST_EPOCH: Long = 120034455 - /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ object MessageGeneratorImplTest : Spek({ describe("message factory") { - val generator = MessageGenerator.INSTANCE + given("single message parameters") { + on("messages amount not specified in parameters") { + it("should create infinite flux") { + val limit = 1000L + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID))) + .take(limit) + .test() + .expectNextCount(limit) + .verifyComplete() + } + } + on("messages amount specified in parameters") { + it("should create message flux of specified size") { + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 5))) + .test() + .expectNextCount(5) + .verifyComplete() + } + } + on("message type requesting valid message") { + it("should create flux of valid messages with given domain") { + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 1))) + .test() + .assertNext { + assertThat(it.isValid()).isTrue() + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS) + } + .verifyComplete() + } + } + on("message type requesting too big payload") { + it("should create flux of messages with given domain and payload exceeding threshold") { - given("only common header") { - it("should return infinite flux") { - val limit = 1000L - generator.createMessageFlux(getSampleMessageParameters()).take(limit).test() - .expectNextCount(limit) - .verifyComplete() + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.TOO_BIG_PAYLOAD, 1))) + .test() + .assertNext { + assertThat(it.isValid()).isTrue() + assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS) + } + .verifyComplete() + } + } + on("message type requesting unsupported domain") { + it("should create flux of messages with domain other than HVRANMEAS") { + + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.UNSUPPORTED_DOMAIN, 1))) + .test() + .assertNext { + assertThat(it.isValid()).isTrue() + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isNotEqualTo(HVRANMEAS) + } + .verifyComplete() + } + } + on("message type requesting invalid GPB data ") { + it("should create flux of messages with invalid payload") { + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_GPB_DATA, 1))) + .test() + .assertNext { + assertThat(it.isValid()).isTrue() + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThatExceptionOfType(InvalidProtocolBufferException::class.java) + .isThrownBy { extractCommonEventHeader(it.payload) } + } + .verifyComplete() + } + } + on("message type requesting invalid wire frame ") { + it("should create flux of messages with invalid version") { + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_WIRE_FRAME, 1))) + .test() + .assertNext { + assertThat(it.isValid()).isFalse() + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS) + assertThat(it.version).isNotEqualTo(PayloadWireFrameMessage.SUPPORTED_VERSION) + } + .verifyComplete() + } } } - given("common header and messages amount") { - it("should return message flux of specified size") { - generator.createMessageFlux((getSampleMessageParameters(5))).test() - .expectNextCount(5) + given("list of message parameters") { + it("should create concatenated flux of messages") { + val singleFluxSize = 5L + val messageParameters = listOf( + MessageParameters(HVRANMEAS, MessageType.VALID, singleFluxSize), + MessageParameters(FAULT, MessageType.TOO_BIG_PAYLOAD, singleFluxSize), + MessageParameters(HEARTBEAT, MessageType.VALID, singleFluxSize) + ) + generator.createMessageFlux(messageParameters) + .test() + .assertNext { + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS) + } + .expectNextCount(singleFluxSize - 1) + .assertNext { + assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT) + } + .expectNextCount(singleFluxSize - 1) + .assertNext { + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT) + } + .expectNextCount(singleFluxSize - 1) .verifyComplete() } } } }) -fun getSampleMessageParameters(amount: Long = -1): MessageParameters { - val commonHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder() - .setVersion("sample-version") - .setDomain(HVRANMEAS) - .setSequence(1) - .setPriority(MEDIUM) - .setEventId("sample-event-id") - .setEventName("sample-event-name") - .setEventType("sample-event-type") - .setStartEpochMicrosec(SAMPLE_START_EPOCH) - .setLastEpochMicrosec(SAMPLE_LAST_EPOCH) - .setNfNamingCode("sample-nf-naming-code") - .setNfcNamingCode("sample-nfc-naming-code") - .setReportingEntityId("sample-reporting-entity-id") - .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name")) - .setSourceId(ByteString.copyFromUtf8("sample-source-id")) - .setSourceName("sample-source-name") - .build() - - return MessageParameters(commonHeader, amount) -} +fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader { + return VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader +}
\ No newline at end of file |