summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-main
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-main')
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt8
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt98
2 files changed, 72 insertions, 34 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index e0d99fc6..a949803f 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -66,6 +66,10 @@ class MicrometerMetrics internal constructor(
.maximumExpectedValue(MAX_BUCKET_DURATION)
.publishPercentileHistogram(true)
.register(registry)
+ private val totalLatencyWithoutRouting = Timer.builder(name(MESSAGES, LATENCY, WITHOUT, ROUTING))
+ .maximumExpectedValue(MAX_BUCKET_DURATION)
+ .publishPercentileHistogram(true)
+ .register(registry)
private val totalLatency = Timer.builder(name(MESSAGES, LATENCY))
.maximumExpectedValue(MAX_BUCKET_DURATION)
.publishPercentileHistogram(true)
@@ -104,7 +108,9 @@ class MicrometerMetrics internal constructor(
}
override fun notifyMessageReadyForRouting(msg: VesMessage) {
- processingTimeWithoutRouting.record(Duration.between(msg.wtpFrame.receivedAt, Instant.now()))
+ val now = Instant.now()
+ processingTimeWithoutRouting.record(Duration.between(msg.wtpFrame.receivedAt, now))
+ totalLatencyWithoutRouting.record(Duration.between(epochMicroToInstant(msg.header.lastEpochMicrosec), now))
}
override fun notifyMessageReceived(msg: WireFrameMessage) {
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
index efd353ec..dd206d08 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -146,8 +146,66 @@ object MicrometerMetricsTest : Spek({
"$PREFIX.messages.received.payload.bytes"
)
}
+
+ on("$PREFIX.messages.to.collector.travel.time") {
+ val counterName = "$PREFIX.messages.to.collector.travel.time"
+ val toCollectorTravelTimeMs = 100L
+
+ it("should update timer") {
+ val now = Instant.now()
+ val vesMessage = vesMessageReceivedAt(now, sentAt = now.minusMillis(toCollectorTravelTimeMs))
+ cut.notifyMessageReceived(vesMessage)
+
+ registry.verifyTimer(counterName) { timer ->
+ assertThat(timer.mean(TimeUnit.MILLISECONDS)).isEqualTo(toCollectorTravelTimeMs.toDouble())
+ }
+
+ verifyCountersAndTimersAreUnchangedBut(counterName)
+ }
+ }
}
+ describe("notifyMessageReadyForRouting"){
+ on("$PREFIX.messages.processing.time.without.routing") {
+ val counterName = "$PREFIX.messages.processing.time.without.routing"
+ val processingTimeMs = 100L
+
+ it("should update timer") {
+
+ cut.notifyMessageReadyForRouting(vesMessageReceivedAt(Instant.now().minusMillis(processingTimeMs)))
+
+ registry.verifyTimer(counterName) { timer ->
+ assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
+ }
+ verifyCountersAndTimersAreUnchangedBut(
+ counterName,
+ "$PREFIX.messages.latency.without.routing"
+ )
+ }
+ }
+
+ on("$PREFIX.messages.latency.without.routing") {
+ val counterName = "$PREFIX.messages.latency.without.routing"
+ val latencyWithoutRoutingMs = 200L
+
+ it("should update timer") {
+
+ val sentAt = Instant.now().minusMillis(latencyWithoutRoutingMs)
+
+ cut.notifyMessageReadyForRouting(vesMessageSentAt(sentAt))
+
+ registry.verifyTimer(counterName) { timer ->
+ assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(latencyWithoutRoutingMs.toDouble())
+ }
+ verifyCountersAndTimersAreUnchangedBut(
+ counterName,
+ "$PREFIX.messages.processing.time.without.routing"
+ )
+ }
+ }
+ }
+
+
describe("notifyMessageSent") {
val topicName1 = "PERF3GPP"
val topicName2 = "CALLTRACE"
@@ -206,39 +264,6 @@ object MicrometerMetricsTest : Spek({
}
}
- on("$PREFIX.messages.to.collector.travel.time") {
- val counterName = "$PREFIX.messages.to.collector.travel.time"
- val toCollectorTravelTimeMs = 100L
-
- it("should update timer") {
- val now = Instant.now()
- val vesMessage = vesMessageReceivedAt(now, sentAt = now.minusMillis(toCollectorTravelTimeMs))
- cut.notifyMessageReceived(vesMessage)
-
- registry.verifyTimer(counterName) { timer ->
- assertThat(timer.mean(TimeUnit.MILLISECONDS)).isEqualTo(toCollectorTravelTimeMs.toDouble())
- }
-
- verifyCountersAndTimersAreUnchangedBut(counterName)
- }
- }
-
- on("$PREFIX.messages.processing.time.without.routing") {
- val counterName = "$PREFIX.messages.processing.time.without.routing"
- val processingTimeMs = 100L
-
- it("should update timer") {
-
- cut.notifyMessageReadyForRouting(vesMessageReceivedAt(Instant.now().minusMillis(processingTimeMs)))
-
- registry.verifyTimer(counterName) { timer ->
- assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
- }
-
- verifyCountersAndTimersAreUnchangedBut(counterName)
- }
- }
-
on("$PREFIX.messages.latency") {
val counterName = "$PREFIX.messages.latency"
val latencyMs = 1666L
@@ -398,6 +423,13 @@ object MicrometerMetricsTest : Spek({
}
})
+private fun vesMessageSentAt(sentAt: Instant): VesMessage {
+ val lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
+ val commonHeader = commonHeader(lastEpochMicrosec = lastEpochMicrosec)
+ return VesMessage(commonHeader,
+ wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements")))
+}
+
private fun vesMessageReceivedAt(receivedAt: Instant, sentAt: Instant): VesMessage {
val lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
val commonHeader = commonHeader(lastEpochMicrosec = lastEpochMicrosec)