diff options
Diffstat (limited to 'sources/hv-collector-main')
2 files changed, 79 insertions, 31 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index 259fa037..f060426d 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -31,6 +31,7 @@ import io.micrometer.prometheus.PrometheusMeterRegistry import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause +import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.RoutedMessage import java.time.Duration import java.time.Instant @@ -47,18 +48,24 @@ class MicrometerMetrics internal constructor( private val receivedBytes = registry.counter(name(DATA, RECEIVED, BYTES)) private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT)) private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES)) - private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT, TOTAL)) - private val droppedCountTotal = registry.counter(name(MESSAGES, DROPPED, COUNT, TOTAL)) - private val sentCount = { topic: String -> - registry.counter(name(MESSAGES, SENT, COUNT, TOPIC), TOPIC, topic) + private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT)) + private val sentToTopicCount = { topic: String -> + registry.counter(name(MESSAGES, SENT, TOPIC, COUNT), TOPIC, topic) }.memoize<String, Counter>() - private val droppedCount = { cause: String -> - registry.counter(name(MESSAGES, DROPPED, COUNT, CAUSE), CAUSE, cause) + private val droppedCount = registry.counter(name(MESSAGES, DROPPED, COUNT)) + private val droppedCauseCount = { cause: String -> + registry.counter(name(MESSAGES, DROPPED, CAUSE, COUNT), CAUSE, cause) }.memoize<String, Counter>() + private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME)) + private val clientsRejectedCount = registry.counter(name(CLIENTS, REJECTED, COUNT)) + private val clientsRejectedCauseCount = { cause: String -> + registry.counter(name(CLIENTS, REJECTED, CAUSE, COUNT), CAUSE, cause) + }.memoize<String, Counter>() + init { registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) { (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0) @@ -70,6 +77,7 @@ class MicrometerMetrics internal constructor( JvmThreadMetrics().bindTo(registry) } + val metricsProvider = MicrometerPrometheusMetricsProvider(registry) override fun notifyBytesReceived(size: Int) { @@ -83,13 +91,18 @@ class MicrometerMetrics internal constructor( override fun notifyMessageSent(msg: RoutedMessage) { sentCountTotal.increment() - sentCount(msg.topic).increment() + sentToTopicCount(msg.topic).increment() processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, Instant.now())) } override fun notifyMessageDropped(cause: MessageDropCause) { - droppedCountTotal.increment() - droppedCount(cause.tag).increment() + droppedCount.increment() + droppedCauseCount(cause.tag).increment() + } + + override fun notifyClientRejected(cause: ClientRejectionCause) { + clientsRejectedCount.increment() + clientsRejectedCauseCount(cause.tag).increment() } companion object { @@ -102,10 +115,11 @@ class MicrometerMetrics internal constructor( internal const val DATA = "data" internal const val SENT = "sent" internal const val PROCESSING = "processing" + internal const val CAUSE = "cause" + internal const val CLIENTS = "clients" + internal const val REJECTED = "rejected" internal const val TOPIC = "topic" internal const val DROPPED = "dropped" - internal const val CAUSE = "cause" - internal const val TOTAL = "total" internal const val TIME = "time" fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" } diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index cb5cfc70..2ecdb26b 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -36,6 +36,8 @@ import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND +import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER +import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame @@ -60,7 +62,7 @@ object MicrometerMetricsTest : Spek({ cut = MicrometerMetrics(registry) } - fun registrySearch() = RequiredSearch.`in`(registry) + fun registrySearch(counterName: String) = RequiredSearch.`in`(registry).name(counterName) fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) = Try { @@ -71,16 +73,16 @@ object MicrometerMetricsTest : Spek({ ) fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) = - verifyMeter(registrySearch().name(name), RequiredSearch::gauge, verifier) + verifyMeter(registrySearch(name), RequiredSearch::gauge, verifier) fun <T> verifyTimer(name: String, verifier: (Timer) -> T) = - verifyMeter(registrySearch().name(name), RequiredSearch::timer, verifier) + verifyMeter(registrySearch(name), RequiredSearch::timer, verifier) fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) = verifyMeter(search, RequiredSearch::counter, verifier) fun <T> verifyCounter(name: String, verifier: (Counter) -> T) = - verifyCounter(registrySearch().name(name), verifier) + verifyCounter(registrySearch(name), verifier) fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) { registry.meters @@ -153,8 +155,8 @@ object MicrometerMetricsTest : Spek({ val topicName1 = "PERF3GPP" val topicName2 = "CALLTRACE" - on("$PREFIX.messages.sent.count.total counter") { - val counterName = "$PREFIX.messages.sent.count.total" + on("$PREFIX.messages.sent.count counter") { + val counterName = "$PREFIX.messages.sent.count" it("should increment counter") { cut.notifyMessageSent(routedMessage(topicName1)) @@ -162,22 +164,22 @@ object MicrometerMetricsTest : Spek({ verifyCounter(counterName) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) } - verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.count.topic") + verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.topic.count") } } on("$PREFIX.messages.sent.topic.count counter") { - val counterName = "$PREFIX.messages.sent.count.topic" + val counterName = "$PREFIX.messages.sent.topic.count" it("should handle counters for different topics") { cut.notifyMessageSent(routedMessage(topicName1)) cut.notifyMessageSent(routedMessage(topicName2)) cut.notifyMessageSent(routedMessage(topicName2)) - verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) { + verifyCounter(registrySearch(counterName).tag("topic", topicName1)) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) } - verifyCounter(registrySearch().name(counterName).tag("topic", topicName2)) { + verifyCounter(registrySearch(counterName).tag("topic", topicName2)) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } } @@ -196,16 +198,16 @@ object MicrometerMetricsTest : Spek({ } verifyAllCountersAreUnchangedBut( counterName, - "$PREFIX.messages.sent.count.topic", - "$PREFIX.messages.sent.count.total") + "$PREFIX.messages.sent.topic.count", + "$PREFIX.messages.sent.count") } } } describe("notifyMessageDropped") { - on("$PREFIX.messages.dropped.count.total counter") { - val counterName = "$PREFIX.messages.dropped.count.total" + on("$PREFIX.messages.dropped.count counter") { + val counterName = "$PREFIX.messages.dropped.count" it("should increment counter") { cut.notifyMessageDropped(ROUTE_NOT_FOUND) cut.notifyMessageDropped(INVALID_MESSAGE) @@ -213,22 +215,22 @@ object MicrometerMetricsTest : Spek({ verifyCounter(counterName) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } - verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.count.cause") + verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count") } } - on("$PREFIX.messages.dropped.count.cause counter") { - val counterName = "$PREFIX.messages.dropped.count.cause" + on("$PREFIX.messages.dropped.cause.count counter") { + val counterName = "$PREFIX.messages.dropped.cause.count" it("should handle counters for different drop reasons") { cut.notifyMessageDropped(ROUTE_NOT_FOUND) cut.notifyMessageDropped(INVALID_MESSAGE) cut.notifyMessageDropped(INVALID_MESSAGE) - verifyCounter(registrySearch().name(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) { + verifyCounter(registrySearch(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) } - verifyCounter(registrySearch().name(counterName).tag("cause", INVALID_MESSAGE.tag)) { + verifyCounter(registrySearch(counterName).tag("cause", INVALID_MESSAGE.tag)) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } } @@ -267,6 +269,38 @@ object MicrometerMetricsTest : Spek({ } } + describe("notifyClientRejected") { + + on("$PREFIX.clients.rejected.count") { + val counterName = "$PREFIX.clients.rejected.count" + it("should increment counter for each possible reason") { + cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER) + cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE) + + verifyCounter(counterName) { + assertThat(it.count()).isCloseTo(2.0, doublePrecision) + } + verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count") + } + } + + on("$PREFIX.clients.rejected.cause.count counter") { + val counterName = "$PREFIX.clients.rejected.cause.count" + it("should handle counters for different rejection reasons") { + cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER) + cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE) + cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE) + + verifyCounter(registrySearch(counterName).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)) { + assertThat(it.count()).isCloseTo(1.0, doublePrecision) + } + + verifyCounter(registrySearch(counterName).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) { + assertThat(it.count()).isCloseTo(2.0, doublePrecision) + } + } + } + } }) fun routedMessage(topic: String, partition: Int = 0) = @@ -279,4 +313,4 @@ fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0) = vesEvent().let {evt -> RoutedMessage(topic, partition, VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt))) - }
\ No newline at end of file + } |