aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/main/kotlin
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2018-09-21 10:14:03 +0200
committerFilip Krzywka <filip.krzywka@nokia.com>2018-09-24 08:22:29 +0200
commite880cde732b6d5b6a2fd22b2245ba7f6ff4517f3 (patch)
tree256bd77a86bf86fce96979643a9fe5fcc0318aba /hv-collector-core/src/main/kotlin
parent7333951cfec6b79a92b12e70cf679bff2f01825a (diff)
Remove end-of-transmission message from protocol
Also update protobuf files definitions to latest version. Change-Id: I0cd5d2d8deec5c787e2d3948d3d905fa672f9fea Issue-ID: DCAEGEN2-775 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'hv-collector-core/src/main/kotlin')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt27
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt20
2 files changed, 8 insertions, 39 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index f608a2b9..8970e03e 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -25,9 +25,6 @@ import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
@@ -35,8 +32,6 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import reactor.core.publisher.SynchronousSink
-import java.util.function.BiConsumer
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -53,7 +48,7 @@ internal class VesHvCollector(
wireChunkDecoderSupplier(alloc).let { wireDecoder ->
dataStream
.transform { decodeWireFrame(it, wireDecoder) }
- .filter(PayloadWireFrameMessage::isValid)
+ .filter(WireFrameMessage::isValid)
.transform(::decodePayload)
.filter(VesMessage::isValid)
.transform(::routeMessage)
@@ -62,14 +57,13 @@ internal class VesHvCollector(
.then()
}
- private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<PayloadWireFrameMessage> = flux
+ private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux
.doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
.concatMap(decoder::decode)
- .handle(completeStreamOnEOT)
.doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
- private fun decodePayload(flux: Flux<PayloadWireFrameMessage>): Flux<VesMessage> = flux
- .map(PayloadWireFrameMessage::payload)
+ private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
+ .map(WireFrameMessage::payload)
.map(protobufDecoder::decode)
.flatMap { omitWhenNone(it) }
@@ -95,18 +89,5 @@ internal class VesHvCollector(
companion object {
private val logger = Logger(VesHvCollector::class)
-
- private val completeStreamOnEOT by lazy {
- BiConsumer<WireFrameMessage, SynchronousSink<PayloadWireFrameMessage>> { frame, sink ->
- when (frame) {
- is EndOfTransmissionMessage -> {
- logger.info("Completing stream because of receiving EOT message")
- sink.complete()
- }
- is PayloadWireFrameMessage -> sink.next(frame)
- else -> sink.error(UnknownWireFrameTypeException(frame))
- }
- }
- }
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index 80f62d1a..0775c652 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -27,8 +27,6 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
@@ -76,15 +74,9 @@ internal class WireChunkDecoder(
}
private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame ->
- when (frame) {
- is PayloadWireFrameMessage -> IO {
- logDecodedWireMessage(frame)
- next.next(frame)
- }
- is EndOfTransmissionMessage -> IO {
- logEndOfTransmissionWireMessage()
- next.next(frame)
- }
+ IO {
+ logDecodedWireMessage(frame)
+ next.next(frame)
}
}
@@ -92,14 +84,10 @@ internal class WireChunkDecoder(
logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
}
- private fun logDecodedWireMessage(wire: PayloadWireFrameMessage) {
+ private fun logDecodedWireMessage(wire: WireFrameMessage) {
logger.trace { "Wire payload size: ${wire.payloadSize} B" }
}
- private fun logEndOfTransmissionWireMessage() {
- logger.trace { "Received end-of-transmission message" }
- }
-
private fun logEndOfData() {
logger.trace { "End of data in current TCP buffer" }
}