aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-configuration
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-03-29 11:22:24 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-01 12:32:42 +0200
commit6725abbaa6249e107126ffd5ec58f2a96ce60eee (patch)
treef3fa6d11a04b60a631ee4160a69744b44e08e1ed /sources/hv-collector-configuration
parent4281a12d8e892f46f5f2226ee0f8aee8b862b177 (diff)
Move ConfigurationProvider to config module
Change-Id: Ic6f955f4e777e06e7c7eed6e08c0cac470e9a51d Issue-ID: DCAEGEN2-1347 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-configuration')
-rw-r--r--sources/hv-collector-configuration/pom.xml5
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt32
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt24
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt)2
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt)0
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt)0
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt119
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt4
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt5
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt)2
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt206
11 files changed, 384 insertions, 15 deletions
diff --git a/sources/hv-collector-configuration/pom.xml b/sources/hv-collector-configuration/pom.xml
index 792b9eaa..b6ec4ca2 100644
--- a/sources/hv-collector-configuration/pom.xml
+++ b/sources/hv-collector-configuration/pom.xml
@@ -77,7 +77,10 @@
<groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
<artifactId>cbs-client</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>io.projectreactor.addons</groupId>
+ <artifactId>reactor-extra</artifactId>
+ </dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
index dd1b171e..9684484b 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
@@ -22,10 +22,16 @@ package org.onap.dcae.collectors.veshv.config.api
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException
import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
-import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
+import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider
+import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator
import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader
+import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
+import org.onap.dcae.collectors.veshv.utils.arrow.rightOrThrow
import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft
+import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
import reactor.core.publisher.Flux
class ConfigurationModule {
@@ -34,16 +40,28 @@ class ConfigurationModule {
private val configReader = FileConfigurationReader()
private val configValidator = ConfigurationValidator()
- private lateinit var initialConfig: HvVesConfiguration
-
fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args)
- fun hvVesConfigurationUpdates(args: Array<String>): Flux<HvVesConfiguration> =
+ fun hvVesConfigurationUpdates(args: Array<String>,
+ configStateListener: ConfigurationStateListener,
+ mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> =
Flux.just(cmd.getConfigurationFile(args))
.throwOnLeft { MissingArgumentException(it.message, it.cause) }
.map { it.reader().use(configReader::loadConfig) }
- .map { configValidator.validate(it) }
- .throwOnLeft { ValidationException(it.message) }
- .doOnNext { initialConfig = it }
+ .cache()
+ .flatMap { basePartialConfig ->
+ val baseConfig = configValidator.validate(basePartialConfig)
+ .rightOrThrow { ValidationException(it.message) }
+ val cbsConfigProvider = CbsConfigurationProvider(
+ CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
+ baseConfig.cbs,
+ configStateListener,
+ mdc)
+ val merger = ConfigurationMerger()
+ cbsConfigProvider()
+ .map { merger.merge(basePartialConfig, it) }
+ .map { configValidator.validate(it) }
+ .throwOnLeft { ValidationException(it.message) }
+ }
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt
new file mode 100644
index 00000000..9fa6660e
--- /dev/null
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt
@@ -0,0 +1,24 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.api
+
+interface ConfigurationStateListener {
+ fun retrying() {}
+}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
index 3375821e..c1807be2 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
@@ -47,7 +47,5 @@ data class CbsConfiguration(
)
data class CollectorConfiguration(
- val maxRequestSizeBytes: Int,
- val kafkaServers: String,
val routing: Routing
)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt
index 2fc29829..2fc29829 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt
index e5a83ac4..e5a83ac4 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
new file mode 100644
index 00000000..2038c31a
--- /dev/null
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
@@ -0,0 +1,119 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.Some
+import com.google.gson.JsonObject
+import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
+import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.retry.Jitter
+import reactor.retry.Retry
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClient>,
+ private val cbsConfiguration: CbsConfiguration,
+ private val streamParser: StreamFromGsonParser<KafkaSink>,
+ private val configurationStateListener: ConfigurationStateListener,
+ retrySpec: Retry<Any>,
+ private val mdc: MappedDiagnosticContext
+
+) {
+ constructor(cbsClientMono: Mono<CbsClient>,
+ cbsConfig: CbsConfiguration,
+ configurationStateListener: ConfigurationStateListener,
+ mdc: MappedDiagnosticContext) :
+ this(
+ cbsClientMono,
+ cbsConfig,
+ StreamFromGsonParsers.kafkaSinkParser(),
+ configurationStateListener,
+ Retry.any<Any>()
+ .retryMax(MAX_RETRIES)
+ .fixedBackoff(cbsConfig.requestInterval)
+ .jitter(Jitter.random()),
+ mdc
+ )
+
+ private val retry = retrySpec.doOnRetry {
+ logger.withWarn(mdc) {
+ log("Exception from configuration provider client, retrying subscription", it.exception())
+ }
+ configurationStateListener.retrying()
+ }
+
+ operator fun invoke(): Flux<PartialConfiguration> =
+ cbsClientMono
+ .doOnNext { logger.info(mdc) { "CBS client successfully created" } }
+ .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" }
+ .retryWhen(retry)
+ .doFinally { logger.trace(mdc) { "CBS client subscription finished" } }
+ .flatMapMany(::handleUpdates)
+
+ private fun handleUpdates(cbsClient: CbsClient) = cbsClient
+ .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
+ cbsConfiguration.firstRequestDelay,
+ cbsConfiguration.requestInterval)
+ .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
+ .map(::createRoutingDescription)
+ .onErrorLog(logger, mdc) { "Error while creating configuration" }
+ .retryWhen(retry)
+ .map { PartialConfiguration(collector = Some(PartialCollectorConfig(routing = it))) }
+
+ private fun createRoutingDescription(configuration: JsonObject): Option<Routing> = try {
+ val routes = DataStreams.namedSinks(configuration)
+ .filter(streamOfType(KAFKA))
+ .map(streamParser::unsafeParse)
+ .map { Route(it.name(), it) }
+ .asIterable()
+ .toList()
+ Some(routes)
+ } catch (e: NullPointerException) {
+ logger.withWarn(mdc) {
+ log("Invalid streams configuration", e)
+ }
+ None
+ }
+
+ companion object {
+ private const val MAX_RETRIES = 5L
+ private val logger = Logger(CbsConfigurationProvider::class)
+ }
+}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
index 04bba7e2..3e599b58 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
@@ -63,9 +63,7 @@ internal class ConfigurationValidator {
securityConfiguration,
// TOD0: swap when ConfigurationMerger is implemented
// collectorConfiguration
- CollectorConfiguration(-1,
- "I do not exist. I'm not even a URL :o",
- emptyList()),
+ CollectorConfiguration(emptyList()),
// end TOD0
logLevel
)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
index 3e93a400..c1a98294 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
@@ -19,7 +19,10 @@
*/
package org.onap.dcae.collectors.veshv.config.impl
-import arrow.core.*
+import arrow.core.Either
+import arrow.core.Option
+import arrow.core.Try
+import arrow.core.getOrElse
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.CommandLineParser
import org.apache.commons.cli.DefaultParser
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
index a27998e1..f3c149cd 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
@@ -54,6 +54,6 @@ internal data class PartialSecurityConfig(val keys: Option<SecurityKeys> = None)
internal data class PartialCollectorConfig(
val maxRequestSizeBytes: Option<Int> = None,
- val kafkaServers: Option<List<InetSocketAddress>> = None,
+ val kafkaServers: Option<List<InetSocketAddress>> = None, // TOD0: remove properties and simplify this part
val routing: Option<Routing> = None
)
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
new file mode 100644
index 00000000..0cbc0e4a
--- /dev/null
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
@@ -0,0 +1,206 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import com.google.gson.JsonParser
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.eq
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.times
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.retry.Retry
+import reactor.test.StepVerifier
+import java.time.Duration
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal object CbsConfigurationProviderTest : Spek({
+
+ describe("Configuration provider") {
+
+ val cbsClient: CbsClient = mock()
+ val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient)
+ val configStateListener: ConfigurationStateListener = mock()
+
+ given("configuration is never in cbs") {
+ val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)
+
+ on("waiting for configuration") {
+ val waitTime = Duration.ofMillis(100)
+
+ it("should not get it") {
+ StepVerifier.create(configProvider().take(1))
+ .expectNoEvent(waitTime)
+ }
+ }
+ }
+
+ given("valid configuration from cbs") {
+ val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)
+
+ on("new configuration") {
+ whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
+ .thenReturn(Flux.just(validConfiguration))
+ it("should use received configuration") {
+
+ StepVerifier.create(configProvider().take(1))
+ .consumeNextWith {
+ val routes = it.collector.orNull()!!.routing.orNull()!!
+ val route1 = routes.elementAt(0)
+ val route2 = routes.elementAt(1)
+ val receivedSink1 = route1.sink
+ val receivedSink2 = route2.sink
+
+ assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL)
+ assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1)
+ assertThat(receivedSink1.bootstrapServers())
+ .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
+ assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
+
+ assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL)
+ assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2)
+ assertThat(receivedSink2.bootstrapServers())
+ .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
+ assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
+
+ }.verifyComplete()
+ }
+ }
+
+ }
+ given("invalid configuration from cbs") {
+ val iterationCount = 3L
+ val configProvider = constructConfigurationProvider(
+ cbsClientMock, configStateListener, iterationCount
+ )
+
+ on("new configuration") {
+ whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
+ .thenReturn(Flux.just(invalidConfiguration))
+
+ it("should interrupt the flux") {
+ StepVerifier.create(configProvider())
+ .verifyError()
+ }
+
+ it("should call state listener when retrying") {
+ verify(configStateListener, times(iterationCount.toInt())).retrying()
+ }
+ }
+ }
+ }
+
+})
+
+
+val PERF3GPP_REGIONAL = "perf3gpp_regional"
+val PERF3GPP_CENTRAL = "perf3gpp_central"
+
+private val aafCredentials1 = ImmutableAafCredentials.builder()
+ .username("client")
+ .password("very secure password")
+ .build()
+
+private val aafCredentials2 = ImmutableAafCredentials.builder()
+ .username("other_client")
+ .password("another very secure password")
+ .build()
+
+private val validConfiguration = JsonParser().parse("""
+{
+ "streams_publishes": {
+ "$PERF3GPP_REGIONAL": {
+ "type": "kafka",
+ "aaf_credentials": {
+ "username": "client",
+ "password": "very secure password"
+ },
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
+ "topic_name": "REG_HVVES_PERF3GPP"
+ }
+ },
+ "$PERF3GPP_CENTRAL": {
+ "type": "kafka",
+ "aaf_credentials": {
+ "username": "other_client",
+ "password": "another very secure password"
+ },
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060",
+ "topic_name": "CEN_HVVES_PERF3GPP"
+ }
+ }
+ }
+}""").asJsonObject
+
+private val invalidConfiguration = JsonParser().parse("""
+{
+ "streams_publishes": {
+ "$PERF3GPP_REGIONAL": {
+ "type": "kafka",
+ "aaf_credentials": {
+ "username": "client",
+ "password": "very secure password"
+ },
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
+ "popic_name": "REG_HVVES_PERF3GPP"
+ }
+ }
+ }
+}""").asJsonObject
+
+private val firstRequestDelay = Duration.ofMillis(1)
+private val requestInterval = Duration.ofMillis(1)
+private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
+
+private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
+ configurationStateListener: ConfigurationStateListener,
+ iterationCount: Long = 1
+): CbsConfigurationProvider {
+
+ val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
+
+ return CbsConfigurationProvider(
+ cbsClientMono,
+ CbsConfiguration(firstRequestDelay, requestInterval),
+ streamParser,
+ configurationStateListener,
+ retry,
+ { mapOf("k" to "v") }
+ )
+}