aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt43
1 files changed, 21 insertions, 22 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 4176de99..0d07504d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -21,18 +21,18 @@ package org.onap.dcae.collectors.veshv.impl
import arrow.core.Either
import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
+import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -42,28 +42,27 @@ import reactor.core.publisher.Mono
* @since May 2018
*/
internal class VesHvCollector(
- private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder,
+ private val clientContext: ClientContext,
+ private val wireChunkDecoder: WireChunkDecoder,
private val protobufDecoder: VesDecoder,
private val router: Router,
private val sink: Sink,
private val metrics: Metrics) : Collector {
- override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
- wireChunkDecoderSupplier(alloc).let { wireDecoder ->
- dataStream
- .transform { decodeWireFrame(it, wireDecoder) }
- .transform(::filterInvalidWireFrame)
- .transform(::decodeProtobufPayload)
- .transform(::filterInvalidProtobufMessages)
- .transform(::routeMessage)
- .onErrorResume { logger.handleReactiveStreamError(it) }
- .doFinally { releaseBuffersMemory(wireDecoder) }
- .then()
- }
+ override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
+ dataStream
+ .transform { decodeWireFrame(it) }
+ .transform(::filterInvalidWireFrame)
+ .transform(::decodeProtobufPayload)
+ .transform(::filterInvalidProtobufMessages)
+ .transform(::routeMessage)
+ .onErrorResume { logger.handleReactiveStreamError(clientContext::asMap, it) }
+ .doFinally { releaseBuffersMemory() }
+ .then()
- private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux
+ private fun decodeWireFrame(flux: Flux<ByteBuf>): Flux<WireFrameMessage> = flux
.doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
- .concatMap(decoder::decode)
+ .concatMap(wireChunkDecoder::decode)
.doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
@@ -75,7 +74,7 @@ internal class VesHvCollector(
private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
.decode(rawPayload)
- .filterFailedWithLog(logger,
+ .filterFailedWithLog(logger, clientContext::asMap,
{ "Ves event header decoded successfully" },
{ "Failed to decode ves event header, reason: ${it.message}" })
@@ -89,15 +88,15 @@ internal class VesHvCollector(
private fun findRoute(msg: VesMessage) = router
.findDestination(msg)
- .filterEmptyWithLog(logger,
+ .filterEmptyWithLog(logger, clientContext::asMap,
{ "Found route for message: ${it.topic}, partition: ${it.partition}" },
{ "Could not find route for message" })
- private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
- .also { logger.debug("Released buffer memory after handling message stream") }
+ private fun releaseBuffersMemory() = wireChunkDecoder.release()
+ .also { logger.debug { "Released buffer memory after handling message stream" } }
fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
- filterFailedWithLog(logger, predicate)
+ filterFailedWithLog(logger, clientContext::asMap, predicate)
companion object {
private val logger = Logger(VesHvCollector::class)