diff options
author | Filip Krzywka <filip.krzywka@nokia.com> | 2018-11-07 08:16:09 +0100 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2018-11-13 13:14:16 +0100 |
commit | 3fdd2fe2b4f35e18998d050c632fc6de24a7e3b1 (patch) | |
tree | 5b75879d2b1c5bfdd6b2c923ffa570f965bc0e70 /hv-collector-core | |
parent | d1abb8c3e7c20495ca8a953b175a9810a5b73671 (diff) |
Handle stream error early
Should fix inconsistent logging due to Reactor Signal sometimes
propagating from WireChunkDecoder stream to VesHvCollector stream as
Signal.CANCEL instead of Signal.ERROR and thus not being handled
correctly.
As a drawback however we will log error twice in case it comes from
WireChunkDecoder as we want to terminate connection in such case and
so we need to propagate error.
In WireChunkDecoder `doOnTerminate` was changed to
`doFinally` as this method handles also cancellation
signals and not only terminal signals.
Also fixed minor checkstyle reported issues.
Change-Id: I6e91d96c5a1a3ecf30603db9a71e032c770d507f
Issue-ID: DCAEGEN2-955
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'hv-collector-core')
3 files changed, 8 insertions, 9 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 8970e03e..b700f135 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -52,8 +53,8 @@ internal class VesHvCollector( .transform(::decodePayload) .filter(VesMessage::isValid) .transform(::routeMessage) + .onErrorResume { logger.handleReactiveStreamError(it) } .doFinally { releaseBuffersMemory(wireDecoder) } - .onErrorResume(::handleErrors) .then() } @@ -81,12 +82,6 @@ internal class VesHvCollector( private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() - private fun handleErrors(ex: Throwable): Flux<RoutedMessage> { - logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})") - logger.debug("Detailed stack trace", ex) - return Flux.empty() - } - companion object { private val logger = Logger(VesHvCollector::class) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index ede5a667..7a47cfc3 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -71,7 +71,8 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, }, { logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" } - it.handleConnection(nettyInbound.context().channel().alloc(), createDataStream(nettyInbound)) + val allocator = nettyInbound.context().channel().alloc() + it.handleConnection(allocator, createDataStream(nettyInbound)) } ) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index 0775c652..4a2ef6b2 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux import reactor.core.publisher.SynchronousSink @@ -51,7 +52,9 @@ internal class WireChunkDecoder( Flux.empty() } else { streamBuffer.addComponent(true, byteBuf) - generateFrames().doOnTerminate { streamBuffer.discardReadComponents() } + generateFrames() + .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) } + .doFinally { streamBuffer.discardReadComponents() } } } |