summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-kafka-consumer
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-06-25 14:39:07 +0200
committerFilip Krzywka <filip.krzywka@nokia.com>2019-06-26 11:53:20 +0200
commitfb613396cec50bae2ae59b6bd7280b903149f8b7 (patch)
tree7d1293654a0d65c44b65c9d4b96ba872e1fa0938 /sources/hv-collector-kafka-consumer
parent006be7f70368ce91986037ae7a032ba00836c6c2 (diff)
Implement kafka consumer metrics
- bump Micrometer version 1.0.8 -> 1.1.5 Change-Id: I7a00fbf252df0f31f12f8743e8719701bd282ce2 Issue-ID: DCAEGEN2-1626 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-kafka-consumer')
-rw-r--r--sources/hv-collector-kafka-consumer/pom.xml31
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt5
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt24
-rw-r--r--sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt30
-rw-r--r--sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt85
5 files changed, 116 insertions, 59 deletions
diff --git a/sources/hv-collector-kafka-consumer/pom.xml b/sources/hv-collector-kafka-consumer/pom.xml
index ef09c063..c7645edf 100644
--- a/sources/hv-collector-kafka-consumer/pom.xml
+++ b/sources/hv-collector-kafka-consumer/pom.xml
@@ -74,6 +74,13 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-test-utils</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
@@ -91,30 +98,6 @@
<scope>runtime</scope>
</dependency>
<dependency>
- <groupId>com.nhaarman.mockitokotlin2</groupId>
- <artifactId>mockito-kotlin</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.kotlin</groupId>
- <artifactId>kotlin-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-api</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-junit-platform-engine</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>io.micrometer</groupId>
- <artifactId>micrometer-registry-prometheus</artifactId>
- </dependency>
- <dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
index 64a7fb3e..2fabf30e 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
@@ -20,5 +20,6 @@
package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
internal interface Metrics {
- fun notifyOffsetChanged(size: Long)
-}
+ fun notifyOffsetChanged(offset: Long)
+ fun notifyMessageTravelTime(messageSentTimeMicros: Long)
+} \ No newline at end of file
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
index f137d074..748e43fc 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
@@ -19,22 +19,40 @@
*/
package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
+import io.micrometer.core.instrument.Timer
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
+import org.onap.dcae.collectors.veshv.utils.TimeUtils
import reactor.core.publisher.Mono
+import java.time.Duration
+import java.time.Instant
+import java.util.concurrent.atomic.AtomicLong
internal class MicrometerMetrics constructor(
private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
) : Metrics {
- override fun notifyOffsetChanged(size: Long) {
- // TODO implementation here
- }
+
+ private val currentOffset = registry.gauge(name("consumer.offset"), AtomicLong(0))
+ private val travelTime = Timer.builder(name("travel.time"))
+ .publishPercentileHistogram(true)
+ .register(registry)
fun lastStatus(): Mono<String> = Mono.fromCallable {
registry.scrape()
}
+ override fun notifyOffsetChanged(offset: Long) {
+ currentOffset.lazySet(offset)
+ }
+
+ override fun notifyMessageTravelTime(messageSentTimeMicros: Long) {
+ travelTime.record(Duration.between(TimeUtils.epochMicroToInstant(messageSentTimeMicros), Instant.now()))
+ }
+
companion object {
val INSTANCE by lazy { MicrometerMetrics() }
+
+ private const val PREFIX = "hv-kafka-consumer"
+ private fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
}
}
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt
deleted file mode 100644
index b7ea126f..00000000
--- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.kafkaconsumer
-
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import kotlin.test.assertTrue
-
-object SampleTest : Spek({
- describe("sample test") {
- assertTrue(true)
- }
-})
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
new file mode 100644
index 00000000..41587867
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
@@ -0,0 +1,85 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
+
+import io.micrometer.prometheus.PrometheusConfig
+import io.micrometer.prometheus.PrometheusMeterRegistry
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.data.Percentage
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge
+import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+object MicrometerMetricsTest : Spek({
+ val PREFIX = "hv-kafka-consumer"
+ val doublePrecision = Percentage.withPercentage(0.5)
+ lateinit var registry: PrometheusMeterRegistry
+ lateinit var cut: MicrometerMetrics
+
+ beforeEachTest {
+ registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
+ cut = MicrometerMetrics(registry)
+ }
+
+ describe("Timers") {
+ val arbitraryMessageTravelTime = 100L
+ val messageSentTimeMicros = Instant.now().minusMillis(arbitraryMessageTravelTime).toEpochMilli() * 1000
+ val timerName = "$PREFIX.travel.time"
+
+ on("notifyMessageTravelTime") {
+ it("should update timer $timerName") {
+
+ val timeBeforeNotifyMicros = Instant.now().toEpochMilli() * 1000
+ cut.notifyMessageTravelTime(messageSentTimeMicros)
+ val timeAfterNotifyMicros = Instant.now().toEpochMilli() * 1000
+
+ registry.verifyTimer(timerName) { timer ->
+ val travelTimeBeforeNotify = (timeBeforeNotifyMicros - messageSentTimeMicros).toDouble()
+ val travelTimeAfterNotify = (timeAfterNotifyMicros - messageSentTimeMicros).toDouble()
+ assertThat(timer.totalTime(TimeUnit.MICROSECONDS))
+ .isLessThanOrEqualTo(travelTimeAfterNotify)
+ .isGreaterThanOrEqualTo(travelTimeBeforeNotify)
+
+ }
+ }
+ }
+ }
+
+ describe("Gauges") {
+ val gaugeName = "$PREFIX.consumer.offset"
+
+ on("notifyOffsetChanged") {
+ val offset = 966L
+
+ it("should update $gaugeName") {
+ cut.notifyOffsetChanged(offset)
+
+ registry.verifyGauge(gaugeName) {
+ assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision)
+ }
+ }
+ }
+ }
+})