diff options
Diffstat (limited to 'sources/hv-collector-utils')
2 files changed, 159 insertions, 1 deletions
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt index 714702d3..e8ec2549 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt @@ -19,10 +19,48 @@ */ package org.onap.dcae.collectors.veshv.utils.logging +import arrow.core.Either +import arrow.core.Option +import arrow.core.Try import reactor.core.publisher.Flux +import reactor.core.publisher.Mono fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> { - logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})") + logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})") logger.debug("Detailed stack trace", ex) return returnFlux } + + +fun <T> Try<T>.filterFailedWithLog(logger: Logger, + acceptedMsg: (T) -> String, + rejectedMsg: (Throwable) -> String): Flux<T> = + fold({ + logger.warn(rejectedMsg(it)) + Flux.empty<T>() + }, { + logger.trace { acceptedMsg(it) } + Flux.just(it) + }) + +fun <T> Option<T>.filterEmptyWithLog(logger: Logger, + acceptedMsg: (T) -> String, + rejectedMsg: () -> String): Flux<T> = + fold({ + logger.warn(rejectedMsg) + Flux.empty<T>() + }, { + logger.trace { acceptedMsg(it) } + Flux.just(it) + }) + +fun <T> Flux<T>.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) = + flatMap { t -> + predicate(t).fold({ + logger.warn(it) + Mono.empty<T>() + }, { + logger.trace(it) + Mono.just<T>(t) + }) + } diff --git a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt new file mode 100644 index 00000000..0f359df3 --- /dev/null +++ b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt @@ -0,0 +1,120 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.logging + +import arrow.core.Either +import arrow.core.Failure +import arrow.core.Option +import arrow.core.Try +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import reactor.core.publisher.Flux +import reactor.test.test +import kotlin.test.fail + +class ReactiveLoggingTest : Spek({ + + describe("filtering with log message") { + val logger = Logger("React") + val event = 5 + + describe("Try") { + given("successful Try") { + val cut = Try.just(event) + + it("should not filter stream event and log accepted message") { + cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE) + .test() + .expectNext(event) + .verifyComplete() + } + } + + given("failed Try") { + val e = Exception() + val cut = Failure(e) + it("should filter stream event and log rejected message") { + cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE) + .test() + .verifyComplete() + } + } + } + + describe("Option") { + given("Option with content") { + val cut = Option.just(event) + + it("should not filter stream event and log accepted message") { + cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE) + .test() + .expectNext(event) + .verifyComplete() + } + } + + given("empty Option") { + val cut = Option.empty<Int>() + it("should filter stream event and log rejected message") { + cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE) + .test() + .verifyComplete() + } + } + } + + + describe("Either") { + given("successful Either (right)") { + val cut = Flux.just(event) + + it("should not filter stream event and log accepted message") { + cut.filterFailedWithLog(logger, right()) + .test() + .expectNext(event) + .verifyComplete() + } + } + + given("failed Either (left)") { + val cut = Flux.just(event) + + it("should filter stream event and log rejected message") { + cut.filterFailedWithLog(logger, left()) + .test() + .verifyComplete() + } + } + } + } +}) + + +val ACCEPTED_MESSAGE: (Int) -> String = { "SUCCESS" } +val FAILED_MESSAGE: () -> String = { "FAILED" } +val FAILED_WITH_EXCEPTION_MESSAGE: (Throwable) -> String = { "FAILED" } + +private fun right(): (Int) -> Either<() -> String, () -> String> = + { Either.cond(true, { { "SUCCESS" } }, { fail() }) } + +private fun left(): (Int) -> Either<() -> String, () -> String> = + { Either.cond(false, { fail() }, { FAILED_MESSAGE }) } |