aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-17 13:22:52 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-18 11:00:12 +0100
commit30488f1922f789c5b8e18934456968aa354c9671 (patch)
tree48716d0c7d48c55931828bdc056083da838e4796
parent8c180a2101f54d7cc0e3527c2bbe23390eea9cef (diff)
Metric: Message latency
Defined as a difference between now and vesHeader.lastEpochTime. Change-Id: I4aa97e8efc13cb0039fde38b4fd2aa6411c7b89a Issue-ID: DCAEGEN2-1036 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt16
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt95
-rw-r--r--sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt4
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/time.kt37
-rw-r--r--sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/time_test.kt44
5 files changed, 167 insertions, 29 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index f060426d..d35e17d6 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -30,9 +30,10 @@ import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.utils.TimeUtils.epochMicroToInstant
import java.time.Duration
import java.time.Instant
@@ -49,6 +50,9 @@ class MicrometerMetrics internal constructor(
private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT))
private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES))
+ private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME))
+ private val totalLatency = registry.timer(name(MESSAGES, LATENCY, TIME))
+
private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT))
private val sentToTopicCount = { topic: String ->
registry.counter(name(MESSAGES, SENT, TOPIC, COUNT), TOPIC, topic)
@@ -59,8 +63,6 @@ class MicrometerMetrics internal constructor(
registry.counter(name(MESSAGES, DROPPED, CAUSE, COUNT), CAUSE, cause)
}.memoize<String, Counter>()
- private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME))
-
private val clientsRejectedCount = registry.counter(name(CLIENTS, REJECTED, COUNT))
private val clientsRejectedCauseCount = { cause: String ->
registry.counter(name(CLIENTS, REJECTED, CAUSE, COUNT), CAUSE, cause)
@@ -90,9 +92,12 @@ class MicrometerMetrics internal constructor(
}
override fun notifyMessageSent(msg: RoutedMessage) {
+ val now = Instant.now()
sentCountTotal.increment()
sentToTopicCount(msg.topic).increment()
- processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()))
+
+ processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, now))
+ totalLatency.record(Duration.between(epochMicroToInstant(msg.message.header.lastEpochMicrosec), now))
}
override fun notifyMessageDropped(cause: MessageDropCause) {
@@ -121,6 +126,7 @@ class MicrometerMetrics internal constructor(
internal const val TOPIC = "topic"
internal const val DROPPED = "dropped"
internal const val TIME = "time"
- fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
+ internal const val LATENCY = "latency"
+ internal fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
}
}
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
index 2ecdb26b..71fc8f7f 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.main
import arrow.core.Try
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.Gauge
+import io.micrometer.core.instrument.Meter
import io.micrometer.core.instrument.Timer
import io.micrometer.core.instrument.search.RequiredSearch
import io.micrometer.prometheus.PrometheusConfig
@@ -34,10 +35,10 @@ import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
-import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
-import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
@@ -47,6 +48,7 @@ import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSi
import java.time.Instant
import java.time.temporal.Temporal
import java.util.concurrent.TimeUnit
+import kotlin.reflect.KClass
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -54,6 +56,7 @@ import java.util.concurrent.TimeUnit
*/
object MicrometerMetricsTest : Spek({
val doublePrecision = Percentage.withPercentage(0.5)
+ val alwaysChangedMeters = setOf("$PREFIX.messages.processing.time", "$PREFIX.messages.latency.time")
lateinit var registry: PrometheusMeterRegistry
lateinit var cut: MicrometerMetrics
@@ -84,15 +87,25 @@ object MicrometerMetricsTest : Spek({
fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
verifyCounter(registrySearch(name), verifier)
- fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) {
- registry.meters
- .filter { it.id.name.startsWith(PREFIX) }
- .filter { it is Counter }
- .map { it as Counter }
- .filterNot { it.id.name in changedCounters }
- .forEach {
- assertThat(it.count()).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision)
- }
+ fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
+ fun <T : Meter> verifyAllMetersAreUnchangedBut(
+ clazz: KClass<T>,
+ changedCounters: Collection<String>,
+ valueOf: (T) -> Double) {
+ registry.meters
+ .filter { it.id.name.startsWith(PREFIX) }
+ .filter { clazz.isInstance(it) }
+ .map { it as T }
+ .filterNot { it.id.name in changedCounters }
+ .forEach {
+ assertThat(valueOf(it)).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision)
+ }
+ }
+
+ setOf(*changedMeters).let { changedMetersCollection ->
+ verifyAllMetersAreUnchangedBut(Counter::class, changedMetersCollection) { it.count() }
+ verifyAllMetersAreUnchangedBut(Timer::class, changedMetersCollection) { it.count().toDouble() }
+ }
}
describe("notifyBytesReceived") {
@@ -111,7 +124,7 @@ object MicrometerMetricsTest : Spek({
it("should leave all other counters unchanged") {
cut.notifyBytesReceived(128)
- verifyAllCountersAreUnchangedBut(counterName)
+ verifyCountersAndTimersAreUnchangedBut(counterName)
}
}
}
@@ -144,7 +157,7 @@ object MicrometerMetricsTest : Spek({
it("should leave all other counters unchanged") {
cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
- verifyAllCountersAreUnchangedBut(
+ verifyCountersAndTimersAreUnchangedBut(
"$PREFIX.messages.received.count",
"$PREFIX.messages.received.bytes"
)
@@ -164,7 +177,11 @@ object MicrometerMetricsTest : Spek({
verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
- verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.topic.count")
+ verifyCountersAndTimersAreUnchangedBut(
+ counterName,
+ "$PREFIX.messages.sent.topic.count",
+ "$PREFIX.messages.processing.time",
+ "$PREFIX.messages.latency.time")
}
}
@@ -191,17 +208,41 @@ object MicrometerMetricsTest : Spek({
it("should update timer") {
- cut.notifyMessageSent(routedMessage(topicName1, Instant.now().minusMillis(processingTimeMs)))
+ cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))
verifyTimer(counterName) { timer ->
assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
}
- verifyAllCountersAreUnchangedBut(
+ verifyCountersAndTimersAreUnchangedBut(
counterName,
"$PREFIX.messages.sent.topic.count",
- "$PREFIX.messages.sent.count")
+ "$PREFIX.messages.sent.count",
+ "$PREFIX.messages.latency.time")
}
}
+
+ on("$PREFIX.messages.latency.time") {
+ val counterName = "$PREFIX.messages.latency.time"
+ val latencyMs = 1666L
+
+ it("should update timer") {
+
+ cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))
+
+ verifyTimer(counterName) { timer ->
+ assertThat(timer.mean(TimeUnit.MILLISECONDS))
+ .isGreaterThanOrEqualTo(latencyMs.toDouble())
+ .isLessThanOrEqualTo(latencyMs + 10000.0)
+
+ }
+ verifyCountersAndTimersAreUnchangedBut(
+ counterName,
+ "$PREFIX.messages.sent.topic.count",
+ "$PREFIX.messages.sent.count",
+ "$PREFIX.messages.processing.time")
+ }
+ }
+
}
describe("notifyMessageDropped") {
@@ -215,7 +256,7 @@ object MicrometerMetricsTest : Spek({
verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
- verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count")
+ verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count")
}
}
@@ -280,7 +321,7 @@ object MicrometerMetricsTest : Spek({
verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
- verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count")
+ verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count")
}
}
@@ -304,13 +345,23 @@ object MicrometerMetricsTest : Spek({
})
fun routedMessage(topic: String, partition: Int = 0) =
- vesEvent().let {evt ->
+ vesEvent().let { evt ->
RoutedMessage(topic, partition,
VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
}
-fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0) =
- vesEvent().let {evt ->
+fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
+ vesEvent().let { evt ->
RoutedMessage(topic, partition,
VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt)))
}
+
+fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
+ vesEvent().let { evt ->
+ val builder = evt.toBuilder()
+ builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
+ builder.build()
+ }.let { evt ->
+ RoutedMessage(topic, partition,
+ VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
+ }
diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
index a8456890..ed0cab63 100644
--- a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
+++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
@@ -61,8 +61,8 @@ fun commonHeader(domain: VesEventDomain = PERF3GPP,
.setEventId(id)
.setEventName("sample-event-name")
.setEventType("sample-event-type")
- .setStartEpochMicrosec(120034455)
- .setLastEpochMicrosec(120034455)
+ .setStartEpochMicrosec(100000000)
+ .setLastEpochMicrosec(100000005)
.setNfNamingCode("sample-nf-naming-code")
.setNfcNamingCode("sample-nfc-naming-code")
.setNfVendorName("vendor-name")
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/time.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/time.kt
new file mode 100644
index 00000000..c07da670
--- /dev/null
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/time.kt
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils
+
+import java.time.Instant
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+object TimeUtils {
+ fun epochMicroToInstant(epochMicroseconds: Long): Instant {
+ val seconds = epochMicroseconds / MICROSECONDS_IN_SECOND
+ val nanos = (epochMicroseconds - seconds * MICROSECONDS_IN_SECOND) * NANOSECONDS_IN_MICROSECOND
+ return Instant.ofEpochSecond(seconds, nanos)
+ }
+
+ private const val MICROSECONDS_IN_SECOND = 1_000_000L
+ private const val NANOSECONDS_IN_MICROSECOND = 1_000L
+}
diff --git a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/time_test.kt b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/time_test.kt
new file mode 100644
index 00000000..3ec74ab7
--- /dev/null
+++ b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/time_test.kt
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import java.time.Instant
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+internal object TimeTest : Spek({
+ describe("epochMicrosecond to Instant converter") {
+ it("should convert") {
+ val epochSeconds = 1545048422L
+ val nanoAdjustment = 666999000L
+ val epochMicros = 1545048422666999L
+
+ val result = TimeUtils.epochMicroToInstant(epochMicros)
+
+ assertThat(result).isEqualTo(Instant.ofEpochSecond(epochSeconds, nanoAdjustment))
+ }
+ }
+})