diff options
12 files changed, 269 insertions, 99 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index b686b250..3f69c088 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.boundary import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.model.RoutedMessage import reactor.core.publisher.Flux @@ -32,6 +33,7 @@ interface Metrics { fun notifyBytesReceived(size: Int) fun notifyMessageReceived(size: Int) fun notifyMessageSent(topic: String) + fun notifyMessageDropped(cause: MessageDropCause) } @FunctionalInterface diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index ca1605e6..b29432f0 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.impl -import arrow.core.Either import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics @@ -29,9 +28,15 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE +import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty +import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure +import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.MessageEither import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog import reactor.core.publisher.Flux @@ -66,7 +71,11 @@ internal class VesHvCollector( .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux - .filterFailedWithLog(MessageValidator::validateFrameMessage) + .filterFailedWithLog { + MessageValidator + .validateFrameMessage(it) + .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) } + } private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux .map(WireFrameMessage::payload) @@ -74,12 +83,17 @@ internal class VesHvCollector( private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder .decode(rawPayload) + .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) } .filterFailedWithLog(logger, clientContext::fullMdc, { "Ves event header decoded successfully" }, { "Failed to decode ves event header, reason: ${it.message}" }) private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux - .filterFailedWithLog(MessageValidator::validateProtobufMessage) + .filterFailedWithLog { + MessageValidator + .validateProtobufMessage(it) + .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) } + } private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux .flatMap(this::findRoute) @@ -88,6 +102,7 @@ internal class VesHvCollector( private fun findRoute(msg: VesMessage) = router .findDestination(msg) + .doOnEmpty { metrics.notifyMessageDropped(ROUTE_NOT_FOUND) } .filterEmptyWithLog(logger, clientContext::fullMdc, { "Found route for message: ${it.topic}, partition: ${it.partition}" }, { "Could not find route for message" }) @@ -95,7 +110,7 @@ internal class VesHvCollector( private fun releaseBuffersMemory() = wireChunkDecoder.release() .also { logger.debug { "Released buffer memory after handling message stream" } } - fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) = + private fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> MessageEither): Flux<T> = filterFailedWithLog(logger, clientContext::fullMdc, predicate) companion object { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt new file mode 100644 index 00000000..af43ae67 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt @@ -0,0 +1,29 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.model + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since December 2018 + */ +enum class MessageDropCause(val tag: String) { + ROUTE_NOT_FOUND("routing"), + INVALID_MESSAGE("invalid") +} diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index 2feca410..dd8acf77 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -25,18 +25,15 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.domain.VesEventDomain +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.tests.fakes.NoOpSink -import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics +import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE +import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration -import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame -import org.onap.dcae.collectors.veshv.tests.utils.vesEvent -import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage -import kotlin.test.fail +import org.onap.dcae.collectors.veshv.tests.utils.* object MetricsSpecification : Spek({ debugRx(false) @@ -45,7 +42,7 @@ object MetricsSpecification : Spek({ it("should sum up all bytes received") { val sut = vesHvWithNoOpSink() val vesWireFrameMessage = vesWireFrameMessage() - val invalidWireFrame = invalidWireFrame() + val invalidWireFrame = messageWithInvalidWireFrameHeader() val bytesSent = invalidWireFrame.readableBytes() + vesWireFrameMessage.readableBytes() @@ -93,18 +90,62 @@ object MetricsSpecification : Spek({ ) val metrics = sut.metrics - assertThat(metrics.messageSentCount) - .describedAs("messageSentCount metric") + assertThat(metrics.messagesSentCount) + .describedAs("messagesSentCount metric") .isEqualTo(3) - assertThat(messagesOnTopic(metrics, PERF3GPP_TOPIC)) + assertThat(metrics.messagesOnTopic(PERF3GPP_TOPIC)) .describedAs("messagesSentToTopic $PERF3GPP_TOPIC metric") .isEqualTo(2) - assertThat(messagesOnTopic(metrics, MEASUREMENTS_FOR_VF_SCALING_TOPIC)) + assertThat(metrics.messagesOnTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)) .describedAs("messagesSentToTopic $MEASUREMENTS_FOR_VF_SCALING_TOPIC metric") .isEqualTo(1) } } -}) -private fun messagesOnTopic(metrics: FakeMetrics, topic: String) = - metrics.messagesSentToTopic.get(topic) ?: fail("No messages were sent to topic $topic")
\ No newline at end of file + describe("Messages dropped metrics") { + it("should gather metrics for invalid messages") { + val sut = vesHvWithNoOpSink(basicConfiguration) + + sut.handleConnection( + messageWithInvalidWireFrameHeader(), + wireFrameMessageWithInvalidPayload(), + vesWireFrameMessage(domain = PERF3GPP), + messageWithInvalidListenerVersion() + ) + + val metrics = sut.metrics + assertThat(metrics.messagesDropped(INVALID_MESSAGE)) + .describedAs("messagesDroppedCause $INVALID_MESSAGE metric") + .isEqualTo(3) + } + + it("should gather metrics for route not found") { + val sut = vesHvWithNoOpSink(basicConfiguration) + + sut.handleConnection( + vesWireFrameMessage(domain = PERF3GPP), + vesWireFrameMessage(domain = HEARTBEAT) + ) + + val metrics = sut.metrics + assertThat(metrics.messagesDropped(ROUTE_NOT_FOUND)) + .describedAs("messagesDroppedCause $ROUTE_NOT_FOUND metric") + .isEqualTo(1) + } + + it("should gather summed metrics for dropped messages"){ + val sut = vesHvWithNoOpSink(basicConfiguration) + + sut.handleConnection( + vesWireFrameMessage(domain = PERF3GPP), + vesWireFrameMessage(domain = HEARTBEAT), + wireFrameMessageWithInvalidPayload() + ) + + val metrics = sut.metrics + assertThat(metrics.messagesDroppedCount) + .describedAs("messagesDroppedCount metric") + .isEqualTo(2) + } + } +}) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index ab59cc2e..338c3734 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -37,11 +37,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration -import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame -import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame -import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithPayloadOfSize -import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage -import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload +import org.onap.dcae.collectors.veshv.tests.utils.* import reactor.core.publisher.Flux import java.time.Duration @@ -71,8 +67,8 @@ object VesHvSpecification : Spek({ it("should release memory for each handled and dropped message") { val (sut, sink) = vesHvWithStoringSink() val validMessage = vesWireFrameMessage(PERF3GPP) - val msgWithInvalidFrame = invalidWireFrame() - val msgWithTooBigPayload = vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP) + val msgWithInvalidFrame = messageWithInvalidWireFrameHeader() + val msgWithTooBigPayload = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP) val expectedRefCnt = 0 val handledEvents = sut.handleConnection( @@ -329,7 +325,7 @@ object VesHvSpecification : Spek({ val handledMessages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP, "first"), - vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP), + messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP), vesWireFrameMessage(PERF3GPP)) assertThat(handledMessages).hasSize(1) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt index dd098052..9ddb7115 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt @@ -20,8 +20,9 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.model.MessageDropCause import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.fail /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -29,11 +30,12 @@ import java.util.concurrent.atomic.AtomicInteger */ class FakeMetrics : Metrics { var bytesReceived: Int = 0 - var messageBytesReceived: Int = 0 + var messagesSentCount: Int = 0 + var messagesDroppedCount: Int = 0 - var messageSentCount: Int = 0 - val messagesSentToTopic: MutableMap<String, Int> = ConcurrentHashMap() + private val messagesSentToTopic: MutableMap<String, Int> = ConcurrentHashMap() + private val messagesDroppedCause: MutableMap<MessageDropCause, Int> = ConcurrentHashMap() override fun notifyBytesReceived(size: Int) { bytesReceived += size @@ -44,7 +46,19 @@ class FakeMetrics : Metrics { } override fun notifyMessageSent(topic: String) { - messageSentCount++ - messagesSentToTopic.compute(topic, { k, v -> messagesSentToTopic.get(k)?.inc() ?: 1 }) + messagesSentCount++ + messagesSentToTopic.compute(topic) { k, _ -> messagesSentToTopic[k]?.inc() ?: 1 } + } + + override fun notifyMessageDropped(cause: MessageDropCause) { + messagesDroppedCount++ + messagesDroppedCause.compute(cause) { k, _ -> messagesDroppedCause[k]?.inc() ?: 1 } } + + fun messagesOnTopic(topic: String) = + messagesSentToTopic[topic] ?: fail("No messages were sent to topic $topic") + + fun messagesDropped(cause: MessageDropCause) = + messagesDroppedCause[cause] + ?: fail("No messages were dropped due to cause: ${cause.name}") }
\ No newline at end of file diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index cf903591..18678ff3 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -29,6 +29,7 @@ import io.micrometer.core.instrument.binder.system.ProcessorMetrics import io.micrometer.prometheus.PrometheusConfig import io.micrometer.prometheus.PrometheusMeterRegistry import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.model.MessageDropCause /** @@ -42,7 +43,16 @@ class MicrometerMetrics internal constructor( private val receivedBytes = registry.counter(name(DATA, RECEIVED, BYTES)) private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT)) private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES)) - private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT)) + private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT, TOTAL)) + private val droppedCountTotal = registry.counter(name(MESSAGES, DROPPED, COUNT, TOTAL)) + + private val sentCount = { topic: String -> + registry.counter(name(MESSAGES, SENT, COUNT, TOPIC), TOPIC, topic) + }.memoize<String, Counter>() + + private val droppedCount = { cause: String -> + registry.counter(name(MESSAGES, DROPPED, COUNT, CAUSE), CAUSE, cause) + }.memoize<String, Counter>() init { registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) { @@ -55,10 +65,6 @@ class MicrometerMetrics internal constructor( JvmThreadMetrics().bindTo(registry) } - private val sentCount = { topic: String -> - registry.counter("hvves.messages.sent.topic.count", "topic", topic) - }.memoize<String, Counter>() - val metricsProvider = MicrometerPrometheusMetricsProvider(registry) override fun notifyBytesReceived(size: Int) { @@ -75,6 +81,11 @@ class MicrometerMetrics internal constructor( sentCount(topic).increment() } + override fun notifyMessageDropped(cause: MessageDropCause) { + droppedCountTotal.increment() + droppedCount(cause.tag).increment() + } + companion object { val INSTANCE = MicrometerMetrics() internal const val PREFIX = "hvves" @@ -85,6 +96,11 @@ class MicrometerMetrics internal constructor( internal const val DATA = "data" internal const val SENT = "sent" internal const val PROCESSING = "processing" + internal const val TOPIC = "topic" + internal const val DROPPED = "dropped" + internal const val CAUSE = "cause" + internal const val TOTAL = "total" + fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" } } diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index 66326ddc..e2dc2f82 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -23,7 +23,6 @@ import arrow.core.Try import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.Gauge import io.micrometer.core.instrument.search.RequiredSearch -import io.micrometer.core.instrument.simple.SimpleMeterRegistry import io.micrometer.prometheus.PrometheusConfig import io.micrometer.prometheus.PrometheusMeterRegistry import org.assertj.core.api.Assertions.assertThat @@ -32,9 +31,10 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.healthcheck.ports.PrometheusMetricsProvider import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX +import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE +import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -88,8 +88,8 @@ object MicrometerMetricsTest : Spek({ val bytes = 128 cut.notifyBytesReceived(bytes) - verifyCounter(counterName) { counter -> - assertThat(counter.count()).isCloseTo(bytes.toDouble(), doublePrecision) + verifyCounter(counterName) { + assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision) } } @@ -107,8 +107,8 @@ object MicrometerMetricsTest : Spek({ it("should increment counter") { cut.notifyMessageReceived(777) - verifyCounter(counterName) { counter -> - assertThat(counter.count()).isCloseTo(1.0, doublePrecision) + verifyCounter(counterName) { + assertThat(it.count()).isCloseTo(1.0, doublePrecision) } } } @@ -120,15 +120,18 @@ object MicrometerMetricsTest : Spek({ val bytes = 888 cut.notifyMessageReceived(bytes) - verifyCounter(counterName) { counter -> - assertThat(counter.count()).isCloseTo(bytes.toDouble(), doublePrecision) + verifyCounter(counterName) { + assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision) } } } it("should leave all other counters unchanged") { cut.notifyMessageReceived(128) - verifyAllCountersAreUnchangedBut("$PREFIX.messages.received.count", "$PREFIX.messages.received.bytes") + verifyAllCountersAreUnchangedBut( + "$PREFIX.messages.received.count", + "$PREFIX.messages.received.bytes" + ) } } @@ -136,32 +139,65 @@ object MicrometerMetricsTest : Spek({ val topicName1 = "PERF3GPP" val topicName2 = "CALLTRACE" - on("$PREFIX.messages.sent.count counter") { - val counterName = "$PREFIX.messages.sent.count" + on("$PREFIX.messages.sent.count.total counter") { + val counterName = "$PREFIX.messages.sent.count.total" it("should increment counter") { cut.notifyMessageSent(topicName1) - verifyCounter(counterName) { counter -> - assertThat(counter.count()).isCloseTo(1.0, doublePrecision) + verifyCounter(counterName) { + assertThat(it.count()).isCloseTo(1.0, doublePrecision) } - verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.topic.count") + verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.count.topic") } } on("$PREFIX.messages.sent.topic.count counter") { - val counterName = "$PREFIX.messages.sent.topic.count" + val counterName = "$PREFIX.messages.sent.count.topic" it("should handle counters for different topics") { cut.notifyMessageSent(topicName1) cut.notifyMessageSent(topicName2) cut.notifyMessageSent(topicName2) - verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) { counter -> - assertThat(counter.count()).isCloseTo(1.0, doublePrecision) + verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) { + assertThat(it.count()).isCloseTo(1.0, doublePrecision) + } + + verifyCounter(registrySearch().name(counterName).tag("topic", topicName2)) { + assertThat(it.count()).isCloseTo(2.0, doublePrecision) + } + } + } + } + + describe("notifyMessageDropped") { + + on("$PREFIX.messages.dropped.count.total counter") { + val counterName = "$PREFIX.messages.dropped.count.total" + it("should increment counter") { + cut.notifyMessageDropped(ROUTE_NOT_FOUND) + cut.notifyMessageDropped(INVALID_MESSAGE) + + verifyCounter(counterName) { + assertThat(it.count()).isCloseTo(2.0, doublePrecision) + } + verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.count.cause") + } + } + + on("$PREFIX.messages.dropped.count.cause counter") { + val counterName = "$PREFIX.messages.dropped.count.cause" + it("should handle counters for different drop reasons") { + cut.notifyMessageDropped(ROUTE_NOT_FOUND) + cut.notifyMessageDropped(INVALID_MESSAGE) + cut.notifyMessageDropped(INVALID_MESSAGE) + + verifyCounter(registrySearch().name(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) { + assertThat(it.count()).isCloseTo(1.0, doublePrecision) } - verifyCounter(registrySearch().name(counterName).tag("topic", topicName2)) { counter -> - assertThat(counter.count()).isCloseTo(2.0, doublePrecision) + verifyCounter(registrySearch().name(counterName).tag("cause", INVALID_MESSAGE.tag)) { + assertThat(it.count()).isCloseTo(2.0, doublePrecision) } } } @@ -175,16 +211,16 @@ object MicrometerMetricsTest : Spek({ cut.notifyMessageReceived(256) cut.notifyMessageReceived(256) cut.notifyMessageSent("perf3gpp") - verifyGauge("messages.processing.count") { gauge -> - assertThat(gauge.value()).isCloseTo(2.0, doublePrecision) + verifyGauge("messages.processing.count") { + assertThat(it.value()).isCloseTo(2.0, doublePrecision) } } on("zero difference") { cut.notifyMessageReceived(128) cut.notifyMessageSent("perf3gpp") - verifyGauge("messages.processing.count") { gauge -> - assertThat(gauge.value()).isCloseTo(0.0, doublePrecision) + verifyGauge("messages.processing.count") { + assertThat(it.value()).isCloseTo(0.0, doublePrecision) } } @@ -192,8 +228,8 @@ object MicrometerMetricsTest : Spek({ cut.notifyMessageReceived(128) cut.notifyMessageSent("fault") cut.notifyMessageSent("perf3gpp") - verifyGauge("messages.processing.count") { gauge -> - assertThat(gauge.value()).isCloseTo(0.0, doublePrecision) + verifyGauge("messages.processing.count") { + assertThat(it.value()).isCloseTo(0.0, doublePrecision) } } } diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt index 01e11665..90c4aa13 100644 --- a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt +++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt @@ -23,61 +23,67 @@ import com.google.protobuf.ByteString import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.PooledByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT import org.onap.dcae.collectors.veshv.domain.VesEventDomain -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT import org.onap.ves.VesEventOuterClass.VesEvent - import java.util.UUID.randomUUID val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT -private fun ByteBuf.writeValidWireFrameHeaders() { +private fun validWireFrame() = allocator.buffer().run { writeByte(0xAA) // always 0xAA writeByte(0x01) // major version writeByte(0x00) // minor version writeZero(RESERVED_BYTE_COUNT) // reserved - writeShort(0x0001) // content type = GPB + writeShort(0x0001) // content type = GPB +} + +private fun invalidWireFrame() = allocator.buffer().run { + writeByte(0xAA) // always 0xAA + writeByte(0x00) // invalid major version + writeByte(0x00) // minor version + writeZero(RESERVED_BYTE_COUNT) // reserved + writeShort(0x0001) // content type = GPB +} + +fun garbageFrame(): ByteBuf = allocator.buffer().run { + writeBytes("the meaning of life is &@)(*_!".toByteArray()) } fun vesWireFrameMessage(domain: VesEventDomain = OTHER, id: String = randomUUID().toString(), - eventFields: ByteString = ByteString.EMPTY): ByteBuf = - vesWireFrameMessage(vesEvent(domain, id, eventFields)) - -fun vesWireFrameMessage(vesEvent: VesEvent) = - allocator.buffer().run { - writeValidWireFrameHeaders() + eventFields: ByteString = ByteString.EMPTY, + vesEventListenerVersion: String = "7.0.2"): ByteBuf = + vesWireFrameMessage(vesEvent(domain, id, eventFields, vesEventListenerVersion)) +fun vesWireFrameMessage(vesEvent: VesEvent): ByteBuf = + validWireFrame().run { val gpb = vesEvent.toByteString().asReadOnlyByteBuffer() writeInt(gpb.limit()) // ves event size in bytes - writeBytes(gpb) // ves event as GPB bytes + writeBytes(gpb) // ves event as GPB bytes } -fun wireFrameMessageWithInvalidPayload(): ByteBuf = allocator.buffer().run { - writeValidWireFrameHeaders() - - val invalidGpb = "some random data".toByteArray(Charsets.UTF_8) - writeInt(invalidGpb.size) // ves event size in bytes - writeBytes(invalidGpb) -} - -fun garbageFrame(): ByteBuf = allocator.buffer().run { - writeBytes("the meaning of life is &@)(*_!".toByteArray()) -} +fun messageWithInvalidWireFrameHeader(vesEvent: VesEvent = vesEvent()): ByteBuf = + invalidWireFrame().run { + val gpb = vesEvent.toByteString().asReadOnlyByteBuffer() + writeInt(gpb.limit()) // ves event size in bytes + writeBytes(gpb) // ves event as GPB bytes + } -fun invalidWireFrame(): ByteBuf = allocator.buffer().run { - writeByte(0xAA) - writeByte(0x01) // version major - writeByte(0x01) // version minor -} +fun wireFrameMessageWithInvalidPayload(): ByteBuf = + validWireFrame().run { + val invalidGpb = "some random data".toByteArray(Charsets.UTF_8) + writeInt(invalidGpb.size) // ves event size in bytes + writeBytes(invalidGpb) + } -fun vesMessageWithPayloadOfSize(payloadSizeBytes: Int, domain: VesEventDomain = PERF3GPP): ByteBuf = +fun messageWithPayloadOfSize(payloadSizeBytes: Int, domain: VesEventDomain = PERF3GPP): ByteBuf = vesWireFrameMessage( domain = domain, eventFields = ByteString.copyFrom(ByteArray(payloadSizeBytes)) ) - +fun messageWithInvalidListenerVersion() = vesWireFrameMessage(vesEventListenerVersion = "invalid")
\ No newline at end of file diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt index 569f1a90..cf30d2ce 100644 --- a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt +++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt @@ -32,8 +32,12 @@ import java.util.UUID.randomUUID fun vesEvent(domain: VesEventDomain = PERF3GPP, id: String = randomUUID().toString(), - eventFields: ByteString = ByteString.EMPTY -): VesEventOuterClass.VesEvent = vesEvent(commonHeader(domain, id), eventFields) + eventFields: ByteString = ByteString.EMPTY, + vesEventListenerVersion: String = "7.0.2" +): VesEventOuterClass.VesEvent = vesEvent( + commonHeader(domain, id, vesEventListenerVersion), + eventFields +) fun vesEvent(commonEventHeader: CommonEventHeader, eventFields: ByteString = ByteString.EMPTY): VesEventOuterClass.VesEvent = @@ -44,8 +48,9 @@ fun vesEvent(commonEventHeader: CommonEventHeader, fun commonHeader(domain: VesEventDomain = PERF3GPP, id: String = randomUUID().toString(), - priority: Priority = Priority.NORMAL, - vesEventListenerVersion: String = "7.0.2"): CommonEventHeader = + vesEventListenerVersion: String = "7.0.2", + priority: Priority = Priority.NORMAL +): CommonEventHeader = CommonEventHeader.newBuilder() .setVersion("sample-version") .setDomain(domain.domainName) diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt index 7381592d..cb1c6222 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.utils.arrow import arrow.core.Either import arrow.core.Option +import arrow.core.Try import arrow.core.identity import arrow.syntax.collections.firstOption import java.util.concurrent.atomic.AtomicReference @@ -45,3 +46,10 @@ fun <A> Option.Companion.fromNullablesChain(firstValue: A?, vararg nextValues: ( .map { it() } .filter { it != null } .firstOption() + + +fun <A, B> Either<A, B>.doOnLeft(action: () -> Unit): Either<A, B> = apply { if (isLeft()) action() } + +fun <A> Option<A>.doOnEmpty(action: () -> Unit): Option<A> = apply { if (isEmpty()) action() } + +fun <A> Try<A>.doOnFailure(action: () -> Unit): Try<A> = apply { if (isFailure()) action() } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt index 95590d9d..e7aca55d 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt @@ -25,6 +25,8 @@ import arrow.core.Try import reactor.core.publisher.Flux import reactor.core.publisher.Mono +typealias MessageEither = Either<() -> String, () -> String> + fun <T> Logger.handleReactiveStreamError( context: MappedDiagnosticContext, ex: Throwable, @@ -60,7 +62,7 @@ fun <T> Option<T>.filterEmptyWithLog(logger: Logger, fun <T> Flux<T>.filterFailedWithLog(logger: Logger, context: MappedDiagnosticContext, - predicate: (T) -> Either<() -> String, () -> String>) = + predicate: (T) -> MessageEither): Flux<T> = flatMap { t -> predicate(t).fold({ logger.warn(context, it) @@ -69,4 +71,4 @@ fun <T> Flux<T>.filterFailedWithLog(logger: Logger, logger.trace(context, it) Mono.just<T>(t) }) - } + }
\ No newline at end of file |