summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt (renamed from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt)24
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt34
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt18
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt36
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt74
8 files changed, 155 insertions, 40 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 1334738a..61d28c2b 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.boundary
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
@@ -35,6 +36,7 @@ interface Metrics {
fun notifyMessageReceived(msg: WireFrameMessage)
fun notifyMessageSent(msg: RoutedMessage)
fun notifyMessageDropped(cause: MessageDropCause)
+ fun notifyClientRejected(cause: ClientRejectionCause)
}
@FunctionalInterface
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 51f894d3..5c3f339c 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -27,6 +27,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.model.RoutedMessage
@@ -60,7 +61,9 @@ internal class VesHvCollector(
.transform(::decodeProtobufPayload)
.transform(::filterInvalidProtobufMessages)
.transform(::routeMessage)
- .onErrorResume { logger.handleReactiveStreamError(clientContext, it) }
+ .onErrorResume {
+ metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it))
+ logger.handleReactiveStreamError(clientContext, it) }
.doFinally { releaseBuffersMemory() }
.then()
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt
index 83a7cd85..81845400 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt
@@ -25,5 +25,5 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-class WireFrameException(error: WireFrameDecodingError)
+class WireFrameException(val error: WireFrameDecodingError)
: Exception("${error::class.simpleName}: ${error.message}")
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt
index af43ae67..836eab53 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt
@@ -19,6 +19,10 @@
*/
package org.onap.dcae.collectors.veshv.model
+import org.onap.dcae.collectors.veshv.domain.InvalidWireFrameMarker
+import org.onap.dcae.collectors.veshv.domain.PayloadSizeExceeded
+import org.onap.dcae.collectors.veshv.impl.wire.WireFrameException
+
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since December 2018
@@ -27,3 +31,23 @@ enum class MessageDropCause(val tag: String) {
ROUTE_NOT_FOUND("routing"),
INVALID_MESSAGE("invalid")
}
+
+enum class ClientRejectionCause(val tag: String) {
+ INVALID_WIRE_FRAME_MARKER("invalid_marker"),
+ PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE("too_big_payload"),
+ UNEXPECTED_STREAM_ERROR("unexpected");
+
+ companion object {
+ fun fromThrowable(err: Throwable): ClientRejectionCause =
+ when (err) {
+ is WireFrameException -> fromWireFrameException(err)
+ else -> UNEXPECTED_STREAM_ERROR
+ }
+
+ private fun fromWireFrameException(err: WireFrameException) = when (err.error) {
+ is InvalidWireFrameMarker -> INVALID_WIRE_FRAME_MARKER
+ is PayloadSizeExceeded -> PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
+ else -> UNEXPECTED_STREAM_ERROR
+ }
+ }
+}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index 9f5c37e1..572cc796 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -23,18 +23,23 @@ import com.google.protobuf.ByteString
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.VesEventDomain
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
+import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
+import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
@@ -153,7 +158,7 @@ object MetricsSpecification : Spek({
.isEqualTo(1)
}
- it("should gather summed metrics for dropped messages"){
+ it("should gather summed metrics for dropped messages") {
val sut = vesHvWithNoOpSink(basicConfiguration)
sut.handleConnection(
@@ -168,4 +173,31 @@ object MetricsSpecification : Spek({
.isEqualTo(2)
}
}
+
+ describe("clients rejected metrics") {
+
+ given("rejection causes") {
+ mapOf(
+ ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE to
+ messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1),
+ ClientRejectionCause.INVALID_WIRE_FRAME_MARKER to garbageFrame()
+ ).forEach { cause, vesMessage ->
+ on("cause $cause") {
+ it("should notify correct metrics") {
+ val sut = vesHvWithNoOpSink()
+
+ sut.handleConnection(vesMessage)
+
+ val metrics = sut.metrics
+ assertThat(metrics.clientRejectionCause.size)
+ .describedAs("metrics were notified with only one rejection cause")
+ .isOne()
+ assertThat(metrics.clientRejectionCause.get(cause))
+ .describedAs("metrics were notified only once with correct client rejection cause")
+ .isOne()
+ }
+ }
+ }
+ }
+ }
})
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
index 660ce498..a27d167a 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
@@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import java.time.Duration
import java.time.Instant
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import java.util.concurrent.ConcurrentHashMap
import kotlin.test.fail
@@ -33,14 +34,15 @@ import kotlin.test.fail
* @since June 2018
*/
class FakeMetrics : Metrics {
- var bytesReceived: Int = 0
- var messageBytesReceived: Int = 0
- var lastProcessingTimeMicros: Double = -1.0
- var messagesSentCount: Int = 0
- var messagesDroppedCount: Int = 0
- private val messagesSentToTopic: MutableMap<String, Int> = ConcurrentHashMap()
+ var bytesReceived: Int = 0 ; private set
+ var messageBytesReceived: Int = 0 ; private set
+ var messagesDroppedCount: Int = 0 ; private set
+ var lastProcessingTimeMicros: Double = -1.0 ; private set
private val messagesDroppedCause: MutableMap<MessageDropCause, Int> = ConcurrentHashMap()
+ var messagesSentCount: Int = 0 ; private set
+ val messagesSentToTopic: MutableMap<String, Int> = ConcurrentHashMap()
+ var clientRejectionCause = mutableMapOf<ClientRejectionCause, Int>() ; private set
override fun notifyBytesReceived(size: Int) {
bytesReceived += size
@@ -63,6 +65,10 @@ class FakeMetrics : Metrics {
messagesDroppedCause.compute(cause) { k, _ -> messagesDroppedCause[k]?.inc() ?: 1 }
}
+ override fun notifyClientRejected(cause: ClientRejectionCause) {
+ clientRejectionCause.compute(cause) { k, _ -> clientRejectionCause[k]?.inc() ?: 1 }
+ }
+
fun messagesOnTopic(topic: String) =
messagesSentToTopic[topic] ?: fail("No messages were sent to topic $topic")
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index 259fa037..f060426d 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -31,6 +31,7 @@ import io.micrometer.prometheus.PrometheusMeterRegistry
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import java.time.Duration
import java.time.Instant
@@ -47,18 +48,24 @@ class MicrometerMetrics internal constructor(
private val receivedBytes = registry.counter(name(DATA, RECEIVED, BYTES))
private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT))
private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES))
- private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT, TOTAL))
- private val droppedCountTotal = registry.counter(name(MESSAGES, DROPPED, COUNT, TOTAL))
- private val sentCount = { topic: String ->
- registry.counter(name(MESSAGES, SENT, COUNT, TOPIC), TOPIC, topic)
+ private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT))
+ private val sentToTopicCount = { topic: String ->
+ registry.counter(name(MESSAGES, SENT, TOPIC, COUNT), TOPIC, topic)
}.memoize<String, Counter>()
- private val droppedCount = { cause: String ->
- registry.counter(name(MESSAGES, DROPPED, COUNT, CAUSE), CAUSE, cause)
+ private val droppedCount = registry.counter(name(MESSAGES, DROPPED, COUNT))
+ private val droppedCauseCount = { cause: String ->
+ registry.counter(name(MESSAGES, DROPPED, CAUSE, COUNT), CAUSE, cause)
}.memoize<String, Counter>()
+
private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME))
+ private val clientsRejectedCount = registry.counter(name(CLIENTS, REJECTED, COUNT))
+ private val clientsRejectedCauseCount = { cause: String ->
+ registry.counter(name(CLIENTS, REJECTED, CAUSE, COUNT), CAUSE, cause)
+ }.memoize<String, Counter>()
+
init {
registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) {
(receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0)
@@ -70,6 +77,7 @@ class MicrometerMetrics internal constructor(
JvmThreadMetrics().bindTo(registry)
}
+
val metricsProvider = MicrometerPrometheusMetricsProvider(registry)
override fun notifyBytesReceived(size: Int) {
@@ -83,13 +91,18 @@ class MicrometerMetrics internal constructor(
override fun notifyMessageSent(msg: RoutedMessage) {
sentCountTotal.increment()
- sentCount(msg.topic).increment()
+ sentToTopicCount(msg.topic).increment()
processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()))
}
override fun notifyMessageDropped(cause: MessageDropCause) {
- droppedCountTotal.increment()
- droppedCount(cause.tag).increment()
+ droppedCount.increment()
+ droppedCauseCount(cause.tag).increment()
+ }
+
+ override fun notifyClientRejected(cause: ClientRejectionCause) {
+ clientsRejectedCount.increment()
+ clientsRejectedCauseCount(cause.tag).increment()
}
companion object {
@@ -102,10 +115,11 @@ class MicrometerMetrics internal constructor(
internal const val DATA = "data"
internal const val SENT = "sent"
internal const val PROCESSING = "processing"
+ internal const val CAUSE = "cause"
+ internal const val CLIENTS = "clients"
+ internal const val REJECTED = "rejected"
internal const val TOPIC = "topic"
internal const val DROPPED = "dropped"
- internal const val CAUSE = "cause"
- internal const val TOTAL = "total"
internal const val TIME = "time"
fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
}
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
index cb5cfc70..2ecdb26b 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -36,6 +36,8 @@ import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
@@ -60,7 +62,7 @@ object MicrometerMetricsTest : Spek({
cut = MicrometerMetrics(registry)
}
- fun registrySearch() = RequiredSearch.`in`(registry)
+ fun registrySearch(counterName: String) = RequiredSearch.`in`(registry).name(counterName)
fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
Try {
@@ -71,16 +73,16 @@ object MicrometerMetricsTest : Spek({
)
fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
- verifyMeter(registrySearch().name(name), RequiredSearch::gauge, verifier)
+ verifyMeter(registrySearch(name), RequiredSearch::gauge, verifier)
fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
- verifyMeter(registrySearch().name(name), RequiredSearch::timer, verifier)
+ verifyMeter(registrySearch(name), RequiredSearch::timer, verifier)
fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
verifyMeter(search, RequiredSearch::counter, verifier)
fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
- verifyCounter(registrySearch().name(name), verifier)
+ verifyCounter(registrySearch(name), verifier)
fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) {
registry.meters
@@ -153,8 +155,8 @@ object MicrometerMetricsTest : Spek({
val topicName1 = "PERF3GPP"
val topicName2 = "CALLTRACE"
- on("$PREFIX.messages.sent.count.total counter") {
- val counterName = "$PREFIX.messages.sent.count.total"
+ on("$PREFIX.messages.sent.count counter") {
+ val counterName = "$PREFIX.messages.sent.count"
it("should increment counter") {
cut.notifyMessageSent(routedMessage(topicName1))
@@ -162,22 +164,22 @@ object MicrometerMetricsTest : Spek({
verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
- verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.count.topic")
+ verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.topic.count")
}
}
on("$PREFIX.messages.sent.topic.count counter") {
- val counterName = "$PREFIX.messages.sent.count.topic"
+ val counterName = "$PREFIX.messages.sent.topic.count"
it("should handle counters for different topics") {
cut.notifyMessageSent(routedMessage(topicName1))
cut.notifyMessageSent(routedMessage(topicName2))
cut.notifyMessageSent(routedMessage(topicName2))
- verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) {
+ verifyCounter(registrySearch(counterName).tag("topic", topicName1)) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
- verifyCounter(registrySearch().name(counterName).tag("topic", topicName2)) {
+ verifyCounter(registrySearch(counterName).tag("topic", topicName2)) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
}
@@ -196,16 +198,16 @@ object MicrometerMetricsTest : Spek({
}
verifyAllCountersAreUnchangedBut(
counterName,
- "$PREFIX.messages.sent.count.topic",
- "$PREFIX.messages.sent.count.total")
+ "$PREFIX.messages.sent.topic.count",
+ "$PREFIX.messages.sent.count")
}
}
}
describe("notifyMessageDropped") {
- on("$PREFIX.messages.dropped.count.total counter") {
- val counterName = "$PREFIX.messages.dropped.count.total"
+ on("$PREFIX.messages.dropped.count counter") {
+ val counterName = "$PREFIX.messages.dropped.count"
it("should increment counter") {
cut.notifyMessageDropped(ROUTE_NOT_FOUND)
cut.notifyMessageDropped(INVALID_MESSAGE)
@@ -213,22 +215,22 @@ object MicrometerMetricsTest : Spek({
verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
- verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.count.cause")
+ verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count")
}
}
- on("$PREFIX.messages.dropped.count.cause counter") {
- val counterName = "$PREFIX.messages.dropped.count.cause"
+ on("$PREFIX.messages.dropped.cause.count counter") {
+ val counterName = "$PREFIX.messages.dropped.cause.count"
it("should handle counters for different drop reasons") {
cut.notifyMessageDropped(ROUTE_NOT_FOUND)
cut.notifyMessageDropped(INVALID_MESSAGE)
cut.notifyMessageDropped(INVALID_MESSAGE)
- verifyCounter(registrySearch().name(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
+ verifyCounter(registrySearch(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
- verifyCounter(registrySearch().name(counterName).tag("cause", INVALID_MESSAGE.tag)) {
+ verifyCounter(registrySearch(counterName).tag("cause", INVALID_MESSAGE.tag)) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
}
@@ -267,6 +269,38 @@ object MicrometerMetricsTest : Spek({
}
}
+ describe("notifyClientRejected") {
+
+ on("$PREFIX.clients.rejected.count") {
+ val counterName = "$PREFIX.clients.rejected.count"
+ it("should increment counter for each possible reason") {
+ cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
+ cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
+
+ verifyCounter(counterName) {
+ assertThat(it.count()).isCloseTo(2.0, doublePrecision)
+ }
+ verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count")
+ }
+ }
+
+ on("$PREFIX.clients.rejected.cause.count counter") {
+ val counterName = "$PREFIX.clients.rejected.cause.count"
+ it("should handle counters for different rejection reasons") {
+ cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
+ cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
+ cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
+
+ verifyCounter(registrySearch(counterName).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
+ assertThat(it.count()).isCloseTo(1.0, doublePrecision)
+ }
+
+ verifyCounter(registrySearch(counterName).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
+ assertThat(it.count()).isCloseTo(2.0, doublePrecision)
+ }
+ }
+ }
+ }
})
fun routedMessage(topic: String, partition: Int = 0) =
@@ -279,4 +313,4 @@ fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0) =
vesEvent().let {evt ->
RoutedMessage(topic, partition,
VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt)))
- } \ No newline at end of file
+ }