aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt')
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt68
1 files changed, 40 insertions, 28 deletions
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
index d5fe588e..94eb519d 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.config.impl
+import arrow.core.Some
import com.google.gson.JsonParser
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.eq
@@ -51,9 +52,9 @@ internal object CbsConfigurationProviderTest : Spek({
describe("Configuration provider") {
- val cbsClient: CbsClient = mock()
- val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient)
- val configStateListener: ConfigurationStateListener = mock()
+ val cbsClient = mock<CbsClient>()
+ val cbsClientMock = Mono.just(cbsClient)
+ val configStateListener = mock<ConfigurationStateListener>()
given("configuration is never in cbs") {
val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)
@@ -78,29 +79,32 @@ internal object CbsConfigurationProviderTest : Spek({
StepVerifier.create(configProvider().take(1))
.consumeNextWith {
- val routes = it.routing.orNull()!!
- val route1 = routes.elementAt(0)
- val route2 = routes.elementAt(1)
- val receivedSink1 = route1.sink
- val receivedSink2 = route2.sink
-
- assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL)
- assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1)
- assertThat(receivedSink1.bootstrapServers())
+
+ assertThat(it.listenPort).isEqualTo(Some(6061))
+ assertThat(it.idleTimeoutSec).isEqualTo(Some(60L))
+ assertThat(it.maxPayloadSizeBytes).isEqualTo(Some(1048576))
+
+
+ val sinks = it.streamPublishers.orNull()!!
+ val sink1 = sinks[0]
+ val sink2 = sinks[1]
+
+ assertThat(sink1.name()).isEqualTo(PERF3GPP_REGIONAL)
+ assertThat(sink1.aafCredentials()).isEqualTo(aafCredentials1)
+ assertThat(sink1.bootstrapServers())
.isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
- assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
+ assertThat(sink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
- assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL)
- assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2)
- assertThat(receivedSink2.bootstrapServers())
+ assertThat(sink2.name()).isEqualTo(PERF3GPP_CENTRAL)
+ assertThat(sink2.aafCredentials()).isEqualTo(aafCredentials2)
+ assertThat(sink2.bootstrapServers())
.isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
- assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
-
+ assertThat(sink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
}.verifyComplete()
}
}
-
}
+
given("invalid configuration from cbs") {
val iterationCount = 3L
val configProvider = constructConfigurationProvider(
@@ -112,7 +116,8 @@ internal object CbsConfigurationProviderTest : Spek({
.thenReturn(Flux.just(invalidConfiguration))
it("should interrupt the flux") {
- StepVerifier.create(configProvider())
+ StepVerifier
+ .create(configProvider())
.verifyError()
}
@@ -126,8 +131,8 @@ internal object CbsConfigurationProviderTest : Spek({
})
-val PERF3GPP_REGIONAL = "perf3gpp_regional"
-val PERF3GPP_CENTRAL = "perf3gpp_central"
+private const val PERF3GPP_REGIONAL = "perf3gpp_regional"
+private const val PERF3GPP_CENTRAL = "perf3gpp_central"
private val aafCredentials1 = ImmutableAafCredentials.builder()
.username("client")
@@ -141,6 +146,9 @@ private val aafCredentials2 = ImmutableAafCredentials.builder()
private val validConfiguration = JsonParser().parse("""
{
+ "server.listenPort": 6061,
+ "server.idleTimeoutSec": 60,
+ "server.maxPayloadSizeBytes": 1048576,
"streams_publishes": {
"$PERF3GPP_REGIONAL": {
"type": "kafka",
@@ -173,12 +181,12 @@ private val invalidConfiguration = JsonParser().parse("""
"$PERF3GPP_REGIONAL": {
"type": "kafka",
"aaf_credentials": {
- "username": "client",
+ "user": "client",
"password": "very secure password"
},
"kafka_info": {
- "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
- "popic_name": "REG_HVVES_PERF3GPP"
+ "servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
+ "name": "REG_HVVES_PERF3GPP"
}
}
}
@@ -187,20 +195,24 @@ private val invalidConfiguration = JsonParser().parse("""
private val firstRequestDelay = Duration.ofMillis(1)
private val requestInterval = Duration.ofMillis(1)
private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
+private val configParser = JsonConfigurationParser()
private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
configurationStateListener: ConfigurationStateListener,
iterationCount: Long = 1
): CbsConfigurationProvider {
- val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
+ val retry = Retry
+ .onlyIf<Any> { it.iteration() <= iterationCount }
+ .fixedBackoff(Duration.ofNanos(1))
return CbsConfigurationProvider(
cbsClientMono,
CbsConfiguration(firstRequestDelay, requestInterval),
+ configParser,
streamParser,
configurationStateListener,
- retry,
- { mapOf("k" to "v") }
+ { mapOf("k" to "v") },
+ retry
)
}