diff options
Diffstat (limited to 'sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt')
-rw-r--r-- | sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt | 47 |
1 files changed, 22 insertions, 25 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt index b6462936..4982c732 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt @@ -19,17 +19,14 @@ */ package org.onap.dcae.collectors.veshv.config.impl -import arrow.core.None -import arrow.core.Option -import arrow.core.Some +import arrow.core.toOption import com.google.gson.JsonObject import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.Route -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog +import org.onap.dcae.collectors.veshv.utils.reader import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient @@ -50,26 +47,29 @@ import reactor.retry.Retry */ internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClient>, private val cbsConfiguration: CbsConfiguration, + private val configParser: JsonConfigurationParser, private val streamParser: StreamFromGsonParser<KafkaSink>, private val configurationStateListener: ConfigurationStateListener, - retrySpec: Retry<Any>, - private val mdc: MappedDiagnosticContext + private val mdc: MappedDiagnosticContext, + retrySpec: Retry<Any> ) { constructor(cbsClientMono: Mono<CbsClient>, cbsConfig: CbsConfiguration, + configParser: JsonConfigurationParser, configurationStateListener: ConfigurationStateListener, mdc: MappedDiagnosticContext) : this( cbsClientMono, cbsConfig, + configParser, StreamFromGsonParsers.kafkaSinkParser(), configurationStateListener, + mdc, Retry.any<Any>() .retryMax(MAX_RETRIES) .fixedBackoff(cbsConfig.requestInterval) - .jitter(Jitter.random()), - mdc + .jitter(Jitter.random()) ) private val retry = retrySpec.doOnRetry { @@ -92,25 +92,22 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClien cbsConfiguration.firstRequestDelay, cbsConfiguration.requestInterval) .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } } - .map(::createRoutingDescription) + .map(::parseConfiguration) + .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } } .onErrorLog(logger, mdc) { "Error while creating configuration" } .retryWhen(retry) - .map { PartialConfiguration(routing = it) } - private fun createRoutingDescription(configuration: JsonObject): Option<Routing> = try { - val routes = DataStreams.namedSinks(configuration) - .filter(streamOfType(KAFKA)) - .map(streamParser::unsafeParse) - .map { Route(it.name(), it) } - .asIterable() - .toList() - Some(routes) - } catch (e: NullPointerException) { - logger.withWarn(mdc) { - log("Invalid streams configuration", e) - } - None - } + private fun parseConfiguration(json: JsonObject) = + configParser + .parse(json.reader()) + .apply { streamPublishers = extractStreamDefinitions(json).toOption() } + + private fun extractStreamDefinitions(configuration: JsonObject): List<KafkaSink> = + DataStreams.namedSinks(configuration) + .filter(streamOfType(KAFKA)) + .map(streamParser::unsafeParse) + .asIterable() + .toList() companion object { private const val MAX_RETRIES = 5L |