aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkjaniak <kornel.janiak@nokia.com>2019-05-22 22:19:49 +0200
committerkjaniak <kornel.janiak@nokia.com>2019-06-05 16:01:22 +0200
commit1ddd723f22c64dfb8c414fc8573ebe993ed00578 (patch)
tree278aa739a0642cb40c358ffdf8f2e3d2e313425d
parentdc936d27d761bde31ac5916a84efa2f48ec32b83 (diff)
Support CBS request interval reconfiguration
Change-Id: Ie8892e33b2f6a58d6076f66e6cc6a2df830dfa48 Issue-ID: DCAEGEN2-1525 Signed-off-by: kjaniak <kornel.janiak@nokia.com>
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt56
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt81
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt54
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModuleIT.kt120
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt58
-rw-r--r--sources/hv-collector-configuration/src/test/resources/insecureSampleConfig.json8
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt5
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt6
8 files changed, 297 insertions, 91 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
index ded75838..e243afe7 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
@@ -25,18 +25,24 @@ import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationTransformer
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator
+import org.onap.dcae.collectors.veshv.config.impl.CbsClientAdapter
import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
import org.onap.dcae.collectors.veshv.config.impl.JsonConfigurationParser
import org.onap.dcae.collectors.veshv.config.impl.PartialConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import reactor.retry.Jitter
+import reactor.retry.Retry
+import java.time.Duration
-class ConfigurationModule {
+class ConfigurationModule internal constructor(private val configStateListener: ConfigurationStateListener,
+ private val cbsClient: Mono<CbsClient>) {
private val cmd = HvVesCommandLineParser()
private val configParser = JsonConfigurationParser()
@@ -44,10 +50,15 @@ class ConfigurationModule {
private val configValidator = ConfigurationValidator()
private val configTransformer = ConfigurationTransformer()
+ constructor(configStateListener: ConfigurationStateListener) : this(
+ configStateListener,
+ CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment())
+ )
+
+
fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args)
fun hvVesConfigurationUpdates(args: Array<String>,
- configStateListener: ConfigurationStateListener,
mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> =
Mono.just(cmd.getConfigurationFile(args))
.throwOnLeft(::MissingArgumentException)
@@ -56,23 +67,35 @@ class ConfigurationModule {
.doOnNext { logger.info { "Successfully parsed configuration file to: $it" } }
.cache()
.flatMapMany { basePartialConfig ->
- cbsConfigurationProvider(basePartialConfig, configStateListener, mdc)
- .invoke()
- .map { configMerger.merge(basePartialConfig, it) }
- .map(configValidator::validate)
- .throwOnLeft()
- .map(configTransformer::toFinalConfiguration)
+ cbsClientAdapter(basePartialConfig).let { cbsClientAdapter ->
+ cbsConfigurationProvider(cbsClientAdapter, mdc)
+ .invoke()
+ .map { configMerger.merge(basePartialConfig, it) }
+ .map(configValidator::validate)
+ .throwOnLeft()
+ .map(configTransformer::toFinalConfiguration)
+ .doOnNext {
+ cbsClientAdapter.updateCbsInterval(it.cbs.requestInterval, mdc)
+ }
+ }
}
- private fun cbsConfigurationProvider(basePartialConfig: PartialConfiguration,
- configStateListener: ConfigurationStateListener,
+ private fun cbsClientAdapter(basePartialConfig: PartialConfiguration) =
+ CbsClientAdapter(
+ cbsClient,
+ configStateListener,
+ cbsConfigurationFrom(basePartialConfig).firstRequestDelay,
+ retrySpec
+ )
+
+ private fun cbsConfigurationProvider(cbsClientAdapter: CbsClientAdapter,
mdc: MappedDiagnosticContext) =
CbsConfigurationProvider(
- CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
- cbsConfigurationFrom(basePartialConfig),
+ cbsClientAdapter,
configParser,
configStateListener,
- mdc)
+ mdc,
+ retrySpec)
private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) =
configValidator.validatedCbsConfiguration(basePartialConfig)
@@ -80,6 +103,13 @@ class ConfigurationModule {
companion object {
private val logger = Logger(ConfigurationModule::class)
+
+ private const val MAX_RETRIES = 5L
+ private const val INITIAL_BACKOFF = 10L
+ private val retrySpec: Retry<Any> = Retry.any<Any>()
+ .retryMax(MAX_RETRIES)
+ .fixedBackoff(Duration.ofSeconds(INITIAL_BACKOFF))
+ .jitter(Jitter.random())
}
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
new file mode 100644
index 00000000..d31f6585
--- /dev/null
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
+import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
+import org.onap.dcae.collectors.veshv.utils.rx.delayElements
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
+import reactor.core.publisher.Mono
+import reactor.retry.Retry
+import java.time.Duration
+import java.util.concurrent.atomic.AtomicReference
+
+
+internal class CbsClientAdapter(private val cbsClientMono: Mono<CbsClient>,
+ private val configurationStateListener: ConfigurationStateListener,
+ private val firstRequestDelay: Duration,
+ private val retrySpec: Retry<Any>) {
+
+ private val requestInterval = AtomicReference<Duration>(Duration.ZERO)
+
+ fun configurationUpdates(mdc: MappedDiagnosticContext) = cbsClientMono
+ .doOnNext {
+ logger.info(mdc) {
+ "CBS client successfully created, first request will be sent in ${firstRequestDelay.seconds} s"
+ }
+ }
+ .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" }
+ .retryWhen(retry(mdc))
+ .delayElement(firstRequestDelay)
+ .flatMapMany(::toPeriodicalConfigurations)
+ .distinctUntilChanged()
+
+ fun updateCbsInterval(intervalUpdate: Duration, mdc: MappedDiagnosticContext) {
+ requestInterval.set(intervalUpdate)
+ logger.debug(mdc) { "CBS request interval changed to: ${intervalUpdate.seconds} s" }
+ }
+
+ private fun toPeriodicalConfigurations(cbsClient: CbsClient) =
+ Mono.just(configurationRequest())
+ .repeat()
+ .map(CbsRequest::withNewInvocationId)
+ .flatMap(cbsClient::get)
+ .transform(delayElements(requestInterval::get))
+
+ private fun configurationRequest() = CbsRequests.getConfiguration(RequestDiagnosticContext.create())
+
+ private fun retry(mdc: MappedDiagnosticContext) = retrySpec.doOnRetry {
+ logger.withWarn(mdc) {
+ log("Exception from HV-VES cbs client, retrying subscription", it.exception())
+ }
+ configurationStateListener.retrying()
+ }
+
+ companion object {
+ private val logger = Logger(CbsClientAdapter::class)
+ }
+
+}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
index 4982c732..6efa38e6 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
@@ -22,56 +22,31 @@ package org.onap.dcae.collectors.veshv.config.impl
import arrow.core.toOption
import com.google.gson.JsonObject
import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
-import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
import org.onap.dcae.collectors.veshv.utils.reader
import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType
-import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.retry.Jitter
import reactor.retry.Retry
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since May 2018
*/
-internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClient>,
- private val cbsConfiguration: CbsConfiguration,
+internal class CbsConfigurationProvider(private val cbsClientAdapter: CbsClientAdapter,
private val configParser: JsonConfigurationParser,
- private val streamParser: StreamFromGsonParser<KafkaSink>,
private val configurationStateListener: ConfigurationStateListener,
private val mdc: MappedDiagnosticContext,
- retrySpec: Retry<Any>
-
+ retrySpec: Retry<Any>,
+ private val streamParser: StreamFromGsonParser<KafkaSink> =
+ StreamFromGsonParsers.kafkaSinkParser()
) {
- constructor(cbsClientMono: Mono<CbsClient>,
- cbsConfig: CbsConfiguration,
- configParser: JsonConfigurationParser,
- configurationStateListener: ConfigurationStateListener,
- mdc: MappedDiagnosticContext) :
- this(
- cbsClientMono,
- cbsConfig,
- configParser,
- StreamFromGsonParsers.kafkaSinkParser(),
- configurationStateListener,
- mdc,
- Retry.any<Any>()
- .retryMax(MAX_RETRIES)
- .fixedBackoff(cbsConfig.requestInterval)
- .jitter(Jitter.random())
- )
-
private val retry = retrySpec.doOnRetry {
logger.withWarn(mdc) {
log("Exception from configuration provider client, retrying subscription", it.exception())
@@ -80,22 +55,12 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClien
}
operator fun invoke(): Flux<PartialConfiguration> =
- cbsClientMono
- .doOnNext { logger.info(mdc) { "CBS client successfully created" } }
- .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" }
+ cbsClientAdapter.configurationUpdates(mdc)
+ .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
+ .map(::parseConfiguration)
+ .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } }
+ .onErrorLog(logger, mdc) { "Error while creating configuration" }
.retryWhen(retry)
- .doFinally { logger.trace(mdc) { "CBS client subscription finished" } }
- .flatMapMany(::handleUpdates)
-
- private fun handleUpdates(cbsClient: CbsClient) = cbsClient
- .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
- cbsConfiguration.firstRequestDelay,
- cbsConfiguration.requestInterval)
- .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
- .map(::parseConfiguration)
- .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } }
- .onErrorLog(logger, mdc) { "Error while creating configuration" }
- .retryWhen(retry)
private fun parseConfiguration(json: JsonObject) =
configParser
@@ -110,7 +75,6 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClien
.toList()
companion object {
- private const val MAX_RETRIES = 5L
private val logger = Logger(CbsConfigurationProvider::class)
}
}
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModuleIT.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModuleIT.kt
new file mode 100644
index 00000000..1b2dbc2b
--- /dev/null
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModuleIT.kt
@@ -0,0 +1,120 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.api
+
+import arrow.core.Option
+import com.google.gson.JsonParser
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.config.api.model.*
+import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.tests.utils.absoluteResourcePath
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import reactor.core.publisher.Mono
+import reactor.test.StepVerifier
+import java.time.Duration
+
+
+internal object ConfigurationModuleIT : Spek({
+ describe("configuration module") {
+ val cbsClientMock = mock<CbsClient>()
+ val configStateListenerMock = mock<ConfigurationStateListener>()
+ val sut = ConfigurationModule(configStateListenerMock, Mono.just(cbsClientMock))
+ val configPath = javaClass.absoluteResourcePath("/insecureSampleConfig.json")
+
+ given("sample configuration in file: $configPath") {
+ val arguments = arrayOf(
+ "--configuration-file",
+ configPath,
+ "--health-check-api-port",
+ "6062")
+ on("configuration changes in Config Binding Service") {
+ whenever(cbsClientMock.get(any()))
+ .thenReturn(
+ Mono.just(configurationJsonWithIntervalChanged),
+ Mono.just(configurationJsonWithIntervalChangedAgain),
+ Mono.just(configurationJsonWithIntervalRestored)
+ )
+ it("should wait $firstRequestDelayFromFile s as provided in configuration file and later" +
+ " fetch configurations in intervals specified within them") {
+ StepVerifier
+ .withVirtualTime {
+ sut.hvVesConfigurationUpdates(arguments, sampleMdc)
+ .take(3)
+ }
+ .expectSubscription()
+ .expectNoEvent(firstRequestDelayFromFile)
+ .expectNext(configurationWithIntervalChanged)
+ .expectNoEvent(requestIntervalFromCBS)
+ .expectNext(configurationWithIntervalChangedAgain)
+ .expectNoEvent(anotherRequestIntervalFromCBS)
+ .expectNext(configurationWithIntervalRestored)
+ .verifyComplete()
+ }
+ }
+ }
+ }
+})
+
+private val firstRequestDelayFromFile = Duration.ofSeconds(3)
+private val firstRequestDelayFromCBS = Duration.ofSeconds(999)
+private val requestIntervalFromCBS = Duration.ofSeconds(10)
+private val anotherRequestIntervalFromCBS = Duration.ofSeconds(20)
+
+private val sampleMdc = { mapOf("k" to "v") }
+private val emptyRouting = listOf<Route>()
+
+private val configurationJsonWithIntervalChanged = JsonParser().parse("""{
+ "cbs.requestIntervalSec": ${requestIntervalFromCBS.seconds}
+}""").asJsonObject
+
+private val configurationJsonWithIntervalChangedAgain = JsonParser().parse("""{
+ "cbs.firstRequestDelaySec": ${firstRequestDelayFromCBS.seconds},
+ "cbs.requestIntervalSec": ${anotherRequestIntervalFromCBS.seconds}
+}""").asJsonObject
+
+private val configurationJsonWithIntervalRestored = JsonParser().parse("""{
+ "cbs.requestIntervalSec": ${requestIntervalFromCBS.seconds}
+}""").asJsonObject
+
+private val configurationWithIntervalChanged =
+ hvVesConfiguration(firstRequestDelayFromFile, requestIntervalFromCBS)
+
+private val configurationWithIntervalChangedAgain =
+ hvVesConfiguration(firstRequestDelayFromCBS, anotherRequestIntervalFromCBS)
+
+private val configurationWithIntervalRestored =
+ hvVesConfiguration(firstRequestDelayFromFile, requestIntervalFromCBS)
+
+private fun hvVesConfiguration(firstRequestDelay: Duration, requestInterval: Duration): HvVesConfiguration {
+ return HvVesConfiguration(
+ ServerConfiguration(6061, Duration.ofSeconds(60)),
+ CbsConfiguration(firstRequestDelay, requestInterval),
+ SecurityConfiguration(Option.empty()),
+ CollectorConfiguration(emptyRouting, 1024 * 1024),
+ LogLevel.DEBUG)
+} \ No newline at end of file
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
index 8c3c22aa..31415454 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.config.impl
import arrow.core.Some
import com.google.gson.JsonParser
import com.nhaarman.mockitokotlin2.any
-import com.nhaarman.mockitokotlin2.eq
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.times
import com.nhaarman.mockitokotlin2.verify
@@ -34,10 +33,8 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
-import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.retry.Retry
@@ -52,12 +49,15 @@ internal object CbsConfigurationProviderTest : Spek({
describe("Configuration provider") {
- val cbsClient = mock<CbsClient>()
- val cbsClientMock = Mono.just(cbsClient)
+ val cbsClientAdapter = mock<CbsClientAdapter>()
val configStateListener = mock<ConfigurationStateListener>()
given("configuration is never in cbs") {
- val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)
+ val cbsClientMock = mock<CbsClient>()
+ val configProvider = constructConfigurationProvider(
+ constructCbsClientAdapter(cbsClientMock, configStateListener),
+ configStateListener
+ )
on("waiting for configuration") {
val waitTime = Duration.ofMillis(100)
@@ -70,16 +70,16 @@ internal object CbsConfigurationProviderTest : Spek({
}
given("valid configuration from cbs") {
- val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)
+ val configProvider = constructConfigurationProvider(cbsClientAdapter, configStateListener)
on("new configuration") {
- whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
+ whenever(cbsClientAdapter.configurationUpdates(any()))
.thenReturn(Flux.just(validConfiguration))
it("should use received configuration") {
StepVerifier.create(configProvider().take(1))
.consumeNextWith {
-
+ assertThat(it.requestIntervalSec).isEqualTo(Some(5L))
assertThat(it.listenPort).isEqualTo(Some(6061))
assertThat(it.idleTimeoutSec).isEqualTo(Some(60L))
@@ -106,11 +106,11 @@ internal object CbsConfigurationProviderTest : Spek({
given("invalid configuration from cbs") {
val iterationCount = 3L
val configProvider = constructConfigurationProvider(
- cbsClientMock, configStateListener, iterationCount
+ cbsClientAdapter, configStateListener, iterationCount
)
on("new configuration") {
- whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
+ whenever(cbsClientAdapter.configurationUpdates(any()))
.thenReturn(Flux.just(invalidConfiguration))
it("should interrupt the flux") {
@@ -146,6 +146,7 @@ private val validConfiguration = JsonParser().parse("""
{
"server.listenPort": 6061,
"server.idleTimeoutSec": 60,
+ "cbs.requestIntervalSec": 5,
"streams_publishes": {
"$PERF3GPP_REGIONAL": {
"type": "kafka",
@@ -190,26 +191,23 @@ private val invalidConfiguration = JsonParser().parse("""
}""").asJsonObject
private val firstRequestDelay = Duration.ofMillis(1)
-private val requestInterval = Duration.ofMillis(1)
-private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
private val configParser = JsonConfigurationParser()
-private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
+private fun retry(iterationCount: Long = 1) = Retry
+ .onlyIf<Any> { it.iteration() <= iterationCount }
+ .fixedBackoff(Duration.ofNanos(1))
+
+private fun constructCbsClientAdapter(cbsClientMock: CbsClient, configStateListener: ConfigurationStateListener) =
+ CbsClientAdapter(Mono.just(cbsClientMock), configStateListener, firstRequestDelay, retry())
+
+private fun constructConfigurationProvider(cbsClientAdapter: CbsClientAdapter,
configurationStateListener: ConfigurationStateListener,
iterationCount: Long = 1
-): CbsConfigurationProvider {
-
- val retry = Retry
- .onlyIf<Any> { it.iteration() <= iterationCount }
- .fixedBackoff(Duration.ofNanos(1))
-
- return CbsConfigurationProvider(
- cbsClientMono,
- CbsConfiguration(firstRequestDelay, requestInterval),
- configParser,
- streamParser,
- configurationStateListener,
- { mapOf("k" to "v") },
- retry
- )
-}
+): CbsConfigurationProvider =
+ CbsConfigurationProvider(
+ cbsClientAdapter,
+ configParser,
+ configurationStateListener,
+ { mapOf("k" to "v") },
+ retry(iterationCount)
+ )
diff --git a/sources/hv-collector-configuration/src/test/resources/insecureSampleConfig.json b/sources/hv-collector-configuration/src/test/resources/insecureSampleConfig.json
new file mode 100644
index 00000000..4fc59212
--- /dev/null
+++ b/sources/hv-collector-configuration/src/test/resources/insecureSampleConfig.json
@@ -0,0 +1,8 @@
+{
+ "logLevel": "DEBUG",
+ "server.listenPort": 6061,
+ "server.idleTimeoutSec": 60,
+ "cbs.firstRequestDelaySec": 3,
+ "cbs.requestIntervalSec": 5,
+ "security.sslDisable": "true"
+} \ No newline at end of file
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 123d2dc9..3dcb5ce1 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -42,7 +42,6 @@ private const val VES_HV_PACKAGE = "org.onap.dcae.collectors.veshv"
private val logger = Logger("$VES_HV_PACKAGE.main")
private val hvVesServer = AtomicReference<ServerHandle>()
-private val configurationModule = ConfigurationModule()
private val sslContextFactory = SslContextFactory()
private val maxCloseTime = Duration.ofSeconds(10)
@@ -52,10 +51,10 @@ fun main(args: Array<String>) {
HealthState.INSTANCE.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
}
}
-
+ val configurationModule = ConfigurationModule(configStateListener)
HealthCheckServer.start(configurationModule.healthCheckPort(args)).block()
configurationModule
- .hvVesConfigurationUpdates(args, configStateListener, ServiceContext::mdc)
+ .hvVesConfigurationUpdates(args, ServiceContext::mdc)
.publishOn(Schedulers.single(Schedulers.elastic()))
.doOnNext {
logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt
index ceccbcba..e1886055 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt
@@ -26,8 +26,14 @@
package org.onap.dcae.collectors.veshv.utils.rx
import org.reactivestreams.Publisher
+import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.toMono
+import java.time.Duration
fun <T> Publisher<T>.then(callback: () -> Unit): Mono<Unit> =
toMono().then(Mono.fromCallable(callback))
+
+fun <T> delayElements(intervalSupplier: () -> Duration): (Flux<T>) -> Flux<T> = { flux ->
+ flux.concatMap { Mono.just(it).delayElement(intervalSupplier()) }
+}