aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-main
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-main')
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt36
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt74
2 files changed, 79 insertions, 31 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index 259fa037..f060426d 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -31,6 +31,7 @@ import io.micrometer.prometheus.PrometheusMeterRegistry
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import java.time.Duration
import java.time.Instant
@@ -47,18 +48,24 @@ class MicrometerMetrics internal constructor(
private val receivedBytes = registry.counter(name(DATA, RECEIVED, BYTES))
private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT))
private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES))
- private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT, TOTAL))
- private val droppedCountTotal = registry.counter(name(MESSAGES, DROPPED, COUNT, TOTAL))
- private val sentCount = { topic: String ->
- registry.counter(name(MESSAGES, SENT, COUNT, TOPIC), TOPIC, topic)
+ private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT))
+ private val sentToTopicCount = { topic: String ->
+ registry.counter(name(MESSAGES, SENT, TOPIC, COUNT), TOPIC, topic)
}.memoize<String, Counter>()
- private val droppedCount = { cause: String ->
- registry.counter(name(MESSAGES, DROPPED, COUNT, CAUSE), CAUSE, cause)
+ private val droppedCount = registry.counter(name(MESSAGES, DROPPED, COUNT))
+ private val droppedCauseCount = { cause: String ->
+ registry.counter(name(MESSAGES, DROPPED, CAUSE, COUNT), CAUSE, cause)
}.memoize<String, Counter>()
+
private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME))
+ private val clientsRejectedCount = registry.counter(name(CLIENTS, REJECTED, COUNT))
+ private val clientsRejectedCauseCount = { cause: String ->
+ registry.counter(name(CLIENTS, REJECTED, CAUSE, COUNT), CAUSE, cause)
+ }.memoize<String, Counter>()
+
init {
registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) {
(receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0)
@@ -70,6 +77,7 @@ class MicrometerMetrics internal constructor(
JvmThreadMetrics().bindTo(registry)
}
+
val metricsProvider = MicrometerPrometheusMetricsProvider(registry)
override fun notifyBytesReceived(size: Int) {
@@ -83,13 +91,18 @@ class MicrometerMetrics internal constructor(
override fun notifyMessageSent(msg: RoutedMessage) {
sentCountTotal.increment()
- sentCount(msg.topic).increment()
+ sentToTopicCount(msg.topic).increment()
processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()))
}
override fun notifyMessageDropped(cause: MessageDropCause) {
- droppedCountTotal.increment()
- droppedCount(cause.tag).increment()
+ droppedCount.increment()
+ droppedCauseCount(cause.tag).increment()
+ }
+
+ override fun notifyClientRejected(cause: ClientRejectionCause) {
+ clientsRejectedCount.increment()
+ clientsRejectedCauseCount(cause.tag).increment()
}
companion object {
@@ -102,10 +115,11 @@ class MicrometerMetrics internal constructor(
internal const val DATA = "data"
internal const val SENT = "sent"
internal const val PROCESSING = "processing"
+ internal const val CAUSE = "cause"
+ internal const val CLIENTS = "clients"
+ internal const val REJECTED = "rejected"
internal const val TOPIC = "topic"
internal const val DROPPED = "dropped"
- internal const val CAUSE = "cause"
- internal const val TOTAL = "total"
internal const val TIME = "time"
fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
}
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
index cb5cfc70..2ecdb26b 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -36,6 +36,8 @@ import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
@@ -60,7 +62,7 @@ object MicrometerMetricsTest : Spek({
cut = MicrometerMetrics(registry)
}
- fun registrySearch() = RequiredSearch.`in`(registry)
+ fun registrySearch(counterName: String) = RequiredSearch.`in`(registry).name(counterName)
fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
Try {
@@ -71,16 +73,16 @@ object MicrometerMetricsTest : Spek({
)
fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
- verifyMeter(registrySearch().name(name), RequiredSearch::gauge, verifier)
+ verifyMeter(registrySearch(name), RequiredSearch::gauge, verifier)
fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
- verifyMeter(registrySearch().name(name), RequiredSearch::timer, verifier)
+ verifyMeter(registrySearch(name), RequiredSearch::timer, verifier)
fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
verifyMeter(search, RequiredSearch::counter, verifier)
fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
- verifyCounter(registrySearch().name(name), verifier)
+ verifyCounter(registrySearch(name), verifier)
fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) {
registry.meters
@@ -153,8 +155,8 @@ object MicrometerMetricsTest : Spek({
val topicName1 = "PERF3GPP"
val topicName2 = "CALLTRACE"
- on("$PREFIX.messages.sent.count.total counter") {
- val counterName = "$PREFIX.messages.sent.count.total"
+ on("$PREFIX.messages.sent.count counter") {
+ val counterName = "$PREFIX.messages.sent.count"
it("should increment counter") {
cut.notifyMessageSent(routedMessage(topicName1))
@@ -162,22 +164,22 @@ object MicrometerMetricsTest : Spek({
verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
- verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.count.topic")
+ verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.topic.count")
}
}
on("$PREFIX.messages.sent.topic.count counter") {
- val counterName = "$PREFIX.messages.sent.count.topic"
+ val counterName = "$PREFIX.messages.sent.topic.count"
it("should handle counters for different topics") {
cut.notifyMessageSent(routedMessage(topicName1))
cut.notifyMessageSent(routedMessage(topicName2))
cut.notifyMessageSent(routedMessage(topicName2))
- verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) {
+ verifyCounter(registrySearch(counterName).tag("topic", topicName1)) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
- verifyCounter(registrySearch().name(counterName).tag("topic", topicName2)) {
+ verifyCounter(registrySearch(counterName).tag("topic", topicName2)) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
}
@@ -196,16 +198,16 @@ object MicrometerMetricsTest : Spek({
}
verifyAllCountersAreUnchangedBut(
counterName,
- "$PREFIX.messages.sent.count.topic",
- "$PREFIX.messages.sent.count.total")
+ "$PREFIX.messages.sent.topic.count",
+ "$PREFIX.messages.sent.count")
}
}
}
describe("notifyMessageDropped") {
- on("$PREFIX.messages.dropped.count.total counter") {
- val counterName = "$PREFIX.messages.dropped.count.total"
+ on("$PREFIX.messages.dropped.count counter") {
+ val counterName = "$PREFIX.messages.dropped.count"
it("should increment counter") {
cut.notifyMessageDropped(ROUTE_NOT_FOUND)
cut.notifyMessageDropped(INVALID_MESSAGE)
@@ -213,22 +215,22 @@ object MicrometerMetricsTest : Spek({
verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
- verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.count.cause")
+ verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count")
}
}
- on("$PREFIX.messages.dropped.count.cause counter") {
- val counterName = "$PREFIX.messages.dropped.count.cause"
+ on("$PREFIX.messages.dropped.cause.count counter") {
+ val counterName = "$PREFIX.messages.dropped.cause.count"
it("should handle counters for different drop reasons") {
cut.notifyMessageDropped(ROUTE_NOT_FOUND)
cut.notifyMessageDropped(INVALID_MESSAGE)
cut.notifyMessageDropped(INVALID_MESSAGE)
- verifyCounter(registrySearch().name(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
+ verifyCounter(registrySearch(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
- verifyCounter(registrySearch().name(counterName).tag("cause", INVALID_MESSAGE.tag)) {
+ verifyCounter(registrySearch(counterName).tag("cause", INVALID_MESSAGE.tag)) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
}
@@ -267,6 +269,38 @@ object MicrometerMetricsTest : Spek({
}
}
+ describe("notifyClientRejected") {
+
+ on("$PREFIX.clients.rejected.count") {
+ val counterName = "$PREFIX.clients.rejected.count"
+ it("should increment counter for each possible reason") {
+ cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
+ cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
+
+ verifyCounter(counterName) {
+ assertThat(it.count()).isCloseTo(2.0, doublePrecision)
+ }
+ verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count")
+ }
+ }
+
+ on("$PREFIX.clients.rejected.cause.count counter") {
+ val counterName = "$PREFIX.clients.rejected.cause.count"
+ it("should handle counters for different rejection reasons") {
+ cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
+ cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
+ cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
+
+ verifyCounter(registrySearch(counterName).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
+ assertThat(it.count()).isCloseTo(1.0, doublePrecision)
+ }
+
+ verifyCounter(registrySearch(counterName).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
+ assertThat(it.count()).isCloseTo(2.0, doublePrecision)
+ }
+ }
+ }
+ }
})
fun routedMessage(topic: String, partition: Int = 0) =
@@ -279,4 +313,4 @@ fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0) =
vesEvent().let {evt ->
RoutedMessage(topic, partition,
VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt)))
- } \ No newline at end of file
+ }