diff options
9 files changed, 61 insertions, 28 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index 0977595a..6105b585 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Option import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.Routing import org.onap.dcae.collectors.veshv.model.VesMessage diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 0d07504d..cf73aed8 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -26,6 +26,7 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.RoutedMessage @@ -33,7 +34,6 @@ import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog -import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -56,7 +56,7 @@ internal class VesHvCollector( .transform(::decodeProtobufPayload) .transform(::filterInvalidProtobufMessages) .transform(::routeMessage) - .onErrorResume { logger.handleReactiveStreamError(clientContext::asMap, it) } + .onErrorResume { logger.handleReactiveStreamError(clientContext, it) } .doFinally { releaseBuffersMemory() } .then() diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt new file mode 100644 index 00000000..21b79bbe --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt @@ -0,0 +1,47 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError +import reactor.core.publisher.Flux + +@Suppress("TooManyFunctions") +internal object ClientContextLogging { + fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block) + fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block) + fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block) + fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block) + fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block) + + fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message) + fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message) + fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message) + fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message) + fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message) + + fun <T> Logger.handleReactiveStreamError(context: ClientContext, ex: Throwable, + returnFlux: Flux<T> = Flux.empty()): Flux<T> { + return this.handleReactiveStreamError({ context.asMap() }, ex, returnFlux) + } +} + diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index f6cb018f..ec8593af 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -22,8 +22,8 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info -import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index 07ce7604..690a7d1e 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn import org.onap.dcae.collectors.veshv.utils.logging.Marker import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 3fa05c4d..6f02d43e 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -24,9 +24,9 @@ import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info -import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug -import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn import org.onap.dcae.collectors.veshv.utils.logging.Marker import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index 349b0787..b735138d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -27,10 +27,11 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace import reactor.core.publisher.Flux +import reactor.core.publisher.Flux.defer import reactor.core.publisher.SynchronousSink /** @@ -46,7 +47,7 @@ internal class WireChunkDecoder( streamBuffer.release() } - fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = Flux.defer { + fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = defer { logIncomingMessage(byteBuf) if (byteBuf.readableBytes() == 0) { byteBuf.release() @@ -54,7 +55,7 @@ internal class WireChunkDecoder( } else { streamBuffer.addComponent(true, byteBuf) generateFrames() - .onErrorResume { logger.handleReactiveStreamError(ctx::asMap, it, Flux.error(it)) } + .onErrorResume { logger.handleReactiveStreamError(ctx, it, Flux.error(it)) } .doFinally { streamBuffer.discardReadComponents() } } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt index 213b7434..305e4cb1 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt @@ -41,17 +41,3 @@ data class ClientContext( return result } } - -object ClientContextLogging { - fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block) - fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block) - fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block) - fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block) - fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block) - - fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message) - fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message) - fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message) - fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message) - fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message) -} diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt index 1e98f2fc..95590d9d 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt @@ -34,7 +34,6 @@ fun <T> Logger.handleReactiveStreamError( return returnFlux } - fun <T> Try<T>.filterFailedWithLog(logger: Logger, context: MappedDiagnosticContext, acceptedMsg: (T) -> String, |