aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt')
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt54
1 files changed, 9 insertions, 45 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
index 4982c732..6efa38e6 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
@@ -22,56 +22,31 @@ package org.onap.dcae.collectors.veshv.config.impl
import arrow.core.toOption
import com.google.gson.JsonObject
import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
-import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
import org.onap.dcae.collectors.veshv.utils.reader
import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType
-import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.retry.Jitter
import reactor.retry.Retry
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since May 2018
*/
-internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClient>,
- private val cbsConfiguration: CbsConfiguration,
+internal class CbsConfigurationProvider(private val cbsClientAdapter: CbsClientAdapter,
private val configParser: JsonConfigurationParser,
- private val streamParser: StreamFromGsonParser<KafkaSink>,
private val configurationStateListener: ConfigurationStateListener,
private val mdc: MappedDiagnosticContext,
- retrySpec: Retry<Any>
-
+ retrySpec: Retry<Any>,
+ private val streamParser: StreamFromGsonParser<KafkaSink> =
+ StreamFromGsonParsers.kafkaSinkParser()
) {
- constructor(cbsClientMono: Mono<CbsClient>,
- cbsConfig: CbsConfiguration,
- configParser: JsonConfigurationParser,
- configurationStateListener: ConfigurationStateListener,
- mdc: MappedDiagnosticContext) :
- this(
- cbsClientMono,
- cbsConfig,
- configParser,
- StreamFromGsonParsers.kafkaSinkParser(),
- configurationStateListener,
- mdc,
- Retry.any<Any>()
- .retryMax(MAX_RETRIES)
- .fixedBackoff(cbsConfig.requestInterval)
- .jitter(Jitter.random())
- )
-
private val retry = retrySpec.doOnRetry {
logger.withWarn(mdc) {
log("Exception from configuration provider client, retrying subscription", it.exception())
@@ -80,22 +55,12 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClien
}
operator fun invoke(): Flux<PartialConfiguration> =
- cbsClientMono
- .doOnNext { logger.info(mdc) { "CBS client successfully created" } }
- .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" }
+ cbsClientAdapter.configurationUpdates(mdc)
+ .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
+ .map(::parseConfiguration)
+ .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } }
+ .onErrorLog(logger, mdc) { "Error while creating configuration" }
.retryWhen(retry)
- .doFinally { logger.trace(mdc) { "CBS client subscription finished" } }
- .flatMapMany(::handleUpdates)
-
- private fun handleUpdates(cbsClient: CbsClient) = cbsClient
- .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
- cbsConfiguration.firstRequestDelay,
- cbsConfiguration.requestInterval)
- .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
- .map(::parseConfiguration)
- .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } }
- .onErrorLog(logger, mdc) { "Error while creating configuration" }
- .retryWhen(retry)
private fun parseConfiguration(json: JsonObject) =
configParser
@@ -110,7 +75,6 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClien
.toList()
companion object {
- private const val MAX_RETRIES = 5L
private val logger = Logger(CbsConfigurationProvider::class)
}
}