aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-28 10:09:24 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 10:51:38 +0200
commitcf1465f37d20391a921df449d5dd01454f64910c (patch)
tree39615bdedb065af9341fba012a33d481e6383796 /hv-collector-core
parent678af1b5172eb3b214584de91ece3f8df163c5e9 (diff)
Use generator to simplify the WireFrame decoding
Performance tests have proven that manual creation of the Flux doesn't give us any performance benefits. On the other side it is complicated and error prone. Closes ONAP-438 Change-Id: I45912f91a52cbc84322775f7bae6d73afda079b8 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt76
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt76
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt99
3 files changed, 41 insertions, 210 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
deleted file mode 100644
index b788f511..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.wire
-
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.CompositeByteBuf
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.Flux
-import reactor.core.publisher.FluxSink
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.function.Consumer
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-internal class StreamBufferEmitter(
- private val decoder: WireFrameDecoder,
- private val streamBuffer: CompositeByteBuf,
- private val newFrame: ByteBuf)
- : Consumer<FluxSink<WireFrame>> {
-
- private val subscribed = AtomicBoolean(false)
-
- override fun accept(sink: FluxSink<WireFrame>) {
- when {
-
- subscribed.getAndSet(true) ->
- sink.error(IllegalStateException("Wire frame emitter supports only one subscriber"))
-
- newFrame.readableBytes() == 0 -> {
- logger.trace { "Discarding empty buffer" }
- newFrame.release()
- sink.complete()
- }
-
- else -> {
- streamBuffer.addComponent(INCREASE_WRITER_INDEX, newFrame)
- sink.onDispose {
- logger.trace { "Disposing read components" }
- streamBuffer.discardReadComponents()
- }
- sink.onRequest { requestedFrameCount ->
- WireFrameSink(decoder, streamBuffer, sink, requestedFrameCount).handleSubscriber()
- }
- }
- }
- }
-
- companion object {
- fun createFlux(decoder: WireFrameDecoder, streamBuffer: CompositeByteBuf, newFrame: ByteBuf): Flux<WireFrame> =
- Flux.create(StreamBufferEmitter(decoder, streamBuffer, newFrame))
-
- private const val INCREASE_WRITER_INDEX = true
- private val logger = Logger(StreamBufferEmitter::class)
- }
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index cfb61b3e..d1d72592 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.domain.WireFrame
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
@@ -35,46 +36,47 @@ internal class WireChunkDecoder(
private val decoder: WireFrameDecoder,
alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
private val streamBuffer = alloc.compositeBuffer()
-
-// TODO: use this implementation and cleanup the rest
-// fun decode(byteBuf: ByteBuf): Flux<WireFrame> = Flux.defer<WireFrame> {
-// if (byteBuf.readableBytes() == 0) {
-// byteBuf.release()
-// Flux.empty()
-// } else {
-// streamBuffer.addComponent(true, byteBuf)
-// Flux.generate { next ->
-// try {
-// val frame = decodeFirstFrameFromBuffer()
-// if (frame == null)
-// next.complete()
-// else
-// next.next(frame)
-// } catch (ex: Exception) {
-// next.error(ex)
-// }
-// }
-// }
-// }.doOnTerminate { streamBuffer.discardReadComponents() }
-//
-//
-// private fun decodeFirstFrameFromBuffer(): WireFrame? =
-// try {
-// decoder.decodeFirst(streamBuffer)
-// } catch (ex: MissingWireFrameBytesException) {
-// logger.trace { "${ex.message} - waiting for more data" }
-// null
-// }
-
- fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter
- .createFlux(decoder, streamBuffer, byteBuf)
- .doOnSubscribe { logIncomingMessage(byteBuf) }
- .doOnNext(this::logDecodedWireMessage)
fun release() {
streamBuffer.release()
}
+ fun decode(byteBuf: ByteBuf): Flux<WireFrame> = Flux.defer {
+ logIncomingMessage(byteBuf)
+ if (byteBuf.readableBytes() == 0) {
+ byteBuf.release()
+ Flux.empty()
+ } else {
+ streamBuffer.addComponent(true, byteBuf)
+ generateFrames().doOnTerminate { streamBuffer.discardReadComponents() }
+ }
+ }
+
+ private fun generateFrames(): Flux<WireFrame> = Flux.generate { next ->
+ try {
+ val frame = decodeFirstFrameFromBuffer()
+ if (frame == null) {
+ logEndOfData()
+ next.complete()
+ } else {
+ logDecodedWireMessage(frame)
+ next.next(frame)
+ }
+ } catch (ex: Exception) {
+ next.error(ex)
+ }
+ }
+
+
+ private fun decodeFirstFrameFromBuffer(): WireFrame? =
+ try {
+ decoder.decodeFirst(streamBuffer)
+ } catch (ex: MissingWireFrameBytesException) {
+ logger.trace { "${ex.message} - waiting for more data" }
+ null
+ }
+
+
private fun logIncomingMessage(wire: ByteBuf) {
logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
}
@@ -83,6 +85,10 @@ internal class WireChunkDecoder(
logger.trace { "Wire payload size: ${wire.payloadSize} B." }
}
+ private fun logEndOfData() {
+ logger.trace { "End of data in current TCP buffer" }
+ }
+
companion object {
val logger = Logger(VesHvCollector::class)
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
deleted file mode 100644
index 540c647a..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.wire
-
-import io.netty.buffer.ByteBuf
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.FluxSink
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-internal class WireFrameSink(
- private val decoder: WireFrameDecoder,
- private val streamBuffer: ByteBuf,
- private val sink: FluxSink<WireFrame>,
- private val requestedFrameCount: Long) {
- private var completed = false
-
- fun handleSubscriber() {
- if (!completed) {
- logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" }
-
- try {
- if (requestedFrameCount == Long.MAX_VALUE) {
- logger.trace { "Push based strategy" }
- pushAvailableFrames()
- } else {
- logger.trace { "Pull based strategy - req $requestedFrameCount" }
- pushUpToNumberOfFrames()
- }
- } catch (ex: Exception) {
- completed = true
- sink.error(ex)
- }
-
- logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" }
- }
- }
-
- private fun pushAvailableFrames() {
- var nextFrame = decodeFirstFrameFromBuffer()
- while (nextFrame != null && !sink.isCancelled) {
- sink.next(nextFrame)
- nextFrame = decodeFirstFrameFromBuffer()
- }
- completed = true
- sink.complete()
- }
-
- private fun pushUpToNumberOfFrames() {
- var nextFrame = decodeFirstFrameFromBuffer()
- var remaining = requestedFrameCount
- loop@ while (nextFrame != null && !sink.isCancelled) {
- sink.next(nextFrame)
- if (--remaining > 0) {
- nextFrame = decodeFirstFrameFromBuffer()
- } else {
- break@loop
- }
- }
- if (remaining > 0 && nextFrame == null) {
- completed = true
- sink.complete()
- }
- }
-
- private fun decodeFirstFrameFromBuffer(): WireFrame? =
- try {
- decoder.decodeFirst(streamBuffer)
- } catch (ex: MissingWireFrameBytesException) {
- logger.trace { "${ex.message} - waiting for more data" }
- null
- }
-
- companion object {
- private val logger = Logger(WireFrameSink::class)
- }
-}