diff options
Diffstat (limited to 'hv-collector-dcae-app-simulator')
16 files changed, 0 insertions, 1422 deletions
diff --git a/hv-collector-dcae-app-simulator/Dockerfile b/hv-collector-dcae-app-simulator/Dockerfile deleted file mode 100644 index a561fff7..00000000 --- a/hv-collector-dcae-app-simulator/Dockerfile +++ /dev/null @@ -1,18 +0,0 @@ -FROM docker.io/openjdk:11-jre-slim - -LABEL copyright="Copyright (C) 2018 NOKIA" -LABEL license.name="The Apache Software License, Version 2.0" -LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0" -LABEL maintainer="Nokia Wroclaw ONAP Team" - -RUN apt-get update \ - && apt-get install -y --no-install-recommends curl \ - && apt-get clean - -WORKDIR /opt/ves-hv-dcae-app-simulator - -ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.dcaeapp.MainKt"] - -COPY target/libs/external/* ./ -COPY target/libs/internal/* ./ -COPY target/hv-collector-dcae-app-simulator-*.jar ./ diff --git a/hv-collector-dcae-app-simulator/pom.xml b/hv-collector-dcae-app-simulator/pom.xml deleted file mode 100644 index 82e99f95..00000000 --- a/hv-collector-dcae-app-simulator/pom.xml +++ /dev/null @@ -1,155 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ ============LICENSE_START======================================================= - ~ dcaegen2-collectors-veshv - ~ ================================================================================ - ~ Copyright (C) 2018 NOKIA - ~ ================================================================================ - ~ Licensed under the Apache License, Version 2.0 (the "License"); - ~ you may not use this file except in compliance with the License. - ~ You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - ~ ============LICENSE_END========================================================= - --> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <licenses> - <license> - <name>The Apache Software License, Version 2.0</name> - <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> - </license> - </licenses> - - <parent> - <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> - <artifactId>ves-hv-collector</artifactId> - <version>1.1.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>hv-collector-dcae-app-simulator</artifactId> - <description>VES HighVolume Collector :: Dcae app simulator</description> - - <properties> - <skipAnalysis>false</skipAnalysis> - </properties> - - <build> - <plugins> - <plugin> - <artifactId>kotlin-maven-plugin</artifactId> - <groupId>org.jetbrains.kotlin</groupId> - </plugin> - <plugin> - <artifactId>maven-surefire-plugin</artifactId> - <groupId>org.apache.maven.plugins</groupId> - </plugin> - </plugins> - </build> - <profiles> - <profile> - <id>docker</id> - <activation> - <property> - <name>!skipDocker</name> - </property> - </activation> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - </plugin> - <plugin> - <groupId>io.fabric8</groupId> - <artifactId>docker-maven-plugin</artifactId> - </plugin> - </plugins> - </build> - </profile> - </profiles> - <dependencies> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-domain</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-utils</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-ves-message-generator</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-test-utils</artifactId> - <version>${project.parent.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects</artifactId> - </dependency> - <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects-instances</artifactId> - </dependency> - <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects-reactor</artifactId> - </dependency> - <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-syntax</artifactId> - </dependency> - <dependency> - <groupId>io.ratpack</groupId> - <artifactId>ratpack-core</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor.kafka</groupId> - <artifactId>reactor-kafka</artifactId> - </dependency> - <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java-util</artifactId> - </dependency> - <dependency> - <groupId>commons-cli</groupId> - <artifactId>commons-cli</artifactId> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-stdlib-jdk8</artifactId> - </dependency> - <dependency> - <groupId>org.assertj</groupId> - <artifactId>assertj-core</artifactId> - </dependency> - <dependency> - <groupId>ch.qos.logback</groupId> - <artifactId>logback-classic</artifactId> - <scope>runtime</scope> - </dependency> - </dependencies> - - -</project>
\ No newline at end of file diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt deleted file mode 100644 index 490cde4a..00000000 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt +++ /dev/null @@ -1,74 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl - -import arrow.core.getOrElse -import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monadError.monadError -import arrow.typeclasses.bindingCatch -import org.onap.dcae.collectors.veshv.utils.arrow.getOption -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import java.io.InputStream -import java.util.concurrent.atomic.AtomicReference - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since August 2018 - */ -class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, - private val messageStreamValidation: MessageStreamValidation) { - private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference() - - fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString)) - - fun listenToTopics(topics: Set<String>): IO<Unit> = IO.monadError().bindingCatch { - if (topics.any { it.isBlank() }) - throw IllegalArgumentException("Topic list cannot contain empty elements") - if (topics.isEmpty()) - throw IllegalArgumentException("Topic list cannot be empty") - - logger.info("Received new configuration. Creating consumer for topics: $topics") - consumerState.set(consumerFactory.createConsumerForTopics(topics).bind()) - }.fix() - - fun state() = consumerState.getOption().map { it.currentState() } - - fun resetState(): IO<Unit> = consumerState.getOption().fold( - { IO.unit }, - { it.reset() } - ) - - fun validate(jsonDescription: InputStream) = messageStreamValidation.validate(jsonDescription, currentMessages()) - - private fun currentMessages(): List<ByteArray> = - consumerState.getOption() - .map { it.currentState().consumedMessages } - .getOrElse(::emptyList) - - private fun extractTopics(topicsString: String): Set<String> = - topicsString.substringAfter("=") - .split(",") - .toSet() - - companion object { - private val logger = Logger(DcaeAppSimulator::class) - } -} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt deleted file mode 100644 index e423191d..00000000 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt +++ /dev/null @@ -1,88 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl - -import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monadError.monadError -import arrow.typeclasses.bindingCatch -import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.utils.arrow.asIo -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType -import org.onap.ves.VesEventOuterClass -import java.io.InputStream -import javax.json.Json - -class MessageStreamValidation( - private val messageGenerator: MessageGenerator, - private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { - - fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> = - IO.monadError().bindingCatch { - val messageParams = parseMessageParams(jsonDescription) - val expectedEvents = generateEvents(messageParams).bind() - val actualEvents = decodeConsumedEvents(consumedMessages) - if (shouldValidatePayloads(messageParams)) { - expectedEvents == actualEvents - } else { - validateHeaders(actualEvents, expectedEvents) - } - }.fix() - - private fun parseMessageParams(input: InputStream): List<MessageParameters> { - val expectations = Json.createReader(input).readArray() - val messageParams = messageParametersParser.parse(expectations) - - return messageParams.fold( - { throw IllegalArgumentException("Parsing error: " + it.message) }, - { - if (it.isEmpty()) - throw IllegalArgumentException("Message param list cannot be empty") - it - } - ) - } - - private fun shouldValidatePayloads(parameters: List<MessageParameters>) = - parameters.all { it.messageType == MessageType.FIXED_PAYLOAD } - - private fun validateHeaders(actual: List<VesEventOuterClass.VesEvent>, - expected: List<VesEventOuterClass.VesEvent>): Boolean { - val consumedHeaders = actual.map { it.commonEventHeader } - val generatedHeaders = expected.map { it.commonEventHeader } - return generatedHeaders == consumedHeaders - } - - private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventOuterClass.VesEvent>> = - messageGenerator.createMessageFlux(parameters) - .map(WireFrameMessage::payload) - .map(ByteData::unsafeAsArray) - .map(VesEventOuterClass.VesEvent::parseFrom) - .collectList() - .asIo() - - private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) = - consumedMessages.map(VesEventOuterClass.VesEvent::parseFrom) - -} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt deleted file mode 100644 index 1eca9317..00000000 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt +++ /dev/null @@ -1,101 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters - -import arrow.effects.IO -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator -import org.onap.dcae.collectors.veshv.utils.http.HttpConstants -import org.onap.dcae.collectors.veshv.utils.http.HttpStatus -import org.onap.dcae.collectors.veshv.utils.http.Responses -import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors -import org.onap.dcae.collectors.veshv.utils.http.sendOrError -import ratpack.handling.Chain -import ratpack.server.RatpackServer -import ratpack.server.ServerConfig - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { - private val responseValid by lazy { - Responses.statusResponse( - name = "valid", - message = "validation succeeded" - ) - } - - private val responseInvalid by lazy { - Responses.statusResponse( - name = "invalid", - message = "validation failed", - httpStatus = HttpStatus.BAD_REQUEST - ) - } - - - fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = - simulator.listenToTopics(kafkaTopics).map { - RatpackServer.start { server -> - server.serverConfig(ServerConfig.embedded().port(port)) - .handlers(::setupHandlers) - } - } - - private fun setupHandlers(chain: Chain) { - chain - .put("configuration/topics") { ctx -> - ctx.request.body.then { body -> - val operation = simulator.listenToTopics(body.text) - ctx.response.sendOrError(operation) - } - - } - .delete("messages") { ctx -> - ctx.response.contentType(CONTENT_TEXT) - ctx.response.sendOrError(simulator.resetState()) - } - .get("messages/all/count") { ctx -> - simulator.state().fold( - { ctx.response.status(HttpConstants.STATUS_NOT_FOUND) }, - { - ctx.response - .contentType(CONTENT_TEXT) - .send(it.messagesCount.toString()) - }) - } - .post("messages/all/validate") { ctx -> - ctx.request.body.then { body -> - val response = simulator.validate(body.inputStream) - .map { isValid -> - if (isValid) responseValid else responseInvalid - } - ctx.response.sendAndHandleErrors(response) - } - } - .get("healthcheck") { ctx -> - ctx.response.status(HttpConstants.STATUS_OK).send() - } - } - - companion object { - private const val CONTENT_TEXT = "text/plain" - } -} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt deleted file mode 100644 index 10dedbdf..00000000 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt +++ /dev/null @@ -1,66 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters - -import arrow.effects.IO -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer -import org.onap.dcae.collectors.veshv.utils.arrow.evaluateIo -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.kafka.receiver.KafkaReceiver -import reactor.kafka.receiver.ReceiverOptions - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { - - fun start(): IO<Consumer> = IO { - val consumer = Consumer() - receiver.receive().map(consumer::update).evaluateIo().subscribe() - consumer - } - - companion object { - private val logger = Logger(KafkaSource::class) - - fun create(bootstrapServers: String, topics: Set<String>): KafkaSource { - return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics))) - } - - fun createReceiverOptions(bootstrapServers: String, - topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? { - val props = mapOf<String, Any>( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, - ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-dcae-app-simulator", - ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators", - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest" - ) - return ReceiverOptions.create<ByteArray, ByteArray>(props) - .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } } - .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } } - .subscription(topics) - } - } -} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt deleted file mode 100644 index 17eeb5b1..00000000 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* dcaegen2-collectors-veshv -* ================================================================================ -* Copyright (C) 2018 NOKIA -* ================================================================================ -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config - -import arrow.core.Option -import arrow.core.fix -import arrow.instances.option.monad.monad -import arrow.typeclasses.binding -import org.apache.commons.cli.CommandLine -import org.apache.commons.cli.DefaultParser -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES -import org.onap.dcae.collectors.veshv.utils.commandline.intValue -import org.onap.dcae.collectors.veshv.utils.commandline.stringValue - -class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) { - override val cmdLineOptionsList: List<CommandLineOption> = listOf( - LISTEN_PORT, - MAXIMUM_PAYLOAD_SIZE_BYTES, - KAFKA_SERVERS, - KAFKA_TOPICS - ) - - override fun getConfiguration(cmdLine: CommandLine): Option<DcaeAppSimConfiguration> = - Option.monad().binding { - val listenPort = cmdLine - .intValue(LISTEN_PORT) - .bind() - val maxPayloadSizeBytes = cmdLine - .intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES) - val kafkaBootstrapServers = cmdLine - .stringValue(KAFKA_SERVERS) - .bind() - val kafkaTopics = cmdLine - .stringValue(KAFKA_TOPICS) - .map { it.split(",").toSet() } - .bind() - - DcaeAppSimConfiguration( - listenPort, - maxPayloadSizeBytes, - kafkaBootstrapServers, - kafkaTopics) - }.fix() -} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt deleted file mode 100644 index a6fc8053..00000000 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt +++ /dev/null @@ -1,27 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config - -data class DcaeAppSimConfiguration( - val apiPort: Int, - val maxPayloadSizeBytes: Int, - val kafkaBootstrapServers: String, - val kafkaTopics: Set<String> -) diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt deleted file mode 100644 index 1eefdbdb..00000000 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt +++ /dev/null @@ -1,70 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl - -import arrow.effects.IO -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.kafka.receiver.ReceiverRecord -import java.util.concurrent.ConcurrentLinkedQueue - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since June 2018 - */ -class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>) { - val messagesCount: Int by lazy { - messages.size - } - - val consumedMessages: List<ByteArray> by lazy { - messages.toList() - } -} - -interface ConsumerStateProvider { - fun currentState(): ConsumerState - fun reset(): IO<Unit> -} - -class Consumer : ConsumerStateProvider { - - private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue() - - override fun currentState(): ConsumerState = ConsumerState(consumedMessages) - - override fun reset(): IO<Unit> = IO { - consumedMessages.clear() - } - - fun update(record: ReceiverRecord<ByteArray, ByteArray>) = IO<Unit> { - logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } - consumedMessages.add(record.value()) - } - - companion object { - private val logger = Logger(Consumer::class) - } -} - -class ConsumerFactory(private val kafkaBootstrapServers: String) { - fun createConsumerForTopics(kafkaTopics: Set<String>): IO<Consumer> = - KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start() -} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt deleted file mode 100644 index 06ff4d59..00000000 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ /dev/null @@ -1,62 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp - -import arrow.effects.IO -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer -import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure -import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync -import org.onap.dcae.collectors.veshv.utils.arrow.unit -import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory - -private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp" -private val logger = Logger(PACKAGE_NAME) -const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" - -fun main(args: Array<String>) = - ArgDcaeAppSimConfiguration().parse(args) - .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map(::startApp) - .unsafeRunEitherSync( - { ex -> - logger.error("Failed to start a server", ex) - ExitFailure(1) - }, - { - logger.info("Started DCAE-APP Simulator API server") - } - ) - - -private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> { - logger.info("Using configuration: $config") - val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers) - val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) - return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) - .start(config.apiPort, config.kafkaTopics) - .unit() -} diff --git a/hv-collector-dcae-app-simulator/src/main/resources/logback.xml b/hv-collector-dcae-app-simulator/src/main/resources/logback.xml deleted file mode 100644 index 48da3b18..00000000 --- a/hv-collector-dcae-app-simulator/src/main/resources/logback.xml +++ /dev/null @@ -1,36 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<configuration> - <property name="LOG_FILE" - value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> - <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> - - <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern> - %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n - </pattern> - </encoder> - </appender> - - <appender name="ROLLING-FILE" - class="ch.qos.logback.core.rolling.RollingFileAppender"> - <encoder> - <pattern>${FILE_LOG_PATTERN}</pattern> - </encoder> - <file>${LOG_FILE}</file> - <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> - <maxFileSize>50MB</maxFileSize> - <maxHistory>30</maxHistory> - <totalSizeCap>10GB</totalSizeCap> - </rollingPolicy> - </appender> - - <logger name="org.onap.dcae.collectors.veshv" level="INFO"/> - <!--<logger name="reactor.ipc.netty" level="DEBUG"/>--> - - <root level="INFO"> - <appender-ref ref="CONSOLE"/> - <appender-ref ref="ROLLING-FILE"/> - </root> -</configuration>
\ No newline at end of file diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt deleted file mode 100644 index 08558d76..00000000 --- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt +++ /dev/null @@ -1,82 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl - -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.it -import reactor.kafka.receiver.ReceiverRecord - - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since August 2018 - */ -internal class ConsumerTest : Spek({ - - lateinit var cut: Consumer - - beforeEachTest { - cut = Consumer() - } - - describe("Consumer which holds the state of received Kafka records") { - it("should contain empty state in the beginning") { - assertEmptyState(cut) - } - - describe("update") { - val value = byteArrayOf(2) - - beforeEachTest { - cut.update(receiverRecord( - topic = "topic", - key = byteArrayOf(1), - value = value - )).unsafeRunSync() - } - - it("should contain one message if it was updated once") { - assertState(cut, value) - } - - it("should contain empty state message if it was reset after update") { - cut.reset().unsafeRunSync() - assertEmptyState(cut) - } - } - } -}) - -fun assertEmptyState(cut: Consumer) { - assertState(cut) -} - -fun assertState(cut: Consumer, vararg values: ByteArray) { - assertThat(cut.currentState().consumedMessages) - .containsOnly(*values) - assertThat(cut.currentState().messagesCount) - .isEqualTo(values.size) -} - -fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) = - ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null) diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt deleted file mode 100644 index e1641cbb..00000000 --- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt +++ /dev/null @@ -1,183 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl - -import arrow.core.Left -import arrow.core.None -import arrow.core.Some -import arrow.effects.IO -import com.google.protobuf.ByteString -import com.nhaarman.mockitokotlin2.any -import com.nhaarman.mockitokotlin2.eq -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.never -import com.nhaarman.mockitokotlin2.verify -import com.nhaarman.mockitokotlin2.whenever -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.it -import org.mockito.ArgumentMatchers.anySet -import org.onap.ves.VesEventOuterClass.CommonEventHeader -import org.onap.ves.VesEventOuterClass.VesEvent -import java.util.concurrent.ConcurrentLinkedQueue - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since August 2018 - */ -internal class DcaeAppSimulatorTest : Spek({ - lateinit var consumerFactory: ConsumerFactory - lateinit var messageStreamValidation: MessageStreamValidation - lateinit var consumer: Consumer - lateinit var cut: DcaeAppSimulator - - beforeEachTest { - consumerFactory = mock() - messageStreamValidation = mock() - consumer = mock() - cut = DcaeAppSimulator(consumerFactory, messageStreamValidation) - - whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(IO.just(consumer)) - } - - fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList())) - - describe("listenToTopics") { - val topics = setOf("perf3gpp", "faults") - - it("should fail when topic list is empty") { - val result = cut.listenToTopics(setOf()).attempt().unsafeRunSync() - assertThat(result.isLeft()).isTrue() - } - - it("should fail when topic list contains empty strings") { - val result = cut.listenToTopics(setOf("perf3gpp", " ", "faults")).attempt().unsafeRunSync() - assertThat(result.isLeft()).isTrue() - } - - it("should subscribe to given topics") { - cut.listenToTopics(topics).unsafeRunSync() - verify(consumerFactory).createConsumerForTopics(topics) - } - - it("should subscribe to given topics when called with comma separated list") { - cut.listenToTopics("perf3gpp,faults").unsafeRunSync() - verify(consumerFactory).createConsumerForTopics(topics) - } - - it("should handle errors") { - // given - val error = RuntimeException("WTF") - whenever(consumerFactory.createConsumerForTopics(anySet())) - .thenReturn(IO.raiseError(error)) - - // when - val result = cut.listenToTopics("perf3gpp").attempt().unsafeRunSync() - - // then - assertThat(result).isEqualTo(Left(error)) - } - } - - describe("state") { - - it("should return None when topics hasn't been initialized") { - assertThat(cut.state()).isEqualTo(None) - } - - describe("when topics are initialized") { - beforeEachTest { - cut.listenToTopics("perf3gpp").unsafeRunSync() - } - - it("should return some state when it has been set") { - val state = consumerState() - whenever(consumer.currentState()).thenReturn(state) - - assertThat(cut.state()).isEqualTo(Some(state)) - } - } - } - - describe("resetState") { - it("should do nothing when topics hasn't been initialized") { - cut.resetState().unsafeRunSync() - verify(consumer, never()).reset() - } - - describe("when topics are initialized") { - beforeEachTest { - cut.listenToTopics("perf3gpp").unsafeRunSync() - } - - it("should reset the state") { - // given - whenever(consumer.reset()).thenReturn(IO.unit) - - // when - cut.resetState().unsafeRunSync() - - // then - verify(consumer).reset() - } - } - } - - describe("validate") { - beforeEachTest { - whenever(messageStreamValidation.validate(any(), any())).thenReturn(IO.just(true)) - } - - it("should use empty list when consumer is unavailable") { - // when - val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync() - - // then - verify(messageStreamValidation).validate(any(), eq(emptyList())) - assertThat(result).isTrue() - } - - it("should delegate to MessageStreamValidation") { - // given - cut.listenToTopics("perf3gpp").unsafeRunSync() - whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray())) - - // when - val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync() - - // then - verify(messageStreamValidation).validate(any(), any()) - assertThat(result).isTrue() - } - } -}) - - -private const val DUMMY_EVENT_ID = "aaa" -private const val DUMMY_PAYLOAD = "payload" - -private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent { - return VesEvent.newBuilder() - .setCommonEventHeader(CommonEventHeader.newBuilder() - .setEventId(eventId)) - .setEventFields(ByteString.copyFrom(payload.toByteArray())) - .build() -} diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt deleted file mode 100644 index a631be76..00000000 --- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt +++ /dev/null @@ -1,214 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl - -import arrow.core.Either -import arrow.core.Right -import com.google.protobuf.ByteString -import com.nhaarman.mockitokotlin2.any -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.whenever -import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.fail -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.it -import org.mockito.ArgumentMatchers.anyList -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType -import org.onap.ves.VesEventOuterClass.CommonEventHeader -import org.onap.ves.VesEventOuterClass.VesEvent -import reactor.core.publisher.Flux -import javax.json.stream.JsonParsingException - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since August 2018 - */ -internal class MessageStreamValidationTest : Spek({ - lateinit var messageParametersParser: MessageParametersParser - lateinit var messageGenerator: MessageGenerator - lateinit var cut: MessageStreamValidation - - beforeEachTest { - messageParametersParser = mock() - messageGenerator = mock() - cut = MessageStreamValidation(messageGenerator, messageParametersParser) - } - - fun givenParsedMessageParameters(vararg params: MessageParameters) { - whenever(messageParametersParser.parse(any())).thenReturn(Right(params.toList())) - } - - describe("validate") { - - it("should return error when JSON is invalid") { - // when - val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync() - - // then - when(result) { - is Either.Left -> assertThat(result.a).isInstanceOf(JsonParsingException::class.java) - else -> fail("validation should fail") - } - } - - it("should return error when message param list is empty") { - // given - givenParsedMessageParameters() - - // when - val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync() - - // then - assertThat(result.isLeft()).isTrue() - } - - describe("when validating headers only") { - it("should return true when messages are the same") { - // given - val jsonAsStream = sampleJsonAsStream() - val event = vesEvent() - val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray()) - val receivedMessageBytes = event.toByteArray() - - givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1)) - whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) - - // when - val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() - - // then - assertThat(result).isTrue() - } - - it("should return true when messages differ with payload only") { - // given - val jsonAsStream = sampleJsonAsStream() - val generatedEvent = vesEvent(payload = "payload A") - val receivedEvent = vesEvent(payload = "payload B") - val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray()) - val receivedMessageBytes = receivedEvent.toByteArray() - - givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1)) - whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) - - // when - val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() - - // then - assertThat(result).isTrue() - } - - it("should return false when messages are different") { - // given - val jsonAsStream = sampleJsonAsStream() - val generatedEvent = vesEvent() - val receivedEvent = vesEvent(eventId = "bbb") - val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray()) - val receivedMessageBytes = receivedEvent.toByteArray() - - givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1)) - whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) - - // when - val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() - - // then - assertThat(result).isFalse() - } - } - - describe("when validating whole messages") { - it("should return true when messages are the same") { - // given - val jsonAsStream = sampleJsonAsStream() - val event = vesEvent() - val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray()) - val receivedMessageBytes = event.toByteArray() - - givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1)) - whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) - - // when - val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() - - // then - assertThat(result).isTrue() - } - - it("should return false when messages differ with payload only") { - // given - val jsonAsStream = sampleJsonAsStream() - val generatedEvent = vesEvent(payload = "payload A") - val receivedEvent = vesEvent(payload = "payload B") - val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray()) - val receivedMessageBytes = receivedEvent.toByteArray() - - givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1)) - whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) - - // when - val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() - - // then - assertThat(result).isFalse() - } - - it("should return false when messages are different") { - // given - val jsonAsStream = sampleJsonAsStream() - val generatedEvent = vesEvent() - val receivedEvent = vesEvent("bbb") - val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray()) - val receivedMessageBytes = receivedEvent.toByteArray() - - givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1)) - whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) - - // when - val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() - - // then - assertThat(result).isFalse() - } - } - } -}) - - - -private const val DUMMY_EVENT_ID = "aaa" -private const val DUMMY_PAYLOAD = "payload" - -private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent { - return VesEvent.newBuilder() - .setCommonEventHeader(CommonEventHeader.newBuilder() - .setEventId(eventId)) - .setEventFields(ByteString.copyFrom(payload.toByteArray())) - .build() -} - -private const val sampleJsonArray = """["headersOnly"]""" - -private fun sampleJsonAsStream() = sampleJsonArray.byteInputStream() diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt deleted file mode 100644 index de74f628..00000000 --- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt +++ /dev/null @@ -1,54 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters - -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.it - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> - * @since August 2018 - */ -internal class KafkaSourceTest : Spek({ - val servers = "kafka1:9080,kafka2:9080" - val topics = setOf("topic1", "topic2") - - describe("receiver options") { - val options = KafkaSource.createReceiverOptions(servers, topics)!!.toImmutable() - - fun verifyProperty(key: String, expectedValue: Any) { - it("should have $key option set") { - assertThat(options.consumerProperty(key)) - .isEqualTo(expectedValue) - } - } - - verifyProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers) - verifyProperty(ConsumerConfig.CLIENT_ID_CONFIG, "hv-collector-dcae-app-simulator") - verifyProperty(ConsumerConfig.GROUP_ID_CONFIG, "hv-collector-simulators") - verifyProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) - verifyProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) - verifyProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - } -})
\ No newline at end of file diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt deleted file mode 100644 index 7137fe12..00000000 --- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt +++ /dev/null @@ -1,125 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config - -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure -import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess -import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentError - - -internal class ArgDcaeAppSimConfigurationTest : Spek({ - - lateinit var cut: ArgDcaeAppSimConfiguration - val listenPort = "1234" - val kafkaBootstrapServers = "localhosting:123,localhostinger:12345" - val kafkaTopics = "top1,top2" - - beforeEachTest { - cut = ArgDcaeAppSimConfiguration() - } - - describe("parsing arguments") { - lateinit var result: DcaeAppSimConfiguration - - given("all parameters are present in the long form") { - - beforeEachTest { - result = cut.parseExpectingSuccess( - "--listen-port", listenPort, - "--kafka-bootstrap-servers", kafkaBootstrapServers, - "--kafka-topics", kafkaTopics - ) - } - - it("should set proper port") { - assertThat(result.apiPort).isEqualTo(listenPort.toInt()) - } - - - it("should set proper kafka bootstrap servers") { - assertThat(result.kafkaBootstrapServers).isEqualTo(kafkaBootstrapServers) - } - - it("should set proper kafka topics") { - assertThat(result.kafkaTopics).isEqualTo( - setOf("top1", "top2") - ) - } - } - - given("some parameters are present in the short form") { - - beforeEachTest { - result = cut.parseExpectingSuccess( - "-p", listenPort, - "--kafka-bootstrap-servers", kafkaBootstrapServers, - "-f", kafkaTopics) - } - - it("should set proper port") { - assertThat(result.apiPort).isEqualTo(listenPort.toInt()) - } - - it("should set proper kafka bootstrap servers") { - assertThat(result.kafkaBootstrapServers).isEqualTo(kafkaBootstrapServers) - } - - it("should set proper kafka topics") { - assertThat(result.kafkaTopics).isEqualTo( - setOf("top1", "top2") - ) - } - } - - describe("required parameter is absent") { - given("kafka topics are missing") { - it("should throw exception") { - assertThat(cut.parseExpectingFailure( - "-p", listenPort, - "-s", kafkaBootstrapServers - )).isInstanceOf(WrongArgumentError::class.java) - } - } - - given("kafka bootstrap servers is missing") { - it("should throw exception") { - assertThat(cut.parseExpectingFailure( - "-p", listenPort, - "-f", kafkaTopics - )).isInstanceOf(WrongArgumentError::class.java) - } - } - - given("listen port is missing") { - it("should throw exception") { - assertThat(cut.parseExpectingFailure( - "-p", listenPort, - "-s", kafkaBootstrapServers - )).isInstanceOf(WrongArgumentError::class.java) - } - } - } - } -})
\ No newline at end of file |