diff options
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator')
16 files changed, 1422 insertions, 0 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/Dockerfile b/sources/hv-collector-dcae-app-simulator/Dockerfile new file mode 100644 index 00000000..a561fff7 --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/Dockerfile @@ -0,0 +1,18 @@ +FROM docker.io/openjdk:11-jre-slim + +LABEL copyright="Copyright (C) 2018 NOKIA" +LABEL license.name="The Apache Software License, Version 2.0" +LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0" +LABEL maintainer="Nokia Wroclaw ONAP Team" + +RUN apt-get update \ + && apt-get install -y --no-install-recommends curl \ + && apt-get clean + +WORKDIR /opt/ves-hv-dcae-app-simulator + +ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.dcaeapp.MainKt"] + +COPY target/libs/external/* ./ +COPY target/libs/internal/* ./ +COPY target/hv-collector-dcae-app-simulator-*.jar ./ diff --git a/sources/hv-collector-dcae-app-simulator/pom.xml b/sources/hv-collector-dcae-app-simulator/pom.xml new file mode 100644 index 00000000..c34e885d --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/pom.xml @@ -0,0 +1,155 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2018 NOKIA + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> + + <parent> + <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> + <artifactId>hv-collector-sources</artifactId> + <version>1.1.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>hv-collector-dcae-app-simulator</artifactId> + <description>VES HighVolume Collector :: Dcae app simulator</description> + + <properties> + <skipAnalysis>false</skipAnalysis> + </properties> + + <build> + <plugins> + <plugin> + <artifactId>kotlin-maven-plugin</artifactId> + <groupId>org.jetbrains.kotlin</groupId> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <groupId>org.apache.maven.plugins</groupId> + </plugin> + </plugins> + </build> + <profiles> + <profile> + <id>docker</id> + <activation> + <property> + <name>!skipDocker</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + </plugin> + <plugin> + <groupId>io.fabric8</groupId> + <artifactId>docker-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + </profile> + </profiles> + <dependencies> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-domain</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-utils</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-ves-message-generator</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-test-utils</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-effects</artifactId> + </dependency> + <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-effects-instances</artifactId> + </dependency> + <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-effects-reactor</artifactId> + </dependency> + <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-syntax</artifactId> + </dependency> + <dependency> + <groupId>io.ratpack</groupId> + <artifactId>ratpack-core</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor.kafka</groupId> + <artifactId>reactor-kafka</artifactId> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java-util</artifactId> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-stdlib-jdk8</artifactId> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <scope>runtime</scope> + </dependency> + </dependencies> + + +</project>
\ No newline at end of file diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt new file mode 100644 index 00000000..490cde4a --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -0,0 +1,74 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.core.getOrElse +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.instances.io.monadError.monadError +import arrow.typeclasses.bindingCatch +import org.onap.dcae.collectors.veshv.utils.arrow.getOption +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.io.InputStream +import java.util.concurrent.atomic.AtomicReference + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, + private val messageStreamValidation: MessageStreamValidation) { + private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference() + + fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString)) + + fun listenToTopics(topics: Set<String>): IO<Unit> = IO.monadError().bindingCatch { + if (topics.any { it.isBlank() }) + throw IllegalArgumentException("Topic list cannot contain empty elements") + if (topics.isEmpty()) + throw IllegalArgumentException("Topic list cannot be empty") + + logger.info("Received new configuration. Creating consumer for topics: $topics") + consumerState.set(consumerFactory.createConsumerForTopics(topics).bind()) + }.fix() + + fun state() = consumerState.getOption().map { it.currentState() } + + fun resetState(): IO<Unit> = consumerState.getOption().fold( + { IO.unit }, + { it.reset() } + ) + + fun validate(jsonDescription: InputStream) = messageStreamValidation.validate(jsonDescription, currentMessages()) + + private fun currentMessages(): List<ByteArray> = + consumerState.getOption() + .map { it.currentState().consumedMessages } + .getOrElse(::emptyList) + + private fun extractTopics(topicsString: String): Set<String> = + topicsString.substringAfter("=") + .split(",") + .toSet() + + companion object { + private val logger = Logger(DcaeAppSimulator::class) + } +} diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt new file mode 100644 index 00000000..e423191d --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -0,0 +1,88 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.instances.io.monadError.monadError +import arrow.typeclasses.bindingCatch +import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.utils.arrow.asIo +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import org.onap.ves.VesEventOuterClass +import java.io.InputStream +import javax.json.Json + +class MessageStreamValidation( + private val messageGenerator: MessageGenerator, + private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { + + fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> = + IO.monadError().bindingCatch { + val messageParams = parseMessageParams(jsonDescription) + val expectedEvents = generateEvents(messageParams).bind() + val actualEvents = decodeConsumedEvents(consumedMessages) + if (shouldValidatePayloads(messageParams)) { + expectedEvents == actualEvents + } else { + validateHeaders(actualEvents, expectedEvents) + } + }.fix() + + private fun parseMessageParams(input: InputStream): List<MessageParameters> { + val expectations = Json.createReader(input).readArray() + val messageParams = messageParametersParser.parse(expectations) + + return messageParams.fold( + { throw IllegalArgumentException("Parsing error: " + it.message) }, + { + if (it.isEmpty()) + throw IllegalArgumentException("Message param list cannot be empty") + it + } + ) + } + + private fun shouldValidatePayloads(parameters: List<MessageParameters>) = + parameters.all { it.messageType == MessageType.FIXED_PAYLOAD } + + private fun validateHeaders(actual: List<VesEventOuterClass.VesEvent>, + expected: List<VesEventOuterClass.VesEvent>): Boolean { + val consumedHeaders = actual.map { it.commonEventHeader } + val generatedHeaders = expected.map { it.commonEventHeader } + return generatedHeaders == consumedHeaders + } + + private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventOuterClass.VesEvent>> = + messageGenerator.createMessageFlux(parameters) + .map(WireFrameMessage::payload) + .map(ByteData::unsafeAsArray) + .map(VesEventOuterClass.VesEvent::parseFrom) + .collectList() + .asIo() + + private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) = + consumedMessages.map(VesEventOuterClass.VesEvent::parseFrom) + +} diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt new file mode 100644 index 00000000..1eca9317 --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -0,0 +1,101 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator +import org.onap.dcae.collectors.veshv.utils.http.HttpConstants +import org.onap.dcae.collectors.veshv.utils.http.HttpStatus +import org.onap.dcae.collectors.veshv.utils.http.Responses +import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors +import org.onap.dcae.collectors.veshv.utils.http.sendOrError +import ratpack.handling.Chain +import ratpack.server.RatpackServer +import ratpack.server.ServerConfig + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { + private val responseValid by lazy { + Responses.statusResponse( + name = "valid", + message = "validation succeeded" + ) + } + + private val responseInvalid by lazy { + Responses.statusResponse( + name = "invalid", + message = "validation failed", + httpStatus = HttpStatus.BAD_REQUEST + ) + } + + + fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = + simulator.listenToTopics(kafkaTopics).map { + RatpackServer.start { server -> + server.serverConfig(ServerConfig.embedded().port(port)) + .handlers(::setupHandlers) + } + } + + private fun setupHandlers(chain: Chain) { + chain + .put("configuration/topics") { ctx -> + ctx.request.body.then { body -> + val operation = simulator.listenToTopics(body.text) + ctx.response.sendOrError(operation) + } + + } + .delete("messages") { ctx -> + ctx.response.contentType(CONTENT_TEXT) + ctx.response.sendOrError(simulator.resetState()) + } + .get("messages/all/count") { ctx -> + simulator.state().fold( + { ctx.response.status(HttpConstants.STATUS_NOT_FOUND) }, + { + ctx.response + .contentType(CONTENT_TEXT) + .send(it.messagesCount.toString()) + }) + } + .post("messages/all/validate") { ctx -> + ctx.request.body.then { body -> + val response = simulator.validate(body.inputStream) + .map { isValid -> + if (isValid) responseValid else responseInvalid + } + ctx.response.sendAndHandleErrors(response) + } + } + .get("healthcheck") { ctx -> + ctx.response.status(HttpConstants.STATUS_OK).send() + } + } + + companion object { + private const val CONTENT_TEXT = "text/plain" + } +} diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt new file mode 100644 index 00000000..10dedbdf --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt @@ -0,0 +1,66 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters + +import arrow.effects.IO +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer +import org.onap.dcae.collectors.veshv.utils.arrow.evaluateIo +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.kafka.receiver.KafkaReceiver +import reactor.kafka.receiver.ReceiverOptions + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { + + fun start(): IO<Consumer> = IO { + val consumer = Consumer() + receiver.receive().map(consumer::update).evaluateIo().subscribe() + consumer + } + + companion object { + private val logger = Logger(KafkaSource::class) + + fun create(bootstrapServers: String, topics: Set<String>): KafkaSource { + return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics))) + } + + fun createReceiverOptions(bootstrapServers: String, + topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? { + val props = mapOf<String, Any>( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-dcae-app-simulator", + ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest" + ) + return ReceiverOptions.create<ByteArray, ByteArray>(props) + .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } } + .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } } + .subscription(topics) + } + } +} diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt new file mode 100644 index 00000000..17eeb5b1 --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt @@ -0,0 +1,67 @@ +/* +* ============LICENSE_START======================================================= +* dcaegen2-collectors-veshv +* ================================================================================ +* Copyright (C) 2018 NOKIA +* ================================================================================ +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config + +import arrow.core.Option +import arrow.core.fix +import arrow.instances.option.monad.monad +import arrow.typeclasses.binding +import org.apache.commons.cli.CommandLine +import org.apache.commons.cli.DefaultParser +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES +import org.onap.dcae.collectors.veshv.utils.commandline.intValue +import org.onap.dcae.collectors.veshv.utils.commandline.stringValue + +class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) { + override val cmdLineOptionsList: List<CommandLineOption> = listOf( + LISTEN_PORT, + MAXIMUM_PAYLOAD_SIZE_BYTES, + KAFKA_SERVERS, + KAFKA_TOPICS + ) + + override fun getConfiguration(cmdLine: CommandLine): Option<DcaeAppSimConfiguration> = + Option.monad().binding { + val listenPort = cmdLine + .intValue(LISTEN_PORT) + .bind() + val maxPayloadSizeBytes = cmdLine + .intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES) + val kafkaBootstrapServers = cmdLine + .stringValue(KAFKA_SERVERS) + .bind() + val kafkaTopics = cmdLine + .stringValue(KAFKA_TOPICS) + .map { it.split(",").toSet() } + .bind() + + DcaeAppSimConfiguration( + listenPort, + maxPayloadSizeBytes, + kafkaBootstrapServers, + kafkaTopics) + }.fix() +} diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt new file mode 100644 index 00000000..a6fc8053 --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt @@ -0,0 +1,27 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config + +data class DcaeAppSimConfiguration( + val apiPort: Int, + val maxPayloadSizeBytes: Int, + val kafkaBootstrapServers: String, + val kafkaTopics: Set<String> +) diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt new file mode 100644 index 00000000..1eefdbdb --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt @@ -0,0 +1,70 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.kafka.receiver.ReceiverRecord +import java.util.concurrent.ConcurrentLinkedQueue + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>) { + val messagesCount: Int by lazy { + messages.size + } + + val consumedMessages: List<ByteArray> by lazy { + messages.toList() + } +} + +interface ConsumerStateProvider { + fun currentState(): ConsumerState + fun reset(): IO<Unit> +} + +class Consumer : ConsumerStateProvider { + + private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue() + + override fun currentState(): ConsumerState = ConsumerState(consumedMessages) + + override fun reset(): IO<Unit> = IO { + consumedMessages.clear() + } + + fun update(record: ReceiverRecord<ByteArray, ByteArray>) = IO<Unit> { + logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } + consumedMessages.add(record.value()) + } + + companion object { + private val logger = Logger(Consumer::class) + } +} + +class ConsumerFactory(private val kafkaBootstrapServers: String) { + fun createConsumerForTopics(kafkaTopics: Set<String>): IO<Consumer> = + KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start() +} diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt new file mode 100644 index 00000000..06ff4d59 --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -0,0 +1,62 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer +import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure +import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync +import org.onap.dcae.collectors.veshv.utils.arrow.unit +import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory + +private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp" +private val logger = Logger(PACKAGE_NAME) +const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" + +fun main(args: Array<String>) = + ArgDcaeAppSimConfiguration().parse(args) + .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) + .map(::startApp) + .unsafeRunEitherSync( + { ex -> + logger.error("Failed to start a server", ex) + ExitFailure(1) + }, + { + logger.info("Started DCAE-APP Simulator API server") + } + ) + + +private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> { + logger.info("Using configuration: $config") + val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers) + val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) + return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) + .start(config.apiPort, config.kafkaTopics) + .unit() +} diff --git a/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml b/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml new file mode 100644 index 00000000..48da3b18 --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> +<configuration> + <property name="LOG_FILE" + value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> + <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern> + %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n + </pattern> + </encoder> + </appender> + + <appender name="ROLLING-FILE" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${FILE_LOG_PATTERN}</pattern> + </encoder> + <file>${LOG_FILE}</file> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> + </appender> + + <logger name="org.onap.dcae.collectors.veshv" level="INFO"/> + <!--<logger name="reactor.ipc.netty" level="DEBUG"/>--> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> + </root> +</configuration>
\ No newline at end of file diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt new file mode 100644 index 00000000..08558d76 --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt @@ -0,0 +1,82 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import reactor.kafka.receiver.ReceiverRecord + + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +internal class ConsumerTest : Spek({ + + lateinit var cut: Consumer + + beforeEachTest { + cut = Consumer() + } + + describe("Consumer which holds the state of received Kafka records") { + it("should contain empty state in the beginning") { + assertEmptyState(cut) + } + + describe("update") { + val value = byteArrayOf(2) + + beforeEachTest { + cut.update(receiverRecord( + topic = "topic", + key = byteArrayOf(1), + value = value + )).unsafeRunSync() + } + + it("should contain one message if it was updated once") { + assertState(cut, value) + } + + it("should contain empty state message if it was reset after update") { + cut.reset().unsafeRunSync() + assertEmptyState(cut) + } + } + } +}) + +fun assertEmptyState(cut: Consumer) { + assertState(cut) +} + +fun assertState(cut: Consumer, vararg values: ByteArray) { + assertThat(cut.currentState().consumedMessages) + .containsOnly(*values) + assertThat(cut.currentState().messagesCount) + .isEqualTo(values.size) +} + +fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) = + ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null) diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt new file mode 100644 index 00000000..e1641cbb --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt @@ -0,0 +1,183 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.core.Left +import arrow.core.None +import arrow.core.Some +import arrow.effects.IO +import com.google.protobuf.ByteString +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.eq +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.never +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.mockito.ArgumentMatchers.anySet +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import org.onap.ves.VesEventOuterClass.VesEvent +import java.util.concurrent.ConcurrentLinkedQueue + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +internal class DcaeAppSimulatorTest : Spek({ + lateinit var consumerFactory: ConsumerFactory + lateinit var messageStreamValidation: MessageStreamValidation + lateinit var consumer: Consumer + lateinit var cut: DcaeAppSimulator + + beforeEachTest { + consumerFactory = mock() + messageStreamValidation = mock() + consumer = mock() + cut = DcaeAppSimulator(consumerFactory, messageStreamValidation) + + whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(IO.just(consumer)) + } + + fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList())) + + describe("listenToTopics") { + val topics = setOf("perf3gpp", "faults") + + it("should fail when topic list is empty") { + val result = cut.listenToTopics(setOf()).attempt().unsafeRunSync() + assertThat(result.isLeft()).isTrue() + } + + it("should fail when topic list contains empty strings") { + val result = cut.listenToTopics(setOf("perf3gpp", " ", "faults")).attempt().unsafeRunSync() + assertThat(result.isLeft()).isTrue() + } + + it("should subscribe to given topics") { + cut.listenToTopics(topics).unsafeRunSync() + verify(consumerFactory).createConsumerForTopics(topics) + } + + it("should subscribe to given topics when called with comma separated list") { + cut.listenToTopics("perf3gpp,faults").unsafeRunSync() + verify(consumerFactory).createConsumerForTopics(topics) + } + + it("should handle errors") { + // given + val error = RuntimeException("WTF") + whenever(consumerFactory.createConsumerForTopics(anySet())) + .thenReturn(IO.raiseError(error)) + + // when + val result = cut.listenToTopics("perf3gpp").attempt().unsafeRunSync() + + // then + assertThat(result).isEqualTo(Left(error)) + } + } + + describe("state") { + + it("should return None when topics hasn't been initialized") { + assertThat(cut.state()).isEqualTo(None) + } + + describe("when topics are initialized") { + beforeEachTest { + cut.listenToTopics("perf3gpp").unsafeRunSync() + } + + it("should return some state when it has been set") { + val state = consumerState() + whenever(consumer.currentState()).thenReturn(state) + + assertThat(cut.state()).isEqualTo(Some(state)) + } + } + } + + describe("resetState") { + it("should do nothing when topics hasn't been initialized") { + cut.resetState().unsafeRunSync() + verify(consumer, never()).reset() + } + + describe("when topics are initialized") { + beforeEachTest { + cut.listenToTopics("perf3gpp").unsafeRunSync() + } + + it("should reset the state") { + // given + whenever(consumer.reset()).thenReturn(IO.unit) + + // when + cut.resetState().unsafeRunSync() + + // then + verify(consumer).reset() + } + } + } + + describe("validate") { + beforeEachTest { + whenever(messageStreamValidation.validate(any(), any())).thenReturn(IO.just(true)) + } + + it("should use empty list when consumer is unavailable") { + // when + val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync() + + // then + verify(messageStreamValidation).validate(any(), eq(emptyList())) + assertThat(result).isTrue() + } + + it("should delegate to MessageStreamValidation") { + // given + cut.listenToTopics("perf3gpp").unsafeRunSync() + whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray())) + + // when + val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync() + + // then + verify(messageStreamValidation).validate(any(), any()) + assertThat(result).isTrue() + } + } +}) + + +private const val DUMMY_EVENT_ID = "aaa" +private const val DUMMY_PAYLOAD = "payload" + +private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent { + return VesEvent.newBuilder() + .setCommonEventHeader(CommonEventHeader.newBuilder() + .setEventId(eventId)) + .setEventFields(ByteString.copyFrom(payload.toByteArray())) + .build() +} diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt new file mode 100644 index 00000000..a631be76 --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt @@ -0,0 +1,214 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.core.Either +import arrow.core.Right +import com.google.protobuf.ByteString +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.fail +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.mockito.ArgumentMatchers.anyList +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import org.onap.ves.VesEventOuterClass.VesEvent +import reactor.core.publisher.Flux +import javax.json.stream.JsonParsingException + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +internal class MessageStreamValidationTest : Spek({ + lateinit var messageParametersParser: MessageParametersParser + lateinit var messageGenerator: MessageGenerator + lateinit var cut: MessageStreamValidation + + beforeEachTest { + messageParametersParser = mock() + messageGenerator = mock() + cut = MessageStreamValidation(messageGenerator, messageParametersParser) + } + + fun givenParsedMessageParameters(vararg params: MessageParameters) { + whenever(messageParametersParser.parse(any())).thenReturn(Right(params.toList())) + } + + describe("validate") { + + it("should return error when JSON is invalid") { + // when + val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync() + + // then + when(result) { + is Either.Left -> assertThat(result.a).isInstanceOf(JsonParsingException::class.java) + else -> fail("validation should fail") + } + } + + it("should return error when message param list is empty") { + // given + givenParsedMessageParameters() + + // when + val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync() + + // then + assertThat(result.isLeft()).isTrue() + } + + describe("when validating headers only") { + it("should return true when messages are the same") { + // given + val jsonAsStream = sampleJsonAsStream() + val event = vesEvent() + val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray()) + val receivedMessageBytes = event.toByteArray() + + givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1)) + whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + + // when + val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() + + // then + assertThat(result).isTrue() + } + + it("should return true when messages differ with payload only") { + // given + val jsonAsStream = sampleJsonAsStream() + val generatedEvent = vesEvent(payload = "payload A") + val receivedEvent = vesEvent(payload = "payload B") + val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray()) + val receivedMessageBytes = receivedEvent.toByteArray() + + givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1)) + whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + + // when + val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() + + // then + assertThat(result).isTrue() + } + + it("should return false when messages are different") { + // given + val jsonAsStream = sampleJsonAsStream() + val generatedEvent = vesEvent() + val receivedEvent = vesEvent(eventId = "bbb") + val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray()) + val receivedMessageBytes = receivedEvent.toByteArray() + + givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1)) + whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + + // when + val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() + + // then + assertThat(result).isFalse() + } + } + + describe("when validating whole messages") { + it("should return true when messages are the same") { + // given + val jsonAsStream = sampleJsonAsStream() + val event = vesEvent() + val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray()) + val receivedMessageBytes = event.toByteArray() + + givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1)) + whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + + // when + val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() + + // then + assertThat(result).isTrue() + } + + it("should return false when messages differ with payload only") { + // given + val jsonAsStream = sampleJsonAsStream() + val generatedEvent = vesEvent(payload = "payload A") + val receivedEvent = vesEvent(payload = "payload B") + val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray()) + val receivedMessageBytes = receivedEvent.toByteArray() + + givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1)) + whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + + // when + val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() + + // then + assertThat(result).isFalse() + } + + it("should return false when messages are different") { + // given + val jsonAsStream = sampleJsonAsStream() + val generatedEvent = vesEvent() + val receivedEvent = vesEvent("bbb") + val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray()) + val receivedMessageBytes = receivedEvent.toByteArray() + + givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1)) + whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + + // when + val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() + + // then + assertThat(result).isFalse() + } + } + } +}) + + + +private const val DUMMY_EVENT_ID = "aaa" +private const val DUMMY_PAYLOAD = "payload" + +private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent { + return VesEvent.newBuilder() + .setCommonEventHeader(CommonEventHeader.newBuilder() + .setEventId(eventId)) + .setEventFields(ByteString.copyFrom(payload.toByteArray())) + .build() +} + +private const val sampleJsonArray = """["headersOnly"]""" + +private fun sampleJsonAsStream() = sampleJsonArray.byteInputStream() diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt new file mode 100644 index 00000000..de74f628 --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> + * @since August 2018 + */ +internal class KafkaSourceTest : Spek({ + val servers = "kafka1:9080,kafka2:9080" + val topics = setOf("topic1", "topic2") + + describe("receiver options") { + val options = KafkaSource.createReceiverOptions(servers, topics)!!.toImmutable() + + fun verifyProperty(key: String, expectedValue: Any) { + it("should have $key option set") { + assertThat(options.consumerProperty(key)) + .isEqualTo(expectedValue) + } + } + + verifyProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers) + verifyProperty(ConsumerConfig.CLIENT_ID_CONFIG, "hv-collector-dcae-app-simulator") + verifyProperty(ConsumerConfig.GROUP_ID_CONFIG, "hv-collector-simulators") + verifyProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) + verifyProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) + verifyProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + } +})
\ No newline at end of file diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt new file mode 100644 index 00000000..7137fe12 --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt @@ -0,0 +1,125 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure +import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess +import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentError + + +internal class ArgDcaeAppSimConfigurationTest : Spek({ + + lateinit var cut: ArgDcaeAppSimConfiguration + val listenPort = "1234" + val kafkaBootstrapServers = "localhosting:123,localhostinger:12345" + val kafkaTopics = "top1,top2" + + beforeEachTest { + cut = ArgDcaeAppSimConfiguration() + } + + describe("parsing arguments") { + lateinit var result: DcaeAppSimConfiguration + + given("all parameters are present in the long form") { + + beforeEachTest { + result = cut.parseExpectingSuccess( + "--listen-port", listenPort, + "--kafka-bootstrap-servers", kafkaBootstrapServers, + "--kafka-topics", kafkaTopics + ) + } + + it("should set proper port") { + assertThat(result.apiPort).isEqualTo(listenPort.toInt()) + } + + + it("should set proper kafka bootstrap servers") { + assertThat(result.kafkaBootstrapServers).isEqualTo(kafkaBootstrapServers) + } + + it("should set proper kafka topics") { + assertThat(result.kafkaTopics).isEqualTo( + setOf("top1", "top2") + ) + } + } + + given("some parameters are present in the short form") { + + beforeEachTest { + result = cut.parseExpectingSuccess( + "-p", listenPort, + "--kafka-bootstrap-servers", kafkaBootstrapServers, + "-f", kafkaTopics) + } + + it("should set proper port") { + assertThat(result.apiPort).isEqualTo(listenPort.toInt()) + } + + it("should set proper kafka bootstrap servers") { + assertThat(result.kafkaBootstrapServers).isEqualTo(kafkaBootstrapServers) + } + + it("should set proper kafka topics") { + assertThat(result.kafkaTopics).isEqualTo( + setOf("top1", "top2") + ) + } + } + + describe("required parameter is absent") { + given("kafka topics are missing") { + it("should throw exception") { + assertThat(cut.parseExpectingFailure( + "-p", listenPort, + "-s", kafkaBootstrapServers + )).isInstanceOf(WrongArgumentError::class.java) + } + } + + given("kafka bootstrap servers is missing") { + it("should throw exception") { + assertThat(cut.parseExpectingFailure( + "-p", listenPort, + "-f", kafkaTopics + )).isInstanceOf(WrongArgumentError::class.java) + } + } + + given("listen port is missing") { + it("should throw exception") { + assertThat(cut.parseExpectingFailure( + "-p", listenPort, + "-s", kafkaBootstrapServers + )).isInstanceOf(WrongArgumentError::class.java) + } + } + } + } +})
\ No newline at end of file |