summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt76
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt76
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt99
3 files changed, 41 insertions, 210 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
deleted file mode 100644
index b788f511..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.wire
-
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.CompositeByteBuf
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.Flux
-import reactor.core.publisher.FluxSink
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.function.Consumer
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-internal class StreamBufferEmitter(
- private val decoder: WireFrameDecoder,
- private val streamBuffer: CompositeByteBuf,
- private val newFrame: ByteBuf)
- : Consumer<FluxSink<WireFrame>> {
-
- private val subscribed = AtomicBoolean(false)
-
- override fun accept(sink: FluxSink<WireFrame>) {
- when {
-
- subscribed.getAndSet(true) ->
- sink.error(IllegalStateException("Wire frame emitter supports only one subscriber"))
-
- newFrame.readableBytes() == 0 -> {
- logger.trace { "Discarding empty buffer" }
- newFrame.release()
- sink.complete()
- }
-
- else -> {
- streamBuffer.addComponent(INCREASE_WRITER_INDEX, newFrame)
- sink.onDispose {
- logger.trace { "Disposing read components" }
- streamBuffer.discardReadComponents()
- }
- sink.onRequest { requestedFrameCount ->
- WireFrameSink(decoder, streamBuffer, sink, requestedFrameCount).handleSubscriber()
- }
- }
- }
- }
-
- companion object {
- fun createFlux(decoder: WireFrameDecoder, streamBuffer: CompositeByteBuf, newFrame: ByteBuf): Flux<WireFrame> =
- Flux.create(StreamBufferEmitter(decoder, streamBuffer, newFrame))
-
- private const val INCREASE_WRITER_INDEX = true
- private val logger = Logger(StreamBufferEmitter::class)
- }
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index cfb61b3e..d1d72592 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.domain.WireFrame
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
@@ -35,46 +36,47 @@ internal class WireChunkDecoder(
private val decoder: WireFrameDecoder,
alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
private val streamBuffer = alloc.compositeBuffer()
-
-// TODO: use this implementation and cleanup the rest
-// fun decode(byteBuf: ByteBuf): Flux<WireFrame> = Flux.defer<WireFrame> {
-// if (byteBuf.readableBytes() == 0) {
-// byteBuf.release()
-// Flux.empty()
-// } else {
-// streamBuffer.addComponent(true, byteBuf)
-// Flux.generate { next ->
-// try {
-// val frame = decodeFirstFrameFromBuffer()
-// if (frame == null)
-// next.complete()
-// else
-// next.next(frame)
-// } catch (ex: Exception) {
-// next.error(ex)
-// }
-// }
-// }
-// }.doOnTerminate { streamBuffer.discardReadComponents() }
-//
-//
-// private fun decodeFirstFrameFromBuffer(): WireFrame? =
-// try {
-// decoder.decodeFirst(streamBuffer)
-// } catch (ex: MissingWireFrameBytesException) {
-// logger.trace { "${ex.message} - waiting for more data" }
-// null
-// }
-
- fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter
- .createFlux(decoder, streamBuffer, byteBuf)
- .doOnSubscribe { logIncomingMessage(byteBuf) }
- .doOnNext(this::logDecodedWireMessage)
fun release() {
streamBuffer.release()
}
+ fun decode(byteBuf: ByteBuf): Flux<WireFrame> = Flux.defer {
+ logIncomingMessage(byteBuf)
+ if (byteBuf.readableBytes() == 0) {
+ byteBuf.release()
+ Flux.empty()
+ } else {
+ streamBuffer.addComponent(true, byteBuf)
+ generateFrames().doOnTerminate { streamBuffer.discardReadComponents() }
+ }
+ }
+
+ private fun generateFrames(): Flux<WireFrame> = Flux.generate { next ->
+ try {
+ val frame = decodeFirstFrameFromBuffer()
+ if (frame == null) {
+ logEndOfData()
+ next.complete()
+ } else {
+ logDecodedWireMessage(frame)
+ next.next(frame)
+ }
+ } catch (ex: Exception) {
+ next.error(ex)
+ }
+ }
+
+
+ private fun decodeFirstFrameFromBuffer(): WireFrame? =
+ try {
+ decoder.decodeFirst(streamBuffer)
+ } catch (ex: MissingWireFrameBytesException) {
+ logger.trace { "${ex.message} - waiting for more data" }
+ null
+ }
+
+
private fun logIncomingMessage(wire: ByteBuf) {
logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
}
@@ -83,6 +85,10 @@ internal class WireChunkDecoder(
logger.trace { "Wire payload size: ${wire.payloadSize} B." }
}
+ private fun logEndOfData() {
+ logger.trace { "End of data in current TCP buffer" }
+ }
+
companion object {
val logger = Logger(VesHvCollector::class)
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
deleted file mode 100644
index 540c647a..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.wire
-
-import io.netty.buffer.ByteBuf
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.FluxSink
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-internal class WireFrameSink(
- private val decoder: WireFrameDecoder,
- private val streamBuffer: ByteBuf,
- private val sink: FluxSink<WireFrame>,
- private val requestedFrameCount: Long) {
- private var completed = false
-
- fun handleSubscriber() {
- if (!completed) {
- logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" }
-
- try {
- if (requestedFrameCount == Long.MAX_VALUE) {
- logger.trace { "Push based strategy" }
- pushAvailableFrames()
- } else {
- logger.trace { "Pull based strategy - req $requestedFrameCount" }
- pushUpToNumberOfFrames()
- }
- } catch (ex: Exception) {
- completed = true
- sink.error(ex)
- }
-
- logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" }
- }
- }
-
- private fun pushAvailableFrames() {
- var nextFrame = decodeFirstFrameFromBuffer()
- while (nextFrame != null && !sink.isCancelled) {
- sink.next(nextFrame)
- nextFrame = decodeFirstFrameFromBuffer()
- }
- completed = true
- sink.complete()
- }
-
- private fun pushUpToNumberOfFrames() {
- var nextFrame = decodeFirstFrameFromBuffer()
- var remaining = requestedFrameCount
- loop@ while (nextFrame != null && !sink.isCancelled) {
- sink.next(nextFrame)
- if (--remaining > 0) {
- nextFrame = decodeFirstFrameFromBuffer()
- } else {
- break@loop
- }
- }
- if (remaining > 0 && nextFrame == null) {
- completed = true
- sink.complete()
- }
- }
-
- private fun decodeFirstFrameFromBuffer(): WireFrame? =
- try {
- decoder.decodeFirst(streamBuffer)
- } catch (ex: MissingWireFrameBytesException) {
- logger.trace { "${ex.message} - waiting for more data" }
- null
- }
-
- companion object {
- private val logger = Logger(WireFrameSink::class)
- }
-}