From dde383a2aa75f94c26d7949665b79cc95486a223 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Wed, 28 Nov 2018 15:46:50 +0100 Subject: Custom detekt rule for logger usage check Check if logger invocations don't use unoptimal invocations, eg. concatenation `debug("a=" + a)` instead of lambda use `debug {"a=" + a}` Unfortunately to avoid defining dependencies in many places and having circural dependencies it was necessarry to reorganize the maven module structure. The goal was to have `sources` module with production code and `build` module with build-time tooling (detekt rules among them). Issue-ID: DCAEGEN2-1002 Change-Id: I36e677b98972aaae6905d722597cbce5e863d201 Signed-off-by: Piotr Jaszczyk --- .../simulators/dcaeapp/impl/DcaeAppSimulator.kt | 74 +++++++++++++++ .../dcaeapp/impl/MessageStreamValidation.kt | 88 ++++++++++++++++++ .../dcaeapp/impl/adapters/DcaeAppApiServer.kt | 101 +++++++++++++++++++++ .../dcaeapp/impl/adapters/KafkaSource.kt | 66 ++++++++++++++ .../impl/config/ArgDcaeAppSimConfiguration.kt | 67 ++++++++++++++ .../dcaeapp/impl/config/DcaeAppSimConfiguration.kt | 27 ++++++ .../veshv/simulators/dcaeapp/impl/consumer.kt | 70 ++++++++++++++ .../collectors/veshv/simulators/dcaeapp/main.kt | 62 +++++++++++++ .../src/main/resources/logback.xml | 36 ++++++++ 9 files changed, 591 insertions(+) create mode 100644 sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt create mode 100644 sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt create mode 100644 sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt create mode 100644 sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt create mode 100644 sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt create mode 100644 sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt create mode 100644 sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt create mode 100644 sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt create mode 100644 sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml (limited to 'sources/hv-collector-dcae-app-simulator/src/main') diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt new file mode 100644 index 00000000..490cde4a --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -0,0 +1,74 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.core.getOrElse +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.instances.io.monadError.monadError +import arrow.typeclasses.bindingCatch +import org.onap.dcae.collectors.veshv.utils.arrow.getOption +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.io.InputStream +import java.util.concurrent.atomic.AtomicReference + +/** + * @author Piotr Jaszczyk + * @since August 2018 + */ +class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, + private val messageStreamValidation: MessageStreamValidation) { + private val consumerState: AtomicReference = AtomicReference() + + fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString)) + + fun listenToTopics(topics: Set): IO = IO.monadError().bindingCatch { + if (topics.any { it.isBlank() }) + throw IllegalArgumentException("Topic list cannot contain empty elements") + if (topics.isEmpty()) + throw IllegalArgumentException("Topic list cannot be empty") + + logger.info("Received new configuration. Creating consumer for topics: $topics") + consumerState.set(consumerFactory.createConsumerForTopics(topics).bind()) + }.fix() + + fun state() = consumerState.getOption().map { it.currentState() } + + fun resetState(): IO = consumerState.getOption().fold( + { IO.unit }, + { it.reset() } + ) + + fun validate(jsonDescription: InputStream) = messageStreamValidation.validate(jsonDescription, currentMessages()) + + private fun currentMessages(): List = + consumerState.getOption() + .map { it.currentState().consumedMessages } + .getOrElse(::emptyList) + + private fun extractTopics(topicsString: String): Set = + topicsString.substringAfter("=") + .split(",") + .toSet() + + companion object { + private val logger = Logger(DcaeAppSimulator::class) + } +} diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt new file mode 100644 index 00000000..e423191d --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -0,0 +1,88 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.instances.io.monadError.monadError +import arrow.typeclasses.bindingCatch +import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.utils.arrow.asIo +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import org.onap.ves.VesEventOuterClass +import java.io.InputStream +import javax.json.Json + +class MessageStreamValidation( + private val messageGenerator: MessageGenerator, + private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { + + fun validate(jsonDescription: InputStream, consumedMessages: List): IO = + IO.monadError().bindingCatch { + val messageParams = parseMessageParams(jsonDescription) + val expectedEvents = generateEvents(messageParams).bind() + val actualEvents = decodeConsumedEvents(consumedMessages) + if (shouldValidatePayloads(messageParams)) { + expectedEvents == actualEvents + } else { + validateHeaders(actualEvents, expectedEvents) + } + }.fix() + + private fun parseMessageParams(input: InputStream): List { + val expectations = Json.createReader(input).readArray() + val messageParams = messageParametersParser.parse(expectations) + + return messageParams.fold( + { throw IllegalArgumentException("Parsing error: " + it.message) }, + { + if (it.isEmpty()) + throw IllegalArgumentException("Message param list cannot be empty") + it + } + ) + } + + private fun shouldValidatePayloads(parameters: List) = + parameters.all { it.messageType == MessageType.FIXED_PAYLOAD } + + private fun validateHeaders(actual: List, + expected: List): Boolean { + val consumedHeaders = actual.map { it.commonEventHeader } + val generatedHeaders = expected.map { it.commonEventHeader } + return generatedHeaders == consumedHeaders + } + + private fun generateEvents(parameters: List): IO> = + messageGenerator.createMessageFlux(parameters) + .map(WireFrameMessage::payload) + .map(ByteData::unsafeAsArray) + .map(VesEventOuterClass.VesEvent::parseFrom) + .collectList() + .asIo() + + private fun decodeConsumedEvents(consumedMessages: List) = + consumedMessages.map(VesEventOuterClass.VesEvent::parseFrom) + +} diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt new file mode 100644 index 00000000..1eca9317 --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -0,0 +1,101 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator +import org.onap.dcae.collectors.veshv.utils.http.HttpConstants +import org.onap.dcae.collectors.veshv.utils.http.HttpStatus +import org.onap.dcae.collectors.veshv.utils.http.Responses +import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors +import org.onap.dcae.collectors.veshv.utils.http.sendOrError +import ratpack.handling.Chain +import ratpack.server.RatpackServer +import ratpack.server.ServerConfig + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { + private val responseValid by lazy { + Responses.statusResponse( + name = "valid", + message = "validation succeeded" + ) + } + + private val responseInvalid by lazy { + Responses.statusResponse( + name = "invalid", + message = "validation failed", + httpStatus = HttpStatus.BAD_REQUEST + ) + } + + + fun start(port: Int, kafkaTopics: Set): IO = + simulator.listenToTopics(kafkaTopics).map { + RatpackServer.start { server -> + server.serverConfig(ServerConfig.embedded().port(port)) + .handlers(::setupHandlers) + } + } + + private fun setupHandlers(chain: Chain) { + chain + .put("configuration/topics") { ctx -> + ctx.request.body.then { body -> + val operation = simulator.listenToTopics(body.text) + ctx.response.sendOrError(operation) + } + + } + .delete("messages") { ctx -> + ctx.response.contentType(CONTENT_TEXT) + ctx.response.sendOrError(simulator.resetState()) + } + .get("messages/all/count") { ctx -> + simulator.state().fold( + { ctx.response.status(HttpConstants.STATUS_NOT_FOUND) }, + { + ctx.response + .contentType(CONTENT_TEXT) + .send(it.messagesCount.toString()) + }) + } + .post("messages/all/validate") { ctx -> + ctx.request.body.then { body -> + val response = simulator.validate(body.inputStream) + .map { isValid -> + if (isValid) responseValid else responseInvalid + } + ctx.response.sendAndHandleErrors(response) + } + } + .get("healthcheck") { ctx -> + ctx.response.status(HttpConstants.STATUS_OK).send() + } + } + + companion object { + private const val CONTENT_TEXT = "text/plain" + } +} diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt new file mode 100644 index 00000000..10dedbdf --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt @@ -0,0 +1,66 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters + +import arrow.effects.IO +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer +import org.onap.dcae.collectors.veshv.utils.arrow.evaluateIo +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.kafka.receiver.KafkaReceiver +import reactor.kafka.receiver.ReceiverOptions + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +class KafkaSource(private val receiver: KafkaReceiver) { + + fun start(): IO = IO { + val consumer = Consumer() + receiver.receive().map(consumer::update).evaluateIo().subscribe() + consumer + } + + companion object { + private val logger = Logger(KafkaSource::class) + + fun create(bootstrapServers: String, topics: Set): KafkaSource { + return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics))) + } + + fun createReceiverOptions(bootstrapServers: String, + topics: Set): ReceiverOptions? { + val props = mapOf( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-dcae-app-simulator", + ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest" + ) + return ReceiverOptions.create(props) + .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } } + .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } } + .subscription(topics) + } + } +} diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt new file mode 100644 index 00000000..17eeb5b1 --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt @@ -0,0 +1,67 @@ +/* +* ============LICENSE_START======================================================= +* dcaegen2-collectors-veshv +* ================================================================================ +* Copyright (C) 2018 NOKIA +* ================================================================================ +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config + +import arrow.core.Option +import arrow.core.fix +import arrow.instances.option.monad.monad +import arrow.typeclasses.binding +import org.apache.commons.cli.CommandLine +import org.apache.commons.cli.DefaultParser +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES +import org.onap.dcae.collectors.veshv.utils.commandline.intValue +import org.onap.dcae.collectors.veshv.utils.commandline.stringValue + +class ArgDcaeAppSimConfiguration : ArgBasedConfiguration(DefaultParser()) { + override val cmdLineOptionsList: List = listOf( + LISTEN_PORT, + MAXIMUM_PAYLOAD_SIZE_BYTES, + KAFKA_SERVERS, + KAFKA_TOPICS + ) + + override fun getConfiguration(cmdLine: CommandLine): Option = + Option.monad().binding { + val listenPort = cmdLine + .intValue(LISTEN_PORT) + .bind() + val maxPayloadSizeBytes = cmdLine + .intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES) + val kafkaBootstrapServers = cmdLine + .stringValue(KAFKA_SERVERS) + .bind() + val kafkaTopics = cmdLine + .stringValue(KAFKA_TOPICS) + .map { it.split(",").toSet() } + .bind() + + DcaeAppSimConfiguration( + listenPort, + maxPayloadSizeBytes, + kafkaBootstrapServers, + kafkaTopics) + }.fix() +} diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt new file mode 100644 index 00000000..a6fc8053 --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt @@ -0,0 +1,27 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config + +data class DcaeAppSimConfiguration( + val apiPort: Int, + val maxPayloadSizeBytes: Int, + val kafkaBootstrapServers: String, + val kafkaTopics: Set +) diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt new file mode 100644 index 00000000..1eefdbdb --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt @@ -0,0 +1,70 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.kafka.receiver.ReceiverRecord +import java.util.concurrent.ConcurrentLinkedQueue + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +class ConsumerState(private val messages: ConcurrentLinkedQueue) { + val messagesCount: Int by lazy { + messages.size + } + + val consumedMessages: List by lazy { + messages.toList() + } +} + +interface ConsumerStateProvider { + fun currentState(): ConsumerState + fun reset(): IO +} + +class Consumer : ConsumerStateProvider { + + private var consumedMessages: ConcurrentLinkedQueue = ConcurrentLinkedQueue() + + override fun currentState(): ConsumerState = ConsumerState(consumedMessages) + + override fun reset(): IO = IO { + consumedMessages.clear() + } + + fun update(record: ReceiverRecord) = IO { + logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } + consumedMessages.add(record.value()) + } + + companion object { + private val logger = Logger(Consumer::class) + } +} + +class ConsumerFactory(private val kafkaBootstrapServers: String) { + fun createConsumerForTopics(kafkaTopics: Set): IO = + KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start() +} diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt new file mode 100644 index 00000000..06ff4d59 --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -0,0 +1,62 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer +import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure +import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync +import org.onap.dcae.collectors.veshv.utils.arrow.unit +import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory + +private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp" +private val logger = Logger(PACKAGE_NAME) +const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" + +fun main(args: Array) = + ArgDcaeAppSimConfiguration().parse(args) + .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) + .map(::startApp) + .unsafeRunEitherSync( + { ex -> + logger.error("Failed to start a server", ex) + ExitFailure(1) + }, + { + logger.info("Started DCAE-APP Simulator API server") + } + ) + + +private fun startApp(config: DcaeAppSimConfiguration): IO { + logger.info("Using configuration: $config") + val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers) + val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) + return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) + .start(config.apiPort, config.kafkaTopics) + .unit() +} diff --git a/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml b/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml new file mode 100644 index 00000000..48da3b18 --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml @@ -0,0 +1,36 @@ + + + + + + + + + %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n + + + + + + + ${FILE_LOG_PATTERN} + + ${LOG_FILE} + + ${LOG_FILE}.%d{yyyy-MM-dd}.log + 50MB + 30 + 10GB + + + + + + + + + + + \ No newline at end of file -- cgit 1.2.3-korg