diff options
13 files changed, 325 insertions, 114 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 4953d8f3..928c62fb 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -20,7 +20,12 @@ package org.onap.dcae.collectors.veshv.tests.component import arrow.syntax.function.partially1 -import io.netty.buffer.* +import com.google.protobuf.ByteString +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import io.netty.buffer.CompositeByteBuf +import io.netty.buffer.Unpooled +import io.netty.buffer.UnpooledByteBufAllocator import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe @@ -31,6 +36,8 @@ import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID +import org.onap.ves.VesEventV5 +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS import reactor.core.publisher.Flux import reactor.math.sum @@ -57,9 +64,10 @@ object PerformanceSpecification : Spek({ val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong()) val params = MessageParameters( - domain = HVRANMEAS, + commonEventHeader = createSampleCommonHeader(HVRANMEAS), messageType = VALID, - amount = numMessages) + amount = numMessages + ) val fluxes = (1.rangeTo(runs)).map { sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params)) @@ -86,9 +94,10 @@ object PerformanceSpecification : Spek({ val timeout = Duration.ofSeconds(30) val params = MessageParameters( - domain = HVRANMEAS, + commonEventHeader = createSampleCommonHeader(HVRANMEAS), messageType = VALID, - amount = numMessages) + amount = numMessages + ) val dataStream = generateDataStream(sut.alloc, params) .transform(::dropWhenIndex.partially1 { it % 101 == 0L }) @@ -193,3 +202,21 @@ private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> { sink.complete() } } + +private fun createSampleCommonHeader(domain: Domain): VesEventV5.VesEvent.CommonEventHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder() + .setVersion("sample-version") + .setDomain(domain) + .setSequence(1) + .setPriority(VesEventV5.VesEvent.CommonEventHeader.Priority.NORMAL) + .setEventId("sample-event-id") + .setEventName("sample-event-name") + .setEventType("sample-event-type") + .setStartEpochMicrosec(120034455) + .setLastEpochMicrosec(120034455) + .setNfNamingCode("sample-nf-naming-code") + .setNfcNamingCode("sample-nfc-naming-code") + .setReportingEntityId("sample-reporting-entity-id") + .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name")) + .setSourceId(ByteString.copyFromUtf8("sample-source-id")) + .setSourceName("sample-source-name") + .build() diff --git a/hv-collector-utils/pom.xml b/hv-collector-utils/pom.xml index ea19ba3f..d0e44932 100644 --- a/hv-collector-utils/pom.xml +++ b/hv-collector-utils/pom.xml @@ -60,6 +60,20 @@ <dependencies> <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-domain</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-ves-message-generator</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.glassfish</groupId> + <artifactId>javax.json</artifactId> + </dependency> + <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </dependency> diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/CommonEventHeaderParser.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/CommonEventHeaderParser.kt new file mode 100644 index 00000000..d115675d --- /dev/null +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/CommonEventHeaderParser.kt @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.messages + +import com.google.protobuf.ByteString +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.newBuilder +import javax.json.JsonObject + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since July 2018 + */ +class CommonEventHeaderParser { + fun parse(json: JsonObject): CommonEventHeader = newBuilder() + .setVersion(json.getString("version")) + .setDomain(Domain.valueOf(json.getString("domain"))) + .setSequence(json.getInt("sequence")) + .setPriority(Priority.forNumber(json.getInt("priority"))) + .setEventId(json.getString("version")) + .setEventName(json.getString("version")) + .setEventType(json.getString("version")) + .setStartEpochMicrosec(json.getJsonNumber("startEpochMicrosec").longValue()) + .setLastEpochMicrosec(json.getJsonNumber("lastEpochMicrosec").longValue()) + .setNfNamingCode(json.getString("nfNamingCode")) + .setNfcNamingCode(json.getString("nfcNamingCode")) + .setReportingEntityId(json.getString("reportingEntityId")) + .setReportingEntityName(ByteString.copyFromUtf8(json.getString("reportingEntityName"))) + .setSourceId(ByteString.copyFromUtf8(json.getString("sourceId"))) + .setSourceName(json.getString("sourceName")) + .build() + +} diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParser.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParser.kt new file mode 100644 index 00000000..24c2cbfa --- /dev/null +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParser.kt @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.messages + +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import javax.json.JsonArray + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since July 2018 + */ +class MessageParametersParser( + private val commonEventHeaderParser: CommonEventHeaderParser = CommonEventHeaderParser()) { + + fun parse(request: JsonArray): List<MessageParameters> = + try { + request + .map { it.asJsonObject() } + .map { + val commonEventHeader = commonEventHeaderParser.parse(it.getJsonObject("commonEventHeader")) + val messageType = MessageType.valueOf(it.getString("messageType")) + val messagesAmount = it.getJsonNumber("messagesAmount").longValue() + MessageParameters(commonEventHeader, messageType, messagesAmount) + } + } catch (e: Exception) { + throw ParsingException("Parsing request body failed", e) + } + + internal class ParsingException(message: String?, cause: Exception) : Exception(message, cause) +} diff --git a/hv-collector-ves-message-generator/pom.xml b/hv-collector-ves-message-generator/pom.xml index dfa30b10..f049d78f 100644 --- a/hv-collector-ves-message-generator/pom.xml +++ b/hv-collector-ves-message-generator/pom.xml @@ -63,11 +63,6 @@ <version>${project.parent.version}</version> </dependency> <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-utils</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt new file mode 100644 index 00000000..605b1729 --- /dev/null +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.ves.message.generator.api + +import org.onap.dcae.collectors.veshv.ves.message.generator.impl.CommonEventHeaderParserImpl +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import javax.json.JsonObject + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since July 2018 + */ +interface CommonEventHeaderParser { + + fun parse(json: JsonObject): CommonEventHeader + + companion object { + val INSTANCE: CommonEventHeaderParser by lazy { + CommonEventHeaderParserImpl() + } + } +}
\ No newline at end of file diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt index cc00f5ac..8d989cc5 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt @@ -19,12 +19,12 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.api -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -data class MessageParameters(val domain: Domain, +data class MessageParameters(val commonEventHeader: CommonEventHeader, val messageType: MessageType, val amount: Long = -1) diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt index e34ed6d6..0ac90544 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt @@ -26,7 +26,6 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.api enum class MessageType { VALID, TOO_BIG_PAYLOAD, - UNSUPPORTED_DOMAIN, INVALID_WIRE_FRAME, INVALID_GPB_DATA } diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt new file mode 100644 index 00000000..61f5f2f3 --- /dev/null +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.ves.message.generator.impl + +import com.google.protobuf.ByteString +import org.onap.dcae.collectors.veshv.ves.message.generator.api.CommonEventHeaderParser +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority +import javax.json.JsonObject + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since July 2018 + */ +class CommonEventHeaderParserImpl : CommonEventHeaderParser { + + override fun parse(json: JsonObject): CommonEventHeader = CommonEventHeader.newBuilder() + .setVersion(json.getString("version")) + .setDomain(Domain.valueOf(json.getString("domain"))) + .setSequence(json.getInt("sequence")) + .setPriority(Priority.forNumber(json.getInt("priority"))) + .setEventId(json.getString("version")) + .setEventName(json.getString("version")) + .setEventType(json.getString("version")) + .setStartEpochMicrosec(json.getJsonNumber("startEpochMicrosec").longValue()) + .setLastEpochMicrosec(json.getJsonNumber("lastEpochMicrosec").longValue()) + .setNfNamingCode(json.getString("nfNamingCode")) + .setNfcNamingCode(json.getString("nfcNamingCode")) + .setReportingEntityId(json.getString("reportingEntityId")) + .setReportingEntityName(ByteString.copyFromUtf8(json.getString("reportingEntityName"))) + .setSourceId(ByteString.copyFromUtf8(json.getString("sourceId"))) + .setSourceName(json.getString("sourceName")) + .build() +}
\ No newline at end of file diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt index dca573dc..e9db716d 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt @@ -26,14 +26,14 @@ import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.* -import org.onap.ves.HVRanMeasFieldsV5 -import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_GPB_DATA +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_WIRE_FRAME +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.TOO_BIG_PAYLOAD +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload import org.onap.ves.VesEventV5.VesEvent import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.OTHER import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.nio.charset.Charset @@ -49,7 +49,7 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa .flatMap { createMessageFlux(it) } private fun createMessageFlux(parameters: MessageParameters): Flux<PayloadWireFrameMessage> = - Mono.fromCallable { createMessage(parameters.domain, parameters.messageType) } + Mono.fromCallable { createMessage(parameters.commonEventHeader, parameters.messageType) } .let { if (parameters.amount < 0) it.repeat() @@ -57,16 +57,14 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa it.repeat(parameters.amount) } - private fun createMessage(domain: Domain, messageType: MessageType): PayloadWireFrameMessage = + private fun createMessage(commonEventHeader: CommonEventHeader, messageType: MessageType): PayloadWireFrameMessage = when (messageType) { VALID -> - PayloadWireFrameMessage(vesEvent(domain, payloadGenerator.generatePayload())) + PayloadWireFrameMessage(vesEvent(commonEventHeader, payloadGenerator.generatePayload())) TOO_BIG_PAYLOAD -> - PayloadWireFrameMessage(vesEvent(domain, oversizedPayload())) - UNSUPPORTED_DOMAIN -> - PayloadWireFrameMessage(vesEvent(OTHER, payloadGenerator.generatePayload())) + PayloadWireFrameMessage(vesEvent(commonEventHeader, oversizedPayload())) INVALID_WIRE_FRAME -> { - val payload = ByteData(vesEvent(domain, payloadGenerator.generatePayload())) + val payload = ByteData(vesEvent(commonEventHeader, payloadGenerator.generatePayload())) PayloadWireFrameMessage( payload, UNSUPPORTED_VERSION, @@ -77,12 +75,12 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa PayloadWireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset())) } - private fun vesEvent(domain: Domain, hvRanMeasPayload: HVRanMeasPayload): ByteArray { - return vesEvent(domain, hvRanMeasPayload.toByteString()) + private fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasPayload: HVRanMeasPayload): ByteArray { + return vesEvent(commonEventHeader, hvRanMeasPayload.toByteString()) } - private fun vesEvent(domain: Domain, hvRanMeasPayload: ByteString): ByteArray { - return createVesEvent(createCommonHeader(domain), hvRanMeasPayload).toByteArray() + private fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasPayload: ByteString): ByteArray { + return createVesEvent(commonEventHeader, hvRanMeasPayload).toByteArray() } private fun createVesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent = @@ -94,28 +92,7 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa private fun oversizedPayload() = payloadGenerator.generateRawPayload(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE + 1) - - private fun createCommonHeader(domain: Domain): CommonEventHeader = CommonEventHeader.newBuilder() - .setVersion("sample-version") - .setDomain(domain) - .setSequence(1) - .setPriority(CommonEventHeader.Priority.NORMAL) - .setEventId("sample-event-id") - .setEventName("sample-event-name") - .setEventType("sample-event-type") - .setStartEpochMicrosec(SAMPLE_START_EPOCH) - .setLastEpochMicrosec(SAMPLE_LAST_EPOCH) - .setNfNamingCode("sample-nf-naming-code") - .setNfcNamingCode("sample-nfc-naming-code") - .setReportingEntityId("sample-reporting-entity-id") - .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name")) - .setSourceId(ByteString.copyFromUtf8("sample-source-id")) - .setSourceName("sample-source-name") - .build() - companion object { private const val UNSUPPORTED_VERSION: Short = 2 - private const val SAMPLE_START_EPOCH = 120034455L - private const val SAMPLE_LAST_EPOCH = 120034455L } } diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt index fb144616..b2490dd1 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl +import com.google.protobuf.ByteString import com.google.protobuf.InvalidProtocolBufferException import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatExceptionOfType @@ -34,7 +35,9 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameter import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType import org.onap.ves.VesEventV5.VesEvent import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.* +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.FAULT +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HEARTBEAT +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS import reactor.test.test /** @@ -49,7 +52,10 @@ object MessageGeneratorImplTest : Spek({ it("should create infinite flux") { val limit = 1000L generator - .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID))) + .createMessageFlux(listOf(MessageParameters( + createSampleCommonHeader(HVRANMEAS), + MessageType.VALID + ))) .take(limit) .test() .expectNextCount(limit) @@ -59,7 +65,11 @@ object MessageGeneratorImplTest : Spek({ on("messages amount specified in parameters") { it("should create message flux of specified size") { generator - .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 5))) + .createMessageFlux(listOf(MessageParameters( + createSampleCommonHeader(HVRANMEAS), + MessageType.VALID, + 5 + ))) .test() .expectNextCount(5) .verifyComplete() @@ -68,12 +78,16 @@ object MessageGeneratorImplTest : Spek({ on("message type requesting valid message") { it("should create flux of valid messages with given domain") { generator - .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 1))) + .createMessageFlux(listOf(MessageParameters( + createSampleCommonHeader(FAULT), + MessageType.VALID, + 1 + ))) .test() .assertNext { assertThat(it.isValid()).isTrue() assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) - assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT) } .verifyComplete() } @@ -82,7 +96,11 @@ object MessageGeneratorImplTest : Spek({ it("should create flux of messages with given domain and payload exceeding threshold") { generator - .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.TOO_BIG_PAYLOAD, 1))) + .createMessageFlux(listOf(MessageParameters( + createSampleCommonHeader(HVRANMEAS), + MessageType.TOO_BIG_PAYLOAD, + 1 + ))) .test() .assertNext { assertThat(it.isValid()).isTrue() @@ -92,24 +110,14 @@ object MessageGeneratorImplTest : Spek({ .verifyComplete() } } - on("message type requesting unsupported domain") { - it("should create flux of messages with domain other than HVRANMEAS") { - - generator - .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.UNSUPPORTED_DOMAIN, 1))) - .test() - .assertNext { - assertThat(it.isValid()).isTrue() - assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) - assertThat(extractCommonEventHeader(it.payload).domain).isNotEqualTo(HVRANMEAS) - } - .verifyComplete() - } - } on("message type requesting invalid GPB data ") { it("should create flux of messages with invalid payload") { generator - .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_GPB_DATA, 1))) + .createMessageFlux(listOf(MessageParameters( + createSampleCommonHeader(HVRANMEAS), + MessageType.INVALID_GPB_DATA, + 1 + ))) .test() .assertNext { assertThat(it.isValid()).isTrue() @@ -123,7 +131,11 @@ object MessageGeneratorImplTest : Spek({ on("message type requesting invalid wire frame ") { it("should create flux of messages with invalid version") { generator - .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_WIRE_FRAME, 1))) + .createMessageFlux(listOf(MessageParameters( + createSampleCommonHeader(HVRANMEAS), + MessageType.INVALID_WIRE_FRAME, + 1 + ))) .test() .assertNext { assertThat(it.isValid()).isFalse() @@ -139,9 +151,9 @@ object MessageGeneratorImplTest : Spek({ it("should create concatenated flux of messages") { val singleFluxSize = 5L val messageParameters = listOf( - MessageParameters(HVRANMEAS, MessageType.VALID, singleFluxSize), - MessageParameters(FAULT, MessageType.TOO_BIG_PAYLOAD, singleFluxSize), - MessageParameters(HEARTBEAT, MessageType.VALID, singleFluxSize) + MessageParameters(createSampleCommonHeader(HVRANMEAS), MessageType.VALID, singleFluxSize), + MessageParameters(createSampleCommonHeader(FAULT), MessageType.TOO_BIG_PAYLOAD, singleFluxSize), + MessageParameters(createSampleCommonHeader(HEARTBEAT), MessageType.VALID, singleFluxSize) ) generator.createMessageFlux(messageParameters) .test() @@ -168,4 +180,24 @@ object MessageGeneratorImplTest : Spek({ fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader { return VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader -}
\ No newline at end of file +} + +private fun createSampleCommonHeader(domain: CommonEventHeader.Domain): CommonEventHeader = CommonEventHeader.newBuilder() + .setVersion("sample-version") + .setDomain(domain) + .setSequence(1) + .setPriority(CommonEventHeader.Priority.NORMAL) + .setEventId("sample-event-id") + .setEventName("sample-event-name") + .setEventType("sample-event-type") + .setStartEpochMicrosec(120034455) + .setLastEpochMicrosec(120034455) + .setNfNamingCode("sample-nf-naming-code") + .setNfcNamingCode("sample-nfc-naming-code") + .setReportingEntityId("sample-reporting-entity-id") + .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name")) + .setSourceId(ByteString.copyFromUtf8("sample-source-id")) + .setSourceName("sample-source-name") + .build() + + diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt index 24ef578d..de686bc5 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt @@ -20,27 +20,22 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import arrow.effects.IO -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.messages.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain -import ratpack.exec.Promise import ratpack.handling.Chain import ratpack.handling.Context import ratpack.server.RatpackServer import ratpack.server.ServerConfig -import reactor.core.publisher.Flux import reactor.core.scheduler.Schedulers import javax.json.Json -import javax.json.JsonArray /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -internal class HttpServer(private val vesClient: XnfSimulator) { +internal class HttpServer(private val vesClient: XnfSimulator, + private val messageParametersParser: MessageParametersParser = MessageParametersParser()) { fun start(port: Int): IO<RatpackServer> = IO { RatpackServer.start { server -> @@ -52,14 +47,20 @@ internal class HttpServer(private val vesClient: XnfSimulator) { private fun configureHandlers(chain: Chain) { chain .post("simulator/sync") { ctx -> - createMessageFlux(ctx) + ctx.request.body + .map { Json.createReader(it.inputStream).readArray() } + .map { messageParametersParser.parse(it) } + .map { MessageGenerator.INSTANCE.createMessageFlux(it) } .map { vesClient.sendIo(it) } .map { it.unsafeRunSync() } .onError { handleException(it, ctx) } .then { sendAcceptedResponse(ctx) } } .post("simulator/async") { ctx -> - createMessageFlux(ctx) + ctx.request.body + .map { Json.createReader(it.inputStream).readArray() } + .map { messageParametersParser.parse(it) } + .map { MessageGenerator.INSTANCE.createMessageFlux(it) } .map { vesClient.sendRx(it) } .map { it.subscribeOn(Schedulers.elastic()).subscribe() } .onError { handleException(it, ctx) } @@ -67,28 +68,6 @@ internal class HttpServer(private val vesClient: XnfSimulator) { } } - private fun createMessageFlux(ctx: Context): Promise<Flux<PayloadWireFrameMessage>> { - return ctx.request.body - .map { Json.createReader(it.inputStream).readArray() } - .map { extractMessageParameters(it) } - .map { MessageGenerator.INSTANCE.createMessageFlux(it) } - } - - private fun extractMessageParameters(request: JsonArray): List<MessageParameters> = - try { - request - .map { it.asJsonObject() } - .map { - - val domain = Domain.valueOf(it.getString("domain")) - val messageType = MessageType.valueOf(it.getString("messageType")) - val messagesAmount = it.getJsonNumber("messagesAmount").longValue() - MessageParameters(domain, messageType, messagesAmount) - } - } catch (e: Exception) { - throw ValidationException("Validating request body failed", e) - } - private fun sendAcceptedResponse(ctx: Context) { ctx.response .status(STATUS_OK) @@ -117,5 +96,3 @@ internal class HttpServer(private val vesClient: XnfSimulator) { const val CONTENT_TYPE_APPLICATION_JSON = "application/json" } } - -internal class ValidationException(message: String?, cause: Exception) : Exception(message, cause) diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index 19c52efa..fa6d626b 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -38,10 +38,10 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" */ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map { - XnfSimulator(it) - .run(::HttpServer) - .start(it.listenPort) + .map {config -> + XnfSimulator(config) + .let { HttpServer(it) } + .start(config.listenPort) .void() } .unsafeRunEitherSync( @@ -53,4 +53,3 @@ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) logger.info("Started xNF Simulator API server") } ) - |