diff options
Diffstat (limited to 'sources')
3 files changed, 173 insertions, 67 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index adc629bc..0d07d167 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -19,18 +19,15 @@ */ package org.onap.dcae.collectors.veshv.impl.socket -import arrow.core.None -import arrow.core.Option +import arrow.core.Try import arrow.core.getOrElse import arrow.effects.IO -import arrow.syntax.collections.firstOption -import io.netty.handler.ssl.SslHandler +import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.model.ServiceContext @@ -40,14 +37,12 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.Marker import reactor.core.publisher.Mono -import reactor.netty.ByteBufFlux import reactor.netty.Connection import reactor.netty.NettyInbound import reactor.netty.NettyOutbound import reactor.netty.tcp.TcpServer -import java.security.cert.X509Certificate +import java.net.InetAddress import java.time.Duration -import javax.net.ssl.SSLSession /** @@ -82,64 +77,61 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, this } - private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> { - metrics.notifyClientConnected() - val clientContext = ClientContext(nettyOutbound.alloc()) - nettyInbound.withConnection { - populateClientContext(clientContext, it) - it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession -> - sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name } + private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = + messageHandlingStream(nettyInbound, nettyOutbound).run { + subscribe() + nettyOutbound.neverComplete() } - } - logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" } - messageHandlingStream(clientContext, nettyInbound).subscribe() - return nettyOutbound.neverComplete() - } + private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = + withNewClientContextFrom(nettyInbound, nettyOutbound) + { clientContext -> + logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" } + + clientContext.clientAddress + .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) } + .getOrElse { + logger.warn(clientContext::fullMdc) { + "Client address could not be resolved. Discarding connection" + } + nettyInbound.closeConnectionAndReturn(Mono.empty()) + } + } - private fun messageHandlingStream(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> = - collectorProvider(clientContext).fold( + private fun acceptIfNotLocalConnection(address: InetAddress, + clientContext: ClientContext, + nettyInbound: NettyInbound): Mono<Void> = + if (address.isLocalClientAddress()) { + logger.debug(clientContext) { + "Client address resolved to localhost. Discarding connection as suspected healthcheck" + } + nettyInbound.closeConnectionAndReturn(Mono.empty<Void>()) + } else { + acceptClientConnection(clientContext, nettyInbound) + } + + private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> { + metrics.notifyClientConnected() + logger.info(clientContext::fullMdc) { "Handling new client connection" } + return collectorProvider(clientContext).fold( { - logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." } - Mono.empty() + logger.warn(clientContext::fullMdc) { "Collector is not ready. Closing connection" } + nettyInbound.closeConnectionAndReturn(Mono.empty<Void>()) }, - { - logger.info(clientContext::fullMdc) { "Handling new connection" } - nettyInbound.withConnection { conn -> - conn - .configureIdleTimeout(clientContext, serverConfig.idleTimeout) - .logConnectionClosed(clientContext) - } - it.handleConnection(createDataStream(nettyInbound)) - } + handleClient(clientContext, nettyInbound) ) - - private fun populateClientContext(clientContext: ClientContext, connection: Connection) { - clientContext.clientAddress = try { - Option.fromNullable(connection.address().address) - } catch (ex: Exception) { - None - } - clientContext.clientCert = getSslSession(connection).flatMap(::findClientCert) } - private fun getSslSession(connection: Connection) = Option.fromNullable( + private fun handleClient(clientContext: ClientContext, + nettyInbound: NettyInbound): (Collector) -> Mono<Void> = { collector -> + withConnectionFrom(nettyInbound) { connection -> connection - .channel() - .pipeline() - .get(SslHandler::class.java) - ?.engine() - ?.session) - - private fun findClientCert(sslSession: SSLSession): Option<X509Certificate> = - sslSession - .peerCertificates - .firstOption() - .flatMap { Option.fromNullable(it as? X509Certificate) } - - private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound - .receive() - .retain() + .configureIdleTimeout(clientContext, serverConfig.idleTimeout) + .logConnectionClosed(clientContext) + }.run { + collector.handleConnection(nettyInbound.createDataStream()) + } + } private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection = onReadIdle(timeout.toMillis()) { @@ -149,16 +141,13 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, disconnectClient(ctx) } - - private fun Connection.disconnectClient(ctx: ClientContext) { - channel().close().addListener { - logger.debug(ctx::fullMdc, Marker.Exit) { "Closing client channel." } - if (it.isSuccess) - logger.debug(ctx) { "Channel closed successfully." } - else - logger.withWarn(ctx) { log("Channel close failed", it.cause()) } - } - } + private fun Connection.disconnectClient(ctx: ClientContext) = + closeChannelAndThen { + if (it.isSuccess) + logger.debug(ctx::fullMdc, Marker.Exit) { "Channel closed successfully." } + else + logger.warn(ctx::fullMdc, Marker.Exit, { "Channel close failed" }, it.cause()) + } private fun Connection.logConnectionClosed(ctx: ClientContext): Connection = onDispose { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt new file mode 100644 index 00000000..91f502e6 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt @@ -0,0 +1,81 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.socket + +import arrow.core.Option +import arrow.core.Try +import arrow.syntax.collections.firstOption +import io.netty.handler.ssl.SslHandler +import io.netty.util.concurrent.Future +import org.onap.dcae.collectors.veshv.model.ClientContext +import reactor.core.publisher.Mono +import reactor.netty.ByteBufFlux +import reactor.netty.Connection +import reactor.netty.NettyInbound +import reactor.netty.NettyOutbound +import java.net.InetAddress +import java.security.cert.X509Certificate +import javax.net.ssl.SSLSession + +internal fun InetAddress.isLocalClientAddress() = hostAddress == "127.0.0.1" || hostName == "localhost" + +internal fun Connection.getSslSession(): Option<SSLSession> = + Option.fromNullable( + channel() + .pipeline() + .get(SslHandler::class.java) + ?.engine() + ?.session + ) + +internal fun SSLSession.findClientCert(): Option<X509Certificate> = + peerCertificates + .firstOption() + .flatMap { Option.fromNullable(it as? X509Certificate) } + +internal fun withConnectionFrom(nettyInboud: NettyInbound, task: (Connection) -> Unit) = + nettyInboud.withConnection(task) + +internal fun Connection.closeChannel() = channel().close() + +internal fun Connection.closeChannelAndThen(task: (Future<in Void>) -> Unit) = + closeChannel().addListener { task(it) } + +internal fun <T> NettyInbound.closeConnectionAndReturn(returnValue: T): T = + withConnectionFrom(this) { it.closeChannel() }.let { returnValue } + +internal fun NettyInbound.createDataStream(): ByteBufFlux = receive().retain() + +// +// ClientContext related +// + +internal inline fun withNewClientContextFrom(nettyInbound: NettyInbound, + nettyOutbound: NettyOutbound, + reactiveTask: (ClientContext) -> Mono<Void>) = + ClientContext(nettyOutbound.alloc()) + .also { populateClientContextFromInbound(it, nettyInbound) } + .run(reactiveTask) + +internal fun populateClientContextFromInbound(clientContext: ClientContext, nettyInbound: NettyInbound) = + withConnectionFrom(nettyInbound) { connection -> + clientContext.clientAddress = Try { connection.address().address }.toOption() + clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() } + }
\ No newline at end of file diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt index ade9b480..7fcc73a0 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt @@ -64,6 +64,9 @@ class Logger(logger: org.slf4j.Logger) { fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = errorLogger.withMdc(mdc) { log(marker, message()) } + fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) = + errorLogger.withMdc(mdc) { log(marker, message(), t) } + // WARN fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block() @@ -81,6 +84,9 @@ class Logger(logger: org.slf4j.Logger) { fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = warnLogger.withMdc(mdc) { log(marker, message()) } + fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) = + warnLogger.withMdc(mdc) { log(marker, message(), t) } + // INFO fun withInfo(block: AtLevelLogger.() -> Unit) = infoLogger.block() @@ -144,6 +150,7 @@ abstract class AtLevelLogger { abstract fun log(message: String) abstract fun log(message: String, t: Throwable) abstract fun log(marker: Marker, message: String) + abstract fun log(marker: Marker, message: String, t: Throwable) open val enabled: Boolean get() = true @@ -187,6 +194,10 @@ object OffLevelLogger : AtLevelLogger() { override fun log(marker: Marker, message: String) { // do not log anything } + + override fun log(marker: Marker, message: String, t: Throwable) { + // do no log anything + } } @Suppress("SuboptimalLoggerUsage") @@ -203,6 +214,11 @@ class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { withAdditionalMdc(marker.mdc) { logger.error(marker.slf4jMarker, message) } + + override fun log(marker: Marker, message: String, t: Throwable) = + withAdditionalMdc(marker.mdc) { + logger.error(marker.slf4jMarker, message, t) + } } @Suppress("SuboptimalLoggerUsage") @@ -219,6 +235,11 @@ class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { withAdditionalMdc(marker.mdc) { logger.warn(marker.slf4jMarker, message) } + + override fun log(marker: Marker, message: String, t: Throwable) = + withAdditionalMdc(marker.mdc) { + logger.warn(marker.slf4jMarker, message, t) + } } @Suppress("SuboptimalLoggerUsage") @@ -235,6 +256,11 @@ class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { withAdditionalMdc(marker.mdc) { logger.info(marker.slf4jMarker, message) } + + override fun log(marker: Marker, message: String, t: Throwable) = + withAdditionalMdc(marker.mdc) { + logger.info(marker.slf4jMarker, message, t) + } } @Suppress("SuboptimalLoggerUsage") @@ -251,6 +277,11 @@ class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { withAdditionalMdc(marker.mdc) { logger.debug(marker.slf4jMarker, message) } + + override fun log(marker: Marker, message: String, t: Throwable) = + withAdditionalMdc(marker.mdc) { + logger.debug(marker.slf4jMarker, message, t) + } } @Suppress("SuboptimalLoggerUsage") @@ -267,4 +298,9 @@ class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { withAdditionalMdc(marker.mdc) { logger.trace(marker.slf4jMarker, message) } + + override fun log(marker: Marker, message: String, t: Throwable) = + withAdditionalMdc(marker.mdc) { + logger.trace(marker.slf4jMarker, message, t) + } } |