diff options
5 files changed, 37 insertions, 10 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 8970e03e..b700f135 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -52,8 +53,8 @@ internal class VesHvCollector( .transform(::decodePayload) .filter(VesMessage::isValid) .transform(::routeMessage) + .onErrorResume { logger.handleReactiveStreamError(it) } .doFinally { releaseBuffersMemory(wireDecoder) } - .onErrorResume(::handleErrors) .then() } @@ -81,12 +82,6 @@ internal class VesHvCollector( private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() - private fun handleErrors(ex: Throwable): Flux<RoutedMessage> { - logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})") - logger.debug("Detailed stack trace", ex) - return Flux.empty() - } - companion object { private val logger = Logger(VesHvCollector::class) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index ede5a667..7a47cfc3 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -71,7 +71,8 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, }, { logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" } - it.handleConnection(nettyInbound.context().channel().alloc(), createDataStream(nettyInbound)) + val allocator = nettyInbound.context().channel().alloc() + it.handleConnection(allocator, createDataStream(nettyInbound)) } ) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index 0775c652..4a2ef6b2 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux import reactor.core.publisher.SynchronousSink @@ -51,7 +52,9 @@ internal class WireChunkDecoder( Flux.empty() } else { streamBuffer.addComponent(true, byteBuf) - generateFrames().doOnTerminate { streamBuffer.discardReadComponents() } + generateFrames() + .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) } + .doFinally { streamBuffer.discardReadComponents() } } } diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt index ba4c0802..a8414472 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt @@ -63,4 +63,4 @@ fun CommandLine.hasOption(cmdLineOpt: CommandLineOption): Boolean = private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption) = Option.fromNullablesChain( getOptionValue(cmdLineOpt.option.opt), - { System.getenv(cmdLineOpt.environmentVariableName()) })
\ No newline at end of file + { System.getenv(cmdLineOpt.environmentVariableName()) }) diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt new file mode 100644 index 00000000..714702d3 --- /dev/null +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt @@ -0,0 +1,28 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.logging + +import reactor.core.publisher.Flux + +fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> { + logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})") + logger.debug("Detailed stack trace", ex) + return returnFlux +} |