From d55f5c0c3df4b2ea136100e61424810ede749778 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Fri, 14 Dec 2018 12:05:47 +0100 Subject: Metric: Processing time Add processing time metric measured as difference between "sent to DMaaP" and "WTP decoded" events. Change-Id: I73bb665145019fcca5ae36e2199ed0e1cc088fdf Issue-ID: DCAEGEN2-1036 Signed-off-by: Piotr Jaszczyk --- .../veshv/main/metrics/MicrometerMetrics.kt | 16 +++-- .../collectors/veshv/main/MicrometerMetricsTest.kt | 76 +++++++++++++++++----- 2 files changed, 71 insertions(+), 21 deletions(-) (limited to 'sources/hv-collector-main') diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index 18678ff3..259fa037 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -29,7 +29,11 @@ import io.micrometer.core.instrument.binder.system.ProcessorMetrics import io.micrometer.prometheus.PrometheusConfig import io.micrometer.prometheus.PrometheusMeterRegistry import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause +import org.onap.dcae.collectors.veshv.model.RoutedMessage +import java.time.Duration +import java.time.Instant /** @@ -53,6 +57,7 @@ class MicrometerMetrics internal constructor( private val droppedCount = { cause: String -> registry.counter(name(MESSAGES, DROPPED, COUNT, CAUSE), CAUSE, cause) }.memoize() + private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME)) init { registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) { @@ -71,14 +76,15 @@ class MicrometerMetrics internal constructor( receivedBytes.increment(size.toDouble()) } - override fun notifyMessageReceived(size: Int) { + override fun notifyMessageReceived(msg: WireFrameMessage) { receivedMsgCount.increment() - receivedMsgBytes.increment(size.toDouble()) + receivedMsgBytes.increment(msg.payloadSize.toDouble()) } - override fun notifyMessageSent(topic: String) { + override fun notifyMessageSent(msg: RoutedMessage) { sentCountTotal.increment() - sentCount(topic).increment() + sentCount(msg.topic).increment() + processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, Instant.now())) } override fun notifyMessageDropped(cause: MessageDropCause) { @@ -100,7 +106,7 @@ class MicrometerMetrics internal constructor( internal const val DROPPED = "dropped" internal const val CAUSE = "cause" internal const val TOTAL = "total" - + internal const val TIME = "time" fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" } } diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index e2dc2f82..cb5cfc70 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.main import arrow.core.Try import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.Gauge +import io.micrometer.core.instrument.Timer import io.micrometer.core.instrument.search.RequiredSearch import io.micrometer.prometheus.PrometheusConfig import io.micrometer.prometheus.PrometheusMeterRegistry @@ -35,6 +36,15 @@ import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND +import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame +import org.onap.dcae.collectors.veshv.tests.utils.vesEvent +import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame +import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSize +import java.time.Instant +import java.time.temporal.Temporal +import java.util.concurrent.TimeUnit /** * @author Piotr Jaszczyk @@ -63,6 +73,9 @@ object MicrometerMetricsTest : Spek({ fun verifyGauge(name: String, verifier: (Gauge) -> T) = verifyMeter(registrySearch().name(name), RequiredSearch::gauge, verifier) + fun verifyTimer(name: String, verifier: (Timer) -> T) = + verifyMeter(registrySearch().name(name), RequiredSearch::timer, verifier) + fun verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) = verifyMeter(search, RequiredSearch::counter, verifier) @@ -71,6 +84,7 @@ object MicrometerMetricsTest : Spek({ fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) { registry.meters + .filter { it.id.name.startsWith(PREFIX) } .filter { it is Counter } .map { it as Counter } .filterNot { it.id.name in changedCounters } @@ -105,7 +119,7 @@ object MicrometerMetricsTest : Spek({ val counterName = "$PREFIX.messages.received.count" it("should increment counter") { - cut.notifyMessageReceived(777) + cut.notifyMessageReceived(emptyWireProtocolFrame()) verifyCounter(counterName) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) @@ -118,7 +132,7 @@ object MicrometerMetricsTest : Spek({ it("should increment counter") { val bytes = 888 - cut.notifyMessageReceived(bytes) + cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes)) verifyCounter(counterName) { assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision) @@ -127,7 +141,7 @@ object MicrometerMetricsTest : Spek({ } it("should leave all other counters unchanged") { - cut.notifyMessageReceived(128) + cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128)) verifyAllCountersAreUnchangedBut( "$PREFIX.messages.received.count", "$PREFIX.messages.received.bytes" @@ -143,7 +157,7 @@ object MicrometerMetricsTest : Spek({ val counterName = "$PREFIX.messages.sent.count.total" it("should increment counter") { - cut.notifyMessageSent(topicName1) + cut.notifyMessageSent(routedMessage(topicName1)) verifyCounter(counterName) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) @@ -155,9 +169,9 @@ object MicrometerMetricsTest : Spek({ on("$PREFIX.messages.sent.topic.count counter") { val counterName = "$PREFIX.messages.sent.count.topic" it("should handle counters for different topics") { - cut.notifyMessageSent(topicName1) - cut.notifyMessageSent(topicName2) - cut.notifyMessageSent(topicName2) + cut.notifyMessageSent(routedMessage(topicName1)) + cut.notifyMessageSent(routedMessage(topicName2)) + cut.notifyMessageSent(routedMessage(topicName2)) verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) @@ -168,6 +182,24 @@ object MicrometerMetricsTest : Spek({ } } } + + on("$PREFIX.messages.processing.time") { + val counterName = "$PREFIX.messages.processing.time" + val processingTimeMs = 100L + + it("should update timer") { + + cut.notifyMessageSent(routedMessage(topicName1, Instant.now().minusMillis(processingTimeMs))) + + verifyTimer(counterName) { timer -> + assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble()) + } + verifyAllCountersAreUnchangedBut( + counterName, + "$PREFIX.messages.sent.count.topic", + "$PREFIX.messages.sent.count.total") + } + } } describe("notifyMessageDropped") { @@ -207,27 +239,27 @@ object MicrometerMetricsTest : Spek({ it("should show difference between sent and received messages") { on("positive difference") { - cut.notifyMessageReceived(128) - cut.notifyMessageReceived(256) - cut.notifyMessageReceived(256) - cut.notifyMessageSent("perf3gpp") + cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128)) + cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256)) + cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256)) + cut.notifyMessageSent(routedMessage("perf3gpp")) verifyGauge("messages.processing.count") { assertThat(it.value()).isCloseTo(2.0, doublePrecision) } } on("zero difference") { - cut.notifyMessageReceived(128) - cut.notifyMessageSent("perf3gpp") + cut.notifyMessageReceived(emptyWireProtocolFrame()) + cut.notifyMessageSent(routedMessage("perf3gpp")) verifyGauge("messages.processing.count") { assertThat(it.value()).isCloseTo(0.0, doublePrecision) } } on("negative difference") { - cut.notifyMessageReceived(128) - cut.notifyMessageSent("fault") - cut.notifyMessageSent("perf3gpp") + cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128)) + cut.notifyMessageSent(routedMessage("fault")) + cut.notifyMessageSent(routedMessage("perf3gpp")) verifyGauge("messages.processing.count") { assertThat(it.value()).isCloseTo(0.0, doublePrecision) } @@ -236,3 +268,15 @@ object MicrometerMetricsTest : Spek({ } }) + +fun routedMessage(topic: String, partition: Int = 0) = + vesEvent().let {evt -> + RoutedMessage(topic, partition, + VesMessage(evt.commonEventHeader, wireProtocolFrame(evt))) + } + +fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0) = + vesEvent().let {evt -> + RoutedMessage(topic, partition, + VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt))) + } \ No newline at end of file -- cgit 1.2.3-korg