diff options
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt')
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt | 45 |
1 files changed, 30 insertions, 15 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 2f12e0cd..4176de99 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -19,17 +19,20 @@ */ package org.onap.dcae.collectors.veshv.impl -import arrow.core.Option +import arrow.core.Either import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog +import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -49,9 +52,9 @@ internal class VesHvCollector( wireChunkDecoderSupplier(alloc).let { wireDecoder -> dataStream .transform { decodeWireFrame(it, wireDecoder) } - .filter(WireFrameMessage::isValid) - .transform(::decodePayload) - .filter(VesMessage::isValid) + .transform(::filterInvalidWireFrame) + .transform(::decodeProtobufPayload) + .transform(::filterInvalidProtobufMessages) .transform(::routeMessage) .onErrorResume { logger.handleReactiveStreamError(it) } .doFinally { releaseBuffersMemory(wireDecoder) } @@ -63,26 +66,38 @@ internal class VesHvCollector( .concatMap(decoder::decode) .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } - private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux + private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux + .filterFailedWithLog(MessageValidator::validateFrameMessage) + + private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux .map(WireFrameMessage::payload) - .map(protobufDecoder::decode) - .flatMap { omitWhenNone(it) } + .flatMap(::decodePayload) + + private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder + .decode(rawPayload) + .filterFailedWithLog(logger, + { "Ves event header decoded successfully" }, + { "Failed to decode ves event header, reason: ${it.message}" }) + + private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux + .filterFailedWithLog(MessageValidator::validateProtobufMessage) private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux .flatMap(this::findRoute) .compose(sink::send) .doOnNext { metrics.notifyMessageSent(it.topic) } - - private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNone((router::findDestination)(msg)) - - private fun <V> omitWhenNone(it: Option<V>): Mono<V> = it.fold( - { - logger.info("ommiting the message" + 5) - Mono.empty() }, - { Mono.just(it) }) + private fun findRoute(msg: VesMessage) = router + .findDestination(msg) + .filterEmptyWithLog(logger, + { "Found route for message: ${it.topic}, partition: ${it.partition}" }, + { "Could not find route for message" }) private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() + .also { logger.debug("Released buffer memory after handling message stream") } + + fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) = + filterFailedWithLog(logger, predicate) companion object { private val logger = Logger(VesHvCollector::class) |