aboutsummaryrefslogtreecommitdiffstats
path: root/sources
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-01-24 11:21:26 +0100
committerFilip Krzywka <filip.krzywka@nokia.com>2019-01-29 11:24:31 +0000
commite7204cbcf6af61856330cffc541b6f5c78476a09 (patch)
tree7c5ed27887698b028901d4d2168e65d44df8633c /sources
parent40993732b302ce43ae1dbedbda44cc0113e9b6f2 (diff)
Correct totalConnections metric
In previous implementation performed healthcheck on container was counted as client connection, because metric was notified for every TCP channel opened. This was making this metric less useful (to avoid saying "useless" ;) ). - refactored NettyTcpServer trying to extract functions with logic not so strictly related to HV-VES behavior. This also resolves discussions in https://gerrit.onap.org/r/#/c/76274/ - some renames and methods splitting was made in attempt to make code more readable - hv-ves should not count connections from either "127.0.0.1" or "localhost" to his `totalConnections` metric - removed redundant logging by adding new methods to Logger Change-Id: I5f10dac8dac82eafd3b0de92a7ec43f2c23b8c16 Issue-ID: DCAEGEN2-1119 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt123
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt81
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt36
3 files changed, 173 insertions, 67 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index adc629bc..0d07d167 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -19,18 +19,15 @@
*/
package org.onap.dcae.collectors.veshv.impl.socket
-import arrow.core.None
-import arrow.core.Option
+import arrow.core.Try
import arrow.core.getOrElse
import arrow.effects.IO
-import arrow.syntax.collections.firstOption
-import io.netty.handler.ssl.SslHandler
+import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.model.ServiceContext
@@ -40,14 +37,12 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.Marker
import reactor.core.publisher.Mono
-import reactor.netty.ByteBufFlux
import reactor.netty.Connection
import reactor.netty.NettyInbound
import reactor.netty.NettyOutbound
import reactor.netty.tcp.TcpServer
-import java.security.cert.X509Certificate
+import java.net.InetAddress
import java.time.Duration
-import javax.net.ssl.SSLSession
/**
@@ -82,64 +77,61 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
this
}
- private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
- metrics.notifyClientConnected()
- val clientContext = ClientContext(nettyOutbound.alloc())
- nettyInbound.withConnection {
- populateClientContext(clientContext, it)
- it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession ->
- sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name }
+ private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
+ messageHandlingStream(nettyInbound, nettyOutbound).run {
+ subscribe()
+ nettyOutbound.neverComplete()
}
- }
- logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
- messageHandlingStream(clientContext, nettyInbound).subscribe()
- return nettyOutbound.neverComplete()
- }
+ private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
+ withNewClientContextFrom(nettyInbound, nettyOutbound)
+ { clientContext ->
+ logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
+
+ clientContext.clientAddress
+ .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) }
+ .getOrElse {
+ logger.warn(clientContext::fullMdc) {
+ "Client address could not be resolved. Discarding connection"
+ }
+ nettyInbound.closeConnectionAndReturn(Mono.empty())
+ }
+ }
- private fun messageHandlingStream(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> =
- collectorProvider(clientContext).fold(
+ private fun acceptIfNotLocalConnection(address: InetAddress,
+ clientContext: ClientContext,
+ nettyInbound: NettyInbound): Mono<Void> =
+ if (address.isLocalClientAddress()) {
+ logger.debug(clientContext) {
+ "Client address resolved to localhost. Discarding connection as suspected healthcheck"
+ }
+ nettyInbound.closeConnectionAndReturn(Mono.empty<Void>())
+ } else {
+ acceptClientConnection(clientContext, nettyInbound)
+ }
+
+ private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
+ metrics.notifyClientConnected()
+ logger.info(clientContext::fullMdc) { "Handling new client connection" }
+ return collectorProvider(clientContext).fold(
{
- logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." }
- Mono.empty()
+ logger.warn(clientContext::fullMdc) { "Collector is not ready. Closing connection" }
+ nettyInbound.closeConnectionAndReturn(Mono.empty<Void>())
},
- {
- logger.info(clientContext::fullMdc) { "Handling new connection" }
- nettyInbound.withConnection { conn ->
- conn
- .configureIdleTimeout(clientContext, serverConfig.idleTimeout)
- .logConnectionClosed(clientContext)
- }
- it.handleConnection(createDataStream(nettyInbound))
- }
+ handleClient(clientContext, nettyInbound)
)
-
- private fun populateClientContext(clientContext: ClientContext, connection: Connection) {
- clientContext.clientAddress = try {
- Option.fromNullable(connection.address().address)
- } catch (ex: Exception) {
- None
- }
- clientContext.clientCert = getSslSession(connection).flatMap(::findClientCert)
}
- private fun getSslSession(connection: Connection) = Option.fromNullable(
+ private fun handleClient(clientContext: ClientContext,
+ nettyInbound: NettyInbound): (Collector) -> Mono<Void> = { collector ->
+ withConnectionFrom(nettyInbound) { connection ->
connection
- .channel()
- .pipeline()
- .get(SslHandler::class.java)
- ?.engine()
- ?.session)
-
- private fun findClientCert(sslSession: SSLSession): Option<X509Certificate> =
- sslSession
- .peerCertificates
- .firstOption()
- .flatMap { Option.fromNullable(it as? X509Certificate) }
-
- private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
- .receive()
- .retain()
+ .configureIdleTimeout(clientContext, serverConfig.idleTimeout)
+ .logConnectionClosed(clientContext)
+ }.run {
+ collector.handleConnection(nettyInbound.createDataStream())
+ }
+ }
private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
onReadIdle(timeout.toMillis()) {
@@ -149,16 +141,13 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
disconnectClient(ctx)
}
-
- private fun Connection.disconnectClient(ctx: ClientContext) {
- channel().close().addListener {
- logger.debug(ctx::fullMdc, Marker.Exit) { "Closing client channel." }
- if (it.isSuccess)
- logger.debug(ctx) { "Channel closed successfully." }
- else
- logger.withWarn(ctx) { log("Channel close failed", it.cause()) }
- }
- }
+ private fun Connection.disconnectClient(ctx: ClientContext) =
+ closeChannelAndThen {
+ if (it.isSuccess)
+ logger.debug(ctx::fullMdc, Marker.Exit) { "Channel closed successfully." }
+ else
+ logger.warn(ctx::fullMdc, Marker.Exit, { "Channel close failed" }, it.cause())
+ }
private fun Connection.logConnectionClosed(ctx: ClientContext): Connection =
onDispose {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt
new file mode 100644
index 00000000..91f502e6
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.socket
+
+import arrow.core.Option
+import arrow.core.Try
+import arrow.syntax.collections.firstOption
+import io.netty.handler.ssl.SslHandler
+import io.netty.util.concurrent.Future
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import reactor.core.publisher.Mono
+import reactor.netty.ByteBufFlux
+import reactor.netty.Connection
+import reactor.netty.NettyInbound
+import reactor.netty.NettyOutbound
+import java.net.InetAddress
+import java.security.cert.X509Certificate
+import javax.net.ssl.SSLSession
+
+internal fun InetAddress.isLocalClientAddress() = hostAddress == "127.0.0.1" || hostName == "localhost"
+
+internal fun Connection.getSslSession(): Option<SSLSession> =
+ Option.fromNullable(
+ channel()
+ .pipeline()
+ .get(SslHandler::class.java)
+ ?.engine()
+ ?.session
+ )
+
+internal fun SSLSession.findClientCert(): Option<X509Certificate> =
+ peerCertificates
+ .firstOption()
+ .flatMap { Option.fromNullable(it as? X509Certificate) }
+
+internal fun withConnectionFrom(nettyInboud: NettyInbound, task: (Connection) -> Unit) =
+ nettyInboud.withConnection(task)
+
+internal fun Connection.closeChannel() = channel().close()
+
+internal fun Connection.closeChannelAndThen(task: (Future<in Void>) -> Unit) =
+ closeChannel().addListener { task(it) }
+
+internal fun <T> NettyInbound.closeConnectionAndReturn(returnValue: T): T =
+ withConnectionFrom(this) { it.closeChannel() }.let { returnValue }
+
+internal fun NettyInbound.createDataStream(): ByteBufFlux = receive().retain()
+
+//
+// ClientContext related
+//
+
+internal inline fun withNewClientContextFrom(nettyInbound: NettyInbound,
+ nettyOutbound: NettyOutbound,
+ reactiveTask: (ClientContext) -> Mono<Void>) =
+ ClientContext(nettyOutbound.alloc())
+ .also { populateClientContextFromInbound(it, nettyInbound) }
+ .run(reactiveTask)
+
+internal fun populateClientContextFromInbound(clientContext: ClientContext, nettyInbound: NettyInbound) =
+ withConnectionFrom(nettyInbound) { connection ->
+ clientContext.clientAddress = Try { connection.address().address }.toOption()
+ clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() }
+ } \ No newline at end of file
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
index ade9b480..7fcc73a0 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
@@ -64,6 +64,9 @@ class Logger(logger: org.slf4j.Logger) {
fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
errorLogger.withMdc(mdc) { log(marker, message()) }
+ fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) =
+ errorLogger.withMdc(mdc) { log(marker, message(), t) }
+
// WARN
fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block()
@@ -81,6 +84,9 @@ class Logger(logger: org.slf4j.Logger) {
fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
warnLogger.withMdc(mdc) { log(marker, message()) }
+ fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) =
+ warnLogger.withMdc(mdc) { log(marker, message(), t) }
+
// INFO
fun withInfo(block: AtLevelLogger.() -> Unit) = infoLogger.block()
@@ -144,6 +150,7 @@ abstract class AtLevelLogger {
abstract fun log(message: String)
abstract fun log(message: String, t: Throwable)
abstract fun log(marker: Marker, message: String)
+ abstract fun log(marker: Marker, message: String, t: Throwable)
open val enabled: Boolean
get() = true
@@ -187,6 +194,10 @@ object OffLevelLogger : AtLevelLogger() {
override fun log(marker: Marker, message: String) {
// do not log anything
}
+
+ override fun log(marker: Marker, message: String, t: Throwable) {
+ // do no log anything
+ }
}
@Suppress("SuboptimalLoggerUsage")
@@ -203,6 +214,11 @@ class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
withAdditionalMdc(marker.mdc) {
logger.error(marker.slf4jMarker, message)
}
+
+ override fun log(marker: Marker, message: String, t: Throwable) =
+ withAdditionalMdc(marker.mdc) {
+ logger.error(marker.slf4jMarker, message, t)
+ }
}
@Suppress("SuboptimalLoggerUsage")
@@ -219,6 +235,11 @@ class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
withAdditionalMdc(marker.mdc) {
logger.warn(marker.slf4jMarker, message)
}
+
+ override fun log(marker: Marker, message: String, t: Throwable) =
+ withAdditionalMdc(marker.mdc) {
+ logger.warn(marker.slf4jMarker, message, t)
+ }
}
@Suppress("SuboptimalLoggerUsage")
@@ -235,6 +256,11 @@ class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
withAdditionalMdc(marker.mdc) {
logger.info(marker.slf4jMarker, message)
}
+
+ override fun log(marker: Marker, message: String, t: Throwable) =
+ withAdditionalMdc(marker.mdc) {
+ logger.info(marker.slf4jMarker, message, t)
+ }
}
@Suppress("SuboptimalLoggerUsage")
@@ -251,6 +277,11 @@ class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
withAdditionalMdc(marker.mdc) {
logger.debug(marker.slf4jMarker, message)
}
+
+ override fun log(marker: Marker, message: String, t: Throwable) =
+ withAdditionalMdc(marker.mdc) {
+ logger.debug(marker.slf4jMarker, message, t)
+ }
}
@Suppress("SuboptimalLoggerUsage")
@@ -267,4 +298,9 @@ class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
withAdditionalMdc(marker.mdc) {
logger.trace(marker.slf4jMarker, message)
}
+
+ override fun log(marker: Marker, message: String, t: Throwable) =
+ withAdditionalMdc(marker.mdc) {
+ logger.trace(marker.slf4jMarker, message, t)
+ }
}