From 7808010c1a18531ee9b618f934d31816193cac38 Mon Sep 17 00:00:00 2001 From: kjaniak Date: Wed, 19 Jun 2019 08:39:33 +0200 Subject: Implement message counting in consumer Issue-ID: DCAEGEN2-1635 Change-Id: I2666de7bad27052d9cefa0f687ad0772d4c9a95d Signed-off-by: kjaniak --- build/hv-collector-coverage/pom.xml | 5 ++ sources/hv-collector-dcae-app-simulator/pom.xml | 10 ++- .../simulators/dcaeapp/impl/DcaeAppSimulator.kt | 14 ++-- .../dcaeapp/impl/adapters/DcaeAppApiServer.kt | 4 +- .../dcaeapp/impl/adapters/KafkaSource.kt | 79 ------------------ .../veshv/simulators/dcaeapp/impl/consumer.kt | 31 +++---- .../collectors/veshv/simulators/dcaeapp/main.kt | 4 +- .../dcaeapp/impl/DcaeAppSimulatorTest.kt | 8 +- .../dcaeapp/impl/adapters/KafkaSourceTest.kt | 54 ------------- sources/hv-collector-kafka-consumer/pom.xml | 34 ++++++-- .../veshv/kafkaconsumer/metrics/Metrics.kt | 4 +- .../kafkaconsumer/metrics/MicrometerMetrics.kt | 3 + .../veshv/kafkaconsumer/state/OffsetConsumer.kt | 41 ++++++++++ .../src/main/resources/logback.xml | 94 ++++++++++++++++++++++ .../kafkaconsumer/state/OffsetConsumerTest.kt | 48 +++++++++++ sources/hv-collector-kafka/pom.xml | 79 ++++++++++++++++++ .../collectors/veshv/kafka/api/ConsumerFactory.kt | 43 ++++++++++ .../collectors/veshv/kafka/api/KafkaConsumer.kt | 27 +++++++ .../collectors/veshv/kafka/impl/KafkaSource.kt | 78 ++++++++++++++++++ .../veshv/kafka/api/ConsumerFactoryTest.kt | 64 +++++++++++++++ .../collectors/veshv/kafka/impl/KafkaSourceTest.kt | 54 +++++++++++++ sources/pom.xml | 1 + 22 files changed, 599 insertions(+), 180 deletions(-) delete mode 100644 sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt delete mode 100644 sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt create mode 100644 sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt create mode 100644 sources/hv-collector-kafka-consumer/src/main/resources/logback.xml create mode 100644 sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt create mode 100644 sources/hv-collector-kafka/pom.xml create mode 100644 sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactory.kt create mode 100644 sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaConsumer.kt create mode 100644 sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/impl/KafkaSource.kt create mode 100644 sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactoryTest.kt create mode 100644 sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/impl/KafkaSourceTest.kt diff --git a/build/hv-collector-coverage/pom.xml b/build/hv-collector-coverage/pom.xml index bfde3ae6..f6b40bb9 100644 --- a/build/hv-collector-coverage/pom.xml +++ b/build/hv-collector-coverage/pom.xml @@ -133,6 +133,11 @@ hv-collector-health-check ${project.parent.version} + + ${project.parent.groupId} + hv-collector-kafka + ${project.parent.version} + ${project.parent.groupId} hv-collector-kafka-consumer diff --git a/sources/hv-collector-dcae-app-simulator/pom.xml b/sources/hv-collector-dcae-app-simulator/pom.xml index 5c32623b..8cd41bea 100644 --- a/sources/hv-collector-dcae-app-simulator/pom.xml +++ b/sources/hv-collector-dcae-app-simulator/pom.xml @@ -87,13 +87,15 @@ ${project.parent.groupId} - hv-collector-test-utils + hv-collector-kafka ${project.parent.version} - test + compile - io.projectreactor.kafka - reactor-kafka + ${project.parent.groupId} + hv-collector-test-utils + ${project.parent.version} + test com.google.guava diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt index 122d9bf0..beacfd79 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -28,9 +28,9 @@ import java.util.Collections.synchronizedMap * @author Piotr Jaszczyk * @since August 2018 */ -internal class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, - private val messageStreamValidation: MessageStreamValidation) { - private val consumerState: MutableMap = synchronizedMap(mutableMapOf()) +internal class DcaeAppSimulator(private val consumerFactory: DcaeAppConsumerFactory, + private val messageStreamValidation: MessageStreamValidation) { + private val consumers: MutableMap = synchronizedMap(mutableMapOf()) fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString)) @@ -42,9 +42,9 @@ internal class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, } logger.info { "Received new configuration. Removing old consumers and creating consumers for topics: $topics" } - synchronized(consumerState) { - consumerState.clear() - consumerState.putAll(consumerFactory.createConsumersForTopics(topics)) + synchronized(consumers) { + consumers.clear() + consumers.putAll(consumerFactory.createConsumersFor(topics)) } } @@ -69,7 +69,7 @@ internal class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, fun validate(jsonDescription: InputStream, topic: String) = messageStreamValidation.validate(jsonDescription, currentMessages(topic)) - private fun consumerState(topic: String) = Option.fromNullable(consumerState[topic]) + private fun consumerState(topic: String) = Option.fromNullable(consumers[topic]) private fun currentMessages(topic: String): List = diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt index 2458b203..992be6e3 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -143,14 +143,14 @@ internal class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { private val responseValid by lazy { Responses.statusResponse( name = "valid", - message = DcaeAppApiServer.VALID_RESPONSE_MESSAGE + message = VALID_RESPONSE_MESSAGE ) } private val responseInvalid by lazy { Responses.statusResponse( name = "invalid", - message = DcaeAppApiServer.INVALID_RESPONSE_MESSAGE, + message = INVALID_RESPONSE_MESSAGE, httpStatus = HttpStatus.BAD_REQUEST ) } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt deleted file mode 100644 index a108eba7..00000000 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018,2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters - -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.config.SaslConfigs -import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM -import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.publisher.Flux -import reactor.kafka.receiver.KafkaReceiver -import reactor.kafka.receiver.ReceiverOptions -import reactor.kafka.receiver.ReceiverRecord - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -internal class KafkaSource(private val receiver: KafkaReceiver) { - - fun start(): Flux> = - receiver.receive() - .doOnNext { it.receiverOffset().acknowledge() } - .also { logger.info { "Started Kafka source" } } - - companion object { - private val logger = Logger(KafkaSource::class) - - private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule" - private const val USERNAME = "admin" - private const val PASSWORD = "admin_secret" - private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;" - private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum).name - - fun create(bootstrapServers: String, topics: Set) = - KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics))) - - fun createReceiverOptions(bootstrapServers: String, - topics: Set): ReceiverOptions? { - val props = mapOf( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, - ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-dcae-app-simulator", - ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators", - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", - ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to "3000", - - - CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SASL_PLAINTEXT, - SaslConfigs.SASL_MECHANISM to PLAIN_MECHANISM, - SaslConfigs.SASL_JAAS_CONFIG to JAAS_CONFIG - ) - return ReceiverOptions.create(props) - .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } } - .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } } - .subscription(topics) - } - } -} diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt index 2de89aae..8a7aafbe 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt @@ -19,9 +19,10 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.onap.dcae.collectors.veshv.kafka.api.ConsumerFactory +import org.onap.dcae.collectors.veshv.kafka.api.KafkaConsumer import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.kafka.receiver.ReceiverRecord import java.util.concurrent.ConcurrentLinkedQueue /** @@ -40,10 +41,9 @@ internal class ConsumerState(private val messages: ConcurrentLinkedQueue = ConcurrentLinkedQueue() @@ -51,7 +51,7 @@ internal class Consumer : ConsumerStateProvider { override fun reset() = consumedMessages.clear() - fun update(record: ReceiverRecord) { + override fun update(record: ConsumerRecord) { logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } consumedMessages.add(record.value()) } @@ -61,20 +61,11 @@ internal class Consumer : ConsumerStateProvider { } } -internal class ConsumerFactory(private val kafkaBootstrapServers: String) { - fun createConsumersForTopics(kafkaTopics: Set): Map = - KafkaSource.create(kafkaBootstrapServers, kafkaTopics).let { kafkaSource -> - val topicToConsumer = kafkaTopics.associate { it to Consumer() } - kafkaSource.start() - .map { - val topic = it.topic() - topicToConsumer.get(topic)?.update(it) - ?: logger.warn { "No consumer configured for topic $topic" } - }.subscribe() - topicToConsumer - } +internal class DcaeAppConsumerFactory(private val kafkaBootstrapServers: String) { - companion object { - private val logger = Logger(ConsumerFactory::class) - } + private val consumerProvider = { Consumer() } + + fun createConsumersFor(topics: Set) = + ConsumerFactory.createConsumersForTopics(kafkaBootstrapServers, topics, consumerProvider) + .mapValues { it.value as Consumer } } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index 7f4e62bb..25178594 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -20,7 +20,7 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppConsumerFactory import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer @@ -43,7 +43,7 @@ fun main(args: Array): Unit = private fun startApp(config: DcaeAppSimConfiguration): ExitSuccess { logger.info { "Starting DCAE-APP Simulator API server with configuration: $config" } - val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers) + val consumerFactory = DcaeAppConsumerFactory(config.kafkaBootstrapServers) val generatorFactory = MessageGeneratorFactory(config.maxPayloadSizeBytes) val messageStreamValidation = MessageStreamValidation(generatorFactory.createVesEventGenerator()) DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt index e3e61c81..4ebfb469 100644 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt @@ -46,7 +46,7 @@ import kotlin.test.assertFailsWith * @since August 2018 */ internal class DcaeAppSimulatorTest : Spek({ - lateinit var consumerFactory: ConsumerFactory + lateinit var consumerFactory: DcaeAppConsumerFactory lateinit var messageStreamValidation: MessageStreamValidation lateinit var perf3gpp_consumer: Consumer lateinit var faults_consumer: Consumer @@ -59,7 +59,7 @@ internal class DcaeAppSimulatorTest : Spek({ faults_consumer = mock() cut = DcaeAppSimulator(consumerFactory, messageStreamValidation) - whenever(consumerFactory.createConsumersForTopics(anySet())).thenReturn(mapOf( + whenever(consumerFactory.createConsumersFor(anySet())).thenReturn(mapOf( PERF3GPP_TOPIC to perf3gpp_consumer, FAULTS_TOPICS to faults_consumer)) } @@ -81,12 +81,12 @@ internal class DcaeAppSimulatorTest : Spek({ it("should subscribe to given topics") { cut.listenToTopics(TWO_TOPICS) - verify(consumerFactory).createConsumersForTopics(TWO_TOPICS) + verify(consumerFactory).createConsumersFor(TWO_TOPICS) } it("should subscribe to given topics when called with comma separated list") { cut.listenToTopics("$PERF3GPP_TOPIC,$FAULTS_TOPICS") - verify(consumerFactory).createConsumersForTopics(TWO_TOPICS) + verify(consumerFactory).createConsumersFor(TWO_TOPICS) } } diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt deleted file mode 100644 index de74f628..00000000 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt +++ /dev/null @@ -1,54 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters - -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.it - -/** - * @author Piotr Jaszczyk @nokia.com> - * @since August 2018 - */ -internal class KafkaSourceTest : Spek({ - val servers = "kafka1:9080,kafka2:9080" - val topics = setOf("topic1", "topic2") - - describe("receiver options") { - val options = KafkaSource.createReceiverOptions(servers, topics)!!.toImmutable() - - fun verifyProperty(key: String, expectedValue: Any) { - it("should have $key option set") { - assertThat(options.consumerProperty(key)) - .isEqualTo(expectedValue) - } - } - - verifyProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers) - verifyProperty(ConsumerConfig.CLIENT_ID_CONFIG, "hv-collector-dcae-app-simulator") - verifyProperty(ConsumerConfig.GROUP_ID_CONFIG, "hv-collector-simulators") - verifyProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) - verifyProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) - verifyProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - } -}) \ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/pom.xml b/sources/hv-collector-kafka-consumer/pom.xml index 1e20d5b1..ef09c063 100644 --- a/sources/hv-collector-kafka-consumer/pom.xml +++ b/sources/hv-collector-kafka-consumer/pom.xml @@ -65,21 +65,21 @@ ${project.parent.groupId} hv-collector-commandline ${project.parent.version} + compile ${project.parent.groupId} - hv-collector-test-utils + hv-collector-kafka ${project.parent.version} - test + compile - org.jetbrains.kotlin - kotlin-stdlib-jdk8 + io.micrometer + micrometer-registry-prometheus - com.google.guava - guava - true + org.jetbrains.kotlin + kotlin-stdlib-jdk8 org.slf4j @@ -90,6 +90,26 @@ logback-classic runtime + + com.nhaarman.mockitokotlin2 + mockito-kotlin + test + + + org.jetbrains.kotlin + kotlin-test + test + + + org.jetbrains.spek + spek-api + test + + + org.jetbrains.spek + spek-junit-platform-engine + test + io.micrometer micrometer-registry-prometheus diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt index cbdb45dc..64a7fb3e 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt @@ -19,4 +19,6 @@ */ package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics -internal interface Metrics \ No newline at end of file +internal interface Metrics { + fun notifyOffsetChanged(size: Long) +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt index adb1ff1f..f137d074 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt @@ -26,6 +26,9 @@ import reactor.core.publisher.Mono internal class MicrometerMetrics constructor( private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) ) : Metrics { + override fun notifyOffsetChanged(size: Long) { + // TODO implementation here + } fun lastStatus(): Mono = Mono.fromCallable { registry.scrape() diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt new file mode 100644 index 00000000..2c6707f9 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt @@ -0,0 +1,41 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.state + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.onap.dcae.collectors.veshv.kafka.api.KafkaConsumer +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics +import org.onap.dcae.collectors.veshv.utils.logging.Logger + + +internal class OffsetConsumer(private val metrics: Metrics): KafkaConsumer { + + override fun update(record: ConsumerRecord) { + val offset = record.offset() + logger.trace { "Current consumer offset $offset" } + metrics.notifyOffsetChanged(offset) + } + + override fun reset() = Unit + + companion object { + private val logger = Logger(OffsetConsumer::class) + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml b/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml new file mode 100644 index 00000000..da0f7f4b --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml @@ -0,0 +1,94 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + ${LOG_PATTERN_IN_USE} + + + + + + ${LOG_PATTERN_IN_USE} + + ${LOG_PATH}/${LOG_FILENAME}.log + + ${ARCHIVE}/${LOG_FILENAME}.%d{yyyy-MM-dd}.%i.log.gz + 50MB + 30 + 10GB + + + + + + + + + + \ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt new file mode 100644 index 00000000..6fb42d81 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.state + +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics + +object OffsetConsumerTest : Spek({ + describe("OffsetConsumer with metrics") { + val mockedMetrics = mock() + val offsetConsumer = OffsetConsumer(mockedMetrics) + + on("new update method call") { + val consumerRecord = mock>() + whenever(consumerRecord.offset()).thenReturn(1) + + offsetConsumer.update(consumerRecord) + + it("should notify message offset metric") { + verify(mockedMetrics).notifyOffsetChanged(1) + } + } + } +}) diff --git a/sources/hv-collector-kafka/pom.xml b/sources/hv-collector-kafka/pom.xml new file mode 100644 index 00000000..52105a0e --- /dev/null +++ b/sources/hv-collector-kafka/pom.xml @@ -0,0 +1,79 @@ + + + + 4.0.0 + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + org.onap.dcaegen2.collectors.hv-ves + hv-collector-sources + 1.2.0-SNAPSHOT + .. + + + + hv-collector-kafka + + VES HighVolume Collector :: Kafka + + + + + kotlin-maven-plugin + org.jetbrains.kotlin + + + maven-surefire-plugin + org.apache.maven.plugins + + + + + + + ${project.parent.groupId} + hv-collector-utils + ${project.parent.version} + + + io.projectreactor.kafka + reactor-kafka + + + org.jetbrains.kotlin + kotlin-stdlib-jdk8 + + + com.nhaarman.mockitokotlin2 + mockito-kotlin + test + + + org.assertj + assertj-core + test + + + org.jetbrains.spek + spek-api + test + + + org.jetbrains.spek + spek-junit-platform-engine + test + + + org.slf4j + slf4j-api + + + diff --git a/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactory.kt b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactory.kt new file mode 100644 index 00000000..88eb8cec --- /dev/null +++ b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactory.kt @@ -0,0 +1,43 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafka.api + +import org.onap.dcae.collectors.veshv.kafka.impl.KafkaSource +import org.onap.dcae.collectors.veshv.utils.logging.Logger + +typealias ConsumerProvider = () -> KafkaConsumer + +object ConsumerFactory { + private val logger = Logger(ConsumerFactory::class) + + fun createConsumersForTopics(kafkaBootstrapServers: String, + kafkaTopics: Set, + consumerProvider: ConsumerProvider): Map = + KafkaSource.create(kafkaBootstrapServers, kafkaTopics).let { kafkaSource -> + val topicToConsumer = kafkaTopics.associate { it to consumerProvider() } + kafkaSource.start() + .map { + val topic = it.topic() + topicToConsumer.get(topic)?.update(it) + ?: logger.warn { "No consumer configured for topic $topic" } + }.subscribe() + topicToConsumer + } +} diff --git a/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaConsumer.kt b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaConsumer.kt new file mode 100644 index 00000000..ae797b6e --- /dev/null +++ b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaConsumer.kt @@ -0,0 +1,27 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafka.api + +import org.apache.kafka.clients.consumer.ConsumerRecord + +interface KafkaConsumer { + fun reset() + fun update(record: ConsumerRecord) +} diff --git a/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/impl/KafkaSource.kt b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/impl/KafkaSource.kt new file mode 100644 index 00000000..98934b0d --- /dev/null +++ b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/impl/KafkaSource.kt @@ -0,0 +1,78 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018,2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafka.impl + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Flux +import reactor.kafka.receiver.KafkaReceiver +import reactor.kafka.receiver.ReceiverOptions +import reactor.kafka.receiver.ReceiverRecord + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +internal class KafkaSource(private val receiver: KafkaReceiver) { + fun start(): Flux> = + receiver.receive() + .doOnNext { it.receiverOffset().acknowledge() } + .also { logger.info { "Started Kafka source" } } + + companion object { + private val logger = Logger(KafkaSource::class) + + private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule" + private const val USERNAME = "admin" + private const val PASSWORD = "admin_secret" + private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;" + private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum).name + + fun create(bootstrapServers: String, topics: Set) = + KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics))) + + fun createReceiverOptions(bootstrapServers: String, + topics: Set): ReceiverOptions? { + val props = mapOf( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-consumer", + ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-consumers", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to "3000", + + + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SASL_PLAINTEXT, + SaslConfigs.SASL_MECHANISM to PLAIN_MECHANISM, + SaslConfigs.SASL_JAAS_CONFIG to JAAS_CONFIG + ) + return ReceiverOptions.create(props) + .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } } + .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } } + .subscription(topics) + } + } +} diff --git a/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactoryTest.kt b/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactoryTest.kt new file mode 100644 index 00000000..a8ba4217 --- /dev/null +++ b/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactoryTest.kt @@ -0,0 +1,64 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafka.api + +import com.nhaarman.mockitokotlin2.mock +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.entry +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on + +object ConsumerFactoryTest : Spek({ + describe("ConsumerFactory") { + val kafkaBootstrapServers = "0.0.0.0:40,0.0.0.1:41" + given("consumer provider"){ + val mockedKafkaConsumer = mock() + val consumerProvider = { mockedKafkaConsumer } + on("creation of consumer") { + val kafkaTopics = setOf("topic1", "topic2") + val consumer = ConsumerFactory.createConsumersForTopics( + kafkaBootstrapServers, + kafkaTopics, + consumerProvider) + it("should create consumer"){ + assertThat(consumer).isNotEmpty.hasSize(2) + assertThat(consumer).contains(entry("topic1", mockedKafkaConsumer), + entry("topic2", mockedKafkaConsumer)) + } + } + on("empty kafkaTopics set"){ + val emptyKafkaTopics = emptySet() + val consumer = ConsumerFactory.createConsumersForTopics( + kafkaBootstrapServers, + emptyKafkaTopics, + consumerProvider) + + it("should not create consumer"){ + assertThat(consumer).isEmpty() + } + } + } + + + } +}) \ No newline at end of file diff --git a/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/impl/KafkaSourceTest.kt b/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/impl/KafkaSourceTest.kt new file mode 100644 index 00000000..43650f34 --- /dev/null +++ b/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/impl/KafkaSourceTest.kt @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafka.impl + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it + +/** + * @author Piotr Jaszczyk @nokia.com> + * @since August 2018 + */ +internal class KafkaSourceTest : Spek({ + val servers = "kafka1:9080,kafka2:9080" + val topics = setOf("topic1", "topic2") + + describe("receiver options") { + val options = KafkaSource.createReceiverOptions(servers, topics)!!.toImmutable() + + fun verifyProperty(key: String, expectedValue: Any) { + it("should have $key option set") { + assertThat(options.consumerProperty(key)) + .isEqualTo(expectedValue) + } + } + + verifyProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers) + verifyProperty(ConsumerConfig.CLIENT_ID_CONFIG, "hv-collector-consumer") + verifyProperty(ConsumerConfig.GROUP_ID_CONFIG, "hv-collector-consumers") + verifyProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) + verifyProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) + verifyProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + } +}) \ No newline at end of file diff --git a/sources/pom.xml b/sources/pom.xml index c7ba4886..68ccbf1a 100644 --- a/sources/pom.xml +++ b/sources/pom.xml @@ -142,6 +142,7 @@ hv-collector-dcae-app-simulator hv-collector-domain hv-collector-health-check + hv-collector-kafka hv-collector-kafka-consumer hv-collector-main hv-collector-server -- cgit 1.2.3-korg