From 8952e9970470b683773bfe3a8f40a10881a3f321 Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Fri, 14 Dec 2018 15:20:56 +0100 Subject: Add metrics for dropped messages Add counters for messages dropped due to validation or undefined routing Slight refactoring Change-Id: Ibe4e38445e81babc745d7a7d95356910845293ce Signed-off-by: Jakub Dudycz Issue-ID: DCAEGEN2-1037 --- .../veshv/tests/component/MetricsSpecification.kt | 71 +++++++++++++++++----- .../veshv/tests/component/VesHvSpecification.kt | 12 ++-- .../dcae/collectors/veshv/tests/fakes/metrics.kt | 26 ++++++-- 3 files changed, 80 insertions(+), 29 deletions(-) (limited to 'sources/hv-collector-ct/src') diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index 2feca410..dd8acf77 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -25,18 +25,15 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.domain.VesEventDomain +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.tests.fakes.NoOpSink -import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics +import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE +import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration -import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame -import org.onap.dcae.collectors.veshv.tests.utils.vesEvent -import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage -import kotlin.test.fail +import org.onap.dcae.collectors.veshv.tests.utils.* object MetricsSpecification : Spek({ debugRx(false) @@ -45,7 +42,7 @@ object MetricsSpecification : Spek({ it("should sum up all bytes received") { val sut = vesHvWithNoOpSink() val vesWireFrameMessage = vesWireFrameMessage() - val invalidWireFrame = invalidWireFrame() + val invalidWireFrame = messageWithInvalidWireFrameHeader() val bytesSent = invalidWireFrame.readableBytes() + vesWireFrameMessage.readableBytes() @@ -93,18 +90,62 @@ object MetricsSpecification : Spek({ ) val metrics = sut.metrics - assertThat(metrics.messageSentCount) - .describedAs("messageSentCount metric") + assertThat(metrics.messagesSentCount) + .describedAs("messagesSentCount metric") .isEqualTo(3) - assertThat(messagesOnTopic(metrics, PERF3GPP_TOPIC)) + assertThat(metrics.messagesOnTopic(PERF3GPP_TOPIC)) .describedAs("messagesSentToTopic $PERF3GPP_TOPIC metric") .isEqualTo(2) - assertThat(messagesOnTopic(metrics, MEASUREMENTS_FOR_VF_SCALING_TOPIC)) + assertThat(metrics.messagesOnTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)) .describedAs("messagesSentToTopic $MEASUREMENTS_FOR_VF_SCALING_TOPIC metric") .isEqualTo(1) } } -}) -private fun messagesOnTopic(metrics: FakeMetrics, topic: String) = - metrics.messagesSentToTopic.get(topic) ?: fail("No messages were sent to topic $topic") \ No newline at end of file + describe("Messages dropped metrics") { + it("should gather metrics for invalid messages") { + val sut = vesHvWithNoOpSink(basicConfiguration) + + sut.handleConnection( + messageWithInvalidWireFrameHeader(), + wireFrameMessageWithInvalidPayload(), + vesWireFrameMessage(domain = PERF3GPP), + messageWithInvalidListenerVersion() + ) + + val metrics = sut.metrics + assertThat(metrics.messagesDropped(INVALID_MESSAGE)) + .describedAs("messagesDroppedCause $INVALID_MESSAGE metric") + .isEqualTo(3) + } + + it("should gather metrics for route not found") { + val sut = vesHvWithNoOpSink(basicConfiguration) + + sut.handleConnection( + vesWireFrameMessage(domain = PERF3GPP), + vesWireFrameMessage(domain = HEARTBEAT) + ) + + val metrics = sut.metrics + assertThat(metrics.messagesDropped(ROUTE_NOT_FOUND)) + .describedAs("messagesDroppedCause $ROUTE_NOT_FOUND metric") + .isEqualTo(1) + } + + it("should gather summed metrics for dropped messages"){ + val sut = vesHvWithNoOpSink(basicConfiguration) + + sut.handleConnection( + vesWireFrameMessage(domain = PERF3GPP), + vesWireFrameMessage(domain = HEARTBEAT), + wireFrameMessageWithInvalidPayload() + ) + + val metrics = sut.metrics + assertThat(metrics.messagesDroppedCount) + .describedAs("messagesDroppedCount metric") + .isEqualTo(2) + } + } +}) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index ab59cc2e..338c3734 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -37,11 +37,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration -import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame -import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame -import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithPayloadOfSize -import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage -import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload +import org.onap.dcae.collectors.veshv.tests.utils.* import reactor.core.publisher.Flux import java.time.Duration @@ -71,8 +67,8 @@ object VesHvSpecification : Spek({ it("should release memory for each handled and dropped message") { val (sut, sink) = vesHvWithStoringSink() val validMessage = vesWireFrameMessage(PERF3GPP) - val msgWithInvalidFrame = invalidWireFrame() - val msgWithTooBigPayload = vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP) + val msgWithInvalidFrame = messageWithInvalidWireFrameHeader() + val msgWithTooBigPayload = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP) val expectedRefCnt = 0 val handledEvents = sut.handleConnection( @@ -329,7 +325,7 @@ object VesHvSpecification : Spek({ val handledMessages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP, "first"), - vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP), + messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP), vesWireFrameMessage(PERF3GPP)) assertThat(handledMessages).hasSize(1) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt index dd098052..9ddb7115 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt @@ -20,8 +20,9 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.model.MessageDropCause import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.fail /** * @author Piotr Jaszczyk @@ -29,11 +30,12 @@ import java.util.concurrent.atomic.AtomicInteger */ class FakeMetrics : Metrics { var bytesReceived: Int = 0 - var messageBytesReceived: Int = 0 + var messagesSentCount: Int = 0 + var messagesDroppedCount: Int = 0 - var messageSentCount: Int = 0 - val messagesSentToTopic: MutableMap = ConcurrentHashMap() + private val messagesSentToTopic: MutableMap = ConcurrentHashMap() + private val messagesDroppedCause: MutableMap = ConcurrentHashMap() override fun notifyBytesReceived(size: Int) { bytesReceived += size @@ -44,7 +46,19 @@ class FakeMetrics : Metrics { } override fun notifyMessageSent(topic: String) { - messageSentCount++ - messagesSentToTopic.compute(topic, { k, v -> messagesSentToTopic.get(k)?.inc() ?: 1 }) + messagesSentCount++ + messagesSentToTopic.compute(topic) { k, _ -> messagesSentToTopic[k]?.inc() ?: 1 } + } + + override fun notifyMessageDropped(cause: MessageDropCause) { + messagesDroppedCount++ + messagesDroppedCause.compute(cause) { k, _ -> messagesDroppedCause[k]?.inc() ?: 1 } } + + fun messagesOnTopic(topic: String) = + messagesSentToTopic[topic] ?: fail("No messages were sent to topic $topic") + + fun messagesDropped(cause: MessageDropCause) = + messagesDroppedCause[cause] + ?: fail("No messages were dropped due to cause: ${cause.name}") } \ No newline at end of file -- cgit 1.2.3-korg