From d55f5c0c3df4b2ea136100e61424810ede749778 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Fri, 14 Dec 2018 12:05:47 +0100 Subject: Metric: Processing time Add processing time metric measured as difference between "sent to DMaaP" and "WTP decoded" events. Change-Id: I73bb665145019fcca5ae36e2199ed0e1cc088fdf Issue-ID: DCAEGEN2-1036 Signed-off-by: Piotr Jaszczyk --- .../dcae/collectors/veshv/boundary/adapters.kt | 5 +++-- .../onap/dcae/collectors/veshv/impl/VesDecoder.kt | 7 ++++--- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 22 ++++++++++------------ .../veshv/impl/adapters/LoggingSinkProvider.kt | 2 +- .../impl/adapters/kafka/VesMessageSerializer.kt | 4 +++- .../onap/dcae/collectors/veshv/model/VesMessage.kt | 4 ++-- 6 files changed, 23 insertions(+), 21 deletions(-) (limited to 'sources/hv-collector-core/src/main/kotlin') diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 3f69c088..1334738a 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.boundary +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.MessageDropCause @@ -31,8 +32,8 @@ interface Sink { interface Metrics { fun notifyBytesReceived(size: Int) - fun notifyMessageReceived(size: Int) - fun notifyMessageSent(topic: String) + fun notifyMessageReceived(msg: WireFrameMessage) + fun notifyMessageSent(msg: RoutedMessage) fun notifyMessageDropped(cause: MessageDropCause) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt index c670e1d8..ee499e19 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Try import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.VesEvent @@ -30,9 +31,9 @@ import org.onap.ves.VesEventOuterClass.VesEvent */ internal class VesDecoder { - fun decode(bytes: ByteData): Try = + fun decode(frame: WireFrameMessage): Try = Try { - val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader - VesMessage(decodedHeader, bytes) + val decodedHeader = VesEvent.parseFrom(frame.payload.unsafeAsArray()).commonEventHeader + VesMessage(decodedHeader, frame) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index b29432f0..51f894d3 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder @@ -68,7 +67,7 @@ internal class VesHvCollector( private fun decodeWireFrame(flux: Flux): Flux = flux .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } .concatMap(wireChunkDecoder::decode) - .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } + .doOnNext(metrics::notifyMessageReceived) private fun filterInvalidWireFrame(flux: Flux): Flux = flux .filterFailedWithLog { @@ -78,15 +77,14 @@ internal class VesHvCollector( } private fun decodeProtobufPayload(flux: Flux): Flux = flux - .map(WireFrameMessage::payload) - .flatMap(::decodePayload) - - private fun decodePayload(rawPayload: ByteData): Flux = protobufDecoder - .decode(rawPayload) - .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) } - .filterFailedWithLog(logger, clientContext::fullMdc, - { "Ves event header decoded successfully" }, - { "Failed to decode ves event header, reason: ${it.message}" }) + .flatMap { frame -> + protobufDecoder + .decode(frame) + .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) } + .filterFailedWithLog(logger, clientContext::fullMdc, + { "Ves event header decoded successfully" }, + { "Failed to decode ves event header, reason: ${it.message}" }) + } private fun filterInvalidProtobufMessages(flux: Flux): Flux = flux .filterFailedWithLog { @@ -98,7 +96,7 @@ internal class VesHvCollector( private fun routeMessage(flux: Flux): Flux = flux .flatMap(this::findRoute) .compose(sink::send) - .doOnNext { metrics.notifyMessageSent(it.topic) } + .doOnNext(metrics::notifyMessageSent) private fun findRoute(msg: VesMessage) = router .findDestination(msg) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index ec8593af..14966d9b 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -47,7 +47,7 @@ internal class LoggingSinkProvider : SinkProvider { private fun logMessage(msg: RoutedMessage) { val msgs = totalMessages.addAndGet(1) - val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong()) + val bytes = totalBytes.addAndGet(msg.message.wtpFrame.payloadSize.toLong()) val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" } if (msgs % INFO_LOGGING_FREQ == 0L) logger.info(ctx, logMessageSupplier) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt index 7a6ac7c8..c92518a5 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt @@ -28,10 +28,12 @@ import org.onap.dcae.collectors.veshv.model.VesMessage */ class VesMessageSerializer : Serializer { override fun configure(configs: MutableMap?, isKey: Boolean) { + // not needed } - override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.rawMessage?.unsafeAsArray() + override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.wtpFrame?.payload?.unsafeAsArray() override fun close() { + // not needed } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt index 1965d78c..d3640193 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt @@ -19,11 +19,11 @@ */ package org.onap.dcae.collectors.veshv.model -import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.ves.VesEventOuterClass.CommonEventHeader /** * @author Piotr Jaszczyk * @since May 2018 */ -data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) +data class VesMessage(val header: CommonEventHeader, val wtpFrame: WireFrameMessage) -- cgit 1.2.3-korg