aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-27 12:30:56 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 10:39:41 +0200
commit678af1b5172eb3b214584de91ece3f8df163c5e9 (patch)
tree984c0cd15158183c3d038a08163737cd5e34a91b /hv-collector-core
parent553154ae42e5362dacab6c190b8cf1e1388f5b87 (diff)
Write performance tests
Closes ONAP-434 Change-Id: I1139848f32ac19a4d0a0fd595f4b07c10cd83db0 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt17
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt35
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt31
3 files changed, 63 insertions, 20 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 033095ad..3246cf59 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -59,21 +59,28 @@ internal class VesHvCollector(
.compose(sink::send)
.doOnNext { metrics.notifyMessageSent(it.topic) }
.doOnTerminate { releaseBuffersMemory(wireDecoder) }
+ .onErrorResume(this::handleErrors)
.then()
}
private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination)
- private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) {
- wireChunkDecoder.release()
- }
-
private fun <T, V> omitWhenNull(input: T, mapper: (T) -> Option<V>): Mono<V> =
mapper(input).fold(
{ Mono.empty() },
{ Mono.just(it) })
+ private fun handleErrors(ex: Throwable): Flux<RoutedMessage> {
+ logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
+ logger.debug("Detailed stack trace", ex)
+ return Flux.empty()
+ }
+
+ private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) {
+ wireChunkDecoder.release()
+ }
+
companion object {
- val logger = Logger(VesHvCollector::class)
+ private val logger = Logger(VesHvCollector::class)
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index 056e0557..cfb61b3e 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -31,9 +31,40 @@ import reactor.core.publisher.Flux
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class WireChunkDecoder(private val decoder: WireFrameDecoder,
- alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
+internal class WireChunkDecoder(
+ private val decoder: WireFrameDecoder,
+ alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
private val streamBuffer = alloc.compositeBuffer()
+
+// TODO: use this implementation and cleanup the rest
+// fun decode(byteBuf: ByteBuf): Flux<WireFrame> = Flux.defer<WireFrame> {
+// if (byteBuf.readableBytes() == 0) {
+// byteBuf.release()
+// Flux.empty()
+// } else {
+// streamBuffer.addComponent(true, byteBuf)
+// Flux.generate { next ->
+// try {
+// val frame = decodeFirstFrameFromBuffer()
+// if (frame == null)
+// next.complete()
+// else
+// next.next(frame)
+// } catch (ex: Exception) {
+// next.error(ex)
+// }
+// }
+// }
+// }.doOnTerminate { streamBuffer.discardReadComponents() }
+//
+//
+// private fun decodeFirstFrameFromBuffer(): WireFrame? =
+// try {
+// decoder.decodeFirst(streamBuffer)
+// } catch (ex: MissingWireFrameBytesException) {
+// logger.trace { "${ex.message} - waiting for more data" }
+// null
+// }
fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter
.createFlux(decoder, streamBuffer, byteBuf)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
index abebff3d..540c647a 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
@@ -35,24 +35,27 @@ internal class WireFrameSink(
private val streamBuffer: ByteBuf,
private val sink: FluxSink<WireFrame>,
private val requestedFrameCount: Long) {
+ private var completed = false
fun handleSubscriber() {
- logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" }
+ if (!completed) {
+ logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" }
- try {
- if (requestedFrameCount == Long.MAX_VALUE) {
- logger.trace { "Push based strategy" }
- pushAvailableFrames()
- } else {
- logger.trace { "Pull based strategy - req $requestedFrameCount" }
- pushUpToNumberOfFrames()
+ try {
+ if (requestedFrameCount == Long.MAX_VALUE) {
+ logger.trace { "Push based strategy" }
+ pushAvailableFrames()
+ } else {
+ logger.trace { "Pull based strategy - req $requestedFrameCount" }
+ pushUpToNumberOfFrames()
+ }
+ } catch (ex: Exception) {
+ completed = true
+ sink.error(ex)
}
- } catch (ex: Exception) {
- sink.error(ex)
- }
-
- logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" }
+ logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" }
+ }
}
private fun pushAvailableFrames() {
@@ -61,6 +64,7 @@ internal class WireFrameSink(
sink.next(nextFrame)
nextFrame = decodeFirstFrameFromBuffer()
}
+ completed = true
sink.complete()
}
@@ -76,6 +80,7 @@ internal class WireFrameSink(
}
}
if (remaining > 0 && nextFrame == null) {
+ completed = true
sink.complete()
}
}