summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt7
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt8
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt1
-rw-r--r--sources/hv-collector-main/src/main/resources/logback.xml3
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt43
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt30
6 files changed, 85 insertions, 7 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index fd08ba3d..07ce7604 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -21,8 +21,8 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -38,7 +38,8 @@ import java.util.concurrent.atomic.AtomicLong
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>, private val ctx: ClientContext) : Sink {
+internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>,
+ private val ctx: ClientContext) : Sink {
private val sentMessages = AtomicLong(0)
override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
@@ -68,7 +69,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
}
private fun logSentMessage(sentMsg: RoutedMessage) {
- logger.trace(ctx) {
+ logger.trace(ctx::asMap, Marker.INVOKE) {
val msgNum = sentMessages.incrementAndGet()
"Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 2d29fe99..3fa05c4d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -27,6 +27,7 @@ import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info
import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug
import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
@@ -74,13 +75,14 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
clientContext.clientAddress = it.address()
}
+ logger.debug(clientContext::asMap, Marker.ENTRY) { "Client connection request received" }
return collectorProvider(clientContext).fold(
{
logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." }
Mono.empty()
},
{
- logger.info { "Handling new connection" }
+ logger.info(clientContext::asMap) { "Handling new connection" }
nettyInbound.withConnection { conn ->
conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
.logConnectionClosed(clientContext)
@@ -106,6 +108,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private fun Connection.disconnectClient(ctx: ClientContext) {
channel().close().addListener {
+ logger.debug(ctx::asMap, Marker.EXIT) { "Closing client channel." }
if (it.isSuccess)
logger.debug(ctx) { "Channel closed successfully." }
else
@@ -115,7 +118,8 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
onTerminate().subscribe {
- logger.info(ctx) { "Connection has been closed" }
+ // TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled)
+ logger.info(ctx::asMap, Marker.EXIT) { "Connection has been closed" }
}
return this
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
index f14a7f65..213b7434 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.model
import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.slf4j.MDC
import java.net.InetSocketAddress
import java.util.*
diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml
index 674fb2c3..58d2cb6a 100644
--- a/sources/hv-collector-main/src/main/resources/logback.xml
+++ b/sources/hv-collector-main/src/main/resources/logback.xml
@@ -12,7 +12,8 @@
%nopexception%50.50logger
| %date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC}
| %highlight(%-5level)
-| %mdc{clientId} %mdc{clientAddress}
+| %mdc
+| %marker
| %msg
| %rootException
| %thread%n"/>
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
index 2fb48803..1e5c9c55 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
@@ -50,6 +50,9 @@ class Logger(logger: org.slf4j.Logger) {
fun error(mdc: MappedDiagnosticContext, message: () -> String) =
errorLogger.withMdc(mdc) { log(message()) }
+ fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+ errorLogger.withMdc(mdc) { log(marker, message()) }
+
// WARN
fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block()
@@ -64,6 +67,8 @@ class Logger(logger: org.slf4j.Logger) {
fun warn(mdc: MappedDiagnosticContext, message: () -> String) =
warnLogger.withMdc(mdc) { log(message()) }
+ fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+ warnLogger.withMdc(mdc) { log(marker, message()) }
// INFO
@@ -79,6 +84,9 @@ class Logger(logger: org.slf4j.Logger) {
fun info(mdc: MappedDiagnosticContext, message: () -> String) =
infoLogger.withMdc(mdc) { log(message()) }
+ fun info(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+ infoLogger.withMdc(mdc) { log(marker, message()) }
+
// DEBUG
fun withDebug(block: AtLevelLogger.() -> Unit) = debugLogger.block()
@@ -93,6 +101,8 @@ class Logger(logger: org.slf4j.Logger) {
fun debug(mdc: MappedDiagnosticContext, message: () -> String) =
debugLogger.withMdc(mdc) { log(message()) }
+ fun debug(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+ debugLogger.withMdc(mdc) { log(marker, message()) }
// TRACE
@@ -108,11 +118,15 @@ class Logger(logger: org.slf4j.Logger) {
fun trace(mdc: MappedDiagnosticContext, message: () -> String) =
traceLogger.withMdc(mdc) { log(message()) }
+ fun trace(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+ traceLogger.withMdc(mdc) { log(marker, message()) }
+
}
abstract class AtLevelLogger {
abstract fun log(message: String)
abstract fun log(message: String, t: Throwable)
+ abstract fun log(marker: Marker, message: String)
open val enabled: Boolean
get() = true
@@ -138,8 +152,13 @@ object OffLevelLogger : AtLevelLogger() {
override fun log(message: String, t: Throwable) {
// do not log anything
}
+
+ override fun log(marker: Marker, message: String) {
+ // do not log anything
+ }
}
+@Suppress("SuboptimalLoggerUsage")
class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
override fun log(message: String) {
logger.error(message)
@@ -148,8 +167,13 @@ class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
override fun log(message: String, t: Throwable) {
logger.error(message, t)
}
+
+ override fun log(marker: Marker, message: String) {
+ logger.error(marker(), message)
+ }
}
+@Suppress("SuboptimalLoggerUsage")
class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
override fun log(message: String) {
logger.warn(message)
@@ -158,8 +182,13 @@ class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
override fun log(message: String, t: Throwable) {
logger.warn(message, t)
}
+
+ override fun log(marker: Marker, message: String) {
+ logger.warn(marker(), message)
+ }
}
+@Suppress("SuboptimalLoggerUsage")
class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
override fun log(message: String) {
logger.info(message)
@@ -168,8 +197,13 @@ class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
override fun log(message: String, t: Throwable) {
logger.info(message, t)
}
+
+ override fun log(marker: Marker, message: String) {
+ logger.info(marker(), message)
+ }
}
+@Suppress("SuboptimalLoggerUsage")
class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
override fun log(message: String) {
logger.debug(message)
@@ -178,8 +212,13 @@ class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
override fun log(message: String, t: Throwable) {
logger.debug(message, t)
}
+
+ override fun log(marker: Marker, message: String) {
+ logger.debug(marker(), message)
+ }
}
+@Suppress("SuboptimalLoggerUsage")
class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
override fun log(message: String) {
logger.trace(message)
@@ -188,4 +227,8 @@ class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
override fun log(message: String, t: Throwable) {
logger.trace(message, t)
}
+
+ override fun log(marker: Marker, message: String) {
+ logger.trace(marker(), message)
+ }
}
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt
new file mode 100644
index 00000000..83fb9a5e
--- /dev/null
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.logging
+
+import org.slf4j.MarkerFactory
+
+enum class Marker(private val marker: org.slf4j.Marker) {
+ ENTRY(MarkerFactory.getMarker("ENTRY")),
+ EXIT(MarkerFactory.getMarker("EXIT")),
+ INVOKE(MarkerFactory.getMarker("INVOKE"));
+
+ operator fun invoke() = marker
+}
</images> </configuration> <executions> <execution> <id>clean-images</id> <phase>pre-clean</phase> <goals> <goal>remove</goal> </goals> <configuration> <removeAll>true</removeAll> </configuration> </execution> <execution> <id>generate-images</id> <phase>generate-sources</phase> <goals> <goal>build</goal> </goals> </execution> <execution> <id>push-images</id> <phase>deploy</phase> <goals> <goal>build</goal> <goal>push</goal> </goals> <configuration> <image>onap/policy-distribution</image> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-deploy-plugin</artifactId> <configuration> <skip>true</skip> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>org.onap.policy.distribution</groupId> <artifactId>policy-distribution-tarball</artifactId> <version>${project.version}</version> <classifier>tarball</classifier> <type>tar.gz</type> </dependency> </dependencies> </project>