From 4ab95420e42f6df59bd4851eee41be6579bdbbe1 Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Mon, 17 Dec 2018 16:03:10 +0100 Subject: Add metrics for active connections count * Fix and refactor gauges tests in MicrometerMetricsTests as they were not executing * Fix client disconnection handler in NettyTcpServer * Add metrics gauge and counters required to measure active connections Change-Id: I5620d398525c6859679cd5a49dc55a9fefd8b592 Signed-off-by: Jakub Dudycz Issue-ID: DCAEGEN2-1041 --- .../dcae/collectors/veshv/boundary/adapters.kt | 2 + .../dcae/collectors/veshv/factory/ServerFactory.kt | 7 ++- .../collectors/veshv/impl/socket/NettyTcpServer.kt | 51 ++++++++++------------ 3 files changed, 31 insertions(+), 29 deletions(-) (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap') diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 61d28c2b..ac55e55f 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -36,6 +36,8 @@ interface Metrics { fun notifyMessageReceived(msg: WireFrameMessage) fun notifyMessageSent(msg: RoutedMessage) fun notifyMessageDropped(cause: MessageDropCause) + fun notifyClientDisconnected() + fun notifyClientConnected() fun notifyClientRejected(cause: ClientRejectionCause) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt index dce933ab..2e6bb4dc 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.factory import org.onap.dcae.collectors.veshv.boundary.CollectorProvider +import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer import org.onap.dcae.collectors.veshv.model.ServerConfiguration @@ -30,6 +31,8 @@ import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory * @since May 2018 */ object ServerFactory { - fun createNettyTcpServer(serverConfiguration: ServerConfiguration, collectorProvider: CollectorProvider): Server = - NettyTcpServer(serverConfiguration, ServerSslContextFactory(), collectorProvider) + fun createNettyTcpServer(serverConfiguration: ServerConfiguration, + collectorProvider: CollectorProvider, + metrics: Metrics): Server = + NettyTcpServer(serverConfiguration, ServerSslContextFactory(), collectorProvider, metrics) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index d8d786be..725622f7 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -22,31 +22,30 @@ package org.onap.dcae.collectors.veshv.impl.socket import arrow.core.None import arrow.core.Option import arrow.core.getOrElse -import arrow.core.toOption import arrow.effects.IO import arrow.syntax.collections.firstOption import io.netty.handler.ssl.SslHandler import org.onap.dcae.collectors.veshv.boundary.CollectorProvider +import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn -import org.onap.dcae.collectors.veshv.utils.logging.Marker +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory import org.onap.dcae.collectors.veshv.utils.NettyServerHandle import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.Marker import reactor.core.publisher.Mono import reactor.netty.ByteBufFlux import reactor.netty.Connection import reactor.netty.NettyInbound import reactor.netty.NettyOutbound import reactor.netty.tcp.TcpServer -import java.time.Duration -import java.lang.Exception import java.security.cert.X509Certificate +import java.time.Duration import javax.net.ssl.SSLSession @@ -56,15 +55,15 @@ import javax.net.ssl.SSLSession */ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private val sslContextFactory: ServerSslContextFactory, - private val collectorProvider: CollectorProvider) : Server { + private val collectorProvider: CollectorProvider, + private val metrics: Metrics) : Server { override fun start(): IO = IO { - val tcpServer = TcpServer.create() + TcpServer.create() .addressSupplier { serverConfig.serverListenAddress } .configureSsl() .handle(this::handleConnection) - - NettyServerHandle(tcpServer.bindNow()) + .let { NettyServerHandle(it.bindNow()) } } private fun TcpServer.configureSsl() = @@ -79,13 +78,13 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, } private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono { + metrics.notifyClientConnected() val clientContext = ClientContext(nettyOutbound.alloc()) nettyInbound.withConnection { populateClientContext(clientContext, it) it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession -> sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name } } - } logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" } @@ -97,7 +96,8 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, { logger.info(clientContext::fullMdc) { "Handling new connection" } nettyInbound.withConnection { conn -> - conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout) + conn + .configureIdleTimeout(clientContext, serverConfig.idleTimeout) .logConnectionClosed(clientContext) } it.handleConnection(createDataStream(nettyInbound)) @@ -132,15 +132,14 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, .receive() .retain() - private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection { - onReadIdle(timeout.toMillis()) { - logger.info(ctx) { - "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..." + private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection = + onReadIdle(timeout.toMillis()) { + logger.info(ctx) { + "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..." + } + disconnectClient(ctx) } - disconnectClient(ctx) - } - return this - } + private fun Connection.disconnectClient(ctx: ClientContext) { channel().close().addListener { @@ -152,13 +151,11 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, } } - private fun Connection.logConnectionClosed(ctx: ClientContext): Connection { - onTerminate().subscribe { - // TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled) - logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" } - } - return this - } + private fun Connection.logConnectionClosed(ctx: ClientContext): Connection = + onDispose { + metrics.notifyClientDisconnected() + logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" } + } companion object { private val logger = Logger(NettyTcpServer::class) -- cgit 1.2.3-korg