aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-configuration/src/main/kotlin/org/onap
diff options
context:
space:
mode:
authorkjaniak <kornel.janiak@nokia.com>2019-05-22 22:19:49 +0200
committerkjaniak <kornel.janiak@nokia.com>2019-06-05 16:01:22 +0200
commit1ddd723f22c64dfb8c414fc8573ebe993ed00578 (patch)
tree278aa739a0642cb40c358ffdf8f2e3d2e313425d /sources/hv-collector-configuration/src/main/kotlin/org/onap
parentdc936d27d761bde31ac5916a84efa2f48ec32b83 (diff)
Support CBS request interval reconfiguration
Change-Id: Ie8892e33b2f6a58d6076f66e6cc6a2df830dfa48 Issue-ID: DCAEGEN2-1525 Signed-off-by: kjaniak <kornel.janiak@nokia.com>
Diffstat (limited to 'sources/hv-collector-configuration/src/main/kotlin/org/onap')
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt56
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt81
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt54
3 files changed, 133 insertions, 58 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
index ded75838..e243afe7 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
@@ -25,18 +25,24 @@ import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationTransformer
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator
+import org.onap.dcae.collectors.veshv.config.impl.CbsClientAdapter
import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
import org.onap.dcae.collectors.veshv.config.impl.JsonConfigurationParser
import org.onap.dcae.collectors.veshv.config.impl.PartialConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import reactor.retry.Jitter
+import reactor.retry.Retry
+import java.time.Duration
-class ConfigurationModule {
+class ConfigurationModule internal constructor(private val configStateListener: ConfigurationStateListener,
+ private val cbsClient: Mono<CbsClient>) {
private val cmd = HvVesCommandLineParser()
private val configParser = JsonConfigurationParser()
@@ -44,10 +50,15 @@ class ConfigurationModule {
private val configValidator = ConfigurationValidator()
private val configTransformer = ConfigurationTransformer()
+ constructor(configStateListener: ConfigurationStateListener) : this(
+ configStateListener,
+ CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment())
+ )
+
+
fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args)
fun hvVesConfigurationUpdates(args: Array<String>,
- configStateListener: ConfigurationStateListener,
mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> =
Mono.just(cmd.getConfigurationFile(args))
.throwOnLeft(::MissingArgumentException)
@@ -56,23 +67,35 @@ class ConfigurationModule {
.doOnNext { logger.info { "Successfully parsed configuration file to: $it" } }
.cache()
.flatMapMany { basePartialConfig ->
- cbsConfigurationProvider(basePartialConfig, configStateListener, mdc)
- .invoke()
- .map { configMerger.merge(basePartialConfig, it) }
- .map(configValidator::validate)
- .throwOnLeft()
- .map(configTransformer::toFinalConfiguration)
+ cbsClientAdapter(basePartialConfig).let { cbsClientAdapter ->
+ cbsConfigurationProvider(cbsClientAdapter, mdc)
+ .invoke()
+ .map { configMerger.merge(basePartialConfig, it) }
+ .map(configValidator::validate)
+ .throwOnLeft()
+ .map(configTransformer::toFinalConfiguration)
+ .doOnNext {
+ cbsClientAdapter.updateCbsInterval(it.cbs.requestInterval, mdc)
+ }
+ }
}
- private fun cbsConfigurationProvider(basePartialConfig: PartialConfiguration,
- configStateListener: ConfigurationStateListener,
+ private fun cbsClientAdapter(basePartialConfig: PartialConfiguration) =
+ CbsClientAdapter(
+ cbsClient,
+ configStateListener,
+ cbsConfigurationFrom(basePartialConfig).firstRequestDelay,
+ retrySpec
+ )
+
+ private fun cbsConfigurationProvider(cbsClientAdapter: CbsClientAdapter,
mdc: MappedDiagnosticContext) =
CbsConfigurationProvider(
- CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
- cbsConfigurationFrom(basePartialConfig),
+ cbsClientAdapter,
configParser,
configStateListener,
- mdc)
+ mdc,
+ retrySpec)
private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) =
configValidator.validatedCbsConfiguration(basePartialConfig)
@@ -80,6 +103,13 @@ class ConfigurationModule {
companion object {
private val logger = Logger(ConfigurationModule::class)
+
+ private const val MAX_RETRIES = 5L
+ private const val INITIAL_BACKOFF = 10L
+ private val retrySpec: Retry<Any> = Retry.any<Any>()
+ .retryMax(MAX_RETRIES)
+ .fixedBackoff(Duration.ofSeconds(INITIAL_BACKOFF))
+ .jitter(Jitter.random())
}
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
new file mode 100644
index 00000000..d31f6585
--- /dev/null
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
+import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
+import org.onap.dcae.collectors.veshv.utils.rx.delayElements
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
+import reactor.core.publisher.Mono
+import reactor.retry.Retry
+import java.time.Duration
+import java.util.concurrent.atomic.AtomicReference
+
+
+internal class CbsClientAdapter(private val cbsClientMono: Mono<CbsClient>,
+ private val configurationStateListener: ConfigurationStateListener,
+ private val firstRequestDelay: Duration,
+ private val retrySpec: Retry<Any>) {
+
+ private val requestInterval = AtomicReference<Duration>(Duration.ZERO)
+
+ fun configurationUpdates(mdc: MappedDiagnosticContext) = cbsClientMono
+ .doOnNext {
+ logger.info(mdc) {
+ "CBS client successfully created, first request will be sent in ${firstRequestDelay.seconds} s"
+ }
+ }
+ .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" }
+ .retryWhen(retry(mdc))
+ .delayElement(firstRequestDelay)
+ .flatMapMany(::toPeriodicalConfigurations)
+ .distinctUntilChanged()
+
+ fun updateCbsInterval(intervalUpdate: Duration, mdc: MappedDiagnosticContext) {
+ requestInterval.set(intervalUpdate)
+ logger.debug(mdc) { "CBS request interval changed to: ${intervalUpdate.seconds} s" }
+ }
+
+ private fun toPeriodicalConfigurations(cbsClient: CbsClient) =
+ Mono.just(configurationRequest())
+ .repeat()
+ .map(CbsRequest::withNewInvocationId)
+ .flatMap(cbsClient::get)
+ .transform(delayElements(requestInterval::get))
+
+ private fun configurationRequest() = CbsRequests.getConfiguration(RequestDiagnosticContext.create())
+
+ private fun retry(mdc: MappedDiagnosticContext) = retrySpec.doOnRetry {
+ logger.withWarn(mdc) {
+ log("Exception from HV-VES cbs client, retrying subscription", it.exception())
+ }
+ configurationStateListener.retrying()
+ }
+
+ companion object {
+ private val logger = Logger(CbsClientAdapter::class)
+ }
+
+}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
index 4982c732..6efa38e6 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
@@ -22,56 +22,31 @@ package org.onap.dcae.collectors.veshv.config.impl
import arrow.core.toOption
import com.google.gson.JsonObject
import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
-import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
import org.onap.dcae.collectors.veshv.utils.reader
import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType
-import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.retry.Jitter
import reactor.retry.Retry
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since May 2018
*/
-internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClient>,
- private val cbsConfiguration: CbsConfiguration,
+internal class CbsConfigurationProvider(private val cbsClientAdapter: CbsClientAdapter,
private val configParser: JsonConfigurationParser,
- private val streamParser: StreamFromGsonParser<KafkaSink>,
private val configurationStateListener: ConfigurationStateListener,
private val mdc: MappedDiagnosticContext,
- retrySpec: Retry<Any>
-
+ retrySpec: Retry<Any>,
+ private val streamParser: StreamFromGsonParser<KafkaSink> =
+ StreamFromGsonParsers.kafkaSinkParser()
) {
- constructor(cbsClientMono: Mono<CbsClient>,
- cbsConfig: CbsConfiguration,
- configParser: JsonConfigurationParser,
- configurationStateListener: ConfigurationStateListener,
- mdc: MappedDiagnosticContext) :
- this(
- cbsClientMono,
- cbsConfig,
- configParser,
- StreamFromGsonParsers.kafkaSinkParser(),
- configurationStateListener,
- mdc,
- Retry.any<Any>()
- .retryMax(MAX_RETRIES)
- .fixedBackoff(cbsConfig.requestInterval)
- .jitter(Jitter.random())
- )
-
private val retry = retrySpec.doOnRetry {
logger.withWarn(mdc) {
log("Exception from configuration provider client, retrying subscription", it.exception())
@@ -80,22 +55,12 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClien
}
operator fun invoke(): Flux<PartialConfiguration> =
- cbsClientMono
- .doOnNext { logger.info(mdc) { "CBS client successfully created" } }
- .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" }
+ cbsClientAdapter.configurationUpdates(mdc)
+ .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
+ .map(::parseConfiguration)
+ .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } }
+ .onErrorLog(logger, mdc) { "Error while creating configuration" }
.retryWhen(retry)
- .doFinally { logger.trace(mdc) { "CBS client subscription finished" } }
- .flatMapMany(::handleUpdates)
-
- private fun handleUpdates(cbsClient: CbsClient) = cbsClient
- .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
- cbsConfiguration.firstRequestDelay,
- cbsConfiguration.requestInterval)
- .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
- .map(::parseConfiguration)
- .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } }
- .onErrorLog(logger, mdc) { "Error while creating configuration" }
- .retryWhen(retry)
private fun parseConfiguration(json: JsonObject) =
configParser
@@ -110,7 +75,6 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClien
.toList()
companion object {
- private const val MAX_RETRIES = 5L
private val logger = Logger(CbsConfigurationProvider::class)
}
}