aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt9
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt5
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt2
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt28
5 files changed, 37 insertions, 10 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 8970e03e..b700f135 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -52,8 +53,8 @@ internal class VesHvCollector(
.transform(::decodePayload)
.filter(VesMessage::isValid)
.transform(::routeMessage)
+ .onErrorResume { logger.handleReactiveStreamError(it) }
.doFinally { releaseBuffersMemory(wireDecoder) }
- .onErrorResume(::handleErrors)
.then()
}
@@ -81,12 +82,6 @@ internal class VesHvCollector(
private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
- private fun handleErrors(ex: Throwable): Flux<RoutedMessage> {
- logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
- logger.debug("Detailed stack trace", ex)
- return Flux.empty()
- }
-
companion object {
private val logger = Logger(VesHvCollector::class)
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index ede5a667..7a47cfc3 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -71,7 +71,8 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
},
{
logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" }
- it.handleConnection(nettyInbound.context().channel().alloc(), createDataStream(nettyInbound))
+ val allocator = nettyInbound.context().channel().alloc()
+ it.handleConnection(allocator, createDataStream(nettyInbound))
}
)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index 0775c652..4a2ef6b2 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
@@ -51,7 +52,9 @@ internal class WireChunkDecoder(
Flux.empty()
} else {
streamBuffer.addComponent(true, byteBuf)
- generateFrames().doOnTerminate { streamBuffer.discardReadComponents() }
+ generateFrames()
+ .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) }
+ .doFinally { streamBuffer.discardReadComponents() }
}
}
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt
index ba4c0802..a8414472 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt
@@ -63,4 +63,4 @@ fun CommandLine.hasOption(cmdLineOpt: CommandLineOption): Boolean =
private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption) = Option.fromNullablesChain(
getOptionValue(cmdLineOpt.option.opt),
- { System.getenv(cmdLineOpt.environmentVariableName()) }) \ No newline at end of file
+ { System.getenv(cmdLineOpt.environmentVariableName()) })
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
new file mode 100644
index 00000000..714702d3
--- /dev/null
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
@@ -0,0 +1,28 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.logging
+
+import reactor.core.publisher.Flux
+
+fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> {
+ logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
+ logger.debug("Detailed stack trace", ex)
+ return returnFlux
+}