aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-07 14:41:39 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-10 14:46:23 +0100
commit8b8c37c296e55644063e0332fd455437168e78da (patch)
tree36e9d96217346dd4296677cfd8af584c69a0ad05 /sources/hv-collector-core
parent73293332b2244b66083dc5d3910801c1b1058105 (diff)
Add log diagnostic context
As it's not trivial to use MDC directly from logging framework in reactive application, we need to do some work manually. The approach proposed is an explicit MDC handling, which means that context is kept as an object created after establishing client connection. Next, new instance of HvVesCollector (and its dependencies) is created. Every object is propagated with ClientContext so it can use it when calling logger methods. In the future ClientContext might be used to support other use-cases, ie. per-topic access control. As a by-product I had to refactor our Logger wrapper, too. It already had too many functions and after adding MDC number would be doubled. Change-Id: I9c5d3f5e1d1be1db66d28d292eb0e1c38d8d0ffe Issue-ID: DCAEGEN2-671 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-core')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt6
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt33
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt15
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt43
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt12
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt9
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt15
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt60
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt21
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt58
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt10
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt4
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt3
16 files changed, 191 insertions, 110 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index dd0111bc..b686b250 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.boundary
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import reactor.core.publisher.Flux
@@ -35,12 +36,12 @@ interface Metrics {
@FunctionalInterface
interface SinkProvider {
- operator fun invoke(config: CollectorConfiguration): Sink
+ operator fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink
companion object {
fun just(sink: Sink): SinkProvider =
object : SinkProvider {
- override fun invoke(config: CollectorConfiguration): Sink = sink
+ override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink = sink
}
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index 3c85a9b1..5584d61d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -23,15 +23,17 @@ import arrow.core.Option
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import java.util.*
interface Collector {
- fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void>
+ fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void>
}
-typealias CollectorProvider = () -> Option<Collector>
+typealias CollectorProvider = (ClientContext) -> Option<Collector>
interface Server {
fun start(): IO<ServerHandle>
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 5c96e1c5..2008fc35 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,12 +25,13 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -47,31 +48,29 @@ class CollectorFactory(val configuration: ConfigurationProvider,
private val healthState: HealthState = HealthState.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
- val collector: AtomicReference<Collector> = AtomicReference()
+ val config: AtomicReference<CollectorConfiguration> = AtomicReference()
configuration()
- .map(this::createVesHvCollector)
.doOnNext {
- logger.info("Using updated configuration for new connections")
+ logger.info { "Using updated configuration for new connections" }
healthState.changeState(HealthDescription.HEALTHY)
}
.doOnError {
- logger.error("Failed to acquire configuration from consul")
+ logger.error { "Failed to acquire configuration from consul" }
healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
}
- .subscribe(collector::set)
- return collector::getOption
+ .subscribe(config::set)
+ return { ctx: ClientContext ->
+ config.getOption().map { config -> createVesHvCollector(config, ctx) }
+ }
}
- private fun createVesHvCollector(config: CollectorConfiguration): Collector {
- return VesHvCollector(
- wireChunkDecoderSupplier = { alloc ->
- WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc)
- },
- protobufDecoder = VesDecoder(),
- router = Router(config.routing),
- sink = sinkProvider(config),
- metrics = metrics)
- }
+ private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = VesHvCollector(
+ clientContext = ctx,
+ wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
+ protobufDecoder = VesDecoder(),
+ router = Router(config.routing, ctx),
+ sink = sinkProvider(config, ctx),
+ metrics = metrics)
companion object {
private val logger = Logger(CollectorFactory::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index cee658b6..0977595a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -20,11 +20,22 @@
package org.onap.dcae.collectors.veshv.impl
import arrow.core.Option
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.Routing
import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
-class Router(private val routing: Routing) {
+class Router(private val routing: Routing, private val ctx: ClientContext) {
fun findDestination(message: VesMessage): Option<RoutedMessage> =
- routing.routeFor(message.header).map { it(message) }
+ routing.routeFor(message.header).map { it(message) }.also {
+ if (it.isEmpty()) {
+ logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" }
+ }
+ }
+
+ companion object {
+ private val logger = Logger(Routing::class)
+ }
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 4176de99..0d07504d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -21,18 +21,18 @@ package org.onap.dcae.collectors.veshv.impl
import arrow.core.Either
import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
+import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -42,28 +42,27 @@ import reactor.core.publisher.Mono
* @since May 2018
*/
internal class VesHvCollector(
- private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder,
+ private val clientContext: ClientContext,
+ private val wireChunkDecoder: WireChunkDecoder,
private val protobufDecoder: VesDecoder,
private val router: Router,
private val sink: Sink,
private val metrics: Metrics) : Collector {
- override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
- wireChunkDecoderSupplier(alloc).let { wireDecoder ->
- dataStream
- .transform { decodeWireFrame(it, wireDecoder) }
- .transform(::filterInvalidWireFrame)
- .transform(::decodeProtobufPayload)
- .transform(::filterInvalidProtobufMessages)
- .transform(::routeMessage)
- .onErrorResume { logger.handleReactiveStreamError(it) }
- .doFinally { releaseBuffersMemory(wireDecoder) }
- .then()
- }
+ override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
+ dataStream
+ .transform { decodeWireFrame(it) }
+ .transform(::filterInvalidWireFrame)
+ .transform(::decodeProtobufPayload)
+ .transform(::filterInvalidProtobufMessages)
+ .transform(::routeMessage)
+ .onErrorResume { logger.handleReactiveStreamError(clientContext::asMap, it) }
+ .doFinally { releaseBuffersMemory() }
+ .then()
- private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux
+ private fun decodeWireFrame(flux: Flux<ByteBuf>): Flux<WireFrameMessage> = flux
.doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
- .concatMap(decoder::decode)
+ .concatMap(wireChunkDecoder::decode)
.doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
@@ -75,7 +74,7 @@ internal class VesHvCollector(
private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
.decode(rawPayload)
- .filterFailedWithLog(logger,
+ .filterFailedWithLog(logger, clientContext::asMap,
{ "Ves event header decoded successfully" },
{ "Failed to decode ves event header, reason: ${it.message}" })
@@ -89,15 +88,15 @@ internal class VesHvCollector(
private fun findRoute(msg: VesMessage) = router
.findDestination(msg)
- .filterEmptyWithLog(logger,
+ .filterEmptyWithLog(logger, clientContext::asMap,
{ "Found route for message: ${it.topic}, partition: ${it.partition}" },
{ "Could not find route for message" })
- private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
- .also { logger.debug("Released buffer memory after handling message stream") }
+ private fun releaseBuffersMemory() = wireChunkDecoder.release()
+ .also { logger.debug { "Released buffer memory after handling message stream" } }
fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
- filterFailedWithLog(logger, predicate)
+ filterFailedWithLog(logger, clientContext::asMap, predicate)
companion object {
private val logger = Logger(VesHvCollector::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index cea8a7ee..bbaa47c4 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -52,7 +52,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
private val retry = retrySpec
.doOnRetry {
- logger.warn("Could not get fresh configuration", it.exception())
+ logger.withWarn { log("Could not get fresh configuration", it.exception()) }
healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
index bdce6f73..3fefc6e8 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
@@ -20,6 +20,7 @@
package org.onap.dcae.collectors.veshv.impl.adapters
import io.netty.handler.codec.http.HttpStatusClass
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
@@ -30,8 +31,6 @@ import reactor.netty.http.client.HttpClient
*/
open class HttpAdapter(private val httpClient: HttpClient) {
- private val logger = LoggerFactory.getLogger(HttpAdapter::class.java)
-
open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient
.get()
.uri(url + createQueryString(queryParams))
@@ -44,8 +43,8 @@ open class HttpAdapter(private val httpClient: HttpClient) {
}
}
.doOnError {
- logger.error("Failed to get resource on path: $url (${it.localizedMessage})")
- logger.debug("Nested exception:", it)
+ logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" }
+ logger.withDebug { log("Nested exception:", it) }
}
private fun createQueryString(params: Map<String, Any>): String {
@@ -65,4 +64,9 @@ open class HttpAdapter(private val httpClient: HttpClient) {
return builder.removeSuffix("&").toString()
}
+ companion object {
+
+
+ private val logger = Logger(HttpAdapter::class)
+ }
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
index 5f4bf354..f6cb018f 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -21,6 +21,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -33,7 +36,7 @@ import java.util.concurrent.atomic.AtomicLong
*/
internal class LoggingSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration): Sink {
+ override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
return object : Sink {
private val totalMessages = AtomicLong()
private val totalBytes = AtomicLong()
@@ -47,9 +50,9 @@ internal class LoggingSinkProvider : SinkProvider {
val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong())
val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
if (msgs % INFO_LOGGING_FREQ == 0L)
- logger.info(logMessageSupplier)
+ logger.info(ctx, logMessageSupplier)
else
- logger.trace(logMessageSupplier)
+ logger.trace(ctx, logMessageSupplier)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index c4d6c87e..fd08ba3d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -20,6 +20,9 @@
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -35,7 +38,7 @@ import java.util.concurrent.atomic.AtomicLong
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
+internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>, private val ctx: ClientContext) : Sink {
private val sentMessages = AtomicLong(0)
override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
@@ -45,17 +48,13 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
if (it.isSuccessful()) {
Mono.just(it)
} else {
- logger.warn(it.exception()) { "Failed to send message to Kafka" }
+ logger.withWarn(ctx) { log("Failed to send message to Kafka", it.exception()) }
Mono.empty<SenderResult<RoutedMessage>>()
}
}
.map { it.correlationMetadata() }
- return if (logger.traceEnabled) {
- result.doOnNext(::logSentMessage)
- } else {
- result
- }
+ return result.doOnNext(::logSentMessage)
}
private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
@@ -69,7 +68,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
}
private fun logSentMessage(sentMsg: RoutedMessage) {
- logger.trace {
+ logger.trace(ctx) {
val msgNum = sentMessages.incrementAndGet()
"Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index 18191952..b4f470d4 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import org.apache.kafka.clients.producer.ProducerConfig
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventOuterClass.CommonEventHeader
@@ -33,8 +34,8 @@ import reactor.kafka.sender.SenderOptions
* @since June 2018
*/
internal class KafkaSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration): Sink {
- return KafkaSink(KafkaSender.create(constructSenderOptions(config)))
+ override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
+ return KafkaSink(KafkaSender.create(constructSenderOptions(config)), ctx)
}
private fun constructSenderOptions(config: CollectorConfiguration) =
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 0b2997fa..2d29fe99 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -23,6 +23,10 @@ import arrow.core.getOrElse
import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
@@ -57,57 +61,61 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
sslContextFactory
.createSslContext(serverConfig.securityConfiguration)
.map { sslContext ->
- logger.info("Collector configured with SSL enabled")
+ logger.info { "Collector configured with SSL enabled" }
this.secure { b -> b.sslContext(sslContext) }
}.getOrElse {
- logger.info("Collector configured with SSL disabled")
+ logger.info { "Collector configured with SSL disabled" }
this
}
- private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
- collectorProvider().fold(
- {
- nettyInbound.withConnection { conn ->
- logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." }
- }
- Mono.empty()
- },
- {
- nettyInbound.withConnection { conn ->
- logger.info { "Handling connection from ${conn.address()}" }
- conn.configureIdleTimeout(serverConfig.idleTimeout)
- .logConnectionClosed()
- }
- it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound))
+ private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
+ val clientContext = ClientContext(nettyOutbound.alloc())
+ nettyInbound.withConnection {
+ clientContext.clientAddress = it.address()
+ }
+
+ return collectorProvider(clientContext).fold(
+ {
+ logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." }
+ Mono.empty()
+ },
+ {
+ logger.info { "Handling new connection" }
+ nettyInbound.withConnection { conn ->
+ conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
+ .logConnectionClosed(clientContext)
}
- )
+ it.handleConnection(createDataStream(nettyInbound))
+ }
+ )
+ }
private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
.receive()
.retain()
- private fun Connection.configureIdleTimeout(timeout: Duration): Connection {
+ private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection {
onReadIdle(timeout.toMillis()) {
- logger.info {
+ logger.info(ctx) {
"Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
}
- disconnectClient()
+ disconnectClient(ctx)
}
return this
}
- private fun Connection.disconnectClient() {
+ private fun Connection.disconnectClient(ctx: ClientContext) {
channel().close().addListener {
if (it.isSuccess)
- logger.debug { "Channel (${address()}) closed successfully." }
+ logger.debug(ctx) { "Channel closed successfully." }
else
- logger.warn("Channel close failed", it.cause())
+ logger.withWarn(ctx) { log("Channel close failed", it.cause()) }
}
}
- private fun Connection.logConnectionClosed(): Connection {
+ private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
onTerminate().subscribe {
- logger.info("Connection from ${address()} has been closed")
+ logger.info(ctx) { "Connection has been closed" }
}
return this
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index 4a2ef6b2..349b0787 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -21,12 +21,13 @@ package org.onap.dcae.collectors.veshv.impl.wire
import arrow.effects.IO
import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
import reactor.core.publisher.Flux
@@ -38,8 +39,8 @@ import reactor.core.publisher.SynchronousSink
*/
internal class WireChunkDecoder(
private val decoder: WireFrameDecoder,
- alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
- private val streamBuffer = alloc.compositeBuffer()
+ private val ctx: ClientContext) {
+ private val streamBuffer = ctx.alloc.compositeBuffer()
fun release() {
streamBuffer.release()
@@ -53,7 +54,7 @@ internal class WireChunkDecoder(
} else {
streamBuffer.addComponent(true, byteBuf)
generateFrames()
- .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) }
+ .onErrorResume { logger.handleReactiveStreamError(ctx::asMap, it, Flux.error(it)) }
.doFinally { streamBuffer.discardReadComponents() }
}
}
@@ -84,15 +85,15 @@ internal class WireChunkDecoder(
}
private fun logIncomingMessage(wire: ByteBuf) {
- logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
+ logger.trace(ctx) { "Got message with total size of ${wire.readableBytes()} B" }
}
private fun logDecodedWireMessage(wire: WireFrameMessage) {
- logger.trace { "Wire payload size: ${wire.payloadSize} B" }
+ logger.trace(ctx) { "Wire payload size: ${wire.payloadSize} B" }
}
private fun logEndOfData() {
- logger.trace { "End of data in current TCP buffer" }
+ logger.trace(ctx) { "End of data in current TCP buffer" }
}
companion object {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
new file mode 100644
index 00000000..f14a7f65
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
@@ -0,0 +1,58 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.slf4j.MDC
+import java.net.InetSocketAddress
+import java.util.*
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+data class ClientContext(
+ val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT,
+ val clientId: String = UUID.randomUUID().toString(),
+ var clientAddress: InetSocketAddress? = null) {
+ fun asMap(): Map<String, String> {
+ val result = mutableMapOf("clientId" to clientId)
+ if (clientAddress != null) {
+ result["clientAddress"] = clientAddress.toString()
+ }
+ return result
+ }
+}
+
+object ClientContextLogging {
+ fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block)
+ fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block)
+ fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block)
+ fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block)
+ fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block)
+
+ fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message)
+ fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message)
+ fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message)
+ fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message)
+ fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message)
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
index 437614ac..ad97a3f7 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
@@ -26,15 +26,7 @@ import org.onap.ves.VesEventOuterClass.CommonEventHeader
data class Routing(val routes: List<Route>) {
fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
- Option.fromNullable(routes.find { it.applies(commonHeader) }).also {
- if (it.isEmpty()) {
- logger.debug { "No route is defined for domain: ${commonHeader.domain}" }
- }
- }
-
- companion object {
- private val logger = Logger(Routing::class)
- }
+ Option.fromNullable(routes.find { it.applies(commonHeader) })
}
data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
index e8a31231..e4190163 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl
import arrow.core.None
import arrow.core.Some
+import io.netty.buffer.ByteBufAllocator
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
@@ -30,6 +31,7 @@ import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.model.routing
@@ -56,7 +58,7 @@ object RouterTest : Spek({
withFixedPartitioning()
}
}.build()
- val cut = Router(config)
+ val cut = Router(config, ClientContext())
on("message with existing route (rtpm)") {
val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY)
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
index f06a0dc7..e0092cf9 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
@@ -30,6 +30,7 @@ import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.model.ClientContext
import reactor.test.test
/**
@@ -45,7 +46,7 @@ internal object WireChunkDecoderTest : Spek({
fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame))
- fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc)
+ fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), ClientContext(alloc))
fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
for (bb in byteBuffers) {