diff options
author | Filip Krzywka <filip.krzywka@nokia.com> | 2019-01-24 11:21:26 +0100 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2019-01-29 11:24:31 +0000 |
commit | e7204cbcf6af61856330cffc541b6f5c78476a09 (patch) | |
tree | 7c5ed27887698b028901d4d2168e65d44df8633c /sources | |
parent | 40993732b302ce43ae1dbedbda44cc0113e9b6f2 (diff) |
Correct totalConnections metric
In previous implementation performed healthcheck on container
was counted as client connection, because metric was notified
for every TCP channel opened. This was making this metric less useful
(to avoid saying "useless" ;) ).
- refactored NettyTcpServer trying to extract functions with logic
not so strictly related to HV-VES behavior. This also resolves
discussions in https://gerrit.onap.org/r/#/c/76274/
- some renames and methods splitting was made in attempt to make code
more readable
- hv-ves should not count connections from either "127.0.0.1" or
"localhost" to his `totalConnections` metric
- removed redundant logging by adding new methods to Logger
Change-Id: I5f10dac8dac82eafd3b0de92a7ec43f2c23b8c16
Issue-ID: DCAEGEN2-1119
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources')
3 files changed, 173 insertions, 67 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index adc629bc..0d07d167 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -19,18 +19,15 @@ */ package org.onap.dcae.collectors.veshv.impl.socket -import arrow.core.None -import arrow.core.Option +import arrow.core.Try import arrow.core.getOrElse import arrow.effects.IO -import arrow.syntax.collections.firstOption -import io.netty.handler.ssl.SslHandler +import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.model.ServiceContext @@ -40,14 +37,12 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.Marker import reactor.core.publisher.Mono -import reactor.netty.ByteBufFlux import reactor.netty.Connection import reactor.netty.NettyInbound import reactor.netty.NettyOutbound import reactor.netty.tcp.TcpServer -import java.security.cert.X509Certificate +import java.net.InetAddress import java.time.Duration -import javax.net.ssl.SSLSession /** @@ -82,64 +77,61 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, this } - private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> { - metrics.notifyClientConnected() - val clientContext = ClientContext(nettyOutbound.alloc()) - nettyInbound.withConnection { - populateClientContext(clientContext, it) - it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession -> - sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name } + private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = + messageHandlingStream(nettyInbound, nettyOutbound).run { + subscribe() + nettyOutbound.neverComplete() } - } - logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" } - messageHandlingStream(clientContext, nettyInbound).subscribe() - return nettyOutbound.neverComplete() - } + private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = + withNewClientContextFrom(nettyInbound, nettyOutbound) + { clientContext -> + logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" } + + clientContext.clientAddress + .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) } + .getOrElse { + logger.warn(clientContext::fullMdc) { + "Client address could not be resolved. Discarding connection" + } + nettyInbound.closeConnectionAndReturn(Mono.empty()) + } + } - private fun messageHandlingStream(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> = - collectorProvider(clientContext).fold( + private fun acceptIfNotLocalConnection(address: InetAddress, + clientContext: ClientContext, + nettyInbound: NettyInbound): Mono<Void> = + if (address.isLocalClientAddress()) { + logger.debug(clientContext) { + "Client address resolved to localhost. Discarding connection as suspected healthcheck" + } + nettyInbound.closeConnectionAndReturn(Mono.empty<Void>()) + } else { + acceptClientConnection(clientContext, nettyInbound) + } + + private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> { + metrics.notifyClientConnected() + logger.info(clientContext::fullMdc) { "Handling new client connection" } + return collectorProvider(clientContext).fold( { - logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." } - Mono.empty() + logger.warn(clientContext::fullMdc) { "Collector is not ready. Closing connection" } + nettyInbound.closeConnectionAndReturn(Mono.empty<Void>()) }, - { - logger.info(clientContext::fullMdc) { "Handling new connection" } - nettyInbound.withConnection { conn -> - conn - .configureIdleTimeout(clientContext, serverConfig.idleTimeout) - .logConnectionClosed(clientContext) - } - it.handleConnection(createDataStream(nettyInbound)) - } + handleClient(clientContext, nettyInbound) ) - - private fun populateClientContext(clientContext: ClientContext, connection: Connection) { - clientContext.clientAddress = try { - Option.fromNullable(connection.address().address) - } catch (ex: Exception) { - None - } - clientContext.clientCert = getSslSession(connection).flatMap(::findClientCert) } - private fun getSslSession(connection: Connection) = Option.fromNullable( + private fun handleClient(clientContext: ClientContext, + nettyInbound: NettyInbound): (Collector) -> Mono<Void> = { collector -> + withConnectionFrom(nettyInbound) { connection -> connection - .channel() - .pipeline() - .get(SslHandler::class.java) - ?.engine() - ?.session) - - private fun findClientCert(sslSession: SSLSession): Option<X509Certificate> = - sslSession - .peerCertificates - .firstOption() - .flatMap { Option.fromNullable(it as? X509Certificate) } - - private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound - .receive() - .retain() + .configureIdleTimeout(clientContext, serverConfig.idleTimeout) + .logConnectionClosed(clientContext) + }.run { + collector.handleConnection(nettyInbound.createDataStream()) + } + } private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection = onReadIdle(timeout.toMillis()) { @@ -149,16 +141,13 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, disconnectClient(ctx) } - - private fun Connection.disconnectClient(ctx: ClientContext) { - channel().close().addListener { - logger.debug(ctx::fullMdc, Marker.Exit) { "Closing client channel." } - if (it.isSuccess) - logger.debug(ctx) { "Channel closed successfully." } - else - logger.withWarn(ctx) { log("Channel close failed", it.cause()) } - } - } + private fun Connection.disconnectClient(ctx: ClientContext) = + closeChannelAndThen { + if (it.isSuccess) + logger.debug(ctx::fullMdc, Marker.Exit) { "Channel closed successfully." } + else + logger.warn(ctx::fullMdc, Marker.Exit, { "Channel close failed" }, it.cause()) + } private fun Connection.logConnectionClosed(ctx: ClientContext): Connection = onDispose { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt new file mode 100644 index 00000000..91f502e6 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt @@ -0,0 +1,81 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.socket + +import arrow.core.Option +import arrow.core.Try +import arrow.syntax.collections.firstOption +import io.netty.handler.ssl.SslHandler +import io.netty.util.concurrent.Future +import org.onap.dcae.collectors.veshv.model.ClientContext +import reactor.core.publisher.Mono +import reactor.netty.ByteBufFlux +import reactor.netty.Connection +import reactor.netty.NettyInbound +import reactor.netty.NettyOutbound +import java.net.InetAddress +import java.security.cert.X509Certificate +import javax.net.ssl.SSLSession + +internal fun InetAddress.isLocalClientAddress() = hostAddress == "127.0.0.1" || hostName == "localhost" + +internal fun Connection.getSslSession(): Option<SSLSession> = + Option.fromNullable( + channel() + .pipeline() + .get(SslHandler::class.java) + ?.engine() + ?.session + ) + +internal fun SSLSession.findClientCert(): Option<X509Certificate> = + peerCertificates + .firstOption() + .flatMap { Option.fromNullable(it as? X509Certificate) } + +internal fun withConnectionFrom(nettyInboud: NettyInbound, task: (Connection) -> Unit) = + nettyInboud.withConnection(task) + +internal fun Connection.closeChannel() = channel().close() + +internal fun Connection.closeChannelAndThen(task: (Future<in Void>) -> Unit) = + closeChannel().addListener { task(it) } + +internal fun <T> NettyInbound.closeConnectionAndReturn(returnValue: T): T = + withConnectionFrom(this) { it.closeChannel() }.let { returnValue } + +internal fun NettyInbound.createDataStream(): ByteBufFlux = receive().retain() + +// +// ClientContext related +// + +internal inline fun withNewClientContextFrom(nettyInbound: NettyInbound, + nettyOutbound: NettyOutbound, + reactiveTask: (ClientContext) -> Mono<Void>) = + ClientContext(nettyOutbound.alloc()) + .also { populateClientContextFromInbound(it, nettyInbound) } + .run(reactiveTask) + +internal fun populateClientContextFromInbound(clientContext: ClientContext, nettyInbound: NettyInbound) = + withConnectionFrom(nettyInbound) { connection -> + clientContext.clientAddress = Try { connection.address().address }.toOption() + clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() } + }
\ No newline at end of file diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt index ade9b480..7fcc73a0 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt @@ -64,6 +64,9 @@ class Logger(logger: org.slf4j.Logger) { fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = errorLogger.withMdc(mdc) { log(marker, message()) } + fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) = + errorLogger.withMdc(mdc) { log(marker, message(), t) } + // WARN fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block() @@ -81,6 +84,9 @@ class Logger(logger: org.slf4j.Logger) { fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = warnLogger.withMdc(mdc) { log(marker, message()) } + fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) = + warnLogger.withMdc(mdc) { log(marker, message(), t) } + // INFO fun withInfo(block: AtLevelLogger.() -> Unit) = infoLogger.block() @@ -144,6 +150,7 @@ abstract class AtLevelLogger { abstract fun log(message: String) abstract fun log(message: String, t: Throwable) abstract fun log(marker: Marker, message: String) + abstract fun log(marker: Marker, message: String, t: Throwable) open val enabled: Boolean get() = true @@ -187,6 +194,10 @@ object OffLevelLogger : AtLevelLogger() { override fun log(marker: Marker, message: String) { // do not log anything } + + override fun log(marker: Marker, message: String, t: Throwable) { + // do no log anything + } } @Suppress("SuboptimalLoggerUsage") @@ -203,6 +214,11 @@ class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { withAdditionalMdc(marker.mdc) { logger.error(marker.slf4jMarker, message) } + + override fun log(marker: Marker, message: String, t: Throwable) = + withAdditionalMdc(marker.mdc) { + logger.error(marker.slf4jMarker, message, t) + } } @Suppress("SuboptimalLoggerUsage") @@ -219,6 +235,11 @@ class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { withAdditionalMdc(marker.mdc) { logger.warn(marker.slf4jMarker, message) } + + override fun log(marker: Marker, message: String, t: Throwable) = + withAdditionalMdc(marker.mdc) { + logger.warn(marker.slf4jMarker, message, t) + } } @Suppress("SuboptimalLoggerUsage") @@ -235,6 +256,11 @@ class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { withAdditionalMdc(marker.mdc) { logger.info(marker.slf4jMarker, message) } + + override fun log(marker: Marker, message: String, t: Throwable) = + withAdditionalMdc(marker.mdc) { + logger.info(marker.slf4jMarker, message, t) + } } @Suppress("SuboptimalLoggerUsage") @@ -251,6 +277,11 @@ class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { withAdditionalMdc(marker.mdc) { logger.debug(marker.slf4jMarker, message) } + + override fun log(marker: Marker, message: String, t: Throwable) = + withAdditionalMdc(marker.mdc) { + logger.debug(marker.slf4jMarker, message, t) + } } @Suppress("SuboptimalLoggerUsage") @@ -267,4 +298,9 @@ class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { withAdditionalMdc(marker.mdc) { logger.trace(marker.slf4jMarker, message) } + + override fun log(marker: Marker, message: String, t: Throwable) = + withAdditionalMdc(marker.mdc) { + logger.trace(marker.slf4jMarker, message, t) + } } |