From 8c180a2101f54d7cc0e3527c2bbe23390eea9cef Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Mon, 17 Dec 2018 14:25:56 +0100 Subject: Add metric for rejected clients count - renamed few counters to be more verbose about what they count - removed not needed 'total' suffix in metrics name Change-Id: I6be0201e5f39f1298706c536b12410413d49df19 Issue-ID: DCAEGEN2-1043 Signed-off-by: Filip Krzywka --- .../dcae/collectors/veshv/boundary/adapters.kt | 2 + .../dcae/collectors/veshv/impl/VesHvCollector.kt | 5 +- .../veshv/impl/wire/WireFrameException.kt | 2 +- .../collectors/veshv/model/MessageDropCause.kt | 29 --------- .../veshv/model/stream_interruption_cause.kt | 53 ++++++++++++++++ .../veshv/tests/component/MetricsSpecification.kt | 34 +++++++++- .../dcae/collectors/veshv/tests/fakes/metrics.kt | 18 ++++-- .../veshv/main/metrics/MicrometerMetrics.kt | 36 +++++++---- .../collectors/veshv/main/MicrometerMetricsTest.kt | 74 ++++++++++++++++------ 9 files changed, 184 insertions(+), 69 deletions(-) delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt create mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt (limited to 'sources') diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 1334738a..61d28c2b 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.boundary import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.model.RoutedMessage @@ -35,6 +36,7 @@ interface Metrics { fun notifyMessageReceived(msg: WireFrameMessage) fun notifyMessageSent(msg: RoutedMessage) fun notifyMessageDropped(cause: MessageDropCause) + fun notifyClientRejected(cause: ClientRejectionCause) } @FunctionalInterface diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 51f894d3..5c3f339c 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -27,6 +27,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.model.RoutedMessage @@ -60,7 +61,9 @@ internal class VesHvCollector( .transform(::decodeProtobufPayload) .transform(::filterInvalidProtobufMessages) .transform(::routeMessage) - .onErrorResume { logger.handleReactiveStreamError(clientContext, it) } + .onErrorResume { + metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it)) + logger.handleReactiveStreamError(clientContext, it) } .doFinally { releaseBuffersMemory() } .then() diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt index 83a7cd85..81845400 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt @@ -25,5 +25,5 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError * @author Piotr Jaszczyk * @since June 2018 */ -class WireFrameException(error: WireFrameDecodingError) +class WireFrameException(val error: WireFrameDecodingError) : Exception("${error::class.simpleName}: ${error.message}") diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt deleted file mode 100644 index af43ae67..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt +++ /dev/null @@ -1,29 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.model - -/** - * @author Jakub Dudycz - * @since December 2018 - */ -enum class MessageDropCause(val tag: String) { - ROUTE_NOT_FOUND("routing"), - INVALID_MESSAGE("invalid") -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt new file mode 100644 index 00000000..836eab53 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt @@ -0,0 +1,53 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.model + +import org.onap.dcae.collectors.veshv.domain.InvalidWireFrameMarker +import org.onap.dcae.collectors.veshv.domain.PayloadSizeExceeded +import org.onap.dcae.collectors.veshv.impl.wire.WireFrameException + +/** + * @author Jakub Dudycz + * @since December 2018 + */ +enum class MessageDropCause(val tag: String) { + ROUTE_NOT_FOUND("routing"), + INVALID_MESSAGE("invalid") +} + +enum class ClientRejectionCause(val tag: String) { + INVALID_WIRE_FRAME_MARKER("invalid_marker"), + PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE("too_big_payload"), + UNEXPECTED_STREAM_ERROR("unexpected"); + + companion object { + fun fromThrowable(err: Throwable): ClientRejectionCause = + when (err) { + is WireFrameException -> fromWireFrameException(err) + else -> UNEXPECTED_STREAM_ERROR + } + + private fun fromWireFrameException(err: WireFrameException) = when (err.error) { + is InvalidWireFrameMarker -> INVALID_WIRE_FRAME_MARKER + is PayloadSizeExceeded -> PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE + else -> UNEXPECTED_STREAM_ERROR + } + } +} diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index 9f5c37e1..572cc796 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -23,18 +23,23 @@ import com.google.protobuf.ByteString import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.VesEventDomain import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP +import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration +import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader +import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize import org.onap.dcae.collectors.veshv.tests.utils.vesEvent import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload @@ -153,7 +158,7 @@ object MetricsSpecification : Spek({ .isEqualTo(1) } - it("should gather summed metrics for dropped messages"){ + it("should gather summed metrics for dropped messages") { val sut = vesHvWithNoOpSink(basicConfiguration) sut.handleConnection( @@ -168,4 +173,31 @@ object MetricsSpecification : Spek({ .isEqualTo(2) } } + + describe("clients rejected metrics") { + + given("rejection causes") { + mapOf( + ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE to + messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1), + ClientRejectionCause.INVALID_WIRE_FRAME_MARKER to garbageFrame() + ).forEach { cause, vesMessage -> + on("cause $cause") { + it("should notify correct metrics") { + val sut = vesHvWithNoOpSink() + + sut.handleConnection(vesMessage) + + val metrics = sut.metrics + assertThat(metrics.clientRejectionCause.size) + .describedAs("metrics were notified with only one rejection cause") + .isOne() + assertThat(metrics.clientRejectionCause.get(cause)) + .describedAs("metrics were notified only once with correct client rejection cause") + .isOne() + } + } + } + } + } }) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt index 660ce498..a27d167a 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt @@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.model.RoutedMessage import java.time.Duration import java.time.Instant +import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import java.util.concurrent.ConcurrentHashMap import kotlin.test.fail @@ -33,14 +34,15 @@ import kotlin.test.fail * @since June 2018 */ class FakeMetrics : Metrics { - var bytesReceived: Int = 0 - var messageBytesReceived: Int = 0 - var lastProcessingTimeMicros: Double = -1.0 - var messagesSentCount: Int = 0 - var messagesDroppedCount: Int = 0 - private val messagesSentToTopic: MutableMap = ConcurrentHashMap() + var bytesReceived: Int = 0 ; private set + var messageBytesReceived: Int = 0 ; private set + var messagesDroppedCount: Int = 0 ; private set + var lastProcessingTimeMicros: Double = -1.0 ; private set private val messagesDroppedCause: MutableMap = ConcurrentHashMap() + var messagesSentCount: Int = 0 ; private set + val messagesSentToTopic: MutableMap = ConcurrentHashMap() + var clientRejectionCause = mutableMapOf() ; private set override fun notifyBytesReceived(size: Int) { bytesReceived += size @@ -63,6 +65,10 @@ class FakeMetrics : Metrics { messagesDroppedCause.compute(cause) { k, _ -> messagesDroppedCause[k]?.inc() ?: 1 } } + override fun notifyClientRejected(cause: ClientRejectionCause) { + clientRejectionCause.compute(cause) { k, _ -> clientRejectionCause[k]?.inc() ?: 1 } + } + fun messagesOnTopic(topic: String) = messagesSentToTopic[topic] ?: fail("No messages were sent to topic $topic") diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index 259fa037..f060426d 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -31,6 +31,7 @@ import io.micrometer.prometheus.PrometheusMeterRegistry import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause +import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.RoutedMessage import java.time.Duration import java.time.Instant @@ -47,18 +48,24 @@ class MicrometerMetrics internal constructor( private val receivedBytes = registry.counter(name(DATA, RECEIVED, BYTES)) private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT)) private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES)) - private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT, TOTAL)) - private val droppedCountTotal = registry.counter(name(MESSAGES, DROPPED, COUNT, TOTAL)) - private val sentCount = { topic: String -> - registry.counter(name(MESSAGES, SENT, COUNT, TOPIC), TOPIC, topic) + private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT)) + private val sentToTopicCount = { topic: String -> + registry.counter(name(MESSAGES, SENT, TOPIC, COUNT), TOPIC, topic) }.memoize() - private val droppedCount = { cause: String -> - registry.counter(name(MESSAGES, DROPPED, COUNT, CAUSE), CAUSE, cause) + private val droppedCount = registry.counter(name(MESSAGES, DROPPED, COUNT)) + private val droppedCauseCount = { cause: String -> + registry.counter(name(MESSAGES, DROPPED, CAUSE, COUNT), CAUSE, cause) }.memoize() + private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME)) + private val clientsRejectedCount = registry.counter(name(CLIENTS, REJECTED, COUNT)) + private val clientsRejectedCauseCount = { cause: String -> + registry.counter(name(CLIENTS, REJECTED, CAUSE, COUNT), CAUSE, cause) + }.memoize() + init { registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) { (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0) @@ -70,6 +77,7 @@ class MicrometerMetrics internal constructor( JvmThreadMetrics().bindTo(registry) } + val metricsProvider = MicrometerPrometheusMetricsProvider(registry) override fun notifyBytesReceived(size: Int) { @@ -83,13 +91,18 @@ class MicrometerMetrics internal constructor( override fun notifyMessageSent(msg: RoutedMessage) { sentCountTotal.increment() - sentCount(msg.topic).increment() + sentToTopicCount(msg.topic).increment() processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, Instant.now())) } override fun notifyMessageDropped(cause: MessageDropCause) { - droppedCountTotal.increment() - droppedCount(cause.tag).increment() + droppedCount.increment() + droppedCauseCount(cause.tag).increment() + } + + override fun notifyClientRejected(cause: ClientRejectionCause) { + clientsRejectedCount.increment() + clientsRejectedCauseCount(cause.tag).increment() } companion object { @@ -102,10 +115,11 @@ class MicrometerMetrics internal constructor( internal const val DATA = "data" internal const val SENT = "sent" internal const val PROCESSING = "processing" + internal const val CAUSE = "cause" + internal const val CLIENTS = "clients" + internal const val REJECTED = "rejected" internal const val TOPIC = "topic" internal const val DROPPED = "dropped" - internal const val CAUSE = "cause" - internal const val TOTAL = "total" internal const val TIME = "time" fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" } diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index cb5cfc70..2ecdb26b 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -36,6 +36,8 @@ import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND +import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER +import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame @@ -60,7 +62,7 @@ object MicrometerMetricsTest : Spek({ cut = MicrometerMetrics(registry) } - fun registrySearch() = RequiredSearch.`in`(registry) + fun registrySearch(counterName: String) = RequiredSearch.`in`(registry).name(counterName) fun verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) = Try { @@ -71,16 +73,16 @@ object MicrometerMetricsTest : Spek({ ) fun verifyGauge(name: String, verifier: (Gauge) -> T) = - verifyMeter(registrySearch().name(name), RequiredSearch::gauge, verifier) + verifyMeter(registrySearch(name), RequiredSearch::gauge, verifier) fun verifyTimer(name: String, verifier: (Timer) -> T) = - verifyMeter(registrySearch().name(name), RequiredSearch::timer, verifier) + verifyMeter(registrySearch(name), RequiredSearch::timer, verifier) fun verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) = verifyMeter(search, RequiredSearch::counter, verifier) fun verifyCounter(name: String, verifier: (Counter) -> T) = - verifyCounter(registrySearch().name(name), verifier) + verifyCounter(registrySearch(name), verifier) fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) { registry.meters @@ -153,8 +155,8 @@ object MicrometerMetricsTest : Spek({ val topicName1 = "PERF3GPP" val topicName2 = "CALLTRACE" - on("$PREFIX.messages.sent.count.total counter") { - val counterName = "$PREFIX.messages.sent.count.total" + on("$PREFIX.messages.sent.count counter") { + val counterName = "$PREFIX.messages.sent.count" it("should increment counter") { cut.notifyMessageSent(routedMessage(topicName1)) @@ -162,22 +164,22 @@ object MicrometerMetricsTest : Spek({ verifyCounter(counterName) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) } - verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.count.topic") + verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.topic.count") } } on("$PREFIX.messages.sent.topic.count counter") { - val counterName = "$PREFIX.messages.sent.count.topic" + val counterName = "$PREFIX.messages.sent.topic.count" it("should handle counters for different topics") { cut.notifyMessageSent(routedMessage(topicName1)) cut.notifyMessageSent(routedMessage(topicName2)) cut.notifyMessageSent(routedMessage(topicName2)) - verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) { + verifyCounter(registrySearch(counterName).tag("topic", topicName1)) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) } - verifyCounter(registrySearch().name(counterName).tag("topic", topicName2)) { + verifyCounter(registrySearch(counterName).tag("topic", topicName2)) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } } @@ -196,16 +198,16 @@ object MicrometerMetricsTest : Spek({ } verifyAllCountersAreUnchangedBut( counterName, - "$PREFIX.messages.sent.count.topic", - "$PREFIX.messages.sent.count.total") + "$PREFIX.messages.sent.topic.count", + "$PREFIX.messages.sent.count") } } } describe("notifyMessageDropped") { - on("$PREFIX.messages.dropped.count.total counter") { - val counterName = "$PREFIX.messages.dropped.count.total" + on("$PREFIX.messages.dropped.count counter") { + val counterName = "$PREFIX.messages.dropped.count" it("should increment counter") { cut.notifyMessageDropped(ROUTE_NOT_FOUND) cut.notifyMessageDropped(INVALID_MESSAGE) @@ -213,22 +215,22 @@ object MicrometerMetricsTest : Spek({ verifyCounter(counterName) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } - verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.count.cause") + verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count") } } - on("$PREFIX.messages.dropped.count.cause counter") { - val counterName = "$PREFIX.messages.dropped.count.cause" + on("$PREFIX.messages.dropped.cause.count counter") { + val counterName = "$PREFIX.messages.dropped.cause.count" it("should handle counters for different drop reasons") { cut.notifyMessageDropped(ROUTE_NOT_FOUND) cut.notifyMessageDropped(INVALID_MESSAGE) cut.notifyMessageDropped(INVALID_MESSAGE) - verifyCounter(registrySearch().name(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) { + verifyCounter(registrySearch(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) } - verifyCounter(registrySearch().name(counterName).tag("cause", INVALID_MESSAGE.tag)) { + verifyCounter(registrySearch(counterName).tag("cause", INVALID_MESSAGE.tag)) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } } @@ -267,6 +269,38 @@ object MicrometerMetricsTest : Spek({ } } + describe("notifyClientRejected") { + + on("$PREFIX.clients.rejected.count") { + val counterName = "$PREFIX.clients.rejected.count" + it("should increment counter for each possible reason") { + cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER) + cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE) + + verifyCounter(counterName) { + assertThat(it.count()).isCloseTo(2.0, doublePrecision) + } + verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count") + } + } + + on("$PREFIX.clients.rejected.cause.count counter") { + val counterName = "$PREFIX.clients.rejected.cause.count" + it("should handle counters for different rejection reasons") { + cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER) + cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE) + cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE) + + verifyCounter(registrySearch(counterName).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)) { + assertThat(it.count()).isCloseTo(1.0, doublePrecision) + } + + verifyCounter(registrySearch(counterName).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) { + assertThat(it.count()).isCloseTo(2.0, doublePrecision) + } + } + } + } }) fun routedMessage(topic: String, partition: Int = 0) = @@ -279,4 +313,4 @@ fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0) = vesEvent().let {evt -> RoutedMessage(topic, partition, VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt))) - } \ No newline at end of file + } -- cgit 1.2.3-korg