diff options
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin')
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt | 8 |
1 files changed, 3 insertions, 5 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 433e4d57..618b818f 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -57,8 +57,6 @@ internal class VesHvCollector( .transform(::filterInvalidWireFrame) .transform(::decodeProtobufPayload) .transform(::filterInvalidProtobufMessages) - // TOD0: try to remove new flux creation in Sink interface to avoid two calls to handleErrors here - .handleErrors() .transform(::route) .handleErrors() .doFinally { releaseBuffersMemory() } @@ -106,14 +104,14 @@ internal class VesHvCollector( } } + private fun releaseBuffersMemory() = wireChunkDecoder.release() + .also { logger.debug { "Released buffer memory after handling message stream" } } + private fun <T> Flux<T>.handleErrors(): Flux<T> = onErrorResume { metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it)) logger.handleReactiveStreamError(clientContext, it) } - private fun releaseBuffersMemory() = wireChunkDecoder.release() - .also { logger.debug { "Released buffer memory after handling message stream" } } - private fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> MessageEither): Flux<T> = filterFailedWithLog(logger, clientContext::fullMdc, predicate) |