From 30488f1922f789c5b8e18934456968aa354c9671 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Mon, 17 Dec 2018 13:22:52 +0100 Subject: Metric: Message latency Defined as a difference between now and vesHeader.lastEpochTime. Change-Id: I4aa97e8efc13cb0039fde38b4fd2aa6411c7b89a Issue-ID: DCAEGEN2-1036 Signed-off-by: Piotr Jaszczyk --- .../collectors/veshv/main/MicrometerMetricsTest.kt | 95 +++++++++++++++++----- 1 file changed, 73 insertions(+), 22 deletions(-) (limited to 'sources/hv-collector-main/src/test') diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index 2ecdb26b..71fc8f7f 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.main import arrow.core.Try import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.Gauge +import io.micrometer.core.instrument.Meter import io.micrometer.core.instrument.Timer import io.micrometer.core.instrument.search.RequiredSearch import io.micrometer.prometheus.PrometheusConfig @@ -34,10 +35,10 @@ import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX -import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE -import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE +import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE +import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame @@ -47,6 +48,7 @@ import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSi import java.time.Instant import java.time.temporal.Temporal import java.util.concurrent.TimeUnit +import kotlin.reflect.KClass /** * @author Piotr Jaszczyk @@ -54,6 +56,7 @@ import java.util.concurrent.TimeUnit */ object MicrometerMetricsTest : Spek({ val doublePrecision = Percentage.withPercentage(0.5) + val alwaysChangedMeters = setOf("$PREFIX.messages.processing.time", "$PREFIX.messages.latency.time") lateinit var registry: PrometheusMeterRegistry lateinit var cut: MicrometerMetrics @@ -84,15 +87,25 @@ object MicrometerMetricsTest : Spek({ fun verifyCounter(name: String, verifier: (Counter) -> T) = verifyCounter(registrySearch(name), verifier) - fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) { - registry.meters - .filter { it.id.name.startsWith(PREFIX) } - .filter { it is Counter } - .map { it as Counter } - .filterNot { it.id.name in changedCounters } - .forEach { - assertThat(it.count()).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision) - } + fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) { + fun verifyAllMetersAreUnchangedBut( + clazz: KClass, + changedCounters: Collection, + valueOf: (T) -> Double) { + registry.meters + .filter { it.id.name.startsWith(PREFIX) } + .filter { clazz.isInstance(it) } + .map { it as T } + .filterNot { it.id.name in changedCounters } + .forEach { + assertThat(valueOf(it)).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision) + } + } + + setOf(*changedMeters).let { changedMetersCollection -> + verifyAllMetersAreUnchangedBut(Counter::class, changedMetersCollection) { it.count() } + verifyAllMetersAreUnchangedBut(Timer::class, changedMetersCollection) { it.count().toDouble() } + } } describe("notifyBytesReceived") { @@ -111,7 +124,7 @@ object MicrometerMetricsTest : Spek({ it("should leave all other counters unchanged") { cut.notifyBytesReceived(128) - verifyAllCountersAreUnchangedBut(counterName) + verifyCountersAndTimersAreUnchangedBut(counterName) } } } @@ -144,7 +157,7 @@ object MicrometerMetricsTest : Spek({ it("should leave all other counters unchanged") { cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128)) - verifyAllCountersAreUnchangedBut( + verifyCountersAndTimersAreUnchangedBut( "$PREFIX.messages.received.count", "$PREFIX.messages.received.bytes" ) @@ -164,7 +177,11 @@ object MicrometerMetricsTest : Spek({ verifyCounter(counterName) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) } - verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.topic.count") + verifyCountersAndTimersAreUnchangedBut( + counterName, + "$PREFIX.messages.sent.topic.count", + "$PREFIX.messages.processing.time", + "$PREFIX.messages.latency.time") } } @@ -191,17 +208,41 @@ object MicrometerMetricsTest : Spek({ it("should update timer") { - cut.notifyMessageSent(routedMessage(topicName1, Instant.now().minusMillis(processingTimeMs))) + cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs))) verifyTimer(counterName) { timer -> assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble()) } - verifyAllCountersAreUnchangedBut( + verifyCountersAndTimersAreUnchangedBut( counterName, "$PREFIX.messages.sent.topic.count", - "$PREFIX.messages.sent.count") + "$PREFIX.messages.sent.count", + "$PREFIX.messages.latency.time") } } + + on("$PREFIX.messages.latency.time") { + val counterName = "$PREFIX.messages.latency.time" + val latencyMs = 1666L + + it("should update timer") { + + cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs))) + + verifyTimer(counterName) { timer -> + assertThat(timer.mean(TimeUnit.MILLISECONDS)) + .isGreaterThanOrEqualTo(latencyMs.toDouble()) + .isLessThanOrEqualTo(latencyMs + 10000.0) + + } + verifyCountersAndTimersAreUnchangedBut( + counterName, + "$PREFIX.messages.sent.topic.count", + "$PREFIX.messages.sent.count", + "$PREFIX.messages.processing.time") + } + } + } describe("notifyMessageDropped") { @@ -215,7 +256,7 @@ object MicrometerMetricsTest : Spek({ verifyCounter(counterName) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } - verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count") + verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count") } } @@ -280,7 +321,7 @@ object MicrometerMetricsTest : Spek({ verifyCounter(counterName) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } - verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count") + verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count") } } @@ -304,13 +345,23 @@ object MicrometerMetricsTest : Spek({ }) fun routedMessage(topic: String, partition: Int = 0) = - vesEvent().let {evt -> + vesEvent().let { evt -> RoutedMessage(topic, partition, VesMessage(evt.commonEventHeader, wireProtocolFrame(evt))) } -fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0) = - vesEvent().let {evt -> +fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) = + vesEvent().let { evt -> RoutedMessage(topic, partition, VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt))) } + +fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) = + vesEvent().let { evt -> + val builder = evt.toBuilder() + builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000 + builder.build() + }.let { evt -> + RoutedMessage(topic, partition, + VesMessage(evt.commonEventHeader, wireProtocolFrame(evt))) + } -- cgit 1.2.3-korg