summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt45
1 files changed, 30 insertions, 15 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 2f12e0cd..4176de99 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -19,17 +19,20 @@
*/
package org.onap.dcae.collectors.veshv.impl
-import arrow.core.Option
+import arrow.core.Either
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
+import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -49,9 +52,9 @@ internal class VesHvCollector(
wireChunkDecoderSupplier(alloc).let { wireDecoder ->
dataStream
.transform { decodeWireFrame(it, wireDecoder) }
- .filter(WireFrameMessage::isValid)
- .transform(::decodePayload)
- .filter(VesMessage::isValid)
+ .transform(::filterInvalidWireFrame)
+ .transform(::decodeProtobufPayload)
+ .transform(::filterInvalidProtobufMessages)
.transform(::routeMessage)
.onErrorResume { logger.handleReactiveStreamError(it) }
.doFinally { releaseBuffersMemory(wireDecoder) }
@@ -63,26 +66,38 @@ internal class VesHvCollector(
.concatMap(decoder::decode)
.doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
- private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
+ private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
+ .filterFailedWithLog(MessageValidator::validateFrameMessage)
+
+ private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
.map(WireFrameMessage::payload)
- .map(protobufDecoder::decode)
- .flatMap { omitWhenNone(it) }
+ .flatMap(::decodePayload)
+
+ private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
+ .decode(rawPayload)
+ .filterFailedWithLog(logger,
+ { "Ves event header decoded successfully" },
+ { "Failed to decode ves event header, reason: ${it.message}" })
+
+ private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux
+ .filterFailedWithLog(MessageValidator::validateProtobufMessage)
private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
.flatMap(this::findRoute)
.compose(sink::send)
.doOnNext { metrics.notifyMessageSent(it.topic) }
-
- private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNone((router::findDestination)(msg))
-
- private fun <V> omitWhenNone(it: Option<V>): Mono<V> = it.fold(
- {
- logger.info("ommiting the message" + 5)
- Mono.empty() },
- { Mono.just(it) })
+ private fun findRoute(msg: VesMessage) = router
+ .findDestination(msg)
+ .filterEmptyWithLog(logger,
+ { "Found route for message: ${it.topic}, partition: ${it.partition}" },
+ { "Could not find route for message" })
private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
+ .also { logger.debug("Released buffer memory after handling message stream") }
+
+ fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
+ filterFailedWithLog(logger, predicate)
companion object {
private val logger = Logger(VesHvCollector::class)