aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt13
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt6
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt (renamed from hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/config/MessageParameters.kt)8
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt32
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt110
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt4
-rw-r--r--hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt167
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt36
8 files changed, 277 insertions, 99 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index b793f3aa..4953d8f3 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -29,7 +29,9 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
import reactor.core.publisher.Flux
import reactor.math.sum
import java.security.MessageDigest
@@ -55,8 +57,10 @@ object PerformanceSpecification : Spek({
val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
val params = MessageParameters(
- commonEventHeader = vesEvent().commonEventHeader,
+ domain = HVRANMEAS,
+ messageType = VALID,
amount = numMessages)
+
val fluxes = (1.rangeTo(runs)).map {
sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
}
@@ -82,7 +86,8 @@ object PerformanceSpecification : Spek({
val timeout = Duration.ofSeconds(30)
val params = MessageParameters(
- commonEventHeader = vesEvent().commonEventHeader,
+ domain = HVRANMEAS,
+ messageType = VALID,
amount = numMessages)
val dataStream = generateDataStream(sut.alloc, params)
@@ -162,7 +167,7 @@ fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<Byt
private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
WireFrameEncoder(alloc).let { encoder ->
MessageGenerator.INSTANCE
- .createMessageFlux(params)
+ .createMessageFlux(listOf(params))
.map(encoder::encode)
.transform { simulateRemoteTcp(alloc, 1000, it) }
}
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
index e52db848..7407f692 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
@@ -20,20 +20,16 @@
package org.onap.dcae.collectors.veshv.ves.message.generator.api
import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl
import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
-import org.onap.ves.VesEventV5
import reactor.core.publisher.Flux
-import javax.json.JsonObject
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
interface MessageGenerator {
- fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage>
- fun parseCommonHeader(json: JsonObject): VesEventV5.VesEvent.CommonEventHeader
+ fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<PayloadWireFrameMessage>
companion object {
val INSTANCE: MessageGenerator by lazy {
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/config/MessageParameters.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
index 7e80cc66..cc00f5ac 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/config/MessageParameters.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
@@ -17,12 +17,14 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.config
+package org.onap.dcae.collectors.veshv.ves.message.generator.api
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-data class MessageParameters(val commonEventHeader: CommonEventHeader, val amount: Long = -1)
+data class MessageParameters(val domain: Domain,
+ val messageType: MessageType,
+ val amount: Long = -1)
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt
new file mode 100644
index 00000000..e34ed6d6
--- /dev/null
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt
@@ -0,0 +1,32 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.api
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since July 2018
+ */
+enum class MessageType {
+ VALID,
+ TOO_BIG_PAYLOAD,
+ UNSUPPORTED_DOMAIN,
+ INVALID_WIRE_FRAME,
+ INVALID_GPB_DATA
+}
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
index b2f73894..dca573dc 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
@@ -20,14 +20,23 @@
package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import com.google.protobuf.ByteString
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.PayloadContentType
import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.*
+import org.onap.ves.HVRanMeasFieldsV5
+import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields
+import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload
import org.onap.ves.VesEventV5.VesEvent
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.OTHER
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import javax.json.JsonObject
+import java.nio.charset.Charset
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -35,41 +44,78 @@ import javax.json.JsonObject
*/
class MessageGeneratorImpl internal constructor(private val payloadGenerator: PayloadGenerator) : MessageGenerator {
- override fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage> =
- Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let {
- if (messageParameters.amount < 0)
- it.repeat()
- else
- it.repeat(messageParameters.amount)
- }
+ override fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<PayloadWireFrameMessage> = Flux
+ .fromIterable(messageParameters)
+ .flatMap { createMessageFlux(it) }
- override fun parseCommonHeader(json: JsonObject): CommonEventHeader = CommonEventHeader.newBuilder()
- .setVersion(json.getString("version"))
- .setDomain(CommonEventHeader.Domain.forNumber(json.getInt("domain")))
- .setSequence(json.getInt("sequence"))
- .setPriority(CommonEventHeader.Priority.forNumber(json.getInt("priority")))
- .setEventId(json.getString("eventId"))
- .setEventName(json.getString("eventName"))
- .setEventType(json.getString("eventType"))
- .setStartEpochMicrosec(json.getJsonNumber("startEpochMicrosec").longValue())
- .setLastEpochMicrosec(json.getJsonNumber("lastEpochMicrosec").longValue())
- .setNfNamingCode(json.getString("nfNamingCode"))
- .setNfcNamingCode(json.getString("nfcNamingCode"))
- .setReportingEntityId(json.getString("reportingEntityId"))
- .setReportingEntityName(ByteString.copyFromUtf8(json.getString("reportingEntityName")))
- .setSourceId(ByteString.copyFromUtf8(json.getString("sourceId")))
- .setSourceName(json.getString("sourceName"))
- .build()
+ private fun createMessageFlux(parameters: MessageParameters): Flux<PayloadWireFrameMessage> =
+ Mono.fromCallable { createMessage(parameters.domain, parameters.messageType) }
+ .let {
+ if (parameters.amount < 0)
+ it.repeat()
+ else
+ it.repeat(parameters.amount)
+ }
+ private fun createMessage(domain: Domain, messageType: MessageType): PayloadWireFrameMessage =
+ when (messageType) {
+ VALID ->
+ PayloadWireFrameMessage(vesEvent(domain, payloadGenerator.generatePayload()))
+ TOO_BIG_PAYLOAD ->
+ PayloadWireFrameMessage(vesEvent(domain, oversizedPayload()))
+ UNSUPPORTED_DOMAIN ->
+ PayloadWireFrameMessage(vesEvent(OTHER, payloadGenerator.generatePayload()))
+ INVALID_WIRE_FRAME -> {
+ val payload = ByteData(vesEvent(domain, payloadGenerator.generatePayload()))
+ PayloadWireFrameMessage(
+ payload,
+ UNSUPPORTED_VERSION,
+ PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payload.size())
+ }
+ INVALID_GPB_DATA ->
+ PayloadWireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset()))
+ }
- private fun createMessage(commonHeader: CommonEventHeader): PayloadWireFrameMessage =
- PayloadWireFrameMessage(vesMessageBytes(commonHeader))
+ private fun vesEvent(domain: Domain, hvRanMeasPayload: HVRanMeasPayload): ByteArray {
+ return vesEvent(domain, hvRanMeasPayload.toByteString())
+ }
+ private fun vesEvent(domain: Domain, hvRanMeasPayload: ByteString): ByteArray {
+ return createVesEvent(createCommonHeader(domain), hvRanMeasPayload).toByteArray()
+ }
- private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray =
+ private fun createVesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent =
VesEvent.newBuilder()
- .setCommonEventHeader(commonHeader)
- .setHvRanMeasFields(payloadGenerator.generatePayload().toByteString())
+ .setCommonEventHeader(commonEventHeader)
+ .setHvRanMeasFields(payload)
.build()
- .toByteArray()
+
+ private fun oversizedPayload() =
+ payloadGenerator.generateRawPayload(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE + 1)
+
+
+ private fun createCommonHeader(domain: Domain): CommonEventHeader = CommonEventHeader.newBuilder()
+ .setVersion("sample-version")
+ .setDomain(domain)
+ .setSequence(1)
+ .setPriority(CommonEventHeader.Priority.NORMAL)
+ .setEventId("sample-event-id")
+ .setEventName("sample-event-name")
+ .setEventType("sample-event-type")
+ .setStartEpochMicrosec(SAMPLE_START_EPOCH)
+ .setLastEpochMicrosec(SAMPLE_LAST_EPOCH)
+ .setNfNamingCode("sample-nf-naming-code")
+ .setNfcNamingCode("sample-nfc-naming-code")
+ .setReportingEntityId("sample-reporting-entity-id")
+ .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name"))
+ .setSourceId(ByteString.copyFromUtf8("sample-source-id"))
+ .setSourceName("sample-source-name")
+ .build()
+
+ companion object {
+ private const val UNSUPPORTED_VERSION: Short = 2
+ private const val SAMPLE_START_EPOCH = 120034455L
+ private const val SAMPLE_LAST_EPOCH = 120034455L
+ }
}
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt
index 66f34e9e..c85ce035 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+import com.google.protobuf.ByteString
import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload
import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject
import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject.HVRanMeas
@@ -28,6 +29,9 @@ internal class PayloadGenerator {
private val randomGenerator = Random()
+ fun generateRawPayload(size: Int): ByteString =
+ ByteString.copyFrom(ByteArray(size))
+
fun generatePayload(numOfCountPerMeas: Long = 2, numOfMeasPerObject: Int = 2): HVRanMeasPayload {
val pmObject = generatePmObject(numOfCountPerMeas, numOfMeasPerObject)
return HVRanMeasPayload.newBuilder()
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
index 07027173..fb144616 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
@@ -19,66 +19,153 @@
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
-import com.google.protobuf.ByteString
+import com.google.protobuf.InvalidProtocolBufferException
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters
-import org.onap.ves.VesEventV5
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority.MEDIUM
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.ves.VesEventV5.VesEvent
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.*
import reactor.test.test
-const val SAMPLE_START_EPOCH: Long = 120034455
-const val SAMPLE_LAST_EPOCH: Long = 120034455
-
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
object MessageGeneratorImplTest : Spek({
describe("message factory") {
-
val generator = MessageGenerator.INSTANCE
+ given("single message parameters") {
+ on("messages amount not specified in parameters") {
+ it("should create infinite flux") {
+ val limit = 1000L
+ generator
+ .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID)))
+ .take(limit)
+ .test()
+ .expectNextCount(limit)
+ .verifyComplete()
+ }
+ }
+ on("messages amount specified in parameters") {
+ it("should create message flux of specified size") {
+ generator
+ .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 5)))
+ .test()
+ .expectNextCount(5)
+ .verifyComplete()
+ }
+ }
+ on("message type requesting valid message") {
+ it("should create flux of valid messages with given domain") {
+ generator
+ .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 1)))
+ .test()
+ .assertNext {
+ assertThat(it.isValid()).isTrue()
+ assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
+ }
+ .verifyComplete()
+ }
+ }
+ on("message type requesting too big payload") {
+ it("should create flux of messages with given domain and payload exceeding threshold") {
- given("only common header") {
- it("should return infinite flux") {
- val limit = 1000L
- generator.createMessageFlux(getSampleMessageParameters()).take(limit).test()
- .expectNextCount(limit)
- .verifyComplete()
+ generator
+ .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.TOO_BIG_PAYLOAD, 1)))
+ .test()
+ .assertNext {
+ assertThat(it.isValid()).isTrue()
+ assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
+ }
+ .verifyComplete()
+ }
+ }
+ on("message type requesting unsupported domain") {
+ it("should create flux of messages with domain other than HVRANMEAS") {
+
+ generator
+ .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.UNSUPPORTED_DOMAIN, 1)))
+ .test()
+ .assertNext {
+ assertThat(it.isValid()).isTrue()
+ assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(extractCommonEventHeader(it.payload).domain).isNotEqualTo(HVRANMEAS)
+ }
+ .verifyComplete()
+ }
+ }
+ on("message type requesting invalid GPB data ") {
+ it("should create flux of messages with invalid payload") {
+ generator
+ .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_GPB_DATA, 1)))
+ .test()
+ .assertNext {
+ assertThat(it.isValid()).isTrue()
+ assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
+ .isThrownBy { extractCommonEventHeader(it.payload) }
+ }
+ .verifyComplete()
+ }
+ }
+ on("message type requesting invalid wire frame ") {
+ it("should create flux of messages with invalid version") {
+ generator
+ .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_WIRE_FRAME, 1)))
+ .test()
+ .assertNext {
+ assertThat(it.isValid()).isFalse()
+ assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
+ assertThat(it.version).isNotEqualTo(PayloadWireFrameMessage.SUPPORTED_VERSION)
+ }
+ .verifyComplete()
+ }
}
}
- given("common header and messages amount") {
- it("should return message flux of specified size") {
- generator.createMessageFlux((getSampleMessageParameters(5))).test()
- .expectNextCount(5)
+ given("list of message parameters") {
+ it("should create concatenated flux of messages") {
+ val singleFluxSize = 5L
+ val messageParameters = listOf(
+ MessageParameters(HVRANMEAS, MessageType.VALID, singleFluxSize),
+ MessageParameters(FAULT, MessageType.TOO_BIG_PAYLOAD, singleFluxSize),
+ MessageParameters(HEARTBEAT, MessageType.VALID, singleFluxSize)
+ )
+ generator.createMessageFlux(messageParameters)
+ .test()
+ .assertNext {
+ assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
+ }
+ .expectNextCount(singleFluxSize - 1)
+ .assertNext {
+ assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT)
+ }
+ .expectNextCount(singleFluxSize - 1)
+ .assertNext {
+ assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT)
+ }
+ .expectNextCount(singleFluxSize - 1)
.verifyComplete()
}
}
}
})
-fun getSampleMessageParameters(amount: Long = -1): MessageParameters {
- val commonHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder()
- .setVersion("sample-version")
- .setDomain(HVRANMEAS)
- .setSequence(1)
- .setPriority(MEDIUM)
- .setEventId("sample-event-id")
- .setEventName("sample-event-name")
- .setEventType("sample-event-type")
- .setStartEpochMicrosec(SAMPLE_START_EPOCH)
- .setLastEpochMicrosec(SAMPLE_LAST_EPOCH)
- .setNfNamingCode("sample-nf-naming-code")
- .setNfcNamingCode("sample-nfc-naming-code")
- .setReportingEntityId("sample-reporting-entity-id")
- .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name"))
- .setSourceId(ByteString.copyFromUtf8("sample-source-id"))
- .setSourceName("sample-source-name")
- .build()
-
- return MessageParameters(commonHeader, amount)
-}
+fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader {
+ return VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
+} \ No newline at end of file
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
index 08a35d42..0ab248b9 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
@@ -21,9 +21,11 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import arrow.effects.IO
import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import ratpack.exec.Promise
import ratpack.handling.Chain
import ratpack.handling.Context
@@ -31,8 +33,9 @@ import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
+import java.nio.charset.Charset
import javax.json.Json
-import javax.json.JsonObject
+import javax.json.JsonArray
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -47,7 +50,6 @@ internal class HttpServer(private val vesClient: XnfSimulator) {
}
}
-
private fun configureHandlers(chain: Chain) {
chain
.post("simulator/sync") { ctx ->
@@ -68,11 +70,26 @@ internal class HttpServer(private val vesClient: XnfSimulator) {
private fun createMessageFlux(ctx: Context): Promise<Flux<PayloadWireFrameMessage>> {
return ctx.request.body
- .map { Json.createReader(it.inputStream).readObject() }
+ .map { Json.createReader(it.inputStream).readArray() }
.map { extractMessageParameters(it) }
.map { MessageGenerator.INSTANCE.createMessageFlux(it) }
}
+ private fun extractMessageParameters(request: JsonArray): List<MessageParameters> =
+ try {
+ request
+ .map { it.asJsonObject() }
+ .map {
+
+ val domain = Domain.valueOf(it.getString("domain"))
+ val messageType = MessageType.valueOf(it.getString("messageType"))
+ val messagesAmount = it.getJsonNumber("messagesAmount").longValue()
+ MessageParameters(domain, messageType, messagesAmount)
+ }
+ } catch (e: Exception) {
+ throw ValidationException("Validating request body failed", e)
+ }
+
private fun sendAcceptedResponse(ctx: Context) {
ctx.response
.status(STATUS_OK)
@@ -94,17 +111,6 @@ internal class HttpServer(private val vesClient: XnfSimulator) {
.toString())
}
- private fun extractMessageParameters(request: JsonObject): MessageParameters =
- try {
- val commonEventHeader = MessageGenerator.INSTANCE
- .parseCommonHeader(request.getJsonObject("commonEventHeader"))
- val messagesAmount = request.getJsonNumber("messagesAmount").longValue()
- MessageParameters(commonEventHeader, messagesAmount)
- } catch (e: Exception) {
- throw ValidationException("Validating request body failed", e)
- }
-
-
companion object {
private val logger = Logger(HttpServer::class)
const val DEFAULT_PORT = 5000