aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main
diff options
context:
space:
mode:
authorJakub Dudycz <jakub.dudycz@nokia.com>2018-12-17 16:03:10 +0100
committerJakub Dudycz <jakub.dudycz@nokia.com>2018-12-18 12:27:09 +0100
commit4ab95420e42f6df59bd4851eee41be6579bdbbe1 (patch)
tree09b81b772a4050d92c90bf6a6e09ecdf6c6db402 /sources/hv-collector-core/src/main
parent30488f1922f789c5b8e18934456968aa354c9671 (diff)
Add metrics for active connections count
* Fix and refactor gauges tests in MicrometerMetricsTests as they were not executing * Fix client disconnection handler in NettyTcpServer * Add metrics gauge and counters required to measure active connections Change-Id: I5620d398525c6859679cd5a49dc55a9fefd8b592 Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com> Issue-ID: DCAEGEN2-1041
Diffstat (limited to 'sources/hv-collector-core/src/main')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt7
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt51
3 files changed, 31 insertions, 29 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 61d28c2b..ac55e55f 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -36,6 +36,8 @@ interface Metrics {
fun notifyMessageReceived(msg: WireFrameMessage)
fun notifyMessageSent(msg: RoutedMessage)
fun notifyMessageDropped(cause: MessageDropCause)
+ fun notifyClientDisconnected()
+ fun notifyClientConnected()
fun notifyClientRejected(cause: ClientRejectionCause)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
index dce933ab..2e6bb4dc 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
@@ -20,6 +20,7 @@
package org.onap.dcae.collectors.veshv.factory
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
@@ -30,6 +31,8 @@ import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
* @since May 2018
*/
object ServerFactory {
- fun createNettyTcpServer(serverConfiguration: ServerConfiguration, collectorProvider: CollectorProvider): Server =
- NettyTcpServer(serverConfiguration, ServerSslContextFactory(), collectorProvider)
+ fun createNettyTcpServer(serverConfiguration: ServerConfiguration,
+ collectorProvider: CollectorProvider,
+ metrics: Metrics): Server =
+ NettyTcpServer(serverConfiguration, ServerSslContextFactory(), collectorProvider, metrics)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index d8d786be..725622f7 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -22,31 +22,30 @@ package org.onap.dcae.collectors.veshv.impl.socket
import arrow.core.None
import arrow.core.Option
import arrow.core.getOrElse
-import arrow.core.toOption
import arrow.effects.IO
import arrow.syntax.collections.firstOption
import io.netty.handler.ssl.SslHandler
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
-import org.onap.dcae.collectors.veshv.utils.logging.Marker
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
import reactor.core.publisher.Mono
import reactor.netty.ByteBufFlux
import reactor.netty.Connection
import reactor.netty.NettyInbound
import reactor.netty.NettyOutbound
import reactor.netty.tcp.TcpServer
-import java.time.Duration
-import java.lang.Exception
import java.security.cert.X509Certificate
+import java.time.Duration
import javax.net.ssl.SSLSession
@@ -56,15 +55,15 @@ import javax.net.ssl.SSLSession
*/
internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private val sslContextFactory: ServerSslContextFactory,
- private val collectorProvider: CollectorProvider) : Server {
+ private val collectorProvider: CollectorProvider,
+ private val metrics: Metrics) : Server {
override fun start(): IO<ServerHandle> = IO {
- val tcpServer = TcpServer.create()
+ TcpServer.create()
.addressSupplier { serverConfig.serverListenAddress }
.configureSsl()
.handle(this::handleConnection)
-
- NettyServerHandle(tcpServer.bindNow())
+ .let { NettyServerHandle(it.bindNow()) }
}
private fun TcpServer.configureSsl() =
@@ -79,13 +78,13 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
}
private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
+ metrics.notifyClientConnected()
val clientContext = ClientContext(nettyOutbound.alloc())
nettyInbound.withConnection {
populateClientContext(clientContext, it)
it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession ->
sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name }
}
-
}
logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
@@ -97,7 +96,8 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
{
logger.info(clientContext::fullMdc) { "Handling new connection" }
nettyInbound.withConnection { conn ->
- conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
+ conn
+ .configureIdleTimeout(clientContext, serverConfig.idleTimeout)
.logConnectionClosed(clientContext)
}
it.handleConnection(createDataStream(nettyInbound))
@@ -132,15 +132,14 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
.receive()
.retain()
- private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection {
- onReadIdle(timeout.toMillis()) {
- logger.info(ctx) {
- "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
+ private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
+ onReadIdle(timeout.toMillis()) {
+ logger.info(ctx) {
+ "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
+ }
+ disconnectClient(ctx)
}
- disconnectClient(ctx)
- }
- return this
- }
+
private fun Connection.disconnectClient(ctx: ClientContext) {
channel().close().addListener {
@@ -152,13 +151,11 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
}
}
- private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
- onTerminate().subscribe {
- // TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled)
- logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" }
- }
- return this
- }
+ private fun Connection.logConnectionClosed(ctx: ClientContext): Connection =
+ onDispose {
+ metrics.notifyClientDisconnected()
+ logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" }
+ }
companion object {
private val logger = Logger(NettyTcpServer::class)