aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ves-message-generator/src
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-ves-message-generator/src')
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt36
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt30
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt37
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt32
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt33
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt50
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt109
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt57
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt40
-rw-r--r--sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserTest.kt82
-rw-r--r--sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt227
-rw-r--r--sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserTest.kt64
-rw-r--r--sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGeneratorTest.kt51
-rw-r--r--sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/parameters.kt106
14 files changed, 954 insertions, 0 deletions
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
new file mode 100644
index 00000000..076c06be
--- /dev/null
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
@@ -0,0 +1,36 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.api
+
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import reactor.core.publisher.Flux
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+interface MessageGenerator {
+ fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage>
+
+ companion object {
+ const val FIXED_PAYLOAD_SIZE = 100
+ }
+}
+
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
new file mode 100644
index 00000000..047d863c
--- /dev/null
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.api
+
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+data class MessageParameters(val commonEventHeader: CommonEventHeader,
+ val messageType: MessageType,
+ val amount: Long = -1)
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt
new file mode 100644
index 00000000..754fa31f
--- /dev/null
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.api
+
+import arrow.core.Either
+import arrow.core.Option
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl
+import javax.json.JsonArray
+
+interface MessageParametersParser {
+ fun parse(request: JsonArray): Either<ParsingError, List<MessageParameters>>
+
+ companion object {
+ val INSTANCE: MessageParametersParser by lazy {
+ MessageParametersParserImpl()
+ }
+ }
+}
+
+data class ParsingError(val message: String, val cause: Option<Throwable>)
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt
new file mode 100644
index 00000000..22c88252
--- /dev/null
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt
@@ -0,0 +1,32 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.api
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since July 2018
+ */
+enum class MessageType {
+ VALID,
+ TOO_BIG_PAYLOAD,
+ FIXED_PAYLOAD,
+ INVALID_WIRE_FRAME,
+ INVALID_GPB_DATA,
+}
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt
new file mode 100644
index 00000000..e2269c20
--- /dev/null
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt
@@ -0,0 +1,33 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.factory
+
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since October 2018
+ */
+object MessageGeneratorFactory {
+ fun create(maxPayloadSizeBytes: Int): MessageGenerator =
+ MessageGeneratorImpl(PayloadGenerator(), maxPayloadSizeBytes)
+}
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt
new file mode 100644
index 00000000..909db5e4
--- /dev/null
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+
+import arrow.core.Option
+import com.google.protobuf.util.JsonFormat
+import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import javax.json.JsonObject
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since July 2018
+ */
+class CommonEventHeaderParser {
+ fun parse(json: JsonObject): Option<CommonEventHeader> =
+ Option.fromNullable(
+ CommonEventHeader.newBuilder()
+ .apply { JsonFormat.parser().merge(json.toString(), this) }
+ .build()
+ .takeUnless { !isValid(it) }
+ )
+
+
+ private fun isValid(header: CommonEventHeader): Boolean {
+ return allMandatoryFieldsArePresent(header)
+ }
+
+ private fun allMandatoryFieldsArePresent(header: CommonEventHeader) =
+ headerRequiredFieldDescriptors
+ .all { fieldDescriptor -> header.hasField(fieldDescriptor) }
+
+}
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
new file mode 100644
index 00000000..fa39ed16
--- /dev/null
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
@@ -0,0 +1,109 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+
+import com.google.protobuf.ByteString
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.PayloadContentType
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_GPB_DATA
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_WIRE_FRAME
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.TOO_BIG_PAYLOAD
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.VesEvent
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import java.nio.charset.Charset
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+class MessageGeneratorImpl internal constructor(
+ private val payloadGenerator: PayloadGenerator,
+ private val maxPayloadSizeBytes: Int
+) : MessageGenerator {
+
+ override fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage> = Flux
+ .fromIterable(messageParameters)
+ .flatMap { createMessageFlux(it) }
+
+ private fun createMessageFlux(parameters: MessageParameters): Flux<WireFrameMessage> =
+ Mono.fromCallable { createMessage(parameters.commonEventHeader, parameters.messageType) }
+ .let {
+ when {
+ parameters.amount < 0 ->
+ // repeat forever
+ it.repeat()
+ parameters.amount == 0L ->
+ // do not generate any message
+ Flux.empty()
+ else ->
+ // send original message and additional amount-1 messages
+ it.repeat(parameters.amount - 1)
+ }
+ }
+
+ private fun createMessage(commonEventHeader: CommonEventHeader, messageType: MessageType): WireFrameMessage =
+ when (messageType) {
+ VALID ->
+ WireFrameMessage(vesEvent(commonEventHeader, payloadGenerator.generatePayload()))
+ TOO_BIG_PAYLOAD ->
+ WireFrameMessage(vesEvent(commonEventHeader, oversizedPayload()))
+ FIXED_PAYLOAD ->
+ WireFrameMessage(vesEvent(commonEventHeader, fixedPayload()))
+ INVALID_WIRE_FRAME -> {
+ val payload = ByteData(vesEvent(commonEventHeader, payloadGenerator.generatePayload()))
+ WireFrameMessage(
+ payload,
+ UNSUPPORTED_VERSION,
+ UNSUPPORTED_VERSION,
+ PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payload.size())
+ }
+ INVALID_GPB_DATA ->
+ WireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset()))
+ }
+
+ private fun vesEvent(commonEventHeader: CommonEventHeader, eventFields: ByteString): ByteArray {
+ return createVesEvent(commonEventHeader, eventFields).toByteArray()
+ }
+
+ private fun createVesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent =
+ VesEvent.newBuilder()
+ .setCommonEventHeader(commonEventHeader)
+ .setEventFields(payload)
+ .build()
+
+ private fun oversizedPayload() =
+ payloadGenerator.generateRawPayload(maxPayloadSizeBytes + 1)
+
+ private fun fixedPayload() =
+ payloadGenerator.generateRawPayload(MessageGenerator.FIXED_PAYLOAD_SIZE)
+
+ companion object {
+ private const val UNSUPPORTED_VERSION: Short = 2
+ }
+}
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt
new file mode 100644
index 00000000..6ef6d53a
--- /dev/null
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt
@@ -0,0 +1,57 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+
+import arrow.core.Option
+import arrow.core.Try
+import arrow.core.identity
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import javax.json.JsonArray
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since July 2018
+ */
+internal class MessageParametersParserImpl(
+ private val commonEventHeaderParser: CommonEventHeaderParser = CommonEventHeaderParser()
+) : MessageParametersParser {
+
+ override fun parse(request: JsonArray) =
+ Try {
+ request
+ .map { it.asJsonObject() }
+ .map { json ->
+ val commonEventHeader = commonEventHeaderParser
+ .parse(json.getJsonObject("commonEventHeader"))
+ .fold({ throw IllegalStateException("Invalid common header") }, ::identity)
+ val messageType = MessageType.valueOf(json.getString("messageType"))
+ val messagesAmount = json.getJsonNumber("messagesAmount")?.longValue()
+ ?: throw NullPointerException("\"messagesAmount\" could not be parsed.")
+ MessageParameters(commonEventHeader, messageType, messagesAmount)
+ }
+ }.toEither().mapLeft { ex ->
+ ParsingError(
+ ex.message ?: "Unable to parse message parameters",
+ Option.fromNullable(ex))
+ }
+}
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt
new file mode 100644
index 00000000..545e237c
--- /dev/null
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt
@@ -0,0 +1,40 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+
+import com.google.protobuf.ByteString
+import java.util.*
+import kotlin.streams.asSequence
+
+internal class PayloadGenerator {
+
+ private val randomGenerator = Random()
+
+ fun generateRawPayload(size: Int): ByteString =
+ ByteString.copyFrom(ByteArray(size))
+
+ fun generatePayload(numOfCountMeasurements: Long = 2): ByteString =
+ ByteString.copyFrom(
+ randomGenerator.ints(numOfCountMeasurements, 0, 256)
+ .asSequence()
+ .toString()
+ .toByteArray()
+ )
+}
diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserTest.kt
new file mode 100644
index 00000000..3a33c44a
--- /dev/null
+++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserTest.kt
@@ -0,0 +1,82 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+
+import arrow.core.Option
+import arrow.core.identity
+import com.google.protobuf.util.JsonFormat
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.STATE_CHANGE
+import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import java.io.ByteArrayInputStream
+import javax.json.Json
+import kotlin.test.fail
+
+class CommonEventHeaderParserTest : Spek({
+
+ describe("Common event header parser") {
+ val parser = CommonEventHeaderParser()
+
+ given("valid header in JSON format") {
+ val commonEventHeader = commonHeader(
+ domain = STATE_CHANGE,
+ id = "sample-event-id")
+ val json = JsonFormat.printer().print(commonEventHeader).byteInputStream()
+
+ it("should parse common event header") {
+ val result =
+ parser.parse(jsonObject(json))
+ .fold({ fail() }, ::identity)
+
+ assertThat(result).describedAs("common event header").isEqualTo(commonEventHeader)
+ }
+ }
+
+ given("invalid header in JSON format") {
+ val json = "{}".byteInputStream()
+
+ it("should throw exception") {
+ val result = parser.parse(jsonObject(json))
+
+ assertFailed(result)
+ }
+ }
+
+ given("invalid JSON") {
+ val json = "{}}}}".byteInputStream()
+
+ it("should throw exception") {
+ val result = parser.parse(jsonObject(json))
+
+ assertFailed(result)
+ }
+ }
+ }
+})
+
+fun assertFailed(result: Option<CommonEventHeader>) =
+ result.fold({}, { fail() })
+
+fun jsonObject(json: ByteArrayInputStream) = Json.createReader(json).readObject()!! \ No newline at end of file
diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
new file mode 100644
index 00000000..e2aec7df
--- /dev/null
+++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
@@ -0,0 +1,227 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+
+import com.google.protobuf.ByteString
+import com.google.protobuf.InvalidProtocolBufferException
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.assertThatExceptionOfType
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.VesEvent
+import reactor.test.test
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+object MessageGeneratorImplTest : Spek({
+ describe("message factory") {
+ val maxPayloadSizeBytes = 1024
+ val generator = MessageGeneratorImpl(PayloadGenerator(), maxPayloadSizeBytes)
+ given("single message parameters") {
+
+ on("messages amount not specified in parameters") {
+ it("should create infinite flux") {
+ val limit = 1000L
+ generator
+ .createMessageFlux(listOf(MessageParameters(
+ commonHeader(PERF3GPP),
+ MessageType.VALID
+ )))
+ .take(limit)
+ .test()
+ .expectNextCount(limit)
+ .verifyComplete()
+ }
+ }
+
+ on("messages amount = 0 specified in parameters") {
+ it("should create empty message flux") {
+ generator
+ .createMessageFlux(listOf(MessageParameters(
+ commonHeader(PERF3GPP),
+ MessageType.VALID,
+ 0
+ )))
+ .test()
+ .verifyComplete()
+ }
+ }
+
+ on("messages amount specified in parameters") {
+ it("should create message flux of specified size") {
+ generator
+ .createMessageFlux(listOf(MessageParameters(
+ commonHeader(PERF3GPP),
+ MessageType.VALID,
+ 5
+ )))
+ .test()
+ .expectNextCount(5)
+ .verifyComplete()
+ }
+ }
+
+ on("message type requesting valid message") {
+ it("should create flux of valid messages with given domain") {
+ generator
+ .createMessageFlux(listOf(MessageParameters(
+ commonHeader(FAULT),
+ MessageType.VALID,
+ 1
+ )))
+ .test()
+ .assertNext {
+ assertThat(it.isValid()).isTrue()
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
+ }
+ .verifyComplete()
+ }
+ }
+
+ on("message type requesting too big payload") {
+ it("should create flux of messages with given domain and payload exceeding threshold") {
+
+ generator
+ .createMessageFlux(listOf(MessageParameters(
+ commonHeader(PERF3GPP),
+ MessageType.TOO_BIG_PAYLOAD,
+ 1
+ )))
+ .test()
+ .assertNext {
+ assertThat(it.isValid()).isTrue()
+ assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
+ }
+ .verifyComplete()
+ }
+ }
+
+ on("message type requesting invalid GPB data ") {
+ it("should create flux of messages with invalid payload") {
+ generator
+ .createMessageFlux(listOf(MessageParameters(
+ commonHeader(PERF3GPP),
+ MessageType.INVALID_GPB_DATA,
+ 1
+ )))
+ .test()
+ .assertNext {
+ assertThat(it.isValid()).isTrue()
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
+ assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
+ .isThrownBy { extractCommonEventHeader(it.payload) }
+ }
+ .verifyComplete()
+ }
+ }
+
+ on("message type requesting invalid wire frame ") {
+ it("should create flux of messages with invalid version") {
+ generator
+ .createMessageFlux(listOf(MessageParameters(
+ commonHeader(PERF3GPP),
+ MessageType.INVALID_WIRE_FRAME,
+ 1
+ )))
+ .test()
+ .assertNext {
+ assertThat(it.isValid()).isFalse()
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
+ assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR)
+ }
+ .verifyComplete()
+ }
+ }
+
+ on("message type requesting fixed payload") {
+ it("should create flux of valid messages with fixed payload") {
+ generator
+ .createMessageFlux(listOf(MessageParameters(
+ commonHeader(FAULT),
+ MessageType.FIXED_PAYLOAD,
+ 1
+ )))
+ .test()
+ .assertNext {
+ assertThat(it.isValid()).isTrue()
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
+ assertThat(extractEventFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
+ }
+ .verifyComplete()
+ }
+ }
+ }
+ given("list of message parameters") {
+ it("should create concatenated flux of messages") {
+ val singleFluxSize = 5L
+ val messageParameters = listOf(
+ MessageParameters(commonHeader(PERF3GPP), MessageType.VALID, singleFluxSize),
+ MessageParameters(commonHeader(FAULT), MessageType.TOO_BIG_PAYLOAD, singleFluxSize),
+ MessageParameters(commonHeader(HEARTBEAT), MessageType.VALID, singleFluxSize)
+ )
+ generator.createMessageFlux(messageParameters)
+ .test()
+ .assertNext {
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
+ }
+ .expectNextCount(singleFluxSize - 1)
+ .assertNext {
+ assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
+ }
+ .expectNextCount(singleFluxSize - 1)
+ .assertNext {
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT.domainName)
+ }
+ .expectNextCount(singleFluxSize - 1)
+ .verifyComplete()
+ }
+ }
+ }
+})
+
+fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader =
+ VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
+
+
+fun extractEventFields(bytes: ByteData): ByteString =
+ VesEvent.parseFrom(bytes.unsafeAsArray()).eventFields
+
diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserTest.kt
new file mode 100644
index 00000000..134ebb2d
--- /dev/null
+++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserTest.kt
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.fail
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+
+private const val EXPECTED_MESSAGES_AMOUNT = 25000L
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since July 2018
+ */
+object MessageParametersParserTest : Spek({
+ describe("Messages parameters parser") {
+ val messageParametersParser = MessageParametersParserImpl()
+
+ given("parameters json array") {
+ on("valid parameters json") {
+ it("should parse MessagesParameters object successfully") {
+ val result = messageParametersParser.parse(validMessagesParametesJson())
+
+ result.fold({ fail("should have succeeded") }) { rightResult ->
+ assertThat(rightResult).hasSize(2)
+ val firstMessage = rightResult.first()
+ assertThat(firstMessage.messageType).isEqualTo(MessageType.VALID)
+ assertThat(firstMessage.amount).isEqualTo(EXPECTED_MESSAGES_AMOUNT)
+
+ }
+ }
+ }
+
+ on("invalid parameters json") {
+ it("should throw exception") {
+ val result = messageParametersParser.parse(invalidMessagesParametesJson())
+ assertThat(result.isLeft()).describedAs("is left").isTrue()
+ }
+ }
+ }
+ }
+})
diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGeneratorTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGeneratorTest.kt
new file mode 100644
index 00000000..bb91245d
--- /dev/null
+++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGeneratorTest.kt
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+
+object PayloadGeneratorTest : Spek({
+
+ given("payload factory object") {
+ val payloadGenerator = PayloadGenerator()
+
+ on("raw payload generation") {
+ val size = 100
+ val generatedPayload = payloadGenerator.generateRawPayload(size)
+
+ it("should generate sequence of zeros") {
+ assertThat(generatedPayload.size()).isEqualTo(size)
+ assertThat(generatedPayload.toByteArray()).isEqualTo(ByteArray(size))
+ }
+ }
+
+ on("two generated payloads") {
+ val generatedPayload0 = payloadGenerator.generatePayload()
+ val generatedPayload1 = payloadGenerator.generatePayload()
+ it("should be different") {
+ assertThat(generatedPayload0 != generatedPayload1).isTrue()
+ }
+ }
+ }
+})
diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/parameters.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/parameters.kt
new file mode 100644
index 00000000..78cfa028
--- /dev/null
+++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/parameters.kt
@@ -0,0 +1,106 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+
+import javax.json.Json
+
+private const val validMessageParameters =
+"""[
+ {
+ "commonEventHeader": {
+ "version": "sample-version",
+ "domain": "perf3gpp",
+ "sequence": 1,
+ "priority": 1,
+ "eventId": "sample-event-id",
+ "eventName": "sample-event-name",
+ "eventType": "sample-event-type",
+ "startEpochMicrosec": 120034455,
+ "lastEpochMicrosec": 120034455,
+ "nfNamingCode": "sample-nf-naming-code",
+ "nfcNamingCode": "sample-nfc-naming-code",
+ "reportingEntityId": "sample-reporting-entity-id",
+ "reportingEntityName": "sample-reporting-entity-name",
+ "sourceId": "sample-source-id",
+ "sourceName": "sample-source-name",
+ "vesEventListenerVersion": "another-version"
+ },
+ "messageType": "VALID",
+ "messagesAmount": 25000
+ },
+ {
+ "commonEventHeader": {
+ "version": "sample-version",
+ "domain": "perf3gpp",
+ "sequence": 1,
+ "priority": 1,
+ "eventId": "sample-event-id",
+ "eventName": "sample-event-name",
+ "eventType": "sample-event-type",
+ "startEpochMicrosec": 120034455,
+ "lastEpochMicrosec": 120034455,
+ "nfNamingCode": "sample-nf-naming-code",
+ "nfcNamingCode": "sample-nfc-naming-code",
+ "reportingEntityId": "sample-reporting-entity-id",
+ "reportingEntityName": "sample-reporting-entity-name",
+ "sourceId": "sample-source-id",
+ "sourceName": "sample-source-name",
+ "vesEventListenerVersion": "another-version"
+ },
+ "messageType": "TOO_BIG_PAYLOAD",
+ "messagesAmount": 100
+ }
+ ]
+"""
+
+private const val invalidMessageParameters =
+"""
+ [
+ {
+ "commonEventHeader": {
+ "version": "sample-version",
+ "domain": "perf3gpp",
+ "sequence": 1,
+ "priority": 1,
+ "eventId": "sample-event-id",
+ "eventName": "sample-event-name",
+ "eventType": "sample-event-type",
+ "startEpochMicrosec": 120034455,
+ "lastEpochMicrosec": 120034455,
+ "nfNamingCode": "sample-nf-naming-code",
+ "nfcNamingCode": "sample-nfc-naming-code",
+ "reportingEntityId": "sample-reporting-entity-id",
+ "reportingEntityName": "sample-reporting-entity-name",
+ "sourceId": "sample-source-id",
+ "sourceName": "sample-source-name",
+ "vesEventListenerVersion": "another-version"
+ },
+ "messagesAmount": 3
+ }
+ ]
+"""
+
+fun validMessagesParametesJson() = Json
+ .createReader(validMessageParameters.reader())
+ .readArray()!!
+
+fun invalidMessagesParametesJson() = Json
+ .createReader(invalidMessageParameters.reader())
+ .readArray()!!