summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-main
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-14 12:05:47 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-17 15:06:29 +0100
commitd55f5c0c3df4b2ea136100e61424810ede749778 (patch)
treeb4d60ede755af2a2204b8303d75b4d74489f6802 /sources/hv-collector-main
parentfb040c0df8ab2b74d02b67feda4e2a161a1311d2 (diff)
Metric: Processing time
Add processing time metric measured as difference between "sent to DMaaP" and "WTP decoded" events. Change-Id: I73bb665145019fcca5ae36e2199ed0e1cc088fdf Issue-ID: DCAEGEN2-1036 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-main')
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt16
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt76
2 files changed, 71 insertions, 21 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index 18678ff3..259fa037 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -29,7 +29,11 @@ import io.micrometer.core.instrument.binder.system.ProcessorMetrics
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import java.time.Duration
+import java.time.Instant
/**
@@ -53,6 +57,7 @@ class MicrometerMetrics internal constructor(
private val droppedCount = { cause: String ->
registry.counter(name(MESSAGES, DROPPED, COUNT, CAUSE), CAUSE, cause)
}.memoize<String, Counter>()
+ private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME))
init {
registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) {
@@ -71,14 +76,15 @@ class MicrometerMetrics internal constructor(
receivedBytes.increment(size.toDouble())
}
- override fun notifyMessageReceived(size: Int) {
+ override fun notifyMessageReceived(msg: WireFrameMessage) {
receivedMsgCount.increment()
- receivedMsgBytes.increment(size.toDouble())
+ receivedMsgBytes.increment(msg.payloadSize.toDouble())
}
- override fun notifyMessageSent(topic: String) {
+ override fun notifyMessageSent(msg: RoutedMessage) {
sentCountTotal.increment()
- sentCount(topic).increment()
+ sentCount(msg.topic).increment()
+ processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()))
}
override fun notifyMessageDropped(cause: MessageDropCause) {
@@ -100,7 +106,7 @@ class MicrometerMetrics internal constructor(
internal const val DROPPED = "dropped"
internal const val CAUSE = "cause"
internal const val TOTAL = "total"
-
+ internal const val TIME = "time"
fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
}
}
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
index e2dc2f82..cb5cfc70 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.main
import arrow.core.Try
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.Gauge
+import io.micrometer.core.instrument.Timer
import io.micrometer.core.instrument.search.RequiredSearch
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
@@ -35,6 +36,15 @@ import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
+import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
+import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
+import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSize
+import java.time.Instant
+import java.time.temporal.Temporal
+import java.util.concurrent.TimeUnit
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -63,6 +73,9 @@ object MicrometerMetricsTest : Spek({
fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
verifyMeter(registrySearch().name(name), RequiredSearch::gauge, verifier)
+ fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
+ verifyMeter(registrySearch().name(name), RequiredSearch::timer, verifier)
+
fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
verifyMeter(search, RequiredSearch::counter, verifier)
@@ -71,6 +84,7 @@ object MicrometerMetricsTest : Spek({
fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) {
registry.meters
+ .filter { it.id.name.startsWith(PREFIX) }
.filter { it is Counter }
.map { it as Counter }
.filterNot { it.id.name in changedCounters }
@@ -105,7 +119,7 @@ object MicrometerMetricsTest : Spek({
val counterName = "$PREFIX.messages.received.count"
it("should increment counter") {
- cut.notifyMessageReceived(777)
+ cut.notifyMessageReceived(emptyWireProtocolFrame())
verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
@@ -118,7 +132,7 @@ object MicrometerMetricsTest : Spek({
it("should increment counter") {
val bytes = 888
- cut.notifyMessageReceived(bytes)
+ cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))
verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
@@ -127,7 +141,7 @@ object MicrometerMetricsTest : Spek({
}
it("should leave all other counters unchanged") {
- cut.notifyMessageReceived(128)
+ cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
verifyAllCountersAreUnchangedBut(
"$PREFIX.messages.received.count",
"$PREFIX.messages.received.bytes"
@@ -143,7 +157,7 @@ object MicrometerMetricsTest : Spek({
val counterName = "$PREFIX.messages.sent.count.total"
it("should increment counter") {
- cut.notifyMessageSent(topicName1)
+ cut.notifyMessageSent(routedMessage(topicName1))
verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
@@ -155,9 +169,9 @@ object MicrometerMetricsTest : Spek({
on("$PREFIX.messages.sent.topic.count counter") {
val counterName = "$PREFIX.messages.sent.count.topic"
it("should handle counters for different topics") {
- cut.notifyMessageSent(topicName1)
- cut.notifyMessageSent(topicName2)
- cut.notifyMessageSent(topicName2)
+ cut.notifyMessageSent(routedMessage(topicName1))
+ cut.notifyMessageSent(routedMessage(topicName2))
+ cut.notifyMessageSent(routedMessage(topicName2))
verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
@@ -168,6 +182,24 @@ object MicrometerMetricsTest : Spek({
}
}
}
+
+ on("$PREFIX.messages.processing.time") {
+ val counterName = "$PREFIX.messages.processing.time"
+ val processingTimeMs = 100L
+
+ it("should update timer") {
+
+ cut.notifyMessageSent(routedMessage(topicName1, Instant.now().minusMillis(processingTimeMs)))
+
+ verifyTimer(counterName) { timer ->
+ assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
+ }
+ verifyAllCountersAreUnchangedBut(
+ counterName,
+ "$PREFIX.messages.sent.count.topic",
+ "$PREFIX.messages.sent.count.total")
+ }
+ }
}
describe("notifyMessageDropped") {
@@ -207,27 +239,27 @@ object MicrometerMetricsTest : Spek({
it("should show difference between sent and received messages") {
on("positive difference") {
- cut.notifyMessageReceived(128)
- cut.notifyMessageReceived(256)
- cut.notifyMessageReceived(256)
- cut.notifyMessageSent("perf3gpp")
+ cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
+ cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
+ cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
+ cut.notifyMessageSent(routedMessage("perf3gpp"))
verifyGauge("messages.processing.count") {
assertThat(it.value()).isCloseTo(2.0, doublePrecision)
}
}
on("zero difference") {
- cut.notifyMessageReceived(128)
- cut.notifyMessageSent("perf3gpp")
+ cut.notifyMessageReceived(emptyWireProtocolFrame())
+ cut.notifyMessageSent(routedMessage("perf3gpp"))
verifyGauge("messages.processing.count") {
assertThat(it.value()).isCloseTo(0.0, doublePrecision)
}
}
on("negative difference") {
- cut.notifyMessageReceived(128)
- cut.notifyMessageSent("fault")
- cut.notifyMessageSent("perf3gpp")
+ cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
+ cut.notifyMessageSent(routedMessage("fault"))
+ cut.notifyMessageSent(routedMessage("perf3gpp"))
verifyGauge("messages.processing.count") {
assertThat(it.value()).isCloseTo(0.0, doublePrecision)
}
@@ -236,3 +268,15 @@ object MicrometerMetricsTest : Spek({
}
})
+
+fun routedMessage(topic: String, partition: Int = 0) =
+ vesEvent().let {evt ->
+ RoutedMessage(topic, partition,
+ VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
+ }
+
+fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0) =
+ vesEvent().let {evt ->
+ RoutedMessage(topic, partition,
+ VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt)))
+ } \ No newline at end of file