From 4ab95420e42f6df59bd4851eee41be6579bdbbe1 Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Mon, 17 Dec 2018 16:03:10 +0100 Subject: Add metrics for active connections count * Fix and refactor gauges tests in MicrometerMetricsTests as they were not executing * Fix client disconnection handler in NettyTcpServer * Add metrics gauge and counters required to measure active connections Change-Id: I5620d398525c6859679cd5a49dc55a9fefd8b592 Signed-off-by: Jakub Dudycz Issue-ID: DCAEGEN2-1041 --- .../veshv/main/metrics/MicrometerMetrics.kt | 27 +++- .../collectors/veshv/main/servers/VesServer.kt | 2 +- .../collectors/veshv/main/MicrometerMetricsTest.kt | 136 ++++++++++++++++----- 3 files changed, 129 insertions(+), 36 deletions(-) (limited to 'sources/hv-collector-main/src') diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index d35e17d6..288145aa 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -50,10 +50,13 @@ class MicrometerMetrics internal constructor( private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT)) private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES)) + private val connectionsTotalCount = registry.counter(name(CONNECTIONS, TOTAL, COUNT)) + private val disconnectionsCount = registry.counter(name(DISCONNECTIONS, COUNT)) + private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME)) private val totalLatency = registry.timer(name(MESSAGES, LATENCY, TIME)) - private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT)) + private val sentCount = registry.counter(name(MESSAGES, SENT, COUNT)) private val sentToTopicCount = { topic: String -> registry.counter(name(MESSAGES, SENT, TOPIC, COUNT), TOPIC, topic) }.memoize() @@ -70,8 +73,13 @@ class MicrometerMetrics internal constructor( init { registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) { - (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0) + (receivedMsgCount.count() - sentCount.count()).coerceAtLeast(0.0) + } + + registry.gauge(name(CONNECTIONS, ACTIVE, COUNT), this) { + (connectionsTotalCount.count() - disconnectionsCount.count()).coerceAtLeast(0.0) } + ClassLoaderMetrics().bindTo(registry) JvmMemoryMetrics().bindTo(registry) JvmGcMetrics().bindTo(registry) @@ -79,7 +87,6 @@ class MicrometerMetrics internal constructor( JvmThreadMetrics().bindTo(registry) } - val metricsProvider = MicrometerPrometheusMetricsProvider(registry) override fun notifyBytesReceived(size: Int) { @@ -93,7 +100,7 @@ class MicrometerMetrics internal constructor( override fun notifyMessageSent(msg: RoutedMessage) { val now = Instant.now() - sentCountTotal.increment() + sentCount.increment() sentToTopicCount(msg.topic).increment() processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, now)) @@ -110,11 +117,22 @@ class MicrometerMetrics internal constructor( clientsRejectedCauseCount(cause.tag).increment() } + override fun notifyClientConnected() { + connectionsTotalCount.increment() + } + + override fun notifyClientDisconnected() { + disconnectionsCount.increment() + } + companion object { val INSTANCE = MicrometerMetrics() internal const val PREFIX = "hvves" internal const val MESSAGES = "messages" internal const val RECEIVED = "received" + internal const val DISCONNECTIONS = "disconnections" + internal const val CONNECTIONS = "connections" + internal const val ACTIVE = "active" internal const val BYTES = "bytes" internal const val COUNT = "count" internal const val DATA = "data" @@ -125,6 +143,7 @@ class MicrometerMetrics internal constructor( internal const val REJECTED = "rejected" internal const val TOPIC = "topic" internal const val DROPPED = "dropped" + internal const val TOTAL = "total" internal const val TIME = "time" internal const val LATENCY = "latency" internal fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index b35dc53d..f9be546a 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -44,7 +44,7 @@ object VesServer : ServerStarter() { config.maximumPayloadSizeBytes ).createVesHvCollectorProvider() - return ServerFactory.createNettyTcpServer(config, collectorProvider) + return ServerFactory.createNettyTcpServer(config, collectorProvider, MicrometerMetrics.INSTANCE) } override fun serverStartedMessage(handle: ServerHandle) = diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index 71fc8f7f..24355d5d 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.main import arrow.core.Try import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.Gauge -import io.micrometer.core.instrument.Meter import io.micrometer.core.instrument.Timer import io.micrometer.core.instrument.search.RequiredSearch import io.micrometer.prometheus.PrometheusConfig @@ -35,10 +34,10 @@ import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX -import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER -import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND +import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER +import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame @@ -46,6 +45,8 @@ import org.onap.dcae.collectors.veshv.tests.utils.vesEvent import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSize import java.time.Instant +import io.micrometer.core.instrument.Meter + import java.time.temporal.Temporal import java.util.concurrent.TimeUnit import kotlin.reflect.KClass @@ -56,7 +57,6 @@ import kotlin.reflect.KClass */ object MicrometerMetricsTest : Spek({ val doublePrecision = Percentage.withPercentage(0.5) - val alwaysChangedMeters = setOf("$PREFIX.messages.processing.time", "$PREFIX.messages.latency.time") lateinit var registry: PrometheusMeterRegistry lateinit var cut: MicrometerMetrics @@ -87,6 +87,7 @@ object MicrometerMetricsTest : Spek({ fun verifyCounter(name: String, verifier: (Counter) -> T) = verifyCounter(registrySearch(name), verifier) + fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) { fun verifyAllMetersAreUnchangedBut( clazz: KClass, @@ -98,7 +99,9 @@ object MicrometerMetricsTest : Spek({ .map { it as T } .filterNot { it.id.name in changedCounters } .forEach { - assertThat(valueOf(it)).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision) + assertThat(valueOf(it)) + .describedAs(it.id.toString()) + .isCloseTo(0.0, doublePrecision) } } @@ -108,8 +111,8 @@ object MicrometerMetricsTest : Spek({ } } - describe("notifyBytesReceived") { + describe("notifyBytesReceived") { on("$PREFIX.data.received.bytes counter") { val counterName = "$PREFIX.data.received.bytes" @@ -187,6 +190,7 @@ object MicrometerMetricsTest : Spek({ on("$PREFIX.messages.sent.topic.count counter") { val counterName = "$PREFIX.messages.sent.topic.count" + it("should handle counters for different topics") { cut.notifyMessageSent(routedMessage(topicName1)) cut.notifyMessageSent(routedMessage(topicName2)) @@ -242,13 +246,12 @@ object MicrometerMetricsTest : Spek({ "$PREFIX.messages.processing.time") } } - } describe("notifyMessageDropped") { - on("$PREFIX.messages.dropped.count counter") { val counterName = "$PREFIX.messages.dropped.count" + it("should increment counter") { cut.notifyMessageDropped(ROUTE_NOT_FOUND) cut.notifyMessageDropped(INVALID_MESSAGE) @@ -262,6 +265,7 @@ object MicrometerMetricsTest : Spek({ on("$PREFIX.messages.dropped.cause.count counter") { val counterName = "$PREFIX.messages.dropped.cause.count" + it("should handle counters for different drop reasons") { cut.notifyMessageDropped(ROUTE_NOT_FOUND) cut.notifyMessageDropped(INVALID_MESSAGE) @@ -278,36 +282,38 @@ object MicrometerMetricsTest : Spek({ } } - describe("processing gauge") { - it("should show difference between sent and received messages") { + describe("notifyClientConnected") { + on("$PREFIX.connections.total.count counter") { + val counterName = "$PREFIX.connections.total.count" - on("positive difference") { - cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128)) - cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256)) - cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256)) - cut.notifyMessageSent(routedMessage("perf3gpp")) - verifyGauge("messages.processing.count") { - assertThat(it.value()).isCloseTo(2.0, doublePrecision) - } - } + it("should increment counter") { + cut.notifyClientConnected() + cut.notifyClientConnected() - on("zero difference") { - cut.notifyMessageReceived(emptyWireProtocolFrame()) - cut.notifyMessageSent(routedMessage("perf3gpp")) - verifyGauge("messages.processing.count") { - assertThat(it.value()).isCloseTo(0.0, doublePrecision) + verifyCounter(counterName) { + assertThat(it.count()).isCloseTo(2.0, doublePrecision) } + verifyCountersAndTimersAreUnchangedBut(counterName) } + } - on("negative difference") { - cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128)) - cut.notifyMessageSent(routedMessage("fault")) - cut.notifyMessageSent(routedMessage("perf3gpp")) - verifyGauge("messages.processing.count") { - assertThat(it.value()).isCloseTo(0.0, doublePrecision) + } + + describe("notifyClientDisconnected") { + on("$PREFIX.disconnections.count counter") { + val counterName = "$PREFIX.disconnections.count" + + it("should increment counter") { + cut.notifyClientDisconnected() + cut.notifyClientDisconnected() + + verifyCounter(counterName) { + assertThat(it.count()).isCloseTo(2.0, doublePrecision) } + verifyCountersAndTimersAreUnchangedBut(counterName) } } + } describe("notifyClientRejected") { @@ -342,6 +348,74 @@ object MicrometerMetricsTest : Spek({ } } } + + describe("$PREFIX.messages.processing.count gauge") { + val gaugeName = "$PREFIX.messages.processing.count" + + on("message traffic") { + it("should calculate positive difference between sent and received messages") { + cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128)) + cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256)) + cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256)) + cut.notifyMessageSent(routedMessage("perf3gpp")) + + verifyGauge(gaugeName) { + assertThat(it.value()).isCloseTo(2.0, doublePrecision) + } + } + + it("should calculate no difference between sent and received messages") { + cut.notifyMessageSent(routedMessage("perf3gpp")) + cut.notifyMessageSent(routedMessage("fault")) + + verifyGauge(gaugeName) { + assertThat(it.value()).isCloseTo(0.0, doublePrecision) + } + } + + it("should calculate negative difference between sent and received messages") { + cut.notifyMessageSent(routedMessage("fault")) + + verifyGauge(gaugeName) { + assertThat(it.value()).isCloseTo(0.0, doublePrecision) + } + } + } + } + + describe("$PREFIX.connections.active.count gauge") { + val gaugeName = "$PREFIX.connections.active.count" + + on("connection traffic") { + it("should calculate positive difference between connected and disconnected clients") { + cut.notifyClientConnected() + cut.notifyClientConnected() + cut.notifyClientConnected() + cut.notifyClientDisconnected() + + verifyGauge(gaugeName) { + assertThat(it.value()).isCloseTo(2.0, doublePrecision) + } + } + + it("should calculate no difference between connected and disconnected clients") { + cut.notifyClientDisconnected() + cut.notifyClientDisconnected() + + verifyGauge(gaugeName) { + assertThat(it.value()).isCloseTo(0.0, doublePrecision) + } + } + + it("should calculate negative difference between connected and disconnected clients") { + cut.notifyClientDisconnected() + + verifyGauge(gaugeName) { + assertThat(it.value()).isCloseTo(0.0, doublePrecision) + } + } + } + } }) fun routedMessage(topic: String, partition: Int = 0) = @@ -364,4 +438,4 @@ fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) = }.let { evt -> RoutedMessage(topic, partition, VesMessage(evt.commonEventHeader, wireProtocolFrame(evt))) - } + } \ No newline at end of file -- cgit 1.2.3-korg