aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/main/kotlin
diff options
context:
space:
mode:
authorJakub Dudycz <jdudycz@nokia.com>2018-07-10 12:29:32 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 14:03:40 +0200
commitdc59f9dacd3e5a39b9bf2ed092796e1723ce26cf (patch)
treea1a1e46eab782a7a123ba63c75e9a6c8a6417c4a /hv-collector-core/src/main/kotlin
parent1ffcd293e43b0ca5e18784521b04376a65119690 (diff)
Use Flux.transform in VesHvCollector
Goal: split the stream into logical parts Closes ONAP-493 Change-Id: I87aa817a18674fad265df81b6a0b4a8f0c46b866 Signed-off-by: Jakub Dudycz <jdudycz@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core/src/main/kotlin')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt40
2 files changed, 26 insertions, 16 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 06047fd4..1bde6a12 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -57,7 +57,7 @@ class CollectorFactory(val configuration: ConfigurationProvider,
return VesHvCollector(
wireChunkDecoderSupplier = { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) },
protobufDecoder = VesDecoder(),
- validator = MessageValidator(),
+ messageValidator = MessageValidator(),
router = Router(config.routing),
sink = sinkProvider(config),
metrics = metrics)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index ceae78c9..511ccf30 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -25,8 +25,8 @@ import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
@@ -45,7 +45,7 @@ import java.util.function.BiConsumer
internal class VesHvCollector(
private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder,
private val protobufDecoder: VesDecoder,
- private val validator: MessageValidator,
+ private val messageValidator: MessageValidator,
private val router: Router,
private val sink: Sink,
private val metrics: Metrics) : Collector {
@@ -53,22 +53,32 @@ internal class VesHvCollector(
override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
wireChunkDecoderSupplier(alloc).let { wireDecoder ->
dataStream
- .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
- .concatMap(wireDecoder::decode)
- .handle(completeStreamOnEOT)
- .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
+ .transform { decodeWireFrame(it, wireDecoder) }
.filter(PayloadWireFrameMessage::isValid)
- .map(PayloadWireFrameMessage::payload)
- .map(protobufDecoder::decode)
- .filter(validator::isValid)
- .flatMap(this::findRoute)
- .compose(sink::send)
- .doOnNext { metrics.notifyMessageSent(it.topic) }
+ .transform(::decodePayload)
+ .filter(messageValidator::isValid)
+ .transform(::routeMessage)
.doOnTerminate { releaseBuffersMemory(wireDecoder) }
- .onErrorResume(this::handleErrors)
+ .onErrorResume(::handleErrors)
.then()
}
+ private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<PayloadWireFrameMessage> = flux
+ .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
+ .concatMap(decoder::decode)
+ .handle(completeStreamOnEOT)
+ .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
+
+ private fun decodePayload(flux: Flux<PayloadWireFrameMessage>): Flux<VesMessage> = flux
+ .map(PayloadWireFrameMessage::payload)
+ .map(protobufDecoder::decode)
+
+
+ private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
+ .flatMap(this::findRoute)
+ .compose(sink::send)
+ .doOnNext { metrics.notifyMessageSent(it.topic) }
+
private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination)
private fun <T, V> omitWhenNull(input: T, mapper: (T) -> Option<V>): Mono<V> =
@@ -76,14 +86,14 @@ internal class VesHvCollector(
{ Mono.empty() },
{ Mono.just(it) })
+ private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
+
private fun handleErrors(ex: Throwable): Flux<RoutedMessage> {
logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
logger.debug("Detailed stack trace", ex)
return Flux.empty()
}
- private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
-
companion object {
private val logger = Logger(VesHvCollector::class)