From cf1465f37d20391a921df449d5dd01454f64910c Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Thu, 28 Jun 2018 10:09:24 +0200 Subject: Use generator to simplify the WireFrame decoding Performance tests have proven that manual creation of the Flux doesn't give us any performance benefits. On the other side it is complicated and error prone. Closes ONAP-438 Change-Id: I45912f91a52cbc84322775f7bae6d73afda079b8 Signed-off-by: Piotr Jaszczyk Issue-ID: DCAEGEN2-601 --- .../veshv/impl/wire/StreamBufferEmitter.kt | 76 ----------------- .../collectors/veshv/impl/wire/WireChunkDecoder.kt | 76 +++++++++-------- .../collectors/veshv/impl/wire/WireFrameSink.kt | 99 ---------------------- 3 files changed, 41 insertions(+), 210 deletions(-) delete mode 100644 hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt delete mode 100644 hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt (limited to 'hv-collector-core/src/main') diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt deleted file mode 100644 index b788f511..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt +++ /dev/null @@ -1,76 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.wire - -import io.netty.buffer.ByteBuf -import io.netty.buffer.CompositeByteBuf -import org.onap.dcae.collectors.veshv.domain.WireFrame -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.publisher.Flux -import reactor.core.publisher.FluxSink -import java.util.concurrent.atomic.AtomicBoolean -import java.util.function.Consumer - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -internal class StreamBufferEmitter( - private val decoder: WireFrameDecoder, - private val streamBuffer: CompositeByteBuf, - private val newFrame: ByteBuf) - : Consumer> { - - private val subscribed = AtomicBoolean(false) - - override fun accept(sink: FluxSink) { - when { - - subscribed.getAndSet(true) -> - sink.error(IllegalStateException("Wire frame emitter supports only one subscriber")) - - newFrame.readableBytes() == 0 -> { - logger.trace { "Discarding empty buffer" } - newFrame.release() - sink.complete() - } - - else -> { - streamBuffer.addComponent(INCREASE_WRITER_INDEX, newFrame) - sink.onDispose { - logger.trace { "Disposing read components" } - streamBuffer.discardReadComponents() - } - sink.onRequest { requestedFrameCount -> - WireFrameSink(decoder, streamBuffer, sink, requestedFrameCount).handleSubscriber() - } - } - } - } - - companion object { - fun createFlux(decoder: WireFrameDecoder, streamBuffer: CompositeByteBuf, newFrame: ByteBuf): Flux = - Flux.create(StreamBufferEmitter(decoder, streamBuffer, newFrame)) - - private const val INCREASE_WRITER_INDEX = true - private val logger = Logger(StreamBufferEmitter::class) - } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index cfb61b3e..d1d72592 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException import org.onap.dcae.collectors.veshv.impl.VesHvCollector import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux @@ -35,46 +36,47 @@ internal class WireChunkDecoder( private val decoder: WireFrameDecoder, alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { private val streamBuffer = alloc.compositeBuffer() - -// TODO: use this implementation and cleanup the rest -// fun decode(byteBuf: ByteBuf): Flux = Flux.defer { -// if (byteBuf.readableBytes() == 0) { -// byteBuf.release() -// Flux.empty() -// } else { -// streamBuffer.addComponent(true, byteBuf) -// Flux.generate { next -> -// try { -// val frame = decodeFirstFrameFromBuffer() -// if (frame == null) -// next.complete() -// else -// next.next(frame) -// } catch (ex: Exception) { -// next.error(ex) -// } -// } -// } -// }.doOnTerminate { streamBuffer.discardReadComponents() } -// -// -// private fun decodeFirstFrameFromBuffer(): WireFrame? = -// try { -// decoder.decodeFirst(streamBuffer) -// } catch (ex: MissingWireFrameBytesException) { -// logger.trace { "${ex.message} - waiting for more data" } -// null -// } - - fun decode(byteBuf: ByteBuf): Flux = StreamBufferEmitter - .createFlux(decoder, streamBuffer, byteBuf) - .doOnSubscribe { logIncomingMessage(byteBuf) } - .doOnNext(this::logDecodedWireMessage) fun release() { streamBuffer.release() } + fun decode(byteBuf: ByteBuf): Flux = Flux.defer { + logIncomingMessage(byteBuf) + if (byteBuf.readableBytes() == 0) { + byteBuf.release() + Flux.empty() + } else { + streamBuffer.addComponent(true, byteBuf) + generateFrames().doOnTerminate { streamBuffer.discardReadComponents() } + } + } + + private fun generateFrames(): Flux = Flux.generate { next -> + try { + val frame = decodeFirstFrameFromBuffer() + if (frame == null) { + logEndOfData() + next.complete() + } else { + logDecodedWireMessage(frame) + next.next(frame) + } + } catch (ex: Exception) { + next.error(ex) + } + } + + + private fun decodeFirstFrameFromBuffer(): WireFrame? = + try { + decoder.decodeFirst(streamBuffer) + } catch (ex: MissingWireFrameBytesException) { + logger.trace { "${ex.message} - waiting for more data" } + null + } + + private fun logIncomingMessage(wire: ByteBuf) { logger.trace { "Got message with total size of ${wire.readableBytes()} B" } } @@ -83,6 +85,10 @@ internal class WireChunkDecoder( logger.trace { "Wire payload size: ${wire.payloadSize} B." } } + private fun logEndOfData() { + logger.trace { "End of data in current TCP buffer" } + } + companion object { val logger = Logger(VesHvCollector::class) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt deleted file mode 100644 index 540c647a..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt +++ /dev/null @@ -1,99 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.wire - -import io.netty.buffer.ByteBuf -import org.onap.dcae.collectors.veshv.domain.WireFrame -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.publisher.FluxSink - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -internal class WireFrameSink( - private val decoder: WireFrameDecoder, - private val streamBuffer: ByteBuf, - private val sink: FluxSink, - private val requestedFrameCount: Long) { - private var completed = false - - fun handleSubscriber() { - if (!completed) { - logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" } - - try { - if (requestedFrameCount == Long.MAX_VALUE) { - logger.trace { "Push based strategy" } - pushAvailableFrames() - } else { - logger.trace { "Pull based strategy - req $requestedFrameCount" } - pushUpToNumberOfFrames() - } - } catch (ex: Exception) { - completed = true - sink.error(ex) - } - - logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" } - } - } - - private fun pushAvailableFrames() { - var nextFrame = decodeFirstFrameFromBuffer() - while (nextFrame != null && !sink.isCancelled) { - sink.next(nextFrame) - nextFrame = decodeFirstFrameFromBuffer() - } - completed = true - sink.complete() - } - - private fun pushUpToNumberOfFrames() { - var nextFrame = decodeFirstFrameFromBuffer() - var remaining = requestedFrameCount - loop@ while (nextFrame != null && !sink.isCancelled) { - sink.next(nextFrame) - if (--remaining > 0) { - nextFrame = decodeFirstFrameFromBuffer() - } else { - break@loop - } - } - if (remaining > 0 && nextFrame == null) { - completed = true - sink.complete() - } - } - - private fun decodeFirstFrameFromBuffer(): WireFrame? = - try { - decoder.decodeFirst(streamBuffer) - } catch (ex: MissingWireFrameBytesException) { - logger.trace { "${ex.message} - waiting for more data" } - null - } - - companion object { - private val logger = Logger(WireFrameSink::class) - } -} -- cgit 1.2.3-korg