diff options
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt')
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt | 21 |
1 files changed, 8 insertions, 13 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index b735138d..ca9d28ae 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -19,17 +19,16 @@ */ package org.onap.dcae.collectors.veshv.impl.wire -import arrow.effects.IO import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Flux.defer import reactor.core.publisher.SynchronousSink @@ -63,26 +62,22 @@ internal class WireChunkDecoder( private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next -> decoder.decodeFirst(streamBuffer) .fold(onError(next), onSuccess(next)) - .unsafeRunSync() } - private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err -> + private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> Unit = { err -> when (err) { - is InvalidWireFrame -> IO { + is InvalidWireFrame -> next.error(WireFrameException(err)) - } - is MissingWireFrameBytes -> IO { + is MissingWireFrameBytes -> { logEndOfData() next.complete() } } } - private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame -> - IO { - logDecodedWireMessage(frame) - next.next(frame) - } + private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> Unit = { frame -> + logDecodedWireMessage(frame) + next.next(frame) } private fun logIncomingMessage(wire: ByteBuf) { |