From 66254ca1c5df0d7971764799088c8f20c79f4ca7 Mon Sep 17 00:00:00 2001 From: Remigiusz Janeczek Date: Thu, 23 Apr 2020 08:02:47 +0200 Subject: Add metric for total latency without routing Add metric for time between Producer and HV-VES output without sending to Kafka Refactor metrics test Add new latencies and individual cores usage to Grafana Issue-ID: DCAEGEN2-1576 Signed-off-by: Remigiusz Janeczek Change-Id: I6112db76be1c7108c18336b50f9f12d5ce62c24a --- .../veshv/main/metrics/MicrometerMetrics.kt | 8 +- .../collectors/veshv/main/MicrometerMetricsTest.kt | 98 ++++++++++++++-------- 2 files changed, 72 insertions(+), 34 deletions(-) (limited to 'sources/hv-collector-main') diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index e0d99fc6..a949803f 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -66,6 +66,10 @@ class MicrometerMetrics internal constructor( .maximumExpectedValue(MAX_BUCKET_DURATION) .publishPercentileHistogram(true) .register(registry) + private val totalLatencyWithoutRouting = Timer.builder(name(MESSAGES, LATENCY, WITHOUT, ROUTING)) + .maximumExpectedValue(MAX_BUCKET_DURATION) + .publishPercentileHistogram(true) + .register(registry) private val totalLatency = Timer.builder(name(MESSAGES, LATENCY)) .maximumExpectedValue(MAX_BUCKET_DURATION) .publishPercentileHistogram(true) @@ -104,7 +108,9 @@ class MicrometerMetrics internal constructor( } override fun notifyMessageReadyForRouting(msg: VesMessage) { - processingTimeWithoutRouting.record(Duration.between(msg.wtpFrame.receivedAt, Instant.now())) + val now = Instant.now() + processingTimeWithoutRouting.record(Duration.between(msg.wtpFrame.receivedAt, now)) + totalLatencyWithoutRouting.record(Duration.between(epochMicroToInstant(msg.header.lastEpochMicrosec), now)) } override fun notifyMessageReceived(msg: WireFrameMessage) { diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index efd353ec..dd206d08 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -146,8 +146,66 @@ object MicrometerMetricsTest : Spek({ "$PREFIX.messages.received.payload.bytes" ) } + + on("$PREFIX.messages.to.collector.travel.time") { + val counterName = "$PREFIX.messages.to.collector.travel.time" + val toCollectorTravelTimeMs = 100L + + it("should update timer") { + val now = Instant.now() + val vesMessage = vesMessageReceivedAt(now, sentAt = now.minusMillis(toCollectorTravelTimeMs)) + cut.notifyMessageReceived(vesMessage) + + registry.verifyTimer(counterName) { timer -> + assertThat(timer.mean(TimeUnit.MILLISECONDS)).isEqualTo(toCollectorTravelTimeMs.toDouble()) + } + + verifyCountersAndTimersAreUnchangedBut(counterName) + } + } } + describe("notifyMessageReadyForRouting"){ + on("$PREFIX.messages.processing.time.without.routing") { + val counterName = "$PREFIX.messages.processing.time.without.routing" + val processingTimeMs = 100L + + it("should update timer") { + + cut.notifyMessageReadyForRouting(vesMessageReceivedAt(Instant.now().minusMillis(processingTimeMs))) + + registry.verifyTimer(counterName) { timer -> + assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble()) + } + verifyCountersAndTimersAreUnchangedBut( + counterName, + "$PREFIX.messages.latency.without.routing" + ) + } + } + + on("$PREFIX.messages.latency.without.routing") { + val counterName = "$PREFIX.messages.latency.without.routing" + val latencyWithoutRoutingMs = 200L + + it("should update timer") { + + val sentAt = Instant.now().minusMillis(latencyWithoutRoutingMs) + + cut.notifyMessageReadyForRouting(vesMessageSentAt(sentAt)) + + registry.verifyTimer(counterName) { timer -> + assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(latencyWithoutRoutingMs.toDouble()) + } + verifyCountersAndTimersAreUnchangedBut( + counterName, + "$PREFIX.messages.processing.time.without.routing" + ) + } + } + } + + describe("notifyMessageSent") { val topicName1 = "PERF3GPP" val topicName2 = "CALLTRACE" @@ -206,39 +264,6 @@ object MicrometerMetricsTest : Spek({ } } - on("$PREFIX.messages.to.collector.travel.time") { - val counterName = "$PREFIX.messages.to.collector.travel.time" - val toCollectorTravelTimeMs = 100L - - it("should update timer") { - val now = Instant.now() - val vesMessage = vesMessageReceivedAt(now, sentAt = now.minusMillis(toCollectorTravelTimeMs)) - cut.notifyMessageReceived(vesMessage) - - registry.verifyTimer(counterName) { timer -> - assertThat(timer.mean(TimeUnit.MILLISECONDS)).isEqualTo(toCollectorTravelTimeMs.toDouble()) - } - - verifyCountersAndTimersAreUnchangedBut(counterName) - } - } - - on("$PREFIX.messages.processing.time.without.routing") { - val counterName = "$PREFIX.messages.processing.time.without.routing" - val processingTimeMs = 100L - - it("should update timer") { - - cut.notifyMessageReadyForRouting(vesMessageReceivedAt(Instant.now().minusMillis(processingTimeMs))) - - registry.verifyTimer(counterName) { timer -> - assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble()) - } - - verifyCountersAndTimersAreUnchangedBut(counterName) - } - } - on("$PREFIX.messages.latency") { val counterName = "$PREFIX.messages.latency" val latencyMs = 1666L @@ -398,6 +423,13 @@ object MicrometerMetricsTest : Spek({ } }) +private fun vesMessageSentAt(sentAt: Instant): VesMessage { + val lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000 + val commonHeader = commonHeader(lastEpochMicrosec = lastEpochMicrosec) + return VesMessage(commonHeader, + wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements"))) +} + private fun vesMessageReceivedAt(receivedAt: Instant, sentAt: Instant): VesMessage { val lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000 val commonHeader = commonHeader(lastEpochMicrosec = lastEpochMicrosec) -- cgit 1.2.3-korg