aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt')
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt56
1 files changed, 43 insertions, 13 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
index ded75838..e243afe7 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
@@ -25,18 +25,24 @@ import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationTransformer
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator
+import org.onap.dcae.collectors.veshv.config.impl.CbsClientAdapter
import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
import org.onap.dcae.collectors.veshv.config.impl.JsonConfigurationParser
import org.onap.dcae.collectors.veshv.config.impl.PartialConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import reactor.retry.Jitter
+import reactor.retry.Retry
+import java.time.Duration
-class ConfigurationModule {
+class ConfigurationModule internal constructor(private val configStateListener: ConfigurationStateListener,
+ private val cbsClient: Mono<CbsClient>) {
private val cmd = HvVesCommandLineParser()
private val configParser = JsonConfigurationParser()
@@ -44,10 +50,15 @@ class ConfigurationModule {
private val configValidator = ConfigurationValidator()
private val configTransformer = ConfigurationTransformer()
+ constructor(configStateListener: ConfigurationStateListener) : this(
+ configStateListener,
+ CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment())
+ )
+
+
fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args)
fun hvVesConfigurationUpdates(args: Array<String>,
- configStateListener: ConfigurationStateListener,
mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> =
Mono.just(cmd.getConfigurationFile(args))
.throwOnLeft(::MissingArgumentException)
@@ -56,23 +67,35 @@ class ConfigurationModule {
.doOnNext { logger.info { "Successfully parsed configuration file to: $it" } }
.cache()
.flatMapMany { basePartialConfig ->
- cbsConfigurationProvider(basePartialConfig, configStateListener, mdc)
- .invoke()
- .map { configMerger.merge(basePartialConfig, it) }
- .map(configValidator::validate)
- .throwOnLeft()
- .map(configTransformer::toFinalConfiguration)
+ cbsClientAdapter(basePartialConfig).let { cbsClientAdapter ->
+ cbsConfigurationProvider(cbsClientAdapter, mdc)
+ .invoke()
+ .map { configMerger.merge(basePartialConfig, it) }
+ .map(configValidator::validate)
+ .throwOnLeft()
+ .map(configTransformer::toFinalConfiguration)
+ .doOnNext {
+ cbsClientAdapter.updateCbsInterval(it.cbs.requestInterval, mdc)
+ }
+ }
}
- private fun cbsConfigurationProvider(basePartialConfig: PartialConfiguration,
- configStateListener: ConfigurationStateListener,
+ private fun cbsClientAdapter(basePartialConfig: PartialConfiguration) =
+ CbsClientAdapter(
+ cbsClient,
+ configStateListener,
+ cbsConfigurationFrom(basePartialConfig).firstRequestDelay,
+ retrySpec
+ )
+
+ private fun cbsConfigurationProvider(cbsClientAdapter: CbsClientAdapter,
mdc: MappedDiagnosticContext) =
CbsConfigurationProvider(
- CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
- cbsConfigurationFrom(basePartialConfig),
+ cbsClientAdapter,
configParser,
configStateListener,
- mdc)
+ mdc,
+ retrySpec)
private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) =
configValidator.validatedCbsConfiguration(basePartialConfig)
@@ -80,6 +103,13 @@ class ConfigurationModule {
companion object {
private val logger = Logger(ConfigurationModule::class)
+
+ private const val MAX_RETRIES = 5L
+ private const val INITIAL_BACKOFF = 10L
+ private val retrySpec: Retry<Any> = Retry.any<Any>()
+ .retryMax(MAX_RETRIES)
+ .fixedBackoff(Duration.ofSeconds(INITIAL_BACKOFF))
+ .jitter(Jitter.random())
}
}