From 7e77162022371860d13939be1848982a735cdab9 Mon Sep 17 00:00:00 2001 From: kjaniak Date: Thu, 21 Mar 2019 14:03:53 +0100 Subject: Use DataStream API from CBS client Change-Id: Ief92f793282288938c6663616e9613c6df2d8ddb Issue-ID: DCAEGEN2-1346 Signed-off-by: kjaniak --- .../veshv/tests/component/MetricsSpecification.kt | 14 ++-- .../tests/component/PerformanceSpecification.kt | 6 +- .../dcae/collectors/veshv/tests/component/Sut.kt | 16 ++-- .../veshv/tests/component/VesHvSpecification.kt | 26 +++---- .../collectors/veshv/tests/fakes/configuration.kt | 85 +++++++++++----------- 5 files changed, 74 insertions(+), 73 deletions(-) (limited to 'sources/hv-collector-ct/src/test/kotlin') diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index bd056d4d..a6b32ed9 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -35,8 +35,8 @@ import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting -import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader @@ -92,7 +92,7 @@ object MetricsSpecification : Spek({ describe("Messages sent metrics") { it("should gather info for each topic separately") { - val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(configWithTwoDomainsToOneTopicRouting) sut.handleConnection( vesWireFrameMessage(PERF3GPP), @@ -130,7 +130,7 @@ object MetricsSpecification : Spek({ describe("Messages dropped metrics") { it("should gather metrics for invalid messages") { - val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) sut.handleConnection( messageWithInvalidWireFrameHeader(), @@ -146,7 +146,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for route not found") { - val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), @@ -160,7 +160,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for sing errors") { - val sut = vesHvWithAlwaysFailingSink(basicRouting) + val sut = vesHvWithAlwaysFailingSink(configWithBasicRouting) sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP)) @@ -171,7 +171,7 @@ object MetricsSpecification : Spek({ } it("should gather summed metrics for dropped messages") { - val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index ece42285..50fe098c 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType @@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({ it("should handle multiple clients in reasonable time") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + sut.configurationProvider.updateConfiguration(configWithBasicRouting) val numMessages: Long = 300_000 val runs = 4 @@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({ it("should disconnect on transmission errors") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + sut.configurationProvider.updateConfiguration(configWithBasicRouting) val numMessages: Long = 100_000 val timeout = Duration.ofSeconds(30) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index e84e9486..da9290d3 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -38,7 +38,9 @@ import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.Kafka +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink import reactor.core.publisher.Flux import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean @@ -101,17 +103,17 @@ fun Sut.handleConnection(vararg packets: ByteBuf) { collector.handleConnection(Flux.fromArray(packets)).block(timeout) } -fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut = +fun vesHvWithAlwaysSuccessfulSink(kafkaSinks: Sequence = configWithBasicRouting): Sut = Sut(AlwaysSuccessfulSink()).apply { - configurationProvider.updateConfiguration(routing) + configurationProvider.updateConfiguration(kafkaSinks) } -fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut = +fun vesHvWithAlwaysFailingSink(kafkaSinks: Sequence = configWithBasicRouting): Sut = Sut(AlwaysFailingSink()).apply { - configurationProvider.updateConfiguration(routing) + configurationProvider.updateConfiguration(kafkaSinks) } -fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut = +fun vesHvWithDelayingSink(delay: Duration, kafkaSinks: Sequence = configWithBasicRouting): Sut = Sut(DelayingSink(delay)).apply { - configurationProvider.updateConfiguration(routing) + configurationProvider.updateConfiguration(kafkaSinks) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 17f6ce32..21c5c189 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -33,10 +33,10 @@ import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting -import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting -import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithDifferentRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithEmptyRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize @@ -152,7 +152,7 @@ object VesHvSpecification : Spek({ it("should be able to direct 2 messages from different domains to one topic") { val (sut, sink) = vesHvWithStoringSink() - sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting) + sut.configurationProvider.updateConfiguration(configWithTwoDomainsToOneTopicRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP), @@ -205,7 +205,7 @@ object VesHvSpecification : Spek({ it("should update collector") { val firstCollector = sut.collector - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + sut.configurationProvider.updateConfiguration(configWithDifferentRouting) val collectorAfterUpdate = sut.collector assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) @@ -213,12 +213,12 @@ object VesHvSpecification : Spek({ it("should start routing messages") { - sut.configurationProvider.updateConfiguration(emptyRouting) + sut.configurationProvider.updateConfiguration(configWithEmptyRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messages).isEmpty() - sut.configurationProvider.updateConfiguration(basicRouting) + sut.configurationProvider.updateConfiguration(configWithBasicRouting) val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messagesAfterUpdate).hasSize(1) @@ -242,7 +242,7 @@ object VesHvSpecification : Spek({ .isEqualTo(0) - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + sut.configurationProvider.updateConfiguration(configWithDifferentRouting) val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messagesAfterUpdate).hasSize(2) @@ -261,7 +261,7 @@ object VesHvSpecification : Spek({ Flux.range(0, messagesAmount).doOnNext { if (it == messagesForEachTopic) { - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + sut.configurationProvider.updateConfiguration(configWithDifferentRouting) } }.doOnNext { sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) @@ -287,7 +287,7 @@ object VesHvSpecification : Spek({ val incomingMessages = Flux.range(0, messageStreamSize) .doOnNext { if (it == pivot) { - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + sut.configurationProvider.updateConfiguration(configWithDifferentRouting) println("config changed") } } @@ -320,7 +320,7 @@ object VesHvSpecification : Spek({ given("failed configuration change") { val (sut, _) = vesHvWithStoringSink() sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) - sut.configurationProvider.updateConfiguration(basicRouting) + sut.configurationProvider.updateConfiguration(configWithBasicRouting) it("should mark the application unhealthy ") { assertThat(sut.healthStateProvider.currentHealth) @@ -349,6 +349,6 @@ object VesHvSpecification : Spek({ private fun vesHvWithStoringSink(): Pair { val sink = StoringSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + sut.configurationProvider.updateConfiguration(configWithBasicRouting) return Pair(sut, sink) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index 1ad2b0e3..213eff27 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -20,11 +20,12 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableKafkaSink +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink import reactor.core.publisher.FluxProcessor import reactor.core.publisher.UnicastProcessor import reactor.retry.RetryExhaustedException @@ -33,56 +34,54 @@ import reactor.retry.RetryExhaustedException const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING" const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" - -val basicRouting = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } -}.build() - - -val twoDomainsToOneTopicRouting = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - defineRoute { - fromDomain(HEARTBEAT.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - defineRoute { - fromDomain(MEASUREMENT.domainName) - toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC) - withFixedPartitioning() - } -}.build() - - -val configurationWithDifferentRouting = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(ALTERNATE_PERF3GPP_TOPIC) - withFixedPartitioning() - } -}.build() - - -val emptyRouting = routing { }.build() +const val SAMPLE_BOOTSTRAP_SERVERS = "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060" + +val configWithBasicRouting = sequenceOf( + ImmutableKafkaSink.builder() + .name(PERF3GPP.domainName) + .topicName(PERF3GPP_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build() +) + +val configWithTwoDomainsToOneTopicRouting = sequenceOf( + ImmutableKafkaSink.builder() + .name(PERF3GPP.domainName) + .topicName(PERF3GPP_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build(), + ImmutableKafkaSink.builder() + .name(HEARTBEAT.domainName) + .topicName(PERF3GPP_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build(), + ImmutableKafkaSink.builder() + .name(MEASUREMENT.domainName) + .topicName(MEASUREMENTS_FOR_VF_SCALING_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build() +) + +val configWithDifferentRouting = sequenceOf( + ImmutableKafkaSink.builder() + .name(PERF3GPP.domainName) + .topicName(ALTERNATE_PERF3GPP_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build() + ) + +val configWithEmptyRouting = emptySequence() class FakeConfigurationProvider : ConfigurationProvider { private var shouldThrowException = false - private val configStream: FluxProcessor = UnicastProcessor.create() + private val configStream: FluxProcessor, Sequence> = UnicastProcessor.create() - fun updateConfiguration(routing: Routing) = + fun updateConfiguration(kafkaSinkSequence: Sequence) = if (shouldThrowException) { configStream.onError(RetryExhaustedException("I'm so tired")) } else { - configStream.onNext(routing) + configStream.onNext(kafkaSinkSequence) } -- cgit 1.2.3-korg