diff options
author | Filip Krzywka <filip.krzywka@nokia.com> | 2019-06-25 14:39:07 +0200 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2019-06-26 11:53:20 +0200 |
commit | fb613396cec50bae2ae59b6bd7280b903149f8b7 (patch) | |
tree | 7d1293654a0d65c44b65c9d4b96ba872e1fa0938 /sources/hv-collector-kafka-consumer/src | |
parent | 006be7f70368ce91986037ae7a032ba00836c6c2 (diff) |
Implement kafka consumer metrics
- bump Micrometer version 1.0.8 -> 1.1.5
Change-Id: I7a00fbf252df0f31f12f8743e8719701bd282ce2
Issue-ID: DCAEGEN2-1626
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-kafka-consumer/src')
4 files changed, 109 insertions, 35 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt index 64a7fb3e..2fabf30e 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt @@ -20,5 +20,6 @@ package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics internal interface Metrics { - fun notifyOffsetChanged(size: Long) -} + fun notifyOffsetChanged(offset: Long) + fun notifyMessageTravelTime(messageSentTimeMicros: Long) +}
\ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt index f137d074..748e43fc 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt @@ -19,22 +19,40 @@ */ package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics +import io.micrometer.core.instrument.Timer import io.micrometer.prometheus.PrometheusConfig import io.micrometer.prometheus.PrometheusMeterRegistry +import org.onap.dcae.collectors.veshv.utils.TimeUtils import reactor.core.publisher.Mono +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicLong internal class MicrometerMetrics constructor( private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) ) : Metrics { - override fun notifyOffsetChanged(size: Long) { - // TODO implementation here - } + + private val currentOffset = registry.gauge(name("consumer.offset"), AtomicLong(0)) + private val travelTime = Timer.builder(name("travel.time")) + .publishPercentileHistogram(true) + .register(registry) fun lastStatus(): Mono<String> = Mono.fromCallable { registry.scrape() } + override fun notifyOffsetChanged(offset: Long) { + currentOffset.lazySet(offset) + } + + override fun notifyMessageTravelTime(messageSentTimeMicros: Long) { + travelTime.record(Duration.between(TimeUtils.epochMicroToInstant(messageSentTimeMicros), Instant.now())) + } + companion object { val INSTANCE by lazy { MicrometerMetrics() } + + private const val PREFIX = "hv-kafka-consumer" + private fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" } } diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt deleted file mode 100644 index b7ea126f..00000000 --- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.kafkaconsumer - -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import kotlin.test.assertTrue - -object SampleTest : Spek({ - describe("sample test") { - assertTrue(true) - } -}) diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt new file mode 100644 index 00000000..41587867 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt @@ -0,0 +1,85 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics + +import io.micrometer.prometheus.PrometheusConfig +import io.micrometer.prometheus.PrometheusMeterRegistry +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.data.Percentage +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge +import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer +import java.time.Instant +import java.util.concurrent.TimeUnit + +object MicrometerMetricsTest : Spek({ + val PREFIX = "hv-kafka-consumer" + val doublePrecision = Percentage.withPercentage(0.5) + lateinit var registry: PrometheusMeterRegistry + lateinit var cut: MicrometerMetrics + + beforeEachTest { + registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) + cut = MicrometerMetrics(registry) + } + + describe("Timers") { + val arbitraryMessageTravelTime = 100L + val messageSentTimeMicros = Instant.now().minusMillis(arbitraryMessageTravelTime).toEpochMilli() * 1000 + val timerName = "$PREFIX.travel.time" + + on("notifyMessageTravelTime") { + it("should update timer $timerName") { + + val timeBeforeNotifyMicros = Instant.now().toEpochMilli() * 1000 + cut.notifyMessageTravelTime(messageSentTimeMicros) + val timeAfterNotifyMicros = Instant.now().toEpochMilli() * 1000 + + registry.verifyTimer(timerName) { timer -> + val travelTimeBeforeNotify = (timeBeforeNotifyMicros - messageSentTimeMicros).toDouble() + val travelTimeAfterNotify = (timeAfterNotifyMicros - messageSentTimeMicros).toDouble() + assertThat(timer.totalTime(TimeUnit.MICROSECONDS)) + .isLessThanOrEqualTo(travelTimeAfterNotify) + .isGreaterThanOrEqualTo(travelTimeBeforeNotify) + + } + } + } + } + + describe("Gauges") { + val gaugeName = "$PREFIX.consumer.offset" + + on("notifyOffsetChanged") { + val offset = 966L + + it("should update $gaugeName") { + cut.notifyOffsetChanged(offset) + + registry.verifyGauge(gaugeName) { + assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision) + } + } + } + } +}) |