diff options
37 files changed, 469 insertions, 328 deletions
diff --git a/docker-compose.yml b/docker-compose.yml index 4015b08b..d4c3f1d8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,45 +1,73 @@ version: "3.5" services: - zookeeper: + + # + # DMaaP Message Router + # + + message-router-zookeeper: image: wurstmeister/zookeeper ports: - - "2181:2181" + - "2181:2181" - kafka: + message-router-kafka: +# image: nexus3.onap.org:10001/onap/dmaap/kafka01101:0.0.1 image: wurstmeister/kafka ports: - - "9092:9092" + - "9092:9092" environment: - KAFKA_ADVERTISED_HOST_NAME: "kafka" KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' - KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" - KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092" + KAFKA_ZOOKEEPER_CONNECT: "message-router-zookeeper:2181" + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT" + KAFKA_ADVERTISED_LISTENERS: "INTERNAL_PLAINTEXT://message-router-kafka:9092" + KAFKA_LISTENERS: "INTERNAL_PLAINTEXT://0.0.0.0:9092" + KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL_PLAINTEXT" volumes: - - /var/run/docker.sock:/var/run/docker.sock + - /var/run/docker.sock:/var/run/docker.sock + depends_on: + - message-router-zookeeper + + + # + # Consul / CBS + # + + consul-server: + image: docker.io/consul:1.0.6 + ports: + - "8500:8500" + command: ["agent","-bootstrap", "-client=0.0.0.0", "-server", "-ui"] + + consul-config: + image: consul depends_on: - - zookeeper + - consul-server + restart: on-failure + command: ["kv", "put", "-http-addr=http://consul-server:8500", "veshv-config", '{ + "dmaap.kafkaBootstrapServers": "message-router-kafka:9092", + "collector.routing": [ + { + "fromDomain": "perf3gpp", + "toTopic": "HV_VES_PERF3GPP" + } + ] + }'] + - consul: - image: progrium/consul - ports: - - "8500:8500" - environment: - - CONSUL_BIND_INTERFACE=eth0 - command: ["-server", "-bootstrap", "-ui-dir", "/ui"] + # + # DCAE HV VES Collector + # ves-hv-collector: image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest -# build: -# context: hv-collector-main -# dockerfile: Dockerfile ports: - - "6060:6060" - - "6061:6061/tcp" + - "6060:6060" + - "6061:6061/tcp" entrypoint: ["java", "-Dio.netty.leakDetection.level=paranoid", "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"] command: ["--listen-port", "6061", "--health-check-api-port", "6060", - "--config-url", "http://consul:8500/v1/kv/veshv-config?raw=true", + "--config-url", "http://consul-server:8500/v1/kv/veshv-config?raw=true", "--key-store-password", "onaponap", "--trust-store-password", "onaponap"] healthcheck: @@ -49,37 +77,36 @@ services: retries: 3 start_period: 20s depends_on: - - kafka - - consul + - message-router-kafka + - consul-server volumes: - - ./ssl/:/etc/ves-hv/ + - ./ssl/:/etc/ves-hv/ + + + # + # Simulators + # xnf-simulator: image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator -# build: -# context: hv-collector-xnf-simulator -# dockerfile: Dockerfile ports: - - "6062:6062/tcp" + - "6062:6062/tcp" command: ["--listen-port", "6062", "--ves-host", "ves-hv-collector", "--ves-port", "6061", "--key-store-password", "onaponap", "--trust-store-password", "onaponap"] depends_on: - - ves-hv-collector + - ves-hv-collector volumes: - ./ssl/:/etc/ves-hv/ dcae-app-simulator: image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator -# build: -# context: hv-collector-dcae-app-simulator -# dockerfile: Dockerfile ports: - - "6063:6063/tcp" + - "6063:6063/tcp" command: ["--listen-port", "6063", - "--kafka-bootstrap-servers", "kafka:9092", + "--kafka-bootstrap-servers", "message-router-kafka:9092", "--kafka-topics", "HV_VES_PERF3GPP"] depends_on: - - kafka + - message-router-kafka diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index dd0111bc..b686b250 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.boundary +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.RoutedMessage import reactor.core.publisher.Flux @@ -35,12 +36,12 @@ interface Metrics { @FunctionalInterface interface SinkProvider { - operator fun invoke(config: CollectorConfiguration): Sink + operator fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink companion object { fun just(sink: Sink): SinkProvider = object : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink = sink + override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink = sink } } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index 3c85a9b1..5584d61d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -23,15 +23,17 @@ import arrow.core.Option import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import java.util.* interface Collector { - fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> + fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> } -typealias CollectorProvider = () -> Option<Collector> +typealias CollectorProvider = (ClientContext) -> Option<Collector> interface Server { fun start(): IO<ServerHandle> diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 5c96e1c5..2008fc35 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,12 +25,13 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -47,31 +48,29 @@ class CollectorFactory(val configuration: ConfigurationProvider, private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { - val collector: AtomicReference<Collector> = AtomicReference() + val config: AtomicReference<CollectorConfiguration> = AtomicReference() configuration() - .map(this::createVesHvCollector) .doOnNext { - logger.info("Using updated configuration for new connections") + logger.info { "Using updated configuration for new connections" } healthState.changeState(HealthDescription.HEALTHY) } .doOnError { - logger.error("Failed to acquire configuration from consul") + logger.error { "Failed to acquire configuration from consul" } healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) } - .subscribe(collector::set) - return collector::getOption + .subscribe(config::set) + return { ctx: ClientContext -> + config.getOption().map { config -> createVesHvCollector(config, ctx) } + } } - private fun createVesHvCollector(config: CollectorConfiguration): Collector { - return VesHvCollector( - wireChunkDecoderSupplier = { alloc -> - WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc) - }, - protobufDecoder = VesDecoder(), - router = Router(config.routing), - sink = sinkProvider(config), - metrics = metrics) - } + private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = VesHvCollector( + clientContext = ctx, + wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx), + protobufDecoder = VesDecoder(), + router = Router(config.routing, ctx), + sink = sinkProvider(config, ctx), + metrics = metrics) companion object { private val logger = Logger(CollectorFactory::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index cee658b6..0977595a 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -20,11 +20,22 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Option +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.Routing import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger -class Router(private val routing: Routing) { +class Router(private val routing: Routing, private val ctx: ClientContext) { fun findDestination(message: VesMessage): Option<RoutedMessage> = - routing.routeFor(message.header).map { it(message) } + routing.routeFor(message.header).map { it(message) }.also { + if (it.isEmpty()) { + logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } + } + } + + companion object { + private val logger = Logger(Routing::class) + } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 4176de99..0d07504d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -21,18 +21,18 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Either import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog +import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -42,28 +42,27 @@ import reactor.core.publisher.Mono * @since May 2018 */ internal class VesHvCollector( - private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder, + private val clientContext: ClientContext, + private val wireChunkDecoder: WireChunkDecoder, private val protobufDecoder: VesDecoder, private val router: Router, private val sink: Sink, private val metrics: Metrics) : Collector { - override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> = - wireChunkDecoderSupplier(alloc).let { wireDecoder -> - dataStream - .transform { decodeWireFrame(it, wireDecoder) } - .transform(::filterInvalidWireFrame) - .transform(::decodeProtobufPayload) - .transform(::filterInvalidProtobufMessages) - .transform(::routeMessage) - .onErrorResume { logger.handleReactiveStreamError(it) } - .doFinally { releaseBuffersMemory(wireDecoder) } - .then() - } + override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> = + dataStream + .transform { decodeWireFrame(it) } + .transform(::filterInvalidWireFrame) + .transform(::decodeProtobufPayload) + .transform(::filterInvalidProtobufMessages) + .transform(::routeMessage) + .onErrorResume { logger.handleReactiveStreamError(clientContext::asMap, it) } + .doFinally { releaseBuffersMemory() } + .then() - private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux + private fun decodeWireFrame(flux: Flux<ByteBuf>): Flux<WireFrameMessage> = flux .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } - .concatMap(decoder::decode) + .concatMap(wireChunkDecoder::decode) .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux @@ -75,7 +74,7 @@ internal class VesHvCollector( private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder .decode(rawPayload) - .filterFailedWithLog(logger, + .filterFailedWithLog(logger, clientContext::asMap, { "Ves event header decoded successfully" }, { "Failed to decode ves event header, reason: ${it.message}" }) @@ -89,15 +88,15 @@ internal class VesHvCollector( private fun findRoute(msg: VesMessage) = router .findDestination(msg) - .filterEmptyWithLog(logger, + .filterEmptyWithLog(logger, clientContext::asMap, { "Found route for message: ${it.topic}, partition: ${it.partition}" }, { "Could not find route for message" }) - private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() - .also { logger.debug("Released buffer memory after handling message stream") } + private fun releaseBuffersMemory() = wireChunkDecoder.release() + .also { logger.debug { "Released buffer memory after handling message stream" } } fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) = - filterFailedWithLog(logger, predicate) + filterFailedWithLog(logger, clientContext::asMap, predicate) companion object { private val logger = Logger(VesHvCollector::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index cea8a7ee..bbaa47c4 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -52,7 +52,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0) private val retry = retrySpec .doOnRetry { - logger.warn("Could not get fresh configuration", it.exception()) + logger.withWarn { log("Could not get fresh configuration", it.exception()) } healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt index bdce6f73..3fefc6e8 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters import io.netty.handler.codec.http.HttpStatusClass +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.slf4j.LoggerFactory import reactor.core.publisher.Mono import reactor.netty.http.client.HttpClient @@ -30,8 +31,6 @@ import reactor.netty.http.client.HttpClient */ open class HttpAdapter(private val httpClient: HttpClient) { - private val logger = LoggerFactory.getLogger(HttpAdapter::class.java) - open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient .get() .uri(url + createQueryString(queryParams)) @@ -44,8 +43,8 @@ open class HttpAdapter(private val httpClient: HttpClient) { } } .doOnError { - logger.error("Failed to get resource on path: $url (${it.localizedMessage})") - logger.debug("Nested exception:", it) + logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" } + logger.withDebug { log("Nested exception:", it) } } private fun createQueryString(params: Map<String, Any>): String { @@ -65,4 +64,9 @@ open class HttpAdapter(private val httpClient: HttpClient) { return builder.removeSuffix("&").toString() } + companion object { + + + private val logger = Logger(HttpAdapter::class) + } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index 5f4bf354..f6cb018f 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -21,6 +21,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -33,7 +36,7 @@ import java.util.concurrent.atomic.AtomicLong */ internal class LoggingSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink { + override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink { return object : Sink { private val totalMessages = AtomicLong() private val totalBytes = AtomicLong() @@ -47,9 +50,9 @@ internal class LoggingSinkProvider : SinkProvider { val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong()) val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" } if (msgs % INFO_LOGGING_FREQ == 0L) - logger.info(logMessageSupplier) + logger.info(ctx, logMessageSupplier) else - logger.trace(logMessageSupplier) + logger.trace(ctx, logMessageSupplier) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index c4d6c87e..fd08ba3d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -20,6 +20,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -35,7 +38,7 @@ import java.util.concurrent.atomic.AtomicLong * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink { +internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>, private val ctx: ClientContext) : Sink { private val sentMessages = AtomicLong(0) override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { @@ -45,17 +48,13 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM if (it.isSuccessful()) { Mono.just(it) } else { - logger.warn(it.exception()) { "Failed to send message to Kafka" } + logger.withWarn(ctx) { log("Failed to send message to Kafka", it.exception()) } Mono.empty<SenderResult<RoutedMessage>>() } } .map { it.correlationMetadata() } - return if (logger.traceEnabled) { - result.doOnNext(::logSentMessage) - } else { - result - } + return result.doOnNext(::logSentMessage) } private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> { @@ -69,7 +68,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM } private fun logSentMessage(sentMsg: RoutedMessage) { - logger.trace { + logger.trace(ctx) { val msgNum = sentMessages.incrementAndGet() "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}" } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 18191952..b4f470d4 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.apache.kafka.clients.producer.ProducerConfig import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.CommonEventHeader @@ -33,8 +34,8 @@ import reactor.kafka.sender.SenderOptions * @since June 2018 */ internal class KafkaSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink { - return KafkaSink(KafkaSender.create(constructSenderOptions(config))) + override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink { + return KafkaSink(KafkaSender.create(constructSenderOptions(config)), ctx) } private fun constructSenderOptions(config: CollectorConfiguration) = diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 0b2997fa..2d29fe99 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -23,6 +23,10 @@ import arrow.core.getOrElse import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory import org.onap.dcae.collectors.veshv.utils.NettyServerHandle @@ -57,57 +61,61 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, sslContextFactory .createSslContext(serverConfig.securityConfiguration) .map { sslContext -> - logger.info("Collector configured with SSL enabled") + logger.info { "Collector configured with SSL enabled" } this.secure { b -> b.sslContext(sslContext) } }.getOrElse { - logger.info("Collector configured with SSL disabled") + logger.info { "Collector configured with SSL disabled" } this } - private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = - collectorProvider().fold( - { - nettyInbound.withConnection { conn -> - logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." } - } - Mono.empty() - }, - { - nettyInbound.withConnection { conn -> - logger.info { "Handling connection from ${conn.address()}" } - conn.configureIdleTimeout(serverConfig.idleTimeout) - .logConnectionClosed() - } - it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound)) + private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> { + val clientContext = ClientContext(nettyOutbound.alloc()) + nettyInbound.withConnection { + clientContext.clientAddress = it.address() + } + + return collectorProvider(clientContext).fold( + { + logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." } + Mono.empty() + }, + { + logger.info { "Handling new connection" } + nettyInbound.withConnection { conn -> + conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout) + .logConnectionClosed(clientContext) } - ) + it.handleConnection(createDataStream(nettyInbound)) + } + ) + } private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound .receive() .retain() - private fun Connection.configureIdleTimeout(timeout: Duration): Connection { + private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection { onReadIdle(timeout.toMillis()) { - logger.info { + logger.info(ctx) { "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..." } - disconnectClient() + disconnectClient(ctx) } return this } - private fun Connection.disconnectClient() { + private fun Connection.disconnectClient(ctx: ClientContext) { channel().close().addListener { if (it.isSuccess) - logger.debug { "Channel (${address()}) closed successfully." } + logger.debug(ctx) { "Channel closed successfully." } else - logger.warn("Channel close failed", it.cause()) + logger.withWarn(ctx) { log("Channel close failed", it.cause()) } } } - private fun Connection.logConnectionClosed(): Connection { + private fun Connection.logConnectionClosed(ctx: ClientContext): Connection { onTerminate().subscribe { - logger.info("Connection from ${address()} has been closed") + logger.info(ctx) { "Connection has been closed" } } return this } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index 4a2ef6b2..349b0787 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -21,12 +21,13 @@ package org.onap.dcae.collectors.veshv.impl.wire import arrow.effects.IO import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame -import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux @@ -38,8 +39,8 @@ import reactor.core.publisher.SynchronousSink */ internal class WireChunkDecoder( private val decoder: WireFrameDecoder, - alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { - private val streamBuffer = alloc.compositeBuffer() + private val ctx: ClientContext) { + private val streamBuffer = ctx.alloc.compositeBuffer() fun release() { streamBuffer.release() @@ -53,7 +54,7 @@ internal class WireChunkDecoder( } else { streamBuffer.addComponent(true, byteBuf) generateFrames() - .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) } + .onErrorResume { logger.handleReactiveStreamError(ctx::asMap, it, Flux.error(it)) } .doFinally { streamBuffer.discardReadComponents() } } } @@ -84,15 +85,15 @@ internal class WireChunkDecoder( } private fun logIncomingMessage(wire: ByteBuf) { - logger.trace { "Got message with total size of ${wire.readableBytes()} B" } + logger.trace(ctx) { "Got message with total size of ${wire.readableBytes()} B" } } private fun logDecodedWireMessage(wire: WireFrameMessage) { - logger.trace { "Wire payload size: ${wire.payloadSize} B" } + logger.trace(ctx) { "Wire payload size: ${wire.payloadSize} B" } } private fun logEndOfData() { - logger.trace { "End of data in current TCP buffer" } + logger.trace(ctx) { "End of data in current TCP buffer" } } companion object { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt new file mode 100644 index 00000000..f14a7f65 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt @@ -0,0 +1,58 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.model + +import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.slf4j.MDC +import java.net.InetSocketAddress +import java.util.* + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since December 2018 + */ +data class ClientContext( + val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT, + val clientId: String = UUID.randomUUID().toString(), + var clientAddress: InetSocketAddress? = null) { + fun asMap(): Map<String, String> { + val result = mutableMapOf("clientId" to clientId) + if (clientAddress != null) { + result["clientAddress"] = clientAddress.toString() + } + return result + } +} + +object ClientContextLogging { + fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block) + fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block) + fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block) + fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block) + fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block) + + fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message) + fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message) + fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message) + fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message) + fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message) +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt index 437614ac..ad97a3f7 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt @@ -26,15 +26,7 @@ import org.onap.ves.VesEventOuterClass.CommonEventHeader data class Routing(val routes: List<Route>) { fun routeFor(commonHeader: CommonEventHeader): Option<Route> = - Option.fromNullable(routes.find { it.applies(commonHeader) }).also { - if (it.isEmpty()) { - logger.debug { "No route is defined for domain: ${commonHeader.domain}" } - } - } - - companion object { - private val logger = Logger(Routing::class) - } + Option.fromNullable(routes.find { it.applies(commonHeader) }) } data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index e8a31231..e4190163 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.None import arrow.core.Some +import io.netty.buffer.ByteBufAllocator import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given @@ -30,6 +31,7 @@ import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.model.routing @@ -56,7 +58,7 @@ object RouterTest : Spek({ withFixedPartitioning() } }.build() - val cut = Router(config) + val cut = Router(config, ClientContext()) on("message with existing route (rtpm)") { val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt index f06a0dc7..e0092cf9 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt @@ -30,6 +30,7 @@ import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.model.ClientContext import reactor.test.test /** @@ -45,7 +46,7 @@ internal object WireChunkDecoderTest : Spek({ fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame)) - fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc) + fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), ClientContext(alloc)) fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) { for (bb in byteBuffers) { diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 0897e910..ef4ce967 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -68,7 +68,7 @@ object PerformanceSpecification : Spek({ ) val fluxes = (1.rangeTo(runs)).map { - sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params)) + sut.collector.handleConnection(generateDataStream(sut.alloc, params)) } val durationMs = measureTimeMillis { Flux.merge(fluxes).then().block(timeout) @@ -76,8 +76,8 @@ object PerformanceSpecification : Spek({ val durationSec = durationMs / 1000.0 val throughput = sink.count / durationSec - logger.info("Processed $runs connections each containing $numMessages msgs.") - logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s") + logger.info { "Processed $runs connections each containing $numMessages msgs." } + logger.info { "Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s" } assertThat(sink.count) .describedAs("should send all events") .isEqualTo(runs * numMessages) @@ -99,11 +99,11 @@ object PerformanceSpecification : Spek({ val dataStream = generateDataStream(sut.alloc, params) .transform(::dropWhenIndex.partially1 { it % 101 == 0L }) - sut.collector.handleConnection(sut.alloc, dataStream) + sut.collector.handleConnection(dataStream) .timeout(timeout) .block() - logger.info("Forwarded ${sink.count} msgs") + logger.info { "Forwarded ${sink.count} msgs" } assertThat(sink.count) .describedAs("should send up to number of events") .isLessThan(numMessages) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 0495ced5..ce242e0b 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -27,6 +27,7 @@ import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.factory.CollectorFactory +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState @@ -54,7 +55,7 @@ class Sut(sink: Sink = StoringSink()) { private val collectorProvider = collectorFactory.createVesHvCollectorProvider() val collector: Collector - get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") } + get() = collectorProvider(ClientContext(alloc)).getOrElse{ throw IllegalStateException("Collector not available.") } companion object { const val MAX_PAYLOAD_SIZE_BYTES = 1024 @@ -63,6 +64,6 @@ class Sut(sink: Sink = StoringSink()) { } fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { - collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10)) + collector.handleConnection(Flux.fromArray(packets)).block(Duration.ofSeconds(10)) return sink.sentMessages } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 2d81c671..ab59cc2e 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -287,7 +287,7 @@ object VesHvSpecification : Spek({ .map { vesWireFrameMessage(PERF3GPP) } - sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout) + sut.collector.handleConnection(incomingMessages).block(defaultTimeout) val messages = sink.sentMessages val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt index 417183fb..f7d94de5 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -46,7 +46,7 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, throw IllegalArgumentException(message) } - logger.info("Received new configuration. Creating consumer for topics: $topics") + logger.info { "Received new configuration. Creating consumer for topics: $topics" } consumerState.set(consumerFactory.createConsumerForTopics(topics).bind()) }.fix() diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt index 20c0f592..36f30e66 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -61,13 +61,13 @@ class MessageStreamValidation( return messageParams.fold( { logger.warn { "Error while parsing message parameters: ${it::class.qualifiedName} : ${it.message}" } - logger.debug { "Detailed stack trace: ${it}" } + logger.debug { "Detailed stack trace: $it" } throw IllegalArgumentException("Parsing error: " + it.message) }, { if (it.isEmpty()) { val message = "Message param list cannot be empty" - logger.warn(message) + logger.warn { message } throw IllegalArgumentException(message) } it diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt index a6ee1122..e54eb359 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -71,15 +71,15 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { } .delete("messages") { ctx -> ctx.response.contentType(CONTENT_TEXT) - logger.info("Resetting simulator state") + logger.info { "Resetting simulator state" } ctx.response.sendOrError(simulator.resetState()) } .get("messages/all/count") { ctx -> - logger.info("Processing request for count of received messages") + logger.info { "Processing request for count of received messages" } simulator.state().fold( { ctx.response.status(HttpConstants.STATUS_NOT_FOUND) - logger.warn("Error - number of messages could not be specified") + logger.warn { "Error - number of messages could not be specified" } }, { logger.info { "Returned number of received messages: ${it.messagesCount}" } @@ -90,7 +90,7 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { } .post("messages/all/validate") { ctx -> ctx.request.body.then { body -> - logger.info("Processing request for message validation") + logger.info { "Processing request for message validation" } val response = simulator.validate(body.inputStream) .map { isValid -> if (isValid) { diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index 06ff4d59..5856f044 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -43,17 +43,17 @@ fun main(args: Array<String>) = .map(::startApp) .unsafeRunEitherSync( { ex -> - logger.error("Failed to start a server", ex) + logger.withError { log("Failed to start a server", ex) } ExitFailure(1) }, { - logger.info("Started DCAE-APP Simulator API server") + logger.info { "Started DCAE-APP Simulator API server" } } ) private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> { - logger.info("Using configuration: $config") + logger.info { "Using configuration: $config" } val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers) val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 899f51fb..5c9566c7 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -40,15 +40,15 @@ fun main(args: Array<String>) = .map(::startAndAwaitServers) .unsafeRunEitherSync( { ex -> - logger.error("Failed to start a server", ex) + logger.withError { log("Failed to start a server", ex) } ExitFailure(1) }, - { logger.info("Gentle shutdown") } + { logger.info { "Gentle shutdown" } } ) private fun startAndAwaitServers(config: ServerConfiguration) = IO.monad().binding { - logger.info("Using configuration: $config") + logger.info { "Using configuration: $config" } HealthCheckServer.start(config).bind() VesServer.start(config).bind() .await().bind() diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt index 5c6f1277..13b0bc7b 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt @@ -31,7 +31,7 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger abstract class ServerStarter { fun start(config: ServerConfiguration): IO<ServerHandle> = startServer(config) - .map { logger.info(serverStartedMessage(it)); it } + .map { logger.info { serverStartedMessage(it) }; it } protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle> protected abstract fun serverStartedMessage(handle: ServerHandle): String diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml index bee0dae1..674fb2c3 100644 --- a/sources/hv-collector-main/src/main/resources/logback.xml +++ b/sources/hv-collector-main/src/main/resources/logback.xml @@ -12,6 +12,7 @@ %nopexception%50.50logger | %date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} | %highlight(%-5level) +| %mdc{clientId} %mdc{clientAddress} | %msg | %rootException | %thread%n"/> diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt index d017b31b..6ca28a56 100644 --- a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt +++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt @@ -31,7 +31,7 @@ import java.time.Duration private val logger = Logger("org.onap.dcae.collectors.veshv.tests.utils") object Assertions : org.assertj.core.api.Assertions() { - fun <A,B> assertThat(actual: Either<A, B>) = EitherAssert(actual) + fun <A, B> assertThat(actual: Either<A, B>) = EitherAssert(actual) } @@ -42,7 +42,7 @@ fun waitUntilSucceeds(retries: Int, sleepTime: Duration, action: () -> Unit) { while (tryNum <= retries) { tryNum++ try { - logger.debug("Try number $tryNum") + logger.debug { "Try number $tryNum" } action() break } catch (ex: Throwable) { diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt index 5a733f24..a25b2912 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt @@ -51,7 +51,7 @@ fun <A> ratpack.http.Response.sendEitherErrorOrResponse(response: Either<A, Resp fun ratpack.http.Response.sendAndHandleErrors(response: IO<Response>) { response.attempt().unsafeRunSync().fold( { err -> - logger.warn("Error occurred. Sending .", err) + logger.withWarn { log("Error occurred. Sending .", err) } val message = err.message send(errorResponse(message)) }, diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt index 033dd5e5..2fb48803 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt @@ -21,117 +21,171 @@ package org.onap.dcae.collectors.veshv.utils.logging import kotlin.reflect.KClass import org.slf4j.LoggerFactory +import org.slf4j.MDC + +typealias MappedDiagnosticContext = () -> Map<String, String> @Suppress("TooManyFunctions", "SuboptimalLoggerUsage") -class Logger(val logger: org.slf4j.Logger) { +class Logger(logger: org.slf4j.Logger) { constructor(clazz: KClass<out Any>) : this(LoggerFactory.getLogger(clazz.java)) constructor(name: String) : this(LoggerFactory.getLogger(name)) - // - // TRACE - // + private val errorLogger = if (logger.isErrorEnabled) ErrorLevelLogger(logger) else OffLevelLogger + private val warnLogger = if (logger.isWarnEnabled) WarnLevelLogger(logger) else OffLevelLogger + private val infoLogger = if (logger.isInfoEnabled) InfoLevelLogger(logger) else OffLevelLogger + private val debugLogger = if (logger.isDebugEnabled) DebugLevelLogger(logger) else OffLevelLogger + private val traceLogger = if (logger.isTraceEnabled) TraceLevelLogger(logger) else OffLevelLogger - val traceEnabled: Boolean - get() = logger.isTraceEnabled + // ERROR - fun trace(messageProvider: () -> String) { - if (logger.isTraceEnabled) { - logger.trace(messageProvider()) - } - } + fun withError(block: AtLevelLogger.() -> Unit) = errorLogger.block() - // - // DEBUG - // + fun withError(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = + errorLogger.withMdc(mdc, block) - fun debug(message: String) { - logger.debug(message) + fun error(message: () -> String) = errorLogger.run { + log(message()) } - fun debug(message: String, t: Throwable) { - logger.debug(message, t) + fun error(mdc: MappedDiagnosticContext, message: () -> String) = + errorLogger.withMdc(mdc) { log(message()) } + + // WARN + + fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block() + + fun withWarn(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = + warnLogger.withMdc(mdc, block) + + fun warn(message: () -> String) = warnLogger.run { + log(message()) } - fun debug(messageProvider: () -> String) { - if (logger.isDebugEnabled) { - logger.debug(messageProvider()) - } + fun warn(mdc: MappedDiagnosticContext, message: () -> String) = + warnLogger.withMdc(mdc) { log(message()) } + + + // INFO + + fun withInfo(block: AtLevelLogger.() -> Unit) = infoLogger.block() + + fun withInfo(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = + infoLogger.withMdc(mdc, block) + + fun info(message: () -> String) = infoLogger.run { + log(message()) } - fun debug(t: Throwable, messageProvider: () -> String) { - if (logger.isDebugEnabled) { - logger.debug(messageProvider(), t) - } + fun info(mdc: MappedDiagnosticContext, message: () -> String) = + infoLogger.withMdc(mdc) { log(message()) } + + // DEBUG + + fun withDebug(block: AtLevelLogger.() -> Unit) = debugLogger.block() + + fun withDebug(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = + debugLogger.withMdc(mdc, block) + + fun debug(message: () -> String) = debugLogger.run { + log(message()) } - // - // INFO - // - fun info(message: String) { - logger.info(message) + fun debug(mdc: MappedDiagnosticContext, message: () -> String) = + debugLogger.withMdc(mdc) { log(message()) } + + + // TRACE + + fun withTrace(block: AtLevelLogger.() -> Unit) = traceLogger.block() + + fun withTrace(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = + traceLogger.withMdc(mdc, block) + + fun trace(message: () -> String) = traceLogger.run { + log(message()) } - fun info(messageProvider: () -> String) { - if (logger.isInfoEnabled) { - logger.info(messageProvider()) + fun trace(mdc: MappedDiagnosticContext, message: () -> String) = + traceLogger.withMdc(mdc) { log(message()) } + +} + +abstract class AtLevelLogger { + abstract fun log(message: String) + abstract fun log(message: String, t: Throwable) + open val enabled: Boolean + get() = true + + inline fun withMdc(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) { + if (enabled) { + try { + MDC.setContextMap(mdc()) + block() + } finally { + MDC.clear() + } } } +} - fun info(message: String, t: Throwable) { - logger.info(message, t) +object OffLevelLogger : AtLevelLogger() { + override val enabled = false + + override fun log(message: String) { + // do not log anything } - fun info(t: Throwable, messageProvider: () -> String) { - if (logger.isInfoEnabled) { - logger.info(messageProvider(), t) - } + override fun log(message: String, t: Throwable) { + // do not log anything } +} - // - // WARN - // +class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { + override fun log(message: String) { + logger.error(message) + } + + override fun log(message: String, t: Throwable) { + logger.error(message, t) + } +} - fun warn(message: String) { +class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { + override fun log(message: String) { logger.warn(message) } - fun warn(message: String, t: Throwable) { + override fun log(message: String, t: Throwable) { logger.warn(message, t) } +} - fun warn(messageProvider: () -> String) { - if (logger.isWarnEnabled) { - logger.warn(messageProvider()) - } +class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { + override fun log(message: String) { + logger.info(message) } - fun warn(t: Throwable, messageProvider: () -> String) { - if (logger.isWarnEnabled) { - logger.warn(messageProvider(), t) - } + override fun log(message: String, t: Throwable) { + logger.info(message, t) } +} - // - // ERROR - // - - fun error(message: String) { - logger.error(message) +class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { + override fun log(message: String) { + logger.debug(message) } - fun error(message: String, t: Throwable) { - logger.error(message, t) + override fun log(message: String, t: Throwable) { + logger.debug(message, t) } +} - fun error(messageProvider: () -> String) { - if (logger.isErrorEnabled) { - logger.error(messageProvider()) - } +class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { + override fun log(message: String) { + logger.trace(message) } - fun error(t: Throwable, messageProvider: () -> String) { - if (logger.isErrorEnabled) { - logger.error(messageProvider(), t) - } + override fun log(message: String, t: Throwable) { + logger.trace(message, t) } } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt index e8ec2549..1e98f2fc 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt @@ -25,42 +25,49 @@ import arrow.core.Try import reactor.core.publisher.Flux import reactor.core.publisher.Mono -fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> { - logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})") - logger.debug("Detailed stack trace", ex) +fun <T> Logger.handleReactiveStreamError( + context: MappedDiagnosticContext, + ex: Throwable, + returnFlux: Flux<T> = Flux.empty()): Flux<T> { + warn(context) { "Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})" } + withDebug(context) { log("Detailed stack trace", ex) } return returnFlux } fun <T> Try<T>.filterFailedWithLog(logger: Logger, + context: MappedDiagnosticContext, acceptedMsg: (T) -> String, rejectedMsg: (Throwable) -> String): Flux<T> = - fold({ - logger.warn(rejectedMsg(it)) + fold({ ex -> + logger.withWarn(context) { log(rejectedMsg(ex)) } Flux.empty<T>() - }, { - logger.trace { acceptedMsg(it) } - Flux.just(it) + }, { obj -> + logger.trace(context) { acceptedMsg(obj) } + Flux.just(obj) }) fun <T> Option<T>.filterEmptyWithLog(logger: Logger, + context: MappedDiagnosticContext, acceptedMsg: (T) -> String, rejectedMsg: () -> String): Flux<T> = fold({ - logger.warn(rejectedMsg) + logger.warn(context, rejectedMsg) Flux.empty<T>() }, { - logger.trace { acceptedMsg(it) } + logger.trace(context) { acceptedMsg(it) } Flux.just(it) }) -fun <T> Flux<T>.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) = +fun <T> Flux<T>.filterFailedWithLog(logger: Logger, + context: MappedDiagnosticContext, + predicate: (T) -> Either<() -> String, () -> String>) = flatMap { t -> predicate(t).fold({ - logger.warn(it) + logger.warn(context, it) Mono.empty<T>() }, { - logger.trace(it) + logger.trace(context, it) Mono.just<T>(t) }) } diff --git a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt index c27fb8c8..10fc8d8f 100644 --- a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt +++ b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt @@ -34,11 +34,16 @@ import org.jetbrains.spek.api.dsl.it object LoggerTest : Spek({ lateinit var slf4jLogger: org.slf4j.Logger - lateinit var cut: Logger + fun cut() = Logger(slf4jLogger).also { + verify(slf4jLogger).isTraceEnabled + verify(slf4jLogger).isDebugEnabled + verify(slf4jLogger).isInfoEnabled + verify(slf4jLogger).isWarnEnabled + verify(slf4jLogger).isErrorEnabled + } beforeEachTest { slf4jLogger = mock() - cut = Logger(slf4jLogger) } afterEachTest { @@ -50,28 +55,19 @@ object LoggerTest : Spek({ val exception = Exception("fail") describe("debug levels") { - it("should log message") { - cut.debug(message) - verify(slf4jLogger).debug(message) - } - - it("should log message with exception") { - cut.debug(message, exception) - verify(slf4jLogger).debug(message, exception) - } describe("lazy logging message") { it("should log when debug is ON") { whenever(slf4jLogger.isDebugEnabled).thenReturn(true) - cut.debug { message } + cut().debug { message } verify(slf4jLogger).isDebugEnabled verify(slf4jLogger).debug(message) } it("should not log when debug is OFF") { whenever(slf4jLogger.isDebugEnabled).thenReturn(false) - cut.debug { message } + cut().debug { message } verify(slf4jLogger).isDebugEnabled } } @@ -80,42 +76,33 @@ object LoggerTest : Spek({ it("should log when debug is ON") { whenever(slf4jLogger.isDebugEnabled).thenReturn(true) - cut.debug(exception) { message } + cut().withDebug { log(message, exception) } verify(slf4jLogger).isDebugEnabled verify(slf4jLogger).debug(message, exception) } it("should not log when debug is OFF") { whenever(slf4jLogger.isDebugEnabled).thenReturn(false) - cut.debug(exception) { message } + cut().withDebug { log(message, exception) } verify(slf4jLogger).isDebugEnabled } } } describe("info levels") { - it("should log message") { - cut.info(message) - verify(slf4jLogger).info(message) - } - - it("should log message with exception") { - cut.info(message, exception) - verify(slf4jLogger).info(message, exception) - } describe("lazy logging message") { it("should log when debug is ON") { whenever(slf4jLogger.isInfoEnabled).thenReturn(true) - cut.info { message } + cut().info { message } verify(slf4jLogger).isInfoEnabled verify(slf4jLogger).info(message) } it("should not log when debug is OFF") { whenever(slf4jLogger.isInfoEnabled).thenReturn(false) - cut.info { message } + cut().info { message } verify(slf4jLogger).isInfoEnabled } } @@ -124,42 +111,32 @@ object LoggerTest : Spek({ it("should log when debug is ON") { whenever(slf4jLogger.isInfoEnabled).thenReturn(true) - cut.info(exception) { message } + cut().withInfo { log(message, exception) } verify(slf4jLogger).isInfoEnabled verify(slf4jLogger).info(message, exception) } it("should not log when debug is OFF") { whenever(slf4jLogger.isInfoEnabled).thenReturn(false) - cut.info(exception) { message } + cut().withInfo { log(message, exception) } verify(slf4jLogger).isInfoEnabled } } } describe("warning levels") { - it("should log message") { - cut.warn(message) - verify(slf4jLogger).warn(message) - } - - it("should log message with exception") { - cut.warn(message, exception) - verify(slf4jLogger).warn(message, exception) - } - describe("lazy logging message") { it("should log when debug is ON") { whenever(slf4jLogger.isWarnEnabled).thenReturn(true) - cut.warn { message } + cut().warn { message } verify(slf4jLogger).isWarnEnabled verify(slf4jLogger).warn(message) } it("should not log when debug is OFF") { whenever(slf4jLogger.isWarnEnabled).thenReturn(false) - cut.warn { message } + cut().warn { message } verify(slf4jLogger).isWarnEnabled } } @@ -168,42 +145,33 @@ object LoggerTest : Spek({ it("should log when debug is ON") { whenever(slf4jLogger.isWarnEnabled).thenReturn(true) - cut.warn(exception) { message } + cut().withWarn { log(message, exception) } verify(slf4jLogger).isWarnEnabled verify(slf4jLogger).warn(message, exception) } it("should not log when debug is OFF") { whenever(slf4jLogger.isWarnEnabled).thenReturn(false) - cut.warn(exception) { message } + cut().withWarn { log(message, exception) } verify(slf4jLogger).isWarnEnabled } } } describe("error levels") { - it("should log message") { - cut.error(message) - verify(slf4jLogger).error(message) - } - - it("should log message with exception") { - cut.error(message, exception) - verify(slf4jLogger).error(message, exception) - } describe("lazy logging message") { it("should log when debug is ON") { whenever(slf4jLogger.isErrorEnabled).thenReturn(true) - cut.error { message } + cut().error { message } verify(slf4jLogger).isErrorEnabled verify(slf4jLogger).error(message) } it("should not log when debug is OFF") { whenever(slf4jLogger.isErrorEnabled).thenReturn(false) - cut.error { message } + cut().error { message } verify(slf4jLogger).isErrorEnabled } } @@ -212,14 +180,14 @@ object LoggerTest : Spek({ it("should log when debug is ON") { whenever(slf4jLogger.isErrorEnabled).thenReturn(true) - cut.error(exception) { message } + cut().withError { log(message, exception) } verify(slf4jLogger).isErrorEnabled verify(slf4jLogger).error(message, exception) } it("should not log when debug is OFF") { whenever(slf4jLogger.isErrorEnabled).thenReturn(false) - cut.error(exception) { message } + cut().withError { log(message, exception) } verify(slf4jLogger).isErrorEnabled } } diff --git a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt index 0f359df3..da956bec 100644 --- a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt +++ b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt @@ -42,7 +42,7 @@ class ReactiveLoggingTest : Spek({ val cut = Try.just(event) it("should not filter stream event and log accepted message") { - cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE) + cut.filterFailedWithLog(logger, ::emptyMap, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE) .test() .expectNext(event) .verifyComplete() @@ -53,7 +53,7 @@ class ReactiveLoggingTest : Spek({ val e = Exception() val cut = Failure(e) it("should filter stream event and log rejected message") { - cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE) + cut.filterFailedWithLog(logger, ::emptyMap, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE) .test() .verifyComplete() } @@ -65,7 +65,7 @@ class ReactiveLoggingTest : Spek({ val cut = Option.just(event) it("should not filter stream event and log accepted message") { - cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE) + cut.filterEmptyWithLog(logger, ::emptyMap, ACCEPTED_MESSAGE, FAILED_MESSAGE) .test() .expectNext(event) .verifyComplete() @@ -75,7 +75,7 @@ class ReactiveLoggingTest : Spek({ given("empty Option") { val cut = Option.empty<Int>() it("should filter stream event and log rejected message") { - cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE) + cut.filterEmptyWithLog(logger,::emptyMap, ACCEPTED_MESSAGE, FAILED_MESSAGE) .test() .verifyComplete() } @@ -88,7 +88,7 @@ class ReactiveLoggingTest : Spek({ val cut = Flux.just(event) it("should not filter stream event and log accepted message") { - cut.filterFailedWithLog(logger, right()) + cut.filterFailedWithLog(logger,::emptyMap, right()) .test() .expectNext(event) .verifyComplete() @@ -99,7 +99,7 @@ class ReactiveLoggingTest : Spek({ val cut = Flux.just(event) it("should filter stream event and log rejected message") { - cut.filterFailedWithLog(logger, left()) + cut.filterFailedWithLog(logger,::emptyMap, left()) .test() .verifyComplete() } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt index 57aaf3db..ca6d169a 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt @@ -61,12 +61,14 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { .handle { _, output -> handler(complete, messages, output) } .connect() .doOnError { - logger.info("Failed to connect to VesHvCollector on " + - "${configuration.vesHost}:${configuration.vesPort}") + logger.info { + "Failed to connect to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}" + } } .subscribe { - logger.info("Connected to VesHvCollector on " + - "${configuration.vesHost}:${configuration.vesPort}") + logger.info { + "Connected to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}" + } } return complete.then() } @@ -86,7 +88,7 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { .options { it.flushOnBoundary() } .sendGroups(frames) .then { - logger.info("Messages have been sent") + logger.info { "Messages have been sent" } complete.onComplete() } .then() diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt index 16019384..cfd3a6e9 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt @@ -59,17 +59,17 @@ internal class XnfApiServer( .post("simulator/async", ::startSimulationHandler) .get("simulator/:id", ::simulatorStatusHandler) .get("healthcheck") { ctx -> - logger.info("Checking health") + logger.info { "Checking health" } ctx.response.status(HttpConstants.STATUS_OK).send() } } private fun startSimulationHandler(ctx: Context) { - logger.info("Attempting to start asynchronous scenario") + logger.info { "Attempting to start asynchronous scenario" } ctx.request.body.then { body -> val id = startSimulation(body) when (id) { - is Either.Left -> logger.warn { "Failed to start scenario, ${id.a}"} + is Either.Left -> logger.warn { "Failed to start scenario, ${id.a}" } is Either.Right -> logger.info { "Scenario started, details: ${id.b}" } } ctx.response.sendEitherErrorOrResponse(id) @@ -83,7 +83,7 @@ internal class XnfApiServer( } private fun simulatorStatusHandler(ctx: Context) { - logger.debug("Checking task status") + logger.debug { "Checking task status" } val id = UUID.fromString(ctx.pathTokens["id"]) logger.debug { "Checking status for id: $id" } val status = ongoingSimulations.status(id) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt index 21748ae8..d7d42d88 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt @@ -43,11 +43,11 @@ class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) { simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result -> result.fold( { err -> - logger.warn("Error", err) + logger.withWarn { log("Error", err) } simulations[id] = StatusFailure(err) }, { - logger.info("Finished sending messages") + logger.info { "Finished sending messages" } simulations[id] = StatusSuccess } ) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index 4512dfbf..91070d35 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -42,7 +42,7 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) .map { config -> - logger.info("Using configuration: $config") + logger.info { "Using configuration: $config" } val xnfSimulator = XnfSimulator( VesHvClient(config), MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) @@ -52,10 +52,10 @@ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) } .unsafeRunEitherSync( { ex -> - logger.error("Failed to start a server", ex) + logger.withError { log("Failed to start a server", ex) } ExitFailure(1) }, { - logger.info("Started xNF Simulator API server") + logger.info { "Started xNF Simulator API server" } } ) |