aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/test
diff options
context:
space:
mode:
authorkjaniak <kornel.janiak@nokia.com>2019-03-21 14:03:53 +0100
committerkjaniak <kornel.janiak@nokia.com>2019-03-26 13:05:51 +0100
commit7e77162022371860d13939be1848982a735cdab9 (patch)
tree6603c70af9029701294fd3783c9df1a1016b7c8f /sources/hv-collector-core/src/test
parentc979ea3bd6059cb067a84ba8e6f8d1cf96d61ba2 (diff)
Use DataStream API from CBS client
Change-Id: Ief92f793282288938c6663616e9613c6df2d8ddb Issue-ID: DCAEGEN2-1346 Signed-off-by: kjaniak <kornel.janiak@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src/test')
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt94
1 files changed, 59 insertions, 35 deletions
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
index f830f2c9..e71250ca 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
@@ -30,13 +30,12 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials
import reactor.core.publisher.Flux
-
import reactor.core.publisher.Mono
import reactor.retry.Retry
import reactor.test.StepVerifier
@@ -77,23 +76,18 @@ internal object ConfigurationProviderImplTest : Spek({
StepVerifier.create(configProvider().take(1))
.consumeNextWith {
-
- val route1 = it.routes[0]
- assertThat(FAULT.domainName)
- .describedAs("routed domain 1")
- .isEqualTo(route1.domain)
- assertThat("test-topic-1")
- .describedAs("target topic 1")
- .isEqualTo(route1.targetTopic)
-
- val route2 = it.routes[1]
- assertThat(HEARTBEAT.domainName)
- .describedAs("routed domain 2")
- .isEqualTo(route2.domain)
- assertThat("test-topic-2")
- .describedAs("target topic 2")
- .isEqualTo(route2.targetTopic)
-
+ val receivedSink1 = it.elementAt(0)
+ val receivedSink2 = it.elementAt(1)
+
+ assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1)
+ assertThat(receivedSink1.bootstrapServers())
+ .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
+ assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
+
+ assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2)
+ assertThat(receivedSink2.bootstrapServers())
+ .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
+ assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
}.verifyComplete()
}
}
@@ -126,35 +120,64 @@ internal object ConfigurationProviderImplTest : Spek({
})
+private val aafCredentials1 = ImmutableAafCredentials.builder()
+ .username("client")
+ .password("very secure password")
+ .build()
+
+private val aafCredentials2 = ImmutableAafCredentials.builder()
+ .username("other_client")
+ .password("another very secure password")
+ .build()
private val validConfiguration = JsonParser().parse("""
{
- "whatever": "garbage",
- "collector.routing": [
- {
- "fromDomain": "fault",
- "toTopic": "test-topic-1"
+ "streams_publishes": {
+ "perf3gpp_regional": {
+ "type": "kafka",
+ "aaf_credentials": {
+ "username": "client",
+ "password": "very secure password"
+ },
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
+ "topic_name": "REG_HVVES_PERF3GPP"
+ }
+ },
+ "perf3gpp_central": {
+ "type": "kafka",
+ "aaf_credentials": {
+ "username": "other_client",
+ "password": "another very secure password"
},
- {
- "fromDomain": "heartbeat",
- "toTopic": "test-topic-2"
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060",
+ "topic_name": "CEN_HVVES_PERF3GPP"
}
- ]
+ }
+ }
}""").asJsonObject
private val invalidConfiguration = JsonParser().parse("""
{
- "whatever": "garbage",
- "collector.routing": [
- {
- "fromDomain": "garbage",
- "meaningful": "garbage"
+ "streams_publishes": {
+ "perf3gpp_regional": {
+ "type": "kafka",
+ "aaf_credentials": {
+ "username": "client",
+ "password": "very secure password"
+ },
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
+ "popic_name": "REG_HVVES_PERF3GPP"
}
- ]
+ }
+ }
}""").asJsonObject
private val firstRequestDelay = Duration.ofMillis(1)
private val requestInterval = Duration.ofMillis(1)
+private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
healthState: HealthState,
@@ -168,6 +191,7 @@ private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
firstRequestDelay,
requestInterval,
healthState,
+ streamParser,
retry
)
}