From 728ca6351b290a15cb7fec07f56e0752f7addd70 Mon Sep 17 00:00:00 2001 From: kjaniak Date: Tue, 24 Sep 2019 09:46:24 +0200 Subject: Add partition offset metric to each topic partition Before this commit offset consumer was able to fetch offset just from one partition. This commit solve this. Change-Id: I2c2c300219e43ab422b237094ad775ca8795169e Issue-ID: DCAEGEN2-1783 Signed-off-by: kjaniak --- .../kafkaconsumer/api/MetricsKafkaConsumer.kt | 2 +- .../kafkaconsumer/impl/OffsetKafkaConsumer.kt | 26 ++++++++++++---------- .../kafkaconsumer/metrics/MicrometerMetrics.kt | 9 ++++---- 3 files changed, 20 insertions(+), 17 deletions(-) (limited to 'sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae') diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt index 6dddd0f8..acbcbddf 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt @@ -29,4 +29,4 @@ interface MetricsKafkaConsumer { private const val defaultUpdateInterval = 500L private val defaultPollTimeoutMs: Duration = Duration.ofMillis(10L) } -} \ No newline at end of file +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt index 18de6fcc..52bcf1e4 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt @@ -37,21 +37,23 @@ internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer, private val metrics: Metrics, private val dispatcher: CoroutineDispatcher = Dispatchers.IO) - : MetricsKafkaConsumer{ + : MetricsKafkaConsumer { - override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job = + override suspend fun start(updateInterval: Long, pollTimeout: Duration): Job = GlobalScope.launch(dispatcher) { - kafkaConsumer.assign(topics.map { TopicPartition(it, 0) }) - while (isActive) { - val topicPartitions = kafkaConsumer.assignment() + val topicPartitions = topics.flatMap { + listOf(TopicPartition(it, 0), TopicPartition(it, 1), TopicPartition(it, 2)) + } + kafkaConsumer.assign(topicPartitions) - kafkaConsumer.endOffsets(topicPartitions) - .forEach { (topicPartition, offset) -> - update(topicPartition, offset) - } - kafkaConsumer.commitSync() - delay(updateInterval) - } + while (isActive) { + kafkaConsumer.endOffsets(kafkaConsumer.assignment()) + .forEach { (topicPartition, offset) -> + update(topicPartition, offset) + } + kafkaConsumer.commitSync() + delay(updateInterval) + } } private fun update(topicPartition: TopicPartition, offset: Long) { diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt index 0af2cb22..da6a4676 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt @@ -35,8 +35,10 @@ internal class MicrometerMetrics constructor( private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) ) : Metrics { - private val currentOffsetByTopicPartition = { topic: String -> - registry.gauge(name(CONSUMER, OFFSET, TOPIC), listOf(Tag.of(TOPIC, topic)), AtomicLong(0)) + private val currentOffsetByTopicPartition = { topicPartition: String -> + registry.gauge(name(OFFSET, PARTITION, topicPartition.toLowerCase()), + listOf(Tag.of(PARTITION, topicPartition)), + AtomicLong(0)) }.memoize() private val travelTime = Timer.builder(name(TRAVEL,TIME)) @@ -58,9 +60,8 @@ internal class MicrometerMetrics constructor( companion object { val INSTANCE by lazy { MicrometerMetrics() } - private const val CONSUMER = "consumer" private const val OFFSET = "offset" - private const val TOPIC = "topic" + private const val PARTITION = "partition" private const val TRAVEL = "travel" private const val TIME = "time" private const val PREFIX = "hv-kafka-consumer" -- cgit 1.2.3-korg