summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJakub Dudycz <jakub.dudycz@nokia.com>2018-07-25 16:13:28 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-03 10:07:04 +0200
commitf738ede42e619f1a5c13671cb560224aa639f1db (patch)
treed1ed0743a90e495155f8196fbe83bab7fdf52d52
parentefceaa86a2fc3eb22b9e30bafd08c0fb6ad2a783 (diff)
Pass CommonEventHeader to XNF simulator api
This change makes XNF simulator more configurable and allows to validate more message parameters in robot integration tests Closes ONAP-689 Change-Id: Ic0a10f1e1cdd84ac415c00050b4cca1ac496c56b Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com> Issue-ID: DCAEGEN2-601
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt37
-rw-r--r--hv-collector-utils/pom.xml14
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/CommonEventHeaderParser.kt52
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParser.kt48
-rw-r--r--hv-collector-ves-message-generator/pom.xml5
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt39
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt4
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt1
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt52
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt49
-rw-r--r--hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt84
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt45
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt9
13 files changed, 325 insertions, 114 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index 4953d8f3..928c62fb 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -20,7 +20,12 @@
package org.onap.dcae.collectors.veshv.tests.component
import arrow.syntax.function.partially1
-import io.netty.buffer.*
+import com.google.protobuf.ByteString
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+import io.netty.buffer.CompositeByteBuf
+import io.netty.buffer.Unpooled
+import io.netty.buffer.UnpooledByteBufAllocator
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
@@ -31,6 +36,8 @@ import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
+import org.onap.ves.VesEventV5
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
import reactor.core.publisher.Flux
import reactor.math.sum
@@ -57,9 +64,10 @@ object PerformanceSpecification : Spek({
val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
val params = MessageParameters(
- domain = HVRANMEAS,
+ commonEventHeader = createSampleCommonHeader(HVRANMEAS),
messageType = VALID,
- amount = numMessages)
+ amount = numMessages
+ )
val fluxes = (1.rangeTo(runs)).map {
sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
@@ -86,9 +94,10 @@ object PerformanceSpecification : Spek({
val timeout = Duration.ofSeconds(30)
val params = MessageParameters(
- domain = HVRANMEAS,
+ commonEventHeader = createSampleCommonHeader(HVRANMEAS),
messageType = VALID,
- amount = numMessages)
+ amount = numMessages
+ )
val dataStream = generateDataStream(sut.alloc, params)
.transform(::dropWhenIndex.partially1 { it % 101 == 0L })
@@ -193,3 +202,21 @@ private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
sink.complete()
}
}
+
+private fun createSampleCommonHeader(domain: Domain): VesEventV5.VesEvent.CommonEventHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder()
+ .setVersion("sample-version")
+ .setDomain(domain)
+ .setSequence(1)
+ .setPriority(VesEventV5.VesEvent.CommonEventHeader.Priority.NORMAL)
+ .setEventId("sample-event-id")
+ .setEventName("sample-event-name")
+ .setEventType("sample-event-type")
+ .setStartEpochMicrosec(120034455)
+ .setLastEpochMicrosec(120034455)
+ .setNfNamingCode("sample-nf-naming-code")
+ .setNfcNamingCode("sample-nfc-naming-code")
+ .setReportingEntityId("sample-reporting-entity-id")
+ .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name"))
+ .setSourceId(ByteString.copyFromUtf8("sample-source-id"))
+ .setSourceName("sample-source-name")
+ .build()
diff --git a/hv-collector-utils/pom.xml b/hv-collector-utils/pom.xml
index ea19ba3f..d0e44932 100644
--- a/hv-collector-utils/pom.xml
+++ b/hv-collector-utils/pom.xml
@@ -60,6 +60,20 @@
<dependencies>
<dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-domain</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-ves-message-generator</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.json</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/CommonEventHeaderParser.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/CommonEventHeaderParser.kt
new file mode 100644
index 00000000..d115675d
--- /dev/null
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/CommonEventHeaderParser.kt
@@ -0,0 +1,52 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.messages
+
+import com.google.protobuf.ByteString
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.newBuilder
+import javax.json.JsonObject
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since July 2018
+ */
+class CommonEventHeaderParser {
+ fun parse(json: JsonObject): CommonEventHeader = newBuilder()
+ .setVersion(json.getString("version"))
+ .setDomain(Domain.valueOf(json.getString("domain")))
+ .setSequence(json.getInt("sequence"))
+ .setPriority(Priority.forNumber(json.getInt("priority")))
+ .setEventId(json.getString("version"))
+ .setEventName(json.getString("version"))
+ .setEventType(json.getString("version"))
+ .setStartEpochMicrosec(json.getJsonNumber("startEpochMicrosec").longValue())
+ .setLastEpochMicrosec(json.getJsonNumber("lastEpochMicrosec").longValue())
+ .setNfNamingCode(json.getString("nfNamingCode"))
+ .setNfcNamingCode(json.getString("nfcNamingCode"))
+ .setReportingEntityId(json.getString("reportingEntityId"))
+ .setReportingEntityName(ByteString.copyFromUtf8(json.getString("reportingEntityName")))
+ .setSourceId(ByteString.copyFromUtf8(json.getString("sourceId")))
+ .setSourceName(json.getString("sourceName"))
+ .build()
+
+}
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParser.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParser.kt
new file mode 100644
index 00000000..24c2cbfa
--- /dev/null
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParser.kt
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.messages
+
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import javax.json.JsonArray
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since July 2018
+ */
+class MessageParametersParser(
+ private val commonEventHeaderParser: CommonEventHeaderParser = CommonEventHeaderParser()) {
+
+ fun parse(request: JsonArray): List<MessageParameters> =
+ try {
+ request
+ .map { it.asJsonObject() }
+ .map {
+ val commonEventHeader = commonEventHeaderParser.parse(it.getJsonObject("commonEventHeader"))
+ val messageType = MessageType.valueOf(it.getString("messageType"))
+ val messagesAmount = it.getJsonNumber("messagesAmount").longValue()
+ MessageParameters(commonEventHeader, messageType, messagesAmount)
+ }
+ } catch (e: Exception) {
+ throw ParsingException("Parsing request body failed", e)
+ }
+
+ internal class ParsingException(message: String?, cause: Exception) : Exception(message, cause)
+}
diff --git a/hv-collector-ves-message-generator/pom.xml b/hv-collector-ves-message-generator/pom.xml
index dfa30b10..f049d78f 100644
--- a/hv-collector-ves-message-generator/pom.xml
+++ b/hv-collector-ves-message-generator/pom.xml
@@ -63,11 +63,6 @@
<version>${project.parent.version}</version>
</dependency>
<dependency>
- <groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-utils</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt
new file mode 100644
index 00000000..605b1729
--- /dev/null
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.api
+
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.CommonEventHeaderParserImpl
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import javax.json.JsonObject
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since July 2018
+ */
+interface CommonEventHeaderParser {
+
+ fun parse(json: JsonObject): CommonEventHeader
+
+ companion object {
+ val INSTANCE: CommonEventHeaderParser by lazy {
+ CommonEventHeaderParserImpl()
+ }
+ }
+} \ No newline at end of file
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
index cc00f5ac..8d989cc5 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
@@ -19,12 +19,12 @@
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.api
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-data class MessageParameters(val domain: Domain,
+data class MessageParameters(val commonEventHeader: CommonEventHeader,
val messageType: MessageType,
val amount: Long = -1)
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt
index e34ed6d6..0ac90544 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt
@@ -26,7 +26,6 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.api
enum class MessageType {
VALID,
TOO_BIG_PAYLOAD,
- UNSUPPORTED_DOMAIN,
INVALID_WIRE_FRAME,
INVALID_GPB_DATA
}
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt
new file mode 100644
index 00000000..61f5f2f3
--- /dev/null
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt
@@ -0,0 +1,52 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+
+import com.google.protobuf.ByteString
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.CommonEventHeaderParser
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority
+import javax.json.JsonObject
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since July 2018
+ */
+class CommonEventHeaderParserImpl : CommonEventHeaderParser {
+
+ override fun parse(json: JsonObject): CommonEventHeader = CommonEventHeader.newBuilder()
+ .setVersion(json.getString("version"))
+ .setDomain(Domain.valueOf(json.getString("domain")))
+ .setSequence(json.getInt("sequence"))
+ .setPriority(Priority.forNumber(json.getInt("priority")))
+ .setEventId(json.getString("version"))
+ .setEventName(json.getString("version"))
+ .setEventType(json.getString("version"))
+ .setStartEpochMicrosec(json.getJsonNumber("startEpochMicrosec").longValue())
+ .setLastEpochMicrosec(json.getJsonNumber("lastEpochMicrosec").longValue())
+ .setNfNamingCode(json.getString("nfNamingCode"))
+ .setNfcNamingCode(json.getString("nfcNamingCode"))
+ .setReportingEntityId(json.getString("reportingEntityId"))
+ .setReportingEntityName(ByteString.copyFromUtf8(json.getString("reportingEntityName")))
+ .setSourceId(ByteString.copyFromUtf8(json.getString("sourceId")))
+ .setSourceName(json.getString("sourceName"))
+ .build()
+} \ No newline at end of file
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
index dca573dc..e9db716d 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
@@ -26,14 +26,14 @@ import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.*
-import org.onap.ves.HVRanMeasFieldsV5
-import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_GPB_DATA
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_WIRE_FRAME
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.TOO_BIG_PAYLOAD
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload
import org.onap.ves.VesEventV5.VesEvent
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.OTHER
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.nio.charset.Charset
@@ -49,7 +49,7 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
.flatMap { createMessageFlux(it) }
private fun createMessageFlux(parameters: MessageParameters): Flux<PayloadWireFrameMessage> =
- Mono.fromCallable { createMessage(parameters.domain, parameters.messageType) }
+ Mono.fromCallable { createMessage(parameters.commonEventHeader, parameters.messageType) }
.let {
if (parameters.amount < 0)
it.repeat()
@@ -57,16 +57,14 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
it.repeat(parameters.amount)
}
- private fun createMessage(domain: Domain, messageType: MessageType): PayloadWireFrameMessage =
+ private fun createMessage(commonEventHeader: CommonEventHeader, messageType: MessageType): PayloadWireFrameMessage =
when (messageType) {
VALID ->
- PayloadWireFrameMessage(vesEvent(domain, payloadGenerator.generatePayload()))
+ PayloadWireFrameMessage(vesEvent(commonEventHeader, payloadGenerator.generatePayload()))
TOO_BIG_PAYLOAD ->
- PayloadWireFrameMessage(vesEvent(domain, oversizedPayload()))
- UNSUPPORTED_DOMAIN ->
- PayloadWireFrameMessage(vesEvent(OTHER, payloadGenerator.generatePayload()))
+ PayloadWireFrameMessage(vesEvent(commonEventHeader, oversizedPayload()))
INVALID_WIRE_FRAME -> {
- val payload = ByteData(vesEvent(domain, payloadGenerator.generatePayload()))
+ val payload = ByteData(vesEvent(commonEventHeader, payloadGenerator.generatePayload()))
PayloadWireFrameMessage(
payload,
UNSUPPORTED_VERSION,
@@ -77,12 +75,12 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
PayloadWireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset()))
}
- private fun vesEvent(domain: Domain, hvRanMeasPayload: HVRanMeasPayload): ByteArray {
- return vesEvent(domain, hvRanMeasPayload.toByteString())
+ private fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasPayload: HVRanMeasPayload): ByteArray {
+ return vesEvent(commonEventHeader, hvRanMeasPayload.toByteString())
}
- private fun vesEvent(domain: Domain, hvRanMeasPayload: ByteString): ByteArray {
- return createVesEvent(createCommonHeader(domain), hvRanMeasPayload).toByteArray()
+ private fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasPayload: ByteString): ByteArray {
+ return createVesEvent(commonEventHeader, hvRanMeasPayload).toByteArray()
}
private fun createVesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent =
@@ -94,28 +92,7 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
private fun oversizedPayload() =
payloadGenerator.generateRawPayload(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE + 1)
-
- private fun createCommonHeader(domain: Domain): CommonEventHeader = CommonEventHeader.newBuilder()
- .setVersion("sample-version")
- .setDomain(domain)
- .setSequence(1)
- .setPriority(CommonEventHeader.Priority.NORMAL)
- .setEventId("sample-event-id")
- .setEventName("sample-event-name")
- .setEventType("sample-event-type")
- .setStartEpochMicrosec(SAMPLE_START_EPOCH)
- .setLastEpochMicrosec(SAMPLE_LAST_EPOCH)
- .setNfNamingCode("sample-nf-naming-code")
- .setNfcNamingCode("sample-nfc-naming-code")
- .setReportingEntityId("sample-reporting-entity-id")
- .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name"))
- .setSourceId(ByteString.copyFromUtf8("sample-source-id"))
- .setSourceName("sample-source-name")
- .build()
-
companion object {
private const val UNSUPPORTED_VERSION: Short = 2
- private const val SAMPLE_START_EPOCH = 120034455L
- private const val SAMPLE_LAST_EPOCH = 120034455L
}
}
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
index fb144616..b2490dd1 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
+import com.google.protobuf.ByteString
import com.google.protobuf.InvalidProtocolBufferException
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
@@ -34,7 +35,9 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameter
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
import org.onap.ves.VesEventV5.VesEvent
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.*
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.FAULT
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HEARTBEAT
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
import reactor.test.test
/**
@@ -49,7 +52,10 @@ object MessageGeneratorImplTest : Spek({
it("should create infinite flux") {
val limit = 1000L
generator
- .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID)))
+ .createMessageFlux(listOf(MessageParameters(
+ createSampleCommonHeader(HVRANMEAS),
+ MessageType.VALID
+ )))
.take(limit)
.test()
.expectNextCount(limit)
@@ -59,7 +65,11 @@ object MessageGeneratorImplTest : Spek({
on("messages amount specified in parameters") {
it("should create message flux of specified size") {
generator
- .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 5)))
+ .createMessageFlux(listOf(MessageParameters(
+ createSampleCommonHeader(HVRANMEAS),
+ MessageType.VALID,
+ 5
+ )))
.test()
.expectNextCount(5)
.verifyComplete()
@@ -68,12 +78,16 @@ object MessageGeneratorImplTest : Spek({
on("message type requesting valid message") {
it("should create flux of valid messages with given domain") {
generator
- .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 1)))
+ .createMessageFlux(listOf(MessageParameters(
+ createSampleCommonHeader(FAULT),
+ MessageType.VALID,
+ 1
+ )))
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT)
}
.verifyComplete()
}
@@ -82,7 +96,11 @@ object MessageGeneratorImplTest : Spek({
it("should create flux of messages with given domain and payload exceeding threshold") {
generator
- .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.TOO_BIG_PAYLOAD, 1)))
+ .createMessageFlux(listOf(MessageParameters(
+ createSampleCommonHeader(HVRANMEAS),
+ MessageType.TOO_BIG_PAYLOAD,
+ 1
+ )))
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
@@ -92,24 +110,14 @@ object MessageGeneratorImplTest : Spek({
.verifyComplete()
}
}
- on("message type requesting unsupported domain") {
- it("should create flux of messages with domain other than HVRANMEAS") {
-
- generator
- .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.UNSUPPORTED_DOMAIN, 1)))
- .test()
- .assertNext {
- assertThat(it.isValid()).isTrue()
- assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isNotEqualTo(HVRANMEAS)
- }
- .verifyComplete()
- }
- }
on("message type requesting invalid GPB data ") {
it("should create flux of messages with invalid payload") {
generator
- .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_GPB_DATA, 1)))
+ .createMessageFlux(listOf(MessageParameters(
+ createSampleCommonHeader(HVRANMEAS),
+ MessageType.INVALID_GPB_DATA,
+ 1
+ )))
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
@@ -123,7 +131,11 @@ object MessageGeneratorImplTest : Spek({
on("message type requesting invalid wire frame ") {
it("should create flux of messages with invalid version") {
generator
- .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_WIRE_FRAME, 1)))
+ .createMessageFlux(listOf(MessageParameters(
+ createSampleCommonHeader(HVRANMEAS),
+ MessageType.INVALID_WIRE_FRAME,
+ 1
+ )))
.test()
.assertNext {
assertThat(it.isValid()).isFalse()
@@ -139,9 +151,9 @@ object MessageGeneratorImplTest : Spek({
it("should create concatenated flux of messages") {
val singleFluxSize = 5L
val messageParameters = listOf(
- MessageParameters(HVRANMEAS, MessageType.VALID, singleFluxSize),
- MessageParameters(FAULT, MessageType.TOO_BIG_PAYLOAD, singleFluxSize),
- MessageParameters(HEARTBEAT, MessageType.VALID, singleFluxSize)
+ MessageParameters(createSampleCommonHeader(HVRANMEAS), MessageType.VALID, singleFluxSize),
+ MessageParameters(createSampleCommonHeader(FAULT), MessageType.TOO_BIG_PAYLOAD, singleFluxSize),
+ MessageParameters(createSampleCommonHeader(HEARTBEAT), MessageType.VALID, singleFluxSize)
)
generator.createMessageFlux(messageParameters)
.test()
@@ -168,4 +180,24 @@ object MessageGeneratorImplTest : Spek({
fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader {
return VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
-} \ No newline at end of file
+}
+
+private fun createSampleCommonHeader(domain: CommonEventHeader.Domain): CommonEventHeader = CommonEventHeader.newBuilder()
+ .setVersion("sample-version")
+ .setDomain(domain)
+ .setSequence(1)
+ .setPriority(CommonEventHeader.Priority.NORMAL)
+ .setEventId("sample-event-id")
+ .setEventName("sample-event-name")
+ .setEventType("sample-event-type")
+ .setStartEpochMicrosec(120034455)
+ .setLastEpochMicrosec(120034455)
+ .setNfNamingCode("sample-nf-naming-code")
+ .setNfcNamingCode("sample-nfc-naming-code")
+ .setReportingEntityId("sample-reporting-entity-id")
+ .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name"))
+ .setSourceId(ByteString.copyFromUtf8("sample-source-id"))
+ .setSourceName("sample-source-name")
+ .build()
+
+
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
index 24ef578d..de686bc5 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
@@ -20,27 +20,22 @@
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.messages.MessageParametersParser
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
-import ratpack.exec.Promise
import ratpack.handling.Chain
import ratpack.handling.Context
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
-import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import javax.json.Json
-import javax.json.JsonArray
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-internal class HttpServer(private val vesClient: XnfSimulator) {
+internal class HttpServer(private val vesClient: XnfSimulator,
+ private val messageParametersParser: MessageParametersParser = MessageParametersParser()) {
fun start(port: Int): IO<RatpackServer> = IO {
RatpackServer.start { server ->
@@ -52,14 +47,20 @@ internal class HttpServer(private val vesClient: XnfSimulator) {
private fun configureHandlers(chain: Chain) {
chain
.post("simulator/sync") { ctx ->
- createMessageFlux(ctx)
+ ctx.request.body
+ .map { Json.createReader(it.inputStream).readArray() }
+ .map { messageParametersParser.parse(it) }
+ .map { MessageGenerator.INSTANCE.createMessageFlux(it) }
.map { vesClient.sendIo(it) }
.map { it.unsafeRunSync() }
.onError { handleException(it, ctx) }
.then { sendAcceptedResponse(ctx) }
}
.post("simulator/async") { ctx ->
- createMessageFlux(ctx)
+ ctx.request.body
+ .map { Json.createReader(it.inputStream).readArray() }
+ .map { messageParametersParser.parse(it) }
+ .map { MessageGenerator.INSTANCE.createMessageFlux(it) }
.map { vesClient.sendRx(it) }
.map { it.subscribeOn(Schedulers.elastic()).subscribe() }
.onError { handleException(it, ctx) }
@@ -67,28 +68,6 @@ internal class HttpServer(private val vesClient: XnfSimulator) {
}
}
- private fun createMessageFlux(ctx: Context): Promise<Flux<PayloadWireFrameMessage>> {
- return ctx.request.body
- .map { Json.createReader(it.inputStream).readArray() }
- .map { extractMessageParameters(it) }
- .map { MessageGenerator.INSTANCE.createMessageFlux(it) }
- }
-
- private fun extractMessageParameters(request: JsonArray): List<MessageParameters> =
- try {
- request
- .map { it.asJsonObject() }
- .map {
-
- val domain = Domain.valueOf(it.getString("domain"))
- val messageType = MessageType.valueOf(it.getString("messageType"))
- val messagesAmount = it.getJsonNumber("messagesAmount").longValue()
- MessageParameters(domain, messageType, messagesAmount)
- }
- } catch (e: Exception) {
- throw ValidationException("Validating request body failed", e)
- }
-
private fun sendAcceptedResponse(ctx: Context) {
ctx.response
.status(STATUS_OK)
@@ -117,5 +96,3 @@ internal class HttpServer(private val vesClient: XnfSimulator) {
const val CONTENT_TYPE_APPLICATION_JSON = "application/json"
}
}
-
-internal class ValidationException(message: String?, cause: Exception) : Exception(message, cause)
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index 19c52efa..fa6d626b 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -38,10 +38,10 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
*/
fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
.mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map {
- XnfSimulator(it)
- .run(::HttpServer)
- .start(it.listenPort)
+ .map {config ->
+ XnfSimulator(config)
+ .let { HttpServer(it) }
+ .start(config.listenPort)
.void()
}
.unsafeRunEitherSync(
@@ -53,4 +53,3 @@ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
logger.info("Started xNF Simulator API server")
}
)
-