From d632aef8303701a1802f817c3d6fdcd4064c32b2 Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Thu, 29 Nov 2018 11:58:40 +0100 Subject: Harmonize logging and add new logs - corrected docker-compose consul url Change-Id: I78df868e0dd51008ef39d01553e6a0a3b8273a54 Issue-ID: DCAEGEN2-1003 Signed-off-by: Filip Krzywka --- .../veshv/utils/logging/reactive_logging.kt | 40 +++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) (limited to 'sources/hv-collector-utils/src/main/kotlin/org/onap/dcae') diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt index 714702d3..e8ec2549 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt @@ -19,10 +19,48 @@ */ package org.onap.dcae.collectors.veshv.utils.logging +import arrow.core.Either +import arrow.core.Option +import arrow.core.Try import reactor.core.publisher.Flux +import reactor.core.publisher.Mono fun Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux = Flux.empty()): Flux { - logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})") + logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})") logger.debug("Detailed stack trace", ex) return returnFlux } + + +fun Try.filterFailedWithLog(logger: Logger, + acceptedMsg: (T) -> String, + rejectedMsg: (Throwable) -> String): Flux = + fold({ + logger.warn(rejectedMsg(it)) + Flux.empty() + }, { + logger.trace { acceptedMsg(it) } + Flux.just(it) + }) + +fun Option.filterEmptyWithLog(logger: Logger, + acceptedMsg: (T) -> String, + rejectedMsg: () -> String): Flux = + fold({ + logger.warn(rejectedMsg) + Flux.empty() + }, { + logger.trace { acceptedMsg(it) } + Flux.just(it) + }) + +fun Flux.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) = + flatMap { t -> + predicate(t).fold({ + logger.warn(it) + Mono.empty() + }, { + logger.trace(it) + Mono.just(t) + }) + } -- cgit 1.2.3-korg