From 5ddee4d3b85c1b180acb506099c44678edcc57d5 Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Fri, 29 Mar 2019 14:52:25 +0100 Subject: Merge configurations - changed temporarily HV-VES default log level to DEBUG as in current implementation we are applying LogLevel defined in configuration file only if we successfully retrieve one from configuration-module, which means that inside of this module we are logging on default level (from logback file). This should be fixed in future work - reduced log level on SDK's CbsClientImpl as it's logging frequency was too high Change-Id: If50df18df099c34bfc36d39b045140f9b9ad87f6 Issue-ID: DCAEGEN2-1347 Signed-off-by: Filip Krzywka --- .../kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'sources/hv-collector-core/src/main/kotlin') diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 433e4d57..618b818f 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -57,8 +57,6 @@ internal class VesHvCollector( .transform(::filterInvalidWireFrame) .transform(::decodeProtobufPayload) .transform(::filterInvalidProtobufMessages) - // TOD0: try to remove new flux creation in Sink interface to avoid two calls to handleErrors here - .handleErrors() .transform(::route) .handleErrors() .doFinally { releaseBuffersMemory() } @@ -106,14 +104,14 @@ internal class VesHvCollector( } } + private fun releaseBuffersMemory() = wireChunkDecoder.release() + .also { logger.debug { "Released buffer memory after handling message stream" } } + private fun Flux.handleErrors(): Flux = onErrorResume { metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it)) logger.handleReactiveStreamError(clientContext, it) } - private fun releaseBuffersMemory() = wireChunkDecoder.release() - .also { logger.debug { "Released buffer memory after handling message stream" } } - private fun Flux.filterFailedWithLog(predicate: (T) -> MessageEither): Flux = filterFailedWithLog(logger, clientContext::fullMdc, predicate) -- cgit 1.2.3-korg