aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-configuration/src/main
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-06-10 09:02:43 +0200
committerFilip Krzywka <filip.krzywka@nokia.com>2019-06-12 13:59:55 +0200
commitb98ad1204d2e3613bfb0648dd7b8ab865752b963 (patch)
treeb64229b5d8fc5e6ced3ec85d07195295e4b2c008 /sources/hv-collector-configuration/src/main
parent1c9ec6db2d7296131b2baf4433bdeb0f228775db (diff)
Retry infinitely
- changed specification to retry infinitely - moved MDC to cbsAdapter constructor as in whole module it contains only local context (instanceID etc.). Also permuted constructor params to match ConfigurationProviders order - refactored module tests as ground for future enhancements Change-Id: Ic074b9c421b60662e5512c55c7b1dfb90ab0d2ea Issue-ID: DCAEGEN2-1557 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-configuration/src/main')
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt44
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt28
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt2
3 files changed, 35 insertions, 39 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
index e243afe7..35adfe79 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
@@ -55,7 +55,6 @@ class ConfigurationModule internal constructor(private val configStateListener:
CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment())
)
-
fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args)
fun hvVesConfigurationUpdates(args: Array<String>,
@@ -67,7 +66,7 @@ class ConfigurationModule internal constructor(private val configStateListener:
.doOnNext { logger.info { "Successfully parsed configuration file to: $it" } }
.cache()
.flatMapMany { basePartialConfig ->
- cbsClientAdapter(basePartialConfig).let { cbsClientAdapter ->
+ cbsClientAdapter(basePartialConfig, mdc).let { cbsClientAdapter ->
cbsConfigurationProvider(cbsClientAdapter, mdc)
.invoke()
.map { configMerger.merge(basePartialConfig, it) }
@@ -75,27 +74,28 @@ class ConfigurationModule internal constructor(private val configStateListener:
.throwOnLeft()
.map(configTransformer::toFinalConfiguration)
.doOnNext {
- cbsClientAdapter.updateCbsInterval(it.cbs.requestInterval, mdc)
+ cbsClientAdapter.updateCbsInterval(it.cbs.requestInterval)
}
}
}
- private fun cbsClientAdapter(basePartialConfig: PartialConfiguration) =
- CbsClientAdapter(
- cbsClient,
- configStateListener,
- cbsConfigurationFrom(basePartialConfig).firstRequestDelay,
- retrySpec
- )
+ private fun cbsClientAdapter(basePartialConfig: PartialConfiguration,
+ mdc: MappedDiagnosticContext) = CbsClientAdapter(
+ cbsClient,
+ cbsConfigurationFrom(basePartialConfig).firstRequestDelay,
+ configStateListener,
+ mdc,
+ infiniteRetry
+ )
private fun cbsConfigurationProvider(cbsClientAdapter: CbsClientAdapter,
- mdc: MappedDiagnosticContext) =
- CbsConfigurationProvider(
- cbsClientAdapter,
- configParser,
- configStateListener,
- mdc,
- retrySpec)
+ mdc: MappedDiagnosticContext) = CbsConfigurationProvider(
+ cbsClientAdapter,
+ configParser,
+ configStateListener,
+ mdc,
+ infiniteRetry
+ )
private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) =
configValidator.validatedCbsConfiguration(basePartialConfig)
@@ -104,11 +104,11 @@ class ConfigurationModule internal constructor(private val configStateListener:
companion object {
private val logger = Logger(ConfigurationModule::class)
- private const val MAX_RETRIES = 5L
- private const val INITIAL_BACKOFF = 10L
- private val retrySpec: Retry<Any> = Retry.any<Any>()
- .retryMax(MAX_RETRIES)
- .fixedBackoff(Duration.ofSeconds(INITIAL_BACKOFF))
+ private val FIRST_BACKOFF_DURATION = Duration.ofSeconds(5)
+ private val MAX_BACKOFF_DURATION = Duration.ofMinutes(5)
+ private val infiniteRetry: Retry<Any> = Retry.any<Any>()
+ .retryMax(Long.MAX_VALUE)
+ .exponentialBackoff(FIRST_BACKOFF_DURATION, MAX_BACKOFF_DURATION)
.jitter(Jitter.random())
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
index d31f6585..8b7ed67f 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.config.impl
import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
-import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
import org.onap.dcae.collectors.veshv.utils.rx.delayElements
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
@@ -35,26 +34,31 @@ import java.util.concurrent.atomic.AtomicReference
internal class CbsClientAdapter(private val cbsClientMono: Mono<CbsClient>,
- private val configurationStateListener: ConfigurationStateListener,
private val firstRequestDelay: Duration,
- private val retrySpec: Retry<Any>) {
+ private val configurationStateListener: ConfigurationStateListener,
+ private val mdc: MappedDiagnosticContext,
+ retrySpec: Retry<Any>) {
private val requestInterval = AtomicReference<Duration>(Duration.ZERO)
+ private val retry = retrySpec.doOnRetry {
+ logger.withWarn(mdc) {
+ log("Exception while creating CBS client, retrying. Reason: ${it.exception().localizedMessage}")
+ }
+ configurationStateListener.retrying()
+ }
- fun configurationUpdates(mdc: MappedDiagnosticContext) = cbsClientMono
+ fun configurationUpdates() = cbsClientMono
.doOnNext {
logger.info(mdc) {
"CBS client successfully created, first request will be sent in ${firstRequestDelay.seconds} s"
}
}
- .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" }
- .retryWhen(retry(mdc))
+ .retryWhen(retry)
.delayElement(firstRequestDelay)
.flatMapMany(::toPeriodicalConfigurations)
.distinctUntilChanged()
- fun updateCbsInterval(intervalUpdate: Duration, mdc: MappedDiagnosticContext) {
- requestInterval.set(intervalUpdate)
+ fun updateCbsInterval(intervalUpdate: Duration) = requestInterval.set(intervalUpdate).also {
logger.debug(mdc) { "CBS request interval changed to: ${intervalUpdate.seconds} s" }
}
@@ -67,15 +71,7 @@ internal class CbsClientAdapter(private val cbsClientMono: Mono<CbsClient>,
private fun configurationRequest() = CbsRequests.getConfiguration(RequestDiagnosticContext.create())
- private fun retry(mdc: MappedDiagnosticContext) = retrySpec.doOnRetry {
- logger.withWarn(mdc) {
- log("Exception from HV-VES cbs client, retrying subscription", it.exception())
- }
- configurationStateListener.retrying()
- }
-
companion object {
private val logger = Logger(CbsClientAdapter::class)
}
-
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
index 6efa38e6..6f16b3d1 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
@@ -55,7 +55,7 @@ internal class CbsConfigurationProvider(private val cbsClientAdapter: CbsClientA
}
operator fun invoke(): Flux<PartialConfiguration> =
- cbsClientAdapter.configurationUpdates(mdc)
+ cbsClientAdapter.configurationUpdates()
.doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
.map(::parseConfiguration)
.doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } }