diff options
author | 2018-12-07 14:41:39 +0100 | |
---|---|---|
committer | 2018-12-10 14:46:23 +0100 | |
commit | 8b8c37c296e55644063e0332fd455437168e78da (patch) | |
tree | 36e9d96217346dd4296677cfd8af584c69a0ad05 /sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt | |
parent | 73293332b2244b66083dc5d3910801c1b1058105 (diff) |
Add log diagnostic context
As it's not trivial to use MDC directly from logging framework in
reactive application, we need to do some work manually. The approach
proposed is an explicit MDC handling, which means that context is
kept as an object created after establishing client connection. Next,
new instance of HvVesCollector (and its dependencies) is created. Every
object is propagated with ClientContext so it can use it when calling
logger methods.
In the future ClientContext might be used to support other use-cases,
ie. per-topic access control.
As a by-product I had to refactor our Logger wrapper, too. It already
had too many functions and after adding MDC number would be doubled.
Change-Id: I9c5d3f5e1d1be1db66d28d292eb0e1c38d8d0ffe
Issue-ID: DCAEGEN2-671
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt')
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt | 60 |
1 files changed, 34 insertions, 26 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 0b2997fa..2d29fe99 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -23,6 +23,10 @@ import arrow.core.getOrElse import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory import org.onap.dcae.collectors.veshv.utils.NettyServerHandle @@ -57,57 +61,61 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, sslContextFactory .createSslContext(serverConfig.securityConfiguration) .map { sslContext -> - logger.info("Collector configured with SSL enabled") + logger.info { "Collector configured with SSL enabled" } this.secure { b -> b.sslContext(sslContext) } }.getOrElse { - logger.info("Collector configured with SSL disabled") + logger.info { "Collector configured with SSL disabled" } this } - private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = - collectorProvider().fold( - { - nettyInbound.withConnection { conn -> - logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." } - } - Mono.empty() - }, - { - nettyInbound.withConnection { conn -> - logger.info { "Handling connection from ${conn.address()}" } - conn.configureIdleTimeout(serverConfig.idleTimeout) - .logConnectionClosed() - } - it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound)) + private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> { + val clientContext = ClientContext(nettyOutbound.alloc()) + nettyInbound.withConnection { + clientContext.clientAddress = it.address() + } + + return collectorProvider(clientContext).fold( + { + logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." } + Mono.empty() + }, + { + logger.info { "Handling new connection" } + nettyInbound.withConnection { conn -> + conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout) + .logConnectionClosed(clientContext) } - ) + it.handleConnection(createDataStream(nettyInbound)) + } + ) + } private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound .receive() .retain() - private fun Connection.configureIdleTimeout(timeout: Duration): Connection { + private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection { onReadIdle(timeout.toMillis()) { - logger.info { + logger.info(ctx) { "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..." } - disconnectClient() + disconnectClient(ctx) } return this } - private fun Connection.disconnectClient() { + private fun Connection.disconnectClient(ctx: ClientContext) { channel().close().addListener { if (it.isSuccess) - logger.debug { "Channel (${address()}) closed successfully." } + logger.debug(ctx) { "Channel closed successfully." } else - logger.warn("Channel close failed", it.cause()) + logger.withWarn(ctx) { log("Channel close failed", it.cause()) } } } - private fun Connection.logConnectionClosed(): Connection { + private fun Connection.logConnectionClosed(ctx: ClientContext): Connection { onTerminate().subscribe { - logger.info("Connection from ${address()} has been closed") + logger.info(ctx) { "Connection has been closed" } } return this } |