aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main
diff options
context:
space:
mode:
authorJakub Dudycz <jakub.dudycz@nokia.com>2018-12-14 15:20:56 +0100
committerJakub Dudycz <jakub.dudycz@nokia.com>2018-12-17 13:18:13 +0100
commit8952e9970470b683773bfe3a8f40a10881a3f321 (patch)
treefe31f2eeca7fafcb62ea48dbccc08ad02ec9a5a9 /sources/hv-collector-core/src/main
parente32d91541ffa6c3ec13729032979af8160bdb8dc (diff)
Add metrics for dropped messages
Add counters for messages dropped due to validation or undefined routing Slight refactoring Change-Id: Ibe4e38445e81babc745d7a7d95356910845293ce Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com> Issue-ID: DCAEGEN2-1037
Diffstat (limited to 'sources/hv-collector-core/src/main')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt23
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt29
3 files changed, 50 insertions, 4 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index b686b250..3f69c088 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.boundary
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import reactor.core.publisher.Flux
@@ -32,6 +33,7 @@ interface Metrics {
fun notifyBytesReceived(size: Int)
fun notifyMessageReceived(size: Int)
fun notifyMessageSent(topic: String)
+ fun notifyMessageDropped(cause: MessageDropCause)
}
@FunctionalInterface
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index ca1605e6..b29432f0 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.impl
-import arrow.core.Either
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
@@ -29,9 +28,15 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
+import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
+import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.MessageEither
import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
import reactor.core.publisher.Flux
@@ -66,7 +71,11 @@ internal class VesHvCollector(
.doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
- .filterFailedWithLog(MessageValidator::validateFrameMessage)
+ .filterFailedWithLog {
+ MessageValidator
+ .validateFrameMessage(it)
+ .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
+ }
private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
.map(WireFrameMessage::payload)
@@ -74,12 +83,17 @@ internal class VesHvCollector(
private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
.decode(rawPayload)
+ .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) }
.filterFailedWithLog(logger, clientContext::fullMdc,
{ "Ves event header decoded successfully" },
{ "Failed to decode ves event header, reason: ${it.message}" })
private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux
- .filterFailedWithLog(MessageValidator::validateProtobufMessage)
+ .filterFailedWithLog {
+ MessageValidator
+ .validateProtobufMessage(it)
+ .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
+ }
private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
.flatMap(this::findRoute)
@@ -88,6 +102,7 @@ internal class VesHvCollector(
private fun findRoute(msg: VesMessage) = router
.findDestination(msg)
+ .doOnEmpty { metrics.notifyMessageDropped(ROUTE_NOT_FOUND) }
.filterEmptyWithLog(logger, clientContext::fullMdc,
{ "Found route for message: ${it.topic}, partition: ${it.partition}" },
{ "Could not find route for message" })
@@ -95,7 +110,7 @@ internal class VesHvCollector(
private fun releaseBuffersMemory() = wireChunkDecoder.release()
.also { logger.debug { "Released buffer memory after handling message stream" } }
- fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
+ private fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> MessageEither): Flux<T> =
filterFailedWithLog(logger, clientContext::fullMdc, predicate)
companion object {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt
new file mode 100644
index 00000000..af43ae67
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt
@@ -0,0 +1,29 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since December 2018
+ */
+enum class MessageDropCause(val tag: String) {
+ ROUTE_NOT_FOUND("routing"),
+ INVALID_MESSAGE("invalid")
+}