summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-kafka-consumer
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-kafka-consumer')
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt4
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt22
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt7
-rw-r--r--sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt6
-rw-r--r--sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt2
5 files changed, 26 insertions, 15 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
index e576a88f..55fae457 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
@@ -19,7 +19,9 @@
*/
package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
+import org.apache.kafka.common.TopicPartition
+
interface Metrics {
- fun notifyOffsetChanged(offset: Long, topic: String, partition: Int = 0)
+ fun notifyOffsetChanged(offset: Long, topicPartition: TopicPartition)
fun notifyMessageTravelTime(messageSentTimeMicros: Long)
}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
index da1225e9..0af2cb22 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
@@ -19,9 +19,12 @@
*/
package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
+import arrow.syntax.function.memoize
+import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.Timer
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
+import org.apache.kafka.common.TopicPartition
import org.onap.dcae.collectors.veshv.utils.TimeUtils
import reactor.core.publisher.Mono
import java.time.Duration
@@ -32,8 +35,11 @@ internal class MicrometerMetrics constructor(
private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
) : Metrics {
- private val currentOffset = registry.gauge(name("consumer.offset"), AtomicLong(0))
- private val travelTime = Timer.builder(name("travel.time"))
+ private val currentOffsetByTopicPartition = { topic: String ->
+ registry.gauge(name(CONSUMER, OFFSET, TOPIC), listOf(Tag.of(TOPIC, topic)), AtomicLong(0))
+ }.memoize<String, AtomicLong>()
+
+ private val travelTime = Timer.builder(name(TRAVEL,TIME))
.publishPercentileHistogram(true)
.register(registry)
@@ -41,9 +47,8 @@ internal class MicrometerMetrics constructor(
registry.scrape()
}
- override fun notifyOffsetChanged(offset: Long, topic: String, partition: Int) {
- // TODO use topic and partition
- currentOffset.lazySet(offset)
+ override fun notifyOffsetChanged(offset: Long, topicPartition: TopicPartition) {
+ currentOffsetByTopicPartition(topicPartition.toString()).set(offset)
}
override fun notifyMessageTravelTime(messageSentTimeMicros: Long) {
@@ -51,8 +56,13 @@ internal class MicrometerMetrics constructor(
}
companion object {
- val INSTANCE by lazy { MicrometerMetrics() }
+ val INSTANCE by lazy { MicrometerMetrics() }
+ private const val CONSUMER = "consumer"
+ private const val OFFSET = "offset"
+ private const val TOPIC = "topic"
+ private const val TRAVEL = "travel"
+ private const val TIME = "time"
private const val PREFIX = "hv-kafka-consumer"
private fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt
index 1481a224..57a5f33f 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt
@@ -28,14 +28,11 @@ internal class OffsetConsumer(private val metrics: Metrics) {
fun update(topicPartition: TopicPartition, offset: Long) {
logger.trace {
- "Current consumer offset $offset for topic ${topicPartition.topic()} " +
- "on partition ${topicPartition.partition()}"
+ "Current consumer offset $offset for topic partition $topicPartition"
}
- metrics.notifyOffsetChanged(offset, topicPartition.topic(), topicPartition.partition())
+ metrics.notifyOffsetChanged(offset, topicPartition)
}
- fun reset() = Unit
-
companion object {
val logger = Logger(OffsetConsumer::class)
}
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
index 96ba588f..93a39ae8 100644
--- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
+import org.apache.kafka.common.TopicPartition
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.data.Percentage
import org.jetbrains.spek.api.Spek
@@ -68,13 +69,14 @@ object MicrometerMetricsTest : Spek({
}
describe("Gauges") {
- val gaugeName = "$PREFIX.consumer.offset"
+ val gaugeName = "$PREFIX.consumer.offset.topic"
on("notifyOffsetChanged") {
val offset = 966L
+ val topicPartition = TopicPartition("sample_topic", 1)
it("should update $gaugeName") {
- cut.notifyOffsetChanged(offset, "sample_topic", 1)
+ cut.notifyOffsetChanged(offset, topicPartition)
registry.verifyGauge(gaugeName) {
assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision)
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt
index 242f27be..5ccb483a 100644
--- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt
@@ -40,7 +40,7 @@ object OffsetConsumerTest : Spek({
offsetConsumer.update(topicPartition, newOffset)
it("should notify message newOffset metric") {
- verify(mockedMetrics).notifyOffsetChanged(newOffset, topicName, partitionNumber)
+ verify(mockedMetrics).notifyOffsetChanged(newOffset, topicPartition)
}
}
}