aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkjaniak <kornel.janiak@nokia.com>2020-04-21 12:44:53 +0200
committerKornel Janiak <kornel.janiak@nokia.com>2020-04-22 11:19:54 +0000
commitda498bfc1f006a17f1d8174b10bc33acbd4b2fa0 (patch)
tree3711f4b175ce046feceaab482a9d786a2f152a55
parentc61dcc75290d24ec0f0188b32e0ab0a7f15ea420 (diff)
Add of message travel time metric
Message travel time: Producer -> HV-VES input introduced. Tests for new metric added. Change-Id: I36347ff53abb3f274e4358af26db49fe8bac95ed Issue-ID: DCAEGEN2-1576 Signed-off-by: kjaniak <kornel.janiak@nokia.com>
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt1
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt4
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt9
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt19
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt39
5 files changed, 57 insertions, 15 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 41993e62..3fe5fd5c 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -43,6 +43,7 @@ interface SinkFactory : Closeable {
interface Metrics {
fun notifyBytesReceived(size: Int)
fun notifyMessageReceived(msg: WireFrameMessage)
+ fun notifyMessageReceived(msg: VesMessage)
fun notifyMessageReadyForRouting(msg: VesMessage)
fun notifyMessageSent(msg: RoutedMessage)
fun notifyMessageDropped(cause: MessageDropCause)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
index f0d1465b..98b3ce9c 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2018-2020 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -82,7 +82,7 @@ internal class HvVesCollector(
.filterFailedWithLog(logger, clientContext::fullMdc,
{ "Ves event header decoded successfully" },
{ "Failed to decode ves event header, reason: ${it.message}" })
- }
+ }.doOnNext(metrics::notifyMessageReceived)
private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux
.filterFailedWithLog {
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
index 3b01d137..12555965 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2018-2020 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.utils.TimeUtils
import java.time.Duration
import java.time.Instant
import kotlin.test.fail
@@ -40,6 +41,7 @@ class FakeMetrics : Metrics {
var messagesDroppedCount: Int = 0; private set
var lastProcessingTimeMicros: Double = -1.0; private set
var lastProcessingTimeWithoutRoutingMicros: Double = -1.0; private set
+ var lastToCollectorTravelTime: Double = -1.0; private set
var messagesSentCount: Int = 0; private set
var clientRejectionCause = mutableMapOf<ClientRejectionCause, Int>(); private set
@@ -54,6 +56,11 @@ class FakeMetrics : Metrics {
messageBytesReceived += msg.payloadSize
}
+ override fun notifyMessageReceived(msg: VesMessage) {
+ lastToCollectorTravelTime = Duration.between(TimeUtils.epochMicroToInstant(msg.header.lastEpochMicrosec),
+ Instant.now()).toNanos() / 1000.0
+ }
+
override fun notifyMessageReadyForRouting(msg: VesMessage) {
lastProcessingTimeWithoutRoutingMicros = Duration.between(msg.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0
}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index 2f3470a4..e0d99fc6 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2018-2020 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,11 +30,11 @@ import io.micrometer.core.instrument.binder.system.ProcessorMetrics
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.MessageDropCause
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.utils.TimeUtils.epochMicroToInstant
import java.time.Duration
import java.time.Instant
@@ -54,6 +54,10 @@ class MicrometerMetrics internal constructor(
private val totalConnections = registry.counter(name(CONNECTIONS))
private val disconnections = registry.counter(name(DISCONNECTIONS))
+ private val travelTimeToCollector = Timer.builder(name(MESSAGES, TO, COLLECTOR, TRAVEL, TIME))
+ .maximumExpectedValue(MAX_BUCKET_DURATION)
+ .publishPercentileHistogram(true)
+ .register(registry)
private val processingTime = Timer.builder(name(MESSAGES, PROCESSING, TIME))
.maximumExpectedValue(MAX_BUCKET_DURATION)
.publishPercentileHistogram(true)
@@ -108,6 +112,12 @@ class MicrometerMetrics internal constructor(
receivedMessagesPayloadBytes.increment(msg.payloadSize.toDouble())
}
+ override fun notifyMessageReceived(msg: VesMessage) {
+ travelTimeToCollector.record(
+ Duration.between(epochMicroToInstant(msg.header.lastEpochMicrosec), msg.wtpFrame.receivedAt)
+ )
+ }
+
override fun notifyMessageSent(msg: RoutedMessage) {
val now = Instant.now()
sentMessages.increment()
@@ -157,6 +167,9 @@ class MicrometerMetrics internal constructor(
internal const val PAYLOAD = "payload"
internal const val WITHOUT = "without"
internal const val ROUTING = "routing"
+ internal const val TRAVEL = "travel"
+ internal const val TO = "to"
+ internal const val COLLECTOR = "collector"
internal val MAX_BUCKET_DURATION = Duration.ofSeconds(300L)
internal fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
}
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
index a3471d46..efd353ec 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2020 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,15 +33,15 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain
-import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter
@@ -206,6 +206,23 @@ object MicrometerMetricsTest : Spek({
}
}
+ on("$PREFIX.messages.to.collector.travel.time") {
+ val counterName = "$PREFIX.messages.to.collector.travel.time"
+ val toCollectorTravelTimeMs = 100L
+
+ it("should update timer") {
+ val now = Instant.now()
+ val vesMessage = vesMessageReceivedAt(now, sentAt = now.minusMillis(toCollectorTravelTimeMs))
+ cut.notifyMessageReceived(vesMessage)
+
+ registry.verifyTimer(counterName) { timer ->
+ assertThat(timer.mean(TimeUnit.MILLISECONDS)).isEqualTo(toCollectorTravelTimeMs.toDouble())
+ }
+
+ verifyCountersAndTimersAreUnchangedBut(counterName)
+ }
+ }
+
on("$PREFIX.messages.processing.time.without.routing") {
val counterName = "$PREFIX.messages.processing.time.without.routing"
val processingTimeMs = 100L
@@ -217,11 +234,8 @@ object MicrometerMetricsTest : Spek({
registry.verifyTimer(counterName) { timer ->
assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
}
- verifyCountersAndTimersAreUnchangedBut(
- counterName,
- "$PREFIX.messages.sent.topic",
- "$PREFIX.messages.sent",
- "$PREFIX.messages.latency")
+
+ verifyCountersAndTimersAreUnchangedBut(counterName)
}
}
@@ -384,6 +398,13 @@ object MicrometerMetricsTest : Spek({
}
})
+private fun vesMessageReceivedAt(receivedAt: Instant, sentAt: Instant): VesMessage {
+ val lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
+ val commonHeader = commonHeader(lastEpochMicrosec = lastEpochMicrosec)
+ return VesMessage(commonHeader,
+ wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements"), receivedAt))
+}
+
private fun vesMessageReceivedAt(receivedAt: Temporal, domain: VesEventDomain = VesEventDomain.PERF3GPP): VesMessage {
val commonHeader = commonHeader(domain)
return VesMessage(commonHeader,