diff options
Diffstat (limited to 'sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt')
-rw-r--r-- | sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt | 68 |
1 files changed, 40 insertions, 28 deletions
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt index d5fe588e..94eb519d 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.config.impl +import arrow.core.Some import com.google.gson.JsonParser import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.eq @@ -51,9 +52,9 @@ internal object CbsConfigurationProviderTest : Spek({ describe("Configuration provider") { - val cbsClient: CbsClient = mock() - val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient) - val configStateListener: ConfigurationStateListener = mock() + val cbsClient = mock<CbsClient>() + val cbsClientMock = Mono.just(cbsClient) + val configStateListener = mock<ConfigurationStateListener>() given("configuration is never in cbs") { val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener) @@ -78,29 +79,32 @@ internal object CbsConfigurationProviderTest : Spek({ StepVerifier.create(configProvider().take(1)) .consumeNextWith { - val routes = it.routing.orNull()!! - val route1 = routes.elementAt(0) - val route2 = routes.elementAt(1) - val receivedSink1 = route1.sink - val receivedSink2 = route2.sink - - assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL) - assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1) - assertThat(receivedSink1.bootstrapServers()) + + assertThat(it.listenPort).isEqualTo(Some(6061)) + assertThat(it.idleTimeoutSec).isEqualTo(Some(60L)) + assertThat(it.maxPayloadSizeBytes).isEqualTo(Some(1048576)) + + + val sinks = it.streamPublishers.orNull()!! + val sink1 = sinks[0] + val sink2 = sinks[1] + + assertThat(sink1.name()).isEqualTo(PERF3GPP_REGIONAL) + assertThat(sink1.aafCredentials()).isEqualTo(aafCredentials1) + assertThat(sink1.bootstrapServers()) .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060") - assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP") + assertThat(sink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP") - assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL) - assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2) - assertThat(receivedSink2.bootstrapServers()) + assertThat(sink2.name()).isEqualTo(PERF3GPP_CENTRAL) + assertThat(sink2.aafCredentials()).isEqualTo(aafCredentials2) + assertThat(sink2.bootstrapServers()) .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060") - assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP") - + assertThat(sink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP") }.verifyComplete() } } - } + given("invalid configuration from cbs") { val iterationCount = 3L val configProvider = constructConfigurationProvider( @@ -112,7 +116,8 @@ internal object CbsConfigurationProviderTest : Spek({ .thenReturn(Flux.just(invalidConfiguration)) it("should interrupt the flux") { - StepVerifier.create(configProvider()) + StepVerifier + .create(configProvider()) .verifyError() } @@ -126,8 +131,8 @@ internal object CbsConfigurationProviderTest : Spek({ }) -val PERF3GPP_REGIONAL = "perf3gpp_regional" -val PERF3GPP_CENTRAL = "perf3gpp_central" +private const val PERF3GPP_REGIONAL = "perf3gpp_regional" +private const val PERF3GPP_CENTRAL = "perf3gpp_central" private val aafCredentials1 = ImmutableAafCredentials.builder() .username("client") @@ -141,6 +146,9 @@ private val aafCredentials2 = ImmutableAafCredentials.builder() private val validConfiguration = JsonParser().parse(""" { + "server.listenPort": 6061, + "server.idleTimeoutSec": 60, + "server.maxPayloadSizeBytes": 1048576, "streams_publishes": { "$PERF3GPP_REGIONAL": { "type": "kafka", @@ -173,12 +181,12 @@ private val invalidConfiguration = JsonParser().parse(""" "$PERF3GPP_REGIONAL": { "type": "kafka", "aaf_credentials": { - "username": "client", + "user": "client", "password": "very secure password" }, "kafka_info": { - "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", - "popic_name": "REG_HVVES_PERF3GPP" + "servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", + "name": "REG_HVVES_PERF3GPP" } } } @@ -187,20 +195,24 @@ private val invalidConfiguration = JsonParser().parse(""" private val firstRequestDelay = Duration.ofMillis(1) private val requestInterval = Duration.ofMillis(1) private val streamParser = StreamFromGsonParsers.kafkaSinkParser() +private val configParser = JsonConfigurationParser() private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>, configurationStateListener: ConfigurationStateListener, iterationCount: Long = 1 ): CbsConfigurationProvider { - val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1)) + val retry = Retry + .onlyIf<Any> { it.iteration() <= iterationCount } + .fixedBackoff(Duration.ofNanos(1)) return CbsConfigurationProvider( cbsClientMono, CbsConfiguration(firstRequestDelay, requestInterval), + configParser, streamParser, configurationStateListener, - retry, - { mapOf("k" to "v") } + { mapOf("k" to "v") }, + retry ) } |