diff options
Diffstat (limited to 'sources')
5 files changed, 167 insertions, 29 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index f060426d..d35e17d6 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -30,9 +30,10 @@ import io.micrometer.prometheus.PrometheusConfig import io.micrometer.prometheus.PrometheusMeterRegistry import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.model.ClientRejectionCause +import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.utils.TimeUtils.epochMicroToInstant import java.time.Duration import java.time.Instant @@ -49,6 +50,9 @@ class MicrometerMetrics internal constructor( private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT)) private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES)) + private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME)) + private val totalLatency = registry.timer(name(MESSAGES, LATENCY, TIME)) + private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT)) private val sentToTopicCount = { topic: String -> registry.counter(name(MESSAGES, SENT, TOPIC, COUNT), TOPIC, topic) @@ -59,8 +63,6 @@ class MicrometerMetrics internal constructor( registry.counter(name(MESSAGES, DROPPED, CAUSE, COUNT), CAUSE, cause) }.memoize<String, Counter>() - private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME)) - private val clientsRejectedCount = registry.counter(name(CLIENTS, REJECTED, COUNT)) private val clientsRejectedCauseCount = { cause: String -> registry.counter(name(CLIENTS, REJECTED, CAUSE, COUNT), CAUSE, cause) @@ -90,9 +92,12 @@ class MicrometerMetrics internal constructor( } override fun notifyMessageSent(msg: RoutedMessage) { + val now = Instant.now() sentCountTotal.increment() sentToTopicCount(msg.topic).increment() - processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, Instant.now())) + + processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, now)) + totalLatency.record(Duration.between(epochMicroToInstant(msg.message.header.lastEpochMicrosec), now)) } override fun notifyMessageDropped(cause: MessageDropCause) { @@ -121,6 +126,7 @@ class MicrometerMetrics internal constructor( internal const val TOPIC = "topic" internal const val DROPPED = "dropped" internal const val TIME = "time" - fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" + internal const val LATENCY = "latency" + internal fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" } } diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index 2ecdb26b..71fc8f7f 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.main import arrow.core.Try import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.Gauge +import io.micrometer.core.instrument.Meter import io.micrometer.core.instrument.Timer import io.micrometer.core.instrument.search.RequiredSearch import io.micrometer.prometheus.PrometheusConfig @@ -34,10 +35,10 @@ import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX -import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE -import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE +import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE +import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame @@ -47,6 +48,7 @@ import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSi import java.time.Instant import java.time.temporal.Temporal import java.util.concurrent.TimeUnit +import kotlin.reflect.KClass /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -54,6 +56,7 @@ import java.util.concurrent.TimeUnit */ object MicrometerMetricsTest : Spek({ val doublePrecision = Percentage.withPercentage(0.5) + val alwaysChangedMeters = setOf("$PREFIX.messages.processing.time", "$PREFIX.messages.latency.time") lateinit var registry: PrometheusMeterRegistry lateinit var cut: MicrometerMetrics @@ -84,15 +87,25 @@ object MicrometerMetricsTest : Spek({ fun <T> verifyCounter(name: String, verifier: (Counter) -> T) = verifyCounter(registrySearch(name), verifier) - fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) { - registry.meters - .filter { it.id.name.startsWith(PREFIX) } - .filter { it is Counter } - .map { it as Counter } - .filterNot { it.id.name in changedCounters } - .forEach { - assertThat(it.count()).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision) - } + fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) { + fun <T : Meter> verifyAllMetersAreUnchangedBut( + clazz: KClass<T>, + changedCounters: Collection<String>, + valueOf: (T) -> Double) { + registry.meters + .filter { it.id.name.startsWith(PREFIX) } + .filter { clazz.isInstance(it) } + .map { it as T } + .filterNot { it.id.name in changedCounters } + .forEach { + assertThat(valueOf(it)).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision) + } + } + + setOf(*changedMeters).let { changedMetersCollection -> + verifyAllMetersAreUnchangedBut(Counter::class, changedMetersCollection) { it.count() } + verifyAllMetersAreUnchangedBut(Timer::class, changedMetersCollection) { it.count().toDouble() } + } } describe("notifyBytesReceived") { @@ -111,7 +124,7 @@ object MicrometerMetricsTest : Spek({ it("should leave all other counters unchanged") { cut.notifyBytesReceived(128) - verifyAllCountersAreUnchangedBut(counterName) + verifyCountersAndTimersAreUnchangedBut(counterName) } } } @@ -144,7 +157,7 @@ object MicrometerMetricsTest : Spek({ it("should leave all other counters unchanged") { cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128)) - verifyAllCountersAreUnchangedBut( + verifyCountersAndTimersAreUnchangedBut( "$PREFIX.messages.received.count", "$PREFIX.messages.received.bytes" ) @@ -164,7 +177,11 @@ object MicrometerMetricsTest : Spek({ verifyCounter(counterName) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) } - verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.topic.count") + verifyCountersAndTimersAreUnchangedBut( + counterName, + "$PREFIX.messages.sent.topic.count", + "$PREFIX.messages.processing.time", + "$PREFIX.messages.latency.time") } } @@ -191,17 +208,41 @@ object MicrometerMetricsTest : Spek({ it("should update timer") { - cut.notifyMessageSent(routedMessage(topicName1, Instant.now().minusMillis(processingTimeMs))) + cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs))) verifyTimer(counterName) { timer -> assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble()) } - verifyAllCountersAreUnchangedBut( + verifyCountersAndTimersAreUnchangedBut( counterName, "$PREFIX.messages.sent.topic.count", - "$PREFIX.messages.sent.count") + "$PREFIX.messages.sent.count", + "$PREFIX.messages.latency.time") } } + + on("$PREFIX.messages.latency.time") { + val counterName = "$PREFIX.messages.latency.time" + val latencyMs = 1666L + + it("should update timer") { + + cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs))) + + verifyTimer(counterName) { timer -> + assertThat(timer.mean(TimeUnit.MILLISECONDS)) + .isGreaterThanOrEqualTo(latencyMs.toDouble()) + .isLessThanOrEqualTo(latencyMs + 10000.0) + + } + verifyCountersAndTimersAreUnchangedBut( + counterName, + "$PREFIX.messages.sent.topic.count", + "$PREFIX.messages.sent.count", + "$PREFIX.messages.processing.time") + } + } + } describe("notifyMessageDropped") { @@ -215,7 +256,7 @@ object MicrometerMetricsTest : Spek({ verifyCounter(counterName) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } - verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count") + verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count") } } @@ -280,7 +321,7 @@ object MicrometerMetricsTest : Spek({ verifyCounter(counterName) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } - verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count") + verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count") } } @@ -304,13 +345,23 @@ object MicrometerMetricsTest : Spek({ }) fun routedMessage(topic: String, partition: Int = 0) = - vesEvent().let {evt -> + vesEvent().let { evt -> RoutedMessage(topic, partition, VesMessage(evt.commonEventHeader, wireProtocolFrame(evt))) } -fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0) = - vesEvent().let {evt -> +fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) = + vesEvent().let { evt -> RoutedMessage(topic, partition, VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt))) } + +fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) = + vesEvent().let { evt -> + val builder = evt.toBuilder() + builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000 + builder.build() + }.let { evt -> + RoutedMessage(topic, partition, + VesMessage(evt.commonEventHeader, wireProtocolFrame(evt))) + } diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt index a8456890..ed0cab63 100644 --- a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt +++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt @@ -61,8 +61,8 @@ fun commonHeader(domain: VesEventDomain = PERF3GPP, .setEventId(id) .setEventName("sample-event-name") .setEventType("sample-event-type") - .setStartEpochMicrosec(120034455) - .setLastEpochMicrosec(120034455) + .setStartEpochMicrosec(100000000) + .setLastEpochMicrosec(100000005) .setNfNamingCode("sample-nf-naming-code") .setNfcNamingCode("sample-nfc-naming-code") .setNfVendorName("vendor-name") diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/time.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/time.kt new file mode 100644 index 00000000..c07da670 --- /dev/null +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/time.kt @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils + +import java.time.Instant + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since December 2018 + */ +object TimeUtils { + fun epochMicroToInstant(epochMicroseconds: Long): Instant { + val seconds = epochMicroseconds / MICROSECONDS_IN_SECOND + val nanos = (epochMicroseconds - seconds * MICROSECONDS_IN_SECOND) * NANOSECONDS_IN_MICROSECOND + return Instant.ofEpochSecond(seconds, nanos) + } + + private const val MICROSECONDS_IN_SECOND = 1_000_000L + private const val NANOSECONDS_IN_MICROSECOND = 1_000L +} diff --git a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/time_test.kt b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/time_test.kt new file mode 100644 index 00000000..3ec74ab7 --- /dev/null +++ b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/time_test.kt @@ -0,0 +1,44 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import java.time.Instant + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since December 2018 + */ +internal object TimeTest : Spek({ + describe("epochMicrosecond to Instant converter") { + it("should convert") { + val epochSeconds = 1545048422L + val nanoAdjustment = 666999000L + val epochMicros = 1545048422666999L + + val result = TimeUtils.epochMicroToInstant(epochMicros) + + assertThat(result).isEqualTo(Instant.ofEpochSecond(epochSeconds, nanoAdjustment)) + } + } +}) |