aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-dcae-app-simulator
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator')
-rw-r--r--sources/hv-collector-dcae-app-simulator/Dockerfile18
-rw-r--r--sources/hv-collector-dcae-app-simulator/pom.xml155
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt74
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt88
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt101
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt66
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt67
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt27
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt70
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt62
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml36
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt82
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt183
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt214
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt54
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt125
16 files changed, 1422 insertions, 0 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/Dockerfile b/sources/hv-collector-dcae-app-simulator/Dockerfile
new file mode 100644
index 00000000..a561fff7
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/Dockerfile
@@ -0,0 +1,18 @@
+FROM docker.io/openjdk:11-jre-slim
+
+LABEL copyright="Copyright (C) 2018 NOKIA"
+LABEL license.name="The Apache Software License, Version 2.0"
+LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0"
+LABEL maintainer="Nokia Wroclaw ONAP Team"
+
+RUN apt-get update \
+ && apt-get install -y --no-install-recommends curl \
+ && apt-get clean
+
+WORKDIR /opt/ves-hv-dcae-app-simulator
+
+ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.dcaeapp.MainKt"]
+
+COPY target/libs/external/* ./
+COPY target/libs/internal/* ./
+COPY target/hv-collector-dcae-app-simulator-*.jar ./
diff --git a/sources/hv-collector-dcae-app-simulator/pom.xml b/sources/hv-collector-dcae-app-simulator/pom.xml
new file mode 100644
index 00000000..c34e885d
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/pom.xml
@@ -0,0 +1,155 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ ============LICENSE_START=======================================================
+ ~ dcaegen2-collectors-veshv
+ ~ ================================================================================
+ ~ Copyright (C) 2018 NOKIA
+ ~ ================================================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============LICENSE_END=========================================================
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <parent>
+ <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
+ <artifactId>hv-collector-sources</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>hv-collector-dcae-app-simulator</artifactId>
+ <description>VES HighVolume Collector :: Dcae app simulator</description>
+
+ <properties>
+ <skipAnalysis>false</skipAnalysis>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>kotlin-maven-plugin</artifactId>
+ <groupId>org.jetbrains.kotlin</groupId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <groupId>org.apache.maven.plugins</groupId>
+ </plugin>
+ </plugins>
+ </build>
+ <profiles>
+ <profile>
+ <id>docker</id>
+ <activation>
+ <property>
+ <name>!skipDocker</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+ <dependencies>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-domain</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-utils</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-ves-message-generator</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-test-utils</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-effects</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-effects-instances</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-effects-reactor</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-syntax</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.ratpack</groupId>
+ <artifactId>ratpack-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.kafka</groupId>
+ <artifactId>reactor-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java-util</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-stdlib-jdk8</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+
+
+</project> \ No newline at end of file
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
new file mode 100644
index 00000000..490cde4a
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
@@ -0,0 +1,74 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import arrow.core.getOrElse
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.instances.io.monadError.monadError
+import arrow.typeclasses.bindingCatch
+import org.onap.dcae.collectors.veshv.utils.arrow.getOption
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.io.InputStream
+import java.util.concurrent.atomic.AtomicReference
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
+ private val messageStreamValidation: MessageStreamValidation) {
+ private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference()
+
+ fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString))
+
+ fun listenToTopics(topics: Set<String>): IO<Unit> = IO.monadError().bindingCatch {
+ if (topics.any { it.isBlank() })
+ throw IllegalArgumentException("Topic list cannot contain empty elements")
+ if (topics.isEmpty())
+ throw IllegalArgumentException("Topic list cannot be empty")
+
+ logger.info("Received new configuration. Creating consumer for topics: $topics")
+ consumerState.set(consumerFactory.createConsumerForTopics(topics).bind())
+ }.fix()
+
+ fun state() = consumerState.getOption().map { it.currentState() }
+
+ fun resetState(): IO<Unit> = consumerState.getOption().fold(
+ { IO.unit },
+ { it.reset() }
+ )
+
+ fun validate(jsonDescription: InputStream) = messageStreamValidation.validate(jsonDescription, currentMessages())
+
+ private fun currentMessages(): List<ByteArray> =
+ consumerState.getOption()
+ .map { it.currentState().consumedMessages }
+ .getOrElse(::emptyList)
+
+ private fun extractTopics(topicsString: String): Set<String> =
+ topicsString.substringAfter("=")
+ .split(",")
+ .toSet()
+
+ companion object {
+ private val logger = Logger(DcaeAppSimulator::class)
+ }
+}
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
new file mode 100644
index 00000000..e423191d
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
@@ -0,0 +1,88 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.instances.io.monadError.monadError
+import arrow.typeclasses.bindingCatch
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.utils.arrow.asIo
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.ves.VesEventOuterClass
+import java.io.InputStream
+import javax.json.Json
+
+class MessageStreamValidation(
+ private val messageGenerator: MessageGenerator,
+ private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
+
+ fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> =
+ IO.monadError().bindingCatch {
+ val messageParams = parseMessageParams(jsonDescription)
+ val expectedEvents = generateEvents(messageParams).bind()
+ val actualEvents = decodeConsumedEvents(consumedMessages)
+ if (shouldValidatePayloads(messageParams)) {
+ expectedEvents == actualEvents
+ } else {
+ validateHeaders(actualEvents, expectedEvents)
+ }
+ }.fix()
+
+ private fun parseMessageParams(input: InputStream): List<MessageParameters> {
+ val expectations = Json.createReader(input).readArray()
+ val messageParams = messageParametersParser.parse(expectations)
+
+ return messageParams.fold(
+ { throw IllegalArgumentException("Parsing error: " + it.message) },
+ {
+ if (it.isEmpty())
+ throw IllegalArgumentException("Message param list cannot be empty")
+ it
+ }
+ )
+ }
+
+ private fun shouldValidatePayloads(parameters: List<MessageParameters>) =
+ parameters.all { it.messageType == MessageType.FIXED_PAYLOAD }
+
+ private fun validateHeaders(actual: List<VesEventOuterClass.VesEvent>,
+ expected: List<VesEventOuterClass.VesEvent>): Boolean {
+ val consumedHeaders = actual.map { it.commonEventHeader }
+ val generatedHeaders = expected.map { it.commonEventHeader }
+ return generatedHeaders == consumedHeaders
+ }
+
+ private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventOuterClass.VesEvent>> =
+ messageGenerator.createMessageFlux(parameters)
+ .map(WireFrameMessage::payload)
+ .map(ByteData::unsafeAsArray)
+ .map(VesEventOuterClass.VesEvent::parseFrom)
+ .collectList()
+ .asIo()
+
+ private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) =
+ consumedMessages.map(VesEventOuterClass.VesEvent::parseFrom)
+
+}
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
new file mode 100644
index 00000000..1eca9317
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
@@ -0,0 +1,101 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
+
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
+import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
+import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
+import org.onap.dcae.collectors.veshv.utils.http.Responses
+import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
+import org.onap.dcae.collectors.veshv.utils.http.sendOrError
+import ratpack.handling.Chain
+import ratpack.server.RatpackServer
+import ratpack.server.ServerConfig
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
+ private val responseValid by lazy {
+ Responses.statusResponse(
+ name = "valid",
+ message = "validation succeeded"
+ )
+ }
+
+ private val responseInvalid by lazy {
+ Responses.statusResponse(
+ name = "invalid",
+ message = "validation failed",
+ httpStatus = HttpStatus.BAD_REQUEST
+ )
+ }
+
+
+ fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> =
+ simulator.listenToTopics(kafkaTopics).map {
+ RatpackServer.start { server ->
+ server.serverConfig(ServerConfig.embedded().port(port))
+ .handlers(::setupHandlers)
+ }
+ }
+
+ private fun setupHandlers(chain: Chain) {
+ chain
+ .put("configuration/topics") { ctx ->
+ ctx.request.body.then { body ->
+ val operation = simulator.listenToTopics(body.text)
+ ctx.response.sendOrError(operation)
+ }
+
+ }
+ .delete("messages") { ctx ->
+ ctx.response.contentType(CONTENT_TEXT)
+ ctx.response.sendOrError(simulator.resetState())
+ }
+ .get("messages/all/count") { ctx ->
+ simulator.state().fold(
+ { ctx.response.status(HttpConstants.STATUS_NOT_FOUND) },
+ {
+ ctx.response
+ .contentType(CONTENT_TEXT)
+ .send(it.messagesCount.toString())
+ })
+ }
+ .post("messages/all/validate") { ctx ->
+ ctx.request.body.then { body ->
+ val response = simulator.validate(body.inputStream)
+ .map { isValid ->
+ if (isValid) responseValid else responseInvalid
+ }
+ ctx.response.sendAndHandleErrors(response)
+ }
+ }
+ .get("healthcheck") { ctx ->
+ ctx.response.status(HttpConstants.STATUS_OK).send()
+ }
+ }
+
+ companion object {
+ private const val CONTENT_TEXT = "text/plain"
+ }
+}
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
new file mode 100644
index 00000000..10dedbdf
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
@@ -0,0 +1,66 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
+
+import arrow.effects.IO
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer
+import org.onap.dcae.collectors.veshv.utils.arrow.evaluateIo
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.kafka.receiver.KafkaReceiver
+import reactor.kafka.receiver.ReceiverOptions
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
+
+ fun start(): IO<Consumer> = IO {
+ val consumer = Consumer()
+ receiver.receive().map(consumer::update).evaluateIo().subscribe()
+ consumer
+ }
+
+ companion object {
+ private val logger = Logger(KafkaSource::class)
+
+ fun create(bootstrapServers: String, topics: Set<String>): KafkaSource {
+ return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics)))
+ }
+
+ fun createReceiverOptions(bootstrapServers: String,
+ topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? {
+ val props = mapOf<String, Any>(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
+ ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-dcae-app-simulator",
+ ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"
+ )
+ return ReceiverOptions.create<ByteArray, ByteArray>(props)
+ .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } }
+ .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } }
+ .subscription(topics)
+ }
+ }
+}
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
new file mode 100644
index 00000000..17eeb5b1
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
@@ -0,0 +1,67 @@
+/*
+* ============LICENSE_START=======================================================
+* dcaegen2-collectors-veshv
+* ================================================================================
+* Copyright (C) 2018 NOKIA
+* ================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
+
+import arrow.core.Option
+import arrow.core.fix
+import arrow.instances.option.monad.monad
+import arrow.typeclasses.binding
+import org.apache.commons.cli.CommandLine
+import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
+import org.onap.dcae.collectors.veshv.utils.commandline.intValue
+import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
+
+class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) {
+ override val cmdLineOptionsList: List<CommandLineOption> = listOf(
+ LISTEN_PORT,
+ MAXIMUM_PAYLOAD_SIZE_BYTES,
+ KAFKA_SERVERS,
+ KAFKA_TOPICS
+ )
+
+ override fun getConfiguration(cmdLine: CommandLine): Option<DcaeAppSimConfiguration> =
+ Option.monad().binding {
+ val listenPort = cmdLine
+ .intValue(LISTEN_PORT)
+ .bind()
+ val maxPayloadSizeBytes = cmdLine
+ .intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
+ val kafkaBootstrapServers = cmdLine
+ .stringValue(KAFKA_SERVERS)
+ .bind()
+ val kafkaTopics = cmdLine
+ .stringValue(KAFKA_TOPICS)
+ .map { it.split(",").toSet() }
+ .bind()
+
+ DcaeAppSimConfiguration(
+ listenPort,
+ maxPayloadSizeBytes,
+ kafkaBootstrapServers,
+ kafkaTopics)
+ }.fix()
+}
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
new file mode 100644
index 00000000..a6fc8053
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
@@ -0,0 +1,27 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
+
+data class DcaeAppSimConfiguration(
+ val apiPort: Int,
+ val maxPayloadSizeBytes: Int,
+ val kafkaBootstrapServers: String,
+ val kafkaTopics: Set<String>
+)
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
new file mode 100644
index 00000000..1eefdbdb
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
@@ -0,0 +1,70 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.kafka.receiver.ReceiverRecord
+import java.util.concurrent.ConcurrentLinkedQueue
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>) {
+ val messagesCount: Int by lazy {
+ messages.size
+ }
+
+ val consumedMessages: List<ByteArray> by lazy {
+ messages.toList()
+ }
+}
+
+interface ConsumerStateProvider {
+ fun currentState(): ConsumerState
+ fun reset(): IO<Unit>
+}
+
+class Consumer : ConsumerStateProvider {
+
+ private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue()
+
+ override fun currentState(): ConsumerState = ConsumerState(consumedMessages)
+
+ override fun reset(): IO<Unit> = IO {
+ consumedMessages.clear()
+ }
+
+ fun update(record: ReceiverRecord<ByteArray, ByteArray>) = IO<Unit> {
+ logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
+ consumedMessages.add(record.value())
+ }
+
+ companion object {
+ private val logger = Logger(Consumer::class)
+ }
+}
+
+class ConsumerFactory(private val kafkaBootstrapServers: String) {
+ fun createConsumerForTopics(kafkaTopics: Set<String>): IO<Consumer> =
+ KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start()
+}
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
new file mode 100644
index 00000000..06ff4d59
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -0,0 +1,62 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp
+
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer
+import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
+import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
+import org.onap.dcae.collectors.veshv.utils.arrow.unit
+import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
+
+private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp"
+private val logger = Logger(PACKAGE_NAME)
+const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
+
+fun main(args: Array<String>) =
+ ArgDcaeAppSimConfiguration().parse(args)
+ .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
+ .map(::startApp)
+ .unsafeRunEitherSync(
+ { ex ->
+ logger.error("Failed to start a server", ex)
+ ExitFailure(1)
+ },
+ {
+ logger.info("Started DCAE-APP Simulator API server")
+ }
+ )
+
+
+private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
+ logger.info("Using configuration: $config")
+ val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers)
+ val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
+ return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
+ .start(config.apiPort, config.kafkaTopics)
+ .unit()
+}
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml b/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml
new file mode 100644
index 00000000..48da3b18
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+ <property name="LOG_FILE"
+ value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
+ <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>
+ %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
+ </pattern>
+ </encoder>
+ </appender>
+
+ <appender name="ROLLING-FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder>
+ <pattern>${FILE_LOG_PATTERN}</pattern>
+ </encoder>
+ <file>${LOG_FILE}</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
+ <maxFileSize>50MB</maxFileSize>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>10GB</totalSizeCap>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="org.onap.dcae.collectors.veshv" level="INFO"/>
+ <!--<logger name="reactor.ipc.netty" level="DEBUG"/>-->
+
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ROLLING-FILE"/>
+ </root>
+</configuration> \ No newline at end of file
diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt
new file mode 100644
index 00000000..08558d76
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt
@@ -0,0 +1,82 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import reactor.kafka.receiver.ReceiverRecord
+
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+internal class ConsumerTest : Spek({
+
+ lateinit var cut: Consumer
+
+ beforeEachTest {
+ cut = Consumer()
+ }
+
+ describe("Consumer which holds the state of received Kafka records") {
+ it("should contain empty state in the beginning") {
+ assertEmptyState(cut)
+ }
+
+ describe("update") {
+ val value = byteArrayOf(2)
+
+ beforeEachTest {
+ cut.update(receiverRecord(
+ topic = "topic",
+ key = byteArrayOf(1),
+ value = value
+ )).unsafeRunSync()
+ }
+
+ it("should contain one message if it was updated once") {
+ assertState(cut, value)
+ }
+
+ it("should contain empty state message if it was reset after update") {
+ cut.reset().unsafeRunSync()
+ assertEmptyState(cut)
+ }
+ }
+ }
+})
+
+fun assertEmptyState(cut: Consumer) {
+ assertState(cut)
+}
+
+fun assertState(cut: Consumer, vararg values: ByteArray) {
+ assertThat(cut.currentState().consumedMessages)
+ .containsOnly(*values)
+ assertThat(cut.currentState().messagesCount)
+ .isEqualTo(values.size)
+}
+
+fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) =
+ ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null)
diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
new file mode 100644
index 00000000..e1641cbb
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
@@ -0,0 +1,183 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import arrow.core.Left
+import arrow.core.None
+import arrow.core.Some
+import arrow.effects.IO
+import com.google.protobuf.ByteString
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.eq
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.never
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.mockito.ArgumentMatchers.anySet
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.VesEvent
+import java.util.concurrent.ConcurrentLinkedQueue
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+internal class DcaeAppSimulatorTest : Spek({
+ lateinit var consumerFactory: ConsumerFactory
+ lateinit var messageStreamValidation: MessageStreamValidation
+ lateinit var consumer: Consumer
+ lateinit var cut: DcaeAppSimulator
+
+ beforeEachTest {
+ consumerFactory = mock()
+ messageStreamValidation = mock()
+ consumer = mock()
+ cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)
+
+ whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(IO.just(consumer))
+ }
+
+ fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList()))
+
+ describe("listenToTopics") {
+ val topics = setOf("perf3gpp", "faults")
+
+ it("should fail when topic list is empty") {
+ val result = cut.listenToTopics(setOf()).attempt().unsafeRunSync()
+ assertThat(result.isLeft()).isTrue()
+ }
+
+ it("should fail when topic list contains empty strings") {
+ val result = cut.listenToTopics(setOf("perf3gpp", " ", "faults")).attempt().unsafeRunSync()
+ assertThat(result.isLeft()).isTrue()
+ }
+
+ it("should subscribe to given topics") {
+ cut.listenToTopics(topics).unsafeRunSync()
+ verify(consumerFactory).createConsumerForTopics(topics)
+ }
+
+ it("should subscribe to given topics when called with comma separated list") {
+ cut.listenToTopics("perf3gpp,faults").unsafeRunSync()
+ verify(consumerFactory).createConsumerForTopics(topics)
+ }
+
+ it("should handle errors") {
+ // given
+ val error = RuntimeException("WTF")
+ whenever(consumerFactory.createConsumerForTopics(anySet()))
+ .thenReturn(IO.raiseError(error))
+
+ // when
+ val result = cut.listenToTopics("perf3gpp").attempt().unsafeRunSync()
+
+ // then
+ assertThat(result).isEqualTo(Left(error))
+ }
+ }
+
+ describe("state") {
+
+ it("should return None when topics hasn't been initialized") {
+ assertThat(cut.state()).isEqualTo(None)
+ }
+
+ describe("when topics are initialized") {
+ beforeEachTest {
+ cut.listenToTopics("perf3gpp").unsafeRunSync()
+ }
+
+ it("should return some state when it has been set") {
+ val state = consumerState()
+ whenever(consumer.currentState()).thenReturn(state)
+
+ assertThat(cut.state()).isEqualTo(Some(state))
+ }
+ }
+ }
+
+ describe("resetState") {
+ it("should do nothing when topics hasn't been initialized") {
+ cut.resetState().unsafeRunSync()
+ verify(consumer, never()).reset()
+ }
+
+ describe("when topics are initialized") {
+ beforeEachTest {
+ cut.listenToTopics("perf3gpp").unsafeRunSync()
+ }
+
+ it("should reset the state") {
+ // given
+ whenever(consumer.reset()).thenReturn(IO.unit)
+
+ // when
+ cut.resetState().unsafeRunSync()
+
+ // then
+ verify(consumer).reset()
+ }
+ }
+ }
+
+ describe("validate") {
+ beforeEachTest {
+ whenever(messageStreamValidation.validate(any(), any())).thenReturn(IO.just(true))
+ }
+
+ it("should use empty list when consumer is unavailable") {
+ // when
+ val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
+
+ // then
+ verify(messageStreamValidation).validate(any(), eq(emptyList()))
+ assertThat(result).isTrue()
+ }
+
+ it("should delegate to MessageStreamValidation") {
+ // given
+ cut.listenToTopics("perf3gpp").unsafeRunSync()
+ whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray()))
+
+ // when
+ val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
+
+ // then
+ verify(messageStreamValidation).validate(any(), any())
+ assertThat(result).isTrue()
+ }
+ }
+})
+
+
+private const val DUMMY_EVENT_ID = "aaa"
+private const val DUMMY_PAYLOAD = "payload"
+
+private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
+ return VesEvent.newBuilder()
+ .setCommonEventHeader(CommonEventHeader.newBuilder()
+ .setEventId(eventId))
+ .setEventFields(ByteString.copyFrom(payload.toByteArray()))
+ .build()
+}
diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
new file mode 100644
index 00000000..a631be76
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
@@ -0,0 +1,214 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import arrow.core.Either
+import arrow.core.Right
+import com.google.protobuf.ByteString
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.fail
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.mockito.ArgumentMatchers.anyList
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.VesEvent
+import reactor.core.publisher.Flux
+import javax.json.stream.JsonParsingException
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+internal class MessageStreamValidationTest : Spek({
+ lateinit var messageParametersParser: MessageParametersParser
+ lateinit var messageGenerator: MessageGenerator
+ lateinit var cut: MessageStreamValidation
+
+ beforeEachTest {
+ messageParametersParser = mock()
+ messageGenerator = mock()
+ cut = MessageStreamValidation(messageGenerator, messageParametersParser)
+ }
+
+ fun givenParsedMessageParameters(vararg params: MessageParameters) {
+ whenever(messageParametersParser.parse(any())).thenReturn(Right(params.toList()))
+ }
+
+ describe("validate") {
+
+ it("should return error when JSON is invalid") {
+ // when
+ val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync()
+
+ // then
+ when(result) {
+ is Either.Left -> assertThat(result.a).isInstanceOf(JsonParsingException::class.java)
+ else -> fail("validation should fail")
+ }
+ }
+
+ it("should return error when message param list is empty") {
+ // given
+ givenParsedMessageParameters()
+
+ // when
+ val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync()
+
+ // then
+ assertThat(result.isLeft()).isTrue()
+ }
+
+ describe("when validating headers only") {
+ it("should return true when messages are the same") {
+ // given
+ val jsonAsStream = sampleJsonAsStream()
+ val event = vesEvent()
+ val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray())
+ val receivedMessageBytes = event.toByteArray()
+
+ givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1))
+ whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+
+ // when
+ val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
+
+ // then
+ assertThat(result).isTrue()
+ }
+
+ it("should return true when messages differ with payload only") {
+ // given
+ val jsonAsStream = sampleJsonAsStream()
+ val generatedEvent = vesEvent(payload = "payload A")
+ val receivedEvent = vesEvent(payload = "payload B")
+ val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
+ val receivedMessageBytes = receivedEvent.toByteArray()
+
+ givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
+ whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+
+ // when
+ val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
+
+ // then
+ assertThat(result).isTrue()
+ }
+
+ it("should return false when messages are different") {
+ // given
+ val jsonAsStream = sampleJsonAsStream()
+ val generatedEvent = vesEvent()
+ val receivedEvent = vesEvent(eventId = "bbb")
+ val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
+ val receivedMessageBytes = receivedEvent.toByteArray()
+
+ givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
+ whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+
+ // when
+ val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
+
+ // then
+ assertThat(result).isFalse()
+ }
+ }
+
+ describe("when validating whole messages") {
+ it("should return true when messages are the same") {
+ // given
+ val jsonAsStream = sampleJsonAsStream()
+ val event = vesEvent()
+ val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray())
+ val receivedMessageBytes = event.toByteArray()
+
+ givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
+ whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+
+ // when
+ val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
+
+ // then
+ assertThat(result).isTrue()
+ }
+
+ it("should return false when messages differ with payload only") {
+ // given
+ val jsonAsStream = sampleJsonAsStream()
+ val generatedEvent = vesEvent(payload = "payload A")
+ val receivedEvent = vesEvent(payload = "payload B")
+ val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
+ val receivedMessageBytes = receivedEvent.toByteArray()
+
+ givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
+ whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+
+ // when
+ val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
+
+ // then
+ assertThat(result).isFalse()
+ }
+
+ it("should return false when messages are different") {
+ // given
+ val jsonAsStream = sampleJsonAsStream()
+ val generatedEvent = vesEvent()
+ val receivedEvent = vesEvent("bbb")
+ val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
+ val receivedMessageBytes = receivedEvent.toByteArray()
+
+ givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
+ whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+
+ // when
+ val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
+
+ // then
+ assertThat(result).isFalse()
+ }
+ }
+ }
+})
+
+
+
+private const val DUMMY_EVENT_ID = "aaa"
+private const val DUMMY_PAYLOAD = "payload"
+
+private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
+ return VesEvent.newBuilder()
+ .setCommonEventHeader(CommonEventHeader.newBuilder()
+ .setEventId(eventId))
+ .setEventFields(ByteString.copyFrom(payload.toByteArray()))
+ .build()
+}
+
+private const val sampleJsonArray = """["headersOnly"]"""
+
+private fun sampleJsonAsStream() = sampleJsonArray.byteInputStream()
diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt
new file mode 100644
index 00000000..de74f628
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
+ * @since August 2018
+ */
+internal class KafkaSourceTest : Spek({
+ val servers = "kafka1:9080,kafka2:9080"
+ val topics = setOf("topic1", "topic2")
+
+ describe("receiver options") {
+ val options = KafkaSource.createReceiverOptions(servers, topics)!!.toImmutable()
+
+ fun verifyProperty(key: String, expectedValue: Any) {
+ it("should have $key option set") {
+ assertThat(options.consumerProperty(key))
+ .isEqualTo(expectedValue)
+ }
+ }
+
+ verifyProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers)
+ verifyProperty(ConsumerConfig.CLIENT_ID_CONFIG, "hv-collector-dcae-app-simulator")
+ verifyProperty(ConsumerConfig.GROUP_ID_CONFIG, "hv-collector-simulators")
+ verifyProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java)
+ verifyProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java)
+ verifyProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ }
+}) \ No newline at end of file
diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt
new file mode 100644
index 00000000..7137fe12
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt
@@ -0,0 +1,125 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure
+import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess
+import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentError
+
+
+internal class ArgDcaeAppSimConfigurationTest : Spek({
+
+ lateinit var cut: ArgDcaeAppSimConfiguration
+ val listenPort = "1234"
+ val kafkaBootstrapServers = "localhosting:123,localhostinger:12345"
+ val kafkaTopics = "top1,top2"
+
+ beforeEachTest {
+ cut = ArgDcaeAppSimConfiguration()
+ }
+
+ describe("parsing arguments") {
+ lateinit var result: DcaeAppSimConfiguration
+
+ given("all parameters are present in the long form") {
+
+ beforeEachTest {
+ result = cut.parseExpectingSuccess(
+ "--listen-port", listenPort,
+ "--kafka-bootstrap-servers", kafkaBootstrapServers,
+ "--kafka-topics", kafkaTopics
+ )
+ }
+
+ it("should set proper port") {
+ assertThat(result.apiPort).isEqualTo(listenPort.toInt())
+ }
+
+
+ it("should set proper kafka bootstrap servers") {
+ assertThat(result.kafkaBootstrapServers).isEqualTo(kafkaBootstrapServers)
+ }
+
+ it("should set proper kafka topics") {
+ assertThat(result.kafkaTopics).isEqualTo(
+ setOf("top1", "top2")
+ )
+ }
+ }
+
+ given("some parameters are present in the short form") {
+
+ beforeEachTest {
+ result = cut.parseExpectingSuccess(
+ "-p", listenPort,
+ "--kafka-bootstrap-servers", kafkaBootstrapServers,
+ "-f", kafkaTopics)
+ }
+
+ it("should set proper port") {
+ assertThat(result.apiPort).isEqualTo(listenPort.toInt())
+ }
+
+ it("should set proper kafka bootstrap servers") {
+ assertThat(result.kafkaBootstrapServers).isEqualTo(kafkaBootstrapServers)
+ }
+
+ it("should set proper kafka topics") {
+ assertThat(result.kafkaTopics).isEqualTo(
+ setOf("top1", "top2")
+ )
+ }
+ }
+
+ describe("required parameter is absent") {
+ given("kafka topics are missing") {
+ it("should throw exception") {
+ assertThat(cut.parseExpectingFailure(
+ "-p", listenPort,
+ "-s", kafkaBootstrapServers
+ )).isInstanceOf(WrongArgumentError::class.java)
+ }
+ }
+
+ given("kafka bootstrap servers is missing") {
+ it("should throw exception") {
+ assertThat(cut.parseExpectingFailure(
+ "-p", listenPort,
+ "-f", kafkaTopics
+ )).isInstanceOf(WrongArgumentError::class.java)
+ }
+ }
+
+ given("listen port is missing") {
+ it("should throw exception") {
+ assertThat(cut.parseExpectingFailure(
+ "-p", listenPort,
+ "-s", kafkaBootstrapServers
+ )).isInstanceOf(WrongArgumentError::class.java)
+ }
+ }
+ }
+ }
+}) \ No newline at end of file