diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-12-07 14:41:39 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-12-10 14:46:23 +0100 |
commit | 8b8c37c296e55644063e0332fd455437168e78da (patch) | |
tree | 36e9d96217346dd4296677cfd8af584c69a0ad05 /sources/hv-collector-core | |
parent | 73293332b2244b66083dc5d3910801c1b1058105 (diff) |
Add log diagnostic context
As it's not trivial to use MDC directly from logging framework in
reactive application, we need to do some work manually. The approach
proposed is an explicit MDC handling, which means that context is
kept as an object created after establishing client connection. Next,
new instance of HvVesCollector (and its dependencies) is created. Every
object is propagated with ClientContext so it can use it when calling
logger methods.
In the future ClientContext might be used to support other use-cases,
ie. per-topic access control.
As a by-product I had to refactor our Logger wrapper, too. It already
had too many functions and after adding MDC number would be doubled.
Change-Id: I9c5d3f5e1d1be1db66d28d292eb0e1c38d8d0ffe
Issue-ID: DCAEGEN2-671
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-core')
16 files changed, 191 insertions, 110 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index dd0111bc..b686b250 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.boundary +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.RoutedMessage import reactor.core.publisher.Flux @@ -35,12 +36,12 @@ interface Metrics { @FunctionalInterface interface SinkProvider { - operator fun invoke(config: CollectorConfiguration): Sink + operator fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink companion object { fun just(sink: Sink): SinkProvider = object : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink = sink + override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink = sink } } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index 3c85a9b1..5584d61d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -23,15 +23,17 @@ import arrow.core.Option import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import java.util.* interface Collector { - fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> + fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> } -typealias CollectorProvider = () -> Option<Collector> +typealias CollectorProvider = (ClientContext) -> Option<Collector> interface Server { fun start(): IO<ServerHandle> diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 5c96e1c5..2008fc35 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,12 +25,13 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -47,31 +48,29 @@ class CollectorFactory(val configuration: ConfigurationProvider, private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { - val collector: AtomicReference<Collector> = AtomicReference() + val config: AtomicReference<CollectorConfiguration> = AtomicReference() configuration() - .map(this::createVesHvCollector) .doOnNext { - logger.info("Using updated configuration for new connections") + logger.info { "Using updated configuration for new connections" } healthState.changeState(HealthDescription.HEALTHY) } .doOnError { - logger.error("Failed to acquire configuration from consul") + logger.error { "Failed to acquire configuration from consul" } healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) } - .subscribe(collector::set) - return collector::getOption + .subscribe(config::set) + return { ctx: ClientContext -> + config.getOption().map { config -> createVesHvCollector(config, ctx) } + } } - private fun createVesHvCollector(config: CollectorConfiguration): Collector { - return VesHvCollector( - wireChunkDecoderSupplier = { alloc -> - WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc) - }, - protobufDecoder = VesDecoder(), - router = Router(config.routing), - sink = sinkProvider(config), - metrics = metrics) - } + private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = VesHvCollector( + clientContext = ctx, + wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx), + protobufDecoder = VesDecoder(), + router = Router(config.routing, ctx), + sink = sinkProvider(config, ctx), + metrics = metrics) companion object { private val logger = Logger(CollectorFactory::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index cee658b6..0977595a 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -20,11 +20,22 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Option +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.Routing import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger -class Router(private val routing: Routing) { +class Router(private val routing: Routing, private val ctx: ClientContext) { fun findDestination(message: VesMessage): Option<RoutedMessage> = - routing.routeFor(message.header).map { it(message) } + routing.routeFor(message.header).map { it(message) }.also { + if (it.isEmpty()) { + logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } + } + } + + companion object { + private val logger = Logger(Routing::class) + } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 4176de99..0d07504d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -21,18 +21,18 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Either import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog +import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -42,28 +42,27 @@ import reactor.core.publisher.Mono * @since May 2018 */ internal class VesHvCollector( - private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder, + private val clientContext: ClientContext, + private val wireChunkDecoder: WireChunkDecoder, private val protobufDecoder: VesDecoder, private val router: Router, private val sink: Sink, private val metrics: Metrics) : Collector { - override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> = - wireChunkDecoderSupplier(alloc).let { wireDecoder -> - dataStream - .transform { decodeWireFrame(it, wireDecoder) } - .transform(::filterInvalidWireFrame) - .transform(::decodeProtobufPayload) - .transform(::filterInvalidProtobufMessages) - .transform(::routeMessage) - .onErrorResume { logger.handleReactiveStreamError(it) } - .doFinally { releaseBuffersMemory(wireDecoder) } - .then() - } + override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> = + dataStream + .transform { decodeWireFrame(it) } + .transform(::filterInvalidWireFrame) + .transform(::decodeProtobufPayload) + .transform(::filterInvalidProtobufMessages) + .transform(::routeMessage) + .onErrorResume { logger.handleReactiveStreamError(clientContext::asMap, it) } + .doFinally { releaseBuffersMemory() } + .then() - private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux + private fun decodeWireFrame(flux: Flux<ByteBuf>): Flux<WireFrameMessage> = flux .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } - .concatMap(decoder::decode) + .concatMap(wireChunkDecoder::decode) .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux @@ -75,7 +74,7 @@ internal class VesHvCollector( private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder .decode(rawPayload) - .filterFailedWithLog(logger, + .filterFailedWithLog(logger, clientContext::asMap, { "Ves event header decoded successfully" }, { "Failed to decode ves event header, reason: ${it.message}" }) @@ -89,15 +88,15 @@ internal class VesHvCollector( private fun findRoute(msg: VesMessage) = router .findDestination(msg) - .filterEmptyWithLog(logger, + .filterEmptyWithLog(logger, clientContext::asMap, { "Found route for message: ${it.topic}, partition: ${it.partition}" }, { "Could not find route for message" }) - private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() - .also { logger.debug("Released buffer memory after handling message stream") } + private fun releaseBuffersMemory() = wireChunkDecoder.release() + .also { logger.debug { "Released buffer memory after handling message stream" } } fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) = - filterFailedWithLog(logger, predicate) + filterFailedWithLog(logger, clientContext::asMap, predicate) companion object { private val logger = Logger(VesHvCollector::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index cea8a7ee..bbaa47c4 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -52,7 +52,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0) private val retry = retrySpec .doOnRetry { - logger.warn("Could not get fresh configuration", it.exception()) + logger.withWarn { log("Could not get fresh configuration", it.exception()) } healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt index bdce6f73..3fefc6e8 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters import io.netty.handler.codec.http.HttpStatusClass +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.slf4j.LoggerFactory import reactor.core.publisher.Mono import reactor.netty.http.client.HttpClient @@ -30,8 +31,6 @@ import reactor.netty.http.client.HttpClient */ open class HttpAdapter(private val httpClient: HttpClient) { - private val logger = LoggerFactory.getLogger(HttpAdapter::class.java) - open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient .get() .uri(url + createQueryString(queryParams)) @@ -44,8 +43,8 @@ open class HttpAdapter(private val httpClient: HttpClient) { } } .doOnError { - logger.error("Failed to get resource on path: $url (${it.localizedMessage})") - logger.debug("Nested exception:", it) + logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" } + logger.withDebug { log("Nested exception:", it) } } private fun createQueryString(params: Map<String, Any>): String { @@ -65,4 +64,9 @@ open class HttpAdapter(private val httpClient: HttpClient) { return builder.removeSuffix("&").toString() } + companion object { + + + private val logger = Logger(HttpAdapter::class) + } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index 5f4bf354..f6cb018f 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -21,6 +21,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -33,7 +36,7 @@ import java.util.concurrent.atomic.AtomicLong */ internal class LoggingSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink { + override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink { return object : Sink { private val totalMessages = AtomicLong() private val totalBytes = AtomicLong() @@ -47,9 +50,9 @@ internal class LoggingSinkProvider : SinkProvider { val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong()) val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" } if (msgs % INFO_LOGGING_FREQ == 0L) - logger.info(logMessageSupplier) + logger.info(ctx, logMessageSupplier) else - logger.trace(logMessageSupplier) + logger.trace(ctx, logMessageSupplier) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index c4d6c87e..fd08ba3d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -20,6 +20,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -35,7 +38,7 @@ import java.util.concurrent.atomic.AtomicLong * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink { +internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>, private val ctx: ClientContext) : Sink { private val sentMessages = AtomicLong(0) override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { @@ -45,17 +48,13 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM if (it.isSuccessful()) { Mono.just(it) } else { - logger.warn(it.exception()) { "Failed to send message to Kafka" } + logger.withWarn(ctx) { log("Failed to send message to Kafka", it.exception()) } Mono.empty<SenderResult<RoutedMessage>>() } } .map { it.correlationMetadata() } - return if (logger.traceEnabled) { - result.doOnNext(::logSentMessage) - } else { - result - } + return result.doOnNext(::logSentMessage) } private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> { @@ -69,7 +68,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM } private fun logSentMessage(sentMsg: RoutedMessage) { - logger.trace { + logger.trace(ctx) { val msgNum = sentMessages.incrementAndGet() "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}" } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 18191952..b4f470d4 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.apache.kafka.clients.producer.ProducerConfig import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.CommonEventHeader @@ -33,8 +34,8 @@ import reactor.kafka.sender.SenderOptions * @since June 2018 */ internal class KafkaSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink { - return KafkaSink(KafkaSender.create(constructSenderOptions(config))) + override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink { + return KafkaSink(KafkaSender.create(constructSenderOptions(config)), ctx) } private fun constructSenderOptions(config: CollectorConfiguration) = diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 0b2997fa..2d29fe99 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -23,6 +23,10 @@ import arrow.core.getOrElse import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory import org.onap.dcae.collectors.veshv.utils.NettyServerHandle @@ -57,57 +61,61 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, sslContextFactory .createSslContext(serverConfig.securityConfiguration) .map { sslContext -> - logger.info("Collector configured with SSL enabled") + logger.info { "Collector configured with SSL enabled" } this.secure { b -> b.sslContext(sslContext) } }.getOrElse { - logger.info("Collector configured with SSL disabled") + logger.info { "Collector configured with SSL disabled" } this } - private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = - collectorProvider().fold( - { - nettyInbound.withConnection { conn -> - logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." } - } - Mono.empty() - }, - { - nettyInbound.withConnection { conn -> - logger.info { "Handling connection from ${conn.address()}" } - conn.configureIdleTimeout(serverConfig.idleTimeout) - .logConnectionClosed() - } - it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound)) + private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> { + val clientContext = ClientContext(nettyOutbound.alloc()) + nettyInbound.withConnection { + clientContext.clientAddress = it.address() + } + + return collectorProvider(clientContext).fold( + { + logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." } + Mono.empty() + }, + { + logger.info { "Handling new connection" } + nettyInbound.withConnection { conn -> + conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout) + .logConnectionClosed(clientContext) } - ) + it.handleConnection(createDataStream(nettyInbound)) + } + ) + } private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound .receive() .retain() - private fun Connection.configureIdleTimeout(timeout: Duration): Connection { + private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection { onReadIdle(timeout.toMillis()) { - logger.info { + logger.info(ctx) { "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..." } - disconnectClient() + disconnectClient(ctx) } return this } - private fun Connection.disconnectClient() { + private fun Connection.disconnectClient(ctx: ClientContext) { channel().close().addListener { if (it.isSuccess) - logger.debug { "Channel (${address()}) closed successfully." } + logger.debug(ctx) { "Channel closed successfully." } else - logger.warn("Channel close failed", it.cause()) + logger.withWarn(ctx) { log("Channel close failed", it.cause()) } } } - private fun Connection.logConnectionClosed(): Connection { + private fun Connection.logConnectionClosed(ctx: ClientContext): Connection { onTerminate().subscribe { - logger.info("Connection from ${address()} has been closed") + logger.info(ctx) { "Connection has been closed" } } return this } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index 4a2ef6b2..349b0787 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -21,12 +21,13 @@ package org.onap.dcae.collectors.veshv.impl.wire import arrow.effects.IO import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame -import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux @@ -38,8 +39,8 @@ import reactor.core.publisher.SynchronousSink */ internal class WireChunkDecoder( private val decoder: WireFrameDecoder, - alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { - private val streamBuffer = alloc.compositeBuffer() + private val ctx: ClientContext) { + private val streamBuffer = ctx.alloc.compositeBuffer() fun release() { streamBuffer.release() @@ -53,7 +54,7 @@ internal class WireChunkDecoder( } else { streamBuffer.addComponent(true, byteBuf) generateFrames() - .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) } + .onErrorResume { logger.handleReactiveStreamError(ctx::asMap, it, Flux.error(it)) } .doFinally { streamBuffer.discardReadComponents() } } } @@ -84,15 +85,15 @@ internal class WireChunkDecoder( } private fun logIncomingMessage(wire: ByteBuf) { - logger.trace { "Got message with total size of ${wire.readableBytes()} B" } + logger.trace(ctx) { "Got message with total size of ${wire.readableBytes()} B" } } private fun logDecodedWireMessage(wire: WireFrameMessage) { - logger.trace { "Wire payload size: ${wire.payloadSize} B" } + logger.trace(ctx) { "Wire payload size: ${wire.payloadSize} B" } } private fun logEndOfData() { - logger.trace { "End of data in current TCP buffer" } + logger.trace(ctx) { "End of data in current TCP buffer" } } companion object { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt new file mode 100644 index 00000000..f14a7f65 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt @@ -0,0 +1,58 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.model + +import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.slf4j.MDC +import java.net.InetSocketAddress +import java.util.* + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since December 2018 + */ +data class ClientContext( + val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT, + val clientId: String = UUID.randomUUID().toString(), + var clientAddress: InetSocketAddress? = null) { + fun asMap(): Map<String, String> { + val result = mutableMapOf("clientId" to clientId) + if (clientAddress != null) { + result["clientAddress"] = clientAddress.toString() + } + return result + } +} + +object ClientContextLogging { + fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block) + fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block) + fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block) + fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block) + fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block) + + fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message) + fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message) + fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message) + fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message) + fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message) +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt index 437614ac..ad97a3f7 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt @@ -26,15 +26,7 @@ import org.onap.ves.VesEventOuterClass.CommonEventHeader data class Routing(val routes: List<Route>) { fun routeFor(commonHeader: CommonEventHeader): Option<Route> = - Option.fromNullable(routes.find { it.applies(commonHeader) }).also { - if (it.isEmpty()) { - logger.debug { "No route is defined for domain: ${commonHeader.domain}" } - } - } - - companion object { - private val logger = Logger(Routing::class) - } + Option.fromNullable(routes.find { it.applies(commonHeader) }) } data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index e8a31231..e4190163 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.None import arrow.core.Some +import io.netty.buffer.ByteBufAllocator import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given @@ -30,6 +31,7 @@ import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.model.routing @@ -56,7 +58,7 @@ object RouterTest : Spek({ withFixedPartitioning() } }.build() - val cut = Router(config) + val cut = Router(config, ClientContext()) on("message with existing route (rtpm)") { val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt index f06a0dc7..e0092cf9 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt @@ -30,6 +30,7 @@ import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.model.ClientContext import reactor.test.test /** @@ -45,7 +46,7 @@ internal object WireChunkDecoderTest : Spek({ fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame)) - fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc) + fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), ClientContext(alloc)) fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) { for (bb in byteBuffers) { |