aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt')
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt85
1 files changed, 42 insertions, 43 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index 1ad2b0e3..213eff27 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -20,11 +20,12 @@
package org.onap.dcae.collectors.veshv.tests.fakes
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.config.api.model.routing
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableKafkaSink
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
import reactor.core.publisher.FluxProcessor
import reactor.core.publisher.UnicastProcessor
import reactor.retry.RetryExhaustedException
@@ -33,56 +34,54 @@ import reactor.retry.RetryExhaustedException
const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING"
const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
-
-val basicRouting = routing {
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic(PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
-}.build()
-
-
-val twoDomainsToOneTopicRouting = routing {
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic(PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
- defineRoute {
- fromDomain(HEARTBEAT.domainName)
- toTopic(PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
- defineRoute {
- fromDomain(MEASUREMENT.domainName)
- toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
- withFixedPartitioning()
- }
-}.build()
-
-
-val configurationWithDifferentRouting = routing {
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic(ALTERNATE_PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
-}.build()
-
-
-val emptyRouting = routing { }.build()
+const val SAMPLE_BOOTSTRAP_SERVERS = "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"
+
+val configWithBasicRouting = sequenceOf(
+ ImmutableKafkaSink.builder()
+ .name(PERF3GPP.domainName)
+ .topicName(PERF3GPP_TOPIC)
+ .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+ .build()
+)
+
+val configWithTwoDomainsToOneTopicRouting = sequenceOf(
+ ImmutableKafkaSink.builder()
+ .name(PERF3GPP.domainName)
+ .topicName(PERF3GPP_TOPIC)
+ .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+ .build(),
+ ImmutableKafkaSink.builder()
+ .name(HEARTBEAT.domainName)
+ .topicName(PERF3GPP_TOPIC)
+ .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+ .build(),
+ ImmutableKafkaSink.builder()
+ .name(MEASUREMENT.domainName)
+ .topicName(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+ .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+ .build()
+)
+
+val configWithDifferentRouting = sequenceOf(
+ ImmutableKafkaSink.builder()
+ .name(PERF3GPP.domainName)
+ .topicName(ALTERNATE_PERF3GPP_TOPIC)
+ .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+ .build()
+ )
+
+val configWithEmptyRouting = emptySequence<KafkaSink>()
class FakeConfigurationProvider : ConfigurationProvider {
private var shouldThrowException = false
- private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create()
+ private val configStream: FluxProcessor<Sequence<KafkaSink>, Sequence<KafkaSink>> = UnicastProcessor.create()
- fun updateConfiguration(routing: Routing) =
+ fun updateConfiguration(kafkaSinkSequence: Sequence<KafkaSink>) =
if (shouldThrowException) {
configStream.onError(RetryExhaustedException("I'm so tired"))
} else {
- configStream.onNext(routing)
+ configStream.onNext(kafkaSinkSequence)
}