aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-kafka-consumer/src/main
diff options
context:
space:
mode:
authorkjaniak <kornel.janiak@nokia.com>2019-09-24 09:46:24 +0200
committerkjaniak <kornel.janiak@nokia.com>2019-09-26 14:34:14 +0200
commit728ca6351b290a15cb7fec07f56e0752f7addd70 (patch)
tree62314472bf5e5180f671e8f8a45239d4bf46c674 /sources/hv-collector-kafka-consumer/src/main
parent6d815b760e927bace06451cf72972a505a080b6c (diff)
Add partition offset metric to each topic partition
Before this commit offset consumer was able to fetch offset just from one partition. This commit solve this. Change-Id: I2c2c300219e43ab422b237094ad775ca8795169e Issue-ID: DCAEGEN2-1783 Signed-off-by: kjaniak <kornel.janiak@nokia.com>
Diffstat (limited to 'sources/hv-collector-kafka-consumer/src/main')
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt2
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt26
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt9
3 files changed, 20 insertions, 17 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt
index 6dddd0f8..acbcbddf 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt
@@ -29,4 +29,4 @@ interface MetricsKafkaConsumer {
private const val defaultUpdateInterval = 500L
private val defaultPollTimeoutMs: Duration = Duration.ofMillis(10L)
}
-} \ No newline at end of file
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
index 18de6fcc..52bcf1e4 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
@@ -37,21 +37,23 @@ internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<Byte
private val topics: Set<String>,
private val metrics: Metrics,
private val dispatcher: CoroutineDispatcher = Dispatchers.IO)
- : MetricsKafkaConsumer{
+ : MetricsKafkaConsumer {
- override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job =
+ override suspend fun start(updateInterval: Long, pollTimeout: Duration): Job =
GlobalScope.launch(dispatcher) {
- kafkaConsumer.assign(topics.map { TopicPartition(it, 0) })
- while (isActive) {
- val topicPartitions = kafkaConsumer.assignment()
+ val topicPartitions = topics.flatMap {
+ listOf(TopicPartition(it, 0), TopicPartition(it, 1), TopicPartition(it, 2))
+ }
+ kafkaConsumer.assign(topicPartitions)
- kafkaConsumer.endOffsets(topicPartitions)
- .forEach { (topicPartition, offset) ->
- update(topicPartition, offset)
- }
- kafkaConsumer.commitSync()
- delay(updateInterval)
- }
+ while (isActive) {
+ kafkaConsumer.endOffsets(kafkaConsumer.assignment())
+ .forEach { (topicPartition, offset) ->
+ update(topicPartition, offset)
+ }
+ kafkaConsumer.commitSync()
+ delay(updateInterval)
+ }
}
private fun update(topicPartition: TopicPartition, offset: Long) {
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
index 0af2cb22..da6a4676 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
@@ -35,8 +35,10 @@ internal class MicrometerMetrics constructor(
private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
) : Metrics {
- private val currentOffsetByTopicPartition = { topic: String ->
- registry.gauge(name(CONSUMER, OFFSET, TOPIC), listOf(Tag.of(TOPIC, topic)), AtomicLong(0))
+ private val currentOffsetByTopicPartition = { topicPartition: String ->
+ registry.gauge(name(OFFSET, PARTITION, topicPartition.toLowerCase()),
+ listOf(Tag.of(PARTITION, topicPartition)),
+ AtomicLong(0))
}.memoize<String, AtomicLong>()
private val travelTime = Timer.builder(name(TRAVEL,TIME))
@@ -58,9 +60,8 @@ internal class MicrometerMetrics constructor(
companion object {
val INSTANCE by lazy { MicrometerMetrics() }
- private const val CONSUMER = "consumer"
private const val OFFSET = "offset"
- private const val TOPIC = "topic"
+ private const val PARTITION = "partition"
private const val TRAVEL = "travel"
private const val TIME = "time"
private const val PREFIX = "hv-kafka-consumer"