summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-main
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-main')
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt16
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt95
2 files changed, 84 insertions, 27 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index f060426d..d35e17d6 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -30,9 +30,10 @@ import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.utils.TimeUtils.epochMicroToInstant
import java.time.Duration
import java.time.Instant
@@ -49,6 +50,9 @@ class MicrometerMetrics internal constructor(
private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT))
private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES))
+ private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME))
+ private val totalLatency = registry.timer(name(MESSAGES, LATENCY, TIME))
+
private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT))
private val sentToTopicCount = { topic: String ->
registry.counter(name(MESSAGES, SENT, TOPIC, COUNT), TOPIC, topic)
@@ -59,8 +63,6 @@ class MicrometerMetrics internal constructor(
registry.counter(name(MESSAGES, DROPPED, CAUSE, COUNT), CAUSE, cause)
}.memoize<String, Counter>()
- private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME))
-
private val clientsRejectedCount = registry.counter(name(CLIENTS, REJECTED, COUNT))
private val clientsRejectedCauseCount = { cause: String ->
registry.counter(name(CLIENTS, REJECTED, CAUSE, COUNT), CAUSE, cause)
@@ -90,9 +92,12 @@ class MicrometerMetrics internal constructor(
}
override fun notifyMessageSent(msg: RoutedMessage) {
+ val now = Instant.now()
sentCountTotal.increment()
sentToTopicCount(msg.topic).increment()
- processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()))
+
+ processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, now))
+ totalLatency.record(Duration.between(epochMicroToInstant(msg.message.header.lastEpochMicrosec), now))
}
override fun notifyMessageDropped(cause: MessageDropCause) {
@@ -121,6 +126,7 @@ class MicrometerMetrics internal constructor(
internal const val TOPIC = "topic"
internal const val DROPPED = "dropped"
internal const val TIME = "time"
- fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
+ internal const val LATENCY = "latency"
+ internal fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
}
}
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
index 2ecdb26b..71fc8f7f 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.main
import arrow.core.Try
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.Gauge
+import io.micrometer.core.instrument.Meter
import io.micrometer.core.instrument.Timer
import io.micrometer.core.instrument.search.RequiredSearch
import io.micrometer.prometheus.PrometheusConfig
@@ -34,10 +35,10 @@ import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
-import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
-import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
@@ -47,6 +48,7 @@ import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSi
import java.time.Instant
import java.time.temporal.Temporal
import java.util.concurrent.TimeUnit
+import kotlin.reflect.KClass
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -54,6 +56,7 @@ import java.util.concurrent.TimeUnit
*/
object MicrometerMetricsTest : Spek({
val doublePrecision = Percentage.withPercentage(0.5)
+ val alwaysChangedMeters = setOf("$PREFIX.messages.processing.time", "$PREFIX.messages.latency.time")
lateinit var registry: PrometheusMeterRegistry
lateinit var cut: MicrometerMetrics
@@ -84,15 +87,25 @@ object MicrometerMetricsTest : Spek({
fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
verifyCounter(registrySearch(name), verifier)
- fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) {
- registry.meters
- .filter { it.id.name.startsWith(PREFIX) }
- .filter { it is Counter }
- .map { it as Counter }
- .filterNot { it.id.name in changedCounters }
- .forEach {
- assertThat(it.count()).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision)
- }
+ fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
+ fun <T : Meter> verifyAllMetersAreUnchangedBut(
+ clazz: KClass<T>,
+ changedCounters: Collection<String>,
+ valueOf: (T) -> Double) {
+ registry.meters
+ .filter { it.id.name.startsWith(PREFIX) }
+ .filter { clazz.isInstance(it) }
+ .map { it as T }
+ .filterNot { it.id.name in changedCounters }
+ .forEach {
+ assertThat(valueOf(it)).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision)
+ }
+ }
+
+ setOf(*changedMeters).let { changedMetersCollection ->
+ verifyAllMetersAreUnchangedBut(Counter::class, changedMetersCollection) { it.count() }
+ verifyAllMetersAreUnchangedBut(Timer::class, changedMetersCollection) { it.count().toDouble() }
+ }
}
describe("notifyBytesReceived") {
@@ -111,7 +124,7 @@ object MicrometerMetricsTest : Spek({
it("should leave all other counters unchanged") {
cut.notifyBytesReceived(128)
- verifyAllCountersAreUnchangedBut(counterName)
+ verifyCountersAndTimersAreUnchangedBut(counterName)
}
}
}
@@ -144,7 +157,7 @@ object MicrometerMetricsTest : Spek({
it("should leave all other counters unchanged") {
cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
- verifyAllCountersAreUnchangedBut(
+ verifyCountersAndTimersAreUnchangedBut(
"$PREFIX.messages.received.count",
"$PREFIX.messages.received.bytes"
)
@@ -164,7 +177,11 @@ object MicrometerMetricsTest : Spek({
verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
- verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.topic.count")
+ verifyCountersAndTimersAreUnchangedBut(
+ counterName,
+ "$PREFIX.messages.sent.topic.count",
+ "$PREFIX.messages.processing.time",
+ "$PREFIX.messages.latency.time")
}
}
@@ -191,17 +208,41 @@ object MicrometerMetricsTest : Spek({
it("should update timer") {
- cut.notifyMessageSent(routedMessage(topicName1, Instant.now().minusMillis(processingTimeMs)))
+ cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))
verifyTimer(counterName) { timer ->
assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
}
- verifyAllCountersAreUnchangedBut(
+ verifyCountersAndTimersAreUnchangedBut(
counterName,
"$PREFIX.messages.sent.topic.count",
- "$PREFIX.messages.sent.count")
+ "$PREFIX.messages.sent.count",
+ "$PREFIX.messages.latency.time")
}
}
+
+ on("$PREFIX.messages.latency.time") {
+ val counterName = "$PREFIX.messages.latency.time"
+ val latencyMs = 1666L
+
+ it("should update timer") {
+
+ cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))
+
+ verifyTimer(counterName) { timer ->
+ assertThat(timer.mean(TimeUnit.MILLISECONDS))
+ .isGreaterThanOrEqualTo(latencyMs.toDouble())
+ .isLessThanOrEqualTo(latencyMs + 10000.0)
+
+ }
+ verifyCountersAndTimersAreUnchangedBut(
+ counterName,
+ "$PREFIX.messages.sent.topic.count",
+ "$PREFIX.messages.sent.count",
+ "$PREFIX.messages.processing.time")
+ }
+ }
+
}
describe("notifyMessageDropped") {
@@ -215,7 +256,7 @@ object MicrometerMetricsTest : Spek({
verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
- verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count")
+ verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count")
}
}
@@ -280,7 +321,7 @@ object MicrometerMetricsTest : Spek({
verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
- verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count")
+ verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count")
}
}
@@ -304,13 +345,23 @@ object MicrometerMetricsTest : Spek({
})
fun routedMessage(topic: String, partition: Int = 0) =
- vesEvent().let {evt ->
+ vesEvent().let { evt ->
RoutedMessage(topic, partition,
VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
}
-fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0) =
- vesEvent().let {evt ->
+fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
+ vesEvent().let { evt ->
RoutedMessage(topic, partition,
VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt)))
}
+
+fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
+ vesEvent().let { evt ->
+ val builder = evt.toBuilder()
+ builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
+ builder.build()
+ }.let { evt ->
+ RoutedMessage(topic, partition,
+ VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
+ }