aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt27
1 files changed, 4 insertions, 23 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index f608a2b9..8970e03e 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -25,9 +25,6 @@ import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
@@ -35,8 +32,6 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import reactor.core.publisher.SynchronousSink
-import java.util.function.BiConsumer
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -53,7 +48,7 @@ internal class VesHvCollector(
wireChunkDecoderSupplier(alloc).let { wireDecoder ->
dataStream
.transform { decodeWireFrame(it, wireDecoder) }
- .filter(PayloadWireFrameMessage::isValid)
+ .filter(WireFrameMessage::isValid)
.transform(::decodePayload)
.filter(VesMessage::isValid)
.transform(::routeMessage)
@@ -62,14 +57,13 @@ internal class VesHvCollector(
.then()
}
- private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<PayloadWireFrameMessage> = flux
+ private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux
.doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
.concatMap(decoder::decode)
- .handle(completeStreamOnEOT)
.doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
- private fun decodePayload(flux: Flux<PayloadWireFrameMessage>): Flux<VesMessage> = flux
- .map(PayloadWireFrameMessage::payload)
+ private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
+ .map(WireFrameMessage::payload)
.map(protobufDecoder::decode)
.flatMap { omitWhenNone(it) }
@@ -95,18 +89,5 @@ internal class VesHvCollector(
companion object {
private val logger = Logger(VesHvCollector::class)
-
- private val completeStreamOnEOT by lazy {
- BiConsumer<WireFrameMessage, SynchronousSink<PayloadWireFrameMessage>> { frame, sink ->
- when (frame) {
- is EndOfTransmissionMessage -> {
- logger.info("Completing stream because of receiving EOT message")
- sink.complete()
- }
- is PayloadWireFrameMessage -> sink.next(frame)
- else -> sink.error(UnknownWireFrameTypeException(frame))
- }
- }
- }
}
}