From cc4b14c46ea8796383be180bcafaea4575b96979 Mon Sep 17 00:00:00 2001 From: kjaniak Date: Wed, 30 Oct 2019 14:19:27 +0100 Subject: Add metric for processing without routing Performance tests need better check of processing time in HV-VES. Change-Id: I0792c4ac014a7b8907ef314a3fd9981776dc0b35 Issue-ID: DCAEGEN2-1890 Signed-off-by: kjaniak --- .../veshv/main/metrics/MicrometerMetrics.kt | 13 +++++++-- .../collectors/veshv/main/MicrometerMetricsTest.kt | 34 ++++++++++++++++++++-- 2 files changed, 41 insertions(+), 6 deletions(-) (limited to 'sources/hv-collector-main') diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index fa52ac2c..9d417a28 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -34,6 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.utils.TimeUtils.epochMicroToInstant import java.time.Duration import java.time.Instant @@ -46,7 +47,6 @@ import java.time.Instant class MicrometerMetrics internal constructor( private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) ) : Metrics { - private val receivedBytes = registry.counter(name(DATA, RECEIVED, BYTES)) private val receivedMessages = registry.counter(name(MESSAGES, RECEIVED)) private val receivedMessagesPayloadBytes = registry.counter(name(MESSAGES, RECEIVED, PAYLOAD, BYTES)) @@ -58,6 +58,9 @@ class MicrometerMetrics internal constructor( .maximumExpectedValue(MAX_BUCKET_DURATION) .publishPercentileHistogram(true) .register(registry) + private val processingTimeWithoutRouting = Timer.builder(name(MESSAGES, PROCESSING, TIME, WITHOUT, ROUTING)) + .publishPercentileHistogram(true) + .register(registry) private val totalLatency = Timer.builder(name(MESSAGES, LATENCY)) .maximumExpectedValue(MAX_BUCKET_DURATION) .publishPercentileHistogram(true) @@ -67,12 +70,10 @@ class MicrometerMetrics internal constructor( private val sentMessagesByTopic = { topic: String -> registry.counter(name(MESSAGES, SENT, TOPIC), TOPIC, topic) }.memoize() - private val droppedMessages = registry.counter(name(MESSAGES, DROPPED)) private val messagesDroppedByCause = { cause: String -> registry.counter(name(MESSAGES, DROPPED, CAUSE), CAUSE, cause) }.memoize() - private val clientsRejected = registry.counter(name(CLIENTS, REJECTED)) private val clientsRejectedByCause = { cause: String -> registry.counter(name(CLIENTS, REJECTED, CAUSE), CAUSE, cause) @@ -97,6 +98,10 @@ class MicrometerMetrics internal constructor( receivedBytes.increment(size.toDouble()) } + override fun notifyMessageReadyForRouting(msg: VesMessage) { + processingTimeWithoutRouting.record(Duration.between(msg.wtpFrame.receivedAt, Instant.now())) + } + override fun notifyMessageReceived(msg: WireFrameMessage) { receivedMessages.increment() receivedMessagesPayloadBytes.increment(msg.payloadSize.toDouble()) @@ -150,6 +155,8 @@ class MicrometerMetrics internal constructor( internal const val LATENCY = "latency" internal const val PAYLOAD = "payload" internal val MAX_BUCKET_DURATION = Duration.ofSeconds(300L) + internal const val WITHOUT = "without" + internal const val ROUTING = "routing" internal fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" } } diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index 66f3a5fc..a3471d46 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.main import arrow.core.Option +import com.google.protobuf.ByteString import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.Meter import io.micrometer.core.instrument.Tags @@ -39,7 +40,9 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EX import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesEventDomain import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge @@ -203,6 +206,25 @@ object MicrometerMetricsTest : Spek({ } } + on("$PREFIX.messages.processing.time.without.routing") { + val counterName = "$PREFIX.messages.processing.time.without.routing" + val processingTimeMs = 100L + + it("should update timer") { + + cut.notifyMessageReadyForRouting(vesMessageReceivedAt(Instant.now().minusMillis(processingTimeMs))) + + registry.verifyTimer(counterName) { timer -> + assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble()) + } + verifyCountersAndTimersAreUnchangedBut( + counterName, + "$PREFIX.messages.sent.topic", + "$PREFIX.messages.sent", + "$PREFIX.messages.latency") + } + } + on("$PREFIX.messages.latency") { val counterName = "$PREFIX.messages.latency" val latencyMs = 1666L @@ -362,13 +384,19 @@ object MicrometerMetricsTest : Spek({ } }) -fun routedMessage(topic: String, partition: Int = 0) = +private fun vesMessageReceivedAt(receivedAt: Temporal, domain: VesEventDomain = VesEventDomain.PERF3GPP): VesMessage { + val commonHeader = commonHeader(domain) + return VesMessage(commonHeader, + wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements"), receivedAt)) +} + +private fun routedMessage(topic: String, partition: Int = 0) = vesEvent().run { toRoutedMessage(topic, partition) } -fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) = +private fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) = vesEvent().run { toRoutedMessage(topic, partition, receivedAt) } -fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) = +private fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) = vesEvent().run { val builder = toBuilder() builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000 -- cgit 1.2.3-korg