summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-07 14:41:39 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-10 14:46:23 +0100
commit8b8c37c296e55644063e0332fd455437168e78da (patch)
tree36e9d96217346dd4296677cfd8af584c69a0ad05 /sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
parent73293332b2244b66083dc5d3910801c1b1058105 (diff)
Add log diagnostic context
As it's not trivial to use MDC directly from logging framework in reactive application, we need to do some work manually. The approach proposed is an explicit MDC handling, which means that context is kept as an object created after establishing client connection. Next, new instance of HvVesCollector (and its dependencies) is created. Every object is propagated with ClientContext so it can use it when calling logger methods. In the future ClientContext might be used to support other use-cases, ie. per-topic access control. As a by-product I had to refactor our Logger wrapper, too. It already had too many functions and after adding MDC number would be doubled. Change-Id: I9c5d3f5e1d1be1db66d28d292eb0e1c38d8d0ffe Issue-ID: DCAEGEN2-671 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt60
1 files changed, 34 insertions, 26 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 0b2997fa..2d29fe99 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -23,6 +23,10 @@ import arrow.core.getOrElse
import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
@@ -57,57 +61,61 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
sslContextFactory
.createSslContext(serverConfig.securityConfiguration)
.map { sslContext ->
- logger.info("Collector configured with SSL enabled")
+ logger.info { "Collector configured with SSL enabled" }
this.secure { b -> b.sslContext(sslContext) }
}.getOrElse {
- logger.info("Collector configured with SSL disabled")
+ logger.info { "Collector configured with SSL disabled" }
this
}
- private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
- collectorProvider().fold(
- {
- nettyInbound.withConnection { conn ->
- logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." }
- }
- Mono.empty()
- },
- {
- nettyInbound.withConnection { conn ->
- logger.info { "Handling connection from ${conn.address()}" }
- conn.configureIdleTimeout(serverConfig.idleTimeout)
- .logConnectionClosed()
- }
- it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound))
+ private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
+ val clientContext = ClientContext(nettyOutbound.alloc())
+ nettyInbound.withConnection {
+ clientContext.clientAddress = it.address()
+ }
+
+ return collectorProvider(clientContext).fold(
+ {
+ logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." }
+ Mono.empty()
+ },
+ {
+ logger.info { "Handling new connection" }
+ nettyInbound.withConnection { conn ->
+ conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
+ .logConnectionClosed(clientContext)
}
- )
+ it.handleConnection(createDataStream(nettyInbound))
+ }
+ )
+ }
private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
.receive()
.retain()
- private fun Connection.configureIdleTimeout(timeout: Duration): Connection {
+ private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection {
onReadIdle(timeout.toMillis()) {
- logger.info {
+ logger.info(ctx) {
"Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
}
- disconnectClient()
+ disconnectClient(ctx)
}
return this
}
- private fun Connection.disconnectClient() {
+ private fun Connection.disconnectClient(ctx: ClientContext) {
channel().close().addListener {
if (it.isSuccess)
- logger.debug { "Channel (${address()}) closed successfully." }
+ logger.debug(ctx) { "Channel closed successfully." }
else
- logger.warn("Channel close failed", it.cause())
+ logger.withWarn(ctx) { log("Channel close failed", it.cause()) }
}
}
- private fun Connection.logConnectionClosed(): Connection {
+ private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
onTerminate().subscribe {
- logger.info("Connection from ${address()} has been closed")
+ logger.info(ctx) { "Connection has been closed" }
}
return this
}