aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-configuration/src/main
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-03-29 11:22:24 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-01 12:32:42 +0200
commit6725abbaa6249e107126ffd5ec58f2a96ce60eee (patch)
treef3fa6d11a04b60a631ee4160a69744b44e08e1ed /sources/hv-collector-configuration/src/main
parent4281a12d8e892f46f5f2226ee0f8aee8b862b177 (diff)
Move ConfigurationProvider to config module
Change-Id: Ic6f955f4e777e06e7c7eed6e08c0cac470e9a51d Issue-ID: DCAEGEN2-1347 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-configuration/src/main')
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt32
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt24
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt)2
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt)0
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt)0
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt119
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt4
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt5
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt)2
9 files changed, 174 insertions, 14 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
index dd1b171e..9684484b 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
@@ -22,10 +22,16 @@ package org.onap.dcae.collectors.veshv.config.api
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException
import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
-import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
+import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider
+import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator
import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader
+import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
+import org.onap.dcae.collectors.veshv.utils.arrow.rightOrThrow
import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft
+import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
import reactor.core.publisher.Flux
class ConfigurationModule {
@@ -34,16 +40,28 @@ class ConfigurationModule {
private val configReader = FileConfigurationReader()
private val configValidator = ConfigurationValidator()
- private lateinit var initialConfig: HvVesConfiguration
-
fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args)
- fun hvVesConfigurationUpdates(args: Array<String>): Flux<HvVesConfiguration> =
+ fun hvVesConfigurationUpdates(args: Array<String>,
+ configStateListener: ConfigurationStateListener,
+ mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> =
Flux.just(cmd.getConfigurationFile(args))
.throwOnLeft { MissingArgumentException(it.message, it.cause) }
.map { it.reader().use(configReader::loadConfig) }
- .map { configValidator.validate(it) }
- .throwOnLeft { ValidationException(it.message) }
- .doOnNext { initialConfig = it }
+ .cache()
+ .flatMap { basePartialConfig ->
+ val baseConfig = configValidator.validate(basePartialConfig)
+ .rightOrThrow { ValidationException(it.message) }
+ val cbsConfigProvider = CbsConfigurationProvider(
+ CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
+ baseConfig.cbs,
+ configStateListener,
+ mdc)
+ val merger = ConfigurationMerger()
+ cbsConfigProvider()
+ .map { merger.merge(basePartialConfig, it) }
+ .map { configValidator.validate(it) }
+ .throwOnLeft { ValidationException(it.message) }
+ }
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt
new file mode 100644
index 00000000..9fa6660e
--- /dev/null
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt
@@ -0,0 +1,24 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.api
+
+interface ConfigurationStateListener {
+ fun retrying() {}
+}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
index 3375821e..c1807be2 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
@@ -47,7 +47,5 @@ data class CbsConfiguration(
)
data class CollectorConfiguration(
- val maxRequestSizeBytes: Int,
- val kafkaServers: String,
val routing: Routing
)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt
index 2fc29829..2fc29829 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt
index e5a83ac4..e5a83ac4 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
new file mode 100644
index 00000000..2038c31a
--- /dev/null
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
@@ -0,0 +1,119 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.Some
+import com.google.gson.JsonObject
+import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
+import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.retry.Jitter
+import reactor.retry.Retry
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClient>,
+ private val cbsConfiguration: CbsConfiguration,
+ private val streamParser: StreamFromGsonParser<KafkaSink>,
+ private val configurationStateListener: ConfigurationStateListener,
+ retrySpec: Retry<Any>,
+ private val mdc: MappedDiagnosticContext
+
+) {
+ constructor(cbsClientMono: Mono<CbsClient>,
+ cbsConfig: CbsConfiguration,
+ configurationStateListener: ConfigurationStateListener,
+ mdc: MappedDiagnosticContext) :
+ this(
+ cbsClientMono,
+ cbsConfig,
+ StreamFromGsonParsers.kafkaSinkParser(),
+ configurationStateListener,
+ Retry.any<Any>()
+ .retryMax(MAX_RETRIES)
+ .fixedBackoff(cbsConfig.requestInterval)
+ .jitter(Jitter.random()),
+ mdc
+ )
+
+ private val retry = retrySpec.doOnRetry {
+ logger.withWarn(mdc) {
+ log("Exception from configuration provider client, retrying subscription", it.exception())
+ }
+ configurationStateListener.retrying()
+ }
+
+ operator fun invoke(): Flux<PartialConfiguration> =
+ cbsClientMono
+ .doOnNext { logger.info(mdc) { "CBS client successfully created" } }
+ .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" }
+ .retryWhen(retry)
+ .doFinally { logger.trace(mdc) { "CBS client subscription finished" } }
+ .flatMapMany(::handleUpdates)
+
+ private fun handleUpdates(cbsClient: CbsClient) = cbsClient
+ .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
+ cbsConfiguration.firstRequestDelay,
+ cbsConfiguration.requestInterval)
+ .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
+ .map(::createRoutingDescription)
+ .onErrorLog(logger, mdc) { "Error while creating configuration" }
+ .retryWhen(retry)
+ .map { PartialConfiguration(collector = Some(PartialCollectorConfig(routing = it))) }
+
+ private fun createRoutingDescription(configuration: JsonObject): Option<Routing> = try {
+ val routes = DataStreams.namedSinks(configuration)
+ .filter(streamOfType(KAFKA))
+ .map(streamParser::unsafeParse)
+ .map { Route(it.name(), it) }
+ .asIterable()
+ .toList()
+ Some(routes)
+ } catch (e: NullPointerException) {
+ logger.withWarn(mdc) {
+ log("Invalid streams configuration", e)
+ }
+ None
+ }
+
+ companion object {
+ private const val MAX_RETRIES = 5L
+ private val logger = Logger(CbsConfigurationProvider::class)
+ }
+}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
index 04bba7e2..3e599b58 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
@@ -63,9 +63,7 @@ internal class ConfigurationValidator {
securityConfiguration,
// TOD0: swap when ConfigurationMerger is implemented
// collectorConfiguration
- CollectorConfiguration(-1,
- "I do not exist. I'm not even a URL :o",
- emptyList()),
+ CollectorConfiguration(emptyList()),
// end TOD0
logLevel
)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
index 3e93a400..c1a98294 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
@@ -19,7 +19,10 @@
*/
package org.onap.dcae.collectors.veshv.config.impl
-import arrow.core.*
+import arrow.core.Either
+import arrow.core.Option
+import arrow.core.Try
+import arrow.core.getOrElse
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.CommandLineParser
import org.apache.commons.cli.DefaultParser
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
index a27998e1..f3c149cd 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
@@ -54,6 +54,6 @@ internal data class PartialSecurityConfig(val keys: Option<SecurityKeys> = None)
internal data class PartialCollectorConfig(
val maxRequestSizeBytes: Option<Int> = None,
- val kafkaServers: Option<List<InetSocketAddress>> = None,
+ val kafkaServers: Option<List<InetSocketAddress>> = None, // TOD0: remove properties and simplify this part
val routing: Option<Routing> = None
)