From 678af1b5172eb3b214584de91ece3f8df163c5e9 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Wed, 27 Jun 2018 12:30:56 +0200 Subject: Write performance tests Closes ONAP-434 Change-Id: I1139848f32ac19a4d0a0fd595f4b07c10cd83db0 Signed-off-by: Piotr Jaszczyk Issue-ID: DCAEGEN2-601 --- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 17 +++++++---- .../collectors/veshv/impl/wire/WireChunkDecoder.kt | 35 ++++++++++++++++++++-- .../collectors/veshv/impl/wire/WireFrameSink.kt | 31 +++++++++++-------- 3 files changed, 63 insertions(+), 20 deletions(-) (limited to 'hv-collector-core/src/main/kotlin') diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 033095ad..3246cf59 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -59,21 +59,28 @@ internal class VesHvCollector( .compose(sink::send) .doOnNext { metrics.notifyMessageSent(it.topic) } .doOnTerminate { releaseBuffersMemory(wireDecoder) } + .onErrorResume(this::handleErrors) .then() } private fun findRoute(msg: VesMessage): Mono = omitWhenNull(msg, router::findDestination) - private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) { - wireChunkDecoder.release() - } - private fun omitWhenNull(input: T, mapper: (T) -> Option): Mono = mapper(input).fold( { Mono.empty() }, { Mono.just(it) }) + private fun handleErrors(ex: Throwable): Flux { + logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})") + logger.debug("Detailed stack trace", ex) + return Flux.empty() + } + + private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) { + wireChunkDecoder.release() + } + companion object { - val logger = Logger(VesHvCollector::class) + private val logger = Logger(VesHvCollector::class) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index 056e0557..cfb61b3e 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -31,9 +31,40 @@ import reactor.core.publisher.Flux * @author Piotr Jaszczyk * @since May 2018 */ -internal class WireChunkDecoder(private val decoder: WireFrameDecoder, - alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { +internal class WireChunkDecoder( + private val decoder: WireFrameDecoder, + alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { private val streamBuffer = alloc.compositeBuffer() + +// TODO: use this implementation and cleanup the rest +// fun decode(byteBuf: ByteBuf): Flux = Flux.defer { +// if (byteBuf.readableBytes() == 0) { +// byteBuf.release() +// Flux.empty() +// } else { +// streamBuffer.addComponent(true, byteBuf) +// Flux.generate { next -> +// try { +// val frame = decodeFirstFrameFromBuffer() +// if (frame == null) +// next.complete() +// else +// next.next(frame) +// } catch (ex: Exception) { +// next.error(ex) +// } +// } +// } +// }.doOnTerminate { streamBuffer.discardReadComponents() } +// +// +// private fun decodeFirstFrameFromBuffer(): WireFrame? = +// try { +// decoder.decodeFirst(streamBuffer) +// } catch (ex: MissingWireFrameBytesException) { +// logger.trace { "${ex.message} - waiting for more data" } +// null +// } fun decode(byteBuf: ByteBuf): Flux = StreamBufferEmitter .createFlux(decoder, streamBuffer, byteBuf) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt index abebff3d..540c647a 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt @@ -35,24 +35,27 @@ internal class WireFrameSink( private val streamBuffer: ByteBuf, private val sink: FluxSink, private val requestedFrameCount: Long) { + private var completed = false fun handleSubscriber() { - logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" } + if (!completed) { + logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" } - try { - if (requestedFrameCount == Long.MAX_VALUE) { - logger.trace { "Push based strategy" } - pushAvailableFrames() - } else { - logger.trace { "Pull based strategy - req $requestedFrameCount" } - pushUpToNumberOfFrames() + try { + if (requestedFrameCount == Long.MAX_VALUE) { + logger.trace { "Push based strategy" } + pushAvailableFrames() + } else { + logger.trace { "Pull based strategy - req $requestedFrameCount" } + pushUpToNumberOfFrames() + } + } catch (ex: Exception) { + completed = true + sink.error(ex) } - } catch (ex: Exception) { - sink.error(ex) - } - - logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" } + logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" } + } } private fun pushAvailableFrames() { @@ -61,6 +64,7 @@ internal class WireFrameSink( sink.next(nextFrame) nextFrame = decodeFirstFrameFromBuffer() } + completed = true sink.complete() } @@ -76,6 +80,7 @@ internal class WireFrameSink( } } if (remaining > 0 && nextFrame == null) { + completed = true sink.complete() } } -- cgit 1.2.3-korg