aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/main/kotlin
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-core/src/main/kotlin')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt9
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt5
3 files changed, 8 insertions, 9 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 8970e03e..b700f135 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -52,8 +53,8 @@ internal class VesHvCollector(
.transform(::decodePayload)
.filter(VesMessage::isValid)
.transform(::routeMessage)
+ .onErrorResume { logger.handleReactiveStreamError(it) }
.doFinally { releaseBuffersMemory(wireDecoder) }
- .onErrorResume(::handleErrors)
.then()
}
@@ -81,12 +82,6 @@ internal class VesHvCollector(
private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
- private fun handleErrors(ex: Throwable): Flux<RoutedMessage> {
- logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
- logger.debug("Detailed stack trace", ex)
- return Flux.empty()
- }
-
companion object {
private val logger = Logger(VesHvCollector::class)
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index ede5a667..7a47cfc3 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -71,7 +71,8 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
},
{
logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" }
- it.handleConnection(nettyInbound.context().channel().alloc(), createDataStream(nettyInbound))
+ val allocator = nettyInbound.context().channel().alloc()
+ it.handleConnection(allocator, createDataStream(nettyInbound))
}
)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index 0775c652..4a2ef6b2 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
@@ -51,7 +52,9 @@ internal class WireChunkDecoder(
Flux.empty()
} else {
streamBuffer.addComponent(true, byteBuf)
- generateFrames().doOnTerminate { streamBuffer.discardReadComponents() }
+ generateFrames()
+ .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) }
+ .doFinally { streamBuffer.discardReadComponents() }
}
}