diff options
Diffstat (limited to 'hv-collector-core')
3 files changed, 8 insertions, 9 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 8970e03e..b700f135 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -52,8 +53,8 @@ internal class VesHvCollector( .transform(::decodePayload) .filter(VesMessage::isValid) .transform(::routeMessage) + .onErrorResume { logger.handleReactiveStreamError(it) } .doFinally { releaseBuffersMemory(wireDecoder) } - .onErrorResume(::handleErrors) .then() } @@ -81,12 +82,6 @@ internal class VesHvCollector( private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() - private fun handleErrors(ex: Throwable): Flux<RoutedMessage> { - logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})") - logger.debug("Detailed stack trace", ex) - return Flux.empty() - } - companion object { private val logger = Logger(VesHvCollector::class) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index ede5a667..7a47cfc3 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -71,7 +71,8 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, }, { logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" } - it.handleConnection(nettyInbound.context().channel().alloc(), createDataStream(nettyInbound)) + val allocator = nettyInbound.context().channel().alloc() + it.handleConnection(allocator, createDataStream(nettyInbound)) } ) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index 0775c652..4a2ef6b2 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux import reactor.core.publisher.SynchronousSink @@ -51,7 +52,9 @@ internal class WireChunkDecoder( Flux.empty() } else { streamBuffer.addComponent(true, byteBuf) - generateFrames().doOnTerminate { streamBuffer.discardReadComponents() } + generateFrames() + .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) } + .doFinally { streamBuffer.discardReadComponents() } } } |