aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt')
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt119
1 files changed, 119 insertions, 0 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
new file mode 100644
index 00000000..2038c31a
--- /dev/null
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
@@ -0,0 +1,119 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.Some
+import com.google.gson.JsonObject
+import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
+import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.retry.Jitter
+import reactor.retry.Retry
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClient>,
+ private val cbsConfiguration: CbsConfiguration,
+ private val streamParser: StreamFromGsonParser<KafkaSink>,
+ private val configurationStateListener: ConfigurationStateListener,
+ retrySpec: Retry<Any>,
+ private val mdc: MappedDiagnosticContext
+
+) {
+ constructor(cbsClientMono: Mono<CbsClient>,
+ cbsConfig: CbsConfiguration,
+ configurationStateListener: ConfigurationStateListener,
+ mdc: MappedDiagnosticContext) :
+ this(
+ cbsClientMono,
+ cbsConfig,
+ StreamFromGsonParsers.kafkaSinkParser(),
+ configurationStateListener,
+ Retry.any<Any>()
+ .retryMax(MAX_RETRIES)
+ .fixedBackoff(cbsConfig.requestInterval)
+ .jitter(Jitter.random()),
+ mdc
+ )
+
+ private val retry = retrySpec.doOnRetry {
+ logger.withWarn(mdc) {
+ log("Exception from configuration provider client, retrying subscription", it.exception())
+ }
+ configurationStateListener.retrying()
+ }
+
+ operator fun invoke(): Flux<PartialConfiguration> =
+ cbsClientMono
+ .doOnNext { logger.info(mdc) { "CBS client successfully created" } }
+ .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" }
+ .retryWhen(retry)
+ .doFinally { logger.trace(mdc) { "CBS client subscription finished" } }
+ .flatMapMany(::handleUpdates)
+
+ private fun handleUpdates(cbsClient: CbsClient) = cbsClient
+ .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
+ cbsConfiguration.firstRequestDelay,
+ cbsConfiguration.requestInterval)
+ .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
+ .map(::createRoutingDescription)
+ .onErrorLog(logger, mdc) { "Error while creating configuration" }
+ .retryWhen(retry)
+ .map { PartialConfiguration(collector = Some(PartialCollectorConfig(routing = it))) }
+
+ private fun createRoutingDescription(configuration: JsonObject): Option<Routing> = try {
+ val routes = DataStreams.namedSinks(configuration)
+ .filter(streamOfType(KAFKA))
+ .map(streamParser::unsafeParse)
+ .map { Route(it.name(), it) }
+ .asIterable()
+ .toList()
+ Some(routes)
+ } catch (e: NullPointerException) {
+ logger.withWarn(mdc) {
+ log("Invalid streams configuration", e)
+ }
+ None
+ }
+
+ companion object {
+ private const val MAX_RETRIES = 5L
+ private val logger = Logger(CbsConfigurationProvider::class)
+ }
+}