From 1ddd723f22c64dfb8c414fc8573ebe993ed00578 Mon Sep 17 00:00:00 2001 From: kjaniak Date: Wed, 22 May 2019 22:19:49 +0200 Subject: Support CBS request interval reconfiguration Change-Id: Ie8892e33b2f6a58d6076f66e6cc6a2df830dfa48 Issue-ID: DCAEGEN2-1525 Signed-off-by: kjaniak --- .../veshv/config/api/ConfigurationModule.kt | 56 +++++++++++---- .../veshv/config/impl/CbsClientAdapter.kt | 81 ++++++++++++++++++++++ .../veshv/config/impl/CbsConfigurationProvider.kt | 54 +++------------ 3 files changed, 133 insertions(+), 58 deletions(-) create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt (limited to 'sources/hv-collector-configuration/src/main/kotlin/org/onap') diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt index ded75838..e243afe7 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt @@ -25,18 +25,24 @@ import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger import org.onap.dcae.collectors.veshv.config.impl.ConfigurationTransformer import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator +import org.onap.dcae.collectors.veshv.config.impl.CbsClientAdapter import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser import org.onap.dcae.collectors.veshv.config.impl.JsonConfigurationParser import org.onap.dcae.collectors.veshv.config.impl.PartialConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import reactor.retry.Jitter +import reactor.retry.Retry +import java.time.Duration -class ConfigurationModule { +class ConfigurationModule internal constructor(private val configStateListener: ConfigurationStateListener, + private val cbsClient: Mono) { private val cmd = HvVesCommandLineParser() private val configParser = JsonConfigurationParser() @@ -44,10 +50,15 @@ class ConfigurationModule { private val configValidator = ConfigurationValidator() private val configTransformer = ConfigurationTransformer() + constructor(configStateListener: ConfigurationStateListener) : this( + configStateListener, + CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()) + ) + + fun healthCheckPort(args: Array): Int = cmd.getHealthcheckPort(args) fun hvVesConfigurationUpdates(args: Array, - configStateListener: ConfigurationStateListener, mdc: MappedDiagnosticContext): Flux = Mono.just(cmd.getConfigurationFile(args)) .throwOnLeft(::MissingArgumentException) @@ -56,23 +67,35 @@ class ConfigurationModule { .doOnNext { logger.info { "Successfully parsed configuration file to: $it" } } .cache() .flatMapMany { basePartialConfig -> - cbsConfigurationProvider(basePartialConfig, configStateListener, mdc) - .invoke() - .map { configMerger.merge(basePartialConfig, it) } - .map(configValidator::validate) - .throwOnLeft() - .map(configTransformer::toFinalConfiguration) + cbsClientAdapter(basePartialConfig).let { cbsClientAdapter -> + cbsConfigurationProvider(cbsClientAdapter, mdc) + .invoke() + .map { configMerger.merge(basePartialConfig, it) } + .map(configValidator::validate) + .throwOnLeft() + .map(configTransformer::toFinalConfiguration) + .doOnNext { + cbsClientAdapter.updateCbsInterval(it.cbs.requestInterval, mdc) + } + } } - private fun cbsConfigurationProvider(basePartialConfig: PartialConfiguration, - configStateListener: ConfigurationStateListener, + private fun cbsClientAdapter(basePartialConfig: PartialConfiguration) = + CbsClientAdapter( + cbsClient, + configStateListener, + cbsConfigurationFrom(basePartialConfig).firstRequestDelay, + retrySpec + ) + + private fun cbsConfigurationProvider(cbsClientAdapter: CbsClientAdapter, mdc: MappedDiagnosticContext) = CbsConfigurationProvider( - CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), - cbsConfigurationFrom(basePartialConfig), + cbsClientAdapter, configParser, configStateListener, - mdc) + mdc, + retrySpec) private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = configValidator.validatedCbsConfiguration(basePartialConfig) @@ -80,6 +103,13 @@ class ConfigurationModule { companion object { private val logger = Logger(ConfigurationModule::class) + + private const val MAX_RETRIES = 5L + private const val INITIAL_BACKOFF = 10L + private val retrySpec: Retry = Retry.any() + .retryMax(MAX_RETRIES) + .fixedBackoff(Duration.ofSeconds(INITIAL_BACKOFF)) + .jitter(Jitter.random()) } } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt new file mode 100644 index 00000000..d31f6585 --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt @@ -0,0 +1,81 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext +import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog +import org.onap.dcae.collectors.veshv.utils.rx.delayElements +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext +import reactor.core.publisher.Mono +import reactor.retry.Retry +import java.time.Duration +import java.util.concurrent.atomic.AtomicReference + + +internal class CbsClientAdapter(private val cbsClientMono: Mono, + private val configurationStateListener: ConfigurationStateListener, + private val firstRequestDelay: Duration, + private val retrySpec: Retry) { + + private val requestInterval = AtomicReference(Duration.ZERO) + + fun configurationUpdates(mdc: MappedDiagnosticContext) = cbsClientMono + .doOnNext { + logger.info(mdc) { + "CBS client successfully created, first request will be sent in ${firstRequestDelay.seconds} s" + } + } + .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" } + .retryWhen(retry(mdc)) + .delayElement(firstRequestDelay) + .flatMapMany(::toPeriodicalConfigurations) + .distinctUntilChanged() + + fun updateCbsInterval(intervalUpdate: Duration, mdc: MappedDiagnosticContext) { + requestInterval.set(intervalUpdate) + logger.debug(mdc) { "CBS request interval changed to: ${intervalUpdate.seconds} s" } + } + + private fun toPeriodicalConfigurations(cbsClient: CbsClient) = + Mono.just(configurationRequest()) + .repeat() + .map(CbsRequest::withNewInvocationId) + .flatMap(cbsClient::get) + .transform(delayElements(requestInterval::get)) + + private fun configurationRequest() = CbsRequests.getConfiguration(RequestDiagnosticContext.create()) + + private fun retry(mdc: MappedDiagnosticContext) = retrySpec.doOnRetry { + logger.withWarn(mdc) { + log("Exception from HV-VES cbs client, retrying subscription", it.exception()) + } + configurationStateListener.retrying() + } + + companion object { + private val logger = Logger(CbsClientAdapter::class) + } + +} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt index 4982c732..6efa38e6 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt @@ -22,56 +22,31 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.toOption import com.google.gson.JsonObject import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener -import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog import org.onap.dcae.collectors.veshv.utils.reader import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.retry.Jitter import reactor.retry.Retry /** * @author Jakub Dudycz * @since May 2018 */ -internal class CbsConfigurationProvider(private val cbsClientMono: Mono, - private val cbsConfiguration: CbsConfiguration, +internal class CbsConfigurationProvider(private val cbsClientAdapter: CbsClientAdapter, private val configParser: JsonConfigurationParser, - private val streamParser: StreamFromGsonParser, private val configurationStateListener: ConfigurationStateListener, private val mdc: MappedDiagnosticContext, - retrySpec: Retry - + retrySpec: Retry, + private val streamParser: StreamFromGsonParser = + StreamFromGsonParsers.kafkaSinkParser() ) { - constructor(cbsClientMono: Mono, - cbsConfig: CbsConfiguration, - configParser: JsonConfigurationParser, - configurationStateListener: ConfigurationStateListener, - mdc: MappedDiagnosticContext) : - this( - cbsClientMono, - cbsConfig, - configParser, - StreamFromGsonParsers.kafkaSinkParser(), - configurationStateListener, - mdc, - Retry.any() - .retryMax(MAX_RETRIES) - .fixedBackoff(cbsConfig.requestInterval) - .jitter(Jitter.random()) - ) - private val retry = retrySpec.doOnRetry { logger.withWarn(mdc) { log("Exception from configuration provider client, retrying subscription", it.exception()) @@ -80,22 +55,12 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono = - cbsClientMono - .doOnNext { logger.info(mdc) { "CBS client successfully created" } } - .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" } + cbsClientAdapter.configurationUpdates(mdc) + .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } } + .map(::parseConfiguration) + .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } } + .onErrorLog(logger, mdc) { "Error while creating configuration" } .retryWhen(retry) - .doFinally { logger.trace(mdc) { "CBS client subscription finished" } } - .flatMapMany(::handleUpdates) - - private fun handleUpdates(cbsClient: CbsClient) = cbsClient - .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()), - cbsConfiguration.firstRequestDelay, - cbsConfiguration.requestInterval) - .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } } - .map(::parseConfiguration) - .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } } - .onErrorLog(logger, mdc) { "Error while creating configuration" } - .retryWhen(retry) private fun parseConfiguration(json: JsonObject) = configParser @@ -110,7 +75,6 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono