aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt')
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt47
1 files changed, 22 insertions, 25 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
index b6462936..4982c732 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
@@ -19,17 +19,14 @@
*/
package org.onap.dcae.collectors.veshv.config.impl
-import arrow.core.None
-import arrow.core.Option
-import arrow.core.Some
+import arrow.core.toOption
import com.google.gson.JsonObject
import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
+import org.onap.dcae.collectors.veshv.utils.reader
import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
@@ -50,26 +47,29 @@ import reactor.retry.Retry
*/
internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClient>,
private val cbsConfiguration: CbsConfiguration,
+ private val configParser: JsonConfigurationParser,
private val streamParser: StreamFromGsonParser<KafkaSink>,
private val configurationStateListener: ConfigurationStateListener,
- retrySpec: Retry<Any>,
- private val mdc: MappedDiagnosticContext
+ private val mdc: MappedDiagnosticContext,
+ retrySpec: Retry<Any>
) {
constructor(cbsClientMono: Mono<CbsClient>,
cbsConfig: CbsConfiguration,
+ configParser: JsonConfigurationParser,
configurationStateListener: ConfigurationStateListener,
mdc: MappedDiagnosticContext) :
this(
cbsClientMono,
cbsConfig,
+ configParser,
StreamFromGsonParsers.kafkaSinkParser(),
configurationStateListener,
+ mdc,
Retry.any<Any>()
.retryMax(MAX_RETRIES)
.fixedBackoff(cbsConfig.requestInterval)
- .jitter(Jitter.random()),
- mdc
+ .jitter(Jitter.random())
)
private val retry = retrySpec.doOnRetry {
@@ -92,25 +92,22 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClien
cbsConfiguration.firstRequestDelay,
cbsConfiguration.requestInterval)
.doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
- .map(::createRoutingDescription)
+ .map(::parseConfiguration)
+ .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } }
.onErrorLog(logger, mdc) { "Error while creating configuration" }
.retryWhen(retry)
- .map { PartialConfiguration(routing = it) }
- private fun createRoutingDescription(configuration: JsonObject): Option<Routing> = try {
- val routes = DataStreams.namedSinks(configuration)
- .filter(streamOfType(KAFKA))
- .map(streamParser::unsafeParse)
- .map { Route(it.name(), it) }
- .asIterable()
- .toList()
- Some(routes)
- } catch (e: NullPointerException) {
- logger.withWarn(mdc) {
- log("Invalid streams configuration", e)
- }
- None
- }
+ private fun parseConfiguration(json: JsonObject) =
+ configParser
+ .parse(json.reader())
+ .apply { streamPublishers = extractStreamDefinitions(json).toOption() }
+
+ private fun extractStreamDefinitions(configuration: JsonObject): List<KafkaSink> =
+ DataStreams.namedSinks(configuration)
+ .filter(streamOfType(KAFKA))
+ .map(streamParser::unsafeParse)
+ .asIterable()
+ .toList()
companion object {
private const val MAX_RETRIES = 5L