diff options
17 files changed, 320 insertions, 193 deletions
diff --git a/hv-collector-dcae-app-simulator/pom.xml b/hv-collector-dcae-app-simulator/pom.xml index a2f92e81..f3c17357 100644 --- a/hv-collector-dcae-app-simulator/pom.xml +++ b/hv-collector-dcae-app-simulator/pom.xml @@ -93,6 +93,11 @@ <version>${project.parent.version}</version> </dependency> <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-ves-message-generator</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> <groupId>io.arrow-kt</groupId> <artifactId>arrow-effects</artifactId> </dependency> diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt index 264033e3..065cdf92 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt @@ -28,7 +28,9 @@ import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.* +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) { override val cmdLineOptionsList: List<CommandLineOption> = listOf( diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt index 869c5ab6..08bb149f 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt @@ -19,17 +19,24 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka -import arrow.core.Option import arrow.effects.IO import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.kafka.receiver.ReceiverRecord +import java.util.concurrent.ConcurrentLinkedQueue /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ +class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>){ + val messagesCount: Int by lazy { + messages.size + } -class ConsumerState(val msgCount: Long, val lastKey: Option<ByteArray>, val lastValue: Option<ByteArray>) + val consumedMessages: List<ByteArray> by lazy { + messages.toList() + } +} interface ConsumerStateProvider { fun currentState(): ConsumerState @@ -37,31 +44,21 @@ interface ConsumerStateProvider { } class Consumer : ConsumerStateProvider { - private var msgCount = 0L - private var lastKey: ByteArray? = null - private var lastValue: ByteArray? = null - override fun currentState() = - ConsumerState(msgCount, Option.fromNullable(lastKey), Option.fromNullable(lastValue)) + private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue() - override fun reset() = IO { - synchronized(this) { - msgCount = 0 - lastKey = null - lastValue = null - } + override fun currentState(): ConsumerState = ConsumerState(consumedMessages) + + override fun reset(): IO<Unit> = IO { + consumedMessages.clear() } fun update(record: ReceiverRecord<ByteArray, ByteArray>) { logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } - - synchronized(this) { - msgCount++ - lastKey = record.key() - lastValue = record.value() - } + consumedMessages.add(record.value()) } + companion object { private val logger = Logger(Consumer::class) } diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt index d1d90b00..cb1484b7 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt @@ -19,28 +19,31 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote -import arrow.core.Try -import arrow.core.getOrElse import arrow.effects.IO -import com.google.protobuf.MessageOrBuilder -import com.google.protobuf.util.JsonFormat +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields +import org.onap.dcae.collectors.veshv.utils.messages.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD import org.onap.ves.VesEventV5.VesEvent import ratpack.handling.Chain +import ratpack.handling.Context import ratpack.server.RatpackServer import ratpack.server.ServerConfig +import reactor.core.publisher.Mono +import javax.json.Json /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class ApiServer(private val consumerFactory: ConsumerFactory) { +class ApiServer(private val consumerFactory: ConsumerFactory, + private val messageParametersParser: MessageParametersParser = MessageParametersParser()) { private lateinit var consumerState: ConsumerStateProvider - private val jsonPrinter = JsonFormat.printer() fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = IO { consumerState = consumerFactory.createConsumerForTopics(kafkaTopics) @@ -57,60 +60,92 @@ class ApiServer(private val consumerFactory: ConsumerFactory) { val topics = extractTopics(it.text) logger.info("Received new configuration. Creating consumer for topics: $topics") consumerState = consumerFactory.createConsumerForTopics(topics) - ctx.response.contentType(CONTENT_TEXT) - ctx.response.send("OK") + ctx.response + .status(STATUS_OK) + .send() } } - - .get("messages/count") { ctx -> - ctx.response.contentType(CONTENT_TEXT) - val state = consumerState.currentState() - ctx.response.send(state.msgCount.toString()) - } - - .get("messages/last/key") { ctx -> - ctx.response.contentType(CONTENT_JSON) - val state = consumerState.currentState() - val resp = state.lastKey - .map { Try { VesEvent.CommonEventHeader.parseFrom(it) } } - .map(this::protobufToJson) - .getOrElse { "null" } - ctx.response.send(resp) - } - - .get("messages/last/value") { ctx -> - ctx.response.contentType(CONTENT_JSON) - val state = consumerState.currentState() - val resp = state.lastValue - .map { Try { VesEvent.parseFrom(it) } } - .map(this::protobufToJson) - .getOrElse { "null" } - ctx.response.send(resp) - } - - .get("messages/last/hvRanMeasFields") { ctx -> - ctx.response.contentType(CONTENT_JSON) - val state = consumerState.currentState() - val resp = state.lastValue - .flatMap { Try { VesEvent.parseFrom(it) }.toOption() } - .filter { it.commonEventHeader.domain == VesEvent.CommonEventHeader.Domain.HVRANMEAS } - .map { Try { HVRanMeasFields.parseFrom(it.hvRanMeasFields) } } - .map(this::protobufToJson) - .getOrElse { "null" } - ctx.response.send(resp) - } - .delete("messages") { ctx -> ctx.response.contentType(CONTENT_TEXT) consumerState.reset() .unsafeRunAsync { it.fold( - { ctx.response.send("NOK") }, - { ctx.response.send("OK") } - ) + { ctx.response.status(STATUS_INTERNAL_SERVER_ERROR) }, + { ctx.response.status(STATUS_OK) } + ).send() } } + .get("messages/all/count") { ctx -> + val state = consumerState.currentState() + ctx.response + .contentType(CONTENT_TEXT) + .send(state.messagesCount.toString()) + } + .post("messages/all/validate") { ctx -> + ctx.request.body + .map { Json.createReader(it.inputStream).readArray() } + .map { messageParametersParser.parse(it) } + .map { generateEvents(ctx, it) } + .then { (generatedEvents, shouldValidatePayloads) -> + generatedEvents + .doOnSuccess { sendResponse(ctx, it, shouldValidatePayloads) } + .block() + } + } + } + + private fun generateEvents(ctx: Context, parameters: List<MessageParameters>): + Pair<Mono<List<VesEvent>>, Boolean> = Pair( + + doGenerateEvents(parameters).doOnError { + logger.error("Error occurred when generating messages: $it") + ctx.response + .status(STATUS_INTERNAL_SERVER_ERROR) + .send() + }, + parameters.all { it.messageType == FIXED_PAYLOAD } + ) + + private fun doGenerateEvents(parameters: List<MessageParameters>): Mono<List<VesEvent>> = MessageGenerator.INSTANCE + .createMessageFlux(parameters) + .map(PayloadWireFrameMessage::payload) + .map { decode(it.unsafeAsArray()) } + .collectList() + + + private fun decode(bytes: ByteArray): VesEvent = VesEvent.parseFrom(bytes) + + + private fun sendResponse(ctx: Context, + generatedEvents: List<VesEvent>, + shouldValidatePayloads: Boolean) = + resolveResponseStatusCode( + generated = generatedEvents, + consumed = decodeConsumedEvents(), + validatePayloads = shouldValidatePayloads + ).let { ctx.response.status(it).send() } + + + private fun decodeConsumedEvents(): List<VesEvent> = consumerState + .currentState() + .consumedMessages + .map(::decode) + + + private fun resolveResponseStatusCode(generated: List<VesEvent>, + consumed: List<VesEvent>, + validatePayloads: Boolean): Int = + if (validatePayloads) { + if (generated == consumed) STATUS_OK else STATUS_BAD_REQUEST + } else { + validateHeaders(consumed, generated) + } + + private fun validateHeaders(consumed: List<VesEvent>, generated: List<VesEvent>): Int { + val consumedHeaders = consumed.map { it.commonEventHeader } + val generatedHeaders = generated.map { it.commonEventHeader } + return if (generatedHeaders == consumedHeaders) STATUS_OK else STATUS_BAD_REQUEST } private fun extractTopics(it: String): Set<String> = @@ -118,16 +153,14 @@ class ApiServer(private val consumerFactory: ConsumerFactory) { .split(",") .toSet() - private fun protobufToJson(parseResult: Try<MessageOrBuilder>): String = - parseResult.fold( - { ex -> "\"Failed to parse protobuf: ${ex.message}\"" }, - { jsonPrinter.print(it) }) - - companion object { private val logger = Logger(ApiServer::class) - private const val CONTENT_TEXT = "text/plain" - private const val CONTENT_JSON = "application/json" + + private const val STATUS_OK = 200 + private const val STATUS_BAD_REQUEST = 400 + private const val STATUS_INTERNAL_SERVER_ERROR = 500 } } + + diff --git a/hv-collector-utils/pom.xml b/hv-collector-utils/pom.xml index d0e44932..39097c10 100644 --- a/hv-collector-utils/pom.xml +++ b/hv-collector-utils/pom.xml @@ -61,19 +61,10 @@ <dependencies> <dependency> <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-domain</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> <artifactId>hv-collector-ves-message-generator</artifactId> <version>${project.parent.version}</version> </dependency> <dependency> - <groupId>org.glassfish</groupId> - <artifactId>javax.json</artifactId> - </dependency> - <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </dependency> @@ -98,6 +89,10 @@ <artifactId>slf4j-api</artifactId> </dependency> <dependency> + <groupId>org.glassfish</groupId> + <artifactId>javax.json</artifactId> + </dependency> + <dependency> <groupId>com.nhaarman</groupId> <artifactId>mockito-kotlin</artifactId> </dependency> diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt index 9c873a0f..c00ce68d 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt @@ -54,20 +54,16 @@ abstract class ArgBasedConfiguration<T>(private val parser: CommandLineParser) { protected abstract fun getConfiguration(cmdLine: CommandLine): Option<T> - protected fun CommandLine.intValue(cmdLineOpt: CommandLineOption, default: Int): Int = - intValue(cmdLineOpt).getOrElse { default } - protected fun CommandLine.longValue(cmdLineOpt: CommandLineOption, default: Long): Long = longValue(cmdLineOpt).getOrElse { default } protected fun CommandLine.stringValue(cmdLineOpt: CommandLineOption, default: String): String = optionValue(cmdLineOpt).getOrElse { default } - protected fun CommandLine.intValue(cmdLineOpt: CommandLineOption): Option<Int> = optionValue(cmdLineOpt).map(String::toInt) - protected fun CommandLine.longValue(cmdLineOpt: CommandLineOption): Option<Long> = + private fun CommandLine.longValue(cmdLineOpt: CommandLineOption): Option<Long> = optionValue(cmdLineOpt).map(String::toLong) protected fun CommandLine.stringValue(cmdLineOpt: CommandLineOption): Option<String> = diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParser.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParser.kt index 24c2cbfa..1621ba59 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParser.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParser.kt @@ -35,14 +35,17 @@ class MessageParametersParser( request .map { it.asJsonObject() } .map { - val commonEventHeader = commonEventHeaderParser.parse(it.getJsonObject("commonEventHeader")) + val commonEventHeader = commonEventHeaderParser + .parse(it.getJsonObject("commonEventHeader")) val messageType = MessageType.valueOf(it.getString("messageType")) - val messagesAmount = it.getJsonNumber("messagesAmount").longValue() + val messagesAmount = it.getJsonNumber("messagesAmount")?.longValue() + ?: throw ParsingException("\"messagesAmount\" could not be parsed from message.", + NullPointerException()) MessageParameters(commonEventHeader, messageType, messagesAmount) } } catch (e: Exception) { throw ParsingException("Parsing request body failed", e) } - internal class ParsingException(message: String?, cause: Exception) : Exception(message, cause) + internal class ParsingException(message: String, cause: Exception) : Exception(message, cause) } diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParserTest.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParserTest.kt new file mode 100644 index 00000000..ec628a2a --- /dev/null +++ b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParserTest.kt @@ -0,0 +1,63 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.messages + +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatExceptionOfType +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.utils.messages.MessageParametersParser.ParsingException +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType + +private const val EXPECTED_MESSAGES_AMOUNT = 25000L + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since July 2018 + */ +object MessageParametersParserTest : Spek({ + describe("Messages parameters parser") { + val messageParametersParser = MessageParametersParser() + + given("parameters json array") { + on("valid parameters json") { + it("should parse MessagesParameters object successfully") { + val result = messageParametersParser.parse(validMessagesParametesJson()) + + assertThat(result).isNotNull + assertThat(result).hasSize(2) + val firstMessage = result.first() + assertThat(firstMessage.messageType).isEqualTo(MessageType.VALID) + assertThat(firstMessage.amount).isEqualTo(EXPECTED_MESSAGES_AMOUNT) + } + } + on("invalid parameters json") { + it("should throw exception") { + assertThatExceptionOfType(ParsingException::class.java).isThrownBy { + messageParametersParser.parse(invalidMessagesParametesJson()) + } + } + } + } + } +}) diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/parameters.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/parameters.kt new file mode 100644 index 00000000..f6a3a15b --- /dev/null +++ b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/parameters.kt @@ -0,0 +1,98 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.messages + +import javax.json.Json + +private const val validMessageParameters = "[\n" + + " {\n" + + " \"commonEventHeader\": {\n" + + " \"version\": \"sample-version\",\n" + + " \"domain\": \"HVRANMEAS\",\n" + + " \"sequence\": 1,\n" + + " \"priority\": 1,\n" + + " \"eventId\": \"sample-event-id\",\n" + + " \"eventName\": \"sample-event-name\",\n" + + " \"eventType\": \"sample-event-type\",\n" + + " \"startEpochMicrosec\": 120034455,\n" + + " \"lastEpochMicrosec\": 120034455,\n" + + " \"nfNamingCode\": \"sample-nf-naming-code\",\n" + + " \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" + + " \"reportingEntityId\": \"sample-reporting-entity-id\",\n" + + " \"reportingEntityName\": \"sample-reporting-entity-name\",\n" + + " \"sourceId\": \"sample-source-id\",\n" + + " \"sourceName\": \"sample-source-name\"\n" + + " },\n" + + " \"messageType\": \"VALID\",\n" + + " \"messagesAmount\": 25000\n" + + " },\n" + + " {\n" + + " \"commonEventHeader\": {\n" + + " \"version\": \"sample-version\",\n" + + " \"domain\": \"HVRANMEAS\",\n" + + " \"sequence\": 1,\n" + + " \"priority\": 1,\n" + + " \"eventId\": \"sample-event-id\",\n" + + " \"eventName\": \"sample-event-name\",\n" + + " \"eventType\": \"sample-event-type\",\n" + + " \"startEpochMicrosec\": 120034455,\n" + + " \"lastEpochMicrosec\": 120034455,\n" + + " \"nfNamingCode\": \"sample-nf-naming-code\",\n" + + " \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" + + " \"reportingEntityId\": \"sample-reporting-entity-id\",\n" + + " \"reportingEntityName\": \"sample-reporting-entity-name\",\n" + + " \"sourceId\": \"sample-source-id\",\n" + + " \"sourceName\": \"sample-source-name\"\n" + + " },\n" + + " \"messageType\": \"TOO_BIG_PAYLOAD\",\n" + + " \"messagesAmount\": 100\n" + + " }\n" + + "]" + +private const val invalidMessageParameters = "[\n" + + " {\n" + + " \"commonEventHeader\": {\n" + + " \"version\": \"sample-version\",\n" + + " \"domain\": \"HVRANMEAS\",\n" + + " \"sequence\": 1,\n" + + " \"priority\": 1,\n" + + " \"eventId\": \"sample-event-id\",\n" + + " \"eventName\": \"sample-event-name\",\n" + + " \"eventType\": \"sample-event-type\",\n" + + " \"startEpochMicrosec\": 120034455,\n" + + " \"lastEpochMicrosec\": 120034455,\n" + + " \"nfNamingCode\": \"sample-nf-naming-code\",\n" + + " \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" + + " \"reportingEntityId\": \"sample-reporting-entity-id\",\n" + + " \"reportingEntityName\": \"sample-reporting-entity-name\",\n" + + " \"sourceId\": \"sample-source-id\",\n" + + " \"sourceName\": \"sample-source-name\"\n" + + " },\n" + + " \"messagesAmount\": 3\n" + + " }\n" + + "]" + +fun validMessagesParametesJson() = Json + .createReader(validMessageParameters.reader()) + .readArray() + +fun invalidMessagesParametesJson() = Json + .createReader(invalidMessageParameters.reader()) + .readArray() diff --git a/hv-collector-ves-message-generator/pom.xml b/hv-collector-ves-message-generator/pom.xml index f049d78f..a7dad24b 100644 --- a/hv-collector-ves-message-generator/pom.xml +++ b/hv-collector-ves-message-generator/pom.xml @@ -95,10 +95,6 @@ <artifactId>logback-classic</artifactId> <scope>runtime</scope> </dependency> - <dependency> - <groupId>org.glassfish</groupId> - <artifactId>javax.json</artifactId> - </dependency> </dependencies> diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt deleted file mode 100644 index 605b1729..00000000 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.ves.message.generator.api - -import org.onap.dcae.collectors.veshv.ves.message.generator.impl.CommonEventHeaderParserImpl -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader -import javax.json.JsonObject - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since July 2018 - */ -interface CommonEventHeaderParser { - - fun parse(json: JsonObject): CommonEventHeader - - companion object { - val INSTANCE: CommonEventHeaderParser by lazy { - CommonEventHeaderParserImpl() - } - } -}
\ No newline at end of file diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt index 7407f692..d9329cb0 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt @@ -35,6 +35,8 @@ interface MessageGenerator { val INSTANCE: MessageGenerator by lazy { MessageGeneratorImpl(PayloadGenerator()) } + + const val FIXED_PAYLOAD_SIZE = 100 } } diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt index 0ac90544..22c88252 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt @@ -26,6 +26,7 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.api enum class MessageType { VALID, TOO_BIG_PAYLOAD, + FIXED_PAYLOAD, INVALID_WIRE_FRAME, - INVALID_GPB_DATA + INVALID_GPB_DATA, } diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt deleted file mode 100644 index 61f5f2f3..00000000 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.ves.message.generator.impl - -import com.google.protobuf.ByteString -import org.onap.dcae.collectors.veshv.ves.message.generator.api.CommonEventHeaderParser -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority -import javax.json.JsonObject - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since July 2018 - */ -class CommonEventHeaderParserImpl : CommonEventHeaderParser { - - override fun parse(json: JsonObject): CommonEventHeader = CommonEventHeader.newBuilder() - .setVersion(json.getString("version")) - .setDomain(Domain.valueOf(json.getString("domain"))) - .setSequence(json.getInt("sequence")) - .setPriority(Priority.forNumber(json.getInt("priority"))) - .setEventId(json.getString("version")) - .setEventName(json.getString("version")) - .setEventType(json.getString("version")) - .setStartEpochMicrosec(json.getJsonNumber("startEpochMicrosec").longValue()) - .setLastEpochMicrosec(json.getJsonNumber("lastEpochMicrosec").longValue()) - .setNfNamingCode(json.getString("nfNamingCode")) - .setNfcNamingCode(json.getString("nfcNamingCode")) - .setReportingEntityId(json.getString("reportingEntityId")) - .setReportingEntityName(ByteString.copyFromUtf8(json.getString("reportingEntityName"))) - .setSourceId(ByteString.copyFromUtf8(json.getString("sourceId"))) - .setSourceName(json.getString("sourceName")) - .build() -}
\ No newline at end of file diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt index e9db716d..c6e0556b 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt @@ -26,6 +26,7 @@ import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_GPB_DATA import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_WIRE_FRAME import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.TOO_BIG_PAYLOAD @@ -63,6 +64,8 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa PayloadWireFrameMessage(vesEvent(commonEventHeader, payloadGenerator.generatePayload())) TOO_BIG_PAYLOAD -> PayloadWireFrameMessage(vesEvent(commonEventHeader, oversizedPayload())) + FIXED_PAYLOAD -> + PayloadWireFrameMessage(vesEvent(commonEventHeader, fixedPayload())) INVALID_WIRE_FRAME -> { val payload = ByteData(vesEvent(commonEventHeader, payloadGenerator.generatePayload())) PayloadWireFrameMessage( @@ -92,6 +95,9 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa private fun oversizedPayload() = payloadGenerator.generateRawPayload(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE + 1) + private fun fixedPayload() = + payloadGenerator.generateRawPayload(MessageGenerator.FIXED_PAYLOAD_SIZE) + companion object { private const val UNSUPPORTED_VERSION: Short = 2 } diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt index c85ce035..acdaf19f 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt @@ -62,5 +62,4 @@ internal class PayloadGenerator { private const val URI_BASE_NAME = "sample/uri" private const val UPPER_URI_NUMBER_BOUND = 10_000 } - } diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt index b2490dd1..1e38d46e 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt @@ -146,6 +146,24 @@ object MessageGeneratorImplTest : Spek({ .verifyComplete() } } + on("message type requesting fixed payload") { + it("should create flux of valid messages with fixed payload") { + generator + .createMessageFlux(listOf(MessageParameters( + createSampleCommonHeader(FAULT), + MessageType.FIXED_PAYLOAD, + 1 + ))) + .test() + .assertNext { + assertThat(it.isValid()).isTrue() + assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(extractHvRanMeasFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT) + } + .verifyComplete() + } + } } given("list of message parameters") { it("should create concatenated flux of messages") { @@ -182,6 +200,10 @@ fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader { return VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader } +fun extractHvRanMeasFields(bytes: ByteData): ByteString { + return VesEvent.parseFrom(bytes.unsafeAsArray()).hvRanMeasFields +} + private fun createSampleCommonHeader(domain: CommonEventHeader.Domain): CommonEventHeader = CommonEventHeader.newBuilder() .setVersion("sample-version") .setDomain(domain) |