diff options
author | kjaniak <kornel.janiak@nokia.com> | 2019-06-19 08:39:33 +0200 |
---|---|---|
committer | kjaniak <kornel.janiak@nokia.com> | 2019-06-25 15:52:13 +0200 |
commit | 7808010c1a18531ee9b618f934d31816193cac38 (patch) | |
tree | 3680678be6f909054ff3a022aeda35b1ee5e47ac | |
parent | e52b0ef6c1c3b502f840f08a17b6594e17957db1 (diff) |
Implement message counting in consumer
Issue-ID: DCAEGEN2-1635
Change-Id: I2666de7bad27052d9cefa0f687ad0772d4c9a95d
Signed-off-by: kjaniak <kornel.janiak@nokia.com>
20 files changed, 473 insertions, 54 deletions
diff --git a/build/hv-collector-coverage/pom.xml b/build/hv-collector-coverage/pom.xml index bfde3ae6..f6b40bb9 100644 --- a/build/hv-collector-coverage/pom.xml +++ b/build/hv-collector-coverage/pom.xml @@ -135,6 +135,11 @@ </dependency> <dependency> <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-kafka</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> <artifactId>hv-collector-kafka-consumer</artifactId> <version>${project.parent.version}</version> </dependency> diff --git a/sources/hv-collector-dcae-app-simulator/pom.xml b/sources/hv-collector-dcae-app-simulator/pom.xml index 5c32623b..8cd41bea 100644 --- a/sources/hv-collector-dcae-app-simulator/pom.xml +++ b/sources/hv-collector-dcae-app-simulator/pom.xml @@ -87,13 +87,15 @@ </dependency> <dependency> <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-test-utils</artifactId> + <artifactId>hv-collector-kafka</artifactId> <version>${project.parent.version}</version> - <scope>test</scope> + <scope>compile</scope> </dependency> <dependency> - <groupId>io.projectreactor.kafka</groupId> - <artifactId>reactor-kafka</artifactId> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-test-utils</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> </dependency> <dependency> <groupId>com.google.guava</groupId> diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt index 122d9bf0..beacfd79 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -28,9 +28,9 @@ import java.util.Collections.synchronizedMap * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -internal class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, - private val messageStreamValidation: MessageStreamValidation) { - private val consumerState: MutableMap<String, ConsumerStateProvider> = synchronizedMap(mutableMapOf()) +internal class DcaeAppSimulator(private val consumerFactory: DcaeAppConsumerFactory, + private val messageStreamValidation: MessageStreamValidation) { + private val consumers: MutableMap<String, Consumer> = synchronizedMap(mutableMapOf()) fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString)) @@ -42,9 +42,9 @@ internal class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, } logger.info { "Received new configuration. Removing old consumers and creating consumers for topics: $topics" } - synchronized(consumerState) { - consumerState.clear() - consumerState.putAll(consumerFactory.createConsumersForTopics(topics)) + synchronized(consumers) { + consumers.clear() + consumers.putAll(consumerFactory.createConsumersFor(topics)) } } @@ -69,7 +69,7 @@ internal class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, fun validate(jsonDescription: InputStream, topic: String) = messageStreamValidation.validate(jsonDescription, currentMessages(topic)) - private fun consumerState(topic: String) = Option.fromNullable(consumerState[topic]) + private fun consumerState(topic: String) = Option.fromNullable(consumers[topic]) private fun currentMessages(topic: String): List<ByteArray> = diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt index 2458b203..992be6e3 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -143,14 +143,14 @@ internal class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { private val responseValid by lazy { Responses.statusResponse( name = "valid", - message = DcaeAppApiServer.VALID_RESPONSE_MESSAGE + message = VALID_RESPONSE_MESSAGE ) } private val responseInvalid by lazy { Responses.statusResponse( name = "invalid", - message = DcaeAppApiServer.INVALID_RESPONSE_MESSAGE, + message = INVALID_RESPONSE_MESSAGE, httpStatus = HttpStatus.BAD_REQUEST ) } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt index 2de89aae..8a7aafbe 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt @@ -19,9 +19,10 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.onap.dcae.collectors.veshv.kafka.api.ConsumerFactory +import org.onap.dcae.collectors.veshv.kafka.api.KafkaConsumer import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.kafka.receiver.ReceiverRecord import java.util.concurrent.ConcurrentLinkedQueue /** @@ -40,10 +41,9 @@ internal class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArr internal interface ConsumerStateProvider { fun currentState(): ConsumerState - fun reset() } -internal class Consumer : ConsumerStateProvider { +internal class Consumer : KafkaConsumer, ConsumerStateProvider { private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue() @@ -51,7 +51,7 @@ internal class Consumer : ConsumerStateProvider { override fun reset() = consumedMessages.clear() - fun update(record: ReceiverRecord<ByteArray, ByteArray>) { + override fun update(record: ConsumerRecord<ByteArray, ByteArray>) { logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } consumedMessages.add(record.value()) } @@ -61,20 +61,11 @@ internal class Consumer : ConsumerStateProvider { } } -internal class ConsumerFactory(private val kafkaBootstrapServers: String) { - fun createConsumersForTopics(kafkaTopics: Set<String>): Map<String, Consumer> = - KafkaSource.create(kafkaBootstrapServers, kafkaTopics).let { kafkaSource -> - val topicToConsumer = kafkaTopics.associate { it to Consumer() } - kafkaSource.start() - .map { - val topic = it.topic() - topicToConsumer.get(topic)?.update(it) - ?: logger.warn { "No consumer configured for topic $topic" } - }.subscribe() - topicToConsumer - } +internal class DcaeAppConsumerFactory(private val kafkaBootstrapServers: String) { - companion object { - private val logger = Logger(ConsumerFactory::class) - } + private val consumerProvider = { Consumer() } + + fun createConsumersFor(topics: Set<String>) = + ConsumerFactory.createConsumersForTopics(kafkaBootstrapServers, topics, consumerProvider) + .mapValues { it.value as Consumer } } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index 7f4e62bb..25178594 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -20,7 +20,7 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppConsumerFactory import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer @@ -43,7 +43,7 @@ fun main(args: Array<String>): Unit = private fun startApp(config: DcaeAppSimConfiguration): ExitSuccess { logger.info { "Starting DCAE-APP Simulator API server with configuration: $config" } - val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers) + val consumerFactory = DcaeAppConsumerFactory(config.kafkaBootstrapServers) val generatorFactory = MessageGeneratorFactory(config.maxPayloadSizeBytes) val messageStreamValidation = MessageStreamValidation(generatorFactory.createVesEventGenerator()) DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt index e3e61c81..4ebfb469 100644 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt @@ -46,7 +46,7 @@ import kotlin.test.assertFailsWith * @since August 2018 */ internal class DcaeAppSimulatorTest : Spek({ - lateinit var consumerFactory: ConsumerFactory + lateinit var consumerFactory: DcaeAppConsumerFactory lateinit var messageStreamValidation: MessageStreamValidation lateinit var perf3gpp_consumer: Consumer lateinit var faults_consumer: Consumer @@ -59,7 +59,7 @@ internal class DcaeAppSimulatorTest : Spek({ faults_consumer = mock() cut = DcaeAppSimulator(consumerFactory, messageStreamValidation) - whenever(consumerFactory.createConsumersForTopics(anySet())).thenReturn(mapOf( + whenever(consumerFactory.createConsumersFor(anySet())).thenReturn(mapOf( PERF3GPP_TOPIC to perf3gpp_consumer, FAULTS_TOPICS to faults_consumer)) } @@ -81,12 +81,12 @@ internal class DcaeAppSimulatorTest : Spek({ it("should subscribe to given topics") { cut.listenToTopics(TWO_TOPICS) - verify(consumerFactory).createConsumersForTopics(TWO_TOPICS) + verify(consumerFactory).createConsumersFor(TWO_TOPICS) } it("should subscribe to given topics when called with comma separated list") { cut.listenToTopics("$PERF3GPP_TOPIC,$FAULTS_TOPICS") - verify(consumerFactory).createConsumersForTopics(TWO_TOPICS) + verify(consumerFactory).createConsumersFor(TWO_TOPICS) } } diff --git a/sources/hv-collector-kafka-consumer/pom.xml b/sources/hv-collector-kafka-consumer/pom.xml index 1e20d5b1..ef09c063 100644 --- a/sources/hv-collector-kafka-consumer/pom.xml +++ b/sources/hv-collector-kafka-consumer/pom.xml @@ -65,21 +65,21 @@ <groupId>${project.parent.groupId}</groupId> <artifactId>hv-collector-commandline</artifactId> <version>${project.parent.version}</version> + <scope>compile</scope> </dependency> <dependency> <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-test-utils</artifactId> + <artifactId>hv-collector-kafka</artifactId> <version>${project.parent.version}</version> - <scope>test</scope> + <scope>compile</scope> </dependency> <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-stdlib-jdk8</artifactId> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-registry-prometheus</artifactId> </dependency> <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <optional>true</optional> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-stdlib-jdk8</artifactId> </dependency> <dependency> <groupId>org.slf4j</groupId> @@ -91,6 +91,26 @@ <scope>runtime</scope> </dependency> <dependency> + <groupId>com.nhaarman.mockitokotlin2</groupId> + <artifactId>mockito-kotlin</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-junit-platform-engine</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-registry-prometheus</artifactId> </dependency> diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt index cbdb45dc..64a7fb3e 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt @@ -19,4 +19,6 @@ */ package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics -internal interface Metrics
\ No newline at end of file +internal interface Metrics { + fun notifyOffsetChanged(size: Long) +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt index adb1ff1f..f137d074 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt @@ -26,6 +26,9 @@ import reactor.core.publisher.Mono internal class MicrometerMetrics constructor( private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) ) : Metrics { + override fun notifyOffsetChanged(size: Long) { + // TODO implementation here + } fun lastStatus(): Mono<String> = Mono.fromCallable { registry.scrape() diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt new file mode 100644 index 00000000..2c6707f9 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt @@ -0,0 +1,41 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.state + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.onap.dcae.collectors.veshv.kafka.api.KafkaConsumer +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics +import org.onap.dcae.collectors.veshv.utils.logging.Logger + + +internal class OffsetConsumer(private val metrics: Metrics): KafkaConsumer { + + override fun update(record: ConsumerRecord<ByteArray, ByteArray>) { + val offset = record.offset() + logger.trace { "Current consumer offset $offset" } + metrics.notifyOffsetChanged(offset) + } + + override fun reset() = Unit + + companion object { + private val logger = Logger(OffsetConsumer::class) + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml b/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml new file mode 100644 index 00000000..da0f7f4b --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml @@ -0,0 +1,94 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2019 NOKIA + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= +--> +<configuration> + <property name="COMPONENT_NAME" + value="hv-ves-kafka-consumer-app"/> + <property name="COMPONENT_SHORT_NAME" + value="kafka-consumer-app"/> + + <property name="LOG_FILENAME" value="${COMPONENT_SHORT_NAME}"/> + <property name="LOG_PATH" value="/var/log/ONAP/${COMPONENT_NAME}"/> + <property name="ARCHIVE" value="${LOG_PATH}/archive"/> + + <property name="p_tim" value="%date{"yyyy-MM-dd'T'HH:mm:ss.SSSXXX", UTC}"/> + <property name="p_thr" value="%thread"/> + <property name="p_lvl" value="%highlight(%-5level)"/> + <property name="p_log" value="%50.50logger"/> + <property name="p_mdc" value="%replace(%replace(%mdc){'\t', '\\\\t'}){'\n', '\\\\n'}"/> + <property name="p_msg" value="%replace(%replace(%msg){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="p_exc" value="%replace(%replace(%rootException){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="p_mak" value="%replace(%replace(%marker){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="SIMPLE_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_log}\t +| ${p_lvl}\t +| %msg\t +| %rootException%n"/> + <property name="READABLE_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_log}\t +| ${p_lvl}\t +| %msg\t +| ${p_mak}\t +| %rootException\t +| ${p_mdc}\t +| ${p_thr}%n"/> + <property name="ONAP_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_thr}\t +| ${p_lvl}\t +| ${p_log}\t +| ${p_mdc}\t +| ${p_msg}\t +| ${p_exc}\t +| ${p_mak}%n"/> + <property name="LOG_PATTERN_IN_USE" value="${SIMPLE_LOG_PATTERN}"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>${LOG_PATTERN_IN_USE}</pattern> + </encoder> + </appender> + + <appender name="ROLLING-FILE" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${LOG_PATTERN_IN_USE}</pattern> + </encoder> + <file>${LOG_PATH}/${LOG_FILENAME}.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> + <FileNamePattern>${ARCHIVE}/${LOG_FILENAME}.%d{yyyy-MM-dd}.%i.log.gz</FileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> + </appender> + + <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> + </root> +</configuration>
\ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt new file mode 100644 index 00000000..6fb42d81 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.state + +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics + +object OffsetConsumerTest : Spek({ + describe("OffsetConsumer with metrics") { + val mockedMetrics = mock<Metrics>() + val offsetConsumer = OffsetConsumer(mockedMetrics) + + on("new update method call") { + val consumerRecord = mock<ConsumerRecord<ByteArray, ByteArray>>() + whenever(consumerRecord.offset()).thenReturn(1) + + offsetConsumer.update(consumerRecord) + + it("should notify message offset metric") { + verify(mockedMetrics).notifyOffsetChanged(1) + } + } + } +}) diff --git a/sources/hv-collector-kafka/pom.xml b/sources/hv-collector-kafka/pom.xml new file mode 100644 index 00000000..52105a0e --- /dev/null +++ b/sources/hv-collector-kafka/pom.xml @@ -0,0 +1,79 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> + + <parent> + <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> + <artifactId>hv-collector-sources</artifactId> + <version>1.2.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + + <artifactId>hv-collector-kafka</artifactId> + + <description>VES HighVolume Collector :: Kafka</description> + + <build> + <plugins> + <plugin> + <artifactId>kotlin-maven-plugin</artifactId> + <groupId>org.jetbrains.kotlin</groupId> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <groupId>org.apache.maven.plugins</groupId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-utils</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>io.projectreactor.kafka</groupId> + <artifactId>reactor-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-stdlib-jdk8</artifactId> + </dependency> + <dependency> + <groupId>com.nhaarman.mockitokotlin2</groupId> + <artifactId>mockito-kotlin</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-junit-platform-engine</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + </dependencies> +</project> diff --git a/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactory.kt b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactory.kt new file mode 100644 index 00000000..88eb8cec --- /dev/null +++ b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactory.kt @@ -0,0 +1,43 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafka.api + +import org.onap.dcae.collectors.veshv.kafka.impl.KafkaSource +import org.onap.dcae.collectors.veshv.utils.logging.Logger + +typealias ConsumerProvider = () -> KafkaConsumer + +object ConsumerFactory { + private val logger = Logger(ConsumerFactory::class) + + fun createConsumersForTopics(kafkaBootstrapServers: String, + kafkaTopics: Set<String>, + consumerProvider: ConsumerProvider): Map<String, KafkaConsumer> = + KafkaSource.create(kafkaBootstrapServers, kafkaTopics).let { kafkaSource -> + val topicToConsumer = kafkaTopics.associate { it to consumerProvider() } + kafkaSource.start() + .map { + val topic = it.topic() + topicToConsumer.get(topic)?.update(it) + ?: logger.warn { "No consumer configured for topic $topic" } + }.subscribe() + topicToConsumer + } +} diff --git a/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaConsumer.kt b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaConsumer.kt new file mode 100644 index 00000000..ae797b6e --- /dev/null +++ b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaConsumer.kt @@ -0,0 +1,27 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafka.api + +import org.apache.kafka.clients.consumer.ConsumerRecord + +interface KafkaConsumer { + fun reset() + fun update(record: ConsumerRecord<ByteArray, ByteArray>) +} diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/impl/KafkaSource.kt index a108eba7..98934b0d 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt +++ b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/impl/KafkaSource.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters +package org.onap.dcae.collectors.veshv.kafka.impl import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.ConsumerConfig @@ -36,7 +36,6 @@ import reactor.kafka.receiver.ReceiverRecord * @since May 2018 */ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { - fun start(): Flux<ReceiverRecord<ByteArray, ByteArray>> = receiver.receive() .doOnNext { it.receiverOffset().acknowledge() } @@ -58,8 +57,8 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? { val props = mapOf<String, Any>( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, - ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-dcae-app-simulator", - ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators", + ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-consumer", + ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-consumers", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", diff --git a/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactoryTest.kt b/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactoryTest.kt new file mode 100644 index 00000000..a8ba4217 --- /dev/null +++ b/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactoryTest.kt @@ -0,0 +1,64 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafka.api + +import com.nhaarman.mockitokotlin2.mock +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.entry +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on + +object ConsumerFactoryTest : Spek({ + describe("ConsumerFactory") { + val kafkaBootstrapServers = "0.0.0.0:40,0.0.0.1:41" + given("consumer provider"){ + val mockedKafkaConsumer = mock<KafkaConsumer>() + val consumerProvider = { mockedKafkaConsumer } + on("creation of consumer") { + val kafkaTopics = setOf("topic1", "topic2") + val consumer = ConsumerFactory.createConsumersForTopics( + kafkaBootstrapServers, + kafkaTopics, + consumerProvider) + it("should create consumer"){ + assertThat(consumer).isNotEmpty.hasSize(2) + assertThat(consumer).contains(entry("topic1", mockedKafkaConsumer), + entry("topic2", mockedKafkaConsumer)) + } + } + on("empty kafkaTopics set"){ + val emptyKafkaTopics = emptySet<String>() + val consumer = ConsumerFactory.createConsumersForTopics( + kafkaBootstrapServers, + emptyKafkaTopics, + consumerProvider) + + it("should not create consumer"){ + assertThat(consumer).isEmpty() + } + } + } + + + } +})
\ No newline at end of file diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt b/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/impl/KafkaSourceTest.kt index de74f628..43650f34 100644 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt +++ b/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/impl/KafkaSourceTest.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters +package org.onap.dcae.collectors.veshv.kafka.impl import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.ByteArrayDeserializer @@ -45,8 +45,8 @@ internal class KafkaSourceTest : Spek({ } verifyProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers) - verifyProperty(ConsumerConfig.CLIENT_ID_CONFIG, "hv-collector-dcae-app-simulator") - verifyProperty(ConsumerConfig.GROUP_ID_CONFIG, "hv-collector-simulators") + verifyProperty(ConsumerConfig.CLIENT_ID_CONFIG, "hv-collector-consumer") + verifyProperty(ConsumerConfig.GROUP_ID_CONFIG, "hv-collector-consumers") verifyProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) verifyProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) verifyProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") diff --git a/sources/pom.xml b/sources/pom.xml index c7ba4886..68ccbf1a 100644 --- a/sources/pom.xml +++ b/sources/pom.xml @@ -142,6 +142,7 @@ <module>hv-collector-dcae-app-simulator</module> <module>hv-collector-domain</module> <module>hv-collector-health-check</module> + <module>hv-collector-kafka</module> <module>hv-collector-kafka-consumer</module> <module>hv-collector-main</module> <module>hv-collector-server</module> |