From e880cde732b6d5b6a2fd22b2245ba7f6ff4517f3 Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Fri, 21 Sep 2018 10:14:03 +0200 Subject: Remove end-of-transmission message from protocol Also update protobuf files definitions to latest version. Change-Id: I0cd5d2d8deec5c787e2d3948d3d905fa672f9fea Issue-ID: DCAEGEN2-775 Signed-off-by: Filip Krzywka --- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 27 ++++------------------ .../collectors/veshv/impl/wire/WireChunkDecoder.kt | 20 ++++------------ 2 files changed, 8 insertions(+), 39 deletions(-) (limited to 'hv-collector-core/src/main/kotlin') diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index f608a2b9..8970e03e 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -25,9 +25,6 @@ import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage -import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage @@ -35,8 +32,6 @@ import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import reactor.core.publisher.SynchronousSink -import java.util.function.BiConsumer /** * @author Piotr Jaszczyk @@ -53,7 +48,7 @@ internal class VesHvCollector( wireChunkDecoderSupplier(alloc).let { wireDecoder -> dataStream .transform { decodeWireFrame(it, wireDecoder) } - .filter(PayloadWireFrameMessage::isValid) + .filter(WireFrameMessage::isValid) .transform(::decodePayload) .filter(VesMessage::isValid) .transform(::routeMessage) @@ -62,14 +57,13 @@ internal class VesHvCollector( .then() } - private fun decodeWireFrame(flux: Flux, decoder: WireChunkDecoder): Flux = flux + private fun decodeWireFrame(flux: Flux, decoder: WireChunkDecoder): Flux = flux .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } .concatMap(decoder::decode) - .handle(completeStreamOnEOT) .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } - private fun decodePayload(flux: Flux): Flux = flux - .map(PayloadWireFrameMessage::payload) + private fun decodePayload(flux: Flux): Flux = flux + .map(WireFrameMessage::payload) .map(protobufDecoder::decode) .flatMap { omitWhenNone(it) } @@ -95,18 +89,5 @@ internal class VesHvCollector( companion object { private val logger = Logger(VesHvCollector::class) - - private val completeStreamOnEOT by lazy { - BiConsumer> { frame, sink -> - when (frame) { - is EndOfTransmissionMessage -> { - logger.info("Completing stream because of receiving EOT message") - sink.complete() - } - is PayloadWireFrameMessage -> sink.next(frame) - else -> sink.error(UnknownWireFrameTypeException(frame)) - } - } - } } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index 80f62d1a..0775c652 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -27,8 +27,6 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage -import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.SynchronousSink @@ -76,15 +74,9 @@ internal class WireChunkDecoder( } private fun onSuccess(next: SynchronousSink): (WireFrameMessage) -> IO = { frame -> - when (frame) { - is PayloadWireFrameMessage -> IO { - logDecodedWireMessage(frame) - next.next(frame) - } - is EndOfTransmissionMessage -> IO { - logEndOfTransmissionWireMessage() - next.next(frame) - } + IO { + logDecodedWireMessage(frame) + next.next(frame) } } @@ -92,14 +84,10 @@ internal class WireChunkDecoder( logger.trace { "Got message with total size of ${wire.readableBytes()} B" } } - private fun logDecodedWireMessage(wire: PayloadWireFrameMessage) { + private fun logDecodedWireMessage(wire: WireFrameMessage) { logger.trace { "Wire payload size: ${wire.payloadSize} B" } } - private fun logEndOfTransmissionWireMessage() { - logger.trace { "Received end-of-transmission message" } - } - private fun logEndOfData() { logger.trace { "End of data in current TCP buffer" } } -- cgit 1.2.3-korg