diff options
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt')
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt | 43 |
1 files changed, 21 insertions, 22 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 4176de99..0d07504d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -21,18 +21,18 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Either import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog +import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -42,28 +42,27 @@ import reactor.core.publisher.Mono * @since May 2018 */ internal class VesHvCollector( - private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder, + private val clientContext: ClientContext, + private val wireChunkDecoder: WireChunkDecoder, private val protobufDecoder: VesDecoder, private val router: Router, private val sink: Sink, private val metrics: Metrics) : Collector { - override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> = - wireChunkDecoderSupplier(alloc).let { wireDecoder -> - dataStream - .transform { decodeWireFrame(it, wireDecoder) } - .transform(::filterInvalidWireFrame) - .transform(::decodeProtobufPayload) - .transform(::filterInvalidProtobufMessages) - .transform(::routeMessage) - .onErrorResume { logger.handleReactiveStreamError(it) } - .doFinally { releaseBuffersMemory(wireDecoder) } - .then() - } + override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> = + dataStream + .transform { decodeWireFrame(it) } + .transform(::filterInvalidWireFrame) + .transform(::decodeProtobufPayload) + .transform(::filterInvalidProtobufMessages) + .transform(::routeMessage) + .onErrorResume { logger.handleReactiveStreamError(clientContext::asMap, it) } + .doFinally { releaseBuffersMemory() } + .then() - private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux + private fun decodeWireFrame(flux: Flux<ByteBuf>): Flux<WireFrameMessage> = flux .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } - .concatMap(decoder::decode) + .concatMap(wireChunkDecoder::decode) .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux @@ -75,7 +74,7 @@ internal class VesHvCollector( private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder .decode(rawPayload) - .filterFailedWithLog(logger, + .filterFailedWithLog(logger, clientContext::asMap, { "Ves event header decoded successfully" }, { "Failed to decode ves event header, reason: ${it.message}" }) @@ -89,15 +88,15 @@ internal class VesHvCollector( private fun findRoute(msg: VesMessage) = router .findDestination(msg) - .filterEmptyWithLog(logger, + .filterEmptyWithLog(logger, clientContext::asMap, { "Found route for message: ${it.topic}, partition: ${it.partition}" }, { "Could not find route for message" }) - private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() - .also { logger.debug("Released buffer memory after handling message stream") } + private fun releaseBuffersMemory() = wireChunkDecoder.release() + .also { logger.debug { "Released buffer memory after handling message stream" } } fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) = - filterFailedWithLog(logger, predicate) + filterFailedWithLog(logger, clientContext::asMap, predicate) companion object { private val logger = Logger(VesHvCollector::class) |