diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-12-18 15:58:56 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-12-20 14:57:25 +0100 |
commit | 4128aa2c9368ed20fab92e8c0df83f14d6233b86 (patch) | |
tree | cff4cf2428a288b7b86830f282b81d41a41ad250 /sources/hv-collector-ct/src | |
parent | 4ab95420e42f6df59bd4851eee41be6579bdbbe1 (diff) |
There should be one KafkaSender per configuration
We should keep only one instance of KafkaSender per instance. However,
as the configuration might be changed (Consul update) it cannot be a
strict singleton. Hence there should be 1to1 relationship beetween
ConsulConfiguration and KafkaSender.
Change-Id: Ie168028c4427741254b8c2fe316b82cca72d7668
Issue-ID: DCAEGEN2-1047
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-ct/src')
4 files changed, 46 insertions, 22 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index f457aeaf..aaa3ee3b 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -31,6 +31,7 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE +import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC @@ -50,7 +51,7 @@ object MetricsSpecification : Spek({ describe("Bytes received metrics") { it("should sum up all bytes received") { - val sut = vesHvWithNoOpSink() + val sut = vesHvWithAlwaysSuccessfulSink() val vesWireFrameMessage = vesWireFrameMessage() val invalidWireFrame = messageWithInvalidWireFrameHeader() @@ -70,7 +71,7 @@ object MetricsSpecification : Spek({ describe("Messages received metrics") { it("should sum up all received messages bytes") { - val sut = vesHvWithNoOpSink() + val sut = vesHvWithAlwaysSuccessfulSink() val firstVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(10))) val secondVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(40))) val firstVesMessage = vesWireFrameMessage(firstVesEvent) @@ -91,7 +92,7 @@ object MetricsSpecification : Spek({ describe("Messages sent metrics") { it("should gather info for each topic separately") { - val sut = vesHvWithNoOpSink(twoDomainsToOneTopicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicConfiguration) sut.handleConnection( vesWireFrameMessage(PERF3GPP), @@ -129,7 +130,7 @@ object MetricsSpecification : Spek({ describe("Messages dropped metrics") { it("should gather metrics for invalid messages") { - val sut = vesHvWithNoOpSink(basicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration) sut.handleConnection( messageWithInvalidWireFrameHeader(), @@ -145,7 +146,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for route not found") { - val sut = vesHvWithNoOpSink(basicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), @@ -158,8 +159,19 @@ object MetricsSpecification : Spek({ .isEqualTo(1) } + it("should gather metrics for sing errors") { + val sut = vesHvWithAlwaysFailingSink(basicConfiguration) + + sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP)) + + val metrics = sut.metrics + assertThat(metrics.messagesDropped(KAFKA_FAILURE)) + .describedAs("messagesDroppedCause $KAFKA_FAILURE metric") + .isEqualTo(1) + } + it("should gather summed metrics for dropped messages") { - val sut = vesHvWithNoOpSink(basicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), @@ -183,7 +195,7 @@ object MetricsSpecification : Spek({ ).forEach { cause, vesMessage -> on("cause $cause") { it("should notify correct metrics") { - val sut = vesHvWithNoOpSink() + val sut = vesHvWithAlwaysSuccessfulSink() sut.handleConnection(vesMessage) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 7ebbfba0..c3e4a581 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -73,12 +73,17 @@ fun Sut.handleConnection(vararg packets: ByteBuf) { collector.handleConnection(Flux.fromArray(packets)).block(timeout) } -fun vesHvWithNoOpSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = - Sut(NoOpSink()).apply { +fun vesHvWithAlwaysSuccessfulSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = + Sut(AlwaysSuccessfulSink()).apply { + configurationProvider.updateConfiguration(collectorConfiguration) + } + +fun vesHvWithAlwaysFailingSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = + Sut(AlwaysFailingSink()).apply { configurationProvider.updateConfiguration(collectorConfiguration) } fun vesHvWithDelayingSink(delay: Duration, collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = - Sut(ProcessingSink { it.delayElements(delay) }).apply { + Sut(DelayingSink(delay)).apply { configurationProvider.updateConfiguration(collectorConfiguration) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index 3770913a..db56e88c 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -36,7 +36,6 @@ const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING" const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" val basicConfiguration: CollectorConfiguration = CollectorConfiguration( - kafkaBootstrapServers = "localhost:9969", routing = routing { defineRoute { fromDomain(PERF3GPP.domainName) @@ -47,7 +46,6 @@ val basicConfiguration: CollectorConfiguration = CollectorConfiguration( ) val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration( - kafkaBootstrapServers = "localhost:9969", routing = routing { defineRoute { fromDomain(PERF3GPP.domainName) @@ -69,7 +67,6 @@ val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfigu val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration( - kafkaBootstrapServers = "localhost:9969", routing = routing { defineRoute { fromDomain(PERF3GPP.domainName) @@ -81,7 +78,6 @@ val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfigu val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration( - kafkaBootstrapServers = "localhost:9969", routing = routing { }.build() ) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index 2f731f53..b4ce6499 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -21,13 +21,17 @@ package org.onap.dcae.collectors.veshv.tests.fakes import arrow.core.identity import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.model.ConsumedMessage +import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage +import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.reactivestreams.Publisher import reactor.core.publisher.Flux +import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentLinkedDeque import java.util.concurrent.atomic.AtomicLong -import java.util.function.Function /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -39,8 +43,8 @@ class StoringSink : Sink { val sentMessages: List<RoutedMessage> get() = sent.toList() - override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { - return messages.doOnNext(sent::addLast) + override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> { + return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage) } } @@ -54,16 +58,23 @@ class CountingSink : Sink { val count: Long get() = atomicCount.get() - override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { + override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> { return messages.doOnNext { atomicCount.incrementAndGet() - } + }.map(::SuccessfullyConsumedMessage) } } -open class ProcessingSink(val transformer: (Flux<RoutedMessage>) -> Publisher<RoutedMessage>) : Sink { - override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = messages.transform(transformer) +open class ProcessingSink(private val transformer: (Flux<RoutedMessage>) -> Publisher<ConsumedMessage>) : Sink { + override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> = + messages.transform(transformer) } -class NoOpSink : ProcessingSink(::identity) +class AlwaysSuccessfulSink : ProcessingSink({ it.map(::SuccessfullyConsumedMessage) }) + +class AlwaysFailingSink : ProcessingSink({ stream -> + stream.map { FailedToConsumeMessage(it, null, MessageDropCause.KAFKA_FAILURE) } +}) + +class DelayingSink(delay: Duration) : ProcessingSink({ it.delayElements(delay).map(::SuccessfullyConsumedMessage) }) |