aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ves-message-generator
diff options
context:
space:
mode:
authorJakub Dudycz <jakub.dudycz@nokia.com>2019-02-04 15:20:14 +0100
committerJakub Dudycz <jakub.dudycz@nokia.com>2019-02-15 15:09:48 +0100
commitdf17f466577b97a12fac39b64b5d113f32b82f2e (patch)
tree0a8999e593c90f97ed1b4f45b6e8adbbc110a787 /sources/hv-collector-ves-message-generator
parente7204cbcf6af61856330cffc541b6f5c78476a09 (diff)
Generate VesEvents in hv-ves/message-generator
- Split message generator on two specialized generators for VesEvent and WireFrame related message types - Refactor whole message-generator module Change-Id: I1266b549a9a4d27213d03e8921298deab2dacb59 Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com> Issue-ID: DCAEGEN2-1162
Diffstat (limited to 'sources/hv-collector-ves-message-generator')
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt20
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt23
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt4
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageTypes.kt (renamed from sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt)27
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt15
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt109
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt74
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParser.kt (renamed from sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt)21
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGenerator.kt (renamed from sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt)12
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGenerator.kt72
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGenerator.kt66
-rw-r--r--sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt228
-rw-r--r--sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserTest.kt72
-rw-r--r--sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/parameters.kt187
-rw-r--r--sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParserTest.kt (renamed from sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserTest.kt)10
-rw-r--r--sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGeneratorTest.kt (renamed from sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGeneratorTest.kt)10
-rw-r--r--sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGeneratorTest.kt143
-rw-r--r--sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGeneratorTest.kt81
18 files changed, 673 insertions, 501 deletions
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
index 076c06be..5f8638f0 100644
--- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,18 +19,26 @@
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.api
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-interface MessageGenerator {
- fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage>
+abstract class MessageGenerator<K : MessageParameters, T> {
+ abstract fun createMessageFlux(parameters: K): Flux<T>
- companion object {
- const val FIXED_PAYLOAD_SIZE = 100
+ protected fun repeatMessage(message: Mono<T>, amount: Long): Flux<T> = when {
+ amount < 0 -> repeatForever(message)
+ amount == 0L -> emptyMessageStream()
+ else -> repeatNTimes(message, amount)
}
+
+ private fun repeatForever(message: Mono<T>) = message.repeat()
+
+ private fun emptyMessageStream() = Flux.empty<T>()
+
+ private fun repeatNTimes(message: Mono<T>, amount: Long) = message.repeat(amount - 1)
}
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
index 047d863c..82b79c0c 100644
--- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,12 +19,25 @@
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.api
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.ves.VesEventOuterClass
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-data class MessageParameters(val commonEventHeader: CommonEventHeader,
- val messageType: MessageType,
- val amount: Long = -1)
+abstract class MessageParameters(val amount: Long = -1)
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since February 2019
+ */
+class WireFrameParameters(val messageType: WireFrameType,
+ amount: Long = -1) : MessageParameters(amount)
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since February 2019
+ */
+class VesEventParameters(val commonEventHeader: VesEventOuterClass.CommonEventHeader,
+ val messageType: VesEventType,
+ amount: Long = -1) : MessageParameters(amount)
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt
index 754fa31f..854c1cda 100644
--- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt
@@ -28,9 +28,7 @@ interface MessageParametersParser {
fun parse(request: JsonArray): Either<ParsingError, List<MessageParameters>>
companion object {
- val INSTANCE: MessageParametersParser by lazy {
- MessageParametersParserImpl()
- }
+ val INSTANCE: MessageParametersParser by lazy { MessageParametersParserImpl() }
}
}
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageTypes.kt
index 22c88252..54e26363 100644
--- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageTypes.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,14 +19,31 @@
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.api
+import arrow.core.Try
+
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since July 2018
+ * @since February 2019
*/
-enum class MessageType {
+enum class VesEventType {
VALID,
TOO_BIG_PAYLOAD,
- FIXED_PAYLOAD,
+ FIXED_PAYLOAD;
+
+ companion object {
+ fun isVesEventType(str: String): Boolean = Try { valueOf(str) }.isSuccess()
+ }
+}
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since February 2019
+ */
+enum class WireFrameType {
INVALID_WIRE_FRAME,
- INVALID_GPB_DATA,
+ INVALID_GPB_DATA;
+
+ companion object {
+ fun isWireFrameType(str: String): Boolean = Try { WireFrameType.valueOf(str) }.isSuccess()
+ }
}
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt
index e2269c20..aa473796 100644
--- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,15 +19,16 @@
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.factory
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.PayloadGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe.WireFrameGenerator
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since October 2018
*/
-object MessageGeneratorFactory {
- fun create(maxPayloadSizeBytes: Int): MessageGenerator =
- MessageGeneratorImpl(PayloadGenerator(), maxPayloadSizeBytes)
+class MessageGeneratorFactory(private val maxPayloadSizeBytes: Int) {
+ fun createVesEventGenerator() = VesEventGenerator(PayloadGenerator(), maxPayloadSizeBytes)
+
+ fun createWireFrameGenerator() = WireFrameGenerator()
}
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
deleted file mode 100644
index fa39ed16..00000000
--- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl
-
-import com.google.protobuf.ByteString
-import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.PayloadContentType
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_GPB_DATA
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_WIRE_FRAME
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.TOO_BIG_PAYLOAD
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
-import org.onap.ves.VesEventOuterClass.VesEvent
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import java.nio.charset.Charset
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since June 2018
- */
-class MessageGeneratorImpl internal constructor(
- private val payloadGenerator: PayloadGenerator,
- private val maxPayloadSizeBytes: Int
-) : MessageGenerator {
-
- override fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage> = Flux
- .fromIterable(messageParameters)
- .flatMap { createMessageFlux(it) }
-
- private fun createMessageFlux(parameters: MessageParameters): Flux<WireFrameMessage> =
- Mono.fromCallable { createMessage(parameters.commonEventHeader, parameters.messageType) }
- .let {
- when {
- parameters.amount < 0 ->
- // repeat forever
- it.repeat()
- parameters.amount == 0L ->
- // do not generate any message
- Flux.empty()
- else ->
- // send original message and additional amount-1 messages
- it.repeat(parameters.amount - 1)
- }
- }
-
- private fun createMessage(commonEventHeader: CommonEventHeader, messageType: MessageType): WireFrameMessage =
- when (messageType) {
- VALID ->
- WireFrameMessage(vesEvent(commonEventHeader, payloadGenerator.generatePayload()))
- TOO_BIG_PAYLOAD ->
- WireFrameMessage(vesEvent(commonEventHeader, oversizedPayload()))
- FIXED_PAYLOAD ->
- WireFrameMessage(vesEvent(commonEventHeader, fixedPayload()))
- INVALID_WIRE_FRAME -> {
- val payload = ByteData(vesEvent(commonEventHeader, payloadGenerator.generatePayload()))
- WireFrameMessage(
- payload,
- UNSUPPORTED_VERSION,
- UNSUPPORTED_VERSION,
- PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
- payload.size())
- }
- INVALID_GPB_DATA ->
- WireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset()))
- }
-
- private fun vesEvent(commonEventHeader: CommonEventHeader, eventFields: ByteString): ByteArray {
- return createVesEvent(commonEventHeader, eventFields).toByteArray()
- }
-
- private fun createVesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent =
- VesEvent.newBuilder()
- .setCommonEventHeader(commonEventHeader)
- .setEventFields(payload)
- .build()
-
- private fun oversizedPayload() =
- payloadGenerator.generateRawPayload(maxPayloadSizeBytes + 1)
-
- private fun fixedPayload() =
- payloadGenerator.generateRawPayload(MessageGenerator.FIXED_PAYLOAD_SIZE)
-
- companion object {
- private const val UNSUPPORTED_VERSION: Short = 2
- }
-}
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt
index 88cc47a6..174a01fd 100644
--- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,15 +19,24 @@
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+import arrow.core.Either
import arrow.core.Option
import arrow.core.Try
import arrow.core.identity
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.Companion.isVesEventType
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.Companion.isWireFrameType
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.CommonEventHeaderParser
import javax.json.JsonArray
+import javax.json.JsonObject
+import javax.json.JsonValue
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -37,28 +46,49 @@ internal class MessageParametersParserImpl(
private val commonEventHeaderParser: CommonEventHeaderParser = CommonEventHeaderParser()
) : MessageParametersParser {
- override fun parse(request: JsonArray) =
- Try {
- request
- .map { it.asJsonObject() }
- .onEach { logger.info { "Parsing MessageParameters body: $it" } }
- .map { json ->
- val commonEventHeader = commonEventHeaderParser
- .parse(json.getJsonObject("commonEventHeader"))
- .fold({ throw IllegalStateException("Invalid common header") }, ::identity)
- val messageType = MessageType.valueOf(json.getString("messageType"))
- val messagesAmount = json.getJsonNumber("messagesAmount")?.longValue()
- ?: throw NullPointerException("\"messagesAmount\" could not be parsed.")
- MessageParameters(commonEventHeader, messageType, messagesAmount)
- }
- }.toEither().mapLeft { ex ->
- ParsingError(
- ex.message ?: "Unable to parse message parameters",
- Option.fromNullable(ex))
- }
+ override fun parse(request: JsonArray): Either<ParsingError, List<MessageParameters>> =
+ Try { parseArray(request) }
+ .toEither()
+ .mapLeft { ex ->
+ ParsingError(
+ ex.message ?: "Unable to parse message parameters",
+ Option.fromNullable(ex))
+ }
+
+ private fun parseArray(array: JsonArray) = array
+ .map(JsonValue::asJsonObject)
+ .onEach { logger.info { "Parsing MessageParameters body: $it" } }
+ .map(::parseParameters)
+
+ private fun parseParameters(json: JsonObject): MessageParameters {
+ val messagesAmount = json.getJsonNumber("messagesAmount")?.longValue()
+ ?: throw ParsingException("\"messagesAmount\" could not be parsed.")
+
+ val messageType = json.getString("messageType")
+
+ return when {
+ isVesEventType(messageType) ->
+ constructVesEventParams(json, messageType, messagesAmount)
+ isWireFrameType(messageType) ->
+ WireFrameParameters(WireFrameType.valueOf(messageType), messagesAmount)
+ else -> throw ParsingException("Invalid message type")
+ }
+ }
+
+ private fun constructVesEventParams(json: JsonObject,
+ messageType: String,
+ messagesAmount: Long): VesEventParameters =
+ commonEventHeaderParser
+ .parse(json.getJsonObject("commonEventHeader"))
+ .fold({ throw ParsingException("Invalid common header") }, ::identity)
+ .let { VesEventParameters(it, VesEventType.valueOf(messageType), messagesAmount) }
+
+
+ private class ParsingException(message: String) : Exception(message)
companion object {
private val logger = Logger(MessageParametersParserImpl::class)
}
-
}
+
+
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParser.kt
index 909db5e4..05938924 100644
--- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParser.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent
import arrow.core.Option
import com.google.protobuf.util.JsonFormat
@@ -30,18 +30,17 @@ import javax.json.JsonObject
* @since July 2018
*/
class CommonEventHeaderParser {
- fun parse(json: JsonObject): Option<CommonEventHeader> =
- Option.fromNullable(
- CommonEventHeader.newBuilder()
- .apply { JsonFormat.parser().merge(json.toString(), this) }
- .build()
- .takeUnless { !isValid(it) }
- )
+ fun parse(json: JsonObject): Option<CommonEventHeader> = Option.fromNullable(
+ CommonEventHeader.newBuilder()
+ .apply { JsonFormat.parser().merge(json.toString(), this) }
+ .build()
+ .takeUnless { !isValid(it) }
+ )
- private fun isValid(header: CommonEventHeader): Boolean {
- return allMandatoryFieldsArePresent(header)
- }
+ private fun isValid(header: CommonEventHeader): Boolean =
+ allMandatoryFieldsArePresent(header)
+
private fun allMandatoryFieldsArePresent(header: CommonEventHeader) =
headerRequiredFieldDescriptors
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGenerator.kt
index 545e237c..ed521054 100644
--- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGenerator.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent
import com.google.protobuf.ByteString
import java.util.*
@@ -32,9 +32,15 @@ internal class PayloadGenerator {
fun generatePayload(numOfCountMeasurements: Long = 2): ByteString =
ByteString.copyFrom(
- randomGenerator.ints(numOfCountMeasurements, 0, 256)
+ randomGenerator
+ .ints(numOfCountMeasurements, MIN_BYTE_VALUE, MAX_BYTE_VALUE)
.asSequence()
.toString()
.toByteArray()
)
+
+ companion object {
+ private const val MIN_BYTE_VALUE = 0
+ private const val MAX_BYTE_VALUE = 256
+ }
}
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGenerator.kt
new file mode 100644
index 00000000..7abd6054
--- /dev/null
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGenerator.kt
@@ -0,0 +1,72 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent
+
+import com.google.protobuf.ByteString
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.TOO_BIG_PAYLOAD
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.VALID
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.VesEvent
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+class VesEventGenerator internal constructor(
+ private val payloadGenerator: PayloadGenerator,
+ private val maxPayloadSizeBytes: Int
+) : MessageGenerator<VesEventParameters, VesEvent>() {
+
+ override fun createMessageFlux(parameters: VesEventParameters): Flux<VesEvent> =
+ parameters.run {
+ Mono
+ .fromCallable { createMessage(commonEventHeader, messageType) }
+ .let { repeatMessage(it, amount) }
+ }
+
+ private fun createMessage(commonEventHeader: CommonEventHeader, messageType: VesEventType): VesEvent =
+ when (messageType) {
+ VALID -> vesEvent(commonEventHeader, payloadGenerator.generatePayload())
+ TOO_BIG_PAYLOAD -> vesEvent(commonEventHeader, oversizedPayload())
+ FIXED_PAYLOAD -> vesEvent(commonEventHeader, fixedPayload())
+ }
+
+ private fun vesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent =
+ VesEvent.newBuilder()
+ .setCommonEventHeader(commonEventHeader)
+ .setEventFields(payload)
+ .build()
+
+ private fun oversizedPayload(): ByteString =
+ payloadGenerator.generateRawPayload(maxPayloadSizeBytes + 1)
+
+ private fun fixedPayload(): ByteString =
+ payloadGenerator.generateRawPayload(FIXED_PAYLOAD_SIZE)
+
+ companion object {
+ const val FIXED_PAYLOAD_SIZE = 100
+ }
+}
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGenerator.kt
new file mode 100644
index 00000000..ad45bc5c
--- /dev/null
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGenerator.kt
@@ -0,0 +1,66 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe
+
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.PayloadContentType
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.INVALID_GPB_DATA
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.INVALID_WIRE_FRAME
+import org.onap.ves.VesEventOuterClass.VesEvent
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import java.nio.charset.Charset
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since February 2019
+ */
+class WireFrameGenerator : MessageGenerator<WireFrameParameters, WireFrameMessage>() {
+
+ override fun createMessageFlux(parameters: WireFrameParameters): Flux<WireFrameMessage> =
+ parameters.run {
+ Mono
+ .fromCallable { createMessage(messageType) }
+ .let { repeatMessage(it, amount) }
+ }
+
+ private fun createMessage(messageType: WireFrameType): WireFrameMessage =
+ when (messageType) {
+ INVALID_WIRE_FRAME -> {
+ val payload = ByteData(VesEvent.getDefaultInstance().toByteArray())
+ WireFrameMessage(
+ payload,
+ UNSUPPORTED_VERSION,
+ UNSUPPORTED_VERSION,
+ PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payload.size())
+ }
+ INVALID_GPB_DATA ->
+ WireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset()))
+ }
+
+ companion object {
+ private const val UNSUPPORTED_VERSION: Short = 2
+ }
+}
diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
deleted file mode 100644
index 930f020b..00000000
--- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl
-
-import com.google.protobuf.ByteString
-import com.google.protobuf.InvalidProtocolBufferException
-import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.assertThatExceptionOfType
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
-import org.onap.ves.VesEventOuterClass.VesEvent
-import reactor.test.test
-import kotlin.test.assertTrue
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since June 2018
- */
-object MessageGeneratorImplTest : Spek({
- describe("message factory") {
- val maxPayloadSizeBytes = 1024
- val generator = MessageGeneratorImpl(PayloadGenerator(), maxPayloadSizeBytes)
- given("single message parameters") {
-
- on("messages amount not specified in parameters") {
- it("should create infinite flux") {
- val limit = 1000L
- generator
- .createMessageFlux(listOf(MessageParameters(
- commonHeader(PERF3GPP),
- MessageType.VALID
- )))
- .take(limit)
- .test()
- .expectNextCount(limit)
- .verifyComplete()
- }
- }
-
- on("messages amount = 0 specified in parameters") {
- it("should create empty message flux") {
- generator
- .createMessageFlux(listOf(MessageParameters(
- commonHeader(PERF3GPP),
- MessageType.VALID,
- 0
- )))
- .test()
- .verifyComplete()
- }
- }
-
- on("messages amount specified in parameters") {
- it("should create message flux of specified size") {
- generator
- .createMessageFlux(listOf(MessageParameters(
- commonHeader(PERF3GPP),
- MessageType.VALID,
- 5
- )))
- .test()
- .expectNextCount(5)
- .verifyComplete()
- }
- }
-
- on("message type requesting valid message") {
- it("should create flux of valid messages with given domain") {
- generator
- .createMessageFlux(listOf(MessageParameters(
- commonHeader(FAULT),
- MessageType.VALID,
- 1
- )))
- .test()
- .assertNext {
- assertTrue(it.validate().isRight())
- assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
- }
- .verifyComplete()
- }
- }
-
- on("message type requesting too big payload") {
- it("should create flux of messages with given domain and payload exceeding threshold") {
-
- generator
- .createMessageFlux(listOf(MessageParameters(
- commonHeader(PERF3GPP),
- MessageType.TOO_BIG_PAYLOAD,
- 1
- )))
- .test()
- .assertNext {
- assertTrue(it.validate().isRight())
- assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
- }
- .verifyComplete()
- }
- }
-
- on("message type requesting invalid GPB data ") {
- it("should create flux of messages with invalid payload") {
- generator
- .createMessageFlux(listOf(MessageParameters(
- commonHeader(PERF3GPP),
- MessageType.INVALID_GPB_DATA,
- 1
- )))
- .test()
- .assertNext {
- assertTrue(it.validate().isRight())
- assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
- assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
- .isThrownBy { extractCommonEventHeader(it.payload) }
- }
- .verifyComplete()
- }
- }
-
- on("message type requesting invalid wire frame ") {
- it("should create flux of messages with invalid version") {
- generator
- .createMessageFlux(listOf(MessageParameters(
- commonHeader(PERF3GPP),
- MessageType.INVALID_WIRE_FRAME,
- 1
- )))
- .test()
- .assertNext {
- assertTrue(it.validate().isLeft())
- assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
- assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR)
- }
- .verifyComplete()
- }
- }
-
- on("message type requesting fixed payload") {
- it("should create flux of valid messages with fixed payload") {
- generator
- .createMessageFlux(listOf(MessageParameters(
- commonHeader(FAULT),
- MessageType.FIXED_PAYLOAD,
- 1
- )))
- .test()
- .assertNext {
- assertTrue(it.validate().isRight())
- assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
- assertThat(extractEventFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
- }
- .verifyComplete()
- }
- }
- }
- given("list of message parameters") {
- it("should create concatenated flux of messages") {
- val singleFluxSize = 5L
- val messageParameters = listOf(
- MessageParameters(commonHeader(PERF3GPP), MessageType.VALID, singleFluxSize),
- MessageParameters(commonHeader(FAULT), MessageType.TOO_BIG_PAYLOAD, singleFluxSize),
- MessageParameters(commonHeader(HEARTBEAT), MessageType.VALID, singleFluxSize)
- )
- generator.createMessageFlux(messageParameters)
- .test()
- .assertNext {
- assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
- }
- .expectNextCount(singleFluxSize - 1)
- .assertNext {
- assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
- }
- .expectNextCount(singleFluxSize - 1)
- .assertNext {
- assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT.domainName)
- }
- .expectNextCount(singleFluxSize - 1)
- .verifyComplete()
- }
- }
- }
-})
-
-fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader =
- VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
-
-
-fun extractEventFields(bytes: ByteData): ByteString =
- VesEvent.parseFrom(bytes.unsafeAsArray()).eventFields
-
diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserTest.kt
index 134ebb2d..f34f153e 100644
--- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserTest.kt
+++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserTest.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+import arrow.core.Either
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.fail
import org.jetbrains.spek.api.Spek
@@ -26,9 +27,13 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.dcae.collectors.veshv.tests.utils.assertFailedWithError
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.VALID
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.INVALID_GPB_DATA
-private const val EXPECTED_MESSAGES_AMOUNT = 25000L
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -36,27 +41,66 @@ private const val EXPECTED_MESSAGES_AMOUNT = 25000L
*/
object MessageParametersParserTest : Spek({
describe("Messages parameters parser") {
- val messageParametersParser = MessageParametersParserImpl()
+ val cut = MessageParametersParserImpl()
given("parameters json array") {
on("valid parameters json") {
- it("should parse MessagesParameters object successfully") {
- val result = messageParametersParser.parse(validMessagesParametesJson())
- result.fold({ fail("should have succeeded") }) { rightResult ->
- assertThat(rightResult).hasSize(2)
- val firstMessage = rightResult.first()
- assertThat(firstMessage.messageType).isEqualTo(MessageType.VALID)
- assertThat(firstMessage.amount).isEqualTo(EXPECTED_MESSAGES_AMOUNT)
+ it("should parse VesEventParameters") {
+ val result = cut.parse(validVesEventParameters())
+
+ result.fold({ fail("parsing VesEventParameters should have succeeded") }) { rightResult ->
+ assertThat(rightResult).hasSize(1)
+
+ val vesEventParams = rightResult.first()
+ val expectedVesEventCount = 25000L
+
+ assertThat(vesEventParams is VesEventParameters)
+ vesEventParams as VesEventParameters
+ assertThat(vesEventParams.messageType).isEqualTo(VALID)
+ assertThat(vesEventParams.amount).isEqualTo(expectedVesEventCount)
+ }
+ }
+
+ it("should parse WireFrameParameters") {
+ val result = cut.parse(validWireFrameParameters())
+
+ result.fold({ fail("parsing WireFrameParameters should have succeeded") }) { rightResult ->
+ assertThat(rightResult).hasSize(1)
+
+ val wireFrameParams = rightResult.first()
+ val expectedWireFrameCount = 100L
+ assertThat(wireFrameParams is WireFrameParameters)
+ wireFrameParams as WireFrameParameters
+ assertThat(wireFrameParams.messageType).isEqualTo(INVALID_GPB_DATA)
+ assertThat(wireFrameParams.amount).isEqualTo(expectedWireFrameCount)
+ }
+ }
+
+
+ it("should parse multiple types of MessageParameters") {
+ val result = cut.parse(multipleMessageParameters())
+
+ result.fold({ fail("parsing multiple types of MessageParameters should have succeeded") }) { rightResult ->
+ assertThat(rightResult).hasSize(2)
+ assertThat(rightResult[0] is VesEventParameters)
+ assertThat(rightResult[1] is WireFrameParameters)
}
}
}
on("invalid parameters json") {
- it("should throw exception") {
- val result = messageParametersParser.parse(invalidMessagesParametesJson())
- assertThat(result.isLeft()).describedAs("is left").isTrue()
+ it("should verify messageAmount") {
+ cut
+ .parse(nonNumberMessageAmountParameters())
+ .assertFailedWithError { it.isInstanceOf(ParsingError::class.java) }
+ }
+
+ it("should verify messageType") {
+ cut
+ .parse(missingMessageTypeParameters())
+ .assertFailedWithError { it.isInstanceOf(ParsingError::class.java) }
}
}
}
diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/parameters.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/parameters.kt
index 78cfa028..a4a0e08c 100644
--- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/parameters.kt
+++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/parameters.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,86 +21,117 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import javax.json.Json
-private const val validMessageParameters =
-"""[
- {
- "commonEventHeader": {
- "version": "sample-version",
- "domain": "perf3gpp",
- "sequence": 1,
- "priority": 1,
- "eventId": "sample-event-id",
- "eventName": "sample-event-name",
- "eventType": "sample-event-type",
- "startEpochMicrosec": 120034455,
- "lastEpochMicrosec": 120034455,
- "nfNamingCode": "sample-nf-naming-code",
- "nfcNamingCode": "sample-nfc-naming-code",
- "reportingEntityId": "sample-reporting-entity-id",
- "reportingEntityName": "sample-reporting-entity-name",
- "sourceId": "sample-source-id",
- "sourceName": "sample-source-name",
- "vesEventListenerVersion": "another-version"
- },
- "messageType": "VALID",
- "messagesAmount": 25000
- },
- {
- "commonEventHeader": {
- "version": "sample-version",
- "domain": "perf3gpp",
- "sequence": 1,
- "priority": 1,
- "eventId": "sample-event-id",
- "eventName": "sample-event-name",
- "eventType": "sample-event-type",
- "startEpochMicrosec": 120034455,
- "lastEpochMicrosec": 120034455,
- "nfNamingCode": "sample-nf-naming-code",
- "nfcNamingCode": "sample-nfc-naming-code",
- "reportingEntityId": "sample-reporting-entity-id",
- "reportingEntityName": "sample-reporting-entity-name",
- "sourceId": "sample-source-id",
- "sourceName": "sample-source-name",
- "vesEventListenerVersion": "another-version"
- },
- "messageType": "TOO_BIG_PAYLOAD",
- "messagesAmount": 100
- }
- ]
+
+fun multipleMessageParameters() = readArray(multipleMessageParameters)
+
+fun validVesEventParameters() = readArray(validVesEventParameters)
+
+fun validWireFrameParameters() = readArray(validWireFrameParameters)
+
+fun missingMessageTypeParameters() = readArray(missingMessageTypeParameters)
+
+fun nonNumberMessageAmountParameters() = readArray(nonNumberMessageAmountParameters)
+
+private const val validVesEventParameters = """
+[
+ {
+ "commonEventHeader": {
+ "version": "sample-version",
+ "domain": "perf3gpp",
+ "sequence": 1,
+ "priority": 1,
+ "eventId": "sample-event-id",
+ "eventName": "sample-event-name",
+ "eventType": "sample-event-type",
+ "startEpochMicrosec": 120034455,
+ "lastEpochMicrosec": 120034455,
+ "nfNamingCode": "sample-nf-naming-code",
+ "nfcNamingCode": "sample-nfc-naming-code",
+ "reportingEntityId": "sample-reporting-entity-id",
+ "reportingEntityName": "sample-reporting-entity-name",
+ "sourceId": "sample-source-id",
+ "sourceName": "sample-source-name",
+ "vesEventListenerVersion": "another-version"
+ },
+ "messageType": "VALID",
+ "messagesAmount": 25000
+ }
+]
"""
-private const val invalidMessageParameters =
+private const val validWireFrameParameters = """
+[
+ {
+ "messageType": "INVALID_GPB_DATA",
+ "messagesAmount": 100
+ }
+]
"""
- [
- {
- "commonEventHeader": {
- "version": "sample-version",
- "domain": "perf3gpp",
- "sequence": 1,
- "priority": 1,
- "eventId": "sample-event-id",
- "eventName": "sample-event-name",
- "eventType": "sample-event-type",
- "startEpochMicrosec": 120034455,
- "lastEpochMicrosec": 120034455,
- "nfNamingCode": "sample-nf-naming-code",
- "nfcNamingCode": "sample-nfc-naming-code",
- "reportingEntityId": "sample-reporting-entity-id",
- "reportingEntityName": "sample-reporting-entity-name",
- "sourceId": "sample-source-id",
- "sourceName": "sample-source-name",
- "vesEventListenerVersion": "another-version"
- },
- "messagesAmount": 3
- }
- ]
+
+private const val multipleMessageParameters = """
+[
+ {
+ "commonEventHeader": {
+ "version": "sample-version",
+ "domain": "perf3gpp",
+ "sequence": 1,
+ "priority": 1,
+ "eventId": "sample-event-id",
+ "eventName": "sample-event-name",
+ "eventType": "sample-event-type",
+ "startEpochMicrosec": 120034455,
+ "lastEpochMicrosec": 120034455,
+ "nfNamingCode": "sample-nf-naming-code",
+ "nfcNamingCode": "sample-nfc-naming-code",
+ "reportingEntityId": "sample-reporting-entity-id",
+ "reportingEntityName": "sample-reporting-entity-name",
+ "sourceId": "sample-source-id",
+ "sourceName": "sample-source-name",
+ "vesEventListenerVersion": "another-version"
+ },
+ "messageType": "VALID",
+ "messagesAmount": 25000
+ },
+ {
+ "messageType": "INVALID_GPB_DATA",
+ "messagesAmount": 100
+ }
+]
"""
-fun validMessagesParametesJson() = Json
- .createReader(validMessageParameters.reader())
- .readArray()!!
+private const val missingMessageTypeParameters = """
+[
+ {
+ "commonEventHeader": {
+ "version": "sample-version",
+ "domain": "perf3gpp",
+ "sequence": 1,
+ "priority": 1,
+ "eventId": "sample-event-id",
+ "eventName": "sample-event-name",
+ "eventType": "sample-event-type",
+ "startEpochMicrosec": 120034455,
+ "lastEpochMicrosec": 120034455,
+ "nfNamingCode": "sample-nf-naming-code",
+ "nfcNamingCode": "sample-nfc-naming-code",
+ "reportingEntityId": "sample-reporting-entity-id",
+ "reportingEntityName": "sample-reporting-entity-name",
+ "sourceId": "sample-source-id",
+ "sourceName": "sample-source-name",
+ "vesEventListenerVersion": "another-version"
+ },
+ "messagesAmount": 3
+ }
+]
+"""
+
+private const val nonNumberMessageAmountParameters = """
+[
+ {
+ "messageType": "INVALID_GPB_DATA",
+ "messagesAmount": "123"
+ }
+]
+"""
-fun invalidMessagesParametesJson() = Json
- .createReader(invalidMessageParameters.reader())
- .readArray()!!
+private fun readArray(json: String) = Json.createReader(json.reader()).readArray()!!
diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParserTest.kt
index 3a33c44a..04222d1e 100644
--- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserTest.kt
+++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParserTest.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent
import arrow.core.Option
import arrow.core.identity
@@ -37,7 +37,7 @@ import kotlin.test.fail
class CommonEventHeaderParserTest : Spek({
describe("Common event header parser") {
- val parser = CommonEventHeaderParser()
+ val cut = CommonEventHeaderParser()
given("valid header in JSON format") {
val commonEventHeader = commonHeader(
@@ -47,7 +47,7 @@ class CommonEventHeaderParserTest : Spek({
it("should parse common event header") {
val result =
- parser.parse(jsonObject(json))
+ cut.parse(jsonObject(json))
.fold({ fail() }, ::identity)
assertThat(result).describedAs("common event header").isEqualTo(commonEventHeader)
@@ -58,7 +58,7 @@ class CommonEventHeaderParserTest : Spek({
val json = "{}".byteInputStream()
it("should throw exception") {
- val result = parser.parse(jsonObject(json))
+ val result = cut.parse(jsonObject(json))
assertFailed(result)
}
@@ -68,7 +68,7 @@ class CommonEventHeaderParserTest : Spek({
val json = "{}}}}".byteInputStream()
it("should throw exception") {
- val result = parser.parse(jsonObject(json))
+ val result = cut.parse(jsonObject(json))
assertFailed(result)
}
diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGeneratorTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGeneratorTest.kt
index bb91245d..2d77bb9f 100644
--- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGeneratorTest.kt
+++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGeneratorTest.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
@@ -28,11 +28,11 @@ import org.jetbrains.spek.api.dsl.on
object PayloadGeneratorTest : Spek({
given("payload factory object") {
- val payloadGenerator = PayloadGenerator()
+ val cut = PayloadGenerator()
on("raw payload generation") {
val size = 100
- val generatedPayload = payloadGenerator.generateRawPayload(size)
+ val generatedPayload = cut.generateRawPayload(size)
it("should generate sequence of zeros") {
assertThat(generatedPayload.size()).isEqualTo(size)
@@ -41,8 +41,8 @@ object PayloadGeneratorTest : Spek({
}
on("two generated payloads") {
- val generatedPayload0 = payloadGenerator.generatePayload()
- val generatedPayload1 = payloadGenerator.generatePayload()
+ val generatedPayload0 = cut.generatePayload()
+ val generatedPayload1 = cut.generatePayload()
it("should be different") {
assertThat(generatedPayload0 != generatedPayload1).isTrue()
}
diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGeneratorTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGeneratorTest.kt
new file mode 100644
index 00000000..2f13c52e
--- /dev/null
+++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGeneratorTest.kt
@@ -0,0 +1,143 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
+import reactor.test.test
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+object VesEventGeneratorTest : Spek({
+ describe("message factory") {
+ val maxPayloadSizeBytes = 1024
+ val cut = VesEventGenerator(PayloadGenerator(), maxPayloadSizeBytes)
+
+ given("single message parameters") {
+ on("messages amount not specified in parameters") {
+ it("should createVesEventGenerator infinite flux") {
+ val limit = 1000L
+ cut
+ .createMessageFlux(VesEventParameters(
+ commonHeader(PERF3GPP),
+ VesEventType.VALID
+ ))
+ .take(limit)
+ .test()
+ .expectNextCount(limit)
+ .verifyComplete()
+ }
+ }
+
+ on("messages amount = 0 specified in parameters") {
+ it("should createVesEventGenerator empty message flux") {
+ cut
+ .createMessageFlux(VesEventParameters(
+ commonHeader(PERF3GPP),
+ VesEventType.VALID,
+ 0
+ ))
+ .test()
+ .verifyComplete()
+ }
+ }
+
+ on("messages amount specified in parameters") {
+ it("should createVesEventGenerator message flux of specified size") {
+ cut
+ .createMessageFlux(VesEventParameters(
+ commonHeader(PERF3GPP),
+ VesEventType.VALID,
+ 5
+ ))
+ .test()
+ .expectNextCount(5)
+ .verifyComplete()
+ }
+ }
+
+ on("message type requesting valid message") {
+ it("should createVesEventGenerator flux of valid messages with given domain") {
+ cut
+ .createMessageFlux(VesEventParameters(
+ commonHeader(FAULT),
+ VesEventType.VALID,
+ 1
+ ))
+ .test()
+ .assertNext {
+ assertThat(it.toByteArray().size).isLessThan(maxPayloadSizeBytes)
+ assertThat(it.commonEventHeader.domain).isEqualTo(FAULT.domainName)
+ }
+ .verifyComplete()
+ }
+ }
+
+ on("message type requesting too big payload") {
+ it("should createVesEventGenerator flux of messages with given domain and payload exceeding threshold") {
+
+ cut
+ .createMessageFlux(VesEventParameters(
+ commonHeader(PERF3GPP),
+ VesEventType.TOO_BIG_PAYLOAD,
+ 1
+ ))
+ .test()
+ .assertNext {
+ assertThat(it.toByteArray().size).isGreaterThan(maxPayloadSizeBytes)
+ assertThat(it.commonEventHeader.domain).isEqualTo(PERF3GPP.domainName)
+ }
+ .verifyComplete()
+ }
+ }
+
+
+
+ on("message type requesting fixed payload") {
+ it("should createVesEventGenerator flux of valid messages with fixed payload") {
+ cut
+ .createMessageFlux(VesEventParameters(
+ commonHeader(FAULT),
+ VesEventType.FIXED_PAYLOAD,
+ 1
+ ))
+ .test()
+ .assertNext {
+ assertThat(it.toByteArray().size).isLessThan(maxPayloadSizeBytes)
+ assertThat(it.eventFields.size()).isEqualTo(VesEventGenerator.FIXED_PAYLOAD_SIZE)
+ assertThat(it.commonEventHeader.domain).isEqualTo(FAULT.domainName)
+ }
+ .verifyComplete()
+ }
+ }
+ }
+ }
+})
diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGeneratorTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGeneratorTest.kt
new file mode 100644
index 00000000..f8c84c39
--- /dev/null
+++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGeneratorTest.kt
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2010 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe
+
+import com.google.protobuf.InvalidProtocolBufferException
+import org.assertj.core.api.Assertions
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType
+import org.onap.ves.VesEventOuterClass
+import reactor.test.test
+import kotlin.test.assertTrue
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since February 2019
+ */
+object WireFrameGeneratorTest : Spek({
+
+ val maxPayloadSizeBytes = 1024
+ val cut = WireFrameGenerator()
+
+ on("message type requesting invalid GPB data ") {
+ it("should createVesEventGenerator flux of messages with invalid payload") {
+ cut
+ .createMessageFlux(WireFrameParameters(
+ WireFrameType.INVALID_GPB_DATA, 1
+ ))
+ .test()
+ .assertNext {
+ assertTrue(it.validate().isRight())
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
+ Assertions.assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
+ .isThrownBy { extractCommonEventHeader(it.payload) }
+ }
+ .verifyComplete()
+ }
+ }
+
+ on("message type requesting invalid wire frame ") {
+ it("should createVesEventGenerator flux of messages with invalid version") {
+ cut
+ .createMessageFlux(WireFrameParameters(
+ WireFrameType.INVALID_WIRE_FRAME, 1
+ ))
+ .test()
+ .assertNext {
+ assertTrue(it.validate().isLeft())
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
+ assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR)
+ }
+ .verifyComplete()
+ }
+ }
+
+})
+
+fun extractCommonEventHeader(bytes: ByteData): VesEventOuterClass.CommonEventHeader =
+ VesEventOuterClass.VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader