aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-dcae-app-simulator
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-dcae-app-simulator')
-rw-r--r--hv-collector-dcae-app-simulator/Dockerfile18
-rw-r--r--hv-collector-dcae-app-simulator/pom.xml155
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt74
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt88
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt101
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt66
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt67
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt27
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt70
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt62
-rw-r--r--hv-collector-dcae-app-simulator/src/main/resources/logback.xml36
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt82
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt183
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt214
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt54
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt125
16 files changed, 0 insertions, 1422 deletions
diff --git a/hv-collector-dcae-app-simulator/Dockerfile b/hv-collector-dcae-app-simulator/Dockerfile
deleted file mode 100644
index a561fff7..00000000
--- a/hv-collector-dcae-app-simulator/Dockerfile
+++ /dev/null
@@ -1,18 +0,0 @@
-FROM docker.io/openjdk:11-jre-slim
-
-LABEL copyright="Copyright (C) 2018 NOKIA"
-LABEL license.name="The Apache Software License, Version 2.0"
-LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0"
-LABEL maintainer="Nokia Wroclaw ONAP Team"
-
-RUN apt-get update \
- && apt-get install -y --no-install-recommends curl \
- && apt-get clean
-
-WORKDIR /opt/ves-hv-dcae-app-simulator
-
-ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.dcaeapp.MainKt"]
-
-COPY target/libs/external/* ./
-COPY target/libs/internal/* ./
-COPY target/hv-collector-dcae-app-simulator-*.jar ./
diff --git a/hv-collector-dcae-app-simulator/pom.xml b/hv-collector-dcae-app-simulator/pom.xml
deleted file mode 100644
index 82e99f95..00000000
--- a/hv-collector-dcae-app-simulator/pom.xml
+++ /dev/null
@@ -1,155 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ ============LICENSE_START=======================================================
- ~ dcaegen2-collectors-veshv
- ~ ================================================================================
- ~ Copyright (C) 2018 NOKIA
- ~ ================================================================================
- ~ Licensed under the Apache License, Version 2.0 (the "License");
- ~ you may not use this file except in compliance with the License.
- ~ You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- ~ ============LICENSE_END=========================================================
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <licenses>
- <license>
- <name>The Apache Software License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- </license>
- </licenses>
-
- <parent>
- <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
- <artifactId>ves-hv-collector</artifactId>
- <version>1.1.0-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>hv-collector-dcae-app-simulator</artifactId>
- <description>VES HighVolume Collector :: Dcae app simulator</description>
-
- <properties>
- <skipAnalysis>false</skipAnalysis>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>kotlin-maven-plugin</artifactId>
- <groupId>org.jetbrains.kotlin</groupId>
- </plugin>
- <plugin>
- <artifactId>maven-surefire-plugin</artifactId>
- <groupId>org.apache.maven.plugins</groupId>
- </plugin>
- </plugins>
- </build>
- <profiles>
- <profile>
- <id>docker</id>
- <activation>
- <property>
- <name>!skipDocker</name>
- </property>
- </activation>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>io.fabric8</groupId>
- <artifactId>docker-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
- <dependencies>
- <dependency>
- <groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-domain</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-utils</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-ves-message-generator</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-test-utils</artifactId>
- <version>${project.parent.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects</artifactId>
- </dependency>
- <dependency>
- <groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects-instances</artifactId>
- </dependency>
- <dependency>
- <groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects-reactor</artifactId>
- </dependency>
- <dependency>
- <groupId>io.arrow-kt</groupId>
- <artifactId>arrow-syntax</artifactId>
- </dependency>
- <dependency>
- <groupId>io.ratpack</groupId>
- <artifactId>ratpack-core</artifactId>
- </dependency>
- <dependency>
- <groupId>io.projectreactor.kafka</groupId>
- <artifactId>reactor-kafka</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java-util</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.kotlin</groupId>
- <artifactId>kotlin-stdlib-jdk8</artifactId>
- </dependency>
- <dependency>
- <groupId>org.assertj</groupId>
- <artifactId>assertj-core</artifactId>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <scope>runtime</scope>
- </dependency>
- </dependencies>
-
-
-</project> \ No newline at end of file
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
deleted file mode 100644
index 490cde4a..00000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-
-import arrow.core.getOrElse
-import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monadError.monadError
-import arrow.typeclasses.bindingCatch
-import org.onap.dcae.collectors.veshv.utils.arrow.getOption
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import java.io.InputStream
-import java.util.concurrent.atomic.AtomicReference
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since August 2018
- */
-class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
- private val messageStreamValidation: MessageStreamValidation) {
- private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference()
-
- fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString))
-
- fun listenToTopics(topics: Set<String>): IO<Unit> = IO.monadError().bindingCatch {
- if (topics.any { it.isBlank() })
- throw IllegalArgumentException("Topic list cannot contain empty elements")
- if (topics.isEmpty())
- throw IllegalArgumentException("Topic list cannot be empty")
-
- logger.info("Received new configuration. Creating consumer for topics: $topics")
- consumerState.set(consumerFactory.createConsumerForTopics(topics).bind())
- }.fix()
-
- fun state() = consumerState.getOption().map { it.currentState() }
-
- fun resetState(): IO<Unit> = consumerState.getOption().fold(
- { IO.unit },
- { it.reset() }
- )
-
- fun validate(jsonDescription: InputStream) = messageStreamValidation.validate(jsonDescription, currentMessages())
-
- private fun currentMessages(): List<ByteArray> =
- consumerState.getOption()
- .map { it.currentState().consumedMessages }
- .getOrElse(::emptyList)
-
- private fun extractTopics(topicsString: String): Set<String> =
- topicsString.substringAfter("=")
- .split(",")
- .toSet()
-
- companion object {
- private val logger = Logger(DcaeAppSimulator::class)
- }
-}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
deleted file mode 100644
index e423191d..00000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-
-import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monadError.monadError
-import arrow.typeclasses.bindingCatch
-import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.utils.arrow.asIo
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.ves.VesEventOuterClass
-import java.io.InputStream
-import javax.json.Json
-
-class MessageStreamValidation(
- private val messageGenerator: MessageGenerator,
- private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
-
- fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> =
- IO.monadError().bindingCatch {
- val messageParams = parseMessageParams(jsonDescription)
- val expectedEvents = generateEvents(messageParams).bind()
- val actualEvents = decodeConsumedEvents(consumedMessages)
- if (shouldValidatePayloads(messageParams)) {
- expectedEvents == actualEvents
- } else {
- validateHeaders(actualEvents, expectedEvents)
- }
- }.fix()
-
- private fun parseMessageParams(input: InputStream): List<MessageParameters> {
- val expectations = Json.createReader(input).readArray()
- val messageParams = messageParametersParser.parse(expectations)
-
- return messageParams.fold(
- { throw IllegalArgumentException("Parsing error: " + it.message) },
- {
- if (it.isEmpty())
- throw IllegalArgumentException("Message param list cannot be empty")
- it
- }
- )
- }
-
- private fun shouldValidatePayloads(parameters: List<MessageParameters>) =
- parameters.all { it.messageType == MessageType.FIXED_PAYLOAD }
-
- private fun validateHeaders(actual: List<VesEventOuterClass.VesEvent>,
- expected: List<VesEventOuterClass.VesEvent>): Boolean {
- val consumedHeaders = actual.map { it.commonEventHeader }
- val generatedHeaders = expected.map { it.commonEventHeader }
- return generatedHeaders == consumedHeaders
- }
-
- private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventOuterClass.VesEvent>> =
- messageGenerator.createMessageFlux(parameters)
- .map(WireFrameMessage::payload)
- .map(ByteData::unsafeAsArray)
- .map(VesEventOuterClass.VesEvent::parseFrom)
- .collectList()
- .asIo()
-
- private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) =
- consumedMessages.map(VesEventOuterClass.VesEvent::parseFrom)
-
-}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
deleted file mode 100644
index 1eca9317..00000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
-
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
-import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
-import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
-import org.onap.dcae.collectors.veshv.utils.http.Responses
-import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
-import org.onap.dcae.collectors.veshv.utils.http.sendOrError
-import ratpack.handling.Chain
-import ratpack.server.RatpackServer
-import ratpack.server.ServerConfig
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
- private val responseValid by lazy {
- Responses.statusResponse(
- name = "valid",
- message = "validation succeeded"
- )
- }
-
- private val responseInvalid by lazy {
- Responses.statusResponse(
- name = "invalid",
- message = "validation failed",
- httpStatus = HttpStatus.BAD_REQUEST
- )
- }
-
-
- fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> =
- simulator.listenToTopics(kafkaTopics).map {
- RatpackServer.start { server ->
- server.serverConfig(ServerConfig.embedded().port(port))
- .handlers(::setupHandlers)
- }
- }
-
- private fun setupHandlers(chain: Chain) {
- chain
- .put("configuration/topics") { ctx ->
- ctx.request.body.then { body ->
- val operation = simulator.listenToTopics(body.text)
- ctx.response.sendOrError(operation)
- }
-
- }
- .delete("messages") { ctx ->
- ctx.response.contentType(CONTENT_TEXT)
- ctx.response.sendOrError(simulator.resetState())
- }
- .get("messages/all/count") { ctx ->
- simulator.state().fold(
- { ctx.response.status(HttpConstants.STATUS_NOT_FOUND) },
- {
- ctx.response
- .contentType(CONTENT_TEXT)
- .send(it.messagesCount.toString())
- })
- }
- .post("messages/all/validate") { ctx ->
- ctx.request.body.then { body ->
- val response = simulator.validate(body.inputStream)
- .map { isValid ->
- if (isValid) responseValid else responseInvalid
- }
- ctx.response.sendAndHandleErrors(response)
- }
- }
- .get("healthcheck") { ctx ->
- ctx.response.status(HttpConstants.STATUS_OK).send()
- }
- }
-
- companion object {
- private const val CONTENT_TEXT = "text/plain"
- }
-}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
deleted file mode 100644
index 10dedbdf..00000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
-
-import arrow.effects.IO
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer
-import org.onap.dcae.collectors.veshv.utils.arrow.evaluateIo
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.kafka.receiver.KafkaReceiver
-import reactor.kafka.receiver.ReceiverOptions
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
-
- fun start(): IO<Consumer> = IO {
- val consumer = Consumer()
- receiver.receive().map(consumer::update).evaluateIo().subscribe()
- consumer
- }
-
- companion object {
- private val logger = Logger(KafkaSource::class)
-
- fun create(bootstrapServers: String, topics: Set<String>): KafkaSource {
- return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics)))
- }
-
- fun createReceiverOptions(bootstrapServers: String,
- topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? {
- val props = mapOf<String, Any>(
- ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
- ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-dcae-app-simulator",
- ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators",
- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"
- )
- return ReceiverOptions.create<ByteArray, ByteArray>(props)
- .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } }
- .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } }
- .subscription(topics)
- }
- }
-}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
deleted file mode 100644
index 17eeb5b1..00000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* dcaegen2-collectors-veshv
-* ================================================================================
-* Copyright (C) 2018 NOKIA
-* ================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
-
-import arrow.core.Option
-import arrow.core.fix
-import arrow.instances.option.monad.monad
-import arrow.typeclasses.binding
-import org.apache.commons.cli.CommandLine
-import org.apache.commons.cli.DefaultParser
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
-import org.onap.dcae.collectors.veshv.utils.commandline.intValue
-import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
-
-class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) {
- override val cmdLineOptionsList: List<CommandLineOption> = listOf(
- LISTEN_PORT,
- MAXIMUM_PAYLOAD_SIZE_BYTES,
- KAFKA_SERVERS,
- KAFKA_TOPICS
- )
-
- override fun getConfiguration(cmdLine: CommandLine): Option<DcaeAppSimConfiguration> =
- Option.monad().binding {
- val listenPort = cmdLine
- .intValue(LISTEN_PORT)
- .bind()
- val maxPayloadSizeBytes = cmdLine
- .intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
- val kafkaBootstrapServers = cmdLine
- .stringValue(KAFKA_SERVERS)
- .bind()
- val kafkaTopics = cmdLine
- .stringValue(KAFKA_TOPICS)
- .map { it.split(",").toSet() }
- .bind()
-
- DcaeAppSimConfiguration(
- listenPort,
- maxPayloadSizeBytes,
- kafkaBootstrapServers,
- kafkaTopics)
- }.fix()
-}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
deleted file mode 100644
index a6fc8053..00000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
-
-data class DcaeAppSimConfiguration(
- val apiPort: Int,
- val maxPayloadSizeBytes: Int,
- val kafkaBootstrapServers: String,
- val kafkaTopics: Set<String>
-)
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
deleted file mode 100644
index 1eefdbdb..00000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.kafka.receiver.ReceiverRecord
-import java.util.concurrent.ConcurrentLinkedQueue
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>) {
- val messagesCount: Int by lazy {
- messages.size
- }
-
- val consumedMessages: List<ByteArray> by lazy {
- messages.toList()
- }
-}
-
-interface ConsumerStateProvider {
- fun currentState(): ConsumerState
- fun reset(): IO<Unit>
-}
-
-class Consumer : ConsumerStateProvider {
-
- private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue()
-
- override fun currentState(): ConsumerState = ConsumerState(consumedMessages)
-
- override fun reset(): IO<Unit> = IO {
- consumedMessages.clear()
- }
-
- fun update(record: ReceiverRecord<ByteArray, ByteArray>) = IO<Unit> {
- logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
- consumedMessages.add(record.value())
- }
-
- companion object {
- private val logger = Logger(Consumer::class)
- }
-}
-
-class ConsumerFactory(private val kafkaBootstrapServers: String) {
- fun createConsumerForTopics(kafkaTopics: Set<String>): IO<Consumer> =
- KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start()
-}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
deleted file mode 100644
index 06ff4d59..00000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp
-
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer
-import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
-import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
-import org.onap.dcae.collectors.veshv.utils.arrow.unit
-import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
-
-private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp"
-private val logger = Logger(PACKAGE_NAME)
-const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
-
-fun main(args: Array<String>) =
- ArgDcaeAppSimConfiguration().parse(args)
- .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map(::startApp)
- .unsafeRunEitherSync(
- { ex ->
- logger.error("Failed to start a server", ex)
- ExitFailure(1)
- },
- {
- logger.info("Started DCAE-APP Simulator API server")
- }
- )
-
-
-private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
- logger.info("Using configuration: $config")
- val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers)
- val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
- return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
- .start(config.apiPort, config.kafkaTopics)
- .unit()
-}
diff --git a/hv-collector-dcae-app-simulator/src/main/resources/logback.xml b/hv-collector-dcae-app-simulator/src/main/resources/logback.xml
deleted file mode 100644
index 48da3b18..00000000
--- a/hv-collector-dcae-app-simulator/src/main/resources/logback.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<configuration>
- <property name="LOG_FILE"
- value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
- <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
-
- <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>
- %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
- </pattern>
- </encoder>
- </appender>
-
- <appender name="ROLLING-FILE"
- class="ch.qos.logback.core.rolling.RollingFileAppender">
- <encoder>
- <pattern>${FILE_LOG_PATTERN}</pattern>
- </encoder>
- <file>${LOG_FILE}</file>
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
- <maxFileSize>50MB</maxFileSize>
- <maxHistory>30</maxHistory>
- <totalSizeCap>10GB</totalSizeCap>
- </rollingPolicy>
- </appender>
-
- <logger name="org.onap.dcae.collectors.veshv" level="INFO"/>
- <!--<logger name="reactor.ipc.netty" level="DEBUG"/>-->
-
- <root level="INFO">
- <appender-ref ref="CONSOLE"/>
- <appender-ref ref="ROLLING-FILE"/>
- </root>
-</configuration> \ No newline at end of file
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt
deleted file mode 100644
index 08558d76..00000000
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.it
-import reactor.kafka.receiver.ReceiverRecord
-
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since August 2018
- */
-internal class ConsumerTest : Spek({
-
- lateinit var cut: Consumer
-
- beforeEachTest {
- cut = Consumer()
- }
-
- describe("Consumer which holds the state of received Kafka records") {
- it("should contain empty state in the beginning") {
- assertEmptyState(cut)
- }
-
- describe("update") {
- val value = byteArrayOf(2)
-
- beforeEachTest {
- cut.update(receiverRecord(
- topic = "topic",
- key = byteArrayOf(1),
- value = value
- )).unsafeRunSync()
- }
-
- it("should contain one message if it was updated once") {
- assertState(cut, value)
- }
-
- it("should contain empty state message if it was reset after update") {
- cut.reset().unsafeRunSync()
- assertEmptyState(cut)
- }
- }
- }
-})
-
-fun assertEmptyState(cut: Consumer) {
- assertState(cut)
-}
-
-fun assertState(cut: Consumer, vararg values: ByteArray) {
- assertThat(cut.currentState().consumedMessages)
- .containsOnly(*values)
- assertThat(cut.currentState().messagesCount)
- .isEqualTo(values.size)
-}
-
-fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) =
- ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null)
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
deleted file mode 100644
index e1641cbb..00000000
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-
-import arrow.core.Left
-import arrow.core.None
-import arrow.core.Some
-import arrow.effects.IO
-import com.google.protobuf.ByteString
-import com.nhaarman.mockitokotlin2.any
-import com.nhaarman.mockitokotlin2.eq
-import com.nhaarman.mockitokotlin2.mock
-import com.nhaarman.mockitokotlin2.never
-import com.nhaarman.mockitokotlin2.verify
-import com.nhaarman.mockitokotlin2.whenever
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.it
-import org.mockito.ArgumentMatchers.anySet
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
-import org.onap.ves.VesEventOuterClass.VesEvent
-import java.util.concurrent.ConcurrentLinkedQueue
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since August 2018
- */
-internal class DcaeAppSimulatorTest : Spek({
- lateinit var consumerFactory: ConsumerFactory
- lateinit var messageStreamValidation: MessageStreamValidation
- lateinit var consumer: Consumer
- lateinit var cut: DcaeAppSimulator
-
- beforeEachTest {
- consumerFactory = mock()
- messageStreamValidation = mock()
- consumer = mock()
- cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)
-
- whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(IO.just(consumer))
- }
-
- fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList()))
-
- describe("listenToTopics") {
- val topics = setOf("perf3gpp", "faults")
-
- it("should fail when topic list is empty") {
- val result = cut.listenToTopics(setOf()).attempt().unsafeRunSync()
- assertThat(result.isLeft()).isTrue()
- }
-
- it("should fail when topic list contains empty strings") {
- val result = cut.listenToTopics(setOf("perf3gpp", " ", "faults")).attempt().unsafeRunSync()
- assertThat(result.isLeft()).isTrue()
- }
-
- it("should subscribe to given topics") {
- cut.listenToTopics(topics).unsafeRunSync()
- verify(consumerFactory).createConsumerForTopics(topics)
- }
-
- it("should subscribe to given topics when called with comma separated list") {
- cut.listenToTopics("perf3gpp,faults").unsafeRunSync()
- verify(consumerFactory).createConsumerForTopics(topics)
- }
-
- it("should handle errors") {
- // given
- val error = RuntimeException("WTF")
- whenever(consumerFactory.createConsumerForTopics(anySet()))
- .thenReturn(IO.raiseError(error))
-
- // when
- val result = cut.listenToTopics("perf3gpp").attempt().unsafeRunSync()
-
- // then
- assertThat(result).isEqualTo(Left(error))
- }
- }
-
- describe("state") {
-
- it("should return None when topics hasn't been initialized") {
- assertThat(cut.state()).isEqualTo(None)
- }
-
- describe("when topics are initialized") {
- beforeEachTest {
- cut.listenToTopics("perf3gpp").unsafeRunSync()
- }
-
- it("should return some state when it has been set") {
- val state = consumerState()
- whenever(consumer.currentState()).thenReturn(state)
-
- assertThat(cut.state()).isEqualTo(Some(state))
- }
- }
- }
-
- describe("resetState") {
- it("should do nothing when topics hasn't been initialized") {
- cut.resetState().unsafeRunSync()
- verify(consumer, never()).reset()
- }
-
- describe("when topics are initialized") {
- beforeEachTest {
- cut.listenToTopics("perf3gpp").unsafeRunSync()
- }
-
- it("should reset the state") {
- // given
- whenever(consumer.reset()).thenReturn(IO.unit)
-
- // when
- cut.resetState().unsafeRunSync()
-
- // then
- verify(consumer).reset()
- }
- }
- }
-
- describe("validate") {
- beforeEachTest {
- whenever(messageStreamValidation.validate(any(), any())).thenReturn(IO.just(true))
- }
-
- it("should use empty list when consumer is unavailable") {
- // when
- val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
-
- // then
- verify(messageStreamValidation).validate(any(), eq(emptyList()))
- assertThat(result).isTrue()
- }
-
- it("should delegate to MessageStreamValidation") {
- // given
- cut.listenToTopics("perf3gpp").unsafeRunSync()
- whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray()))
-
- // when
- val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
-
- // then
- verify(messageStreamValidation).validate(any(), any())
- assertThat(result).isTrue()
- }
- }
-})
-
-
-private const val DUMMY_EVENT_ID = "aaa"
-private const val DUMMY_PAYLOAD = "payload"
-
-private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
- return VesEvent.newBuilder()
- .setCommonEventHeader(CommonEventHeader.newBuilder()
- .setEventId(eventId))
- .setEventFields(ByteString.copyFrom(payload.toByteArray()))
- .build()
-}
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
deleted file mode 100644
index a631be76..00000000
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-
-import arrow.core.Either
-import arrow.core.Right
-import com.google.protobuf.ByteString
-import com.nhaarman.mockitokotlin2.any
-import com.nhaarman.mockitokotlin2.mock
-import com.nhaarman.mockitokotlin2.whenever
-import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.fail
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.it
-import org.mockito.ArgumentMatchers.anyList
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
-import org.onap.ves.VesEventOuterClass.VesEvent
-import reactor.core.publisher.Flux
-import javax.json.stream.JsonParsingException
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since August 2018
- */
-internal class MessageStreamValidationTest : Spek({
- lateinit var messageParametersParser: MessageParametersParser
- lateinit var messageGenerator: MessageGenerator
- lateinit var cut: MessageStreamValidation
-
- beforeEachTest {
- messageParametersParser = mock()
- messageGenerator = mock()
- cut = MessageStreamValidation(messageGenerator, messageParametersParser)
- }
-
- fun givenParsedMessageParameters(vararg params: MessageParameters) {
- whenever(messageParametersParser.parse(any())).thenReturn(Right(params.toList()))
- }
-
- describe("validate") {
-
- it("should return error when JSON is invalid") {
- // when
- val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync()
-
- // then
- when(result) {
- is Either.Left -> assertThat(result.a).isInstanceOf(JsonParsingException::class.java)
- else -> fail("validation should fail")
- }
- }
-
- it("should return error when message param list is empty") {
- // given
- givenParsedMessageParameters()
-
- // when
- val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync()
-
- // then
- assertThat(result.isLeft()).isTrue()
- }
-
- describe("when validating headers only") {
- it("should return true when messages are the same") {
- // given
- val jsonAsStream = sampleJsonAsStream()
- val event = vesEvent()
- val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray())
- val receivedMessageBytes = event.toByteArray()
-
- givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1))
- whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
-
- // when
- val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
-
- // then
- assertThat(result).isTrue()
- }
-
- it("should return true when messages differ with payload only") {
- // given
- val jsonAsStream = sampleJsonAsStream()
- val generatedEvent = vesEvent(payload = "payload A")
- val receivedEvent = vesEvent(payload = "payload B")
- val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
- val receivedMessageBytes = receivedEvent.toByteArray()
-
- givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
- whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
-
- // when
- val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
-
- // then
- assertThat(result).isTrue()
- }
-
- it("should return false when messages are different") {
- // given
- val jsonAsStream = sampleJsonAsStream()
- val generatedEvent = vesEvent()
- val receivedEvent = vesEvent(eventId = "bbb")
- val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
- val receivedMessageBytes = receivedEvent.toByteArray()
-
- givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
- whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
-
- // when
- val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
-
- // then
- assertThat(result).isFalse()
- }
- }
-
- describe("when validating whole messages") {
- it("should return true when messages are the same") {
- // given
- val jsonAsStream = sampleJsonAsStream()
- val event = vesEvent()
- val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray())
- val receivedMessageBytes = event.toByteArray()
-
- givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
- whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
-
- // when
- val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
-
- // then
- assertThat(result).isTrue()
- }
-
- it("should return false when messages differ with payload only") {
- // given
- val jsonAsStream = sampleJsonAsStream()
- val generatedEvent = vesEvent(payload = "payload A")
- val receivedEvent = vesEvent(payload = "payload B")
- val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
- val receivedMessageBytes = receivedEvent.toByteArray()
-
- givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
- whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
-
- // when
- val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
-
- // then
- assertThat(result).isFalse()
- }
-
- it("should return false when messages are different") {
- // given
- val jsonAsStream = sampleJsonAsStream()
- val generatedEvent = vesEvent()
- val receivedEvent = vesEvent("bbb")
- val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
- val receivedMessageBytes = receivedEvent.toByteArray()
-
- givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
- whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
-
- // when
- val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
-
- // then
- assertThat(result).isFalse()
- }
- }
- }
-})
-
-
-
-private const val DUMMY_EVENT_ID = "aaa"
-private const val DUMMY_PAYLOAD = "payload"
-
-private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
- return VesEvent.newBuilder()
- .setCommonEventHeader(CommonEventHeader.newBuilder()
- .setEventId(eventId))
- .setEventFields(ByteString.copyFrom(payload.toByteArray()))
- .build()
-}
-
-private const val sampleJsonArray = """["headersOnly"]"""
-
-private fun sampleJsonAsStream() = sampleJsonArray.byteInputStream()
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt
deleted file mode 100644
index de74f628..00000000
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
-
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.it
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
- * @since August 2018
- */
-internal class KafkaSourceTest : Spek({
- val servers = "kafka1:9080,kafka2:9080"
- val topics = setOf("topic1", "topic2")
-
- describe("receiver options") {
- val options = KafkaSource.createReceiverOptions(servers, topics)!!.toImmutable()
-
- fun verifyProperty(key: String, expectedValue: Any) {
- it("should have $key option set") {
- assertThat(options.consumerProperty(key))
- .isEqualTo(expectedValue)
- }
- }
-
- verifyProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers)
- verifyProperty(ConsumerConfig.CLIENT_ID_CONFIG, "hv-collector-dcae-app-simulator")
- verifyProperty(ConsumerConfig.GROUP_ID_CONFIG, "hv-collector-simulators")
- verifyProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java)
- verifyProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java)
- verifyProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
- }
-}) \ No newline at end of file
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt
deleted file mode 100644
index 7137fe12..00000000
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
-
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure
-import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess
-import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentError
-
-
-internal class ArgDcaeAppSimConfigurationTest : Spek({
-
- lateinit var cut: ArgDcaeAppSimConfiguration
- val listenPort = "1234"
- val kafkaBootstrapServers = "localhosting:123,localhostinger:12345"
- val kafkaTopics = "top1,top2"
-
- beforeEachTest {
- cut = ArgDcaeAppSimConfiguration()
- }
-
- describe("parsing arguments") {
- lateinit var result: DcaeAppSimConfiguration
-
- given("all parameters are present in the long form") {
-
- beforeEachTest {
- result = cut.parseExpectingSuccess(
- "--listen-port", listenPort,
- "--kafka-bootstrap-servers", kafkaBootstrapServers,
- "--kafka-topics", kafkaTopics
- )
- }
-
- it("should set proper port") {
- assertThat(result.apiPort).isEqualTo(listenPort.toInt())
- }
-
-
- it("should set proper kafka bootstrap servers") {
- assertThat(result.kafkaBootstrapServers).isEqualTo(kafkaBootstrapServers)
- }
-
- it("should set proper kafka topics") {
- assertThat(result.kafkaTopics).isEqualTo(
- setOf("top1", "top2")
- )
- }
- }
-
- given("some parameters are present in the short form") {
-
- beforeEachTest {
- result = cut.parseExpectingSuccess(
- "-p", listenPort,
- "--kafka-bootstrap-servers", kafkaBootstrapServers,
- "-f", kafkaTopics)
- }
-
- it("should set proper port") {
- assertThat(result.apiPort).isEqualTo(listenPort.toInt())
- }
-
- it("should set proper kafka bootstrap servers") {
- assertThat(result.kafkaBootstrapServers).isEqualTo(kafkaBootstrapServers)
- }
-
- it("should set proper kafka topics") {
- assertThat(result.kafkaTopics).isEqualTo(
- setOf("top1", "top2")
- )
- }
- }
-
- describe("required parameter is absent") {
- given("kafka topics are missing") {
- it("should throw exception") {
- assertThat(cut.parseExpectingFailure(
- "-p", listenPort,
- "-s", kafkaBootstrapServers
- )).isInstanceOf(WrongArgumentError::class.java)
- }
- }
-
- given("kafka bootstrap servers is missing") {
- it("should throw exception") {
- assertThat(cut.parseExpectingFailure(
- "-p", listenPort,
- "-f", kafkaTopics
- )).isInstanceOf(WrongArgumentError::class.java)
- }
- }
-
- given("listen port is missing") {
- it("should throw exception") {
- assertThat(cut.parseExpectingFailure(
- "-p", listenPort,
- "-s", kafkaBootstrapServers
- )).isInstanceOf(WrongArgumentError::class.java)
- }
- }
- }
- }
-}) \ No newline at end of file