diff options
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin')
6 files changed, 23 insertions, 21 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 3f69c088..1334738a 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.boundary +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.MessageDropCause @@ -31,8 +32,8 @@ interface Sink { interface Metrics { fun notifyBytesReceived(size: Int) - fun notifyMessageReceived(size: Int) - fun notifyMessageSent(topic: String) + fun notifyMessageReceived(msg: WireFrameMessage) + fun notifyMessageSent(msg: RoutedMessage) fun notifyMessageDropped(cause: MessageDropCause) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt index c670e1d8..ee499e19 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Try import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.VesEvent @@ -30,9 +31,9 @@ import org.onap.ves.VesEventOuterClass.VesEvent */ internal class VesDecoder { - fun decode(bytes: ByteData): Try<VesMessage> = + fun decode(frame: WireFrameMessage): Try<VesMessage> = Try { - val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader - VesMessage(decodedHeader, bytes) + val decodedHeader = VesEvent.parseFrom(frame.payload.unsafeAsArray()).commonEventHeader + VesMessage(decodedHeader, frame) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index b29432f0..51f894d3 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder @@ -68,7 +67,7 @@ internal class VesHvCollector( private fun decodeWireFrame(flux: Flux<ByteBuf>): Flux<WireFrameMessage> = flux .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } .concatMap(wireChunkDecoder::decode) - .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } + .doOnNext(metrics::notifyMessageReceived) private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux .filterFailedWithLog { @@ -78,15 +77,14 @@ internal class VesHvCollector( } private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux - .map(WireFrameMessage::payload) - .flatMap(::decodePayload) - - private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder - .decode(rawPayload) - .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) } - .filterFailedWithLog(logger, clientContext::fullMdc, - { "Ves event header decoded successfully" }, - { "Failed to decode ves event header, reason: ${it.message}" }) + .flatMap { frame -> + protobufDecoder + .decode(frame) + .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) } + .filterFailedWithLog(logger, clientContext::fullMdc, + { "Ves event header decoded successfully" }, + { "Failed to decode ves event header, reason: ${it.message}" }) + } private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux .filterFailedWithLog { @@ -98,7 +96,7 @@ internal class VesHvCollector( private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux .flatMap(this::findRoute) .compose(sink::send) - .doOnNext { metrics.notifyMessageSent(it.topic) } + .doOnNext(metrics::notifyMessageSent) private fun findRoute(msg: VesMessage) = router .findDestination(msg) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index ec8593af..14966d9b 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -47,7 +47,7 @@ internal class LoggingSinkProvider : SinkProvider { private fun logMessage(msg: RoutedMessage) { val msgs = totalMessages.addAndGet(1) - val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong()) + val bytes = totalBytes.addAndGet(msg.message.wtpFrame.payloadSize.toLong()) val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" } if (msgs % INFO_LOGGING_FREQ == 0L) logger.info(ctx, logMessageSupplier) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt index 7a6ac7c8..c92518a5 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt @@ -28,10 +28,12 @@ import org.onap.dcae.collectors.veshv.model.VesMessage */ class VesMessageSerializer : Serializer<VesMessage> { override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { + // not needed } - override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.rawMessage?.unsafeAsArray() + override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.wtpFrame?.payload?.unsafeAsArray() override fun close() { + // not needed } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt index 1965d78c..d3640193 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt @@ -19,11 +19,11 @@ */ package org.onap.dcae.collectors.veshv.model -import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.ves.VesEventOuterClass.CommonEventHeader /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) +data class VesMessage(val header: CommonEventHeader, val wtpFrame: WireFrameMessage) |