summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt34
1 files changed, 16 insertions, 18 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
index 51b6d4f0..754a2efc 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
@@ -21,11 +21,11 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import com.google.gson.JsonObject
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.config.api.model.routing
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
@@ -49,7 +49,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
retrySpec: Retry<Any>
) : ConfigurationProvider {
- constructor(cbsClientMono: Mono<CbsClient>, params: ConfigurationProviderParams) : this(
+ constructor(cbsClientMono: Mono<CbsClient>, params: CbsConfiguration) : this(
cbsClientMono,
params.firstRequestDelay,
params.requestInterval,
@@ -67,7 +67,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
}
- override fun invoke(): Flux<CollectorConfiguration> =
+ override fun invoke(): Flux<Routing> =
cbsClientMono
.doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
.onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
@@ -75,7 +75,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
.doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
.flatMapMany(::handleUpdates)
- private fun handleUpdates(cbsClient: CbsClient): Flux<CollectorConfiguration> = cbsClient
+ private fun handleUpdates(cbsClient: CbsClient): Flux<Routing> = cbsClient
.updates(RequestDiagnosticContext.create(),
firstRequestDelay,
requestInterval)
@@ -85,21 +85,19 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
.retryWhen(retry)
- private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration =
+ private fun createCollectorConfiguration(configuration: JsonObject): Routing =
try {
val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY)
- CollectorConfiguration(
- routing {
- for (route in routingArray) {
- val routeObj = route.asJsonObject
- defineRoute {
- fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY))
- toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY))
- withFixedPartitioning()
- }
- }
- }.build()
- )
+ routing {
+ for (route in routingArray) {
+ val routeObj = route.asJsonObject
+ defineRoute {
+ fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY))
+ toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY))
+ withFixedPartitioning()
+ }
+ }
+ }.build()
} catch (e: NullPointerException) {
throw ParsingException("Failed to parse configuration", e)
}