aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-03-29 14:52:25 +0100
committerFilip Krzywka <filip.krzywka@nokia.com>2019-04-02 10:12:04 +0200
commit5ddee4d3b85c1b180acb506099c44678edcc57d5 (patch)
tree60d8910b33efa7d163ac6f7d2714de245914b9c9 /sources/hv-collector-core
parent087a6ef92e53452faaaee0872ad5183b08268c30 (diff)
Merge configurations
- changed temporarily HV-VES default log level to DEBUG as in current implementation we are applying LogLevel defined in configuration file only if we successfully retrieve one from configuration-module, which means that inside of this module we are logging on default level (from logback file). This should be fixed in future work - reduced log level on SDK's CbsClientImpl as it's logging frequency was too high Change-Id: If50df18df099c34bfc36d39b045140f9b9ad87f6 Issue-ID: DCAEGEN2-1347 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-core')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt8
1 files changed, 3 insertions, 5 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 433e4d57..618b818f 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -57,8 +57,6 @@ internal class VesHvCollector(
.transform(::filterInvalidWireFrame)
.transform(::decodeProtobufPayload)
.transform(::filterInvalidProtobufMessages)
- // TOD0: try to remove new flux creation in Sink interface to avoid two calls to handleErrors here
- .handleErrors()
.transform(::route)
.handleErrors()
.doFinally { releaseBuffersMemory() }
@@ -106,14 +104,14 @@ internal class VesHvCollector(
}
}
+ private fun releaseBuffersMemory() = wireChunkDecoder.release()
+ .also { logger.debug { "Released buffer memory after handling message stream" } }
+
private fun <T> Flux<T>.handleErrors(): Flux<T> = onErrorResume {
metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it))
logger.handleReactiveStreamError(clientContext, it)
}
- private fun releaseBuffersMemory() = wireChunkDecoder.release()
- .also { logger.debug { "Released buffer memory after handling message stream" } }
-
private fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> MessageEither): Flux<T> =
filterFailedWithLog(logger, clientContext::fullMdc, predicate)