summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-main
diff options
context:
space:
mode:
authorJakub Dudycz <jakub.dudycz@nokia.com>2018-12-17 16:03:10 +0100
committerJakub Dudycz <jakub.dudycz@nokia.com>2018-12-18 12:27:09 +0100
commit4ab95420e42f6df59bd4851eee41be6579bdbbe1 (patch)
tree09b81b772a4050d92c90bf6a6e09ecdf6c6db402 /sources/hv-collector-main
parent30488f1922f789c5b8e18934456968aa354c9671 (diff)
Add metrics for active connections count
* Fix and refactor gauges tests in MicrometerMetricsTests as they were not executing * Fix client disconnection handler in NettyTcpServer * Add metrics gauge and counters required to measure active connections Change-Id: I5620d398525c6859679cd5a49dc55a9fefd8b592 Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com> Issue-ID: DCAEGEN2-1041
Diffstat (limited to 'sources/hv-collector-main')
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt27
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt2
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt136
3 files changed, 129 insertions, 36 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index d35e17d6..288145aa 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -50,10 +50,13 @@ class MicrometerMetrics internal constructor(
private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT))
private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES))
+ private val connectionsTotalCount = registry.counter(name(CONNECTIONS, TOTAL, COUNT))
+ private val disconnectionsCount = registry.counter(name(DISCONNECTIONS, COUNT))
+
private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME))
private val totalLatency = registry.timer(name(MESSAGES, LATENCY, TIME))
- private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT))
+ private val sentCount = registry.counter(name(MESSAGES, SENT, COUNT))
private val sentToTopicCount = { topic: String ->
registry.counter(name(MESSAGES, SENT, TOPIC, COUNT), TOPIC, topic)
}.memoize<String, Counter>()
@@ -70,8 +73,13 @@ class MicrometerMetrics internal constructor(
init {
registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) {
- (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0)
+ (receivedMsgCount.count() - sentCount.count()).coerceAtLeast(0.0)
+ }
+
+ registry.gauge(name(CONNECTIONS, ACTIVE, COUNT), this) {
+ (connectionsTotalCount.count() - disconnectionsCount.count()).coerceAtLeast(0.0)
}
+
ClassLoaderMetrics().bindTo(registry)
JvmMemoryMetrics().bindTo(registry)
JvmGcMetrics().bindTo(registry)
@@ -79,7 +87,6 @@ class MicrometerMetrics internal constructor(
JvmThreadMetrics().bindTo(registry)
}
-
val metricsProvider = MicrometerPrometheusMetricsProvider(registry)
override fun notifyBytesReceived(size: Int) {
@@ -93,7 +100,7 @@ class MicrometerMetrics internal constructor(
override fun notifyMessageSent(msg: RoutedMessage) {
val now = Instant.now()
- sentCountTotal.increment()
+ sentCount.increment()
sentToTopicCount(msg.topic).increment()
processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, now))
@@ -110,11 +117,22 @@ class MicrometerMetrics internal constructor(
clientsRejectedCauseCount(cause.tag).increment()
}
+ override fun notifyClientConnected() {
+ connectionsTotalCount.increment()
+ }
+
+ override fun notifyClientDisconnected() {
+ disconnectionsCount.increment()
+ }
+
companion object {
val INSTANCE = MicrometerMetrics()
internal const val PREFIX = "hvves"
internal const val MESSAGES = "messages"
internal const val RECEIVED = "received"
+ internal const val DISCONNECTIONS = "disconnections"
+ internal const val CONNECTIONS = "connections"
+ internal const val ACTIVE = "active"
internal const val BYTES = "bytes"
internal const val COUNT = "count"
internal const val DATA = "data"
@@ -125,6 +143,7 @@ class MicrometerMetrics internal constructor(
internal const val REJECTED = "rejected"
internal const val TOPIC = "topic"
internal const val DROPPED = "dropped"
+ internal const val TOTAL = "total"
internal const val TIME = "time"
internal const val LATENCY = "latency"
internal fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index b35dc53d..f9be546a 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -44,7 +44,7 @@ object VesServer : ServerStarter() {
config.maximumPayloadSizeBytes
).createVesHvCollectorProvider()
- return ServerFactory.createNettyTcpServer(config, collectorProvider)
+ return ServerFactory.createNettyTcpServer(config, collectorProvider, MicrometerMetrics.INSTANCE)
}
override fun serverStartedMessage(handle: ServerHandle) =
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
index 71fc8f7f..24355d5d 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.main
import arrow.core.Try
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.Gauge
-import io.micrometer.core.instrument.Meter
import io.micrometer.core.instrument.Timer
import io.micrometer.core.instrument.search.RequiredSearch
import io.micrometer.prometheus.PrometheusConfig
@@ -35,10 +34,10 @@ import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
-import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
-import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
@@ -46,6 +45,8 @@ import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSize
import java.time.Instant
+import io.micrometer.core.instrument.Meter
+
import java.time.temporal.Temporal
import java.util.concurrent.TimeUnit
import kotlin.reflect.KClass
@@ -56,7 +57,6 @@ import kotlin.reflect.KClass
*/
object MicrometerMetricsTest : Spek({
val doublePrecision = Percentage.withPercentage(0.5)
- val alwaysChangedMeters = setOf("$PREFIX.messages.processing.time", "$PREFIX.messages.latency.time")
lateinit var registry: PrometheusMeterRegistry
lateinit var cut: MicrometerMetrics
@@ -87,6 +87,7 @@ object MicrometerMetricsTest : Spek({
fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
verifyCounter(registrySearch(name), verifier)
+
fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
fun <T : Meter> verifyAllMetersAreUnchangedBut(
clazz: KClass<T>,
@@ -98,7 +99,9 @@ object MicrometerMetricsTest : Spek({
.map { it as T }
.filterNot { it.id.name in changedCounters }
.forEach {
- assertThat(valueOf(it)).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision)
+ assertThat(valueOf(it))
+ .describedAs(it.id.toString())
+ .isCloseTo(0.0, doublePrecision)
}
}
@@ -108,8 +111,8 @@ object MicrometerMetricsTest : Spek({
}
}
- describe("notifyBytesReceived") {
+ describe("notifyBytesReceived") {
on("$PREFIX.data.received.bytes counter") {
val counterName = "$PREFIX.data.received.bytes"
@@ -187,6 +190,7 @@ object MicrometerMetricsTest : Spek({
on("$PREFIX.messages.sent.topic.count counter") {
val counterName = "$PREFIX.messages.sent.topic.count"
+
it("should handle counters for different topics") {
cut.notifyMessageSent(routedMessage(topicName1))
cut.notifyMessageSent(routedMessage(topicName2))
@@ -242,13 +246,12 @@ object MicrometerMetricsTest : Spek({
"$PREFIX.messages.processing.time")
}
}
-
}
describe("notifyMessageDropped") {
-
on("$PREFIX.messages.dropped.count counter") {
val counterName = "$PREFIX.messages.dropped.count"
+
it("should increment counter") {
cut.notifyMessageDropped(ROUTE_NOT_FOUND)
cut.notifyMessageDropped(INVALID_MESSAGE)
@@ -262,6 +265,7 @@ object MicrometerMetricsTest : Spek({
on("$PREFIX.messages.dropped.cause.count counter") {
val counterName = "$PREFIX.messages.dropped.cause.count"
+
it("should handle counters for different drop reasons") {
cut.notifyMessageDropped(ROUTE_NOT_FOUND)
cut.notifyMessageDropped(INVALID_MESSAGE)
@@ -278,36 +282,38 @@ object MicrometerMetricsTest : Spek({
}
}
- describe("processing gauge") {
- it("should show difference between sent and received messages") {
+ describe("notifyClientConnected") {
+ on("$PREFIX.connections.total.count counter") {
+ val counterName = "$PREFIX.connections.total.count"
- on("positive difference") {
- cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
- cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
- cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
- cut.notifyMessageSent(routedMessage("perf3gpp"))
- verifyGauge("messages.processing.count") {
- assertThat(it.value()).isCloseTo(2.0, doublePrecision)
- }
- }
+ it("should increment counter") {
+ cut.notifyClientConnected()
+ cut.notifyClientConnected()
- on("zero difference") {
- cut.notifyMessageReceived(emptyWireProtocolFrame())
- cut.notifyMessageSent(routedMessage("perf3gpp"))
- verifyGauge("messages.processing.count") {
- assertThat(it.value()).isCloseTo(0.0, doublePrecision)
+ verifyCounter(counterName) {
+ assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
+ verifyCountersAndTimersAreUnchangedBut(counterName)
}
+ }
- on("negative difference") {
- cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
- cut.notifyMessageSent(routedMessage("fault"))
- cut.notifyMessageSent(routedMessage("perf3gpp"))
- verifyGauge("messages.processing.count") {
- assertThat(it.value()).isCloseTo(0.0, doublePrecision)
+ }
+
+ describe("notifyClientDisconnected") {
+ on("$PREFIX.disconnections.count counter") {
+ val counterName = "$PREFIX.disconnections.count"
+
+ it("should increment counter") {
+ cut.notifyClientDisconnected()
+ cut.notifyClientDisconnected()
+
+ verifyCounter(counterName) {
+ assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
+ verifyCountersAndTimersAreUnchangedBut(counterName)
}
}
+
}
describe("notifyClientRejected") {
@@ -342,6 +348,74 @@ object MicrometerMetricsTest : Spek({
}
}
}
+
+ describe("$PREFIX.messages.processing.count gauge") {
+ val gaugeName = "$PREFIX.messages.processing.count"
+
+ on("message traffic") {
+ it("should calculate positive difference between sent and received messages") {
+ cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
+ cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
+ cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
+ cut.notifyMessageSent(routedMessage("perf3gpp"))
+
+ verifyGauge(gaugeName) {
+ assertThat(it.value()).isCloseTo(2.0, doublePrecision)
+ }
+ }
+
+ it("should calculate no difference between sent and received messages") {
+ cut.notifyMessageSent(routedMessage("perf3gpp"))
+ cut.notifyMessageSent(routedMessage("fault"))
+
+ verifyGauge(gaugeName) {
+ assertThat(it.value()).isCloseTo(0.0, doublePrecision)
+ }
+ }
+
+ it("should calculate negative difference between sent and received messages") {
+ cut.notifyMessageSent(routedMessage("fault"))
+
+ verifyGauge(gaugeName) {
+ assertThat(it.value()).isCloseTo(0.0, doublePrecision)
+ }
+ }
+ }
+ }
+
+ describe("$PREFIX.connections.active.count gauge") {
+ val gaugeName = "$PREFIX.connections.active.count"
+
+ on("connection traffic") {
+ it("should calculate positive difference between connected and disconnected clients") {
+ cut.notifyClientConnected()
+ cut.notifyClientConnected()
+ cut.notifyClientConnected()
+ cut.notifyClientDisconnected()
+
+ verifyGauge(gaugeName) {
+ assertThat(it.value()).isCloseTo(2.0, doublePrecision)
+ }
+ }
+
+ it("should calculate no difference between connected and disconnected clients") {
+ cut.notifyClientDisconnected()
+ cut.notifyClientDisconnected()
+
+ verifyGauge(gaugeName) {
+ assertThat(it.value()).isCloseTo(0.0, doublePrecision)
+ }
+ }
+
+ it("should calculate negative difference between connected and disconnected clients") {
+ cut.notifyClientDisconnected()
+
+ verifyGauge(gaugeName) {
+ assertThat(it.value()).isCloseTo(0.0, doublePrecision)
+ }
+ }
+ }
+ }
})
fun routedMessage(topic: String, partition: Int = 0) =
@@ -364,4 +438,4 @@ fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
}.let { evt ->
RoutedMessage(topic, partition,
VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
- }
+ } \ No newline at end of file