diff options
author | Jakub Dudycz <jakub.dudycz@nokia.com> | 2019-04-08 13:48:42 +0200 |
---|---|---|
committer | Jakub Dudycz <jakub.dudycz@nokia.com> | 2019-04-09 15:50:41 +0200 |
commit | 325387e62a0793871dc1eb97f02a4ae90a977664 (patch) | |
tree | e61544a1df539fffd2ae5efc5961155306bfb347 /sources/hv-collector-configuration/src/main | |
parent | e55809c0219be0898138c436d82ceba212b92df9 (diff) |
Parse whole dynamic configuration
Change-Id: I96e4cf3ac75920ed909da9063ba0b788b55474e4
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Issue-ID: DCAEGEN2-1386
Diffstat (limited to 'sources/hv-collector-configuration/src/main')
8 files changed, 72 insertions, 111 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt index ccce62a4..93381572 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt @@ -26,8 +26,8 @@ import org.onap.dcae.collectors.veshv.config.api.model.ValidationException import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator -import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser +import org.onap.dcae.collectors.veshv.config.impl.JsonConfigurationParser import org.onap.dcae.collectors.veshv.config.impl.PartialConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -40,7 +40,7 @@ import reactor.core.publisher.Mono class ConfigurationModule { private val cmd = HvVesCommandLineParser() - private val configReader = FileConfigurationReader() + private val configParser = JsonConfigurationParser() private val configValidator = ConfigurationValidator() private val merger = ConfigurationMerger() @@ -51,10 +51,9 @@ class ConfigurationModule { mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> = Mono.just(cmd.getConfigurationFile(args)) .throwOnLeft(::MissingArgumentException) - .map { - logger.info { "Using base configuration file: ${it.absolutePath}" } - it.reader().use(configReader::loadConfig) - } + .doOnNext { logger.info { "Using base configuration file: ${it.absolutePath}" } } + .map { it.reader().use(configParser::parse) } + .doOnNext { logger.info { "Successfully parsed json file to configuration: $it" } } .cache() .flatMapMany { basePartialConfig -> cbsConfigurationProvider(basePartialConfig, configStateListener, mdc) @@ -70,12 +69,13 @@ class ConfigurationModule { CbsConfigurationProvider( CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), cbsConfigurationFrom(basePartialConfig), + configParser, configStateListener, mdc) - private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = - configValidator.validatedCbsConfiguration(basePartialConfig) - .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") } + private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = configValidator + .validatedCbsConfiguration(basePartialConfig) + .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") } companion object { private val logger = Logger(ConfigurationModule::class) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt index c1807be2..f745d595 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt @@ -37,8 +37,8 @@ data class HvVesConfiguration( data class ServerConfiguration( val listenPort: Int, - val idleTimeout: Duration, - val maxPayloadSizeBytes: Int + val maxPayloadSizeBytes: Int, + val idleTimeout: Duration ) data class CbsConfiguration( diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt index b6462936..4982c732 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt @@ -19,17 +19,14 @@ */ package org.onap.dcae.collectors.veshv.config.impl -import arrow.core.None -import arrow.core.Option -import arrow.core.Some +import arrow.core.toOption import com.google.gson.JsonObject import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.Route -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog +import org.onap.dcae.collectors.veshv.utils.reader import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient @@ -50,26 +47,29 @@ import reactor.retry.Retry */ internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClient>, private val cbsConfiguration: CbsConfiguration, + private val configParser: JsonConfigurationParser, private val streamParser: StreamFromGsonParser<KafkaSink>, private val configurationStateListener: ConfigurationStateListener, - retrySpec: Retry<Any>, - private val mdc: MappedDiagnosticContext + private val mdc: MappedDiagnosticContext, + retrySpec: Retry<Any> ) { constructor(cbsClientMono: Mono<CbsClient>, cbsConfig: CbsConfiguration, + configParser: JsonConfigurationParser, configurationStateListener: ConfigurationStateListener, mdc: MappedDiagnosticContext) : this( cbsClientMono, cbsConfig, + configParser, StreamFromGsonParsers.kafkaSinkParser(), configurationStateListener, + mdc, Retry.any<Any>() .retryMax(MAX_RETRIES) .fixedBackoff(cbsConfig.requestInterval) - .jitter(Jitter.random()), - mdc + .jitter(Jitter.random()) ) private val retry = retrySpec.doOnRetry { @@ -92,25 +92,22 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClien cbsConfiguration.firstRequestDelay, cbsConfiguration.requestInterval) .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } } - .map(::createRoutingDescription) + .map(::parseConfiguration) + .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } } .onErrorLog(logger, mdc) { "Error while creating configuration" } .retryWhen(retry) - .map { PartialConfiguration(routing = it) } - private fun createRoutingDescription(configuration: JsonObject): Option<Routing> = try { - val routes = DataStreams.namedSinks(configuration) - .filter(streamOfType(KAFKA)) - .map(streamParser::unsafeParse) - .map { Route(it.name(), it) } - .asIterable() - .toList() - Some(routes) - } catch (e: NullPointerException) { - logger.withWarn(mdc) { - log("Invalid streams configuration", e) - } - None - } + private fun parseConfiguration(json: JsonObject) = + configParser + .parse(json.reader()) + .apply { streamPublishers = extractStreamDefinitions(json).toOption() } + + private fun extractStreamDefinitions(configuration: JsonObject): List<KafkaSink> = + DataStreams.namedSinks(configuration) + .filter(streamOfType(KAFKA)) + .map(streamParser::unsafeParse) + .asIterable() + .toList() companion object { private const val MAX_RETRIES = 5L diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt index 8e6bafc4..e782a1e7 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt @@ -21,11 +21,8 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.Option -import arrow.core.Some import arrow.core.getOrElse import arrow.core.toOption -import kotlin.reflect.KProperty0 -import kotlin.reflect.KProperty1 /** * @author Pawel Biniek <pawel.biniek@nokia.com> @@ -47,15 +44,11 @@ internal class ConfigurationMerger { trustStoreFile = base.trustStoreFile.updateToGivenOrNone(update.trustStoreFile), trustStorePassword = base.trustStorePassword.updateToGivenOrNone(update.trustStorePassword), - routing = base.routing.updateToGivenOrNone(update.routing), + streamPublishers = base.streamPublishers.updateToGivenOrNone(update.streamPublishers), logLevel = base.logLevel.updateToGivenOrNone(update.logLevel) ) private fun <T> Option<T>.updateToGivenOrNone(update: Option<T>) = update.getOrElse(this::orNull).toOption() - } - - - diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt index cfcc7d76..dddf0bed 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt @@ -23,19 +23,25 @@ import arrow.core.None import arrow.core.Option import arrow.core.Some import arrow.core.getOrElse +import arrow.core.toOption import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration import org.onap.dcae.collectors.veshv.config.api.model.ValidationException import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration -import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityKeysPaths import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding -import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty +import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding import org.onap.dcae.collectors.veshv.utils.logging.LogLevel import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore +import org.onap.dcaegen2.services.sdk.security.ssl.Passwords import java.io.File +import java.nio.file.Path +import java.time.Duration /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -88,16 +94,16 @@ internal class ConfigurationValidator { partial.mapBinding { ServerConfiguration( it.listenPort.bind(), - it.idleTimeoutSec.bind(), - it.maxPayloadSizeBytes.bind() + it.maxPayloadSizeBytes.bind(), + Duration.ofSeconds(it.idleTimeoutSec.bind()) ) } internal fun validatedCbsConfiguration(partial: PartialConfiguration) = partial.mapBinding { CbsConfiguration( - it.firstRequestDelaySec.bind(), - it.requestIntervalSec.bind() + Duration.ofSeconds(it.firstRequestDelaySec.bind()), + Duration.ofSeconds(it.requestIntervalSec.bind()) ) } @@ -113,19 +119,31 @@ internal class ConfigurationValidator { private fun createSecurityConfiguration(partial: PartialConfiguration): Option<SecurityConfiguration> = partial.mapBinding { SecurityConfiguration( - Option.fromNullable(SecurityKeysPaths( + createSecurityKeys( File(it.keyStoreFile.bind()).toPath(), it.keyStorePassword.bind(), File(it.trustStoreFile.bind()).toPath(), it.trustStorePassword.bind() - ).asImmutableSecurityKeys()) + ).toOption() ) } + private fun createSecurityKeys(keyStorePath: Path, + keyStorePassword: String, + trustStorePath: Path, + trustStorePassword: String) = + ImmutableSecurityKeys.builder() + .keyStore(ImmutableSecurityKeysStore.of(keyStorePath)) + .keyStorePassword(Passwords.fromString(keyStorePassword)) + .trustStore(ImmutableSecurityKeysStore.of(trustStorePath)) + .trustStorePassword(Passwords.fromString(trustStorePassword)) + .build() + + private fun validatedCollectorConfig(partial: PartialConfiguration) = - partial.mapBinding { + partial.mapBinding { config -> CollectorConfiguration( - it.routing.bind() + config.streamPublishers.bind().map { Route(it.name(), it) } ) } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParser.kt index 104ca78c..0b3dd0d5 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParser.kt @@ -21,28 +21,18 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.Option import com.google.gson.GsonBuilder -import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.DurationOfSecondsAdapter import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.OptionAdapter -import org.onap.dcae.collectors.veshv.utils.logging.Logger - import java.io.Reader -import java.time.Duration /** * @author Pawel Biniek <pawel.biniek@nokia.com> * @since February 2019 */ -internal class FileConfigurationReader { +internal class JsonConfigurationParser { private val gson = GsonBuilder() .registerTypeAdapter(Option::class.java, OptionAdapter()) - .registerTypeAdapter(Duration::class.java, DurationOfSecondsAdapter()) .create() - fun loadConfig(input: Reader): PartialConfiguration = + fun parse(input: Reader): PartialConfiguration = gson.fromJson(input, PartialConfiguration::class.java) - .also { logger.info { "Successfully read file and parsed json to configuration: $it" } } - - companion object { - private val logger = Logger(FileConfigurationReader::class) - } } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt deleted file mode 100644 index 3bde7089..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.impl.gsonadapters - -import com.google.gson.JsonDeserializationContext -import com.google.gson.JsonDeserializer -import com.google.gson.JsonElement -import java.lang.reflect.Type -import java.time.Duration - -/** - * @author Pawel Biniek <pawel.biniek@nokia.com> - * @since March 2019 - */ -internal class DurationOfSecondsAdapter : JsonDeserializer<Duration> { - override fun deserialize(json: JsonElement, typeOfT: Type, context: JsonDeserializationContext) = - Duration.ofSeconds(json.asLong) - -} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt index 0be2572d..30f6c3e3 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt @@ -22,9 +22,8 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.None import arrow.core.Option import com.google.gson.annotations.SerializedName -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.utils.logging.LogLevel -import java.time.Duration +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink /** * @author Pawel Biniek <pawel.biniek@nokia.com> @@ -34,14 +33,14 @@ internal data class PartialConfiguration( @SerializedName("server.listenPort") val listenPort: Option<Int> = None, @SerializedName("server.idleTimeoutSec") - val idleTimeoutSec: Option<Duration> = None, + val idleTimeoutSec: Option<Long> = None, @SerializedName("server.maxPayloadSizeBytes") val maxPayloadSizeBytes: Option<Int> = None, @SerializedName("cbs.firstRequestDelaySec") - val firstRequestDelaySec: Option<Duration> = None, + val firstRequestDelaySec: Option<Long> = None, @SerializedName("cbs.requestIntervalSec") - val requestIntervalSec: Option<Duration> = None, + val requestIntervalSec: Option<Long> = None, @SerializedName("security.sslDisable") val sslDisable: Option<Boolean> = None, @@ -54,9 +53,9 @@ internal data class PartialConfiguration( @SerializedName("security.keys.trustStorePassword") val trustStorePassword: Option<String> = None, - @SerializedName("collector.routing") - val routing: Option<Routing> = None, - @SerializedName("logLevel") - val logLevel: Option<LogLevel> = None + val logLevel: Option<LogLevel> = None, + + @Transient + var streamPublishers: Option<List<KafkaSink>> = None ) |