aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-kafka-consumer/src/main/kotlin
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-06-25 14:39:07 +0200
committerFilip Krzywka <filip.krzywka@nokia.com>2019-06-26 11:53:20 +0200
commitfb613396cec50bae2ae59b6bd7280b903149f8b7 (patch)
tree7d1293654a0d65c44b65c9d4b96ba872e1fa0938 /sources/hv-collector-kafka-consumer/src/main/kotlin
parent006be7f70368ce91986037ae7a032ba00836c6c2 (diff)
Implement kafka consumer metrics
- bump Micrometer version 1.0.8 -> 1.1.5 Change-Id: I7a00fbf252df0f31f12f8743e8719701bd282ce2 Issue-ID: DCAEGEN2-1626 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-kafka-consumer/src/main/kotlin')
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt5
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt24
2 files changed, 24 insertions, 5 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
index 64a7fb3e..2fabf30e 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
@@ -20,5 +20,6 @@
package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
internal interface Metrics {
- fun notifyOffsetChanged(size: Long)
-}
+ fun notifyOffsetChanged(offset: Long)
+ fun notifyMessageTravelTime(messageSentTimeMicros: Long)
+} \ No newline at end of file
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
index f137d074..748e43fc 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
@@ -19,22 +19,40 @@
*/
package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
+import io.micrometer.core.instrument.Timer
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
+import org.onap.dcae.collectors.veshv.utils.TimeUtils
import reactor.core.publisher.Mono
+import java.time.Duration
+import java.time.Instant
+import java.util.concurrent.atomic.AtomicLong
internal class MicrometerMetrics constructor(
private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
) : Metrics {
- override fun notifyOffsetChanged(size: Long) {
- // TODO implementation here
- }
+
+ private val currentOffset = registry.gauge(name("consumer.offset"), AtomicLong(0))
+ private val travelTime = Timer.builder(name("travel.time"))
+ .publishPercentileHistogram(true)
+ .register(registry)
fun lastStatus(): Mono<String> = Mono.fromCallable {
registry.scrape()
}
+ override fun notifyOffsetChanged(offset: Long) {
+ currentOffset.lazySet(offset)
+ }
+
+ override fun notifyMessageTravelTime(messageSentTimeMicros: Long) {
+ travelTime.record(Duration.between(TimeUtils.epochMicroToInstant(messageSentTimeMicros), Instant.now()))
+ }
+
companion object {
val INSTANCE by lazy { MicrometerMetrics() }
+
+ private const val PREFIX = "hv-kafka-consumer"
+ private fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
}
}