aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--docker-compose.yml99
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt6
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt33
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt15
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt43
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt12
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt9
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt15
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt60
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt21
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt58
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt10
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt4
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt3
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt10
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt5
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt2
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt2
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt4
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt8
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt6
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt6
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt2
-rw-r--r--sources/hv-collector-main/src/main/resources/logback.xml1
-rw-r--r--sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt4
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt2
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt192
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt33
-rw-r--r--sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt78
-rw-r--r--sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt12
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt12
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt8
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt4
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt6
37 files changed, 469 insertions, 328 deletions
diff --git a/docker-compose.yml b/docker-compose.yml
index 4015b08b..d4c3f1d8 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,45 +1,73 @@
version: "3.5"
services:
- zookeeper:
+
+ #
+ # DMaaP Message Router
+ #
+
+ message-router-zookeeper:
image: wurstmeister/zookeeper
ports:
- - "2181:2181"
+ - "2181:2181"
- kafka:
+ message-router-kafka:
+# image: nexus3.onap.org:10001/onap/dmaap/kafka01101:0.0.1
image: wurstmeister/kafka
ports:
- - "9092:9092"
+ - "9092:9092"
environment:
- KAFKA_ADVERTISED_HOST_NAME: "kafka"
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
- KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
- KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"
+ KAFKA_ZOOKEEPER_CONNECT: "message-router-zookeeper:2181"
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT"
+ KAFKA_ADVERTISED_LISTENERS: "INTERNAL_PLAINTEXT://message-router-kafka:9092"
+ KAFKA_LISTENERS: "INTERNAL_PLAINTEXT://0.0.0.0:9092"
+ KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL_PLAINTEXT"
volumes:
- - /var/run/docker.sock:/var/run/docker.sock
+ - /var/run/docker.sock:/var/run/docker.sock
+ depends_on:
+ - message-router-zookeeper
+
+
+ #
+ # Consul / CBS
+ #
+
+ consul-server:
+ image: docker.io/consul:1.0.6
+ ports:
+ - "8500:8500"
+ command: ["agent","-bootstrap", "-client=0.0.0.0", "-server", "-ui"]
+
+ consul-config:
+ image: consul
depends_on:
- - zookeeper
+ - consul-server
+ restart: on-failure
+ command: ["kv", "put", "-http-addr=http://consul-server:8500", "veshv-config", '{
+ "dmaap.kafkaBootstrapServers": "message-router-kafka:9092",
+ "collector.routing": [
+ {
+ "fromDomain": "perf3gpp",
+ "toTopic": "HV_VES_PERF3GPP"
+ }
+ ]
+ }']
+
- consul:
- image: progrium/consul
- ports:
- - "8500:8500"
- environment:
- - CONSUL_BIND_INTERFACE=eth0
- command: ["-server", "-bootstrap", "-ui-dir", "/ui"]
+ #
+ # DCAE HV VES Collector
+ #
ves-hv-collector:
image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest
-# build:
-# context: hv-collector-main
-# dockerfile: Dockerfile
ports:
- - "6060:6060"
- - "6061:6061/tcp"
+ - "6060:6060"
+ - "6061:6061/tcp"
entrypoint: ["java", "-Dio.netty.leakDetection.level=paranoid",
"-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
command: ["--listen-port", "6061",
"--health-check-api-port", "6060",
- "--config-url", "http://consul:8500/v1/kv/veshv-config?raw=true",
+ "--config-url", "http://consul-server:8500/v1/kv/veshv-config?raw=true",
"--key-store-password", "onaponap",
"--trust-store-password", "onaponap"]
healthcheck:
@@ -49,37 +77,36 @@ services:
retries: 3
start_period: 20s
depends_on:
- - kafka
- - consul
+ - message-router-kafka
+ - consul-server
volumes:
- - ./ssl/:/etc/ves-hv/
+ - ./ssl/:/etc/ves-hv/
+
+
+ #
+ # Simulators
+ #
xnf-simulator:
image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator
-# build:
-# context: hv-collector-xnf-simulator
-# dockerfile: Dockerfile
ports:
- - "6062:6062/tcp"
+ - "6062:6062/tcp"
command: ["--listen-port", "6062",
"--ves-host", "ves-hv-collector",
"--ves-port", "6061",
"--key-store-password", "onaponap",
"--trust-store-password", "onaponap"]
depends_on:
- - ves-hv-collector
+ - ves-hv-collector
volumes:
- ./ssl/:/etc/ves-hv/
dcae-app-simulator:
image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator
-# build:
-# context: hv-collector-dcae-app-simulator
-# dockerfile: Dockerfile
ports:
- - "6063:6063/tcp"
+ - "6063:6063/tcp"
command: ["--listen-port", "6063",
- "--kafka-bootstrap-servers", "kafka:9092",
+ "--kafka-bootstrap-servers", "message-router-kafka:9092",
"--kafka-topics", "HV_VES_PERF3GPP"]
depends_on:
- - kafka
+ - message-router-kafka
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index dd0111bc..b686b250 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.boundary
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import reactor.core.publisher.Flux
@@ -35,12 +36,12 @@ interface Metrics {
@FunctionalInterface
interface SinkProvider {
- operator fun invoke(config: CollectorConfiguration): Sink
+ operator fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink
companion object {
fun just(sink: Sink): SinkProvider =
object : SinkProvider {
- override fun invoke(config: CollectorConfiguration): Sink = sink
+ override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink = sink
}
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index 3c85a9b1..5584d61d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -23,15 +23,17 @@ import arrow.core.Option
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import java.util.*
interface Collector {
- fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void>
+ fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void>
}
-typealias CollectorProvider = () -> Option<Collector>
+typealias CollectorProvider = (ClientContext) -> Option<Collector>
interface Server {
fun start(): IO<ServerHandle>
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 5c96e1c5..2008fc35 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,12 +25,13 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -47,31 +48,29 @@ class CollectorFactory(val configuration: ConfigurationProvider,
private val healthState: HealthState = HealthState.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
- val collector: AtomicReference<Collector> = AtomicReference()
+ val config: AtomicReference<CollectorConfiguration> = AtomicReference()
configuration()
- .map(this::createVesHvCollector)
.doOnNext {
- logger.info("Using updated configuration for new connections")
+ logger.info { "Using updated configuration for new connections" }
healthState.changeState(HealthDescription.HEALTHY)
}
.doOnError {
- logger.error("Failed to acquire configuration from consul")
+ logger.error { "Failed to acquire configuration from consul" }
healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
}
- .subscribe(collector::set)
- return collector::getOption
+ .subscribe(config::set)
+ return { ctx: ClientContext ->
+ config.getOption().map { config -> createVesHvCollector(config, ctx) }
+ }
}
- private fun createVesHvCollector(config: CollectorConfiguration): Collector {
- return VesHvCollector(
- wireChunkDecoderSupplier = { alloc ->
- WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc)
- },
- protobufDecoder = VesDecoder(),
- router = Router(config.routing),
- sink = sinkProvider(config),
- metrics = metrics)
- }
+ private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = VesHvCollector(
+ clientContext = ctx,
+ wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
+ protobufDecoder = VesDecoder(),
+ router = Router(config.routing, ctx),
+ sink = sinkProvider(config, ctx),
+ metrics = metrics)
companion object {
private val logger = Logger(CollectorFactory::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index cee658b6..0977595a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -20,11 +20,22 @@
package org.onap.dcae.collectors.veshv.impl
import arrow.core.Option
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.Routing
import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
-class Router(private val routing: Routing) {
+class Router(private val routing: Routing, private val ctx: ClientContext) {
fun findDestination(message: VesMessage): Option<RoutedMessage> =
- routing.routeFor(message.header).map { it(message) }
+ routing.routeFor(message.header).map { it(message) }.also {
+ if (it.isEmpty()) {
+ logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" }
+ }
+ }
+
+ companion object {
+ private val logger = Logger(Routing::class)
+ }
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 4176de99..0d07504d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -21,18 +21,18 @@ package org.onap.dcae.collectors.veshv.impl
import arrow.core.Either
import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
+import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -42,28 +42,27 @@ import reactor.core.publisher.Mono
* @since May 2018
*/
internal class VesHvCollector(
- private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder,
+ private val clientContext: ClientContext,
+ private val wireChunkDecoder: WireChunkDecoder,
private val protobufDecoder: VesDecoder,
private val router: Router,
private val sink: Sink,
private val metrics: Metrics) : Collector {
- override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
- wireChunkDecoderSupplier(alloc).let { wireDecoder ->
- dataStream
- .transform { decodeWireFrame(it, wireDecoder) }
- .transform(::filterInvalidWireFrame)
- .transform(::decodeProtobufPayload)
- .transform(::filterInvalidProtobufMessages)
- .transform(::routeMessage)
- .onErrorResume { logger.handleReactiveStreamError(it) }
- .doFinally { releaseBuffersMemory(wireDecoder) }
- .then()
- }
+ override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
+ dataStream
+ .transform { decodeWireFrame(it) }
+ .transform(::filterInvalidWireFrame)
+ .transform(::decodeProtobufPayload)
+ .transform(::filterInvalidProtobufMessages)
+ .transform(::routeMessage)
+ .onErrorResume { logger.handleReactiveStreamError(clientContext::asMap, it) }
+ .doFinally { releaseBuffersMemory() }
+ .then()
- private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux
+ private fun decodeWireFrame(flux: Flux<ByteBuf>): Flux<WireFrameMessage> = flux
.doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
- .concatMap(decoder::decode)
+ .concatMap(wireChunkDecoder::decode)
.doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
@@ -75,7 +74,7 @@ internal class VesHvCollector(
private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
.decode(rawPayload)
- .filterFailedWithLog(logger,
+ .filterFailedWithLog(logger, clientContext::asMap,
{ "Ves event header decoded successfully" },
{ "Failed to decode ves event header, reason: ${it.message}" })
@@ -89,15 +88,15 @@ internal class VesHvCollector(
private fun findRoute(msg: VesMessage) = router
.findDestination(msg)
- .filterEmptyWithLog(logger,
+ .filterEmptyWithLog(logger, clientContext::asMap,
{ "Found route for message: ${it.topic}, partition: ${it.partition}" },
{ "Could not find route for message" })
- private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
- .also { logger.debug("Released buffer memory after handling message stream") }
+ private fun releaseBuffersMemory() = wireChunkDecoder.release()
+ .also { logger.debug { "Released buffer memory after handling message stream" } }
fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
- filterFailedWithLog(logger, predicate)
+ filterFailedWithLog(logger, clientContext::asMap, predicate)
companion object {
private val logger = Logger(VesHvCollector::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index cea8a7ee..bbaa47c4 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -52,7 +52,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
private val retry = retrySpec
.doOnRetry {
- logger.warn("Could not get fresh configuration", it.exception())
+ logger.withWarn { log("Could not get fresh configuration", it.exception()) }
healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
index bdce6f73..3fefc6e8 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
@@ -20,6 +20,7 @@
package org.onap.dcae.collectors.veshv.impl.adapters
import io.netty.handler.codec.http.HttpStatusClass
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
@@ -30,8 +31,6 @@ import reactor.netty.http.client.HttpClient
*/
open class HttpAdapter(private val httpClient: HttpClient) {
- private val logger = LoggerFactory.getLogger(HttpAdapter::class.java)
-
open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient
.get()
.uri(url + createQueryString(queryParams))
@@ -44,8 +43,8 @@ open class HttpAdapter(private val httpClient: HttpClient) {
}
}
.doOnError {
- logger.error("Failed to get resource on path: $url (${it.localizedMessage})")
- logger.debug("Nested exception:", it)
+ logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" }
+ logger.withDebug { log("Nested exception:", it) }
}
private fun createQueryString(params: Map<String, Any>): String {
@@ -65,4 +64,9 @@ open class HttpAdapter(private val httpClient: HttpClient) {
return builder.removeSuffix("&").toString()
}
+ companion object {
+
+
+ private val logger = Logger(HttpAdapter::class)
+ }
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
index 5f4bf354..f6cb018f 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -21,6 +21,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -33,7 +36,7 @@ import java.util.concurrent.atomic.AtomicLong
*/
internal class LoggingSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration): Sink {
+ override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
return object : Sink {
private val totalMessages = AtomicLong()
private val totalBytes = AtomicLong()
@@ -47,9 +50,9 @@ internal class LoggingSinkProvider : SinkProvider {
val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong())
val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
if (msgs % INFO_LOGGING_FREQ == 0L)
- logger.info(logMessageSupplier)
+ logger.info(ctx, logMessageSupplier)
else
- logger.trace(logMessageSupplier)
+ logger.trace(ctx, logMessageSupplier)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index c4d6c87e..fd08ba3d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -20,6 +20,9 @@
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -35,7 +38,7 @@ import java.util.concurrent.atomic.AtomicLong
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
+internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>, private val ctx: ClientContext) : Sink {
private val sentMessages = AtomicLong(0)
override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
@@ -45,17 +48,13 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
if (it.isSuccessful()) {
Mono.just(it)
} else {
- logger.warn(it.exception()) { "Failed to send message to Kafka" }
+ logger.withWarn(ctx) { log("Failed to send message to Kafka", it.exception()) }
Mono.empty<SenderResult<RoutedMessage>>()
}
}
.map { it.correlationMetadata() }
- return if (logger.traceEnabled) {
- result.doOnNext(::logSentMessage)
- } else {
- result
- }
+ return result.doOnNext(::logSentMessage)
}
private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
@@ -69,7 +68,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
}
private fun logSentMessage(sentMsg: RoutedMessage) {
- logger.trace {
+ logger.trace(ctx) {
val msgNum = sentMessages.incrementAndGet()
"Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index 18191952..b4f470d4 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import org.apache.kafka.clients.producer.ProducerConfig
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventOuterClass.CommonEventHeader
@@ -33,8 +34,8 @@ import reactor.kafka.sender.SenderOptions
* @since June 2018
*/
internal class KafkaSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration): Sink {
- return KafkaSink(KafkaSender.create(constructSenderOptions(config)))
+ override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
+ return KafkaSink(KafkaSender.create(constructSenderOptions(config)), ctx)
}
private fun constructSenderOptions(config: CollectorConfiguration) =
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 0b2997fa..2d29fe99 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -23,6 +23,10 @@ import arrow.core.getOrElse
import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
@@ -57,57 +61,61 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
sslContextFactory
.createSslContext(serverConfig.securityConfiguration)
.map { sslContext ->
- logger.info("Collector configured with SSL enabled")
+ logger.info { "Collector configured with SSL enabled" }
this.secure { b -> b.sslContext(sslContext) }
}.getOrElse {
- logger.info("Collector configured with SSL disabled")
+ logger.info { "Collector configured with SSL disabled" }
this
}
- private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
- collectorProvider().fold(
- {
- nettyInbound.withConnection { conn ->
- logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." }
- }
- Mono.empty()
- },
- {
- nettyInbound.withConnection { conn ->
- logger.info { "Handling connection from ${conn.address()}" }
- conn.configureIdleTimeout(serverConfig.idleTimeout)
- .logConnectionClosed()
- }
- it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound))
+ private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
+ val clientContext = ClientContext(nettyOutbound.alloc())
+ nettyInbound.withConnection {
+ clientContext.clientAddress = it.address()
+ }
+
+ return collectorProvider(clientContext).fold(
+ {
+ logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." }
+ Mono.empty()
+ },
+ {
+ logger.info { "Handling new connection" }
+ nettyInbound.withConnection { conn ->
+ conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
+ .logConnectionClosed(clientContext)
}
- )
+ it.handleConnection(createDataStream(nettyInbound))
+ }
+ )
+ }
private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
.receive()
.retain()
- private fun Connection.configureIdleTimeout(timeout: Duration): Connection {
+ private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection {
onReadIdle(timeout.toMillis()) {
- logger.info {
+ logger.info(ctx) {
"Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
}
- disconnectClient()
+ disconnectClient(ctx)
}
return this
}
- private fun Connection.disconnectClient() {
+ private fun Connection.disconnectClient(ctx: ClientContext) {
channel().close().addListener {
if (it.isSuccess)
- logger.debug { "Channel (${address()}) closed successfully." }
+ logger.debug(ctx) { "Channel closed successfully." }
else
- logger.warn("Channel close failed", it.cause())
+ logger.withWarn(ctx) { log("Channel close failed", it.cause()) }
}
}
- private fun Connection.logConnectionClosed(): Connection {
+ private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
onTerminate().subscribe {
- logger.info("Connection from ${address()} has been closed")
+ logger.info(ctx) { "Connection has been closed" }
}
return this
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index 4a2ef6b2..349b0787 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -21,12 +21,13 @@ package org.onap.dcae.collectors.veshv.impl.wire
import arrow.effects.IO
import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
import reactor.core.publisher.Flux
@@ -38,8 +39,8 @@ import reactor.core.publisher.SynchronousSink
*/
internal class WireChunkDecoder(
private val decoder: WireFrameDecoder,
- alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
- private val streamBuffer = alloc.compositeBuffer()
+ private val ctx: ClientContext) {
+ private val streamBuffer = ctx.alloc.compositeBuffer()
fun release() {
streamBuffer.release()
@@ -53,7 +54,7 @@ internal class WireChunkDecoder(
} else {
streamBuffer.addComponent(true, byteBuf)
generateFrames()
- .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) }
+ .onErrorResume { logger.handleReactiveStreamError(ctx::asMap, it, Flux.error(it)) }
.doFinally { streamBuffer.discardReadComponents() }
}
}
@@ -84,15 +85,15 @@ internal class WireChunkDecoder(
}
private fun logIncomingMessage(wire: ByteBuf) {
- logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
+ logger.trace(ctx) { "Got message with total size of ${wire.readableBytes()} B" }
}
private fun logDecodedWireMessage(wire: WireFrameMessage) {
- logger.trace { "Wire payload size: ${wire.payloadSize} B" }
+ logger.trace(ctx) { "Wire payload size: ${wire.payloadSize} B" }
}
private fun logEndOfData() {
- logger.trace { "End of data in current TCP buffer" }
+ logger.trace(ctx) { "End of data in current TCP buffer" }
}
companion object {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
new file mode 100644
index 00000000..f14a7f65
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
@@ -0,0 +1,58 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.slf4j.MDC
+import java.net.InetSocketAddress
+import java.util.*
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+data class ClientContext(
+ val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT,
+ val clientId: String = UUID.randomUUID().toString(),
+ var clientAddress: InetSocketAddress? = null) {
+ fun asMap(): Map<String, String> {
+ val result = mutableMapOf("clientId" to clientId)
+ if (clientAddress != null) {
+ result["clientAddress"] = clientAddress.toString()
+ }
+ return result
+ }
+}
+
+object ClientContextLogging {
+ fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block)
+ fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block)
+ fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block)
+ fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block)
+ fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block)
+
+ fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message)
+ fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message)
+ fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message)
+ fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message)
+ fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message)
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
index 437614ac..ad97a3f7 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
@@ -26,15 +26,7 @@ import org.onap.ves.VesEventOuterClass.CommonEventHeader
data class Routing(val routes: List<Route>) {
fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
- Option.fromNullable(routes.find { it.applies(commonHeader) }).also {
- if (it.isEmpty()) {
- logger.debug { "No route is defined for domain: ${commonHeader.domain}" }
- }
- }
-
- companion object {
- private val logger = Logger(Routing::class)
- }
+ Option.fromNullable(routes.find { it.applies(commonHeader) })
}
data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
index e8a31231..e4190163 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl
import arrow.core.None
import arrow.core.Some
+import io.netty.buffer.ByteBufAllocator
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
@@ -30,6 +31,7 @@ import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.model.routing
@@ -56,7 +58,7 @@ object RouterTest : Spek({
withFixedPartitioning()
}
}.build()
- val cut = Router(config)
+ val cut = Router(config, ClientContext())
on("message with existing route (rtpm)") {
val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY)
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
index f06a0dc7..e0092cf9 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
@@ -30,6 +30,7 @@ import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.model.ClientContext
import reactor.test.test
/**
@@ -45,7 +46,7 @@ internal object WireChunkDecoderTest : Spek({
fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame))
- fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc)
+ fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), ClientContext(alloc))
fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
for (bb in byteBuffers) {
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index 0897e910..ef4ce967 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -68,7 +68,7 @@ object PerformanceSpecification : Spek({
)
val fluxes = (1.rangeTo(runs)).map {
- sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
+ sut.collector.handleConnection(generateDataStream(sut.alloc, params))
}
val durationMs = measureTimeMillis {
Flux.merge(fluxes).then().block(timeout)
@@ -76,8 +76,8 @@ object PerformanceSpecification : Spek({
val durationSec = durationMs / 1000.0
val throughput = sink.count / durationSec
- logger.info("Processed $runs connections each containing $numMessages msgs.")
- logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
+ logger.info { "Processed $runs connections each containing $numMessages msgs." }
+ logger.info { "Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s" }
assertThat(sink.count)
.describedAs("should send all events")
.isEqualTo(runs * numMessages)
@@ -99,11 +99,11 @@ object PerformanceSpecification : Spek({
val dataStream = generateDataStream(sut.alloc, params)
.transform(::dropWhenIndex.partially1 { it % 101 == 0L })
- sut.collector.handleConnection(sut.alloc, dataStream)
+ sut.collector.handleConnection(dataStream)
.timeout(timeout)
.block()
- logger.info("Forwarded ${sink.count} msgs")
+ logger.info { "Forwarded ${sink.count} msgs" }
assertThat(sink.count)
.describedAs("should send up to number of events")
.isLessThan(numMessages)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 0495ced5..ce242e0b 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -27,6 +27,7 @@ import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
@@ -54,7 +55,7 @@ class Sut(sink: Sink = StoringSink()) {
private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
- get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") }
+ get() = collectorProvider(ClientContext(alloc)).getOrElse{ throw IllegalStateException("Collector not available.") }
companion object {
const val MAX_PAYLOAD_SIZE_BYTES = 1024
@@ -63,6 +64,6 @@ class Sut(sink: Sink = StoringSink()) {
}
fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
- collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10))
+ collector.handleConnection(Flux.fromArray(packets)).block(Duration.ofSeconds(10))
return sink.sentMessages
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 2d81c671..ab59cc2e 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -287,7 +287,7 @@ object VesHvSpecification : Spek({
.map { vesWireFrameMessage(PERF3GPP) }
- sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
+ sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
val messages = sink.sentMessages
val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
index 417183fb..f7d94de5 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
@@ -46,7 +46,7 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
throw IllegalArgumentException(message)
}
- logger.info("Received new configuration. Creating consumer for topics: $topics")
+ logger.info { "Received new configuration. Creating consumer for topics: $topics" }
consumerState.set(consumerFactory.createConsumerForTopics(topics).bind())
}.fix()
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
index 20c0f592..36f30e66 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
@@ -61,13 +61,13 @@ class MessageStreamValidation(
return messageParams.fold(
{
logger.warn { "Error while parsing message parameters: ${it::class.qualifiedName} : ${it.message}" }
- logger.debug { "Detailed stack trace: ${it}" }
+ logger.debug { "Detailed stack trace: $it" }
throw IllegalArgumentException("Parsing error: " + it.message)
},
{
if (it.isEmpty()) {
val message = "Message param list cannot be empty"
- logger.warn(message)
+ logger.warn { message }
throw IllegalArgumentException(message)
}
it
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
index a6ee1122..e54eb359 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
@@ -71,15 +71,15 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
}
.delete("messages") { ctx ->
ctx.response.contentType(CONTENT_TEXT)
- logger.info("Resetting simulator state")
+ logger.info { "Resetting simulator state" }
ctx.response.sendOrError(simulator.resetState())
}
.get("messages/all/count") { ctx ->
- logger.info("Processing request for count of received messages")
+ logger.info { "Processing request for count of received messages" }
simulator.state().fold(
{
ctx.response.status(HttpConstants.STATUS_NOT_FOUND)
- logger.warn("Error - number of messages could not be specified")
+ logger.warn { "Error - number of messages could not be specified" }
},
{
logger.info { "Returned number of received messages: ${it.messagesCount}" }
@@ -90,7 +90,7 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
}
.post("messages/all/validate") { ctx ->
ctx.request.body.then { body ->
- logger.info("Processing request for message validation")
+ logger.info { "Processing request for message validation" }
val response = simulator.validate(body.inputStream)
.map { isValid ->
if (isValid) {
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
index 06ff4d59..5856f044 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -43,17 +43,17 @@ fun main(args: Array<String>) =
.map(::startApp)
.unsafeRunEitherSync(
{ ex ->
- logger.error("Failed to start a server", ex)
+ logger.withError { log("Failed to start a server", ex) }
ExitFailure(1)
},
{
- logger.info("Started DCAE-APP Simulator API server")
+ logger.info { "Started DCAE-APP Simulator API server" }
}
)
private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
- logger.info("Using configuration: $config")
+ logger.info { "Using configuration: $config" }
val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers)
val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 899f51fb..5c9566c7 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -40,15 +40,15 @@ fun main(args: Array<String>) =
.map(::startAndAwaitServers)
.unsafeRunEitherSync(
{ ex ->
- logger.error("Failed to start a server", ex)
+ logger.withError { log("Failed to start a server", ex) }
ExitFailure(1)
},
- { logger.info("Gentle shutdown") }
+ { logger.info { "Gentle shutdown" } }
)
private fun startAndAwaitServers(config: ServerConfiguration) =
IO.monad().binding {
- logger.info("Using configuration: $config")
+ logger.info { "Using configuration: $config" }
HealthCheckServer.start(config).bind()
VesServer.start(config).bind()
.await().bind()
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
index 5c6f1277..13b0bc7b 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
@@ -31,7 +31,7 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger
abstract class ServerStarter {
fun start(config: ServerConfiguration): IO<ServerHandle> =
startServer(config)
- .map { logger.info(serverStartedMessage(it)); it }
+ .map { logger.info { serverStartedMessage(it) }; it }
protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle>
protected abstract fun serverStartedMessage(handle: ServerHandle): String
diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml
index bee0dae1..674fb2c3 100644
--- a/sources/hv-collector-main/src/main/resources/logback.xml
+++ b/sources/hv-collector-main/src/main/resources/logback.xml
@@ -12,6 +12,7 @@
%nopexception%50.50logger
| %date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC}
| %highlight(%-5level)
+| %mdc{clientId} %mdc{clientAddress}
| %msg
| %rootException
| %thread%n"/>
diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt
index d017b31b..6ca28a56 100644
--- a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt
+++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt
@@ -31,7 +31,7 @@ import java.time.Duration
private val logger = Logger("org.onap.dcae.collectors.veshv.tests.utils")
object Assertions : org.assertj.core.api.Assertions() {
- fun <A,B> assertThat(actual: Either<A, B>) = EitherAssert(actual)
+ fun <A, B> assertThat(actual: Either<A, B>) = EitherAssert(actual)
}
@@ -42,7 +42,7 @@ fun waitUntilSucceeds(retries: Int, sleepTime: Duration, action: () -> Unit) {
while (tryNum <= retries) {
tryNum++
try {
- logger.debug("Try number $tryNum")
+ logger.debug { "Try number $tryNum" }
action()
break
} catch (ex: Throwable) {
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt
index 5a733f24..a25b2912 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt
@@ -51,7 +51,7 @@ fun <A> ratpack.http.Response.sendEitherErrorOrResponse(response: Either<A, Resp
fun ratpack.http.Response.sendAndHandleErrors(response: IO<Response>) {
response.attempt().unsafeRunSync().fold(
{ err ->
- logger.warn("Error occurred. Sending .", err)
+ logger.withWarn { log("Error occurred. Sending .", err) }
val message = err.message
send(errorResponse(message))
},
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
index 033dd5e5..2fb48803 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
@@ -21,117 +21,171 @@ package org.onap.dcae.collectors.veshv.utils.logging
import kotlin.reflect.KClass
import org.slf4j.LoggerFactory
+import org.slf4j.MDC
+
+typealias MappedDiagnosticContext = () -> Map<String, String>
@Suppress("TooManyFunctions", "SuboptimalLoggerUsage")
-class Logger(val logger: org.slf4j.Logger) {
+class Logger(logger: org.slf4j.Logger) {
constructor(clazz: KClass<out Any>) : this(LoggerFactory.getLogger(clazz.java))
constructor(name: String) : this(LoggerFactory.getLogger(name))
- //
- // TRACE
- //
+ private val errorLogger = if (logger.isErrorEnabled) ErrorLevelLogger(logger) else OffLevelLogger
+ private val warnLogger = if (logger.isWarnEnabled) WarnLevelLogger(logger) else OffLevelLogger
+ private val infoLogger = if (logger.isInfoEnabled) InfoLevelLogger(logger) else OffLevelLogger
+ private val debugLogger = if (logger.isDebugEnabled) DebugLevelLogger(logger) else OffLevelLogger
+ private val traceLogger = if (logger.isTraceEnabled) TraceLevelLogger(logger) else OffLevelLogger
- val traceEnabled: Boolean
- get() = logger.isTraceEnabled
+ // ERROR
- fun trace(messageProvider: () -> String) {
- if (logger.isTraceEnabled) {
- logger.trace(messageProvider())
- }
- }
+ fun withError(block: AtLevelLogger.() -> Unit) = errorLogger.block()
- //
- // DEBUG
- //
+ fun withError(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
+ errorLogger.withMdc(mdc, block)
- fun debug(message: String) {
- logger.debug(message)
+ fun error(message: () -> String) = errorLogger.run {
+ log(message())
}
- fun debug(message: String, t: Throwable) {
- logger.debug(message, t)
+ fun error(mdc: MappedDiagnosticContext, message: () -> String) =
+ errorLogger.withMdc(mdc) { log(message()) }
+
+ // WARN
+
+ fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block()
+
+ fun withWarn(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
+ warnLogger.withMdc(mdc, block)
+
+ fun warn(message: () -> String) = warnLogger.run {
+ log(message())
}
- fun debug(messageProvider: () -> String) {
- if (logger.isDebugEnabled) {
- logger.debug(messageProvider())
- }
+ fun warn(mdc: MappedDiagnosticContext, message: () -> String) =
+ warnLogger.withMdc(mdc) { log(message()) }
+
+
+ // INFO
+
+ fun withInfo(block: AtLevelLogger.() -> Unit) = infoLogger.block()
+
+ fun withInfo(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
+ infoLogger.withMdc(mdc, block)
+
+ fun info(message: () -> String) = infoLogger.run {
+ log(message())
}
- fun debug(t: Throwable, messageProvider: () -> String) {
- if (logger.isDebugEnabled) {
- logger.debug(messageProvider(), t)
- }
+ fun info(mdc: MappedDiagnosticContext, message: () -> String) =
+ infoLogger.withMdc(mdc) { log(message()) }
+
+ // DEBUG
+
+ fun withDebug(block: AtLevelLogger.() -> Unit) = debugLogger.block()
+
+ fun withDebug(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
+ debugLogger.withMdc(mdc, block)
+
+ fun debug(message: () -> String) = debugLogger.run {
+ log(message())
}
- //
- // INFO
- //
- fun info(message: String) {
- logger.info(message)
+ fun debug(mdc: MappedDiagnosticContext, message: () -> String) =
+ debugLogger.withMdc(mdc) { log(message()) }
+
+
+ // TRACE
+
+ fun withTrace(block: AtLevelLogger.() -> Unit) = traceLogger.block()
+
+ fun withTrace(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
+ traceLogger.withMdc(mdc, block)
+
+ fun trace(message: () -> String) = traceLogger.run {
+ log(message())
}
- fun info(messageProvider: () -> String) {
- if (logger.isInfoEnabled) {
- logger.info(messageProvider())
+ fun trace(mdc: MappedDiagnosticContext, message: () -> String) =
+ traceLogger.withMdc(mdc) { log(message()) }
+
+}
+
+abstract class AtLevelLogger {
+ abstract fun log(message: String)
+ abstract fun log(message: String, t: Throwable)
+ open val enabled: Boolean
+ get() = true
+
+ inline fun withMdc(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) {
+ if (enabled) {
+ try {
+ MDC.setContextMap(mdc())
+ block()
+ } finally {
+ MDC.clear()
+ }
}
}
+}
- fun info(message: String, t: Throwable) {
- logger.info(message, t)
+object OffLevelLogger : AtLevelLogger() {
+ override val enabled = false
+
+ override fun log(message: String) {
+ // do not log anything
}
- fun info(t: Throwable, messageProvider: () -> String) {
- if (logger.isInfoEnabled) {
- logger.info(messageProvider(), t)
- }
+ override fun log(message: String, t: Throwable) {
+ // do not log anything
}
+}
- //
- // WARN
- //
+class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
+ override fun log(message: String) {
+ logger.error(message)
+ }
+
+ override fun log(message: String, t: Throwable) {
+ logger.error(message, t)
+ }
+}
- fun warn(message: String) {
+class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
+ override fun log(message: String) {
logger.warn(message)
}
- fun warn(message: String, t: Throwable) {
+ override fun log(message: String, t: Throwable) {
logger.warn(message, t)
}
+}
- fun warn(messageProvider: () -> String) {
- if (logger.isWarnEnabled) {
- logger.warn(messageProvider())
- }
+class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
+ override fun log(message: String) {
+ logger.info(message)
}
- fun warn(t: Throwable, messageProvider: () -> String) {
- if (logger.isWarnEnabled) {
- logger.warn(messageProvider(), t)
- }
+ override fun log(message: String, t: Throwable) {
+ logger.info(message, t)
}
+}
- //
- // ERROR
- //
-
- fun error(message: String) {
- logger.error(message)
+class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
+ override fun log(message: String) {
+ logger.debug(message)
}
- fun error(message: String, t: Throwable) {
- logger.error(message, t)
+ override fun log(message: String, t: Throwable) {
+ logger.debug(message, t)
}
+}
- fun error(messageProvider: () -> String) {
- if (logger.isErrorEnabled) {
- logger.error(messageProvider())
- }
+class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
+ override fun log(message: String) {
+ logger.trace(message)
}
- fun error(t: Throwable, messageProvider: () -> String) {
- if (logger.isErrorEnabled) {
- logger.error(messageProvider(), t)
- }
+ override fun log(message: String, t: Throwable) {
+ logger.trace(message, t)
}
}
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
index e8ec2549..1e98f2fc 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
@@ -25,42 +25,49 @@ import arrow.core.Try
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> {
- logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})")
- logger.debug("Detailed stack trace", ex)
+fun <T> Logger.handleReactiveStreamError(
+ context: MappedDiagnosticContext,
+ ex: Throwable,
+ returnFlux: Flux<T> = Flux.empty()): Flux<T> {
+ warn(context) { "Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})" }
+ withDebug(context) { log("Detailed stack trace", ex) }
return returnFlux
}
fun <T> Try<T>.filterFailedWithLog(logger: Logger,
+ context: MappedDiagnosticContext,
acceptedMsg: (T) -> String,
rejectedMsg: (Throwable) -> String): Flux<T> =
- fold({
- logger.warn(rejectedMsg(it))
+ fold({ ex ->
+ logger.withWarn(context) { log(rejectedMsg(ex)) }
Flux.empty<T>()
- }, {
- logger.trace { acceptedMsg(it) }
- Flux.just(it)
+ }, { obj ->
+ logger.trace(context) { acceptedMsg(obj) }
+ Flux.just(obj)
})
fun <T> Option<T>.filterEmptyWithLog(logger: Logger,
+ context: MappedDiagnosticContext,
acceptedMsg: (T) -> String,
rejectedMsg: () -> String): Flux<T> =
fold({
- logger.warn(rejectedMsg)
+ logger.warn(context, rejectedMsg)
Flux.empty<T>()
}, {
- logger.trace { acceptedMsg(it) }
+ logger.trace(context) { acceptedMsg(it) }
Flux.just(it)
})
-fun <T> Flux<T>.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) =
+fun <T> Flux<T>.filterFailedWithLog(logger: Logger,
+ context: MappedDiagnosticContext,
+ predicate: (T) -> Either<() -> String, () -> String>) =
flatMap { t ->
predicate(t).fold({
- logger.warn(it)
+ logger.warn(context, it)
Mono.empty<T>()
}, {
- logger.trace(it)
+ logger.trace(context, it)
Mono.just<T>(t)
})
}
diff --git a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt
index c27fb8c8..10fc8d8f 100644
--- a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt
+++ b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt
@@ -34,11 +34,16 @@ import org.jetbrains.spek.api.dsl.it
object LoggerTest : Spek({
lateinit var slf4jLogger: org.slf4j.Logger
- lateinit var cut: Logger
+ fun cut() = Logger(slf4jLogger).also {
+ verify(slf4jLogger).isTraceEnabled
+ verify(slf4jLogger).isDebugEnabled
+ verify(slf4jLogger).isInfoEnabled
+ verify(slf4jLogger).isWarnEnabled
+ verify(slf4jLogger).isErrorEnabled
+ }
beforeEachTest {
slf4jLogger = mock()
- cut = Logger(slf4jLogger)
}
afterEachTest {
@@ -50,28 +55,19 @@ object LoggerTest : Spek({
val exception = Exception("fail")
describe("debug levels") {
- it("should log message") {
- cut.debug(message)
- verify(slf4jLogger).debug(message)
- }
-
- it("should log message with exception") {
- cut.debug(message, exception)
- verify(slf4jLogger).debug(message, exception)
- }
describe("lazy logging message") {
it("should log when debug is ON") {
whenever(slf4jLogger.isDebugEnabled).thenReturn(true)
- cut.debug { message }
+ cut().debug { message }
verify(slf4jLogger).isDebugEnabled
verify(slf4jLogger).debug(message)
}
it("should not log when debug is OFF") {
whenever(slf4jLogger.isDebugEnabled).thenReturn(false)
- cut.debug { message }
+ cut().debug { message }
verify(slf4jLogger).isDebugEnabled
}
}
@@ -80,42 +76,33 @@ object LoggerTest : Spek({
it("should log when debug is ON") {
whenever(slf4jLogger.isDebugEnabled).thenReturn(true)
- cut.debug(exception) { message }
+ cut().withDebug { log(message, exception) }
verify(slf4jLogger).isDebugEnabled
verify(slf4jLogger).debug(message, exception)
}
it("should not log when debug is OFF") {
whenever(slf4jLogger.isDebugEnabled).thenReturn(false)
- cut.debug(exception) { message }
+ cut().withDebug { log(message, exception) }
verify(slf4jLogger).isDebugEnabled
}
}
}
describe("info levels") {
- it("should log message") {
- cut.info(message)
- verify(slf4jLogger).info(message)
- }
-
- it("should log message with exception") {
- cut.info(message, exception)
- verify(slf4jLogger).info(message, exception)
- }
describe("lazy logging message") {
it("should log when debug is ON") {
whenever(slf4jLogger.isInfoEnabled).thenReturn(true)
- cut.info { message }
+ cut().info { message }
verify(slf4jLogger).isInfoEnabled
verify(slf4jLogger).info(message)
}
it("should not log when debug is OFF") {
whenever(slf4jLogger.isInfoEnabled).thenReturn(false)
- cut.info { message }
+ cut().info { message }
verify(slf4jLogger).isInfoEnabled
}
}
@@ -124,42 +111,32 @@ object LoggerTest : Spek({
it("should log when debug is ON") {
whenever(slf4jLogger.isInfoEnabled).thenReturn(true)
- cut.info(exception) { message }
+ cut().withInfo { log(message, exception) }
verify(slf4jLogger).isInfoEnabled
verify(slf4jLogger).info(message, exception)
}
it("should not log when debug is OFF") {
whenever(slf4jLogger.isInfoEnabled).thenReturn(false)
- cut.info(exception) { message }
+ cut().withInfo { log(message, exception) }
verify(slf4jLogger).isInfoEnabled
}
}
}
describe("warning levels") {
- it("should log message") {
- cut.warn(message)
- verify(slf4jLogger).warn(message)
- }
-
- it("should log message with exception") {
- cut.warn(message, exception)
- verify(slf4jLogger).warn(message, exception)
- }
-
describe("lazy logging message") {
it("should log when debug is ON") {
whenever(slf4jLogger.isWarnEnabled).thenReturn(true)
- cut.warn { message }
+ cut().warn { message }
verify(slf4jLogger).isWarnEnabled
verify(slf4jLogger).warn(message)
}
it("should not log when debug is OFF") {
whenever(slf4jLogger.isWarnEnabled).thenReturn(false)
- cut.warn { message }
+ cut().warn { message }
verify(slf4jLogger).isWarnEnabled
}
}
@@ -168,42 +145,33 @@ object LoggerTest : Spek({
it("should log when debug is ON") {
whenever(slf4jLogger.isWarnEnabled).thenReturn(true)
- cut.warn(exception) { message }
+ cut().withWarn { log(message, exception) }
verify(slf4jLogger).isWarnEnabled
verify(slf4jLogger).warn(message, exception)
}
it("should not log when debug is OFF") {
whenever(slf4jLogger.isWarnEnabled).thenReturn(false)
- cut.warn(exception) { message }
+ cut().withWarn { log(message, exception) }
verify(slf4jLogger).isWarnEnabled
}
}
}
describe("error levels") {
- it("should log message") {
- cut.error(message)
- verify(slf4jLogger).error(message)
- }
-
- it("should log message with exception") {
- cut.error(message, exception)
- verify(slf4jLogger).error(message, exception)
- }
describe("lazy logging message") {
it("should log when debug is ON") {
whenever(slf4jLogger.isErrorEnabled).thenReturn(true)
- cut.error { message }
+ cut().error { message }
verify(slf4jLogger).isErrorEnabled
verify(slf4jLogger).error(message)
}
it("should not log when debug is OFF") {
whenever(slf4jLogger.isErrorEnabled).thenReturn(false)
- cut.error { message }
+ cut().error { message }
verify(slf4jLogger).isErrorEnabled
}
}
@@ -212,14 +180,14 @@ object LoggerTest : Spek({
it("should log when debug is ON") {
whenever(slf4jLogger.isErrorEnabled).thenReturn(true)
- cut.error(exception) { message }
+ cut().withError { log(message, exception) }
verify(slf4jLogger).isErrorEnabled
verify(slf4jLogger).error(message, exception)
}
it("should not log when debug is OFF") {
whenever(slf4jLogger.isErrorEnabled).thenReturn(false)
- cut.error(exception) { message }
+ cut().withError { log(message, exception) }
verify(slf4jLogger).isErrorEnabled
}
}
diff --git a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt
index 0f359df3..da956bec 100644
--- a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt
+++ b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt
@@ -42,7 +42,7 @@ class ReactiveLoggingTest : Spek({
val cut = Try.just(event)
it("should not filter stream event and log accepted message") {
- cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE)
+ cut.filterFailedWithLog(logger, ::emptyMap, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE)
.test()
.expectNext(event)
.verifyComplete()
@@ -53,7 +53,7 @@ class ReactiveLoggingTest : Spek({
val e = Exception()
val cut = Failure(e)
it("should filter stream event and log rejected message") {
- cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE)
+ cut.filterFailedWithLog(logger, ::emptyMap, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE)
.test()
.verifyComplete()
}
@@ -65,7 +65,7 @@ class ReactiveLoggingTest : Spek({
val cut = Option.just(event)
it("should not filter stream event and log accepted message") {
- cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE)
+ cut.filterEmptyWithLog(logger, ::emptyMap, ACCEPTED_MESSAGE, FAILED_MESSAGE)
.test()
.expectNext(event)
.verifyComplete()
@@ -75,7 +75,7 @@ class ReactiveLoggingTest : Spek({
given("empty Option") {
val cut = Option.empty<Int>()
it("should filter stream event and log rejected message") {
- cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE)
+ cut.filterEmptyWithLog(logger,::emptyMap, ACCEPTED_MESSAGE, FAILED_MESSAGE)
.test()
.verifyComplete()
}
@@ -88,7 +88,7 @@ class ReactiveLoggingTest : Spek({
val cut = Flux.just(event)
it("should not filter stream event and log accepted message") {
- cut.filterFailedWithLog(logger, right())
+ cut.filterFailedWithLog(logger,::emptyMap, right())
.test()
.expectNext(event)
.verifyComplete()
@@ -99,7 +99,7 @@ class ReactiveLoggingTest : Spek({
val cut = Flux.just(event)
it("should filter stream event and log rejected message") {
- cut.filterFailedWithLog(logger, left())
+ cut.filterFailedWithLog(logger,::emptyMap, left())
.test()
.verifyComplete()
}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
index 57aaf3db..ca6d169a 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
@@ -61,12 +61,14 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
.handle { _, output -> handler(complete, messages, output) }
.connect()
.doOnError {
- logger.info("Failed to connect to VesHvCollector on " +
- "${configuration.vesHost}:${configuration.vesPort}")
+ logger.info {
+ "Failed to connect to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}"
+ }
}
.subscribe {
- logger.info("Connected to VesHvCollector on " +
- "${configuration.vesHost}:${configuration.vesPort}")
+ logger.info {
+ "Connected to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}"
+ }
}
return complete.then()
}
@@ -86,7 +88,7 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
.options { it.flushOnBoundary() }
.sendGroups(frames)
.then {
- logger.info("Messages have been sent")
+ logger.info { "Messages have been sent" }
complete.onComplete()
}
.then()
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
index 16019384..cfd3a6e9 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
@@ -59,17 +59,17 @@ internal class XnfApiServer(
.post("simulator/async", ::startSimulationHandler)
.get("simulator/:id", ::simulatorStatusHandler)
.get("healthcheck") { ctx ->
- logger.info("Checking health")
+ logger.info { "Checking health" }
ctx.response.status(HttpConstants.STATUS_OK).send()
}
}
private fun startSimulationHandler(ctx: Context) {
- logger.info("Attempting to start asynchronous scenario")
+ logger.info { "Attempting to start asynchronous scenario" }
ctx.request.body.then { body ->
val id = startSimulation(body)
when (id) {
- is Either.Left -> logger.warn { "Failed to start scenario, ${id.a}"}
+ is Either.Left -> logger.warn { "Failed to start scenario, ${id.a}" }
is Either.Right -> logger.info { "Scenario started, details: ${id.b}" }
}
ctx.response.sendEitherErrorOrResponse(id)
@@ -83,7 +83,7 @@ internal class XnfApiServer(
}
private fun simulatorStatusHandler(ctx: Context) {
- logger.debug("Checking task status")
+ logger.debug { "Checking task status" }
val id = UUID.fromString(ctx.pathTokens["id"])
logger.debug { "Checking status for id: $id" }
val status = ongoingSimulations.status(id)
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
index 21748ae8..d7d42d88 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
@@ -43,11 +43,11 @@ class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) {
simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result ->
result.fold(
{ err ->
- logger.warn("Error", err)
+ logger.withWarn { log("Error", err) }
simulations[id] = StatusFailure(err)
},
{
- logger.info("Finished sending messages")
+ logger.info { "Finished sending messages" }
simulations[id] = StatusSuccess
}
)
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index 4512dfbf..91070d35 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -42,7 +42,7 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
.mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
.map { config ->
- logger.info("Using configuration: $config")
+ logger.info { "Using configuration: $config" }
val xnfSimulator = XnfSimulator(
VesHvClient(config),
MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
@@ -52,10 +52,10 @@ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
}
.unsafeRunEitherSync(
{ ex ->
- logger.error("Failed to start a server", ex)
+ logger.withError { log("Failed to start a server", ex) }
ExitFailure(1)
},
{
- logger.info("Started xNF Simulator API server")
+ logger.info { "Started xNF Simulator API server" }
}
)