From 1ddd723f22c64dfb8c414fc8573ebe993ed00578 Mon Sep 17 00:00:00 2001 From: kjaniak Date: Wed, 22 May 2019 22:19:49 +0200 Subject: Support CBS request interval reconfiguration Change-Id: Ie8892e33b2f6a58d6076f66e6cc6a2df830dfa48 Issue-ID: DCAEGEN2-1525 Signed-off-by: kjaniak --- .../veshv/config/api/ConfigurationModule.kt | 56 +++++++--- .../veshv/config/impl/CbsClientAdapter.kt | 81 ++++++++++++++ .../veshv/config/impl/CbsConfigurationProvider.kt | 54 ++-------- .../veshv/config/api/ConfigurationModuleIT.kt | 120 +++++++++++++++++++++ .../config/impl/CbsConfigurationProviderTest.kt | 58 +++++----- .../src/test/resources/insecureSampleConfig.json | 8 ++ .../org/onap/dcae/collectors/veshv/main/main.kt | 5 +- .../org/onap/dcae/collectors/veshv/utils/rx/rx.kt | 6 ++ 8 files changed, 297 insertions(+), 91 deletions(-) create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt create mode 100644 sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModuleIT.kt create mode 100644 sources/hv-collector-configuration/src/test/resources/insecureSampleConfig.json diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt index ded75838..e243afe7 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt @@ -25,18 +25,24 @@ import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger import org.onap.dcae.collectors.veshv.config.impl.ConfigurationTransformer import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator +import org.onap.dcae.collectors.veshv.config.impl.CbsClientAdapter import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser import org.onap.dcae.collectors.veshv.config.impl.JsonConfigurationParser import org.onap.dcae.collectors.veshv.config.impl.PartialConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import reactor.retry.Jitter +import reactor.retry.Retry +import java.time.Duration -class ConfigurationModule { +class ConfigurationModule internal constructor(private val configStateListener: ConfigurationStateListener, + private val cbsClient: Mono) { private val cmd = HvVesCommandLineParser() private val configParser = JsonConfigurationParser() @@ -44,10 +50,15 @@ class ConfigurationModule { private val configValidator = ConfigurationValidator() private val configTransformer = ConfigurationTransformer() + constructor(configStateListener: ConfigurationStateListener) : this( + configStateListener, + CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()) + ) + + fun healthCheckPort(args: Array): Int = cmd.getHealthcheckPort(args) fun hvVesConfigurationUpdates(args: Array, - configStateListener: ConfigurationStateListener, mdc: MappedDiagnosticContext): Flux = Mono.just(cmd.getConfigurationFile(args)) .throwOnLeft(::MissingArgumentException) @@ -56,23 +67,35 @@ class ConfigurationModule { .doOnNext { logger.info { "Successfully parsed configuration file to: $it" } } .cache() .flatMapMany { basePartialConfig -> - cbsConfigurationProvider(basePartialConfig, configStateListener, mdc) - .invoke() - .map { configMerger.merge(basePartialConfig, it) } - .map(configValidator::validate) - .throwOnLeft() - .map(configTransformer::toFinalConfiguration) + cbsClientAdapter(basePartialConfig).let { cbsClientAdapter -> + cbsConfigurationProvider(cbsClientAdapter, mdc) + .invoke() + .map { configMerger.merge(basePartialConfig, it) } + .map(configValidator::validate) + .throwOnLeft() + .map(configTransformer::toFinalConfiguration) + .doOnNext { + cbsClientAdapter.updateCbsInterval(it.cbs.requestInterval, mdc) + } + } } - private fun cbsConfigurationProvider(basePartialConfig: PartialConfiguration, - configStateListener: ConfigurationStateListener, + private fun cbsClientAdapter(basePartialConfig: PartialConfiguration) = + CbsClientAdapter( + cbsClient, + configStateListener, + cbsConfigurationFrom(basePartialConfig).firstRequestDelay, + retrySpec + ) + + private fun cbsConfigurationProvider(cbsClientAdapter: CbsClientAdapter, mdc: MappedDiagnosticContext) = CbsConfigurationProvider( - CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), - cbsConfigurationFrom(basePartialConfig), + cbsClientAdapter, configParser, configStateListener, - mdc) + mdc, + retrySpec) private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = configValidator.validatedCbsConfiguration(basePartialConfig) @@ -80,6 +103,13 @@ class ConfigurationModule { companion object { private val logger = Logger(ConfigurationModule::class) + + private const val MAX_RETRIES = 5L + private const val INITIAL_BACKOFF = 10L + private val retrySpec: Retry = Retry.any() + .retryMax(MAX_RETRIES) + .fixedBackoff(Duration.ofSeconds(INITIAL_BACKOFF)) + .jitter(Jitter.random()) } } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt new file mode 100644 index 00000000..d31f6585 --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt @@ -0,0 +1,81 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext +import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog +import org.onap.dcae.collectors.veshv.utils.rx.delayElements +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext +import reactor.core.publisher.Mono +import reactor.retry.Retry +import java.time.Duration +import java.util.concurrent.atomic.AtomicReference + + +internal class CbsClientAdapter(private val cbsClientMono: Mono, + private val configurationStateListener: ConfigurationStateListener, + private val firstRequestDelay: Duration, + private val retrySpec: Retry) { + + private val requestInterval = AtomicReference(Duration.ZERO) + + fun configurationUpdates(mdc: MappedDiagnosticContext) = cbsClientMono + .doOnNext { + logger.info(mdc) { + "CBS client successfully created, first request will be sent in ${firstRequestDelay.seconds} s" + } + } + .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" } + .retryWhen(retry(mdc)) + .delayElement(firstRequestDelay) + .flatMapMany(::toPeriodicalConfigurations) + .distinctUntilChanged() + + fun updateCbsInterval(intervalUpdate: Duration, mdc: MappedDiagnosticContext) { + requestInterval.set(intervalUpdate) + logger.debug(mdc) { "CBS request interval changed to: ${intervalUpdate.seconds} s" } + } + + private fun toPeriodicalConfigurations(cbsClient: CbsClient) = + Mono.just(configurationRequest()) + .repeat() + .map(CbsRequest::withNewInvocationId) + .flatMap(cbsClient::get) + .transform(delayElements(requestInterval::get)) + + private fun configurationRequest() = CbsRequests.getConfiguration(RequestDiagnosticContext.create()) + + private fun retry(mdc: MappedDiagnosticContext) = retrySpec.doOnRetry { + logger.withWarn(mdc) { + log("Exception from HV-VES cbs client, retrying subscription", it.exception()) + } + configurationStateListener.retrying() + } + + companion object { + private val logger = Logger(CbsClientAdapter::class) + } + +} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt index 4982c732..6efa38e6 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt @@ -22,56 +22,31 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.toOption import com.google.gson.JsonObject import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener -import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog import org.onap.dcae.collectors.veshv.utils.reader import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.retry.Jitter import reactor.retry.Retry /** * @author Jakub Dudycz * @since May 2018 */ -internal class CbsConfigurationProvider(private val cbsClientMono: Mono, - private val cbsConfiguration: CbsConfiguration, +internal class CbsConfigurationProvider(private val cbsClientAdapter: CbsClientAdapter, private val configParser: JsonConfigurationParser, - private val streamParser: StreamFromGsonParser, private val configurationStateListener: ConfigurationStateListener, private val mdc: MappedDiagnosticContext, - retrySpec: Retry - + retrySpec: Retry, + private val streamParser: StreamFromGsonParser = + StreamFromGsonParsers.kafkaSinkParser() ) { - constructor(cbsClientMono: Mono, - cbsConfig: CbsConfiguration, - configParser: JsonConfigurationParser, - configurationStateListener: ConfigurationStateListener, - mdc: MappedDiagnosticContext) : - this( - cbsClientMono, - cbsConfig, - configParser, - StreamFromGsonParsers.kafkaSinkParser(), - configurationStateListener, - mdc, - Retry.any() - .retryMax(MAX_RETRIES) - .fixedBackoff(cbsConfig.requestInterval) - .jitter(Jitter.random()) - ) - private val retry = retrySpec.doOnRetry { logger.withWarn(mdc) { log("Exception from configuration provider client, retrying subscription", it.exception()) @@ -80,22 +55,12 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono = - cbsClientMono - .doOnNext { logger.info(mdc) { "CBS client successfully created" } } - .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" } + cbsClientAdapter.configurationUpdates(mdc) + .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } } + .map(::parseConfiguration) + .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } } + .onErrorLog(logger, mdc) { "Error while creating configuration" } .retryWhen(retry) - .doFinally { logger.trace(mdc) { "CBS client subscription finished" } } - .flatMapMany(::handleUpdates) - - private fun handleUpdates(cbsClient: CbsClient) = cbsClient - .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()), - cbsConfiguration.firstRequestDelay, - cbsConfiguration.requestInterval) - .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } } - .map(::parseConfiguration) - .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } } - .onErrorLog(logger, mdc) { "Error while creating configuration" } - .retryWhen(retry) private fun parseConfiguration(json: JsonObject) = configParser @@ -110,7 +75,6 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono() + val configStateListenerMock = mock() + val sut = ConfigurationModule(configStateListenerMock, Mono.just(cbsClientMock)) + val configPath = javaClass.absoluteResourcePath("/insecureSampleConfig.json") + + given("sample configuration in file: $configPath") { + val arguments = arrayOf( + "--configuration-file", + configPath, + "--health-check-api-port", + "6062") + on("configuration changes in Config Binding Service") { + whenever(cbsClientMock.get(any())) + .thenReturn( + Mono.just(configurationJsonWithIntervalChanged), + Mono.just(configurationJsonWithIntervalChangedAgain), + Mono.just(configurationJsonWithIntervalRestored) + ) + it("should wait $firstRequestDelayFromFile s as provided in configuration file and later" + + " fetch configurations in intervals specified within them") { + StepVerifier + .withVirtualTime { + sut.hvVesConfigurationUpdates(arguments, sampleMdc) + .take(3) + } + .expectSubscription() + .expectNoEvent(firstRequestDelayFromFile) + .expectNext(configurationWithIntervalChanged) + .expectNoEvent(requestIntervalFromCBS) + .expectNext(configurationWithIntervalChangedAgain) + .expectNoEvent(anotherRequestIntervalFromCBS) + .expectNext(configurationWithIntervalRestored) + .verifyComplete() + } + } + } + } +}) + +private val firstRequestDelayFromFile = Duration.ofSeconds(3) +private val firstRequestDelayFromCBS = Duration.ofSeconds(999) +private val requestIntervalFromCBS = Duration.ofSeconds(10) +private val anotherRequestIntervalFromCBS = Duration.ofSeconds(20) + +private val sampleMdc = { mapOf("k" to "v") } +private val emptyRouting = listOf() + +private val configurationJsonWithIntervalChanged = JsonParser().parse("""{ + "cbs.requestIntervalSec": ${requestIntervalFromCBS.seconds} +}""").asJsonObject + +private val configurationJsonWithIntervalChangedAgain = JsonParser().parse("""{ + "cbs.firstRequestDelaySec": ${firstRequestDelayFromCBS.seconds}, + "cbs.requestIntervalSec": ${anotherRequestIntervalFromCBS.seconds} +}""").asJsonObject + +private val configurationJsonWithIntervalRestored = JsonParser().parse("""{ + "cbs.requestIntervalSec": ${requestIntervalFromCBS.seconds} +}""").asJsonObject + +private val configurationWithIntervalChanged = + hvVesConfiguration(firstRequestDelayFromFile, requestIntervalFromCBS) + +private val configurationWithIntervalChangedAgain = + hvVesConfiguration(firstRequestDelayFromCBS, anotherRequestIntervalFromCBS) + +private val configurationWithIntervalRestored = + hvVesConfiguration(firstRequestDelayFromFile, requestIntervalFromCBS) + +private fun hvVesConfiguration(firstRequestDelay: Duration, requestInterval: Duration): HvVesConfiguration { + return HvVesConfiguration( + ServerConfiguration(6061, Duration.ofSeconds(60)), + CbsConfiguration(firstRequestDelay, requestInterval), + SecurityConfiguration(Option.empty()), + CollectorConfiguration(emptyRouting, 1024 * 1024), + LogLevel.DEBUG) +} \ No newline at end of file diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt index 8c3c22aa..31415454 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.Some import com.google.gson.JsonParser import com.nhaarman.mockitokotlin2.any -import com.nhaarman.mockitokotlin2.eq import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.times import com.nhaarman.mockitokotlin2.verify @@ -34,10 +33,8 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener -import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.retry.Retry @@ -52,12 +49,15 @@ internal object CbsConfigurationProviderTest : Spek({ describe("Configuration provider") { - val cbsClient = mock() - val cbsClientMock = Mono.just(cbsClient) + val cbsClientAdapter = mock() val configStateListener = mock() given("configuration is never in cbs") { - val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener) + val cbsClientMock = mock() + val configProvider = constructConfigurationProvider( + constructCbsClientAdapter(cbsClientMock, configStateListener), + configStateListener + ) on("waiting for configuration") { val waitTime = Duration.ofMillis(100) @@ -70,16 +70,16 @@ internal object CbsConfigurationProviderTest : Spek({ } given("valid configuration from cbs") { - val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener) + val configProvider = constructConfigurationProvider(cbsClientAdapter, configStateListener) on("new configuration") { - whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) + whenever(cbsClientAdapter.configurationUpdates(any())) .thenReturn(Flux.just(validConfiguration)) it("should use received configuration") { StepVerifier.create(configProvider().take(1)) .consumeNextWith { - + assertThat(it.requestIntervalSec).isEqualTo(Some(5L)) assertThat(it.listenPort).isEqualTo(Some(6061)) assertThat(it.idleTimeoutSec).isEqualTo(Some(60L)) @@ -106,11 +106,11 @@ internal object CbsConfigurationProviderTest : Spek({ given("invalid configuration from cbs") { val iterationCount = 3L val configProvider = constructConfigurationProvider( - cbsClientMock, configStateListener, iterationCount + cbsClientAdapter, configStateListener, iterationCount ) on("new configuration") { - whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) + whenever(cbsClientAdapter.configurationUpdates(any())) .thenReturn(Flux.just(invalidConfiguration)) it("should interrupt the flux") { @@ -146,6 +146,7 @@ private val validConfiguration = JsonParser().parse(""" { "server.listenPort": 6061, "server.idleTimeoutSec": 60, + "cbs.requestIntervalSec": 5, "streams_publishes": { "$PERF3GPP_REGIONAL": { "type": "kafka", @@ -190,26 +191,23 @@ private val invalidConfiguration = JsonParser().parse(""" }""").asJsonObject private val firstRequestDelay = Duration.ofMillis(1) -private val requestInterval = Duration.ofMillis(1) -private val streamParser = StreamFromGsonParsers.kafkaSinkParser() private val configParser = JsonConfigurationParser() -private fun constructConfigurationProvider(cbsClientMono: Mono, +private fun retry(iterationCount: Long = 1) = Retry + .onlyIf { it.iteration() <= iterationCount } + .fixedBackoff(Duration.ofNanos(1)) + +private fun constructCbsClientAdapter(cbsClientMock: CbsClient, configStateListener: ConfigurationStateListener) = + CbsClientAdapter(Mono.just(cbsClientMock), configStateListener, firstRequestDelay, retry()) + +private fun constructConfigurationProvider(cbsClientAdapter: CbsClientAdapter, configurationStateListener: ConfigurationStateListener, iterationCount: Long = 1 -): CbsConfigurationProvider { - - val retry = Retry - .onlyIf { it.iteration() <= iterationCount } - .fixedBackoff(Duration.ofNanos(1)) - - return CbsConfigurationProvider( - cbsClientMono, - CbsConfiguration(firstRequestDelay, requestInterval), - configParser, - streamParser, - configurationStateListener, - { mapOf("k" to "v") }, - retry - ) -} +): CbsConfigurationProvider = + CbsConfigurationProvider( + cbsClientAdapter, + configParser, + configurationStateListener, + { mapOf("k" to "v") }, + retry(iterationCount) + ) diff --git a/sources/hv-collector-configuration/src/test/resources/insecureSampleConfig.json b/sources/hv-collector-configuration/src/test/resources/insecureSampleConfig.json new file mode 100644 index 00000000..4fc59212 --- /dev/null +++ b/sources/hv-collector-configuration/src/test/resources/insecureSampleConfig.json @@ -0,0 +1,8 @@ +{ + "logLevel": "DEBUG", + "server.listenPort": 6061, + "server.idleTimeoutSec": 60, + "cbs.firstRequestDelaySec": 3, + "cbs.requestIntervalSec": 5, + "security.sslDisable": "true" +} \ No newline at end of file diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 123d2dc9..3dcb5ce1 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -42,7 +42,6 @@ private const val VES_HV_PACKAGE = "org.onap.dcae.collectors.veshv" private val logger = Logger("$VES_HV_PACKAGE.main") private val hvVesServer = AtomicReference() -private val configurationModule = ConfigurationModule() private val sslContextFactory = SslContextFactory() private val maxCloseTime = Duration.ofSeconds(10) @@ -52,10 +51,10 @@ fun main(args: Array) { HealthState.INSTANCE.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) } } - + val configurationModule = ConfigurationModule(configStateListener) HealthCheckServer.start(configurationModule.healthCheckPort(args)).block() configurationModule - .hvVesConfigurationUpdates(args, configStateListener, ServiceContext::mdc) + .hvVesConfigurationUpdates(args, ServiceContext::mdc) .publishOn(Schedulers.single(Schedulers.elastic())) .doOnNext { logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt index ceccbcba..e1886055 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt @@ -26,8 +26,14 @@ package org.onap.dcae.collectors.veshv.utils.rx import org.reactivestreams.Publisher +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.toMono +import java.time.Duration fun Publisher.then(callback: () -> Unit): Mono = toMono().then(Mono.fromCallable(callback)) + +fun delayElements(intervalSupplier: () -> Duration): (Flux) -> Flux = { flux -> + flux.concatMap { Mono.just(it).delayElement(intervalSupplier()) } +} -- cgit 1.2.3-korg