aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt31
1 files changed, 18 insertions, 13 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
index abebff3d..540c647a 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
@@ -35,24 +35,27 @@ internal class WireFrameSink(
private val streamBuffer: ByteBuf,
private val sink: FluxSink<WireFrame>,
private val requestedFrameCount: Long) {
+ private var completed = false
fun handleSubscriber() {
- logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" }
+ if (!completed) {
+ logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" }
- try {
- if (requestedFrameCount == Long.MAX_VALUE) {
- logger.trace { "Push based strategy" }
- pushAvailableFrames()
- } else {
- logger.trace { "Pull based strategy - req $requestedFrameCount" }
- pushUpToNumberOfFrames()
+ try {
+ if (requestedFrameCount == Long.MAX_VALUE) {
+ logger.trace { "Push based strategy" }
+ pushAvailableFrames()
+ } else {
+ logger.trace { "Pull based strategy - req $requestedFrameCount" }
+ pushUpToNumberOfFrames()
+ }
+ } catch (ex: Exception) {
+ completed = true
+ sink.error(ex)
}
- } catch (ex: Exception) {
- sink.error(ex)
- }
-
- logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" }
+ logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" }
+ }
}
private fun pushAvailableFrames() {
@@ -61,6 +64,7 @@ internal class WireFrameSink(
sink.next(nextFrame)
nextFrame = decodeFirstFrameFromBuffer()
}
+ completed = true
sink.complete()
}
@@ -76,6 +80,7 @@ internal class WireFrameSink(
}
}
if (remaining > 0 && nextFrame == null) {
+ completed = true
sink.complete()
}
}