diff options
author | Filip Krzywka <filip.krzywka@nokia.com> | 2019-06-25 14:39:07 +0200 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2019-06-26 11:53:20 +0200 |
commit | fb613396cec50bae2ae59b6bd7280b903149f8b7 (patch) | |
tree | 7d1293654a0d65c44b65c9d4b96ba872e1fa0938 /sources/hv-collector-kafka-consumer/src/main | |
parent | 006be7f70368ce91986037ae7a032ba00836c6c2 (diff) |
Implement kafka consumer metrics
- bump Micrometer version 1.0.8 -> 1.1.5
Change-Id: I7a00fbf252df0f31f12f8743e8719701bd282ce2
Issue-ID: DCAEGEN2-1626
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-kafka-consumer/src/main')
2 files changed, 24 insertions, 5 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt index 64a7fb3e..2fabf30e 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt @@ -20,5 +20,6 @@ package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics internal interface Metrics { - fun notifyOffsetChanged(size: Long) -} + fun notifyOffsetChanged(offset: Long) + fun notifyMessageTravelTime(messageSentTimeMicros: Long) +}
\ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt index f137d074..748e43fc 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt @@ -19,22 +19,40 @@ */ package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics +import io.micrometer.core.instrument.Timer import io.micrometer.prometheus.PrometheusConfig import io.micrometer.prometheus.PrometheusMeterRegistry +import org.onap.dcae.collectors.veshv.utils.TimeUtils import reactor.core.publisher.Mono +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicLong internal class MicrometerMetrics constructor( private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) ) : Metrics { - override fun notifyOffsetChanged(size: Long) { - // TODO implementation here - } + + private val currentOffset = registry.gauge(name("consumer.offset"), AtomicLong(0)) + private val travelTime = Timer.builder(name("travel.time")) + .publishPercentileHistogram(true) + .register(registry) fun lastStatus(): Mono<String> = Mono.fromCallable { registry.scrape() } + override fun notifyOffsetChanged(offset: Long) { + currentOffset.lazySet(offset) + } + + override fun notifyMessageTravelTime(messageSentTimeMicros: Long) { + travelTime.record(Duration.between(TimeUtils.epochMicroToInstant(messageSentTimeMicros), Instant.now())) + } + companion object { val INSTANCE by lazy { MicrometerMetrics() } + + private const val PREFIX = "hv-kafka-consumer" + private fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" } } |