From dc59f9dacd3e5a39b9bf2ed092796e1723ce26cf Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Tue, 10 Jul 2018 12:29:32 +0200 Subject: Use Flux.transform in VesHvCollector Goal: split the stream into logical parts Closes ONAP-493 Change-Id: I87aa817a18674fad265df81b6a0b4a8f0c46b866 Signed-off-by: Jakub Dudycz Issue-ID: DCAEGEN2-601 --- .../collectors/veshv/factory/CollectorFactory.kt | 2 +- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 40 ++++++++++++++-------- 2 files changed, 26 insertions(+), 16 deletions(-) (limited to 'hv-collector-core/src/main/kotlin') diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 06047fd4..1bde6a12 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -57,7 +57,7 @@ class CollectorFactory(val configuration: ConfigurationProvider, return VesHvCollector( wireChunkDecoderSupplier = { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) }, protobufDecoder = VesDecoder(), - validator = MessageValidator(), + messageValidator = MessageValidator(), router = Router(config.routing), sink = sinkProvider(config), metrics = metrics) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index ceae78c9..511ccf30 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -25,8 +25,8 @@ import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder @@ -45,7 +45,7 @@ import java.util.function.BiConsumer internal class VesHvCollector( private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder, private val protobufDecoder: VesDecoder, - private val validator: MessageValidator, + private val messageValidator: MessageValidator, private val router: Router, private val sink: Sink, private val metrics: Metrics) : Collector { @@ -53,22 +53,32 @@ internal class VesHvCollector( override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux): Mono = wireChunkDecoderSupplier(alloc).let { wireDecoder -> dataStream - .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } - .concatMap(wireDecoder::decode) - .handle(completeStreamOnEOT) - .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } + .transform { decodeWireFrame(it, wireDecoder) } .filter(PayloadWireFrameMessage::isValid) - .map(PayloadWireFrameMessage::payload) - .map(protobufDecoder::decode) - .filter(validator::isValid) - .flatMap(this::findRoute) - .compose(sink::send) - .doOnNext { metrics.notifyMessageSent(it.topic) } + .transform(::decodePayload) + .filter(messageValidator::isValid) + .transform(::routeMessage) .doOnTerminate { releaseBuffersMemory(wireDecoder) } - .onErrorResume(this::handleErrors) + .onErrorResume(::handleErrors) .then() } + private fun decodeWireFrame(flux: Flux, decoder: WireChunkDecoder): Flux = flux + .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } + .concatMap(decoder::decode) + .handle(completeStreamOnEOT) + .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } + + private fun decodePayload(flux: Flux): Flux = flux + .map(PayloadWireFrameMessage::payload) + .map(protobufDecoder::decode) + + + private fun routeMessage(flux: Flux): Flux = flux + .flatMap(this::findRoute) + .compose(sink::send) + .doOnNext { metrics.notifyMessageSent(it.topic) } + private fun findRoute(msg: VesMessage): Mono = omitWhenNull(msg, router::findDestination) private fun omitWhenNull(input: T, mapper: (T) -> Option): Mono = @@ -76,14 +86,14 @@ internal class VesHvCollector( { Mono.empty() }, { Mono.just(it) }) + private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() + private fun handleErrors(ex: Throwable): Flux { logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})") logger.debug("Detailed stack trace", ex) return Flux.empty() } - private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() - companion object { private val logger = Logger(VesHvCollector::class) -- cgit 1.2.3-korg