From d55f5c0c3df4b2ea136100e61424810ede749778 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Fri, 14 Dec 2018 12:05:47 +0100 Subject: Metric: Processing time Add processing time metric measured as difference between "sent to DMaaP" and "WTP decoded" events. Change-Id: I73bb665145019fcca5ae36e2199ed0e1cc088fdf Issue-ID: DCAEGEN2-1036 Signed-off-by: Piotr Jaszczyk --- .../dcae/collectors/veshv/boundary/adapters.kt | 5 +- .../onap/dcae/collectors/veshv/impl/VesDecoder.kt | 7 +- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 22 +++---- .../veshv/impl/adapters/LoggingSinkProvider.kt | 2 +- .../impl/adapters/kafka/VesMessageSerializer.kt | 4 +- .../onap/dcae/collectors/veshv/model/VesMessage.kt | 4 +- .../collectors/veshv/impl/MessageValidatorTest.kt | 23 +++---- .../onap/dcae/collectors/veshv/impl/RouterTest.kt | 9 ++- .../dcae/collectors/veshv/impl/VesDecoderTest.kt | 12 ++-- .../veshv/tests/component/MetricsSpecification.kt | 22 ++++++- .../dcae/collectors/veshv/tests/component/Sut.kt | 5 ++ .../dcae/collectors/veshv/tests/fakes/metrics.kt | 18 +++-- .../onap/dcae/collectors/veshv/tests/fakes/sink.kt | 10 ++- .../dcae/collectors/veshv/domain/wire_frame.kt | 5 +- .../veshv/main/metrics/MicrometerMetrics.kt | 16 +++-- .../collectors/veshv/main/MicrometerMetricsTest.kt | 76 +++++++++++++++++----- .../dcae/collectors/veshv/tests/utils/vesEvents.kt | 37 ++++++++++- 17 files changed, 203 insertions(+), 74 deletions(-) (limited to 'sources') diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 3f69c088..1334738a 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.boundary +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.MessageDropCause @@ -31,8 +32,8 @@ interface Sink { interface Metrics { fun notifyBytesReceived(size: Int) - fun notifyMessageReceived(size: Int) - fun notifyMessageSent(topic: String) + fun notifyMessageReceived(msg: WireFrameMessage) + fun notifyMessageSent(msg: RoutedMessage) fun notifyMessageDropped(cause: MessageDropCause) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt index c670e1d8..ee499e19 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Try import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.VesEvent @@ -30,9 +31,9 @@ import org.onap.ves.VesEventOuterClass.VesEvent */ internal class VesDecoder { - fun decode(bytes: ByteData): Try = + fun decode(frame: WireFrameMessage): Try = Try { - val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader - VesMessage(decodedHeader, bytes) + val decodedHeader = VesEvent.parseFrom(frame.payload.unsafeAsArray()).commonEventHeader + VesMessage(decodedHeader, frame) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index b29432f0..51f894d3 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder @@ -68,7 +67,7 @@ internal class VesHvCollector( private fun decodeWireFrame(flux: Flux): Flux = flux .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } .concatMap(wireChunkDecoder::decode) - .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } + .doOnNext(metrics::notifyMessageReceived) private fun filterInvalidWireFrame(flux: Flux): Flux = flux .filterFailedWithLog { @@ -78,15 +77,14 @@ internal class VesHvCollector( } private fun decodeProtobufPayload(flux: Flux): Flux = flux - .map(WireFrameMessage::payload) - .flatMap(::decodePayload) - - private fun decodePayload(rawPayload: ByteData): Flux = protobufDecoder - .decode(rawPayload) - .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) } - .filterFailedWithLog(logger, clientContext::fullMdc, - { "Ves event header decoded successfully" }, - { "Failed to decode ves event header, reason: ${it.message}" }) + .flatMap { frame -> + protobufDecoder + .decode(frame) + .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) } + .filterFailedWithLog(logger, clientContext::fullMdc, + { "Ves event header decoded successfully" }, + { "Failed to decode ves event header, reason: ${it.message}" }) + } private fun filterInvalidProtobufMessages(flux: Flux): Flux = flux .filterFailedWithLog { @@ -98,7 +96,7 @@ internal class VesHvCollector( private fun routeMessage(flux: Flux): Flux = flux .flatMap(this::findRoute) .compose(sink::send) - .doOnNext { metrics.notifyMessageSent(it.topic) } + .doOnNext(metrics::notifyMessageSent) private fun findRoute(msg: VesMessage) = router .findDestination(msg) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index ec8593af..14966d9b 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -47,7 +47,7 @@ internal class LoggingSinkProvider : SinkProvider { private fun logMessage(msg: RoutedMessage) { val msgs = totalMessages.addAndGet(1) - val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong()) + val bytes = totalBytes.addAndGet(msg.message.wtpFrame.payloadSize.toLong()) val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" } if (msgs % INFO_LOGGING_FREQ == 0L) logger.info(ctx, logMessageSupplier) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt index 7a6ac7c8..c92518a5 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt @@ -28,10 +28,12 @@ import org.onap.dcae.collectors.veshv.model.VesMessage */ class VesMessageSerializer : Serializer { override fun configure(configs: MutableMap?, isKey: Boolean) { + // not needed } - override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.rawMessage?.unsafeAsArray() + override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.wtpFrame?.payload?.unsafeAsArray() override fun close() { + // not needed } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt index 1965d78c..d3640193 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt @@ -19,11 +19,11 @@ */ package org.onap.dcae.collectors.veshv.model -import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.ves.VesEventOuterClass.CommonEventHeader /** * @author Piotr Jaszczyk * @since May 2018 */ -data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) +data class VesMessage(val header: CommonEventHeader, val wtpFrame: WireFrameMessage) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt index f784daa4..7d136ef1 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt @@ -29,7 +29,8 @@ import org.jetbrains.spek.api.dsl.* import org.onap.dcae.collectors.veshv.domain.* import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader -import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes +import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame +import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame import org.onap.ves.VesEventOuterClass.CommonEventHeader.* import kotlin.test.assertTrue import kotlin.test.fail @@ -43,7 +44,7 @@ internal object MessageValidatorTest : Spek({ val commonHeader = commonHeader() it("should accept message with fully initialized message header") { - val vesMessage = VesMessage(commonHeader, vesEventBytes(commonHeader)) + val vesMessage = VesMessage(commonHeader, wireProtocolFrame(commonHeader)) with(cut) { assertThat(validateProtobufMessage(vesMessage).isRight()) .describedAs("message validation result").isTrue() @@ -53,7 +54,7 @@ internal object MessageValidatorTest : Spek({ VesEventDomain.values().forEach { domain -> it("should accept message with $domain domain") { val header = commonHeader(domain) - val vesMessage = VesMessage(header, vesEventBytes(header)) + val vesMessage = VesMessage(header, wireProtocolFrame(header)) with(cut) { assertThat(validateProtobufMessage(vesMessage).isRight()) .describedAs("message validation result").isTrue() @@ -63,7 +64,7 @@ internal object MessageValidatorTest : Spek({ } on("ves hv message bytes") { - val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY) + val vesMessage = VesMessage(getDefaultInstance(), emptyWireProtocolFrame()) it("should not accept message with default header") { with(cut) { @@ -100,7 +101,7 @@ internal object MessageValidatorTest : Spek({ ).forEach { value, expectedResult -> on("ves hv message including header with priority $value") { val commonEventHeader = commonHeader(priority = value) - val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader)) + val vesMessage = VesMessage(commonEventHeader, wireProtocolFrame(commonEventHeader)) it("should resolve validation result") { with(cut) { @@ -121,7 +122,7 @@ internal object MessageValidatorTest : Spek({ .setEventId("Sample event Id") .setSourceName("Sample Source") .build() - val rawMessageBytes = vesEventBytes(commonHeader) + val rawMessageBytes = wireProtocolFrame(commonHeader) it("should not accept not fully initialized message header") { val vesMessage = VesMessage(commonHeader, rawMessageBytes) @@ -148,7 +149,7 @@ internal object MessageValidatorTest : Spek({ on("ves hv message including header.vesEventListenerVersion with non-string major part") { val commonHeader = commonHeader(vesEventListenerVersion = "sample-version") - val rawMessageBytes = vesEventBytes(commonHeader) + val rawMessageBytes = wireProtocolFrame(commonHeader) it("should not accept message header") { @@ -169,7 +170,7 @@ internal object MessageValidatorTest : Spek({ on("ves hv message including header.vesEventListenerVersion with major part != 7") { val commonHeader = commonHeader(vesEventListenerVersion = "1.2.3") - val rawMessageBytes = vesEventBytes(commonHeader) + val rawMessageBytes = wireProtocolFrame(commonHeader) it("should not accept message header") { val vesMessage = VesMessage(commonHeader, rawMessageBytes) @@ -190,7 +191,7 @@ internal object MessageValidatorTest : Spek({ on("ves hv message including header.vesEventListenerVersion with minor part not starting with a digit") { val commonHeader = commonHeader(vesEventListenerVersion = "7.test") - val rawMessageBytes = vesEventBytes(commonHeader) + val rawMessageBytes = wireProtocolFrame(commonHeader) it("should not accept message header") { val vesMessage = VesMessage(commonHeader, rawMessageBytes) @@ -237,7 +238,7 @@ internal object MessageValidatorTest : Spek({ with(cut) { on("valid message as input") { val commonHeader = commonHeader() - val rawMessageBytes = vesEventBytes(commonHeader) + val rawMessageBytes = wireProtocolFrame(commonHeader) val vesMessage = VesMessage(commonHeader, rawMessageBytes) it("should be right") { @@ -247,7 +248,7 @@ internal object MessageValidatorTest : Spek({ } on("invalid message as input") { val commonHeader = newBuilder().build() - val rawMessageBytes = vesEventBytes(commonHeader) + val rawMessageBytes = wireProtocolFrame(commonHeader) val vesMessage = VesMessage(commonHeader, rawMessageBytes) it("should be left") { diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index e4190163..90b850c0 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -21,13 +21,11 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.None import arrow.core.Some -import io.netty.buffer.ByteBufAllocator import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG @@ -36,6 +34,7 @@ import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.model.routing import org.onap.dcae.collectors.veshv.tests.utils.commonHeader +import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame /** @@ -61,7 +60,7 @@ object RouterTest : Spek({ val cut = Router(config, ClientContext()) on("message with existing route (rtpm)") { - val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY) + val message = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame()) val result = cut.findDestination(message) it("should have route available") { @@ -82,7 +81,7 @@ object RouterTest : Spek({ } on("message with existing route (trace)") { - val message = VesMessage(commonHeader(SYSLOG), ByteData.EMPTY) + val message = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame()) val result = cut.findDestination(message) it("should have route available") { @@ -103,7 +102,7 @@ object RouterTest : Spek({ } on("message with unknown route") { - val message = VesMessage(commonHeader(HEARTBEAT), ByteData.EMPTY) + val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame()) val result = cut.findDestination(message) it("should not have route available") { diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt index 605e7a6e..74f33a78 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt @@ -29,7 +29,8 @@ import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader -import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes +import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame +import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame import java.nio.charset.Charset import kotlin.test.assertTrue import kotlin.test.fail @@ -42,16 +43,16 @@ internal object VesDecoderTest : Spek({ on("ves hv message bytes") { val commonHeader = commonHeader(HEARTBEAT) - val rawMessageBytes = vesEventBytes(commonHeader, ByteString.copyFromUtf8("highvolume measurements")) + val wtpFrame = wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements")) it("should decode only header and pass it on along with raw message") { val expectedMessage = VesMessage( commonHeader, - rawMessageBytes + wtpFrame ) assertTrue { - cut.decode(rawMessageBytes).exists { + cut.decode(wtpFrame).exists { it == expectedMessage } } @@ -60,9 +61,10 @@ internal object VesDecoderTest : Spek({ on("invalid ves hv message bytes") { val rawMessageBytes = ByteData("ala ma kota".toByteArray(Charset.defaultCharset())) + val wtpFrame = emptyWireProtocolFrame().copy(payload = rawMessageBytes, payloadSize = rawMessageBytes.size()) it("should throw error") { - assertFailedWithError(cut.decode(rawMessageBytes)) + assertFailedWithError(cut.decode(wtpFrame)) } } } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index dd8acf77..9f5c37e1 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -33,7 +33,12 @@ import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TO import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration -import org.onap.dcae.collectors.veshv.tests.utils.* +import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion +import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader +import org.onap.dcae.collectors.veshv.tests.utils.vesEvent +import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage +import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload +import java.time.Duration object MetricsSpecification : Spek({ debugRx(false) @@ -102,6 +107,21 @@ object MetricsSpecification : Spek({ } } + describe("Processing time") { + it("should gather processing time metric") { + val delay = Duration.ofMillis(10) + val sut = vesHvWithDelayingSink(delay) + + sut.handleConnection(vesWireFrameMessage(PERF3GPP)) + + + val metrics = sut.metrics + assertThat(metrics.lastProcessingTimeMicros) + .describedAs("processingTime metric") + .isGreaterThanOrEqualTo(delay.toNanos().toDouble() / 1000.0) + } + } + describe("Messages dropped metrics") { it("should gather metrics for invalid messages") { val sut = vesHvWithNoOpSink(basicConfiguration) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 0c1b589b..7ebbfba0 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -77,3 +77,8 @@ fun vesHvWithNoOpSink(collectorConfiguration: CollectorConfiguration = basicConf Sut(NoOpSink()).apply { configurationProvider.updateConfiguration(collectorConfiguration) } + +fun vesHvWithDelayingSink(delay: Duration, collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = + Sut(ProcessingSink { it.delayElements(delay) }).apply { + configurationProvider.updateConfiguration(collectorConfiguration) + } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt index 9ddb7115..660ce498 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt @@ -20,7 +20,11 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause +import org.onap.dcae.collectors.veshv.model.RoutedMessage +import java.time.Duration +import java.time.Instant import java.util.concurrent.ConcurrentHashMap import kotlin.test.fail @@ -31,6 +35,7 @@ import kotlin.test.fail class FakeMetrics : Metrics { var bytesReceived: Int = 0 var messageBytesReceived: Int = 0 + var lastProcessingTimeMicros: Double = -1.0 var messagesSentCount: Int = 0 var messagesDroppedCount: Int = 0 @@ -41,13 +46,16 @@ class FakeMetrics : Metrics { bytesReceived += size } - override fun notifyMessageReceived(size: Int) { - messageBytesReceived += size + override fun notifyMessageReceived(msg: WireFrameMessage) { + messageBytesReceived += msg.payloadSize } - override fun notifyMessageSent(topic: String) { + override fun notifyMessageSent(msg: RoutedMessage) { messagesSentCount++ - messagesSentToTopic.compute(topic) { k, _ -> messagesSentToTopic[k]?.inc() ?: 1 } + messagesSentToTopic.compute(msg.topic) { k, _ -> + messagesSentToTopic[k]?.inc() ?: 1 + } + lastProcessingTimeMicros = Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0 } override fun notifyMessageDropped(cause: MessageDropCause) { @@ -61,4 +69,4 @@ class FakeMetrics : Metrics { fun messagesDropped(cause: MessageDropCause) = messagesDroppedCause[cause] ?: fail("No messages were dropped due to cause: ${cause.name}") -} \ No newline at end of file +} diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index 865dd510..2f731f53 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -19,12 +19,15 @@ */ package org.onap.dcae.collectors.veshv.tests.fakes +import arrow.core.identity import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.reactivestreams.Publisher import reactor.core.publisher.Flux import java.util.* import java.util.concurrent.ConcurrentLinkedDeque import java.util.concurrent.atomic.AtomicLong +import java.util.function.Function /** * @author Piotr Jaszczyk @@ -58,6 +61,9 @@ class CountingSink : Sink { } } -class NoOpSink : Sink { - override fun send(messages: Flux): Flux = messages + +open class ProcessingSink(val transformer: (Flux) -> Publisher) : Sink { + override fun send(messages: Flux): Flux = messages.transform(transformer) } + +class NoOpSink : ProcessingSink(::identity) diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt index 1257c6bb..d1fdb10c 100644 --- a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt @@ -22,6 +22,8 @@ package org.onap.dcae.collectors.veshv.domain import arrow.core.Either import arrow.core.Either.Companion.left import arrow.core.Either.Companion.right +import java.time.Instant +import java.time.temporal.Temporal /** @@ -58,7 +60,8 @@ data class WireFrameMessage(val payload: ByteData, val versionMajor: Short, val versionMinor: Short, val payloadType: Int, - val payloadSize: Int + val payloadSize: Int, + val receivedAt: Temporal = Instant.now() ) { constructor(payload: ByteArray) : this( ByteData(payload), diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index 18678ff3..259fa037 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -29,7 +29,11 @@ import io.micrometer.core.instrument.binder.system.ProcessorMetrics import io.micrometer.prometheus.PrometheusConfig import io.micrometer.prometheus.PrometheusMeterRegistry import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause +import org.onap.dcae.collectors.veshv.model.RoutedMessage +import java.time.Duration +import java.time.Instant /** @@ -53,6 +57,7 @@ class MicrometerMetrics internal constructor( private val droppedCount = { cause: String -> registry.counter(name(MESSAGES, DROPPED, COUNT, CAUSE), CAUSE, cause) }.memoize() + private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME)) init { registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) { @@ -71,14 +76,15 @@ class MicrometerMetrics internal constructor( receivedBytes.increment(size.toDouble()) } - override fun notifyMessageReceived(size: Int) { + override fun notifyMessageReceived(msg: WireFrameMessage) { receivedMsgCount.increment() - receivedMsgBytes.increment(size.toDouble()) + receivedMsgBytes.increment(msg.payloadSize.toDouble()) } - override fun notifyMessageSent(topic: String) { + override fun notifyMessageSent(msg: RoutedMessage) { sentCountTotal.increment() - sentCount(topic).increment() + sentCount(msg.topic).increment() + processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, Instant.now())) } override fun notifyMessageDropped(cause: MessageDropCause) { @@ -100,7 +106,7 @@ class MicrometerMetrics internal constructor( internal const val DROPPED = "dropped" internal const val CAUSE = "cause" internal const val TOTAL = "total" - + internal const val TIME = "time" fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" } } diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index e2dc2f82..cb5cfc70 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.main import arrow.core.Try import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.Gauge +import io.micrometer.core.instrument.Timer import io.micrometer.core.instrument.search.RequiredSearch import io.micrometer.prometheus.PrometheusConfig import io.micrometer.prometheus.PrometheusMeterRegistry @@ -35,6 +36,15 @@ import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND +import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame +import org.onap.dcae.collectors.veshv.tests.utils.vesEvent +import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame +import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSize +import java.time.Instant +import java.time.temporal.Temporal +import java.util.concurrent.TimeUnit /** * @author Piotr Jaszczyk @@ -63,6 +73,9 @@ object MicrometerMetricsTest : Spek({ fun verifyGauge(name: String, verifier: (Gauge) -> T) = verifyMeter(registrySearch().name(name), RequiredSearch::gauge, verifier) + fun verifyTimer(name: String, verifier: (Timer) -> T) = + verifyMeter(registrySearch().name(name), RequiredSearch::timer, verifier) + fun verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) = verifyMeter(search, RequiredSearch::counter, verifier) @@ -71,6 +84,7 @@ object MicrometerMetricsTest : Spek({ fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) { registry.meters + .filter { it.id.name.startsWith(PREFIX) } .filter { it is Counter } .map { it as Counter } .filterNot { it.id.name in changedCounters } @@ -105,7 +119,7 @@ object MicrometerMetricsTest : Spek({ val counterName = "$PREFIX.messages.received.count" it("should increment counter") { - cut.notifyMessageReceived(777) + cut.notifyMessageReceived(emptyWireProtocolFrame()) verifyCounter(counterName) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) @@ -118,7 +132,7 @@ object MicrometerMetricsTest : Spek({ it("should increment counter") { val bytes = 888 - cut.notifyMessageReceived(bytes) + cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes)) verifyCounter(counterName) { assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision) @@ -127,7 +141,7 @@ object MicrometerMetricsTest : Spek({ } it("should leave all other counters unchanged") { - cut.notifyMessageReceived(128) + cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128)) verifyAllCountersAreUnchangedBut( "$PREFIX.messages.received.count", "$PREFIX.messages.received.bytes" @@ -143,7 +157,7 @@ object MicrometerMetricsTest : Spek({ val counterName = "$PREFIX.messages.sent.count.total" it("should increment counter") { - cut.notifyMessageSent(topicName1) + cut.notifyMessageSent(routedMessage(topicName1)) verifyCounter(counterName) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) @@ -155,9 +169,9 @@ object MicrometerMetricsTest : Spek({ on("$PREFIX.messages.sent.topic.count counter") { val counterName = "$PREFIX.messages.sent.count.topic" it("should handle counters for different topics") { - cut.notifyMessageSent(topicName1) - cut.notifyMessageSent(topicName2) - cut.notifyMessageSent(topicName2) + cut.notifyMessageSent(routedMessage(topicName1)) + cut.notifyMessageSent(routedMessage(topicName2)) + cut.notifyMessageSent(routedMessage(topicName2)) verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) @@ -168,6 +182,24 @@ object MicrometerMetricsTest : Spek({ } } } + + on("$PREFIX.messages.processing.time") { + val counterName = "$PREFIX.messages.processing.time" + val processingTimeMs = 100L + + it("should update timer") { + + cut.notifyMessageSent(routedMessage(topicName1, Instant.now().minusMillis(processingTimeMs))) + + verifyTimer(counterName) { timer -> + assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble()) + } + verifyAllCountersAreUnchangedBut( + counterName, + "$PREFIX.messages.sent.count.topic", + "$PREFIX.messages.sent.count.total") + } + } } describe("notifyMessageDropped") { @@ -207,27 +239,27 @@ object MicrometerMetricsTest : Spek({ it("should show difference between sent and received messages") { on("positive difference") { - cut.notifyMessageReceived(128) - cut.notifyMessageReceived(256) - cut.notifyMessageReceived(256) - cut.notifyMessageSent("perf3gpp") + cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128)) + cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256)) + cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256)) + cut.notifyMessageSent(routedMessage("perf3gpp")) verifyGauge("messages.processing.count") { assertThat(it.value()).isCloseTo(2.0, doublePrecision) } } on("zero difference") { - cut.notifyMessageReceived(128) - cut.notifyMessageSent("perf3gpp") + cut.notifyMessageReceived(emptyWireProtocolFrame()) + cut.notifyMessageSent(routedMessage("perf3gpp")) verifyGauge("messages.processing.count") { assertThat(it.value()).isCloseTo(0.0, doublePrecision) } } on("negative difference") { - cut.notifyMessageReceived(128) - cut.notifyMessageSent("fault") - cut.notifyMessageSent("perf3gpp") + cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128)) + cut.notifyMessageSent(routedMessage("fault")) + cut.notifyMessageSent(routedMessage("perf3gpp")) verifyGauge("messages.processing.count") { assertThat(it.value()).isCloseTo(0.0, doublePrecision) } @@ -236,3 +268,15 @@ object MicrometerMetricsTest : Spek({ } }) + +fun routedMessage(topic: String, partition: Int = 0) = + vesEvent().let {evt -> + RoutedMessage(topic, partition, + VesMessage(evt.commonEventHeader, wireProtocolFrame(evt))) + } + +fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0) = + vesEvent().let {evt -> + RoutedMessage(topic, partition, + VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt))) + } \ No newline at end of file diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt index cf30d2ce..a8456890 100644 --- a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt +++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt @@ -23,8 +23,10 @@ package org.onap.dcae.collectors.veshv.tests.utils import com.google.protobuf.ByteString import com.google.protobuf.MessageLite import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.PayloadContentType import org.onap.dcae.collectors.veshv.domain.VesEventDomain import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.ves.VesEventOuterClass import org.onap.ves.VesEventOuterClass.CommonEventHeader import org.onap.ves.VesEventOuterClass.CommonEventHeader.Priority @@ -72,7 +74,38 @@ fun commonHeader(domain: VesEventDomain = PERF3GPP, .setVesEventListenerVersion(vesEventListenerVersion) .build() -fun vesEventBytes(commonHeader: CommonEventHeader, byteString: ByteString = ByteString.EMPTY): ByteData = - vesEvent(commonHeader, byteString).toByteData() +fun emptyWireProtocolFrame(): WireFrameMessage = wireProtocolFrameWithPayloadSize(0) + + +fun wireProtocolFrameWithPayloadSize(size: Int): WireFrameMessage = WireFrameMessage( + payload = ByteData(ByteArray(size)), + versionMajor = WireFrameMessage.SUPPORTED_VERSION_MAJOR, + versionMinor = WireFrameMessage.SUPPORTED_VERSION_MINOR, + payloadSize = size, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue +) + +fun wireProtocolFrame(commonHeader: CommonEventHeader, eventFields: ByteString = ByteString.EMPTY): WireFrameMessage = + vesEventBytes(commonHeader, eventFields).let { payload -> + WireFrameMessage( + payload = payload, + versionMajor = WireFrameMessage.SUPPORTED_VERSION_MAJOR, + versionMinor = WireFrameMessage.SUPPORTED_VERSION_MINOR, + payloadSize = payload.size(), + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue + ) + } + +fun wireProtocolFrame(evt: VesEventOuterClass.VesEvent) = + WireFrameMessage( + payload = ByteData(evt.toByteArray()), + payloadSize = evt.serializedSize, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + versionMajor = WireFrameMessage.SUPPORTED_VERSION_MAJOR, + versionMinor = WireFrameMessage.SUPPORTED_VERSION_MINOR + ) + +fun vesEventBytes(commonHeader: CommonEventHeader, eventFields: ByteString = ByteString.EMPTY): ByteData = + vesEvent(commonHeader, eventFields).toByteData() fun MessageLite.toByteData(): ByteData = ByteData(toByteArray()) \ No newline at end of file -- cgit 1.2.3-korg