diff options
Diffstat (limited to 'hv-collector-ves-message-generator')
7 files changed, 164 insertions, 70 deletions
diff --git a/hv-collector-ves-message-generator/pom.xml b/hv-collector-ves-message-generator/pom.xml index dfa30b10..f049d78f 100644 --- a/hv-collector-ves-message-generator/pom.xml +++ b/hv-collector-ves-message-generator/pom.xml @@ -63,11 +63,6 @@ <version>${project.parent.version}</version> </dependency> <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-utils</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt new file mode 100644 index 00000000..605b1729 --- /dev/null +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.ves.message.generator.api + +import org.onap.dcae.collectors.veshv.ves.message.generator.impl.CommonEventHeaderParserImpl +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import javax.json.JsonObject + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since July 2018 + */ +interface CommonEventHeaderParser { + + fun parse(json: JsonObject): CommonEventHeader + + companion object { + val INSTANCE: CommonEventHeaderParser by lazy { + CommonEventHeaderParserImpl() + } + } +}
\ No newline at end of file diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt index cc00f5ac..8d989cc5 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt @@ -19,12 +19,12 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.api -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -data class MessageParameters(val domain: Domain, +data class MessageParameters(val commonEventHeader: CommonEventHeader, val messageType: MessageType, val amount: Long = -1) diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt index e34ed6d6..0ac90544 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt @@ -26,7 +26,6 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.api enum class MessageType { VALID, TOO_BIG_PAYLOAD, - UNSUPPORTED_DOMAIN, INVALID_WIRE_FRAME, INVALID_GPB_DATA } diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt new file mode 100644 index 00000000..61f5f2f3 --- /dev/null +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.ves.message.generator.impl + +import com.google.protobuf.ByteString +import org.onap.dcae.collectors.veshv.ves.message.generator.api.CommonEventHeaderParser +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority +import javax.json.JsonObject + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since July 2018 + */ +class CommonEventHeaderParserImpl : CommonEventHeaderParser { + + override fun parse(json: JsonObject): CommonEventHeader = CommonEventHeader.newBuilder() + .setVersion(json.getString("version")) + .setDomain(Domain.valueOf(json.getString("domain"))) + .setSequence(json.getInt("sequence")) + .setPriority(Priority.forNumber(json.getInt("priority"))) + .setEventId(json.getString("version")) + .setEventName(json.getString("version")) + .setEventType(json.getString("version")) + .setStartEpochMicrosec(json.getJsonNumber("startEpochMicrosec").longValue()) + .setLastEpochMicrosec(json.getJsonNumber("lastEpochMicrosec").longValue()) + .setNfNamingCode(json.getString("nfNamingCode")) + .setNfcNamingCode(json.getString("nfcNamingCode")) + .setReportingEntityId(json.getString("reportingEntityId")) + .setReportingEntityName(ByteString.copyFromUtf8(json.getString("reportingEntityName"))) + .setSourceId(ByteString.copyFromUtf8(json.getString("sourceId"))) + .setSourceName(json.getString("sourceName")) + .build() +}
\ No newline at end of file diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt index dca573dc..e9db716d 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt @@ -26,14 +26,14 @@ import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.* -import org.onap.ves.HVRanMeasFieldsV5 -import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_GPB_DATA +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_WIRE_FRAME +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.TOO_BIG_PAYLOAD +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload import org.onap.ves.VesEventV5.VesEvent import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.OTHER import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.nio.charset.Charset @@ -49,7 +49,7 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa .flatMap { createMessageFlux(it) } private fun createMessageFlux(parameters: MessageParameters): Flux<PayloadWireFrameMessage> = - Mono.fromCallable { createMessage(parameters.domain, parameters.messageType) } + Mono.fromCallable { createMessage(parameters.commonEventHeader, parameters.messageType) } .let { if (parameters.amount < 0) it.repeat() @@ -57,16 +57,14 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa it.repeat(parameters.amount) } - private fun createMessage(domain: Domain, messageType: MessageType): PayloadWireFrameMessage = + private fun createMessage(commonEventHeader: CommonEventHeader, messageType: MessageType): PayloadWireFrameMessage = when (messageType) { VALID -> - PayloadWireFrameMessage(vesEvent(domain, payloadGenerator.generatePayload())) + PayloadWireFrameMessage(vesEvent(commonEventHeader, payloadGenerator.generatePayload())) TOO_BIG_PAYLOAD -> - PayloadWireFrameMessage(vesEvent(domain, oversizedPayload())) - UNSUPPORTED_DOMAIN -> - PayloadWireFrameMessage(vesEvent(OTHER, payloadGenerator.generatePayload())) + PayloadWireFrameMessage(vesEvent(commonEventHeader, oversizedPayload())) INVALID_WIRE_FRAME -> { - val payload = ByteData(vesEvent(domain, payloadGenerator.generatePayload())) + val payload = ByteData(vesEvent(commonEventHeader, payloadGenerator.generatePayload())) PayloadWireFrameMessage( payload, UNSUPPORTED_VERSION, @@ -77,12 +75,12 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa PayloadWireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset())) } - private fun vesEvent(domain: Domain, hvRanMeasPayload: HVRanMeasPayload): ByteArray { - return vesEvent(domain, hvRanMeasPayload.toByteString()) + private fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasPayload: HVRanMeasPayload): ByteArray { + return vesEvent(commonEventHeader, hvRanMeasPayload.toByteString()) } - private fun vesEvent(domain: Domain, hvRanMeasPayload: ByteString): ByteArray { - return createVesEvent(createCommonHeader(domain), hvRanMeasPayload).toByteArray() + private fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasPayload: ByteString): ByteArray { + return createVesEvent(commonEventHeader, hvRanMeasPayload).toByteArray() } private fun createVesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent = @@ -94,28 +92,7 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa private fun oversizedPayload() = payloadGenerator.generateRawPayload(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE + 1) - - private fun createCommonHeader(domain: Domain): CommonEventHeader = CommonEventHeader.newBuilder() - .setVersion("sample-version") - .setDomain(domain) - .setSequence(1) - .setPriority(CommonEventHeader.Priority.NORMAL) - .setEventId("sample-event-id") - .setEventName("sample-event-name") - .setEventType("sample-event-type") - .setStartEpochMicrosec(SAMPLE_START_EPOCH) - .setLastEpochMicrosec(SAMPLE_LAST_EPOCH) - .setNfNamingCode("sample-nf-naming-code") - .setNfcNamingCode("sample-nfc-naming-code") - .setReportingEntityId("sample-reporting-entity-id") - .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name")) - .setSourceId(ByteString.copyFromUtf8("sample-source-id")) - .setSourceName("sample-source-name") - .build() - companion object { private const val UNSUPPORTED_VERSION: Short = 2 - private const val SAMPLE_START_EPOCH = 120034455L - private const val SAMPLE_LAST_EPOCH = 120034455L } } diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt index fb144616..b2490dd1 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl +import com.google.protobuf.ByteString import com.google.protobuf.InvalidProtocolBufferException import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatExceptionOfType @@ -34,7 +35,9 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameter import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType import org.onap.ves.VesEventV5.VesEvent import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.* +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.FAULT +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HEARTBEAT +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS import reactor.test.test /** @@ -49,7 +52,10 @@ object MessageGeneratorImplTest : Spek({ it("should create infinite flux") { val limit = 1000L generator - .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID))) + .createMessageFlux(listOf(MessageParameters( + createSampleCommonHeader(HVRANMEAS), + MessageType.VALID + ))) .take(limit) .test() .expectNextCount(limit) @@ -59,7 +65,11 @@ object MessageGeneratorImplTest : Spek({ on("messages amount specified in parameters") { it("should create message flux of specified size") { generator - .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 5))) + .createMessageFlux(listOf(MessageParameters( + createSampleCommonHeader(HVRANMEAS), + MessageType.VALID, + 5 + ))) .test() .expectNextCount(5) .verifyComplete() @@ -68,12 +78,16 @@ object MessageGeneratorImplTest : Spek({ on("message type requesting valid message") { it("should create flux of valid messages with given domain") { generator - .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 1))) + .createMessageFlux(listOf(MessageParameters( + createSampleCommonHeader(FAULT), + MessageType.VALID, + 1 + ))) .test() .assertNext { assertThat(it.isValid()).isTrue() assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) - assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT) } .verifyComplete() } @@ -82,7 +96,11 @@ object MessageGeneratorImplTest : Spek({ it("should create flux of messages with given domain and payload exceeding threshold") { generator - .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.TOO_BIG_PAYLOAD, 1))) + .createMessageFlux(listOf(MessageParameters( + createSampleCommonHeader(HVRANMEAS), + MessageType.TOO_BIG_PAYLOAD, + 1 + ))) .test() .assertNext { assertThat(it.isValid()).isTrue() @@ -92,24 +110,14 @@ object MessageGeneratorImplTest : Spek({ .verifyComplete() } } - on("message type requesting unsupported domain") { - it("should create flux of messages with domain other than HVRANMEAS") { - - generator - .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.UNSUPPORTED_DOMAIN, 1))) - .test() - .assertNext { - assertThat(it.isValid()).isTrue() - assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) - assertThat(extractCommonEventHeader(it.payload).domain).isNotEqualTo(HVRANMEAS) - } - .verifyComplete() - } - } on("message type requesting invalid GPB data ") { it("should create flux of messages with invalid payload") { generator - .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_GPB_DATA, 1))) + .createMessageFlux(listOf(MessageParameters( + createSampleCommonHeader(HVRANMEAS), + MessageType.INVALID_GPB_DATA, + 1 + ))) .test() .assertNext { assertThat(it.isValid()).isTrue() @@ -123,7 +131,11 @@ object MessageGeneratorImplTest : Spek({ on("message type requesting invalid wire frame ") { it("should create flux of messages with invalid version") { generator - .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_WIRE_FRAME, 1))) + .createMessageFlux(listOf(MessageParameters( + createSampleCommonHeader(HVRANMEAS), + MessageType.INVALID_WIRE_FRAME, + 1 + ))) .test() .assertNext { assertThat(it.isValid()).isFalse() @@ -139,9 +151,9 @@ object MessageGeneratorImplTest : Spek({ it("should create concatenated flux of messages") { val singleFluxSize = 5L val messageParameters = listOf( - MessageParameters(HVRANMEAS, MessageType.VALID, singleFluxSize), - MessageParameters(FAULT, MessageType.TOO_BIG_PAYLOAD, singleFluxSize), - MessageParameters(HEARTBEAT, MessageType.VALID, singleFluxSize) + MessageParameters(createSampleCommonHeader(HVRANMEAS), MessageType.VALID, singleFluxSize), + MessageParameters(createSampleCommonHeader(FAULT), MessageType.TOO_BIG_PAYLOAD, singleFluxSize), + MessageParameters(createSampleCommonHeader(HEARTBEAT), MessageType.VALID, singleFluxSize) ) generator.createMessageFlux(messageParameters) .test() @@ -168,4 +180,24 @@ object MessageGeneratorImplTest : Spek({ fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader { return VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader -}
\ No newline at end of file +} + +private fun createSampleCommonHeader(domain: CommonEventHeader.Domain): CommonEventHeader = CommonEventHeader.newBuilder() + .setVersion("sample-version") + .setDomain(domain) + .setSequence(1) + .setPriority(CommonEventHeader.Priority.NORMAL) + .setEventId("sample-event-id") + .setEventName("sample-event-name") + .setEventType("sample-event-type") + .setStartEpochMicrosec(120034455) + .setLastEpochMicrosec(120034455) + .setNfNamingCode("sample-nf-naming-code") + .setNfcNamingCode("sample-nfc-naming-code") + .setReportingEntityId("sample-reporting-entity-id") + .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name")) + .setSourceId(ByteString.copyFromUtf8("sample-source-id")) + .setSourceName("sample-source-name") + .build() + + |