diff options
author | kjaniak <kornel.janiak@nokia.com> | 2019-03-21 14:03:53 +0100 |
---|---|---|
committer | kjaniak <kornel.janiak@nokia.com> | 2019-03-26 13:05:51 +0100 |
commit | 7e77162022371860d13939be1848982a735cdab9 (patch) | |
tree | 6603c70af9029701294fd3783c9df1a1016b7c8f /sources/hv-collector-ct | |
parent | c979ea3bd6059cb067a84ba8e6f8d1cf96d61ba2 (diff) |
Use DataStream API from CBS client
Change-Id: Ief92f793282288938c6663616e9613c6df2d8ddb
Issue-ID: DCAEGEN2-1346
Signed-off-by: kjaniak <kornel.janiak@nokia.com>
Diffstat (limited to 'sources/hv-collector-ct')
5 files changed, 74 insertions, 73 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index bd056d4d..a6b32ed9 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -35,8 +35,8 @@ import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting -import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader @@ -92,7 +92,7 @@ object MetricsSpecification : Spek({ describe("Messages sent metrics") { it("should gather info for each topic separately") { - val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(configWithTwoDomainsToOneTopicRouting) sut.handleConnection( vesWireFrameMessage(PERF3GPP), @@ -130,7 +130,7 @@ object MetricsSpecification : Spek({ describe("Messages dropped metrics") { it("should gather metrics for invalid messages") { - val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) sut.handleConnection( messageWithInvalidWireFrameHeader(), @@ -146,7 +146,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for route not found") { - val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), @@ -160,7 +160,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for sing errors") { - val sut = vesHvWithAlwaysFailingSink(basicRouting) + val sut = vesHvWithAlwaysFailingSink(configWithBasicRouting) sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP)) @@ -171,7 +171,7 @@ object MetricsSpecification : Spek({ } it("should gather summed metrics for dropped messages") { - val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index ece42285..50fe098c 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType @@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({ it("should handle multiple clients in reasonable time") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + sut.configurationProvider.updateConfiguration(configWithBasicRouting) val numMessages: Long = 300_000 val runs = 4 @@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({ it("should disconnect on transmission errors") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + sut.configurationProvider.updateConfiguration(configWithBasicRouting) val numMessages: Long = 100_000 val timeout = Duration.ofSeconds(30) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index e84e9486..da9290d3 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -38,7 +38,9 @@ import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.Kafka +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink import reactor.core.publisher.Flux import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean @@ -101,17 +103,17 @@ fun Sut.handleConnection(vararg packets: ByteBuf) { collector.handleConnection(Flux.fromArray(packets)).block(timeout) } -fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut = +fun vesHvWithAlwaysSuccessfulSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut = Sut(AlwaysSuccessfulSink()).apply { - configurationProvider.updateConfiguration(routing) + configurationProvider.updateConfiguration(kafkaSinks) } -fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut = +fun vesHvWithAlwaysFailingSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut = Sut(AlwaysFailingSink()).apply { - configurationProvider.updateConfiguration(routing) + configurationProvider.updateConfiguration(kafkaSinks) } -fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut = +fun vesHvWithDelayingSink(delay: Duration, kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut = Sut(DelayingSink(delay)).apply { - configurationProvider.updateConfiguration(routing) + configurationProvider.updateConfiguration(kafkaSinks) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 17f6ce32..21c5c189 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -33,10 +33,10 @@ import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting -import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting -import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithDifferentRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithEmptyRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize @@ -152,7 +152,7 @@ object VesHvSpecification : Spek({ it("should be able to direct 2 messages from different domains to one topic") { val (sut, sink) = vesHvWithStoringSink() - sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting) + sut.configurationProvider.updateConfiguration(configWithTwoDomainsToOneTopicRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP), @@ -205,7 +205,7 @@ object VesHvSpecification : Spek({ it("should update collector") { val firstCollector = sut.collector - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + sut.configurationProvider.updateConfiguration(configWithDifferentRouting) val collectorAfterUpdate = sut.collector assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) @@ -213,12 +213,12 @@ object VesHvSpecification : Spek({ it("should start routing messages") { - sut.configurationProvider.updateConfiguration(emptyRouting) + sut.configurationProvider.updateConfiguration(configWithEmptyRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messages).isEmpty() - sut.configurationProvider.updateConfiguration(basicRouting) + sut.configurationProvider.updateConfiguration(configWithBasicRouting) val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messagesAfterUpdate).hasSize(1) @@ -242,7 +242,7 @@ object VesHvSpecification : Spek({ .isEqualTo(0) - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + sut.configurationProvider.updateConfiguration(configWithDifferentRouting) val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messagesAfterUpdate).hasSize(2) @@ -261,7 +261,7 @@ object VesHvSpecification : Spek({ Flux.range(0, messagesAmount).doOnNext { if (it == messagesForEachTopic) { - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + sut.configurationProvider.updateConfiguration(configWithDifferentRouting) } }.doOnNext { sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) @@ -287,7 +287,7 @@ object VesHvSpecification : Spek({ val incomingMessages = Flux.range(0, messageStreamSize) .doOnNext { if (it == pivot) { - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + sut.configurationProvider.updateConfiguration(configWithDifferentRouting) println("config changed") } } @@ -320,7 +320,7 @@ object VesHvSpecification : Spek({ given("failed configuration change") { val (sut, _) = vesHvWithStoringSink() sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) - sut.configurationProvider.updateConfiguration(basicRouting) + sut.configurationProvider.updateConfiguration(configWithBasicRouting) it("should mark the application unhealthy ") { assertThat(sut.healthStateProvider.currentHealth) @@ -349,6 +349,6 @@ object VesHvSpecification : Spek({ private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> { val sink = StoringSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + sut.configurationProvider.updateConfiguration(configWithBasicRouting) return Pair(sut, sink) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index 1ad2b0e3..213eff27 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -20,11 +20,12 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableKafkaSink +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink import reactor.core.publisher.FluxProcessor import reactor.core.publisher.UnicastProcessor import reactor.retry.RetryExhaustedException @@ -33,56 +34,54 @@ import reactor.retry.RetryExhaustedException const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING" const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" - -val basicRouting = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } -}.build() - - -val twoDomainsToOneTopicRouting = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - defineRoute { - fromDomain(HEARTBEAT.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - defineRoute { - fromDomain(MEASUREMENT.domainName) - toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC) - withFixedPartitioning() - } -}.build() - - -val configurationWithDifferentRouting = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(ALTERNATE_PERF3GPP_TOPIC) - withFixedPartitioning() - } -}.build() - - -val emptyRouting = routing { }.build() +const val SAMPLE_BOOTSTRAP_SERVERS = "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060" + +val configWithBasicRouting = sequenceOf( + ImmutableKafkaSink.builder() + .name(PERF3GPP.domainName) + .topicName(PERF3GPP_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build() +) + +val configWithTwoDomainsToOneTopicRouting = sequenceOf( + ImmutableKafkaSink.builder() + .name(PERF3GPP.domainName) + .topicName(PERF3GPP_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build(), + ImmutableKafkaSink.builder() + .name(HEARTBEAT.domainName) + .topicName(PERF3GPP_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build(), + ImmutableKafkaSink.builder() + .name(MEASUREMENT.domainName) + .topicName(MEASUREMENTS_FOR_VF_SCALING_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build() +) + +val configWithDifferentRouting = sequenceOf( + ImmutableKafkaSink.builder() + .name(PERF3GPP.domainName) + .topicName(ALTERNATE_PERF3GPP_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build() + ) + +val configWithEmptyRouting = emptySequence<KafkaSink>() class FakeConfigurationProvider : ConfigurationProvider { private var shouldThrowException = false - private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create() + private val configStream: FluxProcessor<Sequence<KafkaSink>, Sequence<KafkaSink>> = UnicastProcessor.create() - fun updateConfiguration(routing: Routing) = + fun updateConfiguration(kafkaSinkSequence: Sequence<KafkaSink>) = if (shouldThrowException) { configStream.onError(RetryExhaustedException("I'm so tired")) } else { - configStream.onNext(routing) + configStream.onNext(kafkaSinkSequence) } |