summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ct
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-07 14:41:39 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-10 14:46:23 +0100
commit8b8c37c296e55644063e0332fd455437168e78da (patch)
tree36e9d96217346dd4296677cfd8af584c69a0ad05 /sources/hv-collector-ct
parent73293332b2244b66083dc5d3910801c1b1058105 (diff)
Add log diagnostic context
As it's not trivial to use MDC directly from logging framework in reactive application, we need to do some work manually. The approach proposed is an explicit MDC handling, which means that context is kept as an object created after establishing client connection. Next, new instance of HvVesCollector (and its dependencies) is created. Every object is propagated with ClientContext so it can use it when calling logger methods. In the future ClientContext might be used to support other use-cases, ie. per-topic access control. As a by-product I had to refactor our Logger wrapper, too. It already had too many functions and after adding MDC number would be doubled. Change-Id: I9c5d3f5e1d1be1db66d28d292eb0e1c38d8d0ffe Issue-ID: DCAEGEN2-671 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-ct')
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt10
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt5
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt2
3 files changed, 9 insertions, 8 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index 0897e910..ef4ce967 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -68,7 +68,7 @@ object PerformanceSpecification : Spek({
)
val fluxes = (1.rangeTo(runs)).map {
- sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
+ sut.collector.handleConnection(generateDataStream(sut.alloc, params))
}
val durationMs = measureTimeMillis {
Flux.merge(fluxes).then().block(timeout)
@@ -76,8 +76,8 @@ object PerformanceSpecification : Spek({
val durationSec = durationMs / 1000.0
val throughput = sink.count / durationSec
- logger.info("Processed $runs connections each containing $numMessages msgs.")
- logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
+ logger.info { "Processed $runs connections each containing $numMessages msgs." }
+ logger.info { "Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s" }
assertThat(sink.count)
.describedAs("should send all events")
.isEqualTo(runs * numMessages)
@@ -99,11 +99,11 @@ object PerformanceSpecification : Spek({
val dataStream = generateDataStream(sut.alloc, params)
.transform(::dropWhenIndex.partially1 { it % 101 == 0L })
- sut.collector.handleConnection(sut.alloc, dataStream)
+ sut.collector.handleConnection(dataStream)
.timeout(timeout)
.block()
- logger.info("Forwarded ${sink.count} msgs")
+ logger.info { "Forwarded ${sink.count} msgs" }
assertThat(sink.count)
.describedAs("should send up to number of events")
.isLessThan(numMessages)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 0495ced5..ce242e0b 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -27,6 +27,7 @@ import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
@@ -54,7 +55,7 @@ class Sut(sink: Sink = StoringSink()) {
private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
- get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") }
+ get() = collectorProvider(ClientContext(alloc)).getOrElse{ throw IllegalStateException("Collector not available.") }
companion object {
const val MAX_PAYLOAD_SIZE_BYTES = 1024
@@ -63,6 +64,6 @@ class Sut(sink: Sink = StoringSink()) {
}
fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
- collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10))
+ collector.handleConnection(Flux.fromArray(packets)).block(Duration.ofSeconds(10))
return sink.sentMessages
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 2d81c671..ab59cc2e 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -287,7 +287,7 @@ object VesHvSpecification : Spek({
.map { vesWireFrameMessage(PERF3GPP) }
- sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
+ sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
val messages = sink.sentMessages
val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }