diff options
Diffstat (limited to 'sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes')
-rw-r--r-- | sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt | 85 |
1 files changed, 42 insertions, 43 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index 1ad2b0e3..213eff27 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -20,11 +20,12 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableKafkaSink +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink import reactor.core.publisher.FluxProcessor import reactor.core.publisher.UnicastProcessor import reactor.retry.RetryExhaustedException @@ -33,56 +34,54 @@ import reactor.retry.RetryExhaustedException const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING" const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" - -val basicRouting = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } -}.build() - - -val twoDomainsToOneTopicRouting = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - defineRoute { - fromDomain(HEARTBEAT.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - defineRoute { - fromDomain(MEASUREMENT.domainName) - toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC) - withFixedPartitioning() - } -}.build() - - -val configurationWithDifferentRouting = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(ALTERNATE_PERF3GPP_TOPIC) - withFixedPartitioning() - } -}.build() - - -val emptyRouting = routing { }.build() +const val SAMPLE_BOOTSTRAP_SERVERS = "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060" + +val configWithBasicRouting = sequenceOf( + ImmutableKafkaSink.builder() + .name(PERF3GPP.domainName) + .topicName(PERF3GPP_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build() +) + +val configWithTwoDomainsToOneTopicRouting = sequenceOf( + ImmutableKafkaSink.builder() + .name(PERF3GPP.domainName) + .topicName(PERF3GPP_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build(), + ImmutableKafkaSink.builder() + .name(HEARTBEAT.domainName) + .topicName(PERF3GPP_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build(), + ImmutableKafkaSink.builder() + .name(MEASUREMENT.domainName) + .topicName(MEASUREMENTS_FOR_VF_SCALING_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build() +) + +val configWithDifferentRouting = sequenceOf( + ImmutableKafkaSink.builder() + .name(PERF3GPP.domainName) + .topicName(ALTERNATE_PERF3GPP_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build() + ) + +val configWithEmptyRouting = emptySequence<KafkaSink>() class FakeConfigurationProvider : ConfigurationProvider { private var shouldThrowException = false - private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create() + private val configStream: FluxProcessor<Sequence<KafkaSink>, Sequence<KafkaSink>> = UnicastProcessor.create() - fun updateConfiguration(routing: Routing) = + fun updateConfiguration(kafkaSinkSequence: Sequence<KafkaSink>) = if (shouldThrowException) { configStream.onError(RetryExhaustedException("I'm so tired")) } else { - configStream.onNext(routing) + configStream.onNext(kafkaSinkSequence) } |