summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-utils/src/main/kotlin/org/onap
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2018-11-29 11:58:40 +0100
committerFilip Krzywka <filip.krzywka@nokia.com>2018-12-04 13:31:17 +0100
commitd632aef8303701a1802f817c3d6fdcd4064c32b2 (patch)
tree70614ef073f437810beea848c9f9a81189b794d8 /sources/hv-collector-utils/src/main/kotlin/org/onap
parentdde383a2aa75f94c26d7949665b79cc95486a223 (diff)
Harmonize logging and add new logs
- corrected docker-compose consul url Change-Id: I78df868e0dd51008ef39d01553e6a0a3b8273a54 Issue-ID: DCAEGEN2-1003 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-utils/src/main/kotlin/org/onap')
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt40
1 files changed, 39 insertions, 1 deletions
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
index 714702d3..e8ec2549 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
@@ -19,10 +19,48 @@
*/
package org.onap.dcae.collectors.veshv.utils.logging
+import arrow.core.Either
+import arrow.core.Option
+import arrow.core.Try
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> {
- logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
+ logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})")
logger.debug("Detailed stack trace", ex)
return returnFlux
}
+
+
+fun <T> Try<T>.filterFailedWithLog(logger: Logger,
+ acceptedMsg: (T) -> String,
+ rejectedMsg: (Throwable) -> String): Flux<T> =
+ fold({
+ logger.warn(rejectedMsg(it))
+ Flux.empty<T>()
+ }, {
+ logger.trace { acceptedMsg(it) }
+ Flux.just(it)
+ })
+
+fun <T> Option<T>.filterEmptyWithLog(logger: Logger,
+ acceptedMsg: (T) -> String,
+ rejectedMsg: () -> String): Flux<T> =
+ fold({
+ logger.warn(rejectedMsg)
+ Flux.empty<T>()
+ }, {
+ logger.trace { acceptedMsg(it) }
+ Flux.just(it)
+ })
+
+fun <T> Flux<T>.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) =
+ flatMap { t ->
+ predicate(t).fold({
+ logger.warn(it)
+ Mono.empty<T>()
+ }, {
+ logger.trace(it)
+ Mono.just<T>(t)
+ })
+ }