From d55f5c0c3df4b2ea136100e61424810ede749778 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Fri, 14 Dec 2018 12:05:47 +0100 Subject: Metric: Processing time Add processing time metric measured as difference between "sent to DMaaP" and "WTP decoded" events. Change-Id: I73bb665145019fcca5ae36e2199ed0e1cc088fdf Issue-ID: DCAEGEN2-1036 Signed-off-by: Piotr Jaszczyk --- .../veshv/tests/component/MetricsSpecification.kt | 22 +++++++++++++++++++++- .../dcae/collectors/veshv/tests/component/Sut.kt | 5 +++++ .../dcae/collectors/veshv/tests/fakes/metrics.kt | 18 +++++++++++++----- .../onap/dcae/collectors/veshv/tests/fakes/sink.kt | 10 ++++++++-- 4 files changed, 47 insertions(+), 8 deletions(-) (limited to 'sources/hv-collector-ct/src') diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index dd8acf77..9f5c37e1 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -33,7 +33,12 @@ import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TO import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration -import org.onap.dcae.collectors.veshv.tests.utils.* +import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion +import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader +import org.onap.dcae.collectors.veshv.tests.utils.vesEvent +import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage +import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload +import java.time.Duration object MetricsSpecification : Spek({ debugRx(false) @@ -102,6 +107,21 @@ object MetricsSpecification : Spek({ } } + describe("Processing time") { + it("should gather processing time metric") { + val delay = Duration.ofMillis(10) + val sut = vesHvWithDelayingSink(delay) + + sut.handleConnection(vesWireFrameMessage(PERF3GPP)) + + + val metrics = sut.metrics + assertThat(metrics.lastProcessingTimeMicros) + .describedAs("processingTime metric") + .isGreaterThanOrEqualTo(delay.toNanos().toDouble() / 1000.0) + } + } + describe("Messages dropped metrics") { it("should gather metrics for invalid messages") { val sut = vesHvWithNoOpSink(basicConfiguration) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 0c1b589b..7ebbfba0 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -77,3 +77,8 @@ fun vesHvWithNoOpSink(collectorConfiguration: CollectorConfiguration = basicConf Sut(NoOpSink()).apply { configurationProvider.updateConfiguration(collectorConfiguration) } + +fun vesHvWithDelayingSink(delay: Duration, collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = + Sut(ProcessingSink { it.delayElements(delay) }).apply { + configurationProvider.updateConfiguration(collectorConfiguration) + } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt index 9ddb7115..660ce498 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt @@ -20,7 +20,11 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause +import org.onap.dcae.collectors.veshv.model.RoutedMessage +import java.time.Duration +import java.time.Instant import java.util.concurrent.ConcurrentHashMap import kotlin.test.fail @@ -31,6 +35,7 @@ import kotlin.test.fail class FakeMetrics : Metrics { var bytesReceived: Int = 0 var messageBytesReceived: Int = 0 + var lastProcessingTimeMicros: Double = -1.0 var messagesSentCount: Int = 0 var messagesDroppedCount: Int = 0 @@ -41,13 +46,16 @@ class FakeMetrics : Metrics { bytesReceived += size } - override fun notifyMessageReceived(size: Int) { - messageBytesReceived += size + override fun notifyMessageReceived(msg: WireFrameMessage) { + messageBytesReceived += msg.payloadSize } - override fun notifyMessageSent(topic: String) { + override fun notifyMessageSent(msg: RoutedMessage) { messagesSentCount++ - messagesSentToTopic.compute(topic) { k, _ -> messagesSentToTopic[k]?.inc() ?: 1 } + messagesSentToTopic.compute(msg.topic) { k, _ -> + messagesSentToTopic[k]?.inc() ?: 1 + } + lastProcessingTimeMicros = Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0 } override fun notifyMessageDropped(cause: MessageDropCause) { @@ -61,4 +69,4 @@ class FakeMetrics : Metrics { fun messagesDropped(cause: MessageDropCause) = messagesDroppedCause[cause] ?: fail("No messages were dropped due to cause: ${cause.name}") -} \ No newline at end of file +} diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index 865dd510..2f731f53 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -19,12 +19,15 @@ */ package org.onap.dcae.collectors.veshv.tests.fakes +import arrow.core.identity import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.reactivestreams.Publisher import reactor.core.publisher.Flux import java.util.* import java.util.concurrent.ConcurrentLinkedDeque import java.util.concurrent.atomic.AtomicLong +import java.util.function.Function /** * @author Piotr Jaszczyk @@ -58,6 +61,9 @@ class CountingSink : Sink { } } -class NoOpSink : Sink { - override fun send(messages: Flux): Flux = messages + +open class ProcessingSink(val transformer: (Flux) -> Publisher) : Sink { + override fun send(messages: Flux): Flux = messages.transform(transformer) } + +class NoOpSink : ProcessingSink(::identity) -- cgit 1.2.3-korg