From d00acee05c05c7e3146abf7d13b78953f9a0d3f9 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Fri, 24 Aug 2018 12:51:14 +0200 Subject: Improve DCAE APP Simulator coverage Also there was a need to refactor the code, because application logic was placed inside Ratpack handlers. Change-Id: Iba3d4d039a98ba88e0dba580c1b7726b53440538 Issue-ID: DCAEGEN2-732 Signed-off-by: Piotr Jaszczyk --- hv-collector-dcae-app-simulator/pom.xml | 12 +- .../dcaeapp/config/ArgDcaeAppSimConfiguration.kt | 62 ------ .../dcaeapp/config/DcaeAppSimConfiguration.kt | 26 --- .../simulators/dcaeapp/impl/DcaeAppSimulator.kt | 74 +++++++ .../dcaeapp/impl/MessageStreamValidation.kt | 85 ++++++++ .../simulators/dcaeapp/impl/adapters/ApiServer.kt | 129 ++++++++++++ .../dcaeapp/impl/adapters/KafkaSource.kt | 65 ++++++ .../impl/config/ArgDcaeAppSimConfiguration.kt | 62 ++++++ .../dcaeapp/impl/config/DcaeAppSimConfiguration.kt | 26 +++ .../veshv/simulators/dcaeapp/impl/consumer.kt | 70 +++++++ .../veshv/simulators/dcaeapp/kafka/KafkaSource.kt | 60 ------ .../veshv/simulators/dcaeapp/kafka/consumer.kt | 71 ------- .../collectors/veshv/simulators/dcaeapp/main.kt | 14 +- .../veshv/simulators/dcaeapp/remote/ApiServer.kt | 169 ---------------- .../config/ArgDcaeAppSimConfigurationTest.kt | 125 ------------ .../veshv/simulators/dcaeapp/impl/ConsumerTest.kt | 83 ++++++++ .../dcaeapp/impl/DcaeAppSimulatorTest.kt | 184 +++++++++++++++++ .../dcaeapp/impl/MessageStreamValidationTest.kt | 224 +++++++++++++++++++++ .../dcaeapp/impl/adapters/KafkaSourceTest.kt | 54 +++++ .../impl/config/ArgDcaeAppSimConfigurationTest.kt | 125 ++++++++++++ .../org.mockito.plugins.MockMaker | 1 + .../org/onap/dcae/collectors/veshv/domain/codec.kt | 2 +- .../collectors/veshv/domain/WireFrameCodecsTest.kt | 2 +- .../onap/dcae/collectors/veshv/utils/arrow/core.kt | 5 +- .../dcae/collectors/veshv/utils/arrow/effects.kt | 16 ++ .../collectors/veshv/utils/arrow/CoreKtTest.kt | 63 ++++++ pom.xml | 5 + 27 files changed, 1290 insertions(+), 524 deletions(-) delete mode 100644 hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt delete mode 100644 hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/DcaeAppSimConfiguration.kt create mode 100644 hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt create mode 100644 hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt create mode 100644 hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt create mode 100644 hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt create mode 100644 hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt create mode 100644 hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt create mode 100644 hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt delete mode 100644 hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt delete mode 100644 hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt delete mode 100644 hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt delete mode 100644 hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfigurationTest.kt create mode 100644 hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt create mode 100644 hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt create mode 100644 hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt create mode 100644 hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt create mode 100644 hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt create mode 100644 hv-collector-dcae-app-simulator/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker create mode 100644 hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/CoreKtTest.kt diff --git a/hv-collector-dcae-app-simulator/pom.xml b/hv-collector-dcae-app-simulator/pom.xml index 47f71ba6..ce4a2715 100644 --- a/hv-collector-dcae-app-simulator/pom.xml +++ b/hv-collector-dcae-app-simulator/pom.xml @@ -19,8 +19,8 @@ ~ ============LICENSE_END========================================================= --> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 @@ -104,6 +104,14 @@ io.arrow-kt arrow-effects + + io.arrow-kt + arrow-effects-reactor + + + io.arrow-kt + arrow-syntax + io.ratpack ratpack-core diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt deleted file mode 100644 index 065cdf92..00000000 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt +++ /dev/null @@ -1,62 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* dcaegen2-collectors-veshv -* ================================================================================ -* Copyright (C) 2018 NOKIA -* ================================================================================ -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config - -import arrow.core.ForOption -import arrow.core.Option -import arrow.core.fix -import arrow.instances.extensions -import arrow.typeclasses.binding -import org.apache.commons.cli.CommandLine -import org.apache.commons.cli.DefaultParser -import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT - -class ArgDcaeAppSimConfiguration : ArgBasedConfiguration(DefaultParser()) { - override val cmdLineOptionsList: List = listOf( - LISTEN_PORT, - KAFKA_SERVERS, - KAFKA_TOPICS - ) - - override fun getConfiguration(cmdLine: CommandLine): Option = - ForOption extensions { - binding { - val listenPort = cmdLine - .intValue(LISTEN_PORT) - .bind() - val kafkaBootstrapServers = cmdLine - .stringValue(KAFKA_SERVERS) - .bind() - val kafkaTopics = cmdLine - .stringValue(KAFKA_TOPICS) - .map { it.split(",").toSet() } - .bind() - - DcaeAppSimConfiguration( - listenPort, - kafkaBootstrapServers, - kafkaTopics) - }.fix() - } -} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/DcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/DcaeAppSimConfiguration.kt deleted file mode 100644 index 5bd2d155..00000000 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/DcaeAppSimConfiguration.kt +++ /dev/null @@ -1,26 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config - -data class DcaeAppSimConfiguration( - val apiPort: Int, - val kafkaBootstrapServers: String, - val kafkaTopics: Set -) diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt new file mode 100644 index 00000000..262e05bf --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -0,0 +1,74 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.core.getOrElse +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.monadError +import arrow.typeclasses.bindingCatch +import org.onap.dcae.collectors.veshv.utils.arrow.getOption +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.io.InputStream +import java.util.concurrent.atomic.AtomicReference + +/** + * @author Piotr Jaszczyk + * @since August 2018 + */ +class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, + private val messageStreamValidation: MessageStreamValidation = MessageStreamValidation()) { + private val consumerState: AtomicReference = AtomicReference() + + fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString)) + + fun listenToTopics(topics: Set): IO = IO.monadError().bindingCatch { + if (topics.any { it.isBlank() }) + throw IllegalArgumentException("Topic list cannot contain empty elements") + if (topics.isEmpty()) + throw IllegalArgumentException("Topic list cannot be empty") + + logger.info("Received new configuration. Creating consumer for topics: $topics") + consumerState.set(consumerFactory.createConsumerForTopics(topics).bind()) + }.fix() + + fun state() = consumerState.getOption().map { it.currentState() } + + fun resetState(): IO = consumerState.getOption().fold( + { IO.unit }, + { it.reset() } + ) + + fun validate(jsonDescription: InputStream) = messageStreamValidation.validate(jsonDescription, currentMessages()) + + private fun currentMessages(): List = + consumerState.getOption() + .map { it.currentState().consumedMessages } + .getOrElse(::emptyList) + + private fun extractTopics(topicsString: String): Set = + topicsString.substringAfter("=") + .split(",") + .toSet() + + companion object { + private val logger = Logger(DcaeAppSimulator::class) + } +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt new file mode 100644 index 00000000..239f7102 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -0,0 +1,85 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.monadError +import arrow.typeclasses.bindingCatch +import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.utils.arrow.asIo +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import org.onap.ves.VesEventV5 +import java.io.InputStream +import javax.json.Json + +class MessageStreamValidation( + private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE, + private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) { + + fun validate(jsonDescription: InputStream, consumedMessages: List): IO = + IO.monadError().bindingCatch { + val messageParams = parseMessageParams(jsonDescription) + val expectedEvents = generateEvents(messageParams).bind() + val actualEvents = decodeConsumedEvents(consumedMessages) + if (shouldValidatePayloads(messageParams)) { + expectedEvents == actualEvents + } else { + validateHeaders(actualEvents, expectedEvents) + } + }.fix() + + private fun parseMessageParams(input: InputStream): List { + val expectations = Json.createReader(input).readArray() + val messageParams = messageParametersParser.parse(expectations) + + if (messageParams.isEmpty()) + throw IllegalArgumentException("Message param list cannot be empty") + + return messageParams + } + + private fun shouldValidatePayloads(parameters: List) = + parameters.all { it.messageType == MessageType.FIXED_PAYLOAD } + + + private fun validateHeaders(actual: List, expected: List): Boolean { + val consumedHeaders = actual.map { it.commonEventHeader } + val generatedHeaders = expected.map { it.commonEventHeader } + return generatedHeaders == consumedHeaders + } + + + private fun generateEvents(parameters: List): IO> = + messageGenerator.createMessageFlux(parameters) + .map(PayloadWireFrameMessage::payload) + .map(ByteData::unsafeAsArray) + .map(VesEventV5.VesEvent::parseFrom) + .collectList() + .asIo() + + private fun decodeConsumedEvents(consumedMessages: List) = + consumedMessages.map(VesEventV5.VesEvent::parseFrom) + +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt new file mode 100644 index 00000000..6c830b9d --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt @@ -0,0 +1,129 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters + +import arrow.core.Left +import arrow.core.Right +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.monad +import arrow.typeclasses.binding +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import ratpack.exec.Promise +import ratpack.handling.Chain +import ratpack.handling.Context +import ratpack.http.Response +import ratpack.server.RatpackServer +import ratpack.server.ServerConfig + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +class ApiServer(private val simulator: DcaeAppSimulator) { + + + fun start(port: Int, kafkaTopics: Set): IO = + simulator.listenToTopics(kafkaTopics).map { + RatpackServer.start { server -> + server.serverConfig(ServerConfig.embedded().port(port)) + .handlers(::setupHandlers) + } + } + + private fun setupHandlers(chain: Chain) { + chain + .put("configuration/topics") { ctx -> + val operation = ctx.bodyIo().flatMap { body -> + simulator.listenToTopics(body.text) + } + ctx.response.sendOrError(operation) + + } + .delete("messages") { ctx -> + ctx.response.contentType(CONTENT_TEXT) + ctx.response.sendOrError(simulator.resetState()) + } + .get("messages/all/count") { ctx -> + simulator.state().fold( + { ctx.response.status(STATUS_NOT_FOUND) }, + { + ctx.response + .contentType(CONTENT_TEXT) + .send(it.messagesCount.toString()) + }) + } + .post("messages/all/validate") { ctx -> + val responseStatus = IO.monad().binding { + val body = ctx.bodyIo().bind() + val isValid = simulator.validate(body.inputStream).bind() + if (isValid) + STATUS_OK + else + STATUS_BAD_REQUEST + }.fix() + + ctx.response.sendStatusOrError(responseStatus) + } + .get("healthcheck") { ctx -> + ctx.response.status(STATUS_OK).send() + } + } + + private fun Context.bodyIo() = request.body.asIo() + + private fun Promise.asIo(): IO = IO.async { emitResult -> + onError { + emitResult(Left(it)) + }.then { result -> + emitResult(Right(result)) + } + } + + private fun Response.sendOrError(responseStatus: IO) { + sendStatusOrError(responseStatus.map { STATUS_OK }) + } + + private fun Response.sendStatusOrError(responseStatus: IO) { + responseStatus.unsafeRunAsync { cb -> + cb.fold( + { err -> + logger.warn("Error occurred. Sending HTTP$STATUS_INTERNAL_SERVER_ERROR.", err) + status(ApiServer.STATUS_INTERNAL_SERVER_ERROR) + .send(CONTENT_TEXT, err.message) + }, + { + status(it).send() + } + ) + } + } + + companion object { + private val logger = Logger(ApiServer::class) + private const val CONTENT_TEXT = "text/plain" + + private const val STATUS_OK = 200 + private const val STATUS_BAD_REQUEST = 400 + private const val STATUS_NOT_FOUND = 404 + private const val STATUS_INTERNAL_SERVER_ERROR = 500 + } +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt new file mode 100644 index 00000000..15965174 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt @@ -0,0 +1,65 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters + +import arrow.effects.IO +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer +import org.onap.dcae.collectors.veshv.utils.arrow.evaluateIo +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.kafka.receiver.KafkaReceiver +import reactor.kafka.receiver.ReceiverOptions + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +class KafkaSource(private val receiver: KafkaReceiver) { + + fun start(): IO = IO { + val consumer = Consumer() + receiver.receive().map(consumer::update).evaluateIo().subscribe() + consumer + } + + companion object { + private val logger = Logger(KafkaSource::class) + + fun create(bootstrapServers: String, topics: Set): KafkaSource { + return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics))) + } + + fun createReceiverOptions(bootstrapServers: String, topics: Set): ReceiverOptions? { + val props = mapOf( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-dcae-app-simulator", + ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest" + ) + return ReceiverOptions.create(props) + .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } } + .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } } + .subscription(topics) + } + } +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt new file mode 100644 index 00000000..d5f55605 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt @@ -0,0 +1,62 @@ +/* +* ============LICENSE_START======================================================= +* dcaegen2-collectors-veshv +* ================================================================================ +* Copyright (C) 2018 NOKIA +* ================================================================================ +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config + +import arrow.core.ForOption +import arrow.core.Option +import arrow.core.fix +import arrow.instances.extensions +import arrow.typeclasses.binding +import org.apache.commons.cli.CommandLine +import org.apache.commons.cli.DefaultParser +import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT + +class ArgDcaeAppSimConfiguration : ArgBasedConfiguration(DefaultParser()) { + override val cmdLineOptionsList: List = listOf( + LISTEN_PORT, + KAFKA_SERVERS, + KAFKA_TOPICS + ) + + override fun getConfiguration(cmdLine: CommandLine): Option = + ForOption extensions { + binding { + val listenPort = cmdLine + .intValue(LISTEN_PORT) + .bind() + val kafkaBootstrapServers = cmdLine + .stringValue(KAFKA_SERVERS) + .bind() + val kafkaTopics = cmdLine + .stringValue(KAFKA_TOPICS) + .map { it.split(",").toSet() } + .bind() + + DcaeAppSimConfiguration( + listenPort, + kafkaBootstrapServers, + kafkaTopics) + }.fix() + } +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt new file mode 100644 index 00000000..c114313d --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config + +data class DcaeAppSimConfiguration( + val apiPort: Int, + val kafkaBootstrapServers: String, + val kafkaTopics: Set +) diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt new file mode 100644 index 00000000..1eefdbdb --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt @@ -0,0 +1,70 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.kafka.receiver.ReceiverRecord +import java.util.concurrent.ConcurrentLinkedQueue + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +class ConsumerState(private val messages: ConcurrentLinkedQueue) { + val messagesCount: Int by lazy { + messages.size + } + + val consumedMessages: List by lazy { + messages.toList() + } +} + +interface ConsumerStateProvider { + fun currentState(): ConsumerState + fun reset(): IO +} + +class Consumer : ConsumerStateProvider { + + private var consumedMessages: ConcurrentLinkedQueue = ConcurrentLinkedQueue() + + override fun currentState(): ConsumerState = ConsumerState(consumedMessages) + + override fun reset(): IO = IO { + consumedMessages.clear() + } + + fun update(record: ReceiverRecord) = IO { + logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } + consumedMessages.add(record.value()) + } + + companion object { + private val logger = Logger(Consumer::class) + } +} + +class ConsumerFactory(private val kafkaBootstrapServers: String) { + fun createConsumerForTopics(kafkaTopics: Set): IO = + KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start() +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt deleted file mode 100644 index d53609ca..00000000 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt +++ /dev/null @@ -1,60 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka - -import arrow.effects.IO -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.kafka.receiver.KafkaReceiver -import reactor.kafka.receiver.ReceiverOptions -import java.util.* - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -class KafkaSource(private val receiver: KafkaReceiver) { - - fun start(): IO = IO { - val consumer = Consumer() - receiver.receive().subscribe(consumer::update) - consumer - } - - companion object { - private val logger = Logger(KafkaSource::class) - - fun create(bootstrapServers: String, topics: Set): KafkaSource { - val props = HashMap() - props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers - props[ConsumerConfig.CLIENT_ID_CONFIG] = "hv-collector-dcae-app-simulator" - props[ConsumerConfig.GROUP_ID_CONFIG] = "hv-collector-simulators" - props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java - props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java - props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" - val receiverOptions = ReceiverOptions.create(props) - .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } } - .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } } - .subscription(topics) - return KafkaSource(KafkaReceiver.create(receiverOptions)) - } - } -} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt deleted file mode 100644 index 08bb149f..00000000 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt +++ /dev/null @@ -1,71 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka - -import arrow.effects.IO -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.kafka.receiver.ReceiverRecord -import java.util.concurrent.ConcurrentLinkedQueue - -/** - * @author Piotr Jaszczyk - * @since June 2018 - */ -class ConsumerState(private val messages: ConcurrentLinkedQueue){ - val messagesCount: Int by lazy { - messages.size - } - - val consumedMessages: List by lazy { - messages.toList() - } -} - -interface ConsumerStateProvider { - fun currentState(): ConsumerState - fun reset(): IO -} - -class Consumer : ConsumerStateProvider { - - private var consumedMessages: ConcurrentLinkedQueue = ConcurrentLinkedQueue() - - override fun currentState(): ConsumerState = ConsumerState(consumedMessages) - - override fun reset(): IO = IO { - consumedMessages.clear() - } - - fun update(record: ReceiverRecord) { - logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } - consumedMessages.add(record.value()) - } - - - companion object { - private val logger = Logger(Consumer::class) - } -} - -class ConsumerFactory(private val kafkaBootstrapServers: String) { - fun createConsumerForTopics(kafkaTopics: Set): ConsumerStateProvider { - return KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start().unsafeRunSync() - } -} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index 9f84fc4d..a65a2686 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -20,10 +20,12 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp import arrow.effects.IO -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.ArgDcaeAppSimConfiguration -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DcaeAppSimConfiguration -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.ApiServer import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.arrow.void @@ -50,7 +52,7 @@ fun main(args: Array) = private fun startApp(config: DcaeAppSimConfiguration): IO { - return ApiServer(ConsumerFactory(config.kafkaBootstrapServers)) + return ApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers))) .start(config.apiPort, config.kafkaTopics) .void() -} \ No newline at end of file +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt deleted file mode 100644 index cd258134..00000000 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt +++ /dev/null @@ -1,169 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote - -import arrow.effects.IO -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD -import org.onap.ves.VesEventV5.VesEvent -import ratpack.handling.Chain -import ratpack.handling.Context -import ratpack.server.RatpackServer -import ratpack.server.ServerConfig -import reactor.core.publisher.Mono -import javax.json.Json - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -class ApiServer(private val consumerFactory: ConsumerFactory, - private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { - - private lateinit var consumerState: ConsumerStateProvider - - fun start(port: Int, kafkaTopics: Set): IO = IO { - consumerState = consumerFactory.createConsumerForTopics(kafkaTopics) - RatpackServer.start { server -> - server.serverConfig(ServerConfig.embedded().port(port)) - .handlers(this::setupHandlers) - } - } - - private fun setupHandlers(chain: Chain) { - chain - .put("configuration/topics") { ctx -> - ctx.request.body.then { it -> - val topics = extractTopics(it.text) - logger.info("Received new configuration. Creating consumer for topics: $topics") - consumerState = consumerFactory.createConsumerForTopics(topics) - ctx.response - .status(STATUS_OK) - .send() - } - - } - .delete("messages") { ctx -> - ctx.response.contentType(CONTENT_TEXT) - consumerState.reset() - .unsafeRunAsync { - it.fold( - { ctx.response.status(STATUS_INTERNAL_SERVER_ERROR) }, - { ctx.response.status(STATUS_OK) } - ).send() - } - } - .get("messages/all/count") { ctx -> - val state = consumerState.currentState() - ctx.response - .contentType(CONTENT_TEXT) - .send(state.messagesCount.toString()) - } - .post("messages/all/validate") { ctx -> - ctx.request.body - .map { Json.createReader(it.inputStream).readArray() } - .map { messageParametersParser.parse(it) } - .map { generateEvents(ctx, it) } - .then { (generatedEvents, shouldValidatePayloads) -> - generatedEvents - .doOnSuccess { sendResponse(ctx, it, shouldValidatePayloads) } - .block() - } - } - .get("healthcheck") { ctx -> - ctx.response.status(STATUS_OK).send() - } - } - - private fun generateEvents(ctx: Context, parameters: List): - Pair>, Boolean> = Pair( - - doGenerateEvents(parameters).doOnError { - logger.error("Error occurred when generating messages: $it") - ctx.response - .status(STATUS_INTERNAL_SERVER_ERROR) - .send() - }, - parameters.all { it.messageType == FIXED_PAYLOAD } - ) - - private fun doGenerateEvents(parameters: List): Mono> = MessageGenerator.INSTANCE - .createMessageFlux(parameters) - .map(PayloadWireFrameMessage::payload) - .map { decode(it.unsafeAsArray()) } - .collectList() - - - private fun decode(bytes: ByteArray): VesEvent = VesEvent.parseFrom(bytes) - - - private fun sendResponse(ctx: Context, - generatedEvents: List, - shouldValidatePayloads: Boolean) = - resolveResponseStatusCode( - generated = generatedEvents, - consumed = decodeConsumedEvents(), - validatePayloads = shouldValidatePayloads - ).let { ctx.response.status(it).send() } - - - private fun decodeConsumedEvents(): List = consumerState - .currentState() - .consumedMessages - .map(::decode) - - - private fun resolveResponseStatusCode(generated: List, - consumed: List, - validatePayloads: Boolean): Int = - if (validatePayloads) { - if (generated == consumed) STATUS_OK else STATUS_BAD_REQUEST - } else { - validateHeaders(consumed, generated) - } - - private fun validateHeaders(consumed: List, generated: List): Int { - val consumedHeaders = consumed.map { it.commonEventHeader } - val generatedHeaders = generated.map { it.commonEventHeader } - return if (generatedHeaders == consumedHeaders) STATUS_OK else STATUS_BAD_REQUEST - } - - private fun extractTopics(it: String): Set = - it.substringAfter("=") - .split(",") - .toSet() - - companion object { - private val logger = Logger(ApiServer::class) - private const val CONTENT_TEXT = "text/plain" - - private const val STATUS_OK = 200 - private const val STATUS_BAD_REQUEST = 400 - private const val STATUS_INTERNAL_SERVER_ERROR = 500 - } -} - - diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfigurationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfigurationTest.kt deleted file mode 100644 index 7d887939..00000000 --- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfigurationTest.kt +++ /dev/null @@ -1,125 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config - -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure -import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess -import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentError - - -internal class ArgDcaeAppSimConfigurationTest : Spek({ - - lateinit var cut: ArgDcaeAppSimConfiguration - val listenPort = "1234" - val kafkaBootstrapServers = "localhosting:123,localhostinger:12345" - val kafkaTopics = "top1,top2" - - beforeEachTest { - cut = ArgDcaeAppSimConfiguration() - } - - describe("parsing arguments") { - lateinit var result: DcaeAppSimConfiguration - - given("all parameters are present in the long form") { - - beforeEachTest { - result = cut.parseExpectingSuccess( - "--listen-port", listenPort, - "--kafka-bootstrap-servers", kafkaBootstrapServers, - "--kafka-topics", kafkaTopics - ) - } - - it("should set proper port") { - assertThat(result.apiPort).isEqualTo(listenPort.toInt()) - } - - - it("should set proper kafka bootstrap servers") { - assertThat(result.kafkaBootstrapServers).isEqualTo(kafkaBootstrapServers) - } - - it("should set proper kafka topics") { - assertThat(result.kafkaTopics).isEqualTo( - setOf("top1", "top2") - ) - } - } - - given("some parameters are present in the short form") { - - beforeEachTest { - result = cut.parseExpectingSuccess( - "-p", listenPort, - "--kafka-bootstrap-servers", kafkaBootstrapServers, - "-f", kafkaTopics) - } - - it("should set proper port") { - assertThat(result.apiPort).isEqualTo(listenPort.toInt()) - } - - it("should set proper kafka bootstrap servers") { - assertThat(result.kafkaBootstrapServers).isEqualTo(kafkaBootstrapServers) - } - - it("should set proper kafka topics") { - assertThat(result.kafkaTopics).isEqualTo( - setOf("top1", "top2") - ) - } - } - - describe("required parameter is absent") { - given("kafka topics are missing") { - it("should throw exception") { - assertThat(cut.parseExpectingFailure( - "-p", listenPort, - "-s", kafkaBootstrapServers - )).isInstanceOf(WrongArgumentError::class.java) - } - } - - given("kafka bootstrap servers is missing") { - it("should throw exception") { - assertThat(cut.parseExpectingFailure( - "-p", listenPort, - "-f", kafkaTopics - )).isInstanceOf(WrongArgumentError::class.java) - } - } - - given("listen port is missing") { - it("should throw exception") { - assertThat(cut.parseExpectingFailure( - "-p", kafkaTopics, - "-s", kafkaBootstrapServers - )).isInstanceOf(WrongArgumentError::class.java) - } - } - } - } -}) \ No newline at end of file diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt new file mode 100644 index 00000000..debe9554 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt @@ -0,0 +1,83 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import reactor.kafka.receiver.ReceiverRecord + + +/** + * @author Piotr Jaszczyk + * @since August 2018 + */ +internal class ConsumerTest : Spek({ + + lateinit var cut: Consumer + + beforeEachTest { + cut = Consumer() + } + + describe("Consumer which holds the state of received Kafka records") { + it("should contain empty state in the beginning") { + assertEmptyState(cut) + } + + describe("update") { + val value = byteArrayOf(2) + + beforeEachTest { + cut.update(receiverRecord( + topic = "topic", + key = byteArrayOf(1), + value = value + )).unsafeRunSync() + } + + it("should contain one message if it was updated once") { + assertState(cut, value) + } + + it("should contain empty state message if it was reset after update") { + cut.reset().unsafeRunSync() + assertEmptyState(cut) + } + } + } +}) + +fun assertEmptyState(cut: Consumer) { + assertState(cut) +} + +fun assertState(cut: Consumer, vararg values: ByteArray) { + assertThat(cut.currentState().consumedMessages) + .containsOnly(*values) + assertThat(cut.currentState().messagesCount) + .isEqualTo(values.size) +} + +fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) = + ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null) diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt new file mode 100644 index 00000000..c0ba5812 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt @@ -0,0 +1,184 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.core.Left +import arrow.core.None +import arrow.core.Some +import arrow.effects.IO +import com.google.protobuf.ByteString +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.eq +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.never +import com.nhaarman.mockito_kotlin.verify +import com.nhaarman.mockito_kotlin.whenever +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.mockito.ArgumentMatchers.anySet +import org.mockito.Mockito +import org.onap.ves.VesEventV5.VesEvent +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import java.util.concurrent.ConcurrentLinkedQueue + +/** + * @author Piotr Jaszczyk + * @since August 2018 + */ +internal class DcaeAppSimulatorTest : Spek({ + lateinit var consumerFactory: ConsumerFactory + lateinit var messageStreamValidation: MessageStreamValidation + lateinit var consumer: Consumer + lateinit var cut: DcaeAppSimulator + + beforeEachTest { + consumerFactory = mock() + messageStreamValidation = mock() + consumer = mock() + cut = DcaeAppSimulator(consumerFactory, messageStreamValidation) + + whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(IO.just(consumer)) + } + + fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList())) + + describe("listenToTopics") { + val topics = setOf("hvMeas", "faults") + + it("should fail when topic list is empty") { + val result = cut.listenToTopics(setOf()).attempt().unsafeRunSync() + assertThat(result.isLeft()).isTrue() + } + + it("should fail when topic list contains empty strings") { + val result = cut.listenToTopics(setOf("hvMeas", " ", "faults")).attempt().unsafeRunSync() + assertThat(result.isLeft()).isTrue() + } + + it("should subscribe to given topics") { + cut.listenToTopics(topics).unsafeRunSync() + verify(consumerFactory).createConsumerForTopics(topics) + } + + it("should subscribe to given topics when called with comma separated list") { + cut.listenToTopics("hvMeas,faults").unsafeRunSync() + verify(consumerFactory).createConsumerForTopics(topics) + } + + it("should handle errors") { + // given + val error = RuntimeException("WTF") + whenever(consumerFactory.createConsumerForTopics(anySet())) + .thenReturn(IO.raiseError(error)) + + // when + val result = cut.listenToTopics("hvMeas").attempt().unsafeRunSync() + + // then + assertThat(result).isEqualTo(Left(error)) + } + } + + describe("state") { + + it("should return None when topics hasn't been initialized") { + assertThat(cut.state()).isEqualTo(None) + } + + describe("when topics are initialized") { + beforeEachTest { + cut.listenToTopics("hvMeas").unsafeRunSync() + } + + it("should return some state when it has been set") { + val state = consumerState() + whenever(consumer.currentState()).thenReturn(state) + + assertThat(cut.state()).isEqualTo(Some(state)) + } + } + } + + describe("resetState") { + it("should do nothing when topics hasn't been initialized") { + cut.resetState().unsafeRunSync() + verify(consumer, never()).reset() + } + + describe("when topics are initialized") { + beforeEachTest { + cut.listenToTopics("hvMeas").unsafeRunSync() + } + + it("should reset the state") { + // given + whenever(consumer.reset()).thenReturn(IO.unit) + + // when + cut.resetState().unsafeRunSync() + + // then + verify(consumer).reset() + } + } + } + + describe("validate") { + beforeEachTest { + whenever(messageStreamValidation.validate(any(), any())).thenReturn(IO.just(true)) + } + + it("should use empty list when consumer is unavailable") { + // when + val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync() + + // then + verify(messageStreamValidation).validate(any(), eq(emptyList())) + assertThat(result).isTrue() + } + + it("should delegate to MessageStreamValidation") { + // given + cut.listenToTopics("hvMeas").unsafeRunSync() + whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray())) + + // when + val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync() + + // then + verify(messageStreamValidation).validate(any(), any()) + assertThat(result).isTrue() + } + } +}) + + +private const val DUMMY_EVENT_ID = "aaa" +private const val DUMMY_PAYLOAD = "payload" + +private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent { + return VesEvent.newBuilder() + .setCommonEventHeader(CommonEventHeader.newBuilder() + .setEventId(eventId)) + .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray())) + .build() +} diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt new file mode 100644 index 00000000..0bdd1159 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt @@ -0,0 +1,224 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.core.Either +import arrow.core.Left +import arrow.core.None +import arrow.core.Some +import arrow.effects.IO +import javax.json.stream.JsonParsingException +import com.google.protobuf.ByteString +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.never +import com.nhaarman.mockito_kotlin.verify +import com.nhaarman.mockito_kotlin.whenever +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.fail +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.mockito.ArgumentMatchers.anyList +import org.mockito.ArgumentMatchers.anySet +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import org.onap.ves.VesEventV5.VesEvent +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import reactor.core.publisher.Flux +import java.util.concurrent.ConcurrentLinkedQueue +import javax.json.Json +import javax.json.JsonArray +import javax.json.JsonValue + +/** + * @author Piotr Jaszczyk + * @since August 2018 + */ +internal class MessageStreamValidationTest : Spek({ + lateinit var messageParametersParser: MessageParametersParser + lateinit var messageGenerator: MessageGenerator + lateinit var cut: MessageStreamValidation + + beforeEachTest { + messageParametersParser = mock() + messageGenerator = mock() + cut = MessageStreamValidation(messageParametersParser, messageGenerator) + } + + fun givenParsedMessageParameters(vararg params: MessageParameters) { + whenever(messageParametersParser.parse(any())).thenReturn(params.toList()) + } + + describe("validate") { + + it("should return error when JSON is invalid") { + // when + val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync() + + // then + when(result) { + is Either.Left -> assertThat(result.a).isInstanceOf(JsonParsingException::class.java) + else -> fail("validation should fail") + } + } + + it("should return error when message param list is empty") { + // given + givenParsedMessageParameters() + + // when + val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync() + + // then + assertThat(result.isLeft()).isTrue() + } + + describe("when validating headers only") { + it("should return true when messages are the same") { + // given + val jsonAsStream = sampleJsonAsStream() + val event = vesEvent() + val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray()) + val receivedMessageBytes = event.toByteArray() + + givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1)) + whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + + // when + val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() + + // then + assertThat(result).isTrue() + } + + it("should return true when messages differ with payload only") { + // given + val jsonAsStream = sampleJsonAsStream() + val generatedEvent = vesEvent(payload = "payload A") + val receivedEvent = vesEvent(payload = "payload B") + val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray()) + val receivedMessageBytes = receivedEvent.toByteArray() + + givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1)) + whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + + // when + val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() + + // then + assertThat(result).isTrue() + } + + it("should return false when messages are different") { + // given + val jsonAsStream = sampleJsonAsStream() + val generatedEvent = vesEvent() + val receivedEvent = vesEvent(eventId = "bbb") + val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray()) + val receivedMessageBytes = receivedEvent.toByteArray() + + givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1)) + whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + + // when + val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() + + // then + assertThat(result).isFalse() + } + } + + describe("when validating whole messages") { + it("should return true when messages are the same") { + // given + val jsonAsStream = sampleJsonAsStream() + val event = vesEvent() + val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray()) + val receivedMessageBytes = event.toByteArray() + + givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1)) + whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + + // when + val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() + + // then + assertThat(result).isTrue() + } + + it("should return false when messages differ with payload only") { + // given + val jsonAsStream = sampleJsonAsStream() + val generatedEvent = vesEvent(payload = "payload A") + val receivedEvent = vesEvent(payload = "payload B") + val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray()) + val receivedMessageBytes = receivedEvent.toByteArray() + + givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1)) + whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + + // when + val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() + + // then + assertThat(result).isFalse() + } + + it("should return false when messages are different") { + // given + val jsonAsStream = sampleJsonAsStream() + val generatedEvent = vesEvent() + val receivedEvent = vesEvent("bbb") + val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray()) + val receivedMessageBytes = receivedEvent.toByteArray() + + givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1)) + whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + + // when + val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() + + // then + assertThat(result).isFalse() + } + } + } +}) + + + +private const val DUMMY_EVENT_ID = "aaa" +private const val DUMMY_PAYLOAD = "payload" + +private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent { + return VesEvent.newBuilder() + .setCommonEventHeader(CommonEventHeader.newBuilder() + .setEventId(eventId)) + .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray())) + .build() +} + +private const val sampleJsonArray = """["headersOnly"]""" + +private fun sampleJsonAsStream() = sampleJsonArray.byteInputStream() diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt new file mode 100644 index 00000000..de74f628 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it + +/** + * @author Piotr Jaszczyk @nokia.com> + * @since August 2018 + */ +internal class KafkaSourceTest : Spek({ + val servers = "kafka1:9080,kafka2:9080" + val topics = setOf("topic1", "topic2") + + describe("receiver options") { + val options = KafkaSource.createReceiverOptions(servers, topics)!!.toImmutable() + + fun verifyProperty(key: String, expectedValue: Any) { + it("should have $key option set") { + assertThat(options.consumerProperty(key)) + .isEqualTo(expectedValue) + } + } + + verifyProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers) + verifyProperty(ConsumerConfig.CLIENT_ID_CONFIG, "hv-collector-dcae-app-simulator") + verifyProperty(ConsumerConfig.GROUP_ID_CONFIG, "hv-collector-simulators") + verifyProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) + verifyProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) + verifyProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + } +}) \ No newline at end of file diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt new file mode 100644 index 00000000..e7a22fcf --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt @@ -0,0 +1,125 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure +import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess +import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentError + + +internal class ArgDcaeAppSimConfigurationTest : Spek({ + + lateinit var cut: ArgDcaeAppSimConfiguration + val listenPort = "1234" + val kafkaBootstrapServers = "localhosting:123,localhostinger:12345" + val kafkaTopics = "top1,top2" + + beforeEachTest { + cut = ArgDcaeAppSimConfiguration() + } + + describe("parsing arguments") { + lateinit var result: DcaeAppSimConfiguration + + given("all parameters are present in the long form") { + + beforeEachTest { + result = cut.parseExpectingSuccess( + "--listen-port", listenPort, + "--kafka-bootstrap-servers", kafkaBootstrapServers, + "--kafka-topics", kafkaTopics + ) + } + + it("should set proper port") { + assertThat(result.apiPort).isEqualTo(listenPort.toInt()) + } + + + it("should set proper kafka bootstrap servers") { + assertThat(result.kafkaBootstrapServers).isEqualTo(kafkaBootstrapServers) + } + + it("should set proper kafka topics") { + assertThat(result.kafkaTopics).isEqualTo( + setOf("top1", "top2") + ) + } + } + + given("some parameters are present in the short form") { + + beforeEachTest { + result = cut.parseExpectingSuccess( + "-p", listenPort, + "--kafka-bootstrap-servers", kafkaBootstrapServers, + "-f", kafkaTopics) + } + + it("should set proper port") { + assertThat(result.apiPort).isEqualTo(listenPort.toInt()) + } + + it("should set proper kafka bootstrap servers") { + assertThat(result.kafkaBootstrapServers).isEqualTo(kafkaBootstrapServers) + } + + it("should set proper kafka topics") { + assertThat(result.kafkaTopics).isEqualTo( + setOf("top1", "top2") + ) + } + } + + describe("required parameter is absent") { + given("kafka topics are missing") { + it("should throw exception") { + assertThat(cut.parseExpectingFailure( + "-p", listenPort, + "-s", kafkaBootstrapServers + )).isInstanceOf(WrongArgumentError::class.java) + } + } + + given("kafka bootstrap servers is missing") { + it("should throw exception") { + assertThat(cut.parseExpectingFailure( + "-p", listenPort, + "-f", kafkaTopics + )).isInstanceOf(WrongArgumentError::class.java) + } + } + + given("listen port is missing") { + it("should throw exception") { + assertThat(cut.parseExpectingFailure( + "-p", kafkaTopics, + "-s", kafkaBootstrapServers + )).isInstanceOf(WrongArgumentError::class.java) + } + } + } + } +}) \ No newline at end of file diff --git a/hv-collector-dcae-app-simulator/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/hv-collector-dcae-app-simulator/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 00000000..ca6ee9ce --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt index b2e42509..c61ab266 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt @@ -31,7 +31,7 @@ import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.R * @author Piotr Jaszczyk * @since June 2018 */ -class WireFrameEncoder(private val allocator: ByteBufAllocator) { +class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocator.DEFAULT) { fun encode(frame: PayloadWireFrameMessage): ByteBuf { val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size()) diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt index 89d1f32e..fa63c36e 100644 --- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt +++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt @@ -40,7 +40,7 @@ import kotlin.test.fail */ object WireFrameCodecsTest : Spek({ val payloadAsString = "coffeebabe" - val encoder = WireFrameEncoder(UnpooledByteBufAllocator.DEFAULT) + val encoder = WireFrameEncoder() val decoder = WireFrameDecoder() fun createSampleFrame() = diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt index 39964c1e..844d18d8 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt @@ -20,12 +20,15 @@ package org.onap.dcae.collectors.veshv.utils.arrow import arrow.core.Either +import arrow.core.Option import arrow.core.identity +import java.util.concurrent.atomic.AtomicReference /** * @author Piotr Jaszczyk * @since July 2018 */ - fun Either.flatten() = fold(::identity, ::identity) + +fun AtomicReference.getOption() = Option.fromNullable(get()) diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt index e37b0d7d..cef537e8 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt @@ -20,7 +20,11 @@ package org.onap.dcae.collectors.veshv.utils.arrow import arrow.core.Either +import arrow.core.Left +import arrow.core.Right import arrow.effects.IO +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import kotlin.system.exitProcess /** @@ -47,3 +51,15 @@ fun Either, IO>.unsafeRunEitherSync(onError: (Throwable) -> ExitC fun IO.void() = map { Unit } + +fun Mono.asIo() = IO.async { proc -> + subscribe({ proc(Right(it)) }, { proc(Left(it)) }) +} + +fun Flux>.evaluateIo(): Flux = + flatMap { io -> + io.attempt().unsafeRunSync().fold( + { Flux.error(it) }, + { Flux.just(it) } + ) + } diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/CoreKtTest.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/CoreKtTest.kt new file mode 100644 index 00000000..585851be --- /dev/null +++ b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/CoreKtTest.kt @@ -0,0 +1,63 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.arrow + +import arrow.core.None +import arrow.core.Some +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import java.util.concurrent.atomic.AtomicReference + + +/** + * @author Piotr Jaszczyk @nokia.com> + * @since August 2018 + */ +internal class CoreKtTest: Spek({ + describe("AtomicReference.getOption") { + given("empty atomic reference") { + val atomicReference = AtomicReference() + + on("getOption") { + val result = atomicReference.getOption() + + it("should be None") { + assertThat(result).isEqualTo(None) + } + } + } + given("non-empty atomic reference") { + val initialValue = "reksio" + val atomicReference = AtomicReference(initialValue) + + on("getOption") { + val result = atomicReference.getOption() + + it("should be Some($initialValue)") { + assertThat(result).isEqualTo(Some(initialValue)) + } + } + } + } +}) \ No newline at end of file diff --git a/pom.xml b/pom.xml index 7ab65034..03dac1f9 100644 --- a/pom.xml +++ b/pom.xml @@ -559,6 +559,11 @@ arrow-effects ${arrow.version} + + io.arrow-kt + arrow-effects-reactor + ${arrow.version} + ch.qos.logback logback-classic -- cgit 1.2.3-korg