diff options
Diffstat (limited to 'sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt')
-rw-r--r-- | sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt | 94 |
1 files changed, 59 insertions, 35 deletions
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt index f830f2c9..e71250ca 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt @@ -30,13 +30,12 @@ import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials import reactor.core.publisher.Flux - import reactor.core.publisher.Mono import reactor.retry.Retry import reactor.test.StepVerifier @@ -77,23 +76,18 @@ internal object ConfigurationProviderImplTest : Spek({ StepVerifier.create(configProvider().take(1)) .consumeNextWith { - - val route1 = it.routes[0] - assertThat(FAULT.domainName) - .describedAs("routed domain 1") - .isEqualTo(route1.domain) - assertThat("test-topic-1") - .describedAs("target topic 1") - .isEqualTo(route1.targetTopic) - - val route2 = it.routes[1] - assertThat(HEARTBEAT.domainName) - .describedAs("routed domain 2") - .isEqualTo(route2.domain) - assertThat("test-topic-2") - .describedAs("target topic 2") - .isEqualTo(route2.targetTopic) - + val receivedSink1 = it.elementAt(0) + val receivedSink2 = it.elementAt(1) + + assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1) + assertThat(receivedSink1.bootstrapServers()) + .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060") + assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP") + + assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2) + assertThat(receivedSink2.bootstrapServers()) + .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060") + assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP") }.verifyComplete() } } @@ -126,35 +120,64 @@ internal object ConfigurationProviderImplTest : Spek({ }) +private val aafCredentials1 = ImmutableAafCredentials.builder() + .username("client") + .password("very secure password") + .build() + +private val aafCredentials2 = ImmutableAafCredentials.builder() + .username("other_client") + .password("another very secure password") + .build() private val validConfiguration = JsonParser().parse(""" { - "whatever": "garbage", - "collector.routing": [ - { - "fromDomain": "fault", - "toTopic": "test-topic-1" + "streams_publishes": { + "perf3gpp_regional": { + "type": "kafka", + "aaf_credentials": { + "username": "client", + "password": "very secure password" + }, + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", + "topic_name": "REG_HVVES_PERF3GPP" + } + }, + "perf3gpp_central": { + "type": "kafka", + "aaf_credentials": { + "username": "other_client", + "password": "another very secure password" }, - { - "fromDomain": "heartbeat", - "toTopic": "test-topic-2" + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060", + "topic_name": "CEN_HVVES_PERF3GPP" } - ] + } + } }""").asJsonObject private val invalidConfiguration = JsonParser().parse(""" { - "whatever": "garbage", - "collector.routing": [ - { - "fromDomain": "garbage", - "meaningful": "garbage" + "streams_publishes": { + "perf3gpp_regional": { + "type": "kafka", + "aaf_credentials": { + "username": "client", + "password": "very secure password" + }, + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", + "popic_name": "REG_HVVES_PERF3GPP" } - ] + } + } }""").asJsonObject private val firstRequestDelay = Duration.ofMillis(1) private val requestInterval = Duration.ofMillis(1) +private val streamParser = StreamFromGsonParsers.kafkaSinkParser() private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>, healthState: HealthState, @@ -168,6 +191,7 @@ private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>, firstRequestDelay, requestInterval, healthState, + streamParser, retry ) } |