From 6ca062752bcf57af7f543bd33f372b48e2010a24 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Thu, 13 Dec 2018 09:26:36 +0100 Subject: Add all required and reasonable MDCs Change-Id: I34beb32a7c53da97c6945ec8d0022ac37059b7c5 Issue-ID: DCAEGEN2-670 Signed-off-by: Piotr Jaszczyk --- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 6 +-- .../veshv/impl/adapters/ClientContextLogging.kt | 22 +++++----- .../impl/adapters/ConsulConfigurationProvider.kt | 37 +++++++++++------ .../collectors/veshv/impl/adapters/HttpAdapter.kt | 37 +++++++++-------- .../veshv/impl/adapters/kafka/KafkaSink.kt | 2 +- .../collectors/veshv/impl/socket/NettyTcpServer.kt | 48 +++++++++++++++++++--- .../dcae/collectors/veshv/model/ClientContext.kt | 40 +++++++++++++----- .../dcae/collectors/veshv/model/ServiceContext.kt | 45 ++++++++++++++++++++ 8 files changed, 176 insertions(+), 61 deletions(-) create mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt (limited to 'sources/hv-collector-core/src/main/kotlin') diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index cf73aed8..ca1605e6 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -74,7 +74,7 @@ internal class VesHvCollector( private fun decodePayload(rawPayload: ByteData): Flux = protobufDecoder .decode(rawPayload) - .filterFailedWithLog(logger, clientContext::asMap, + .filterFailedWithLog(logger, clientContext::fullMdc, { "Ves event header decoded successfully" }, { "Failed to decode ves event header, reason: ${it.message}" }) @@ -88,7 +88,7 @@ internal class VesHvCollector( private fun findRoute(msg: VesMessage) = router .findDestination(msg) - .filterEmptyWithLog(logger, clientContext::asMap, + .filterEmptyWithLog(logger, clientContext::fullMdc, { "Found route for message: ${it.topic}, partition: ${it.partition}" }, { "Could not find route for message" }) @@ -96,7 +96,7 @@ internal class VesHvCollector( .also { logger.debug { "Released buffer memory after handling message stream" } } fun Flux.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) = - filterFailedWithLog(logger, clientContext::asMap, predicate) + filterFailedWithLog(logger, clientContext::fullMdc, predicate) companion object { private val logger = Logger(VesHvCollector::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt index 21b79bbe..954de978 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt @@ -27,21 +27,21 @@ import reactor.core.publisher.Flux @Suppress("TooManyFunctions") internal object ClientContextLogging { - fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block) - fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block) - fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block) - fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block) - fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block) + fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::fullMdc, block) + fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::fullMdc, block) + fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::fullMdc, block) + fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::fullMdc, block) + fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::fullMdc, block) - fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message) - fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message) - fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message) - fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message) - fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message) + fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::fullMdc, message) + fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::fullMdc, message) + fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::fullMdc, message) + fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::fullMdc, message) + fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::fullMdc, message) fun Logger.handleReactiveStreamError(context: ClientContext, ex: Throwable, returnFlux: Flux = Flux.empty()): Flux { - return this.handleReactiveStreamError({ context.asMap() }, ex, returnFlux) + return this.handleReactiveStreamError({ context.fullMdc }, ex, returnFlux) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index bbaa47c4..14d511be 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -24,13 +24,16 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.Marker import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.retry.Jitter import reactor.retry.Retry import java.io.StringReader import java.time.Duration +import java.util.* import java.util.concurrent.atomic.AtomicReference import javax.json.Json import javax.json.JsonObject @@ -52,7 +55,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val lastConfigurationHash: AtomicReference = AtomicReference(0) private val retry = retrySpec .doOnRetry { - logger.withWarn { log("Could not get fresh configuration", it.exception()) } + logger.withWarn(ServiceContext::mdc) { log("Could not get fresh configuration", it.exception()) } healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) } @@ -77,17 +80,26 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, .map(::createCollectorConfiguration) .retryWhen(retry) - private fun askForConfig(): Mono = http.get(url) + private fun askForConfig(): Mono = Mono.defer { + val invocationId = UUID.randomUUID() + http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) } + } - private fun filterDifferentValues(configurationString: String) = - hashOf(configurationString).let { - if (it == lastConfigurationHash.get()) { - logger.trace { "No change detected in consul configuration" } - Mono.empty() - } else { - logger.info { "Obtained new configuration from consul:\n${configurationString}" } - lastConfigurationHash.set(it) - Mono.just(configurationString) + private fun filterDifferentValues(configuration: BodyWithInvocationId) = + configuration.body.let { configurationString -> + hashOf(configurationString).let { + if (it == lastConfigurationHash.get()) { + logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) { + "No change detected in consul configuration" + } + Mono.empty() + } else { + logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) { + "Obtained new configuration from consul:\n${configurationString}" + } + lastConfigurationHash.set(it) + Mono.just(configurationString) + } } } @@ -119,5 +131,6 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private const val BACKOFF_INTERVAL_FACTOR = 30L private val logger = Logger(ConsulConfigurationProvider::class) } -} + private data class BodyWithInvocationId(val body: String, val invocationId: UUID) +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt index 3fefc6e8..51f7410b 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt @@ -21,9 +21,11 @@ package org.onap.dcae.collectors.veshv.impl.adapters import io.netty.handler.codec.http.HttpStatusClass import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc import org.slf4j.LoggerFactory import reactor.core.publisher.Mono import reactor.netty.http.client.HttpClient +import java.util.* /** * @author Jakub Dudycz @@ -31,21 +33,23 @@ import reactor.netty.http.client.HttpClient */ open class HttpAdapter(private val httpClient: HttpClient) { - open fun get(url: String, queryParams: Map = emptyMap()): Mono = httpClient - .get() - .uri(url + createQueryString(queryParams)) - .responseSingle { response, content -> - if (response.status().codeClass() == HttpStatusClass.SUCCESS) - content.asString() - else { - val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}" - Mono.error(IllegalStateException(errorMessage)) - } - } - .doOnError { - logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" } - logger.withDebug { log("Nested exception:", it) } - } + open fun get(url: String, invocationId: UUID, queryParams: Map = emptyMap()): Mono = + httpClient + .headers { it[INVOCATION_ID_HEADER] = invocationId.toString() } + .get() + .uri(url + createQueryString(queryParams)) + .responseSingle { response, content -> + if (response.status().codeClass() == HttpStatusClass.SUCCESS) + content.asString() + else { + val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}" + Mono.error(IllegalStateException(errorMessage)) + } + } + .doOnError { + logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" } + logger.withDebug { log("Nested exception:", it) } + } private fun createQueryString(params: Map): String { if (params.isEmpty()) @@ -65,8 +69,7 @@ open class HttpAdapter(private val httpClient: HttpClient) { } companion object { - - private val logger = Logger(HttpAdapter::class) + const val INVOCATION_ID_HEADER = "X-${OnapMdc.INVOCATION_ID}" } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index 690a7d1e..b4f9a90c 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -69,7 +69,7 @@ internal class KafkaSink(private val sender: KafkaSender @@ -72,17 +81,21 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono { val clientContext = ClientContext(nettyOutbound.alloc()) nettyInbound.withConnection { - clientContext.clientAddress = it.address() + populateClientContext(clientContext, it) + it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession -> + sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name } + } + } - logger.debug(clientContext::asMap, Marker.ENTRY) { "Client connection request received" } + logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" } return collectorProvider(clientContext).fold( { - logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." } + logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." } Mono.empty() }, { - logger.info(clientContext::asMap) { "Handling new connection" } + logger.info(clientContext::fullMdc) { "Handling new connection" } nettyInbound.withConnection { conn -> conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout) .logConnectionClosed(clientContext) @@ -92,6 +105,29 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, ) } + private fun populateClientContext(clientContext: ClientContext, connection: Connection) { + clientContext.clientAddress = try { + Option.fromNullable(connection.address().address) + } catch (ex: Exception) { + None + } + clientContext.clientCert = getSslSession(connection).flatMap(::findClientCert) + } + + private fun getSslSession(connection: Connection) = Option.fromNullable( + connection + .channel() + .pipeline() + .get(SslHandler::class.java) + ?.engine() + ?.session) + + private fun findClientCert(sslSession: SSLSession): Option = + sslSession + .peerCertificates + .firstOption() + .flatMap { Option.fromNullable(it as? X509Certificate) } + private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound .receive() .retain() @@ -108,7 +144,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private fun Connection.disconnectClient(ctx: ClientContext) { channel().close().addListener { - logger.debug(ctx::asMap, Marker.EXIT) { "Closing client channel." } + logger.debug(ctx::fullMdc, Marker.Exit) { "Closing client channel." } if (it.isSuccess) logger.debug(ctx) { "Channel closed successfully." } else @@ -119,7 +155,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private fun Connection.logConnectionClosed(ctx: ClientContext): Connection { onTerminate().subscribe { // TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled) - logger.info(ctx::asMap, Marker.EXIT) { "Connection has been closed" } + logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" } } return this } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt index 305e4cb1..7b082e64 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt @@ -19,10 +19,13 @@ */ package org.onap.dcae.collectors.veshv.model +import arrow.core.None +import arrow.core.Option +import arrow.core.getOrElse import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import java.net.InetSocketAddress +import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc +import java.net.InetAddress +import java.security.cert.X509Certificate import java.util.* /** @@ -31,13 +34,28 @@ import java.util.* */ data class ClientContext( val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT, - val clientId: String = UUID.randomUUID().toString(), - var clientAddress: InetSocketAddress? = null) { - fun asMap(): Map { - val result = mutableMapOf("clientId" to clientId) - if (clientAddress != null) { - result["clientAddress"] = clientAddress.toString() - } - return result + var clientAddress: Option = None, + var clientCert: Option = None, + val requestId: String = UUID.randomUUID().toString(), // Should be somehow propagated to DMAAP + val invocationId: String = UUID.randomUUID().toString()) { + + val mdc: Map + get() = mapOf( + OnapMdc.REQUEST_ID to requestId, + OnapMdc.INVOCATION_ID to invocationId, + OnapMdc.STATUS_CODE to DEFAULT_STATUS_CODE, + OnapMdc.CLIENT_NAME to clientDn().getOrElse { DEFAULT_VALUE }, + OnapMdc.CLIENT_IP to clientIp().getOrElse { DEFAULT_VALUE } + ) + + val fullMdc: Map + get() = mdc + ServiceContext.mdc + + private fun clientDn(): Option = clientCert.map { it.subjectX500Principal.toString() } + private fun clientIp(): Option = clientAddress.map(InetAddress::getHostAddress) + + companion object { + const val DEFAULT_STATUS_CODE = "INPROGRESS" + const val DEFAULT_VALUE = "" } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt new file mode 100644 index 00000000..2407eced --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt @@ -0,0 +1,45 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.model + +import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc +import java.net.InetAddress +import java.net.UnknownHostException +import java.util.* + +/** + * @author Piotr Jaszczyk + * @since December 2018 + */ +object ServiceContext { + val instanceId = UUID.randomUUID().toString() + val serverFqdn = getHost().hostName + + val mdc = mapOf( + OnapMdc.INSTANCE_ID to instanceId, + OnapMdc.SERVER_FQDN to serverFqdn + ) + + private fun getHost() = try { + InetAddress.getLocalHost() + } catch (ex: UnknownHostException) { + InetAddress.getLoopbackAddress() + } +} -- cgit 1.2.3-korg