diff options
Diffstat (limited to 'sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt')
-rw-r--r-- | sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt | 56 |
1 files changed, 43 insertions, 13 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt index ded75838..e243afe7 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt @@ -25,18 +25,24 @@ import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger import org.onap.dcae.collectors.veshv.config.impl.ConfigurationTransformer import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator +import org.onap.dcae.collectors.veshv.config.impl.CbsClientAdapter import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser import org.onap.dcae.collectors.veshv.config.impl.JsonConfigurationParser import org.onap.dcae.collectors.veshv.config.impl.PartialConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import reactor.retry.Jitter +import reactor.retry.Retry +import java.time.Duration -class ConfigurationModule { +class ConfigurationModule internal constructor(private val configStateListener: ConfigurationStateListener, + private val cbsClient: Mono<CbsClient>) { private val cmd = HvVesCommandLineParser() private val configParser = JsonConfigurationParser() @@ -44,10 +50,15 @@ class ConfigurationModule { private val configValidator = ConfigurationValidator() private val configTransformer = ConfigurationTransformer() + constructor(configStateListener: ConfigurationStateListener) : this( + configStateListener, + CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()) + ) + + fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args) fun hvVesConfigurationUpdates(args: Array<String>, - configStateListener: ConfigurationStateListener, mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> = Mono.just(cmd.getConfigurationFile(args)) .throwOnLeft(::MissingArgumentException) @@ -56,23 +67,35 @@ class ConfigurationModule { .doOnNext { logger.info { "Successfully parsed configuration file to: $it" } } .cache() .flatMapMany { basePartialConfig -> - cbsConfigurationProvider(basePartialConfig, configStateListener, mdc) - .invoke() - .map { configMerger.merge(basePartialConfig, it) } - .map(configValidator::validate) - .throwOnLeft() - .map(configTransformer::toFinalConfiguration) + cbsClientAdapter(basePartialConfig).let { cbsClientAdapter -> + cbsConfigurationProvider(cbsClientAdapter, mdc) + .invoke() + .map { configMerger.merge(basePartialConfig, it) } + .map(configValidator::validate) + .throwOnLeft() + .map(configTransformer::toFinalConfiguration) + .doOnNext { + cbsClientAdapter.updateCbsInterval(it.cbs.requestInterval, mdc) + } + } } - private fun cbsConfigurationProvider(basePartialConfig: PartialConfiguration, - configStateListener: ConfigurationStateListener, + private fun cbsClientAdapter(basePartialConfig: PartialConfiguration) = + CbsClientAdapter( + cbsClient, + configStateListener, + cbsConfigurationFrom(basePartialConfig).firstRequestDelay, + retrySpec + ) + + private fun cbsConfigurationProvider(cbsClientAdapter: CbsClientAdapter, mdc: MappedDiagnosticContext) = CbsConfigurationProvider( - CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), - cbsConfigurationFrom(basePartialConfig), + cbsClientAdapter, configParser, configStateListener, - mdc) + mdc, + retrySpec) private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = configValidator.validatedCbsConfiguration(basePartialConfig) @@ -80,6 +103,13 @@ class ConfigurationModule { companion object { private val logger = Logger(ConfigurationModule::class) + + private const val MAX_RETRIES = 5L + private const val INITIAL_BACKOFF = 10L + private val retrySpec: Retry<Any> = Retry.any<Any>() + .retryMax(MAX_RETRIES) + .fixedBackoff(Duration.ofSeconds(INITIAL_BACKOFF)) + .jitter(Jitter.random()) } } |