summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2018-11-07 08:16:09 +0100
committerFilip Krzywka <filip.krzywka@nokia.com>2018-11-13 13:14:16 +0100
commit3fdd2fe2b4f35e18998d050c632fc6de24a7e3b1 (patch)
tree5b75879d2b1c5bfdd6b2c923ffa570f965bc0e70
parentd1abb8c3e7c20495ca8a953b175a9810a5b73671 (diff)
Handle stream error early
Should fix inconsistent logging due to Reactor Signal sometimes propagating from WireChunkDecoder stream to VesHvCollector stream as Signal.CANCEL instead of Signal.ERROR and thus not being handled correctly. As a drawback however we will log error twice in case it comes from WireChunkDecoder as we want to terminate connection in such case and so we need to propagate error. In WireChunkDecoder `doOnTerminate` was changed to `doFinally` as this method handles also cancellation signals and not only terminal signals. Also fixed minor checkstyle reported issues. Change-Id: I6e91d96c5a1a3ecf30603db9a71e032c770d507f Issue-ID: DCAEGEN2-955 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt9
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt5
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt2
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt28
5 files changed, 37 insertions, 10 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 8970e03e..b700f135 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -52,8 +53,8 @@ internal class VesHvCollector(
.transform(::decodePayload)
.filter(VesMessage::isValid)
.transform(::routeMessage)
+ .onErrorResume { logger.handleReactiveStreamError(it) }
.doFinally { releaseBuffersMemory(wireDecoder) }
- .onErrorResume(::handleErrors)
.then()
}
@@ -81,12 +82,6 @@ internal class VesHvCollector(
private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
- private fun handleErrors(ex: Throwable): Flux<RoutedMessage> {
- logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
- logger.debug("Detailed stack trace", ex)
- return Flux.empty()
- }
-
companion object {
private val logger = Logger(VesHvCollector::class)
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index ede5a667..7a47cfc3 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -71,7 +71,8 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
},
{
logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" }
- it.handleConnection(nettyInbound.context().channel().alloc(), createDataStream(nettyInbound))
+ val allocator = nettyInbound.context().channel().alloc()
+ it.handleConnection(allocator, createDataStream(nettyInbound))
}
)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index 0775c652..4a2ef6b2 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
@@ -51,7 +52,9 @@ internal class WireChunkDecoder(
Flux.empty()
} else {
streamBuffer.addComponent(true, byteBuf)
- generateFrames().doOnTerminate { streamBuffer.discardReadComponents() }
+ generateFrames()
+ .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) }
+ .doFinally { streamBuffer.discardReadComponents() }
}
}
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt
index ba4c0802..a8414472 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt
@@ -63,4 +63,4 @@ fun CommandLine.hasOption(cmdLineOpt: CommandLineOption): Boolean =
private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption) = Option.fromNullablesChain(
getOptionValue(cmdLineOpt.option.opt),
- { System.getenv(cmdLineOpt.environmentVariableName()) }) \ No newline at end of file
+ { System.getenv(cmdLineOpt.environmentVariableName()) })
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
new file mode 100644
index 00000000..714702d3
--- /dev/null
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
@@ -0,0 +1,28 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.logging
+
+import reactor.core.publisher.Flux
+
+fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> {
+ logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
+ logger.debug("Detailed stack trace", ex)
+ return returnFlux
+}