diff options
2 files changed, 26 insertions, 16 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 06047fd4..1bde6a12 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -57,7 +57,7 @@ class CollectorFactory(val configuration: ConfigurationProvider, return VesHvCollector( wireChunkDecoderSupplier = { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) }, protobufDecoder = VesDecoder(), - validator = MessageValidator(), + messageValidator = MessageValidator(), router = Router(config.routing), sink = sinkProvider(config), metrics = metrics) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index ceae78c9..511ccf30 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -25,8 +25,8 @@ import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder @@ -45,7 +45,7 @@ import java.util.function.BiConsumer internal class VesHvCollector( private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder, private val protobufDecoder: VesDecoder, - private val validator: MessageValidator, + private val messageValidator: MessageValidator, private val router: Router, private val sink: Sink, private val metrics: Metrics) : Collector { @@ -53,22 +53,32 @@ internal class VesHvCollector( override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> = wireChunkDecoderSupplier(alloc).let { wireDecoder -> dataStream - .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } - .concatMap(wireDecoder::decode) - .handle(completeStreamOnEOT) - .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } + .transform { decodeWireFrame(it, wireDecoder) } .filter(PayloadWireFrameMessage::isValid) - .map(PayloadWireFrameMessage::payload) - .map(protobufDecoder::decode) - .filter(validator::isValid) - .flatMap(this::findRoute) - .compose(sink::send) - .doOnNext { metrics.notifyMessageSent(it.topic) } + .transform(::decodePayload) + .filter(messageValidator::isValid) + .transform(::routeMessage) .doOnTerminate { releaseBuffersMemory(wireDecoder) } - .onErrorResume(this::handleErrors) + .onErrorResume(::handleErrors) .then() } + private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<PayloadWireFrameMessage> = flux + .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } + .concatMap(decoder::decode) + .handle(completeStreamOnEOT) + .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } + + private fun decodePayload(flux: Flux<PayloadWireFrameMessage>): Flux<VesMessage> = flux + .map(PayloadWireFrameMessage::payload) + .map(protobufDecoder::decode) + + + private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux + .flatMap(this::findRoute) + .compose(sink::send) + .doOnNext { metrics.notifyMessageSent(it.topic) } + private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination) private fun <T, V> omitWhenNull(input: T, mapper: (T) -> Option<V>): Mono<V> = @@ -76,14 +86,14 @@ internal class VesHvCollector( { Mono.empty() }, { Mono.just(it) }) + private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() + private fun handleErrors(ex: Throwable): Flux<RoutedMessage> { logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})") logger.debug("Detailed stack trace", ex) return Flux.empty() } - private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() - companion object { private val logger = Logger(VesHvCollector::class) |