summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-utils/src/main/kotlin/org
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-utils/src/main/kotlin/org')
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt40
1 files changed, 39 insertions, 1 deletions
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
index 714702d3..e8ec2549 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
@@ -19,10 +19,48 @@
*/
package org.onap.dcae.collectors.veshv.utils.logging
+import arrow.core.Either
+import arrow.core.Option
+import arrow.core.Try
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> {
- logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
+ logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})")
logger.debug("Detailed stack trace", ex)
return returnFlux
}
+
+
+fun <T> Try<T>.filterFailedWithLog(logger: Logger,
+ acceptedMsg: (T) -> String,
+ rejectedMsg: (Throwable) -> String): Flux<T> =
+ fold({
+ logger.warn(rejectedMsg(it))
+ Flux.empty<T>()
+ }, {
+ logger.trace { acceptedMsg(it) }
+ Flux.just(it)
+ })
+
+fun <T> Option<T>.filterEmptyWithLog(logger: Logger,
+ acceptedMsg: (T) -> String,
+ rejectedMsg: () -> String): Flux<T> =
+ fold({
+ logger.warn(rejectedMsg)
+ Flux.empty<T>()
+ }, {
+ logger.trace { acceptedMsg(it) }
+ Flux.just(it)
+ })
+
+fun <T> Flux<T>.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) =
+ flatMap { t ->
+ predicate(t).fold({
+ logger.warn(it)
+ Mono.empty<T>()
+ }, {
+ logger.trace(it)
+ Mono.just<T>(t)
+ })
+ }