From b98ad1204d2e3613bfb0648dd7b8ab865752b963 Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Mon, 10 Jun 2019 09:02:43 +0200 Subject: Retry infinitely - changed specification to retry infinitely - moved MDC to cbsAdapter constructor as in whole module it contains only local context (instanceID etc.). Also permuted constructor params to match ConfigurationProviders order - refactored module tests as ground for future enhancements Change-Id: Ic074b9c421b60662e5512c55c7b1dfb90ab0d2ea Issue-ID: DCAEGEN2-1557 Signed-off-by: Filip Krzywka --- .../veshv/config/api/ConfigurationModule.kt | 44 +++++++++++----------- .../veshv/config/impl/CbsClientAdapter.kt | 28 ++++++-------- .../veshv/config/impl/CbsConfigurationProvider.kt | 2 +- 3 files changed, 35 insertions(+), 39 deletions(-) (limited to 'sources/hv-collector-configuration/src/main/kotlin/org/onap') diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt index e243afe7..35adfe79 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt @@ -55,7 +55,6 @@ class ConfigurationModule internal constructor(private val configStateListener: CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()) ) - fun healthCheckPort(args: Array): Int = cmd.getHealthcheckPort(args) fun hvVesConfigurationUpdates(args: Array, @@ -67,7 +66,7 @@ class ConfigurationModule internal constructor(private val configStateListener: .doOnNext { logger.info { "Successfully parsed configuration file to: $it" } } .cache() .flatMapMany { basePartialConfig -> - cbsClientAdapter(basePartialConfig).let { cbsClientAdapter -> + cbsClientAdapter(basePartialConfig, mdc).let { cbsClientAdapter -> cbsConfigurationProvider(cbsClientAdapter, mdc) .invoke() .map { configMerger.merge(basePartialConfig, it) } @@ -75,27 +74,28 @@ class ConfigurationModule internal constructor(private val configStateListener: .throwOnLeft() .map(configTransformer::toFinalConfiguration) .doOnNext { - cbsClientAdapter.updateCbsInterval(it.cbs.requestInterval, mdc) + cbsClientAdapter.updateCbsInterval(it.cbs.requestInterval) } } } - private fun cbsClientAdapter(basePartialConfig: PartialConfiguration) = - CbsClientAdapter( - cbsClient, - configStateListener, - cbsConfigurationFrom(basePartialConfig).firstRequestDelay, - retrySpec - ) + private fun cbsClientAdapter(basePartialConfig: PartialConfiguration, + mdc: MappedDiagnosticContext) = CbsClientAdapter( + cbsClient, + cbsConfigurationFrom(basePartialConfig).firstRequestDelay, + configStateListener, + mdc, + infiniteRetry + ) private fun cbsConfigurationProvider(cbsClientAdapter: CbsClientAdapter, - mdc: MappedDiagnosticContext) = - CbsConfigurationProvider( - cbsClientAdapter, - configParser, - configStateListener, - mdc, - retrySpec) + mdc: MappedDiagnosticContext) = CbsConfigurationProvider( + cbsClientAdapter, + configParser, + configStateListener, + mdc, + infiniteRetry + ) private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = configValidator.validatedCbsConfiguration(basePartialConfig) @@ -104,11 +104,11 @@ class ConfigurationModule internal constructor(private val configStateListener: companion object { private val logger = Logger(ConfigurationModule::class) - private const val MAX_RETRIES = 5L - private const val INITIAL_BACKOFF = 10L - private val retrySpec: Retry = Retry.any() - .retryMax(MAX_RETRIES) - .fixedBackoff(Duration.ofSeconds(INITIAL_BACKOFF)) + private val FIRST_BACKOFF_DURATION = Duration.ofSeconds(5) + private val MAX_BACKOFF_DURATION = Duration.ofMinutes(5) + private val infiniteRetry: Retry = Retry.any() + .retryMax(Long.MAX_VALUE) + .exponentialBackoff(FIRST_BACKOFF_DURATION, MAX_BACKOFF_DURATION) .jitter(Jitter.random()) } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt index d31f6585..8b7ed67f 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.config.impl import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext -import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog import org.onap.dcae.collectors.veshv.utils.rx.delayElements import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests @@ -35,26 +34,31 @@ import java.util.concurrent.atomic.AtomicReference internal class CbsClientAdapter(private val cbsClientMono: Mono, - private val configurationStateListener: ConfigurationStateListener, private val firstRequestDelay: Duration, - private val retrySpec: Retry) { + private val configurationStateListener: ConfigurationStateListener, + private val mdc: MappedDiagnosticContext, + retrySpec: Retry) { private val requestInterval = AtomicReference(Duration.ZERO) + private val retry = retrySpec.doOnRetry { + logger.withWarn(mdc) { + log("Exception while creating CBS client, retrying. Reason: ${it.exception().localizedMessage}") + } + configurationStateListener.retrying() + } - fun configurationUpdates(mdc: MappedDiagnosticContext) = cbsClientMono + fun configurationUpdates() = cbsClientMono .doOnNext { logger.info(mdc) { "CBS client successfully created, first request will be sent in ${firstRequestDelay.seconds} s" } } - .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" } - .retryWhen(retry(mdc)) + .retryWhen(retry) .delayElement(firstRequestDelay) .flatMapMany(::toPeriodicalConfigurations) .distinctUntilChanged() - fun updateCbsInterval(intervalUpdate: Duration, mdc: MappedDiagnosticContext) { - requestInterval.set(intervalUpdate) + fun updateCbsInterval(intervalUpdate: Duration) = requestInterval.set(intervalUpdate).also { logger.debug(mdc) { "CBS request interval changed to: ${intervalUpdate.seconds} s" } } @@ -67,15 +71,7 @@ internal class CbsClientAdapter(private val cbsClientMono: Mono, private fun configurationRequest() = CbsRequests.getConfiguration(RequestDiagnosticContext.create()) - private fun retry(mdc: MappedDiagnosticContext) = retrySpec.doOnRetry { - logger.withWarn(mdc) { - log("Exception from HV-VES cbs client, retrying subscription", it.exception()) - } - configurationStateListener.retrying() - } - companion object { private val logger = Logger(CbsClientAdapter::class) } - } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt index 6efa38e6..6f16b3d1 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt @@ -55,7 +55,7 @@ internal class CbsConfigurationProvider(private val cbsClientAdapter: CbsClientA } operator fun invoke(): Flux = - cbsClientAdapter.configurationUpdates(mdc) + cbsClientAdapter.configurationUpdates() .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } } .map(::parseConfiguration) .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } } -- cgit 1.2.3-korg