aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main/kotlin/org
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-14 12:05:47 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-17 15:06:29 +0100
commitd55f5c0c3df4b2ea136100e61424810ede749778 (patch)
treeb4d60ede755af2a2204b8303d75b4d74489f6802 /sources/hv-collector-core/src/main/kotlin/org
parentfb040c0df8ab2b74d02b67feda4e2a161a1311d2 (diff)
Metric: Processing time
Add processing time metric measured as difference between "sent to DMaaP" and "WTP decoded" events. Change-Id: I73bb665145019fcca5ae36e2199ed0e1cc088fdf Issue-ID: DCAEGEN2-1036 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt7
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt22
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt4
6 files changed, 23 insertions, 21 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 3f69c088..1334738a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.boundary
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.MessageDropCause
@@ -31,8 +32,8 @@ interface Sink {
interface Metrics {
fun notifyBytesReceived(size: Int)
- fun notifyMessageReceived(size: Int)
- fun notifyMessageSent(topic: String)
+ fun notifyMessageReceived(msg: WireFrameMessage)
+ fun notifyMessageSent(msg: RoutedMessage)
fun notifyMessageDropped(cause: MessageDropCause)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
index c670e1d8..ee499e19 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl
import arrow.core.Try
import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventOuterClass.VesEvent
@@ -30,9 +31,9 @@ import org.onap.ves.VesEventOuterClass.VesEvent
*/
internal class VesDecoder {
- fun decode(bytes: ByteData): Try<VesMessage> =
+ fun decode(frame: WireFrameMessage): Try<VesMessage> =
Try {
- val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
- VesMessage(decodedHeader, bytes)
+ val decodedHeader = VesEvent.parseFrom(frame.payload.unsafeAsArray()).commonEventHeader
+ VesMessage(decodedHeader, frame)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index b29432f0..51f894d3 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
@@ -68,7 +67,7 @@ internal class VesHvCollector(
private fun decodeWireFrame(flux: Flux<ByteBuf>): Flux<WireFrameMessage> = flux
.doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
.concatMap(wireChunkDecoder::decode)
- .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
+ .doOnNext(metrics::notifyMessageReceived)
private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
.filterFailedWithLog {
@@ -78,15 +77,14 @@ internal class VesHvCollector(
}
private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
- .map(WireFrameMessage::payload)
- .flatMap(::decodePayload)
-
- private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
- .decode(rawPayload)
- .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) }
- .filterFailedWithLog(logger, clientContext::fullMdc,
- { "Ves event header decoded successfully" },
- { "Failed to decode ves event header, reason: ${it.message}" })
+ .flatMap { frame ->
+ protobufDecoder
+ .decode(frame)
+ .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) }
+ .filterFailedWithLog(logger, clientContext::fullMdc,
+ { "Ves event header decoded successfully" },
+ { "Failed to decode ves event header, reason: ${it.message}" })
+ }
private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux
.filterFailedWithLog {
@@ -98,7 +96,7 @@ internal class VesHvCollector(
private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
.flatMap(this::findRoute)
.compose(sink::send)
- .doOnNext { metrics.notifyMessageSent(it.topic) }
+ .doOnNext(metrics::notifyMessageSent)
private fun findRoute(msg: VesMessage) = router
.findDestination(msg)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
index ec8593af..14966d9b 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -47,7 +47,7 @@ internal class LoggingSinkProvider : SinkProvider {
private fun logMessage(msg: RoutedMessage) {
val msgs = totalMessages.addAndGet(1)
- val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong())
+ val bytes = totalBytes.addAndGet(msg.message.wtpFrame.payloadSize.toLong())
val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
if (msgs % INFO_LOGGING_FREQ == 0L)
logger.info(ctx, logMessageSupplier)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
index 7a6ac7c8..c92518a5 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
@@ -28,10 +28,12 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
*/
class VesMessageSerializer : Serializer<VesMessage> {
override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ // not needed
}
- override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.rawMessage?.unsafeAsArray()
+ override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.wtpFrame?.payload?.unsafeAsArray()
override fun close() {
+ // not needed
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
index 1965d78c..d3640193 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
@@ -19,11 +19,11 @@
*/
package org.onap.dcae.collectors.veshv.model
-import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.ves.VesEventOuterClass.CommonEventHeader
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData)
+data class VesMessage(val header: CommonEventHeader, val wtpFrame: WireFrameMessage)