diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-06-27 12:30:56 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-02 10:39:41 +0200 |
commit | 678af1b5172eb3b214584de91ece3f8df163c5e9 (patch) | |
tree | 984c0cd15158183c3d038a08163737cd5e34a91b /hv-collector-core | |
parent | 553154ae42e5362dacab6c190b8cf1e1388f5b87 (diff) |
Write performance tests
Closes ONAP-434
Change-Id: I1139848f32ac19a4d0a0fd595f4b07c10cd83db0
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core')
3 files changed, 63 insertions, 20 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 033095ad..3246cf59 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -59,21 +59,28 @@ internal class VesHvCollector( .compose(sink::send) .doOnNext { metrics.notifyMessageSent(it.topic) } .doOnTerminate { releaseBuffersMemory(wireDecoder) } + .onErrorResume(this::handleErrors) .then() } private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination) - private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) { - wireChunkDecoder.release() - } - private fun <T, V> omitWhenNull(input: T, mapper: (T) -> Option<V>): Mono<V> = mapper(input).fold( { Mono.empty() }, { Mono.just(it) }) + private fun handleErrors(ex: Throwable): Flux<RoutedMessage> { + logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})") + logger.debug("Detailed stack trace", ex) + return Flux.empty() + } + + private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) { + wireChunkDecoder.release() + } + companion object { - val logger = Logger(VesHvCollector::class) + private val logger = Logger(VesHvCollector::class) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index 056e0557..cfb61b3e 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -31,9 +31,40 @@ import reactor.core.publisher.Flux * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class WireChunkDecoder(private val decoder: WireFrameDecoder, - alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { +internal class WireChunkDecoder( + private val decoder: WireFrameDecoder, + alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { private val streamBuffer = alloc.compositeBuffer() + +// TODO: use this implementation and cleanup the rest +// fun decode(byteBuf: ByteBuf): Flux<WireFrame> = Flux.defer<WireFrame> { +// if (byteBuf.readableBytes() == 0) { +// byteBuf.release() +// Flux.empty() +// } else { +// streamBuffer.addComponent(true, byteBuf) +// Flux.generate { next -> +// try { +// val frame = decodeFirstFrameFromBuffer() +// if (frame == null) +// next.complete() +// else +// next.next(frame) +// } catch (ex: Exception) { +// next.error(ex) +// } +// } +// } +// }.doOnTerminate { streamBuffer.discardReadComponents() } +// +// +// private fun decodeFirstFrameFromBuffer(): WireFrame? = +// try { +// decoder.decodeFirst(streamBuffer) +// } catch (ex: MissingWireFrameBytesException) { +// logger.trace { "${ex.message} - waiting for more data" } +// null +// } fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter .createFlux(decoder, streamBuffer, byteBuf) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt index abebff3d..540c647a 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt @@ -35,24 +35,27 @@ internal class WireFrameSink( private val streamBuffer: ByteBuf, private val sink: FluxSink<WireFrame>, private val requestedFrameCount: Long) { + private var completed = false fun handleSubscriber() { - logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" } + if (!completed) { + logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" } - try { - if (requestedFrameCount == Long.MAX_VALUE) { - logger.trace { "Push based strategy" } - pushAvailableFrames() - } else { - logger.trace { "Pull based strategy - req $requestedFrameCount" } - pushUpToNumberOfFrames() + try { + if (requestedFrameCount == Long.MAX_VALUE) { + logger.trace { "Push based strategy" } + pushAvailableFrames() + } else { + logger.trace { "Pull based strategy - req $requestedFrameCount" } + pushUpToNumberOfFrames() + } + } catch (ex: Exception) { + completed = true + sink.error(ex) } - } catch (ex: Exception) { - sink.error(ex) - } - - logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" } + logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" } + } } private fun pushAvailableFrames() { @@ -61,6 +64,7 @@ internal class WireFrameSink( sink.next(nextFrame) nextFrame = decodeFirstFrameFromBuffer() } + completed = true sink.complete() } @@ -76,6 +80,7 @@ internal class WireFrameSink( } } if (remaining > 0 && nextFrame == null) { + completed = true sink.complete() } } |