diff options
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt')
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt | 34 |
1 files changed, 16 insertions, 18 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt index 51b6d4f0..754a2efc 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt @@ -21,11 +21,11 @@ package org.onap.dcae.collectors.veshv.impl.adapters import com.google.gson.JsonObject import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog @@ -49,7 +49,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie retrySpec: Retry<Any> ) : ConfigurationProvider { - constructor(cbsClientMono: Mono<CbsClient>, params: ConfigurationProviderParams) : this( + constructor(cbsClientMono: Mono<CbsClient>, params: CbsConfiguration) : this( cbsClientMono, params.firstRequestDelay, params.requestInterval, @@ -67,7 +67,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) } - override fun invoke(): Flux<CollectorConfiguration> = + override fun invoke(): Flux<Routing> = cbsClientMono .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } @@ -75,7 +75,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } } .flatMapMany(::handleUpdates) - private fun handleUpdates(cbsClient: CbsClient): Flux<CollectorConfiguration> = cbsClient + private fun handleUpdates(cbsClient: CbsClient): Flux<Routing> = cbsClient .updates(RequestDiagnosticContext.create(), firstRequestDelay, requestInterval) @@ -85,21 +85,19 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie .retryWhen(retry) - private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration = + private fun createCollectorConfiguration(configuration: JsonObject): Routing = try { val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY) - CollectorConfiguration( - routing { - for (route in routingArray) { - val routeObj = route.asJsonObject - defineRoute { - fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY)) - toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY)) - withFixedPartitioning() - } - } - }.build() - ) + routing { + for (route in routingArray) { + val routeObj = route.asJsonObject + defineRoute { + fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY)) + toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY)) + withFixedPartitioning() + } + } + }.build() } catch (e: NullPointerException) { throw ParsingException("Failed to parse configuration", e) } |