diff options
6 files changed, 85 insertions, 7 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index fd08ba3d..07ce7604 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -21,8 +21,8 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn +import org.onap.dcae.collectors.veshv.utils.logging.Marker import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -38,7 +38,8 @@ import java.util.concurrent.atomic.AtomicLong * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>, private val ctx: ClientContext) : Sink { +internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>, + private val ctx: ClientContext) : Sink { private val sentMessages = AtomicLong(0) override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { @@ -68,7 +69,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM } private fun logSentMessage(sentMsg: RoutedMessage) { - logger.trace(ctx) { + logger.trace(ctx::asMap, Marker.INVOKE) { val msgNum = sentMessages.incrementAndGet() "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}" } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 2d29fe99..3fa05c4d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -27,6 +27,7 @@ import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn +import org.onap.dcae.collectors.veshv.utils.logging.Marker import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory import org.onap.dcae.collectors.veshv.utils.NettyServerHandle @@ -74,13 +75,14 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, clientContext.clientAddress = it.address() } + logger.debug(clientContext::asMap, Marker.ENTRY) { "Client connection request received" } return collectorProvider(clientContext).fold( { logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." } Mono.empty() }, { - logger.info { "Handling new connection" } + logger.info(clientContext::asMap) { "Handling new connection" } nettyInbound.withConnection { conn -> conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout) .logConnectionClosed(clientContext) @@ -106,6 +108,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private fun Connection.disconnectClient(ctx: ClientContext) { channel().close().addListener { + logger.debug(ctx::asMap, Marker.EXIT) { "Closing client channel." } if (it.isSuccess) logger.debug(ctx) { "Channel closed successfully." } else @@ -115,7 +118,8 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private fun Connection.logConnectionClosed(ctx: ClientContext): Connection { onTerminate().subscribe { - logger.info(ctx) { "Connection has been closed" } + // TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled) + logger.info(ctx::asMap, Marker.EXIT) { "Connection has been closed" } } return this } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt index f14a7f65..213b7434 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.model import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.slf4j.MDC import java.net.InetSocketAddress import java.util.* diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml index 674fb2c3..58d2cb6a 100644 --- a/sources/hv-collector-main/src/main/resources/logback.xml +++ b/sources/hv-collector-main/src/main/resources/logback.xml @@ -12,7 +12,8 @@ %nopexception%50.50logger | %date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} | %highlight(%-5level) -| %mdc{clientId} %mdc{clientAddress} +| %mdc +| %marker | %msg | %rootException | %thread%n"/> diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt index 2fb48803..1e5c9c55 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt @@ -50,6 +50,9 @@ class Logger(logger: org.slf4j.Logger) { fun error(mdc: MappedDiagnosticContext, message: () -> String) = errorLogger.withMdc(mdc) { log(message()) } + fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + errorLogger.withMdc(mdc) { log(marker, message()) } + // WARN fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block() @@ -64,6 +67,8 @@ class Logger(logger: org.slf4j.Logger) { fun warn(mdc: MappedDiagnosticContext, message: () -> String) = warnLogger.withMdc(mdc) { log(message()) } + fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + warnLogger.withMdc(mdc) { log(marker, message()) } // INFO @@ -79,6 +84,9 @@ class Logger(logger: org.slf4j.Logger) { fun info(mdc: MappedDiagnosticContext, message: () -> String) = infoLogger.withMdc(mdc) { log(message()) } + fun info(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + infoLogger.withMdc(mdc) { log(marker, message()) } + // DEBUG fun withDebug(block: AtLevelLogger.() -> Unit) = debugLogger.block() @@ -93,6 +101,8 @@ class Logger(logger: org.slf4j.Logger) { fun debug(mdc: MappedDiagnosticContext, message: () -> String) = debugLogger.withMdc(mdc) { log(message()) } + fun debug(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + debugLogger.withMdc(mdc) { log(marker, message()) } // TRACE @@ -108,11 +118,15 @@ class Logger(logger: org.slf4j.Logger) { fun trace(mdc: MappedDiagnosticContext, message: () -> String) = traceLogger.withMdc(mdc) { log(message()) } + fun trace(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + traceLogger.withMdc(mdc) { log(marker, message()) } + } abstract class AtLevelLogger { abstract fun log(message: String) abstract fun log(message: String, t: Throwable) + abstract fun log(marker: Marker, message: String) open val enabled: Boolean get() = true @@ -138,8 +152,13 @@ object OffLevelLogger : AtLevelLogger() { override fun log(message: String, t: Throwable) { // do not log anything } + + override fun log(marker: Marker, message: String) { + // do not log anything + } } +@Suppress("SuboptimalLoggerUsage") class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { override fun log(message: String) { logger.error(message) @@ -148,8 +167,13 @@ class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { override fun log(message: String, t: Throwable) { logger.error(message, t) } + + override fun log(marker: Marker, message: String) { + logger.error(marker(), message) + } } +@Suppress("SuboptimalLoggerUsage") class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { override fun log(message: String) { logger.warn(message) @@ -158,8 +182,13 @@ class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { override fun log(message: String, t: Throwable) { logger.warn(message, t) } + + override fun log(marker: Marker, message: String) { + logger.warn(marker(), message) + } } +@Suppress("SuboptimalLoggerUsage") class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { override fun log(message: String) { logger.info(message) @@ -168,8 +197,13 @@ class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { override fun log(message: String, t: Throwable) { logger.info(message, t) } + + override fun log(marker: Marker, message: String) { + logger.info(marker(), message) + } } +@Suppress("SuboptimalLoggerUsage") class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { override fun log(message: String) { logger.debug(message) @@ -178,8 +212,13 @@ class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { override fun log(message: String, t: Throwable) { logger.debug(message, t) } + + override fun log(marker: Marker, message: String) { + logger.debug(marker(), message) + } } +@Suppress("SuboptimalLoggerUsage") class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { override fun log(message: String) { logger.trace(message) @@ -188,4 +227,8 @@ class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { override fun log(message: String, t: Throwable) { logger.trace(message, t) } + + override fun log(marker: Marker, message: String) { + logger.trace(marker(), message) + } } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt new file mode 100644 index 00000000..83fb9a5e --- /dev/null +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt @@ -0,0 +1,30 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.logging + +import org.slf4j.MarkerFactory + +enum class Marker(private val marker: org.slf4j.Marker) { + ENTRY(MarkerFactory.getMarker("ENTRY")), + EXIT(MarkerFactory.getMarker("EXIT")), + INVOKE(MarkerFactory.getMarker("INVOKE")); + + operator fun invoke() = marker +} |