From a313a7fb4ba81e960397ec432ce02c28804a44b4 Mon Sep 17 00:00:00 2001 From: Izabela Zawadzka Date: Wed, 10 Jul 2019 09:29:51 +0200 Subject: Use consumers in main It includes --disable-processing flag. Also fixed some issues with script for local performance test. Also added KafkaConsumer::poll in OffsetKafka Consumer - without it KafkaConsumer::assignment returns empty set Signed-off-by: Izabela Zawadzka Issue-ID: DCAEGEN2-1657 Change-Id: I95fadb45f321398346094dfa0c4a6e9da954c186 --- .../kafkaconsumer/api/MetricsKafkaConsumer.kt | 32 ++++++++++++++++++++++ .../kafkaconsumer/impl/OffsetKafkaConsumer.kt | 14 ++++++---- .../kafkaconsumer/impl/ProcessingKafkaConsumer.kt | 9 +++--- .../dcae/collectors/veshv/kafkaconsumer/main.kt | 19 +++++++++++++ 4 files changed, 65 insertions(+), 9 deletions(-) create mode 100644 sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt (limited to 'sources') diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt new file mode 100644 index 00000000..6dddd0f8 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt @@ -0,0 +1,32 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.api + +import kotlinx.coroutines.Job +import java.time.Duration + +interface MetricsKafkaConsumer { + suspend fun start(updateInterval: Long = defaultUpdateInterval, pollTimeout: Duration = defaultPollTimeoutMs): Job + + companion object{ + private const val defaultUpdateInterval = 500L + private val defaultPollTimeoutMs: Duration = Duration.ofMillis(10L) + } +} \ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt index d105c4a7..18de6fcc 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt @@ -28,19 +28,23 @@ import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.TopicPartition +import org.onap.dcae.collectors.veshv.kafkaconsumer.api.MetricsKafkaConsumer import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.time.Duration internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer, private val topics: Set, private val metrics: Metrics, - private val dispatcher: CoroutineDispatcher = Dispatchers.IO) { + private val dispatcher: CoroutineDispatcher = Dispatchers.IO) + : MetricsKafkaConsumer{ - suspend fun start(updateInterval: Long = 500L): Job = + override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job = GlobalScope.launch(dispatcher) { - kafkaConsumer.subscribe(topics) - val topicPartitions = kafkaConsumer.assignment() + kafkaConsumer.assign(topics.map { TopicPartition(it, 0) }) while (isActive) { + val topicPartitions = kafkaConsumer.assignment() + kafkaConsumer.endOffsets(topicPartitions) .forEach { (topicPartition, offset) -> update(topicPartition, offset) @@ -58,6 +62,6 @@ internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer, private val topics: Set, private val metrics: Metrics, - private val dispatcher: CoroutineDispatcher = Dispatchers.IO){ + private val dispatcher: CoroutineDispatcher = Dispatchers.IO) + : MetricsKafkaConsumer{ - suspend fun start(updateInterval: Long = 500L, timeout: Duration): Job = + override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job = GlobalScope.launch(dispatcher){ kafkaConsumer.subscribe(topics) while (isActive){ - kafkaConsumer.poll(timeout).forEach(::update) + kafkaConsumer.poll(pollTimeout).forEach(::update) kafkaConsumer.commitSync() delay(updateInterval) } diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt index 7e77bae9..9bf4310b 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt @@ -19,9 +19,14 @@ */ package org.onap.dcae.collectors.veshv.kafkaconsumer +import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.consumer.KafkaConsumer import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried +import org.onap.dcae.collectors.veshv.kafka.api.KafkaPropertiesFactory import org.onap.dcae.collectors.veshv.kafkaconsumer.config.ArgKafkaConsumerConfiguration import org.onap.dcae.collectors.veshv.kafkaconsumer.config.KafkaConsumerConfiguration +import org.onap.dcae.collectors.veshv.kafkaconsumer.impl.OffsetKafkaConsumer +import org.onap.dcae.collectors.veshv.kafkaconsumer.impl.ProcessingKafkaConsumer import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.http.PrometheusApiServer import org.onap.dcae.collectors.veshv.utils.process.ExitCode @@ -37,6 +42,20 @@ fun main(args: Array): Unit = private fun startApp(config: KafkaConsumerConfiguration): ExitSuccess { + val kafkaConsumer = KafkaConsumer(KafkaPropertiesFactory.create( + config.kafkaBootstrapServers) + ) + + runBlocking { + if (config.disableProcessing) { + OffsetKafkaConsumer(kafkaConsumer, config.kafkaTopics, MicrometerMetrics.INSTANCE) + .start() + } else { + ProcessingKafkaConsumer(kafkaConsumer, config.kafkaTopics, MicrometerMetrics.INSTANCE) + .start() + } + } + PrometheusApiServer(config.apiAddress, MicrometerMetrics.INSTANCE) .start().block()!!.await().block() // TODO refactor netty server logic -- cgit 1.2.3-korg