diff options
6 files changed, 58 insertions, 9 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 28b28203..41993e62 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.boundary import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.model.ClientRejectionCause @@ -42,6 +43,7 @@ interface SinkFactory : Closeable { interface Metrics { fun notifyBytesReceived(size: Int) fun notifyMessageReceived(msg: WireFrameMessage) + fun notifyMessageReadyForRouting(msg: VesMessage) fun notifyMessageSent(msg: RoutedMessage) fun notifyMessageDropped(cause: MessageDropCause) fun notifyClientDisconnected() diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt index ac7c3917..f0d1465b 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt @@ -92,6 +92,7 @@ internal class HvVesCollector( } private fun route(flux: Flux<VesMessage>) = flux + .doOnNext(metrics::notifyMessageReadyForRouting) .flatMap(router::route) .doOnNext(this::updateSinkMetrics) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt index a450b794..3b01d137 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt @@ -24,6 +24,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage import java.time.Duration import java.time.Instant import kotlin.test.fail @@ -38,6 +39,7 @@ class FakeMetrics : Metrics { var messageBytesReceived: Int = 0; private set var messagesDroppedCount: Int = 0; private set var lastProcessingTimeMicros: Double = -1.0; private set + var lastProcessingTimeWithoutRoutingMicros: Double = -1.0; private set var messagesSentCount: Int = 0; private set var clientRejectionCause = mutableMapOf<ClientRejectionCause, Int>(); private set @@ -52,6 +54,10 @@ class FakeMetrics : Metrics { messageBytesReceived += msg.payloadSize } + override fun notifyMessageReadyForRouting(msg: VesMessage) { + lastProcessingTimeWithoutRoutingMicros = Duration.between(msg.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0 + } + override fun notifyMessageSent(msg: RoutedMessage) { messagesSentCount++ messagesSentToTopic.compute(msg.targetTopic) { k, _ -> diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index fa52ac2c..9d417a28 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -34,6 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.utils.TimeUtils.epochMicroToInstant import java.time.Duration import java.time.Instant @@ -46,7 +47,6 @@ import java.time.Instant class MicrometerMetrics internal constructor( private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) ) : Metrics { - private val receivedBytes = registry.counter(name(DATA, RECEIVED, BYTES)) private val receivedMessages = registry.counter(name(MESSAGES, RECEIVED)) private val receivedMessagesPayloadBytes = registry.counter(name(MESSAGES, RECEIVED, PAYLOAD, BYTES)) @@ -58,6 +58,9 @@ class MicrometerMetrics internal constructor( .maximumExpectedValue(MAX_BUCKET_DURATION) .publishPercentileHistogram(true) .register(registry) + private val processingTimeWithoutRouting = Timer.builder(name(MESSAGES, PROCESSING, TIME, WITHOUT, ROUTING)) + .publishPercentileHistogram(true) + .register(registry) private val totalLatency = Timer.builder(name(MESSAGES, LATENCY)) .maximumExpectedValue(MAX_BUCKET_DURATION) .publishPercentileHistogram(true) @@ -67,12 +70,10 @@ class MicrometerMetrics internal constructor( private val sentMessagesByTopic = { topic: String -> registry.counter(name(MESSAGES, SENT, TOPIC), TOPIC, topic) }.memoize<String, Counter>() - private val droppedMessages = registry.counter(name(MESSAGES, DROPPED)) private val messagesDroppedByCause = { cause: String -> registry.counter(name(MESSAGES, DROPPED, CAUSE), CAUSE, cause) }.memoize<String, Counter>() - private val clientsRejected = registry.counter(name(CLIENTS, REJECTED)) private val clientsRejectedByCause = { cause: String -> registry.counter(name(CLIENTS, REJECTED, CAUSE), CAUSE, cause) @@ -97,6 +98,10 @@ class MicrometerMetrics internal constructor( receivedBytes.increment(size.toDouble()) } + override fun notifyMessageReadyForRouting(msg: VesMessage) { + processingTimeWithoutRouting.record(Duration.between(msg.wtpFrame.receivedAt, Instant.now())) + } + override fun notifyMessageReceived(msg: WireFrameMessage) { receivedMessages.increment() receivedMessagesPayloadBytes.increment(msg.payloadSize.toDouble()) @@ -150,6 +155,8 @@ class MicrometerMetrics internal constructor( internal const val LATENCY = "latency" internal const val PAYLOAD = "payload" internal val MAX_BUCKET_DURATION = Duration.ofSeconds(300L) + internal const val WITHOUT = "without" + internal const val ROUTING = "routing" internal fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" } } diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index 66f3a5fc..a3471d46 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.main import arrow.core.Option +import com.google.protobuf.ByteString import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.Meter import io.micrometer.core.instrument.Tags @@ -39,7 +40,9 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EX import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesEventDomain import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge @@ -203,6 +206,25 @@ object MicrometerMetricsTest : Spek({ } } + on("$PREFIX.messages.processing.time.without.routing") { + val counterName = "$PREFIX.messages.processing.time.without.routing" + val processingTimeMs = 100L + + it("should update timer") { + + cut.notifyMessageReadyForRouting(vesMessageReceivedAt(Instant.now().minusMillis(processingTimeMs))) + + registry.verifyTimer(counterName) { timer -> + assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble()) + } + verifyCountersAndTimersAreUnchangedBut( + counterName, + "$PREFIX.messages.sent.topic", + "$PREFIX.messages.sent", + "$PREFIX.messages.latency") + } + } + on("$PREFIX.messages.latency") { val counterName = "$PREFIX.messages.latency" val latencyMs = 1666L @@ -362,13 +384,19 @@ object MicrometerMetricsTest : Spek({ } }) -fun routedMessage(topic: String, partition: Int = 0) = +private fun vesMessageReceivedAt(receivedAt: Temporal, domain: VesEventDomain = VesEventDomain.PERF3GPP): VesMessage { + val commonHeader = commonHeader(domain) + return VesMessage(commonHeader, + wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements"), receivedAt)) +} + +private fun routedMessage(topic: String, partition: Int = 0) = vesEvent().run { toRoutedMessage(topic, partition) } -fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) = +private fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) = vesEvent().run { toRoutedMessage(topic, partition, receivedAt) } -fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) = +private fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) = vesEvent().run { val builder = toBuilder() builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000 diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt index ba60d1b0..3013e904 100644 --- a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt +++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt @@ -30,6 +30,8 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.ves.VesEventOuterClass import org.onap.ves.VesEventOuterClass.CommonEventHeader import org.onap.ves.VesEventOuterClass.CommonEventHeader.Priority +import java.time.Instant +import java.time.temporal.Temporal import java.util.UUID.randomUUID fun vesEvent(domain: VesEventDomain = PERF3GPP, @@ -53,7 +55,7 @@ fun commonHeader(domain: VesEventDomain = PERF3GPP, vesEventListenerVersion: String = "7.0.2", priority: Priority = Priority.NORMAL, lastEpochMicrosec: Long = 100000005 - ): CommonEventHeader = +): CommonEventHeader = CommonEventHeader.newBuilder() .setVersion("sample-version") .setDomain(domain.domainName) @@ -86,14 +88,17 @@ fun wireProtocolFrameWithPayloadSize(size: Int): WireFrameMessage = WireFrameMes payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue ) -fun wireProtocolFrame(commonHeader: CommonEventHeader, eventFields: ByteString = ByteString.EMPTY): WireFrameMessage = +fun wireProtocolFrame(commonHeader: CommonEventHeader, + eventFields: ByteString = ByteString.EMPTY, + receivedAt: Temporal = Instant.now()): WireFrameMessage = vesEventBytes(commonHeader, eventFields).let { payload -> WireFrameMessage( payload = payload, versionMajor = WireFrameMessage.SUPPORTED_VERSION_MAJOR, versionMinor = WireFrameMessage.SUPPORTED_VERSION_MINOR, payloadSize = payload.size(), - payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + receivedAt = receivedAt ) } |