diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-12-07 14:41:39 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-12-10 14:46:23 +0100 |
commit | 8b8c37c296e55644063e0332fd455437168e78da (patch) | |
tree | 36e9d96217346dd4296677cfd8af584c69a0ad05 /sources/hv-collector-ct | |
parent | 73293332b2244b66083dc5d3910801c1b1058105 (diff) |
Add log diagnostic context
As it's not trivial to use MDC directly from logging framework in
reactive application, we need to do some work manually. The approach
proposed is an explicit MDC handling, which means that context is
kept as an object created after establishing client connection. Next,
new instance of HvVesCollector (and its dependencies) is created. Every
object is propagated with ClientContext so it can use it when calling
logger methods.
In the future ClientContext might be used to support other use-cases,
ie. per-topic access control.
As a by-product I had to refactor our Logger wrapper, too. It already
had too many functions and after adding MDC number would be doubled.
Change-Id: I9c5d3f5e1d1be1db66d28d292eb0e1c38d8d0ffe
Issue-ID: DCAEGEN2-671
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-ct')
3 files changed, 9 insertions, 8 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 0897e910..ef4ce967 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -68,7 +68,7 @@ object PerformanceSpecification : Spek({ ) val fluxes = (1.rangeTo(runs)).map { - sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params)) + sut.collector.handleConnection(generateDataStream(sut.alloc, params)) } val durationMs = measureTimeMillis { Flux.merge(fluxes).then().block(timeout) @@ -76,8 +76,8 @@ object PerformanceSpecification : Spek({ val durationSec = durationMs / 1000.0 val throughput = sink.count / durationSec - logger.info("Processed $runs connections each containing $numMessages msgs.") - logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s") + logger.info { "Processed $runs connections each containing $numMessages msgs." } + logger.info { "Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s" } assertThat(sink.count) .describedAs("should send all events") .isEqualTo(runs * numMessages) @@ -99,11 +99,11 @@ object PerformanceSpecification : Spek({ val dataStream = generateDataStream(sut.alloc, params) .transform(::dropWhenIndex.partially1 { it % 101 == 0L }) - sut.collector.handleConnection(sut.alloc, dataStream) + sut.collector.handleConnection(dataStream) .timeout(timeout) .block() - logger.info("Forwarded ${sink.count} msgs") + logger.info { "Forwarded ${sink.count} msgs" } assertThat(sink.count) .describedAs("should send up to number of events") .isLessThan(numMessages) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 0495ced5..ce242e0b 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -27,6 +27,7 @@ import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.factory.CollectorFactory +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState @@ -54,7 +55,7 @@ class Sut(sink: Sink = StoringSink()) { private val collectorProvider = collectorFactory.createVesHvCollectorProvider() val collector: Collector - get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") } + get() = collectorProvider(ClientContext(alloc)).getOrElse{ throw IllegalStateException("Collector not available.") } companion object { const val MAX_PAYLOAD_SIZE_BYTES = 1024 @@ -63,6 +64,6 @@ class Sut(sink: Sink = StoringSink()) { } fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { - collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10)) + collector.handleConnection(Flux.fromArray(packets)).block(Duration.ofSeconds(10)) return sink.sentMessages } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 2d81c671..ab59cc2e 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -287,7 +287,7 @@ object VesHvSpecification : Spek({ .map { vesWireFrameMessage(PERF3GPP) } - sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout) + sut.collector.handleConnection(incomingMessages).block(defaultTimeout) val messages = sink.sentMessages val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC } |