aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hv-collector-dcae-app-simulator/pom.xml5
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt4
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt35
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt151
-rw-r--r--hv-collector-utils/pom.xml13
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt6
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParser.kt9
-rw-r--r--hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParserTest.kt63
-rw-r--r--hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/parameters.kt98
-rw-r--r--hv-collector-ves-message-generator/pom.xml4
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt39
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt2
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt3
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt52
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt6
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt1
-rw-r--r--hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt22
17 files changed, 320 insertions, 193 deletions
diff --git a/hv-collector-dcae-app-simulator/pom.xml b/hv-collector-dcae-app-simulator/pom.xml
index a2f92e81..f3c17357 100644
--- a/hv-collector-dcae-app-simulator/pom.xml
+++ b/hv-collector-dcae-app-simulator/pom.xml
@@ -93,6 +93,11 @@
<version>${project.parent.version}</version>
</dependency>
<dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-ves-message-generator</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
<groupId>io.arrow-kt</groupId>
<artifactId>arrow-effects</artifactId>
</dependency>
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt
index 264033e3..065cdf92 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt
@@ -28,7 +28,9 @@ import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.*
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) {
override val cmdLineOptionsList: List<CommandLineOption> = listOf(
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
index 869c5ab6..08bb149f 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
@@ -19,17 +19,24 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
-import arrow.core.Option
import arrow.effects.IO
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.kafka.receiver.ReceiverRecord
+import java.util.concurrent.ConcurrentLinkedQueue
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
+class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>){
+ val messagesCount: Int by lazy {
+ messages.size
+ }
-class ConsumerState(val msgCount: Long, val lastKey: Option<ByteArray>, val lastValue: Option<ByteArray>)
+ val consumedMessages: List<ByteArray> by lazy {
+ messages.toList()
+ }
+}
interface ConsumerStateProvider {
fun currentState(): ConsumerState
@@ -37,31 +44,21 @@ interface ConsumerStateProvider {
}
class Consumer : ConsumerStateProvider {
- private var msgCount = 0L
- private var lastKey: ByteArray? = null
- private var lastValue: ByteArray? = null
- override fun currentState() =
- ConsumerState(msgCount, Option.fromNullable(lastKey), Option.fromNullable(lastValue))
+ private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue()
- override fun reset() = IO {
- synchronized(this) {
- msgCount = 0
- lastKey = null
- lastValue = null
- }
+ override fun currentState(): ConsumerState = ConsumerState(consumedMessages)
+
+ override fun reset(): IO<Unit> = IO {
+ consumedMessages.clear()
}
fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
-
- synchronized(this) {
- msgCount++
- lastKey = record.key()
- lastValue = record.value()
- }
+ consumedMessages.add(record.value())
}
+
companion object {
private val logger = Logger(Consumer::class)
}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
index d1d90b00..cb1484b7 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
@@ -19,28 +19,31 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
-import arrow.core.Try
-import arrow.core.getOrElse
import arrow.effects.IO
-import com.google.protobuf.MessageOrBuilder
-import com.google.protobuf.util.JsonFormat
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields
+import org.onap.dcae.collectors.veshv.utils.messages.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD
import org.onap.ves.VesEventV5.VesEvent
import ratpack.handling.Chain
+import ratpack.handling.Context
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
+import reactor.core.publisher.Mono
+import javax.json.Json
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class ApiServer(private val consumerFactory: ConsumerFactory) {
+class ApiServer(private val consumerFactory: ConsumerFactory,
+ private val messageParametersParser: MessageParametersParser = MessageParametersParser()) {
private lateinit var consumerState: ConsumerStateProvider
- private val jsonPrinter = JsonFormat.printer()
fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = IO {
consumerState = consumerFactory.createConsumerForTopics(kafkaTopics)
@@ -57,60 +60,92 @@ class ApiServer(private val consumerFactory: ConsumerFactory) {
val topics = extractTopics(it.text)
logger.info("Received new configuration. Creating consumer for topics: $topics")
consumerState = consumerFactory.createConsumerForTopics(topics)
- ctx.response.contentType(CONTENT_TEXT)
- ctx.response.send("OK")
+ ctx.response
+ .status(STATUS_OK)
+ .send()
}
}
-
- .get("messages/count") { ctx ->
- ctx.response.contentType(CONTENT_TEXT)
- val state = consumerState.currentState()
- ctx.response.send(state.msgCount.toString())
- }
-
- .get("messages/last/key") { ctx ->
- ctx.response.contentType(CONTENT_JSON)
- val state = consumerState.currentState()
- val resp = state.lastKey
- .map { Try { VesEvent.CommonEventHeader.parseFrom(it) } }
- .map(this::protobufToJson)
- .getOrElse { "null" }
- ctx.response.send(resp)
- }
-
- .get("messages/last/value") { ctx ->
- ctx.response.contentType(CONTENT_JSON)
- val state = consumerState.currentState()
- val resp = state.lastValue
- .map { Try { VesEvent.parseFrom(it) } }
- .map(this::protobufToJson)
- .getOrElse { "null" }
- ctx.response.send(resp)
- }
-
- .get("messages/last/hvRanMeasFields") { ctx ->
- ctx.response.contentType(CONTENT_JSON)
- val state = consumerState.currentState()
- val resp = state.lastValue
- .flatMap { Try { VesEvent.parseFrom(it) }.toOption() }
- .filter { it.commonEventHeader.domain == VesEvent.CommonEventHeader.Domain.HVRANMEAS }
- .map { Try { HVRanMeasFields.parseFrom(it.hvRanMeasFields) } }
- .map(this::protobufToJson)
- .getOrElse { "null" }
- ctx.response.send(resp)
- }
-
.delete("messages") { ctx ->
ctx.response.contentType(CONTENT_TEXT)
consumerState.reset()
.unsafeRunAsync {
it.fold(
- { ctx.response.send("NOK") },
- { ctx.response.send("OK") }
- )
+ { ctx.response.status(STATUS_INTERNAL_SERVER_ERROR) },
+ { ctx.response.status(STATUS_OK) }
+ ).send()
}
}
+ .get("messages/all/count") { ctx ->
+ val state = consumerState.currentState()
+ ctx.response
+ .contentType(CONTENT_TEXT)
+ .send(state.messagesCount.toString())
+ }
+ .post("messages/all/validate") { ctx ->
+ ctx.request.body
+ .map { Json.createReader(it.inputStream).readArray() }
+ .map { messageParametersParser.parse(it) }
+ .map { generateEvents(ctx, it) }
+ .then { (generatedEvents, shouldValidatePayloads) ->
+ generatedEvents
+ .doOnSuccess { sendResponse(ctx, it, shouldValidatePayloads) }
+ .block()
+ }
+ }
+ }
+
+ private fun generateEvents(ctx: Context, parameters: List<MessageParameters>):
+ Pair<Mono<List<VesEvent>>, Boolean> = Pair(
+
+ doGenerateEvents(parameters).doOnError {
+ logger.error("Error occurred when generating messages: $it")
+ ctx.response
+ .status(STATUS_INTERNAL_SERVER_ERROR)
+ .send()
+ },
+ parameters.all { it.messageType == FIXED_PAYLOAD }
+ )
+
+ private fun doGenerateEvents(parameters: List<MessageParameters>): Mono<List<VesEvent>> = MessageGenerator.INSTANCE
+ .createMessageFlux(parameters)
+ .map(PayloadWireFrameMessage::payload)
+ .map { decode(it.unsafeAsArray()) }
+ .collectList()
+
+
+ private fun decode(bytes: ByteArray): VesEvent = VesEvent.parseFrom(bytes)
+
+
+ private fun sendResponse(ctx: Context,
+ generatedEvents: List<VesEvent>,
+ shouldValidatePayloads: Boolean) =
+ resolveResponseStatusCode(
+ generated = generatedEvents,
+ consumed = decodeConsumedEvents(),
+ validatePayloads = shouldValidatePayloads
+ ).let { ctx.response.status(it).send() }
+
+
+ private fun decodeConsumedEvents(): List<VesEvent> = consumerState
+ .currentState()
+ .consumedMessages
+ .map(::decode)
+
+
+ private fun resolveResponseStatusCode(generated: List<VesEvent>,
+ consumed: List<VesEvent>,
+ validatePayloads: Boolean): Int =
+ if (validatePayloads) {
+ if (generated == consumed) STATUS_OK else STATUS_BAD_REQUEST
+ } else {
+ validateHeaders(consumed, generated)
+ }
+
+ private fun validateHeaders(consumed: List<VesEvent>, generated: List<VesEvent>): Int {
+ val consumedHeaders = consumed.map { it.commonEventHeader }
+ val generatedHeaders = generated.map { it.commonEventHeader }
+ return if (generatedHeaders == consumedHeaders) STATUS_OK else STATUS_BAD_REQUEST
}
private fun extractTopics(it: String): Set<String> =
@@ -118,16 +153,14 @@ class ApiServer(private val consumerFactory: ConsumerFactory) {
.split(",")
.toSet()
- private fun protobufToJson(parseResult: Try<MessageOrBuilder>): String =
- parseResult.fold(
- { ex -> "\"Failed to parse protobuf: ${ex.message}\"" },
- { jsonPrinter.print(it) })
-
-
companion object {
private val logger = Logger(ApiServer::class)
-
private const val CONTENT_TEXT = "text/plain"
- private const val CONTENT_JSON = "application/json"
+
+ private const val STATUS_OK = 200
+ private const val STATUS_BAD_REQUEST = 400
+ private const val STATUS_INTERNAL_SERVER_ERROR = 500
}
}
+
+
diff --git a/hv-collector-utils/pom.xml b/hv-collector-utils/pom.xml
index d0e44932..39097c10 100644
--- a/hv-collector-utils/pom.xml
+++ b/hv-collector-utils/pom.xml
@@ -61,19 +61,10 @@
<dependencies>
<dependency>
<groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-domain</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.parent.groupId}</groupId>
<artifactId>hv-collector-ves-message-generator</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
- <groupId>org.glassfish</groupId>
- <artifactId>javax.json</artifactId>
- </dependency>
- <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
@@ -98,6 +89,10 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.json</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.nhaarman</groupId>
<artifactId>mockito-kotlin</artifactId>
</dependency>
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt
index 9c873a0f..c00ce68d 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt
@@ -54,20 +54,16 @@ abstract class ArgBasedConfiguration<T>(private val parser: CommandLineParser) {
protected abstract fun getConfiguration(cmdLine: CommandLine): Option<T>
- protected fun CommandLine.intValue(cmdLineOpt: CommandLineOption, default: Int): Int =
- intValue(cmdLineOpt).getOrElse { default }
-
protected fun CommandLine.longValue(cmdLineOpt: CommandLineOption, default: Long): Long =
longValue(cmdLineOpt).getOrElse { default }
protected fun CommandLine.stringValue(cmdLineOpt: CommandLineOption, default: String): String =
optionValue(cmdLineOpt).getOrElse { default }
-
protected fun CommandLine.intValue(cmdLineOpt: CommandLineOption): Option<Int> =
optionValue(cmdLineOpt).map(String::toInt)
- protected fun CommandLine.longValue(cmdLineOpt: CommandLineOption): Option<Long> =
+ private fun CommandLine.longValue(cmdLineOpt: CommandLineOption): Option<Long> =
optionValue(cmdLineOpt).map(String::toLong)
protected fun CommandLine.stringValue(cmdLineOpt: CommandLineOption): Option<String> =
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParser.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParser.kt
index 24c2cbfa..1621ba59 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParser.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParser.kt
@@ -35,14 +35,17 @@ class MessageParametersParser(
request
.map { it.asJsonObject() }
.map {
- val commonEventHeader = commonEventHeaderParser.parse(it.getJsonObject("commonEventHeader"))
+ val commonEventHeader = commonEventHeaderParser
+ .parse(it.getJsonObject("commonEventHeader"))
val messageType = MessageType.valueOf(it.getString("messageType"))
- val messagesAmount = it.getJsonNumber("messagesAmount").longValue()
+ val messagesAmount = it.getJsonNumber("messagesAmount")?.longValue()
+ ?: throw ParsingException("\"messagesAmount\" could not be parsed from message.",
+ NullPointerException())
MessageParameters(commonEventHeader, messageType, messagesAmount)
}
} catch (e: Exception) {
throw ParsingException("Parsing request body failed", e)
}
- internal class ParsingException(message: String?, cause: Exception) : Exception(message, cause)
+ internal class ParsingException(message: String, cause: Exception) : Exception(message, cause)
}
diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParserTest.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParserTest.kt
new file mode 100644
index 00000000..ec628a2a
--- /dev/null
+++ b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParserTest.kt
@@ -0,0 +1,63 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.messages
+
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.assertThatExceptionOfType
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.utils.messages.MessageParametersParser.ParsingException
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+
+private const val EXPECTED_MESSAGES_AMOUNT = 25000L
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since July 2018
+ */
+object MessageParametersParserTest : Spek({
+ describe("Messages parameters parser") {
+ val messageParametersParser = MessageParametersParser()
+
+ given("parameters json array") {
+ on("valid parameters json") {
+ it("should parse MessagesParameters object successfully") {
+ val result = messageParametersParser.parse(validMessagesParametesJson())
+
+ assertThat(result).isNotNull
+ assertThat(result).hasSize(2)
+ val firstMessage = result.first()
+ assertThat(firstMessage.messageType).isEqualTo(MessageType.VALID)
+ assertThat(firstMessage.amount).isEqualTo(EXPECTED_MESSAGES_AMOUNT)
+ }
+ }
+ on("invalid parameters json") {
+ it("should throw exception") {
+ assertThatExceptionOfType(ParsingException::class.java).isThrownBy {
+ messageParametersParser.parse(invalidMessagesParametesJson())
+ }
+ }
+ }
+ }
+ }
+})
diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/parameters.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/parameters.kt
new file mode 100644
index 00000000..f6a3a15b
--- /dev/null
+++ b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/parameters.kt
@@ -0,0 +1,98 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.messages
+
+import javax.json.Json
+
+private const val validMessageParameters = "[\n" +
+ " {\n" +
+ " \"commonEventHeader\": {\n" +
+ " \"version\": \"sample-version\",\n" +
+ " \"domain\": \"HVRANMEAS\",\n" +
+ " \"sequence\": 1,\n" +
+ " \"priority\": 1,\n" +
+ " \"eventId\": \"sample-event-id\",\n" +
+ " \"eventName\": \"sample-event-name\",\n" +
+ " \"eventType\": \"sample-event-type\",\n" +
+ " \"startEpochMicrosec\": 120034455,\n" +
+ " \"lastEpochMicrosec\": 120034455,\n" +
+ " \"nfNamingCode\": \"sample-nf-naming-code\",\n" +
+ " \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" +
+ " \"reportingEntityId\": \"sample-reporting-entity-id\",\n" +
+ " \"reportingEntityName\": \"sample-reporting-entity-name\",\n" +
+ " \"sourceId\": \"sample-source-id\",\n" +
+ " \"sourceName\": \"sample-source-name\"\n" +
+ " },\n" +
+ " \"messageType\": \"VALID\",\n" +
+ " \"messagesAmount\": 25000\n" +
+ " },\n" +
+ " {\n" +
+ " \"commonEventHeader\": {\n" +
+ " \"version\": \"sample-version\",\n" +
+ " \"domain\": \"HVRANMEAS\",\n" +
+ " \"sequence\": 1,\n" +
+ " \"priority\": 1,\n" +
+ " \"eventId\": \"sample-event-id\",\n" +
+ " \"eventName\": \"sample-event-name\",\n" +
+ " \"eventType\": \"sample-event-type\",\n" +
+ " \"startEpochMicrosec\": 120034455,\n" +
+ " \"lastEpochMicrosec\": 120034455,\n" +
+ " \"nfNamingCode\": \"sample-nf-naming-code\",\n" +
+ " \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" +
+ " \"reportingEntityId\": \"sample-reporting-entity-id\",\n" +
+ " \"reportingEntityName\": \"sample-reporting-entity-name\",\n" +
+ " \"sourceId\": \"sample-source-id\",\n" +
+ " \"sourceName\": \"sample-source-name\"\n" +
+ " },\n" +
+ " \"messageType\": \"TOO_BIG_PAYLOAD\",\n" +
+ " \"messagesAmount\": 100\n" +
+ " }\n" +
+ "]"
+
+private const val invalidMessageParameters = "[\n" +
+ " {\n" +
+ " \"commonEventHeader\": {\n" +
+ " \"version\": \"sample-version\",\n" +
+ " \"domain\": \"HVRANMEAS\",\n" +
+ " \"sequence\": 1,\n" +
+ " \"priority\": 1,\n" +
+ " \"eventId\": \"sample-event-id\",\n" +
+ " \"eventName\": \"sample-event-name\",\n" +
+ " \"eventType\": \"sample-event-type\",\n" +
+ " \"startEpochMicrosec\": 120034455,\n" +
+ " \"lastEpochMicrosec\": 120034455,\n" +
+ " \"nfNamingCode\": \"sample-nf-naming-code\",\n" +
+ " \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" +
+ " \"reportingEntityId\": \"sample-reporting-entity-id\",\n" +
+ " \"reportingEntityName\": \"sample-reporting-entity-name\",\n" +
+ " \"sourceId\": \"sample-source-id\",\n" +
+ " \"sourceName\": \"sample-source-name\"\n" +
+ " },\n" +
+ " \"messagesAmount\": 3\n" +
+ " }\n" +
+ "]"
+
+fun validMessagesParametesJson() = Json
+ .createReader(validMessageParameters.reader())
+ .readArray()
+
+fun invalidMessagesParametesJson() = Json
+ .createReader(invalidMessageParameters.reader())
+ .readArray()
diff --git a/hv-collector-ves-message-generator/pom.xml b/hv-collector-ves-message-generator/pom.xml
index f049d78f..a7dad24b 100644
--- a/hv-collector-ves-message-generator/pom.xml
+++ b/hv-collector-ves-message-generator/pom.xml
@@ -95,10 +95,6 @@
<artifactId>logback-classic</artifactId>
<scope>runtime</scope>
</dependency>
- <dependency>
- <groupId>org.glassfish</groupId>
- <artifactId>javax.json</artifactId>
- </dependency>
</dependencies>
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt
deleted file mode 100644
index 605b1729..00000000
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.ves.message.generator.api
-
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.CommonEventHeaderParserImpl
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
-import javax.json.JsonObject
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since July 2018
- */
-interface CommonEventHeaderParser {
-
- fun parse(json: JsonObject): CommonEventHeader
-
- companion object {
- val INSTANCE: CommonEventHeaderParser by lazy {
- CommonEventHeaderParserImpl()
- }
- }
-} \ No newline at end of file
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
index 7407f692..d9329cb0 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
@@ -35,6 +35,8 @@ interface MessageGenerator {
val INSTANCE: MessageGenerator by lazy {
MessageGeneratorImpl(PayloadGenerator())
}
+
+ const val FIXED_PAYLOAD_SIZE = 100
}
}
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt
index 0ac90544..22c88252 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt
@@ -26,6 +26,7 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.api
enum class MessageType {
VALID,
TOO_BIG_PAYLOAD,
+ FIXED_PAYLOAD,
INVALID_WIRE_FRAME,
- INVALID_GPB_DATA
+ INVALID_GPB_DATA,
}
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt
deleted file mode 100644
index 61f5f2f3..00000000
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl
-
-import com.google.protobuf.ByteString
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.CommonEventHeaderParser
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority
-import javax.json.JsonObject
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since July 2018
- */
-class CommonEventHeaderParserImpl : CommonEventHeaderParser {
-
- override fun parse(json: JsonObject): CommonEventHeader = CommonEventHeader.newBuilder()
- .setVersion(json.getString("version"))
- .setDomain(Domain.valueOf(json.getString("domain")))
- .setSequence(json.getInt("sequence"))
- .setPriority(Priority.forNumber(json.getInt("priority")))
- .setEventId(json.getString("version"))
- .setEventName(json.getString("version"))
- .setEventType(json.getString("version"))
- .setStartEpochMicrosec(json.getJsonNumber("startEpochMicrosec").longValue())
- .setLastEpochMicrosec(json.getJsonNumber("lastEpochMicrosec").longValue())
- .setNfNamingCode(json.getString("nfNamingCode"))
- .setNfcNamingCode(json.getString("nfcNamingCode"))
- .setReportingEntityId(json.getString("reportingEntityId"))
- .setReportingEntityName(ByteString.copyFromUtf8(json.getString("reportingEntityName")))
- .setSourceId(ByteString.copyFromUtf8(json.getString("sourceId")))
- .setSourceName(json.getString("sourceName"))
- .build()
-} \ No newline at end of file
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
index e9db716d..c6e0556b 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
@@ -26,6 +26,7 @@ import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_GPB_DATA
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_WIRE_FRAME
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.TOO_BIG_PAYLOAD
@@ -63,6 +64,8 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
PayloadWireFrameMessage(vesEvent(commonEventHeader, payloadGenerator.generatePayload()))
TOO_BIG_PAYLOAD ->
PayloadWireFrameMessage(vesEvent(commonEventHeader, oversizedPayload()))
+ FIXED_PAYLOAD ->
+ PayloadWireFrameMessage(vesEvent(commonEventHeader, fixedPayload()))
INVALID_WIRE_FRAME -> {
val payload = ByteData(vesEvent(commonEventHeader, payloadGenerator.generatePayload()))
PayloadWireFrameMessage(
@@ -92,6 +95,9 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
private fun oversizedPayload() =
payloadGenerator.generateRawPayload(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE + 1)
+ private fun fixedPayload() =
+ payloadGenerator.generateRawPayload(MessageGenerator.FIXED_PAYLOAD_SIZE)
+
companion object {
private const val UNSUPPORTED_VERSION: Short = 2
}
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt
index c85ce035..acdaf19f 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt
@@ -62,5 +62,4 @@ internal class PayloadGenerator {
private const val URI_BASE_NAME = "sample/uri"
private const val UPPER_URI_NUMBER_BOUND = 10_000
}
-
}
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
index b2490dd1..1e38d46e 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
@@ -146,6 +146,24 @@ object MessageGeneratorImplTest : Spek({
.verifyComplete()
}
}
+ on("message type requesting fixed payload") {
+ it("should create flux of valid messages with fixed payload") {
+ generator
+ .createMessageFlux(listOf(MessageParameters(
+ createSampleCommonHeader(FAULT),
+ MessageType.FIXED_PAYLOAD,
+ 1
+ )))
+ .test()
+ .assertNext {
+ assertThat(it.isValid()).isTrue()
+ assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(extractHvRanMeasFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT)
+ }
+ .verifyComplete()
+ }
+ }
}
given("list of message parameters") {
it("should create concatenated flux of messages") {
@@ -182,6 +200,10 @@ fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader {
return VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
}
+fun extractHvRanMeasFields(bytes: ByteData): ByteString {
+ return VesEvent.parseFrom(bytes.unsafeAsArray()).hvRanMeasFields
+}
+
private fun createSampleCommonHeader(domain: CommonEventHeader.Domain): CommonEventHeader = CommonEventHeader.newBuilder()
.setVersion("sample-version")
.setDomain(domain)