diff options
author | fkrzywka <filip.krzywka@nokia.com> | 2018-07-17 07:54:58 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-03 07:18:32 +0200 |
commit | 40c5abeac588ca6c13477675960c94a97dcdeb15 (patch) | |
tree | a4e0b6cb8b843b2eae99349ac7b76d2d9dd9e6fc /hv-collector-core/src/main | |
parent | 8e31525e1003bede54c23a061cee2ca7e59a4be5 (diff) |
Use Try/Option monad when decoding protobuf
Closes ONAP-143
Change-Id: I33cb2d24cd5962318a6f405096db298bbdbab963
Signed-off-by: fkrzywka <filip.krzywka@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core/src/main')
-rw-r--r-- | hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt | 11 | ||||
-rw-r--r-- | hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt | 13 |
2 files changed, 14 insertions, 10 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt index 591a48b7..a7780109 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -19,6 +19,8 @@ */ package org.onap.dcae.collectors.veshv.impl +import arrow.core.Try +import arrow.core.Option import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventV5.VesEvent @@ -29,8 +31,9 @@ import org.onap.ves.VesEventV5.VesEvent */ internal class VesDecoder { - fun decode(bytes: ByteData): VesMessage { - val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader - return VesMessage(decodedHeader, bytes) - } + fun decode(bytes: ByteData): Option<VesMessage> = + Try { + val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader + VesMessage(decodedHeader, bytes) + }.toOption() } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 2a07b9b8..52689162 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -33,6 +33,7 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.reactivestreams.Publisher import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.SynchronousSink @@ -71,19 +72,19 @@ internal class VesHvCollector( private fun decodePayload(flux: Flux<PayloadWireFrameMessage>): Flux<VesMessage> = flux .map(PayloadWireFrameMessage::payload) .map(protobufDecoder::decode) - + .flatMap { omitWhenNone(it) } private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux .flatMap(this::findRoute) .compose(sink::send) .doOnNext { metrics.notifyMessageSent(it.topic) } - private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination) - private fun <T, V> omitWhenNull(input: T, mapper: (T) -> Option<V>): Mono<V> = - mapper(input).fold( - { Mono.empty() }, - { Mono.just(it) }) + private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNone((router::findDestination)(msg)) + + private fun <V> omitWhenNone(it: Option<V>): Mono<V> = it.fold( + { Mono.empty() }, + { Mono.just(it) }) private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() |