diff options
Diffstat (limited to 'sources/hv-collector-kafka-consumer')
5 files changed, 116 insertions, 59 deletions
diff --git a/sources/hv-collector-kafka-consumer/pom.xml b/sources/hv-collector-kafka-consumer/pom.xml index ef09c063..c7645edf 100644 --- a/sources/hv-collector-kafka-consumer/pom.xml +++ b/sources/hv-collector-kafka-consumer/pom.xml @@ -74,6 +74,13 @@ <scope>compile</scope> </dependency> <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-test-utils</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-registry-prometheus</artifactId> </dependency> @@ -91,30 +98,6 @@ <scope>runtime</scope> </dependency> <dependency> - <groupId>com.nhaarman.mockitokotlin2</groupId> - <artifactId>mockito-kotlin</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-test</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-api</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-junit-platform-engine</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>io.micrometer</groupId> - <artifactId>micrometer-registry-prometheus</artifactId> - </dependency> - <dependency> <groupId>io.projectreactor.netty</groupId> <artifactId>reactor-netty</artifactId> </dependency> diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt index 64a7fb3e..2fabf30e 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt @@ -20,5 +20,6 @@ package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics internal interface Metrics { - fun notifyOffsetChanged(size: Long) -} + fun notifyOffsetChanged(offset: Long) + fun notifyMessageTravelTime(messageSentTimeMicros: Long) +}
\ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt index f137d074..748e43fc 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt @@ -19,22 +19,40 @@ */ package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics +import io.micrometer.core.instrument.Timer import io.micrometer.prometheus.PrometheusConfig import io.micrometer.prometheus.PrometheusMeterRegistry +import org.onap.dcae.collectors.veshv.utils.TimeUtils import reactor.core.publisher.Mono +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicLong internal class MicrometerMetrics constructor( private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) ) : Metrics { - override fun notifyOffsetChanged(size: Long) { - // TODO implementation here - } + + private val currentOffset = registry.gauge(name("consumer.offset"), AtomicLong(0)) + private val travelTime = Timer.builder(name("travel.time")) + .publishPercentileHistogram(true) + .register(registry) fun lastStatus(): Mono<String> = Mono.fromCallable { registry.scrape() } + override fun notifyOffsetChanged(offset: Long) { + currentOffset.lazySet(offset) + } + + override fun notifyMessageTravelTime(messageSentTimeMicros: Long) { + travelTime.record(Duration.between(TimeUtils.epochMicroToInstant(messageSentTimeMicros), Instant.now())) + } + companion object { val INSTANCE by lazy { MicrometerMetrics() } + + private const val PREFIX = "hv-kafka-consumer" + private fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" } } diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt deleted file mode 100644 index b7ea126f..00000000 --- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.kafkaconsumer - -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import kotlin.test.assertTrue - -object SampleTest : Spek({ - describe("sample test") { - assertTrue(true) - } -}) diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt new file mode 100644 index 00000000..41587867 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt @@ -0,0 +1,85 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics + +import io.micrometer.prometheus.PrometheusConfig +import io.micrometer.prometheus.PrometheusMeterRegistry +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.data.Percentage +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge +import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer +import java.time.Instant +import java.util.concurrent.TimeUnit + +object MicrometerMetricsTest : Spek({ + val PREFIX = "hv-kafka-consumer" + val doublePrecision = Percentage.withPercentage(0.5) + lateinit var registry: PrometheusMeterRegistry + lateinit var cut: MicrometerMetrics + + beforeEachTest { + registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) + cut = MicrometerMetrics(registry) + } + + describe("Timers") { + val arbitraryMessageTravelTime = 100L + val messageSentTimeMicros = Instant.now().minusMillis(arbitraryMessageTravelTime).toEpochMilli() * 1000 + val timerName = "$PREFIX.travel.time" + + on("notifyMessageTravelTime") { + it("should update timer $timerName") { + + val timeBeforeNotifyMicros = Instant.now().toEpochMilli() * 1000 + cut.notifyMessageTravelTime(messageSentTimeMicros) + val timeAfterNotifyMicros = Instant.now().toEpochMilli() * 1000 + + registry.verifyTimer(timerName) { timer -> + val travelTimeBeforeNotify = (timeBeforeNotifyMicros - messageSentTimeMicros).toDouble() + val travelTimeAfterNotify = (timeAfterNotifyMicros - messageSentTimeMicros).toDouble() + assertThat(timer.totalTime(TimeUnit.MICROSECONDS)) + .isLessThanOrEqualTo(travelTimeAfterNotify) + .isGreaterThanOrEqualTo(travelTimeBeforeNotify) + + } + } + } + } + + describe("Gauges") { + val gaugeName = "$PREFIX.consumer.offset" + + on("notifyOffsetChanged") { + val offset = 966L + + it("should update $gaugeName") { + cut.notifyOffsetChanged(offset) + + registry.verifyGauge(gaugeName) { + assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision) + } + } + } + } +}) |