diff options
8 files changed, 277 insertions, 99 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index b793f3aa..4953d8f3 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -29,7 +29,9 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS import reactor.core.publisher.Flux import reactor.math.sum import java.security.MessageDigest @@ -55,8 +57,10 @@ object PerformanceSpecification : Spek({ val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong()) val params = MessageParameters( - commonEventHeader = vesEvent().commonEventHeader, + domain = HVRANMEAS, + messageType = VALID, amount = numMessages) + val fluxes = (1.rangeTo(runs)).map { sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params)) } @@ -82,7 +86,8 @@ object PerformanceSpecification : Spek({ val timeout = Duration.ofSeconds(30) val params = MessageParameters( - commonEventHeader = vesEvent().commonEventHeader, + domain = HVRANMEAS, + messageType = VALID, amount = numMessages) val dataStream = generateDataStream(sut.alloc, params) @@ -162,7 +167,7 @@ fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<Byt private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> = WireFrameEncoder(alloc).let { encoder -> MessageGenerator.INSTANCE - .createMessageFlux(params) + .createMessageFlux(listOf(params)) .map(encoder::encode) .transform { simulateRemoteTcp(alloc, 1000, it) } } diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt index e52db848..7407f692 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt @@ -20,20 +20,16 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.api import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage -import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator -import org.onap.ves.VesEventV5 import reactor.core.publisher.Flux -import javax.json.JsonObject /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ interface MessageGenerator { - fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage> - fun parseCommonHeader(json: JsonObject): VesEventV5.VesEvent.CommonEventHeader + fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<PayloadWireFrameMessage> companion object { val INSTANCE: MessageGenerator by lazy { diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/config/MessageParameters.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt index 7e80cc66..cc00f5ac 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/config/MessageParameters.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt @@ -17,12 +17,14 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.ves.message.generator.config +package org.onap.dcae.collectors.veshv.ves.message.generator.api -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -data class MessageParameters(val commonEventHeader: CommonEventHeader, val amount: Long = -1) +data class MessageParameters(val domain: Domain, + val messageType: MessageType, + val amount: Long = -1) diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt new file mode 100644 index 00000000..e34ed6d6 --- /dev/null +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt @@ -0,0 +1,32 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.ves.message.generator.api + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since July 2018 + */ +enum class MessageType { + VALID, + TOO_BIG_PAYLOAD, + UNSUPPORTED_DOMAIN, + INVALID_WIRE_FRAME, + INVALID_GPB_DATA +} diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt index b2f73894..dca573dc 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt @@ -20,14 +20,23 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.impl import com.google.protobuf.ByteString +import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.PayloadContentType import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.* +import org.onap.ves.HVRanMeasFieldsV5 +import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields +import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload import org.onap.ves.VesEventV5.VesEvent import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.OTHER import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import javax.json.JsonObject +import java.nio.charset.Charset /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -35,41 +44,78 @@ import javax.json.JsonObject */ class MessageGeneratorImpl internal constructor(private val payloadGenerator: PayloadGenerator) : MessageGenerator { - override fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage> = - Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let { - if (messageParameters.amount < 0) - it.repeat() - else - it.repeat(messageParameters.amount) - } + override fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<PayloadWireFrameMessage> = Flux + .fromIterable(messageParameters) + .flatMap { createMessageFlux(it) } - override fun parseCommonHeader(json: JsonObject): CommonEventHeader = CommonEventHeader.newBuilder() - .setVersion(json.getString("version")) - .setDomain(CommonEventHeader.Domain.forNumber(json.getInt("domain"))) - .setSequence(json.getInt("sequence")) - .setPriority(CommonEventHeader.Priority.forNumber(json.getInt("priority"))) - .setEventId(json.getString("eventId")) - .setEventName(json.getString("eventName")) - .setEventType(json.getString("eventType")) - .setStartEpochMicrosec(json.getJsonNumber("startEpochMicrosec").longValue()) - .setLastEpochMicrosec(json.getJsonNumber("lastEpochMicrosec").longValue()) - .setNfNamingCode(json.getString("nfNamingCode")) - .setNfcNamingCode(json.getString("nfcNamingCode")) - .setReportingEntityId(json.getString("reportingEntityId")) - .setReportingEntityName(ByteString.copyFromUtf8(json.getString("reportingEntityName"))) - .setSourceId(ByteString.copyFromUtf8(json.getString("sourceId"))) - .setSourceName(json.getString("sourceName")) - .build() + private fun createMessageFlux(parameters: MessageParameters): Flux<PayloadWireFrameMessage> = + Mono.fromCallable { createMessage(parameters.domain, parameters.messageType) } + .let { + if (parameters.amount < 0) + it.repeat() + else + it.repeat(parameters.amount) + } + private fun createMessage(domain: Domain, messageType: MessageType): PayloadWireFrameMessage = + when (messageType) { + VALID -> + PayloadWireFrameMessage(vesEvent(domain, payloadGenerator.generatePayload())) + TOO_BIG_PAYLOAD -> + PayloadWireFrameMessage(vesEvent(domain, oversizedPayload())) + UNSUPPORTED_DOMAIN -> + PayloadWireFrameMessage(vesEvent(OTHER, payloadGenerator.generatePayload())) + INVALID_WIRE_FRAME -> { + val payload = ByteData(vesEvent(domain, payloadGenerator.generatePayload())) + PayloadWireFrameMessage( + payload, + UNSUPPORTED_VERSION, + PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payload.size()) + } + INVALID_GPB_DATA -> + PayloadWireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset())) + } - private fun createMessage(commonHeader: CommonEventHeader): PayloadWireFrameMessage = - PayloadWireFrameMessage(vesMessageBytes(commonHeader)) + private fun vesEvent(domain: Domain, hvRanMeasPayload: HVRanMeasPayload): ByteArray { + return vesEvent(domain, hvRanMeasPayload.toByteString()) + } + private fun vesEvent(domain: Domain, hvRanMeasPayload: ByteString): ByteArray { + return createVesEvent(createCommonHeader(domain), hvRanMeasPayload).toByteArray() + } - private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray = + private fun createVesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent = VesEvent.newBuilder() - .setCommonEventHeader(commonHeader) - .setHvRanMeasFields(payloadGenerator.generatePayload().toByteString()) + .setCommonEventHeader(commonEventHeader) + .setHvRanMeasFields(payload) .build() - .toByteArray() + + private fun oversizedPayload() = + payloadGenerator.generateRawPayload(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE + 1) + + + private fun createCommonHeader(domain: Domain): CommonEventHeader = CommonEventHeader.newBuilder() + .setVersion("sample-version") + .setDomain(domain) + .setSequence(1) + .setPriority(CommonEventHeader.Priority.NORMAL) + .setEventId("sample-event-id") + .setEventName("sample-event-name") + .setEventType("sample-event-type") + .setStartEpochMicrosec(SAMPLE_START_EPOCH) + .setLastEpochMicrosec(SAMPLE_LAST_EPOCH) + .setNfNamingCode("sample-nf-naming-code") + .setNfcNamingCode("sample-nfc-naming-code") + .setReportingEntityId("sample-reporting-entity-id") + .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name")) + .setSourceId(ByteString.copyFromUtf8("sample-source-id")) + .setSourceName("sample-source-name") + .build() + + companion object { + private const val UNSUPPORTED_VERSION: Short = 2 + private const val SAMPLE_START_EPOCH = 120034455L + private const val SAMPLE_LAST_EPOCH = 120034455L + } } diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt index 66f34e9e..c85ce035 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.impl +import com.google.protobuf.ByteString import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject.HVRanMeas @@ -28,6 +29,9 @@ internal class PayloadGenerator { private val randomGenerator = Random() + fun generateRawPayload(size: Int): ByteString = + ByteString.copyFrom(ByteArray(size)) + fun generatePayload(numOfCountPerMeas: Long = 2, numOfMeasPerObject: Int = 2): HVRanMeasPayload { val pmObject = generatePmObject(numOfCountPerMeas, numOfMeasPerObject) return HVRanMeasPayload.newBuilder() diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt index 07027173..fb144616 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt @@ -19,66 +19,153 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl -import com.google.protobuf.ByteString +import com.google.protobuf.InvalidProtocolBufferException +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters -import org.onap.ves.VesEventV5 -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority.MEDIUM +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import org.onap.ves.VesEventV5.VesEvent +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.* import reactor.test.test -const val SAMPLE_START_EPOCH: Long = 120034455 -const val SAMPLE_LAST_EPOCH: Long = 120034455 - /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ object MessageGeneratorImplTest : Spek({ describe("message factory") { - val generator = MessageGenerator.INSTANCE + given("single message parameters") { + on("messages amount not specified in parameters") { + it("should create infinite flux") { + val limit = 1000L + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID))) + .take(limit) + .test() + .expectNextCount(limit) + .verifyComplete() + } + } + on("messages amount specified in parameters") { + it("should create message flux of specified size") { + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 5))) + .test() + .expectNextCount(5) + .verifyComplete() + } + } + on("message type requesting valid message") { + it("should create flux of valid messages with given domain") { + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 1))) + .test() + .assertNext { + assertThat(it.isValid()).isTrue() + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS) + } + .verifyComplete() + } + } + on("message type requesting too big payload") { + it("should create flux of messages with given domain and payload exceeding threshold") { - given("only common header") { - it("should return infinite flux") { - val limit = 1000L - generator.createMessageFlux(getSampleMessageParameters()).take(limit).test() - .expectNextCount(limit) - .verifyComplete() + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.TOO_BIG_PAYLOAD, 1))) + .test() + .assertNext { + assertThat(it.isValid()).isTrue() + assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS) + } + .verifyComplete() + } + } + on("message type requesting unsupported domain") { + it("should create flux of messages with domain other than HVRANMEAS") { + + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.UNSUPPORTED_DOMAIN, 1))) + .test() + .assertNext { + assertThat(it.isValid()).isTrue() + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isNotEqualTo(HVRANMEAS) + } + .verifyComplete() + } + } + on("message type requesting invalid GPB data ") { + it("should create flux of messages with invalid payload") { + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_GPB_DATA, 1))) + .test() + .assertNext { + assertThat(it.isValid()).isTrue() + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThatExceptionOfType(InvalidProtocolBufferException::class.java) + .isThrownBy { extractCommonEventHeader(it.payload) } + } + .verifyComplete() + } + } + on("message type requesting invalid wire frame ") { + it("should create flux of messages with invalid version") { + generator + .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_WIRE_FRAME, 1))) + .test() + .assertNext { + assertThat(it.isValid()).isFalse() + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS) + assertThat(it.version).isNotEqualTo(PayloadWireFrameMessage.SUPPORTED_VERSION) + } + .verifyComplete() + } } } - given("common header and messages amount") { - it("should return message flux of specified size") { - generator.createMessageFlux((getSampleMessageParameters(5))).test() - .expectNextCount(5) + given("list of message parameters") { + it("should create concatenated flux of messages") { + val singleFluxSize = 5L + val messageParameters = listOf( + MessageParameters(HVRANMEAS, MessageType.VALID, singleFluxSize), + MessageParameters(FAULT, MessageType.TOO_BIG_PAYLOAD, singleFluxSize), + MessageParameters(HEARTBEAT, MessageType.VALID, singleFluxSize) + ) + generator.createMessageFlux(messageParameters) + .test() + .assertNext { + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS) + } + .expectNextCount(singleFluxSize - 1) + .assertNext { + assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT) + } + .expectNextCount(singleFluxSize - 1) + .assertNext { + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT) + } + .expectNextCount(singleFluxSize - 1) .verifyComplete() } } } }) -fun getSampleMessageParameters(amount: Long = -1): MessageParameters { - val commonHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder() - .setVersion("sample-version") - .setDomain(HVRANMEAS) - .setSequence(1) - .setPriority(MEDIUM) - .setEventId("sample-event-id") - .setEventName("sample-event-name") - .setEventType("sample-event-type") - .setStartEpochMicrosec(SAMPLE_START_EPOCH) - .setLastEpochMicrosec(SAMPLE_LAST_EPOCH) - .setNfNamingCode("sample-nf-naming-code") - .setNfcNamingCode("sample-nfc-naming-code") - .setReportingEntityId("sample-reporting-entity-id") - .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name")) - .setSourceId(ByteString.copyFromUtf8("sample-source-id")) - .setSourceName("sample-source-name") - .build() - - return MessageParameters(commonHeader, amount) -} +fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader { + return VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader +}
\ No newline at end of file diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt index 08a35d42..0ab248b9 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt @@ -21,9 +21,11 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import arrow.effects.IO import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage -import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import ratpack.exec.Promise import ratpack.handling.Chain import ratpack.handling.Context @@ -31,8 +33,9 @@ import ratpack.server.RatpackServer import ratpack.server.ServerConfig import reactor.core.publisher.Flux import reactor.core.scheduler.Schedulers +import java.nio.charset.Charset import javax.json.Json -import javax.json.JsonObject +import javax.json.JsonArray /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -47,7 +50,6 @@ internal class HttpServer(private val vesClient: XnfSimulator) { } } - private fun configureHandlers(chain: Chain) { chain .post("simulator/sync") { ctx -> @@ -68,11 +70,26 @@ internal class HttpServer(private val vesClient: XnfSimulator) { private fun createMessageFlux(ctx: Context): Promise<Flux<PayloadWireFrameMessage>> { return ctx.request.body - .map { Json.createReader(it.inputStream).readObject() } + .map { Json.createReader(it.inputStream).readArray() } .map { extractMessageParameters(it) } .map { MessageGenerator.INSTANCE.createMessageFlux(it) } } + private fun extractMessageParameters(request: JsonArray): List<MessageParameters> = + try { + request + .map { it.asJsonObject() } + .map { + + val domain = Domain.valueOf(it.getString("domain")) + val messageType = MessageType.valueOf(it.getString("messageType")) + val messagesAmount = it.getJsonNumber("messagesAmount").longValue() + MessageParameters(domain, messageType, messagesAmount) + } + } catch (e: Exception) { + throw ValidationException("Validating request body failed", e) + } + private fun sendAcceptedResponse(ctx: Context) { ctx.response .status(STATUS_OK) @@ -94,17 +111,6 @@ internal class HttpServer(private val vesClient: XnfSimulator) { .toString()) } - private fun extractMessageParameters(request: JsonObject): MessageParameters = - try { - val commonEventHeader = MessageGenerator.INSTANCE - .parseCommonHeader(request.getJsonObject("commonEventHeader")) - val messagesAmount = request.getJsonNumber("messagesAmount").longValue() - MessageParameters(commonEventHeader, messagesAmount) - } catch (e: Exception) { - throw ValidationException("Validating request body failed", e) - } - - companion object { private val logger = Logger(HttpServer::class) const val DEFAULT_PORT = 5000 |