summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-17 12:59:49 +0000
committerGerrit Code Review <gerrit@onap.org>2018-12-17 12:59:49 +0000
commitfb040c0df8ab2b74d02b67feda4e2a161a1311d2 (patch)
tree2eee93d6fc20b29d63597dc4793dd0b55d245cc1
parentbca686a94396f704ecca299d7fe2be861a46557f (diff)
parent8952e9970470b683773bfe3a8f40a10881a3f321 (diff)
Merge "Add metrics for dropped messages"
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt23
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt29
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt71
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt12
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt26
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt26
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt86
-rw-r--r--sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt66
-rw-r--r--sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt13
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt8
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt6
12 files changed, 269 insertions, 99 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index b686b250..3f69c088 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.boundary
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import reactor.core.publisher.Flux
@@ -32,6 +33,7 @@ interface Metrics {
fun notifyBytesReceived(size: Int)
fun notifyMessageReceived(size: Int)
fun notifyMessageSent(topic: String)
+ fun notifyMessageDropped(cause: MessageDropCause)
}
@FunctionalInterface
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index ca1605e6..b29432f0 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.impl
-import arrow.core.Either
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
@@ -29,9 +28,15 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
+import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
+import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.MessageEither
import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
import reactor.core.publisher.Flux
@@ -66,7 +71,11 @@ internal class VesHvCollector(
.doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
- .filterFailedWithLog(MessageValidator::validateFrameMessage)
+ .filterFailedWithLog {
+ MessageValidator
+ .validateFrameMessage(it)
+ .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
+ }
private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
.map(WireFrameMessage::payload)
@@ -74,12 +83,17 @@ internal class VesHvCollector(
private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
.decode(rawPayload)
+ .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) }
.filterFailedWithLog(logger, clientContext::fullMdc,
{ "Ves event header decoded successfully" },
{ "Failed to decode ves event header, reason: ${it.message}" })
private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux
- .filterFailedWithLog(MessageValidator::validateProtobufMessage)
+ .filterFailedWithLog {
+ MessageValidator
+ .validateProtobufMessage(it)
+ .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
+ }
private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
.flatMap(this::findRoute)
@@ -88,6 +102,7 @@ internal class VesHvCollector(
private fun findRoute(msg: VesMessage) = router
.findDestination(msg)
+ .doOnEmpty { metrics.notifyMessageDropped(ROUTE_NOT_FOUND) }
.filterEmptyWithLog(logger, clientContext::fullMdc,
{ "Found route for message: ${it.topic}, partition: ${it.partition}" },
{ "Could not find route for message" })
@@ -95,7 +110,7 @@ internal class VesHvCollector(
private fun releaseBuffersMemory() = wireChunkDecoder.release()
.also { logger.debug { "Released buffer memory after handling message stream" } }
- fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
+ private fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> MessageEither): Flux<T> =
filterFailedWithLog(logger, clientContext::fullMdc, predicate)
companion object {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt
new file mode 100644
index 00000000..af43ae67
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt
@@ -0,0 +1,29 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since December 2018
+ */
+enum class MessageDropCause(val tag: String) {
+ ROUTE_NOT_FOUND("routing"),
+ INVALID_MESSAGE("invalid")
+}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index 2feca410..dd8acf77 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -25,18 +25,15 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.tests.fakes.NoOpSink
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
-import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
-import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
-import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
-import kotlin.test.fail
+import org.onap.dcae.collectors.veshv.tests.utils.*
object MetricsSpecification : Spek({
debugRx(false)
@@ -45,7 +42,7 @@ object MetricsSpecification : Spek({
it("should sum up all bytes received") {
val sut = vesHvWithNoOpSink()
val vesWireFrameMessage = vesWireFrameMessage()
- val invalidWireFrame = invalidWireFrame()
+ val invalidWireFrame = messageWithInvalidWireFrameHeader()
val bytesSent = invalidWireFrame.readableBytes() +
vesWireFrameMessage.readableBytes()
@@ -93,18 +90,62 @@ object MetricsSpecification : Spek({
)
val metrics = sut.metrics
- assertThat(metrics.messageSentCount)
- .describedAs("messageSentCount metric")
+ assertThat(metrics.messagesSentCount)
+ .describedAs("messagesSentCount metric")
.isEqualTo(3)
- assertThat(messagesOnTopic(metrics, PERF3GPP_TOPIC))
+ assertThat(metrics.messagesOnTopic(PERF3GPP_TOPIC))
.describedAs("messagesSentToTopic $PERF3GPP_TOPIC metric")
.isEqualTo(2)
- assertThat(messagesOnTopic(metrics, MEASUREMENTS_FOR_VF_SCALING_TOPIC))
+ assertThat(metrics.messagesOnTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC))
.describedAs("messagesSentToTopic $MEASUREMENTS_FOR_VF_SCALING_TOPIC metric")
.isEqualTo(1)
}
}
-})
-private fun messagesOnTopic(metrics: FakeMetrics, topic: String) =
- metrics.messagesSentToTopic.get(topic) ?: fail("No messages were sent to topic $topic") \ No newline at end of file
+ describe("Messages dropped metrics") {
+ it("should gather metrics for invalid messages") {
+ val sut = vesHvWithNoOpSink(basicConfiguration)
+
+ sut.handleConnection(
+ messageWithInvalidWireFrameHeader(),
+ wireFrameMessageWithInvalidPayload(),
+ vesWireFrameMessage(domain = PERF3GPP),
+ messageWithInvalidListenerVersion()
+ )
+
+ val metrics = sut.metrics
+ assertThat(metrics.messagesDropped(INVALID_MESSAGE))
+ .describedAs("messagesDroppedCause $INVALID_MESSAGE metric")
+ .isEqualTo(3)
+ }
+
+ it("should gather metrics for route not found") {
+ val sut = vesHvWithNoOpSink(basicConfiguration)
+
+ sut.handleConnection(
+ vesWireFrameMessage(domain = PERF3GPP),
+ vesWireFrameMessage(domain = HEARTBEAT)
+ )
+
+ val metrics = sut.metrics
+ assertThat(metrics.messagesDropped(ROUTE_NOT_FOUND))
+ .describedAs("messagesDroppedCause $ROUTE_NOT_FOUND metric")
+ .isEqualTo(1)
+ }
+
+ it("should gather summed metrics for dropped messages"){
+ val sut = vesHvWithNoOpSink(basicConfiguration)
+
+ sut.handleConnection(
+ vesWireFrameMessage(domain = PERF3GPP),
+ vesWireFrameMessage(domain = HEARTBEAT),
+ wireFrameMessageWithInvalidPayload()
+ )
+
+ val metrics = sut.metrics
+ assertThat(metrics.messagesDroppedCount)
+ .describedAs("messagesDroppedCount metric")
+ .isEqualTo(2)
+ }
+ }
+})
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index ab59cc2e..338c3734 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -37,11 +37,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
-import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
-import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
-import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithPayloadOfSize
-import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
-import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
+import org.onap.dcae.collectors.veshv.tests.utils.*
import reactor.core.publisher.Flux
import java.time.Duration
@@ -71,8 +67,8 @@ object VesHvSpecification : Spek({
it("should release memory for each handled and dropped message") {
val (sut, sink) = vesHvWithStoringSink()
val validMessage = vesWireFrameMessage(PERF3GPP)
- val msgWithInvalidFrame = invalidWireFrame()
- val msgWithTooBigPayload = vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
+ val msgWithInvalidFrame = messageWithInvalidWireFrameHeader()
+ val msgWithTooBigPayload = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
val expectedRefCnt = 0
val handledEvents = sut.handleConnection(
@@ -329,7 +325,7 @@ object VesHvSpecification : Spek({
val handledMessages = sut.handleConnection(sink,
vesWireFrameMessage(PERF3GPP, "first"),
- vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
+ messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
vesWireFrameMessage(PERF3GPP))
assertThat(handledMessages).hasSize(1)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
index dd098052..9ddb7115 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
@@ -20,8 +20,9 @@
package org.onap.dcae.collectors.veshv.tests.fakes
import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicInteger
+import kotlin.test.fail
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -29,11 +30,12 @@ import java.util.concurrent.atomic.AtomicInteger
*/
class FakeMetrics : Metrics {
var bytesReceived: Int = 0
-
var messageBytesReceived: Int = 0
+ var messagesSentCount: Int = 0
+ var messagesDroppedCount: Int = 0
- var messageSentCount: Int = 0
- val messagesSentToTopic: MutableMap<String, Int> = ConcurrentHashMap()
+ private val messagesSentToTopic: MutableMap<String, Int> = ConcurrentHashMap()
+ private val messagesDroppedCause: MutableMap<MessageDropCause, Int> = ConcurrentHashMap()
override fun notifyBytesReceived(size: Int) {
bytesReceived += size
@@ -44,7 +46,19 @@ class FakeMetrics : Metrics {
}
override fun notifyMessageSent(topic: String) {
- messageSentCount++
- messagesSentToTopic.compute(topic, { k, v -> messagesSentToTopic.get(k)?.inc() ?: 1 })
+ messagesSentCount++
+ messagesSentToTopic.compute(topic) { k, _ -> messagesSentToTopic[k]?.inc() ?: 1 }
+ }
+
+ override fun notifyMessageDropped(cause: MessageDropCause) {
+ messagesDroppedCount++
+ messagesDroppedCause.compute(cause) { k, _ -> messagesDroppedCause[k]?.inc() ?: 1 }
}
+
+ fun messagesOnTopic(topic: String) =
+ messagesSentToTopic[topic] ?: fail("No messages were sent to topic $topic")
+
+ fun messagesDropped(cause: MessageDropCause) =
+ messagesDroppedCause[cause]
+ ?: fail("No messages were dropped due to cause: ${cause.name}")
} \ No newline at end of file
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index cf903591..18678ff3 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -29,6 +29,7 @@ import io.micrometer.core.instrument.binder.system.ProcessorMetrics
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
/**
@@ -42,7 +43,16 @@ class MicrometerMetrics internal constructor(
private val receivedBytes = registry.counter(name(DATA, RECEIVED, BYTES))
private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT))
private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES))
- private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT))
+ private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT, TOTAL))
+ private val droppedCountTotal = registry.counter(name(MESSAGES, DROPPED, COUNT, TOTAL))
+
+ private val sentCount = { topic: String ->
+ registry.counter(name(MESSAGES, SENT, COUNT, TOPIC), TOPIC, topic)
+ }.memoize<String, Counter>()
+
+ private val droppedCount = { cause: String ->
+ registry.counter(name(MESSAGES, DROPPED, COUNT, CAUSE), CAUSE, cause)
+ }.memoize<String, Counter>()
init {
registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) {
@@ -55,10 +65,6 @@ class MicrometerMetrics internal constructor(
JvmThreadMetrics().bindTo(registry)
}
- private val sentCount = { topic: String ->
- registry.counter("hvves.messages.sent.topic.count", "topic", topic)
- }.memoize<String, Counter>()
-
val metricsProvider = MicrometerPrometheusMetricsProvider(registry)
override fun notifyBytesReceived(size: Int) {
@@ -75,6 +81,11 @@ class MicrometerMetrics internal constructor(
sentCount(topic).increment()
}
+ override fun notifyMessageDropped(cause: MessageDropCause) {
+ droppedCountTotal.increment()
+ droppedCount(cause.tag).increment()
+ }
+
companion object {
val INSTANCE = MicrometerMetrics()
internal const val PREFIX = "hvves"
@@ -85,6 +96,11 @@ class MicrometerMetrics internal constructor(
internal const val DATA = "data"
internal const val SENT = "sent"
internal const val PROCESSING = "processing"
+ internal const val TOPIC = "topic"
+ internal const val DROPPED = "dropped"
+ internal const val CAUSE = "cause"
+ internal const val TOTAL = "total"
+
fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
}
}
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
index 66326ddc..e2dc2f82 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -23,7 +23,6 @@ import arrow.core.Try
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.Gauge
import io.micrometer.core.instrument.search.RequiredSearch
-import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
import org.assertj.core.api.Assertions.assertThat
@@ -32,9 +31,10 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.healthcheck.ports.PrometheusMetricsProvider
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -88,8 +88,8 @@ object MicrometerMetricsTest : Spek({
val bytes = 128
cut.notifyBytesReceived(bytes)
- verifyCounter(counterName) { counter ->
- assertThat(counter.count()).isCloseTo(bytes.toDouble(), doublePrecision)
+ verifyCounter(counterName) {
+ assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
}
}
@@ -107,8 +107,8 @@ object MicrometerMetricsTest : Spek({
it("should increment counter") {
cut.notifyMessageReceived(777)
- verifyCounter(counterName) { counter ->
- assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
+ verifyCounter(counterName) {
+ assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
}
}
@@ -120,15 +120,18 @@ object MicrometerMetricsTest : Spek({
val bytes = 888
cut.notifyMessageReceived(bytes)
- verifyCounter(counterName) { counter ->
- assertThat(counter.count()).isCloseTo(bytes.toDouble(), doublePrecision)
+ verifyCounter(counterName) {
+ assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
}
}
}
it("should leave all other counters unchanged") {
cut.notifyMessageReceived(128)
- verifyAllCountersAreUnchangedBut("$PREFIX.messages.received.count", "$PREFIX.messages.received.bytes")
+ verifyAllCountersAreUnchangedBut(
+ "$PREFIX.messages.received.count",
+ "$PREFIX.messages.received.bytes"
+ )
}
}
@@ -136,32 +139,65 @@ object MicrometerMetricsTest : Spek({
val topicName1 = "PERF3GPP"
val topicName2 = "CALLTRACE"
- on("$PREFIX.messages.sent.count counter") {
- val counterName = "$PREFIX.messages.sent.count"
+ on("$PREFIX.messages.sent.count.total counter") {
+ val counterName = "$PREFIX.messages.sent.count.total"
it("should increment counter") {
cut.notifyMessageSent(topicName1)
- verifyCounter(counterName) { counter ->
- assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
+ verifyCounter(counterName) {
+ assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
- verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.topic.count")
+ verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.count.topic")
}
}
on("$PREFIX.messages.sent.topic.count counter") {
- val counterName = "$PREFIX.messages.sent.topic.count"
+ val counterName = "$PREFIX.messages.sent.count.topic"
it("should handle counters for different topics") {
cut.notifyMessageSent(topicName1)
cut.notifyMessageSent(topicName2)
cut.notifyMessageSent(topicName2)
- verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) { counter ->
- assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
+ verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) {
+ assertThat(it.count()).isCloseTo(1.0, doublePrecision)
+ }
+
+ verifyCounter(registrySearch().name(counterName).tag("topic", topicName2)) {
+ assertThat(it.count()).isCloseTo(2.0, doublePrecision)
+ }
+ }
+ }
+ }
+
+ describe("notifyMessageDropped") {
+
+ on("$PREFIX.messages.dropped.count.total counter") {
+ val counterName = "$PREFIX.messages.dropped.count.total"
+ it("should increment counter") {
+ cut.notifyMessageDropped(ROUTE_NOT_FOUND)
+ cut.notifyMessageDropped(INVALID_MESSAGE)
+
+ verifyCounter(counterName) {
+ assertThat(it.count()).isCloseTo(2.0, doublePrecision)
+ }
+ verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.count.cause")
+ }
+ }
+
+ on("$PREFIX.messages.dropped.count.cause counter") {
+ val counterName = "$PREFIX.messages.dropped.count.cause"
+ it("should handle counters for different drop reasons") {
+ cut.notifyMessageDropped(ROUTE_NOT_FOUND)
+ cut.notifyMessageDropped(INVALID_MESSAGE)
+ cut.notifyMessageDropped(INVALID_MESSAGE)
+
+ verifyCounter(registrySearch().name(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
+ assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
- verifyCounter(registrySearch().name(counterName).tag("topic", topicName2)) { counter ->
- assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
+ verifyCounter(registrySearch().name(counterName).tag("cause", INVALID_MESSAGE.tag)) {
+ assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
}
}
@@ -175,16 +211,16 @@ object MicrometerMetricsTest : Spek({
cut.notifyMessageReceived(256)
cut.notifyMessageReceived(256)
cut.notifyMessageSent("perf3gpp")
- verifyGauge("messages.processing.count") { gauge ->
- assertThat(gauge.value()).isCloseTo(2.0, doublePrecision)
+ verifyGauge("messages.processing.count") {
+ assertThat(it.value()).isCloseTo(2.0, doublePrecision)
}
}
on("zero difference") {
cut.notifyMessageReceived(128)
cut.notifyMessageSent("perf3gpp")
- verifyGauge("messages.processing.count") { gauge ->
- assertThat(gauge.value()).isCloseTo(0.0, doublePrecision)
+ verifyGauge("messages.processing.count") {
+ assertThat(it.value()).isCloseTo(0.0, doublePrecision)
}
}
@@ -192,8 +228,8 @@ object MicrometerMetricsTest : Spek({
cut.notifyMessageReceived(128)
cut.notifyMessageSent("fault")
cut.notifyMessageSent("perf3gpp")
- verifyGauge("messages.processing.count") { gauge ->
- assertThat(gauge.value()).isCloseTo(0.0, doublePrecision)
+ verifyGauge("messages.processing.count") {
+ assertThat(it.value()).isCloseTo(0.0, doublePrecision)
}
}
}
diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
index 01e11665..90c4aa13 100644
--- a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
+++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
@@ -23,61 +23,67 @@ import com.google.protobuf.ByteString
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.PooledByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
import org.onap.ves.VesEventOuterClass.VesEvent
-
import java.util.UUID.randomUUID
val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT
-private fun ByteBuf.writeValidWireFrameHeaders() {
+private fun validWireFrame() = allocator.buffer().run {
writeByte(0xAA) // always 0xAA
writeByte(0x01) // major version
writeByte(0x00) // minor version
writeZero(RESERVED_BYTE_COUNT) // reserved
- writeShort(0x0001) // content type = GPB
+ writeShort(0x0001) // content type = GPB
+}
+
+private fun invalidWireFrame() = allocator.buffer().run {
+ writeByte(0xAA) // always 0xAA
+ writeByte(0x00) // invalid major version
+ writeByte(0x00) // minor version
+ writeZero(RESERVED_BYTE_COUNT) // reserved
+ writeShort(0x0001) // content type = GPB
+}
+
+fun garbageFrame(): ByteBuf = allocator.buffer().run {
+ writeBytes("the meaning of life is &@)(*_!".toByteArray())
}
fun vesWireFrameMessage(domain: VesEventDomain = OTHER,
id: String = randomUUID().toString(),
- eventFields: ByteString = ByteString.EMPTY): ByteBuf =
- vesWireFrameMessage(vesEvent(domain, id, eventFields))
-
-fun vesWireFrameMessage(vesEvent: VesEvent) =
- allocator.buffer().run {
- writeValidWireFrameHeaders()
+ eventFields: ByteString = ByteString.EMPTY,
+ vesEventListenerVersion: String = "7.0.2"): ByteBuf =
+ vesWireFrameMessage(vesEvent(domain, id, eventFields, vesEventListenerVersion))
+fun vesWireFrameMessage(vesEvent: VesEvent): ByteBuf =
+ validWireFrame().run {
val gpb = vesEvent.toByteString().asReadOnlyByteBuffer()
writeInt(gpb.limit()) // ves event size in bytes
- writeBytes(gpb) // ves event as GPB bytes
+ writeBytes(gpb) // ves event as GPB bytes
}
-fun wireFrameMessageWithInvalidPayload(): ByteBuf = allocator.buffer().run {
- writeValidWireFrameHeaders()
-
- val invalidGpb = "some random data".toByteArray(Charsets.UTF_8)
- writeInt(invalidGpb.size) // ves event size in bytes
- writeBytes(invalidGpb)
-}
-
-fun garbageFrame(): ByteBuf = allocator.buffer().run {
- writeBytes("the meaning of life is &@)(*_!".toByteArray())
-}
+fun messageWithInvalidWireFrameHeader(vesEvent: VesEvent = vesEvent()): ByteBuf =
+ invalidWireFrame().run {
+ val gpb = vesEvent.toByteString().asReadOnlyByteBuffer()
+ writeInt(gpb.limit()) // ves event size in bytes
+ writeBytes(gpb) // ves event as GPB bytes
+ }
-fun invalidWireFrame(): ByteBuf = allocator.buffer().run {
- writeByte(0xAA)
- writeByte(0x01) // version major
- writeByte(0x01) // version minor
-}
+fun wireFrameMessageWithInvalidPayload(): ByteBuf =
+ validWireFrame().run {
+ val invalidGpb = "some random data".toByteArray(Charsets.UTF_8)
+ writeInt(invalidGpb.size) // ves event size in bytes
+ writeBytes(invalidGpb)
+ }
-fun vesMessageWithPayloadOfSize(payloadSizeBytes: Int, domain: VesEventDomain = PERF3GPP): ByteBuf =
+fun messageWithPayloadOfSize(payloadSizeBytes: Int, domain: VesEventDomain = PERF3GPP): ByteBuf =
vesWireFrameMessage(
domain = domain,
eventFields = ByteString.copyFrom(ByteArray(payloadSizeBytes))
)
-
+fun messageWithInvalidListenerVersion() = vesWireFrameMessage(vesEventListenerVersion = "invalid") \ No newline at end of file
diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
index 569f1a90..cf30d2ce 100644
--- a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
+++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
@@ -32,8 +32,12 @@ import java.util.UUID.randomUUID
fun vesEvent(domain: VesEventDomain = PERF3GPP,
id: String = randomUUID().toString(),
- eventFields: ByteString = ByteString.EMPTY
-): VesEventOuterClass.VesEvent = vesEvent(commonHeader(domain, id), eventFields)
+ eventFields: ByteString = ByteString.EMPTY,
+ vesEventListenerVersion: String = "7.0.2"
+): VesEventOuterClass.VesEvent = vesEvent(
+ commonHeader(domain, id, vesEventListenerVersion),
+ eventFields
+)
fun vesEvent(commonEventHeader: CommonEventHeader,
eventFields: ByteString = ByteString.EMPTY): VesEventOuterClass.VesEvent =
@@ -44,8 +48,9 @@ fun vesEvent(commonEventHeader: CommonEventHeader,
fun commonHeader(domain: VesEventDomain = PERF3GPP,
id: String = randomUUID().toString(),
- priority: Priority = Priority.NORMAL,
- vesEventListenerVersion: String = "7.0.2"): CommonEventHeader =
+ vesEventListenerVersion: String = "7.0.2",
+ priority: Priority = Priority.NORMAL
+): CommonEventHeader =
CommonEventHeader.newBuilder()
.setVersion("sample-version")
.setDomain(domain.domainName)
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
index 7381592d..cb1c6222 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.utils.arrow
import arrow.core.Either
import arrow.core.Option
+import arrow.core.Try
import arrow.core.identity
import arrow.syntax.collections.firstOption
import java.util.concurrent.atomic.AtomicReference
@@ -45,3 +46,10 @@ fun <A> Option.Companion.fromNullablesChain(firstValue: A?, vararg nextValues: (
.map { it() }
.filter { it != null }
.firstOption()
+
+
+fun <A, B> Either<A, B>.doOnLeft(action: () -> Unit): Either<A, B> = apply { if (isLeft()) action() }
+
+fun <A> Option<A>.doOnEmpty(action: () -> Unit): Option<A> = apply { if (isEmpty()) action() }
+
+fun <A> Try<A>.doOnFailure(action: () -> Unit): Try<A> = apply { if (isFailure()) action() }
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
index 95590d9d..e7aca55d 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
@@ -25,6 +25,8 @@ import arrow.core.Try
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+typealias MessageEither = Either<() -> String, () -> String>
+
fun <T> Logger.handleReactiveStreamError(
context: MappedDiagnosticContext,
ex: Throwable,
@@ -60,7 +62,7 @@ fun <T> Option<T>.filterEmptyWithLog(logger: Logger,
fun <T> Flux<T>.filterFailedWithLog(logger: Logger,
context: MappedDiagnosticContext,
- predicate: (T) -> Either<() -> String, () -> String>) =
+ predicate: (T) -> MessageEither): Flux<T> =
flatMap { t ->
predicate(t).fold({
logger.warn(context, it)
@@ -69,4 +71,4 @@ fun <T> Flux<T>.filterFailedWithLog(logger: Logger,
logger.trace(context, it)
Mono.just<T>(t)
})
- }
+ } \ No newline at end of file