summaryrefslogtreecommitdiffstats
path: root/sources
diff options
context:
space:
mode:
Diffstat (limited to 'sources')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt123
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt81
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt36
3 files changed, 173 insertions, 67 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index adc629bc..0d07d167 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -19,18 +19,15 @@
*/
package org.onap.dcae.collectors.veshv.impl.socket
-import arrow.core.None
-import arrow.core.Option
+import arrow.core.Try
import arrow.core.getOrElse
import arrow.effects.IO
-import arrow.syntax.collections.firstOption
-import io.netty.handler.ssl.SslHandler
+import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.model.ServiceContext
@@ -40,14 +37,12 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.Marker
import reactor.core.publisher.Mono
-import reactor.netty.ByteBufFlux
import reactor.netty.Connection
import reactor.netty.NettyInbound
import reactor.netty.NettyOutbound
import reactor.netty.tcp.TcpServer
-import java.security.cert.X509Certificate
+import java.net.InetAddress
import java.time.Duration
-import javax.net.ssl.SSLSession
/**
@@ -82,64 +77,61 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
this
}
- private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
- metrics.notifyClientConnected()
- val clientContext = ClientContext(nettyOutbound.alloc())
- nettyInbound.withConnection {
- populateClientContext(clientContext, it)
- it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession ->
- sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name }
+ private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
+ messageHandlingStream(nettyInbound, nettyOutbound).run {
+ subscribe()
+ nettyOutbound.neverComplete()
}
- }
- logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
- messageHandlingStream(clientContext, nettyInbound).subscribe()
- return nettyOutbound.neverComplete()
- }
+ private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
+ withNewClientContextFrom(nettyInbound, nettyOutbound)
+ { clientContext ->
+ logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
+
+ clientContext.clientAddress
+ .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) }
+ .getOrElse {
+ logger.warn(clientContext::fullMdc) {
+ "Client address could not be resolved. Discarding connection"
+ }
+ nettyInbound.closeConnectionAndReturn(Mono.empty())
+ }
+ }
- private fun messageHandlingStream(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> =
- collectorProvider(clientContext).fold(
+ private fun acceptIfNotLocalConnection(address: InetAddress,
+ clientContext: ClientContext,
+ nettyInbound: NettyInbound): Mono<Void> =
+ if (address.isLocalClientAddress()) {
+ logger.debug(clientContext) {
+ "Client address resolved to localhost. Discarding connection as suspected healthcheck"
+ }
+ nettyInbound.closeConnectionAndReturn(Mono.empty<Void>())
+ } else {
+ acceptClientConnection(clientContext, nettyInbound)
+ }
+
+ private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
+ metrics.notifyClientConnected()
+ logger.info(clientContext::fullMdc) { "Handling new client connection" }
+ return collectorProvider(clientContext).fold(
{
- logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." }
- Mono.empty()
+ logger.warn(clientContext::fullMdc) { "Collector is not ready. Closing connection" }
+ nettyInbound.closeConnectionAndReturn(Mono.empty<Void>())
},
- {
- logger.info(clientContext::fullMdc) { "Handling new connection" }
- nettyInbound.withConnection { conn ->
- conn
- .configureIdleTimeout(clientContext, serverConfig.idleTimeout)
- .logConnectionClosed(clientContext)
- }
- it.handleConnection(createDataStream(nettyInbound))
- }
+ handleClient(clientContext, nettyInbound)
)
-
- private fun populateClientContext(clientContext: ClientContext, connection: Connection) {
- clientContext.clientAddress = try {
- Option.fromNullable(connection.address().address)
- } catch (ex: Exception) {
- None
- }
- clientContext.clientCert = getSslSession(connection).flatMap(::findClientCert)
}
- private fun getSslSession(connection: Connection) = Option.fromNullable(
+ private fun handleClient(clientContext: ClientContext,
+ nettyInbound: NettyInbound): (Collector) -> Mono<Void> = { collector ->
+ withConnectionFrom(nettyInbound) { connection ->
connection
- .channel()
- .pipeline()
- .get(SslHandler::class.java)
- ?.engine()
- ?.session)
-
- private fun findClientCert(sslSession: SSLSession): Option<X509Certificate> =
- sslSession
- .peerCertificates
- .firstOption()
- .flatMap { Option.fromNullable(it as? X509Certificate) }
-
- private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
- .receive()
- .retain()
+ .configureIdleTimeout(clientContext, serverConfig.idleTimeout)
+ .logConnectionClosed(clientContext)
+ }.run {
+ collector.handleConnection(nettyInbound.createDataStream())
+ }
+ }
private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
onReadIdle(timeout.toMillis()) {
@@ -149,16 +141,13 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
disconnectClient(ctx)
}
-
- private fun Connection.disconnectClient(ctx: ClientContext) {
- channel().close().addListener {
- logger.debug(ctx::fullMdc, Marker.Exit) { "Closing client channel." }
- if (it.isSuccess)
- logger.debug(ctx) { "Channel closed successfully." }
- else
- logger.withWarn(ctx) { log("Channel close failed", it.cause()) }
- }
- }
+ private fun Connection.disconnectClient(ctx: ClientContext) =
+ closeChannelAndThen {
+ if (it.isSuccess)
+ logger.debug(ctx::fullMdc, Marker.Exit) { "Channel closed successfully." }
+ else
+ logger.warn(ctx::fullMdc, Marker.Exit, { "Channel close failed" }, it.cause())
+ }
private fun Connection.logConnectionClosed(ctx: ClientContext): Connection =
onDispose {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt
new file mode 100644
index 00000000..91f502e6
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.socket
+
+import arrow.core.Option
+import arrow.core.Try
+import arrow.syntax.collections.firstOption
+import io.netty.handler.ssl.SslHandler
+import io.netty.util.concurrent.Future
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import reactor.core.publisher.Mono
+import reactor.netty.ByteBufFlux
+import reactor.netty.Connection
+import reactor.netty.NettyInbound
+import reactor.netty.NettyOutbound
+import java.net.InetAddress
+import java.security.cert.X509Certificate
+import javax.net.ssl.SSLSession
+
+internal fun InetAddress.isLocalClientAddress() = hostAddress == "127.0.0.1" || hostName == "localhost"
+
+internal fun Connection.getSslSession(): Option<SSLSession> =
+ Option.fromNullable(
+ channel()
+ .pipeline()
+ .get(SslHandler::class.java)
+ ?.engine()
+ ?.session
+ )
+
+internal fun SSLSession.findClientCert(): Option<X509Certificate> =
+ peerCertificates
+ .firstOption()
+ .flatMap { Option.fromNullable(it as? X509Certificate) }
+
+internal fun withConnectionFrom(nettyInboud: NettyInbound, task: (Connection) -> Unit) =
+ nettyInboud.withConnection(task)
+
+internal fun Connection.closeChannel() = channel().close()
+
+internal fun Connection.closeChannelAndThen(task: (Future<in Void>) -> Unit) =
+ closeChannel().addListener { task(it) }
+
+internal fun <T> NettyInbound.closeConnectionAndReturn(returnValue: T): T =
+ withConnectionFrom(this) { it.closeChannel() }.let { returnValue }
+
+internal fun NettyInbound.createDataStream(): ByteBufFlux = receive().retain()
+
+//
+// ClientContext related
+//
+
+internal inline fun withNewClientContextFrom(nettyInbound: NettyInbound,
+ nettyOutbound: NettyOutbound,
+ reactiveTask: (ClientContext) -> Mono<Void>) =
+ ClientContext(nettyOutbound.alloc())
+ .also { populateClientContextFromInbound(it, nettyInbound) }
+ .run(reactiveTask)
+
+internal fun populateClientContextFromInbound(clientContext: ClientContext, nettyInbound: NettyInbound) =
+ withConnectionFrom(nettyInbound) { connection ->
+ clientContext.clientAddress = Try { connection.address().address }.toOption()
+ clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() }
+ } \ No newline at end of file
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
index ade9b480..7fcc73a0 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
@@ -64,6 +64,9 @@ class Logger(logger: org.slf4j.Logger) {
fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
errorLogger.withMdc(mdc) { log(marker, message()) }
+ fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) =
+ errorLogger.withMdc(mdc) { log(marker, message(), t) }
+
// WARN
fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block()
@@ -81,6 +84,9 @@ class Logger(logger: org.slf4j.Logger) {
fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
warnLogger.withMdc(mdc) { log(marker, message()) }
+ fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) =
+ warnLogger.withMdc(mdc) { log(marker, message(), t) }
+
// INFO
fun withInfo(block: AtLevelLogger.() -> Unit) = infoLogger.block()
@@ -144,6 +150,7 @@ abstract class AtLevelLogger {
abstract fun log(message: String)
abstract fun log(message: String, t: Throwable)
abstract fun log(marker: Marker, message: String)
+ abstract fun log(marker: Marker, message: String, t: Throwable)
open val enabled: Boolean
get() = true
@@ -187,6 +194,10 @@ object OffLevelLogger : AtLevelLogger() {
override fun log(marker: Marker, message: String) {
// do not log anything
}
+
+ override fun log(marker: Marker, message: String, t: Throwable) {
+ // do no log anything
+ }
}
@Suppress("SuboptimalLoggerUsage")
@@ -203,6 +214,11 @@ class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
withAdditionalMdc(marker.mdc) {
logger.error(marker.slf4jMarker, message)
}
+
+ override fun log(marker: Marker, message: String, t: Throwable) =
+ withAdditionalMdc(marker.mdc) {
+ logger.error(marker.slf4jMarker, message, t)
+ }
}
@Suppress("SuboptimalLoggerUsage")
@@ -219,6 +235,11 @@ class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
withAdditionalMdc(marker.mdc) {
logger.warn(marker.slf4jMarker, message)
}
+
+ override fun log(marker: Marker, message: String, t: Throwable) =
+ withAdditionalMdc(marker.mdc) {
+ logger.warn(marker.slf4jMarker, message, t)
+ }
}
@Suppress("SuboptimalLoggerUsage")
@@ -235,6 +256,11 @@ class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
withAdditionalMdc(marker.mdc) {
logger.info(marker.slf4jMarker, message)
}
+
+ override fun log(marker: Marker, message: String, t: Throwable) =
+ withAdditionalMdc(marker.mdc) {
+ logger.info(marker.slf4jMarker, message, t)
+ }
}
@Suppress("SuboptimalLoggerUsage")
@@ -251,6 +277,11 @@ class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
withAdditionalMdc(marker.mdc) {
logger.debug(marker.slf4jMarker, message)
}
+
+ override fun log(marker: Marker, message: String, t: Throwable) =
+ withAdditionalMdc(marker.mdc) {
+ logger.debug(marker.slf4jMarker, message, t)
+ }
}
@Suppress("SuboptimalLoggerUsage")
@@ -267,4 +298,9 @@ class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
withAdditionalMdc(marker.mdc) {
logger.trace(marker.slf4jMarker, message)
}
+
+ override fun log(marker: Marker, message: String, t: Throwable) =
+ withAdditionalMdc(marker.mdc) {
+ logger.trace(marker.slf4jMarker, message, t)
+ }
}