diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-12-07 14:41:39 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-12-10 14:46:23 +0100 |
commit | 8b8c37c296e55644063e0332fd455437168e78da (patch) | |
tree | 36e9d96217346dd4296677cfd8af584c69a0ad05 | |
parent | 73293332b2244b66083dc5d3910801c1b1058105 (diff) |
Add log diagnostic context
As it's not trivial to use MDC directly from logging framework in
reactive application, we need to do some work manually. The approach
proposed is an explicit MDC handling, which means that context is
kept as an object created after establishing client connection. Next,
new instance of HvVesCollector (and its dependencies) is created. Every
object is propagated with ClientContext so it can use it when calling
logger methods.
In the future ClientContext might be used to support other use-cases,
ie. per-topic access control.
As a by-product I had to refactor our Logger wrapper, too. It already
had too many functions and after adding MDC number would be doubled.
Change-Id: I9c5d3f5e1d1be1db66d28d292eb0e1c38d8d0ffe
Issue-ID: DCAEGEN2-671
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
37 files changed, 469 insertions, 328 deletions
diff --git a/docker-compose.yml b/docker-compose.yml index 4015b08b..d4c3f1d8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,45 +1,73 @@ version: "3.5" services: - zookeeper: + + # + # DMaaP Message Router + # + + message-router-zookeeper: image: wurstmeister/zookeeper ports: - - "2181:2181" + - "2181:2181" - kafka: + message-router-kafka: +# image: nexus3.onap.org:10001/onap/dmaap/kafka01101:0.0.1 image: wurstmeister/kafka ports: - - "9092:9092" + - "9092:9092" environment: - KAFKA_ADVERTISED_HOST_NAME: "kafka" KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' - KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" - KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092" + KAFKA_ZOOKEEPER_CONNECT: "message-router-zookeeper:2181" + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT" + KAFKA_ADVERTISED_LISTENERS: "INTERNAL_PLAINTEXT://message-router-kafka:9092" + KAFKA_LISTENERS: "INTERNAL_PLAINTEXT://0.0.0.0:9092" + KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL_PLAINTEXT" volumes: - - /var/run/docker.sock:/var/run/docker.sock + - /var/run/docker.sock:/var/run/docker.sock + depends_on: + - message-router-zookeeper + + + # + # Consul / CBS + # + + consul-server: + image: docker.io/consul:1.0.6 + ports: + - "8500:8500" + command: ["agent","-bootstrap", "-client=0.0.0.0", "-server", "-ui"] + + consul-config: + image: consul depends_on: - - zookeeper + - consul-server + restart: on-failure + command: ["kv", "put", "-http-addr=http://consul-server:8500", "veshv-config", '{ + "dmaap.kafkaBootstrapServers": "message-router-kafka:9092", + "collector.routing": [ + { + "fromDomain": "perf3gpp", + "toTopic": "HV_VES_PERF3GPP" + } + ] + }'] + - consul: - image: progrium/consul - ports: - - "8500:8500" - environment: - - CONSUL_BIND_INTERFACE=eth0 - command: ["-server", "-bootstrap", "-ui-dir", "/ui"] + # + # DCAE HV VES Collector + # ves-hv-collector: image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest -# build: -# context: hv-collector-main -# dockerfile: Dockerfile ports: - - "6060:6060" - - "6061:6061/tcp" + - "6060:6060" + - "6061:6061/tcp" entrypoint: ["java", "-Dio.netty.leakDetection.level=paranoid", "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"] command: ["--listen-port", "6061", "--health-check-api-port", "6060", - "--config-url", "http://consul:8500/v1/kv/veshv-config?raw=true", + "--config-url", "http://consul-server:8500/v1/kv/veshv-config?raw=true", "--key-store-password", "onaponap", "--trust-store-password", "onaponap"] healthcheck: @@ -49,37 +77,36 @@ services: retries: 3 start_period: 20s depends_on: - - kafka - - consul + - message-router-kafka + - consul-server volumes: - - ./ssl/:/etc/ves-hv/ + - ./ssl/:/etc/ves-hv/ + + + # + # Simulators + # xnf-simulator: image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator -# build: -# context: hv-collector-xnf-simulator -# dockerfile: Dockerfile ports: - - "6062:6062/tcp" + - "6062:6062/tcp" command: ["--listen-port", "6062", "--ves-host", "ves-hv-collector", "--ves-port", "6061", "--key-store-password", "onaponap", "--trust-store-password", "onaponap"] depends_on: - - ves-hv-collector + - ves-hv-collector volumes: - ./ssl/:/etc/ves-hv/ dcae-app-simulator: image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator -# build: -# context: hv-collector-dcae-app-simulator -# dockerfile: Dockerfile ports: - - "6063:6063/tcp" + - "6063:6063/tcp" command: ["--listen-port", "6063", - "--kafka-bootstrap-servers", "kafka:9092", + "--kafka-bootstrap-servers", "message-router-kafka:9092", "--kafka-topics", "HV_VES_PERF3GPP"] depends_on: - - kafka + - message-router-kafka diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index dd0111bc..b686b250 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.boundary +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.RoutedMessage import reactor.core.publisher.Flux @@ -35,12 +36,12 @@ interface Metrics { @FunctionalInterface interface SinkProvider { - operator fun invoke(config: CollectorConfiguration): Sink + operator fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink companion object { fun just(sink: Sink): SinkProvider = object : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink = sink + override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink = sink } } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index 3c85a9b1..5584d61d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -23,15 +23,17 @@ import arrow.core.Option import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import java.util.* interface Collector { - fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> + fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> } -typealias CollectorProvider = () -> Option<Collector> +typealias CollectorProvider = (ClientContext) -> Option<Collector> interface Server { fun start(): IO<ServerHandle> diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 5c96e1c5..2008fc35 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,12 +25,13 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -47,31 +48,29 @@ class CollectorFactory(val configuration: ConfigurationProvider, private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { - val collector: AtomicReference<Collector> = AtomicReference() + val config: AtomicReference<CollectorConfiguration> = AtomicReference() configuration() - .map(this::createVesHvCollector) .doOnNext { - logger.info("Using updated configuration for new connections") + logger.info { "Using updated configuration for new connections" } healthState.changeState(HealthDescription.HEALTHY) } .doOnError { - logger.error("Failed to acquire configuration from consul") + logger.error { "Failed to acquire configuration from consul" } healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) } - .subscribe(collector::set) - return collector::getOption + .subscribe(config::set) + return { ctx: ClientContext -> + config.getOption().map { config -> createVesHvCollector(config, ctx) } + } } - private fun createVesHvCollector(config: CollectorConfiguration): Collector { - return VesHvCollector( - wireChunkDecoderSupplier = { alloc -> - WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc) - }, - protobufDecoder = VesDecoder(), - router = Router(config.routing), - sink = sinkProvider(config), - metrics = metrics) - } + private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = VesHvCollector( + clientContext = ctx, + wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx), + protobufDecoder = VesDecoder(), + router = Router(config.routing, ctx), + sink = sinkProvider(config, ctx), + metrics = metrics) companion object { private val logger = Logger(CollectorFactory::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index cee658b6..0977595a 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -20,11 +20,22 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Option +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.Routing import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger -class Router(private val routing: Routing) { +class Router(private val routing: Routing, private val ctx: ClientContext) { fun findDestination(message: VesMessage): Option<RoutedMessage> = - routing.routeFor(message.header).map { it(message) } + routing.routeFor(message.header).map { it(message) }.also { + if (it.isEmpty()) { + logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } + } + } + + companion object { + private val logger = Logger(Routing::class) + } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 4176de99..0d07504d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -21,18 +21,18 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Either import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog +import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -42,28 +42,27 @@ import reactor.core.publisher.Mono * @since May 2018 */ internal class VesHvCollector( - private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder, + private val clientContext: ClientContext, + private val wireChunkDecoder: WireChunkDecoder, private val protobufDecoder: VesDecoder, private val router: Router, private val sink: Sink, private val metrics: Metrics) : Collector { - override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> = - wireChunkDecoderSupplier(alloc).let { wireDecoder -> - dataStream - .transform { decodeWireFrame(it, wireDecoder) } - .transform(::filterInvalidWireFrame) - .transform(::decodeProtobufPayload) - .transform(::filterInvalidProtobufMessages) - .transform(::routeMessage) - .onErrorResume { logger.handleReactiveStreamError(it) } - .doFinally { releaseBuffersMemory(wireDecoder) } - .then() - } + override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> = + dataStream + .transform { decodeWireFrame(it) } + .transform(::filterInvalidWireFrame) + .transform(::decodeProtobufPayload) + .transform(::filterInvalidProtobufMessages) + .transform(::routeMessage) + .onErrorResume { logger.handleReactiveStreamError(clientContext::asMap, it) } + .doFinally { releaseBuffersMemory() } + .then() - private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux + private fun decodeWireFrame(flux: Flux<ByteBuf>): Flux<WireFrameMessage> = flux .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } - .concatMap(decoder::decode) + .concatMap(wireChunkDecoder::decode) .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux @@ -75,7 +74,7 @@ internal class VesHvCollector( private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder .decode(rawPayload) - .filterFailedWithLog(logger, + .filterFailedWithLog(logger, clientContext::asMap, { "Ves event header decoded successfully" }, { "Failed to decode ves event header, reason: ${it.message}" }) @@ -89,15 +88,15 @@ internal class VesHvCollector( private fun findRoute(msg: VesMessage) = router .findDestination(msg) - .filterEmptyWithLog(logger, + .filterEmptyWithLog(logger, clientContext::asMap, { "Found route for message: ${it.topic}, partition: ${it.partition}" }, { "Could not find route for message" }) - private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() - .also { logger.debug("Released buffer memory after handling message stream") } + private fun releaseBuffersMemory() = wireChunkDecoder.release() + .also { logger.debug { "Released buffer memory after handling message stream" } } fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) = - filterFailedWithLog(logger, predicate) + filterFailedWithLog(logger, clientContext::asMap, predicate) companion object { private val logger = Logger(VesHvCollector::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index cea8a7ee..bbaa47c4 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -52,7 +52,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0) private val retry = retrySpec .doOnRetry { - logger.warn("Could not get fresh configuration", it.exception()) + logger.withWarn { log("Could not get fresh configuration", it.exception()) } healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt index bdce6f73..3fefc6e8 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters import io.netty.handler.codec.http.HttpStatusClass +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.slf4j.LoggerFactory import reactor.core.publisher.Mono import reactor.netty.http.client.HttpClient @@ -30,8 +31,6 @@ import reactor.netty.http.client.HttpClient */ open class HttpAdapter(private val httpClient: HttpClient) { - private val logger = LoggerFactory.getLogger(HttpAdapter::class.java) - open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient .get() .uri(url + createQueryString(queryParams)) @@ -44,8 +43,8 @@ open class HttpAdapter(private val httpClient: HttpClient) { } } .doOnError { - logger.error("Failed to get resource on path: $url (${it.localizedMessage})") - logger.debug("Nested exception:", it) + logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" } + logger.withDebug { log("Nested exception:", it) } } private fun createQueryString(params: Map<String, Any>): String { @@ -65,4 +64,9 @@ open class HttpAdapter(private val httpClient: HttpClient) { return builder.removeSuffix("&").toString() } + companion object { + + + private val logger = Logger(HttpAdapter::class) + } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index 5f4bf354..f6cb018f 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -21,6 +21,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -33,7 +36,7 @@ import java.util.concurrent.atomic.AtomicLong */ internal class LoggingSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink { + override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink { return object : Sink { private val totalMessages = AtomicLong() private val totalBytes = AtomicLong() @@ -47,9 +50,9 @@ internal class LoggingSinkProvider : SinkProvider { val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong()) val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" } if (msgs % INFO_LOGGING_FREQ == 0L) - logger.info(logMessageSupplier) + logger.info(ctx, logMessageSupplier) else - logger.trace(logMessageSupplier) + logger.trace(ctx, logMessageSupplier) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index c4d6c87e..fd08ba3d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -20,6 +20,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -35,7 +38,7 @@ import java.util.concurrent.atomic.AtomicLong * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink { +internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>, private val ctx: ClientContext) : Sink { private val sentMessages = AtomicLong(0) override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { @@ -45,17 +48,13 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM if (it.isSuccessful()) { Mono.just(it) } else { - logger.warn(it.exception()) { "Failed to send message to Kafka" } + logger.withWarn(ctx) { log("Failed to send message to Kafka", it.exception()) } Mono.empty<SenderResult<RoutedMessage>>() } } .map { it.correlationMetadata() } - return if (logger.traceEnabled) { - result.doOnNext(::logSentMessage) - } else { - result - } + return result.doOnNext(::logSentMessage) } private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> { @@ -69,7 +68,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM } private fun logSentMessage(sentMsg: RoutedMessage) { - logger.trace { + logger.trace(ctx) { val msgNum = sentMessages.incrementAndGet() "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}" } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 18191952..b4f470d4 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.apache.kafka.clients.producer.ProducerConfig import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.CommonEventHeader @@ -33,8 +34,8 @@ import reactor.kafka.sender.SenderOptions * @since June 2018 */ internal class KafkaSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink { - return KafkaSink(KafkaSender.create(constructSenderOptions(config))) + override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink { + return KafkaSink(KafkaSender.create(constructSenderOptions(config)), ctx) } private fun constructSenderOptions(config: CollectorConfiguration) = diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 0b2997fa..2d29fe99 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -23,6 +23,10 @@ import arrow.core.getOrElse import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory import org.onap.dcae.collectors.veshv.utils.NettyServerHandle @@ -57,57 +61,61 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, sslContextFactory .createSslContext(serverConfig.securityConfiguration) .map { sslContext -> - logger.info("Collector configured with SSL enabled") + logger.info { "Collector configured with SSL enabled" } this.secure { b -> b.sslContext(sslContext) } }.getOrElse { - logger.info("Collector configured with SSL disabled") + logger.info { "Collector configured with SSL disabled" } this } - private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = - collectorProvider().fold( - { - nettyInbound.withConnection { conn -> - logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." } - } - Mono.empty() - }, - { - nettyInbound.withConnection { conn -> - logger.info { "Handling connection from ${conn.address()}" } - conn.configureIdleTimeout(serverConfig.idleTimeout) - .logConnectionClosed() - } - it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound)) + private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> { + val clientContext = ClientContext(nettyOutbound.alloc()) + nettyInbound.withConnection { + clientContext.clientAddress = it.address() + } + + return collectorProvider(clientContext).fold( + { + logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." } + Mono.empty() + }, + { + logger.info { "Handling new connection" } + nettyInbound.withConnection { conn -> + conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout) + .logConnectionClosed(clientContext) } - ) + it.handleConnection(createDataStream(nettyInbound)) + } + ) + } private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound .receive() .retain() - private fun Connection.configureIdleTimeout(timeout: Duration): Connection { + private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection { onReadIdle(timeout.toMillis()) { - logger.info { + logger.info(ctx) { "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..." } - disconnectClient() + disconnectClient(ctx) } return this } - private fun Connection.disconnectClient() { + private fun Connection.disconnectClient(ctx: ClientContext) { channel().close().addListener { if (it.isSuccess) - logger.debug { "Channel (${address()}) closed successfully." } + logger.debug(ctx) { "Channel closed successfully." } else - logger.warn("Channel close failed", it.cause()) + logger.withWarn(ctx) { log("Channel close failed", it.cause()) } } } - private fun Connection.logConnectionClosed(): Connection { + private fun Connection.logConnectionClosed(ctx: ClientContext): Connection { onTerminate().subscribe { - logger.info("Connection from ${address()} has been closed") + logger.info(ctx) { "Connection has been closed" } } return this } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index 4a2ef6b2..349b0787 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -21,12 +21,13 @@ package org.onap.dcae.collectors.veshv.impl.wire import arrow.effects.IO import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame -import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux @@ -38,8 +39,8 @@ import reactor.core.publisher.SynchronousSink */ internal class WireChunkDecoder( private val decoder: WireFrameDecoder, - alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { - private val streamBuffer = alloc.compositeBuffer() + private val ctx: ClientContext) { + private val streamBuffer = ctx.alloc.compositeBuffer() fun release() { streamBuffer.release() @@ -53,7 +54,7 @@ internal class WireChunkDecoder( } else { streamBuffer.addComponent(true, byteBuf) generateFrames() - .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) } + .onErrorResume { logger.handleReactiveStreamError(ctx::asMap, it, Flux.error(it)) } .doFinally { streamBuffer.discardReadComponents() } } } @@ -84,15 +85,15 @@ internal class WireChunkDecoder( } private fun logIncomingMessage(wire: ByteBuf) { - logger.trace { "Got message with total size of ${wire.readableBytes()} B" } + logger.trace(ctx) { "Got message with total size of ${wire.readableBytes()} B" } } private fun logDecodedWireMessage(wire: WireFrameMessage) { - logger.trace { "Wire payload size: ${wire.payloadSize} B" } + logger.trace(ctx) { "Wire payload size: ${wire.payloadSize} B" } } private fun logEndOfData() { - logger.trace { "End of data in current TCP buffer" } + logger.trace(ctx) { "End of data in current TCP buffer" } } companion object { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt new file mode 100644 index 00000000..f14a7f65 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt @@ -0,0 +1,58 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.model + +import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.slf4j.MDC +import java.net.InetSocketAddress +import java.util.* + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since December 2018 + */ +data class ClientContext( + val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT, + val clientId: String = UUID.randomUUID().toString(), + var clientAddress: InetSocketAddress? = null) { + fun asMap(): Map<String, String> { + val result = mutableMapOf("clientId" to clientId) + if (clientAddress != null) { + result["clientAddress"] = clientAddress.toString() + } + return result + } +} + +object ClientContextLogging { + fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block) + fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block) + fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block) + fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block) + fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block) + + fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message) + fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message) + fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message) + fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message) + fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message) +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt index 437614ac..ad97a3f7 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt @@ -26,15 +26,7 @@ import org.onap.ves.VesEventOuterClass.CommonEventHeader data class Routing(val routes: List<Route>) { fun routeFor(commonHeader: CommonEventHeader): Option<Route> = - Option.fromNullable(routes.find { it.applies(commonHeader) }).also { - if (it.isEmpty()) { - logger.debug { "No route is defined for domain: ${commonHeader.domain}" } - } - } - - companion object { - private val logger = Logger(Routing::class) - } + Option.fromNullable(routes.find { it.applies(commonHeader) }) } data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index e8a31231..e4190163 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.None import arrow.core.Some +import io.netty.buffer.ByteBufAllocator import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given @@ -30,6 +31,7 @@ import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.model.routing @@ -56,7 +58,7 @@ object RouterTest : Spek({ withFixedPartitioning() } }.build() - val cut = Router(config) + val cut = Router(config, ClientContext()) on("message with existing route (rtpm)") { val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt index f06a0dc7..e0092cf9 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt @@ -30,6 +30,7 @@ import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.model.ClientContext import reactor.test.test /** @@ -45,7 +46,7 @@ internal object WireChunkDecoderTest : Spek({ fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame)) - fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc) + fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), ClientContext(alloc)) fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) { for (bb in byteBuffers) { diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 0897e910..ef4ce967 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -68,7 +68,7 @@ object PerformanceSpecification : Spek({ ) val fluxes = (1.rangeTo(runs)).map { - sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params)) + sut.collector.handleConnection(generateDataStream(sut.alloc, params)) } val durationMs = measureTimeMillis { Flux.merge(fluxes).then().block(timeout) @@ -76,8 +76,8 @@ object PerformanceSpecification : Spek({ val durationSec = durationMs / 1000.0 val throughput = sink.count / durationSec - logger.info("Processed $runs connections each containing $numMessages msgs.") - logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s") + logger.info { "Processed $runs connections each containing $numMessages msgs." } + logger.info { "Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s" } assertThat(sink.count) .describedAs("should send all events") .isEqualTo(runs * numMessages) @@ -99,11 +99,11 @@ object PerformanceSpecification : Spek({ val dataStream = generateDataStream(sut.alloc, params) .transform(::dropWhenIndex.partially1 { it % 101 == 0L }) - sut.collector.handleConnection(sut.alloc, dataStream) + sut.collector.handleConnection(dataStream) .timeout(timeout) .block() - logger.info("Forwarded ${sink.count} msgs") + logger.info { "Forwarded ${sink.count} msgs" } assertThat(sink.count) .describedAs("should send up to number of events") .isLessThan(numMessages) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 0495ced5..ce242e0b 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -27,6 +27,7 @@ import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.factory.CollectorFactory +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState @@ -54,7 +55,7 @@ class Sut(sink: Sink = StoringSink()) { private val collectorProvider = collectorFactory.createVesHvCollectorProvider() val collector: Collector - get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") } + get() = collectorProvider(ClientContext(alloc)).getOrElse{ throw IllegalStateException("Collector not available.") } companion object { const val MAX_PAYLOAD_SIZE_BYTES = 1024 @@ -63,6 +64,6 @@ class Sut(sink: Sink = StoringSink()) { } fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { - collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10)) + collector.handleConnection(Flux.fromArray(packets)).block(Duration.ofSeconds(10)) return sink.sentMessages } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 2d81c671..ab59cc2e 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -287,7 +287,7 @@ object VesHvSpecification : Spek({ .map { vesWireFrameMessage(PERF3GPP) } - sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout) + sut.collector.handleConnection(incomingMessages).block(defaultTimeout) val messages = sink.sentMessages val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt index 417183fb..f7d94de5 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -46,7 +46,7 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, throw IllegalArgumentException(message) } - logger.info("Received new configuration. Creating consumer for topics: $topics") + logger.info { "Received new configuration. Creating consumer for topics: $topics" } consumerState.set(consumerFactory.createConsumerForTopics(topics).bind()) }.fix() diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt index 20c0f592..36f30e66 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -61,13 +61,13 @@ class MessageStreamValidation( return messageParams.fold( { logger.warn { "Error while parsing message parameters: ${it::class.qualifiedName} : ${it.message}" } - logger.debug { "Detailed stack trace: ${it}" } + logger.debug { "Detailed stack trace: $it" } throw IllegalArgumentException("Parsing error: " + it.message) }, { if (it.isEmpty()) { val message = "Message param list cannot be empty" - logger.warn(message) + logger.warn { message } throw IllegalArgumentException(message) } it diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt index a6ee1122..e54eb359 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -71,15 +71,15 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { } .delete("messages") { ctx -> ctx.response.contentType(CONTENT_TEXT) - logger.info("Resetting simulator state") + logger.info { "Resetting simulator state" } ctx.response.sendOrError(simulator.resetState()) } .get("messages/all/count") { ctx -> - logger.info("Processing request for count of received messages") + logger.info { "Processing request for count of received messages" } simulator.state().fold( { ctx.response.status(HttpConstants.STATUS_NOT_FOUND) - logger.warn("Error - number of messages could not be specified") + logger.warn { "Error - number of messages could not be specified" } }, { logger.info { "Returned number of received messages: ${it.messagesCount}" } @@ -90,7 +90,7 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { } .post("messages/all/validate") { ctx -> ctx.request.body.then { body -> - logger.info("Processing request for message validation") + logger.info { "Processing request for message validation" } val response = simulator.validate(body.inputStream) .map { isValid -> if (isValid) { diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index 06ff4d59..5856f044 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -43,17 +43,17 @@ fun main(args: Array<String>) = .map(::startApp) .unsafeRunEitherSync( { ex -> - logger.error("Failed to start a server", ex) + logger.withError { log("Failed to start a server", ex) } ExitFailure(1) }, { - logger.info("Started DCAE-APP Simulator API server") + logger.info { "Started DCAE-APP Simulator API server" } } ) private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> { - logger.info("Using configuration: $config") + logger.info { "Using configuration: $config" } val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers) val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 899f51fb..5c9566c7 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -40,15 +40,15 @@ fun main(args: Array<String>) = .map(::startAndAwaitServers) .unsafeRunEitherSync( { ex -> - logger.error("Failed to start a server", ex) + logger.withError { log("Failed to start a server", ex) } ExitFailure(1) }, - { logger.info("Gentle shutdown") } + { logger.info { "Gentle shutdown" } } ) private fun startAndAwaitServers(config: ServerConfiguration) = IO.monad().binding { - logger.info("Using configuration: $config") + logger.info { "Using configuration: $config" } HealthCheckServer.start(config).bind() VesServer.start(config).bind() .await().bind() diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt index 5c6f1277..13b0bc7b 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt @@ -31,7 +31,7 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger abstract class ServerStarter { fun start(config: ServerConfiguration): IO<ServerHandle> = startServer(config) - .map { logger.info(serverStartedMessage(it)); it } + .map { logger.info { serverStartedMessage(it) }; it } protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle> protected abstract fun serverStartedMessage(handle: ServerHandle): String diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml index bee0dae1..674fb2c3 100644 --- a/sources/hv-collector-main/src/main/resources/logback.xml +++ b/sources/hv-collector-main/src/main/resources/logback.xml @@ -12,6 +12,7 @@ %nopexception%50.50logger | %date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} | %highlight(%-5level) +| %mdc{clientId} %mdc{clientAddress} | %msg | %rootException | %thread%n"/> diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt index d017b31b..6ca28a56 100644 --- a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt +++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt @@ -31,7 +31,7 @@ import java.time.Duration private val logger = Logger("org.onap.dcae.collectors.veshv.tests.utils") object Assertions : org.assertj.core.api.Assertions() { - fun <A,B> assertThat(actual: Either<A, B>) = EitherAssert(actual) + fun <A, B> assertThat(actual: Either<A, B>) = EitherAssert(actual) } @@ -42,7 +42,7 @@ fun waitUntilSucceeds(retries: Int, sleepTime: Duration, action: () -> Unit) { while (tryNum <= retries) { tryNum++ try { - logger.debug("Try number $tryNum") + logger.debug { "Try number $tryNum" } action() break } catch (ex: Throwable) { diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt index 5a733f24..a25b2912 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt @@ -51,7 +51,7 @@ fun <A> ratpack.http.Response.sendEitherErrorOrResponse(response: Either<A, Resp fun ratpack.http.Response.sendAndHandleErrors(response: IO<Response>) { response.attempt().unsafeRunSync().fold( { err -> - logger.warn("Error occurred. Sending .", err) + logger.withWarn { log("Error occurred. Sending .", err) } val message = err.message send(errorResponse(message)) }, diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt index 033dd5e5..2fb48803 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt @@ -21,117 +21,171 @@ package org.onap.dcae.collectors.veshv.utils.logging import kotlin.reflect.KClass import org.slf4j.LoggerFactory +import org.slf4j.MDC + +typealias MappedDiagnosticContext = () -> Map<String, String> @Suppress("TooManyFunctions", "SuboptimalLoggerUsage") -class Logger(val logger: org.slf4j.Logger) { +class Logger(logger: org.slf4j.Logger) { constructor(clazz: KClass<out Any>) : this(LoggerFactory.getLogger(clazz.java)) constructor(name: String) : this(LoggerFactory.getLogger(name)) - // - // TRACE - // + private val errorLogger = if (logger.isErrorEnabled) ErrorLevelLogger(logger) else OffLevelLogger + private val warnLogger = if (logger.isWarnEnabled) WarnLevelLogger(logger) else OffLevelLogger + private val infoLogger = if (logger.isInfoEnabled) InfoLevelLogger(logger) else OffLevelLogger + private val debugLogger = if (logger.isDebugEnabled) DebugLevelLogger(logger) else OffLevelLogger + private val traceLogger = if (logger.isTraceEnabled) TraceLevelLogger(logger) else OffLevelLogger - val traceEnabled: Boolean - get() = logger.isTraceEnabled + // ERROR - fun trace(messageProvider: () -> String) { - if (logger.isTraceEnabled) { - logger.trace(messageProvider()) - } - } + fun withError(block: AtLevelLogger.() -> Unit) = errorLogger.block() - // - // DEBUG - // + fun withError(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = + errorLogger.withMdc(mdc, block) - fun debug(message: String) { - logger.debug(message) + fun error(message: () -> String) = errorLogger.run { + log(message()) } - fun debug(message: String, t: Throwable) { - logger.debug(message, t) + fun error(mdc: MappedDiagnosticContext, message: () -> String) = + errorLogger.withMdc(mdc) { log(message()) } + + // WARN + + fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block() + + fun withWarn(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = + warnLogger.withMdc(mdc, block) + + fun warn(message: () -> String) = warnLogger.run { + log(message()) } - fun debug(messageProvider: () -> String) { - if (logger.isDebugEnabled) { - logger.debug(messageProvider()) - } + fun warn(mdc: MappedDiagnosticContext, message: () -> String) = + warnLogger.withMdc(mdc) { log(message()) } + + + // INFO + + fun withInfo(block: AtLevelLogger.() -> Unit) = infoLogger.block() + + fun withInfo(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = + infoLogger.withMdc(mdc, block) + + fun info(message: () -> String) = infoLogger.run { + log(message()) } - fun debug(t: Throwable, messageProvider: () -> String) { - if (logger.isDebugEnabled) { - logger.debug(messageProvider(), t) - } + fun info(mdc: MappedDiagnosticContext, message: () -> String) = + infoLogger.withMdc(mdc) { log(message()) } + + // DEBUG + + fun withDebug(block: AtLevelLogger.() -> Unit) = debugLogger.block() + + fun withDebug(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = + debugLogger.withMdc(mdc, block) + + fun debug(message: () -> String) = debugLogger.run { + log(message()) } - // - // INFO - // - fun info(message: String) { - logger.info(message) + fun debug(mdc: MappedDiagnosticContext, message: () -> String) = + debugLogger.withMdc(mdc) { log(message()) } + + + // TRACE + + fun withTrace(block: AtLevelLogger.() -> Unit) = traceLogger.block() + + fun withTrace(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = + traceLogger.withMdc(mdc, block) + + fun trace(message: () -> String) = traceLogger.run { + log(message()) } - fun info(messageProvider: () -> String) { - if (logger.isInfoEnabled) { - logger.info(messageProvider()) + fun trace(mdc: MappedDiagnosticContext, message: () -> String) = + traceLogger.withMdc(mdc) { log(message()) } + +} + +abstract class AtLevelLogger { + abstract fun log(message: String) + abstract fun log(message: String, t: Throwable) + open val enabled: Boolean + get() = true + + inline fun withMdc(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) { + if (enabled) { + try { + MDC.setContextMap(mdc()) + block() + } finally { + MDC.clear() + } } } +} - fun info(message: String, t: Throwable) { - logger.info(message, t) +object OffLevelLogger : AtLevelLogger() { + override val enabled = false + + override fun log(message: String) { + // do not log anything } - fun info(t: Throwable, messageProvider: () -> String) { - if (logger.isInfoEnabled) { - logger.info(messageProvider(), t) - } + override fun log(message: String, t: Throwable) { + // do not log anything } +} - // - // WARN - // +class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { + override fun log(message: String) { + logger.error(message) + } + + override fun log(message: String, t: Throwable) { + logger.error(message, t) + } +} - fun warn(message: String) { +class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { + override fun log(message: String) { logger.warn(message) } - fun warn(message: String, t: Throwable) { + override fun log(message: String, t: Throwable) { logger.warn(message, t) } +} - fun warn(messageProvider: () -> String) { - if (logger.isWarnEnabled) { - logger.warn(messageProvider()) - } +class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { + override fun log(message: String) { + logger.info(message) } - fun warn(t: Throwable, messageProvider: () -> String) { - if (logger.isWarnEnabled) { - logger.warn(messageProvider(), t) - } + override fun log(message: String, t: Throwable) { + logger.info(message, t) } +} - // - // ERROR - // - - fun error(message: String) { - logger.error(message) +class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { + override fun log(message: String) { + logger.debug(message) } - fun error(message: String, t: Throwable) { - logger.error(message, t) + override fun log(message: String, t: Throwable) { + logger.debug(message, t) } +} - fun error(messageProvider: () -> String) { - if (logger.isErrorEnabled) { - logger.error(messageProvider()) - } +class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { + override fun log(message: String) { + logger.trace(message) } - fun error(t: Throwable, messageProvider: () -> String) { - if (logger.isErrorEnabled) { - logger.error(messageProvider(), t) - } + override fun log(message: String, t: Throwable) { + logger.trace(message, t) } } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt index e8ec2549..1e98f2fc 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt @@ -25,42 +25,49 @@ import arrow.core.Try import reactor.core.publisher.Flux import reactor.core.publisher.Mono -fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> { - logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})") - logger.debug("Detailed stack trace", ex) +fun <T> Logger.handleReactiveStreamError( + context: MappedDiagnosticContext, + ex: Throwable, + returnFlux: Flux<T> = Flux.empty()): Flux<T> { + warn(context) { "Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})" } + withDebug(context) { log("Detailed stack trace", ex) } return returnFlux } fun <T> Try<T>.filterFailedWithLog(logger: Logger, + context: MappedDiagnosticContext, acceptedMsg: (T) -> String, rejectedMsg: (Throwable) -> String): Flux<T> = - fold({ - logger.warn(rejectedMsg(it)) + fold({ ex -> + logger.withWarn(context) { log(rejectedMsg(ex)) } Flux.empty<T>() - }, { - logger.trace { acceptedMsg(it) } - Flux.just(it) + }, { obj -> + logger.trace(context) { acceptedMsg(obj) } + Flux.just(obj) }) fun <T> Option<T>.filterEmptyWithLog(logger: Logger, + context: MappedDiagnosticContext, acceptedMsg: (T) -> String, rejectedMsg: () -> String): Flux<T> = fold({ - logger.warn(rejectedMsg) + logger.warn(context, rejectedMsg) Flux.empty<T>() }, { - logger.trace { acceptedMsg(it) } + logger.trace(context) { acceptedMsg(it) } Flux.just(it) }) -fun <T> Flux<T>.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) = +fun <T> Flux<T>.filterFailedWithLog(logger: Logger, + context: MappedDiagnosticContext, + predicate: (T) -> Either<() -> String, () -> String>) = flatMap { t -> predicate(t).fold({ - logger.warn(it) + logger.warn(context, it) Mono.empty<T>() }, { - logger.trace(it) + logger.trace(context, it) Mono.just<T>(t) }) } diff --git a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt index c27fb8c8..10fc8d8f 100644 --- a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt +++ b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt @@ -34,11 +34,16 @@ import org.jetbrains.spek.api.dsl.it object LoggerTest : Spek({ lateinit var slf4jLogger: org.slf4j.Logger - lateinit var cut: Logger + fun cut() = Logger(slf4jLogger).also { + verify(slf4jLogger).isTraceEnabled + verify(slf4jLogger).isDebugEnabled + verify(slf4jLogger).isInfoEnabled + verify(slf4jLogger).isWarnEnabled + verify(slf4jLogger).isErrorEnabled + } beforeEachTest { slf4jLogger = mock() - cut = Logger(slf4jLogger) } afterEachTest { @@ -50,28 +55,19 @@ object LoggerTest : Spek({ val exception = Exception("fail") describe("debug levels") { - it("should log message") { - cut.debug(message) - verify(slf4jLogger).debug(message) - } - - it("should log message with exception") { - cut.debug(message, exception) - verify(slf4jLogger).debug(message, exception) - } describe("lazy logging message") { it("should log when debug is ON") { whenever(slf4jLogger.isDebugEnabled).thenReturn(true) - cut.debug { message } + cut().debug { message } verify(slf4jLogger).isDebugEnabled verify(slf4jLogger).debug(message) } it("should not log when debug is OFF") { whenever(slf4jLogger.isDebugEnabled).thenReturn(false) - cut.debug { message } + cut().debug { message } verify(slf4jLogger).isDebugEnabled } } @@ -80,42 +76,33 @@ object LoggerTest : Spek({ it("should log when debug is ON") { whenever(slf4jLogger.isDebugEnabled).thenReturn(true) - cut.debug(exception) { message } + cut().withDebug { log(message, exception) } verify(slf4jLogger).isDebugEnabled verify(slf4jLogger).debug(message, exception) } it("should not log when debug is OFF") { whenever(slf4jLogger.isDebugEnabled).thenReturn(false) - cut.debug(exception) { message } + cut().withDebug { log(message, exception) } verify(slf4jLogger).isDebugEnabled } } } describe("info levels") { - it("should log message") { - cut.info(message) - verify(slf4jLogger).info(message) - } - - it("should log message with exception") { - cut.info(message, exception) - verify(slf4jLogger).info(message, exception) - } describe("lazy logging message") { it("should log when debug is ON") { whenever(slf4jLogger.isInfoEnabled).thenReturn(true) - cut.info { message } + cut().info { message } verify(slf4jLogger).isInfoEnabled verify(slf4jLogger).info(message) } it("should not log when debug is OFF") { whenever(slf4jLogger.isInfoEnabled).thenReturn(false) - cut.info { message } + cut().info { message } verify(slf4jLogger).isInfoEnabled } } @@ -124,42 +111,32 @@ object LoggerTest : Spek({ it("should log when debug is ON") { whenever(slf4jLogger.isInfoEnabled).thenReturn(true) - cut.info(exception) { message } + cut().withInfo { log(message, exception) } verify(slf4jLogger).isInfoEnabled verify(slf4jLogger).info(message, exception) } it("should not log when debug is OFF") { whenever(slf4jLogger.isInfoEnabled).thenReturn(false) - cut.info(exception) { message } + cut().withInfo { log(message, exception) } verify(slf4jLogger).isInfoEnabled } } } describe("warning levels") { - it("should log message") { - cut.warn(message) - verify(slf4jLogger).warn(message) - } - - it("should log message with exception") { - cut.warn(message, exception) - verify(slf4jLogger).warn(message, exception) - } - describe("lazy logging message") { it("should log when debug is ON") { whenever(slf4jLogger.isWarnEnabled).thenReturn(true) - cut.warn { message } + cut().warn { message } verify(slf4jLogger).isWarnEnabled verify(slf4jLogger).warn(message) } it("should not log when debug is OFF") { whenever(slf4jLogger.isWarnEnabled).thenReturn(false) - cut.warn { message } + cut().warn { message } verify(slf4jLogger).isWarnEnabled } } @@ -168,42 +145,33 @@ object LoggerTest : Spek({ it("should log when debug is ON") { whenever(slf4jLogger.isWarnEnabled).thenReturn(true) - cut.warn(exception) { message } + cut().withWarn { log(message, exception) } verify(slf4jLogger).isWarnEnabled verify(slf4jLogger).warn(message, exception) } it("should not log when debug is OFF") { whenever(slf4jLogger.isWarnEnabled).thenReturn(false) - cut.warn(exception) { message } + cut().withWarn { log(message, exception) } verify(slf4jLogger).isWarnEnabled } } } describe("error levels") { - it("should log message") { - cut.error(message) - verify(slf4jLogger).error(message) - } - - it("should log message with exception") { - cut.error(message, exception) - verify(slf4jLogger).error(message, exception) - } describe("lazy logging message") { it("should log when debug is ON") { whenever(slf4jLogger.isErrorEnabled).thenReturn(true) - cut.error { message } + cut().error { message } verify(slf4jLogger).isErrorEnabled verify(slf4jLogger).error(message) } it("should not log when debug is OFF") { whenever(slf4jLogger.isErrorEnabled).thenReturn(false) - cut.error { message } + cut().error { message } verify(slf4jLogger).isErrorEnabled } } @@ -212,14 +180,14 @@ object LoggerTest : Spek({ it("should log when debug is ON") { whenever(slf4jLogger.isErrorEnabled).thenReturn(true) - cut.error(exception) { message } + cut().withError { log(message, exception) } verify(slf4jLogger).isErrorEnabled verify(slf4jLogger).error(message, exception) } it("should not log when debug is OFF") { whenever(slf4jLogger.isErrorEnabled).thenReturn(false) - cut.error(exception) { message } + cut().withError { log(message, exception) } verify(slf4jLogger).isErrorEnabled } } diff --git a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt index 0f359df3..da956bec 100644 --- a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt +++ b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt @@ -42,7 +42,7 @@ class ReactiveLoggingTest : Spek({ val cut = Try.just(event) it("should not filter stream event and log accepted message") { - cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE) + cut.filterFailedWithLog(logger, ::emptyMap, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE) .test() .expectNext(event) .verifyComplete() @@ -53,7 +53,7 @@ class ReactiveLoggingTest : Spek({ val e = Exception() val cut = Failure(e) it("should filter stream event and log rejected message") { - cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE) + cut.filterFailedWithLog(logger, ::emptyMap, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE) .test() .verifyComplete() } @@ -65,7 +65,7 @@ class ReactiveLoggingTest : Spek({ val cut = Option.just(event) it("should not filter stream event and log accepted message") { - cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE) + cut.filterEmptyWithLog(logger, ::emptyMap, ACCEPTED_MESSAGE, FAILED_MESSAGE) .test() .expectNext(event) .verifyComplete() @@ -75,7 +75,7 @@ class ReactiveLoggingTest : Spek({ given("empty Option") { val cut = Option.empty<Int>() it("should filter stream event and log rejected message") { - cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE) + cut.filterEmptyWithLog(logger,::emptyMap, ACCEPTED_MESSAGE, FAILED_MESSAGE) .test() .verifyComplete() } @@ -88,7 +88,7 @@ class ReactiveLoggingTest : Spek({ val cut = Flux.just(event) it("should not filter stream event and log accepted message") { - cut.filterFailedWithLog(logger, right()) + cut.filterFailedWithLog(logger,::emptyMap, right()) .test() .expectNext(event) .verifyComplete() @@ -99,7 +99,7 @@ class ReactiveLoggingTest : Spek({ val cut = Flux.just(event) it("should filter stream event and log rejected message") { - cut.filterFailedWithLog(logger, left()) + cut.filterFailedWithLog(logger,::emptyMap, left()) .test() .verifyComplete() } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt index 57aaf3db..ca6d169a 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt @@ -61,12 +61,14 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { .handle { _, output -> handler(complete, messages, output) } .connect() .doOnError { - logger.info("Failed to connect to VesHvCollector on " + - "${configuration.vesHost}:${configuration.vesPort}") + logger.info { + "Failed to connect to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}" + } } .subscribe { - logger.info("Connected to VesHvCollector on " + - "${configuration.vesHost}:${configuration.vesPort}") + logger.info { + "Connected to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}" + } } return complete.then() } @@ -86,7 +88,7 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { .options { it.flushOnBoundary() } .sendGroups(frames) .then { - logger.info("Messages have been sent") + logger.info { "Messages have been sent" } complete.onComplete() } .then() diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt index 16019384..cfd3a6e9 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt @@ -59,17 +59,17 @@ internal class XnfApiServer( .post("simulator/async", ::startSimulationHandler) .get("simulator/:id", ::simulatorStatusHandler) .get("healthcheck") { ctx -> - logger.info("Checking health") + logger.info { "Checking health" } ctx.response.status(HttpConstants.STATUS_OK).send() } } private fun startSimulationHandler(ctx: Context) { - logger.info("Attempting to start asynchronous scenario") + logger.info { "Attempting to start asynchronous scenario" } ctx.request.body.then { body -> val id = startSimulation(body) when (id) { - is Either.Left -> logger.warn { "Failed to start scenario, ${id.a}"} + is Either.Left -> logger.warn { "Failed to start scenario, ${id.a}" } is Either.Right -> logger.info { "Scenario started, details: ${id.b}" } } ctx.response.sendEitherErrorOrResponse(id) @@ -83,7 +83,7 @@ internal class XnfApiServer( } private fun simulatorStatusHandler(ctx: Context) { - logger.debug("Checking task status") + logger.debug { "Checking task status" } val id = UUID.fromString(ctx.pathTokens["id"]) logger.debug { "Checking status for id: $id" } val status = ongoingSimulations.status(id) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt index 21748ae8..d7d42d88 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt @@ -43,11 +43,11 @@ class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) { simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result -> result.fold( { err -> - logger.warn("Error", err) + logger.withWarn { log("Error", err) } simulations[id] = StatusFailure(err) }, { - logger.info("Finished sending messages") + logger.info { "Finished sending messages" } simulations[id] = StatusSuccess } ) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index 4512dfbf..91070d35 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -42,7 +42,7 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) .map { config -> - logger.info("Using configuration: $config") + logger.info { "Using configuration: $config" } val xnfSimulator = XnfSimulator( VesHvClient(config), MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) @@ -52,10 +52,10 @@ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) } .unsafeRunEitherSync( { ex -> - logger.error("Failed to start a server", ex) + logger.withError { log("Failed to start a server", ex) } ExitFailure(1) }, { - logger.info("Started xNF Simulator API server") + logger.info { "Started xNF Simulator API server" } } ) |