diff options
Diffstat (limited to 'hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt')
-rw-r--r-- | hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt | 31 |
1 files changed, 18 insertions, 13 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt index abebff3d..540c647a 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt @@ -35,24 +35,27 @@ internal class WireFrameSink( private val streamBuffer: ByteBuf, private val sink: FluxSink<WireFrame>, private val requestedFrameCount: Long) { + private var completed = false fun handleSubscriber() { - logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" } + if (!completed) { + logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" } - try { - if (requestedFrameCount == Long.MAX_VALUE) { - logger.trace { "Push based strategy" } - pushAvailableFrames() - } else { - logger.trace { "Pull based strategy - req $requestedFrameCount" } - pushUpToNumberOfFrames() + try { + if (requestedFrameCount == Long.MAX_VALUE) { + logger.trace { "Push based strategy" } + pushAvailableFrames() + } else { + logger.trace { "Pull based strategy - req $requestedFrameCount" } + pushUpToNumberOfFrames() + } + } catch (ex: Exception) { + completed = true + sink.error(ex) } - } catch (ex: Exception) { - sink.error(ex) - } - - logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" } + logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" } + } } private fun pushAvailableFrames() { @@ -61,6 +64,7 @@ internal class WireFrameSink( sink.next(nextFrame) nextFrame = decodeFirstFrameFromBuffer() } + completed = true sink.complete() } @@ -76,6 +80,7 @@ internal class WireFrameSink( } } if (remaining > 0 && nextFrame == null) { + completed = true sink.complete() } } |