diff options
5 files changed, 26 insertions, 15 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt index e576a88f..55fae457 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt @@ -19,7 +19,9 @@ */ package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics +import org.apache.kafka.common.TopicPartition + interface Metrics { - fun notifyOffsetChanged(offset: Long, topic: String, partition: Int = 0) + fun notifyOffsetChanged(offset: Long, topicPartition: TopicPartition) fun notifyMessageTravelTime(messageSentTimeMicros: Long) } diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt index da1225e9..0af2cb22 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt @@ -19,9 +19,12 @@ */ package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics +import arrow.syntax.function.memoize +import io.micrometer.core.instrument.Tag import io.micrometer.core.instrument.Timer import io.micrometer.prometheus.PrometheusConfig import io.micrometer.prometheus.PrometheusMeterRegistry +import org.apache.kafka.common.TopicPartition import org.onap.dcae.collectors.veshv.utils.TimeUtils import reactor.core.publisher.Mono import java.time.Duration @@ -32,8 +35,11 @@ internal class MicrometerMetrics constructor( private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) ) : Metrics { - private val currentOffset = registry.gauge(name("consumer.offset"), AtomicLong(0)) - private val travelTime = Timer.builder(name("travel.time")) + private val currentOffsetByTopicPartition = { topic: String -> + registry.gauge(name(CONSUMER, OFFSET, TOPIC), listOf(Tag.of(TOPIC, topic)), AtomicLong(0)) + }.memoize<String, AtomicLong>() + + private val travelTime = Timer.builder(name(TRAVEL,TIME)) .publishPercentileHistogram(true) .register(registry) @@ -41,9 +47,8 @@ internal class MicrometerMetrics constructor( registry.scrape() } - override fun notifyOffsetChanged(offset: Long, topic: String, partition: Int) { - // TODO use topic and partition - currentOffset.lazySet(offset) + override fun notifyOffsetChanged(offset: Long, topicPartition: TopicPartition) { + currentOffsetByTopicPartition(topicPartition.toString()).set(offset) } override fun notifyMessageTravelTime(messageSentTimeMicros: Long) { @@ -51,8 +56,13 @@ internal class MicrometerMetrics constructor( } companion object { - val INSTANCE by lazy { MicrometerMetrics() } + val INSTANCE by lazy { MicrometerMetrics() } + private const val CONSUMER = "consumer" + private const val OFFSET = "offset" + private const val TOPIC = "topic" + private const val TRAVEL = "travel" + private const val TIME = "time" private const val PREFIX = "hv-kafka-consumer" private fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" } diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt index 1481a224..57a5f33f 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt @@ -28,14 +28,11 @@ internal class OffsetConsumer(private val metrics: Metrics) { fun update(topicPartition: TopicPartition, offset: Long) { logger.trace { - "Current consumer offset $offset for topic ${topicPartition.topic()} " + - "on partition ${topicPartition.partition()}" + "Current consumer offset $offset for topic partition $topicPartition" } - metrics.notifyOffsetChanged(offset, topicPartition.topic(), topicPartition.partition()) + metrics.notifyOffsetChanged(offset, topicPartition) } - fun reset() = Unit - companion object { val logger = Logger(OffsetConsumer::class) } diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt index 96ba588f..93a39ae8 100644 --- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics import io.micrometer.prometheus.PrometheusConfig import io.micrometer.prometheus.PrometheusMeterRegistry +import org.apache.kafka.common.TopicPartition import org.assertj.core.api.Assertions.assertThat import org.assertj.core.data.Percentage import org.jetbrains.spek.api.Spek @@ -68,13 +69,14 @@ object MicrometerMetricsTest : Spek({ } describe("Gauges") { - val gaugeName = "$PREFIX.consumer.offset" + val gaugeName = "$PREFIX.consumer.offset.topic" on("notifyOffsetChanged") { val offset = 966L + val topicPartition = TopicPartition("sample_topic", 1) it("should update $gaugeName") { - cut.notifyOffsetChanged(offset, "sample_topic", 1) + cut.notifyOffsetChanged(offset, topicPartition) registry.verifyGauge(gaugeName) { assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision) diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt index 242f27be..5ccb483a 100644 --- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt @@ -40,7 +40,7 @@ object OffsetConsumerTest : Spek({ offsetConsumer.update(topicPartition, newOffset) it("should notify message newOffset metric") { - verify(mockedMetrics).notifyOffsetChanged(newOffset, topicName, partitionNumber) + verify(mockedMetrics).notifyOffsetChanged(newOffset, topicPartition) } } } |