From 325387e62a0793871dc1eb97f02a4ae90a977664 Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Mon, 8 Apr 2019 13:48:42 +0200 Subject: Parse whole dynamic configuration Change-Id: I96e4cf3ac75920ed909da9063ba0b788b55474e4 Signed-off-by: Jakub Dudycz Issue-ID: DCAEGEN2-1386 --- .../veshv/config/api/ConfigurationModule.kt | 18 ++++---- .../veshv/config/api/model/configuration.kt | 4 +- .../veshv/config/impl/CbsConfigurationProvider.kt | 47 ++++++++++----------- .../veshv/config/impl/ConfigurationMerger.kt | 9 +--- .../veshv/config/impl/ConfigurationValidator.kt | 38 ++++++++++++----- .../veshv/config/impl/FileConfigurationReader.kt | 48 ---------------------- .../veshv/config/impl/JsonConfigurationParser.kt | 38 +++++++++++++++++ .../impl/gsonadapters/DurationOfSecondsAdapter.kt | 36 ---------------- .../veshv/config/impl/partial_configuration.kt | 17 ++++---- 9 files changed, 108 insertions(+), 147 deletions(-) delete mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParser.kt delete mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt (limited to 'sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae') diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt index ccce62a4..93381572 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt @@ -26,8 +26,8 @@ import org.onap.dcae.collectors.veshv.config.api.model.ValidationException import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator -import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser +import org.onap.dcae.collectors.veshv.config.impl.JsonConfigurationParser import org.onap.dcae.collectors.veshv.config.impl.PartialConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -40,7 +40,7 @@ import reactor.core.publisher.Mono class ConfigurationModule { private val cmd = HvVesCommandLineParser() - private val configReader = FileConfigurationReader() + private val configParser = JsonConfigurationParser() private val configValidator = ConfigurationValidator() private val merger = ConfigurationMerger() @@ -51,10 +51,9 @@ class ConfigurationModule { mdc: MappedDiagnosticContext): Flux = Mono.just(cmd.getConfigurationFile(args)) .throwOnLeft(::MissingArgumentException) - .map { - logger.info { "Using base configuration file: ${it.absolutePath}" } - it.reader().use(configReader::loadConfig) - } + .doOnNext { logger.info { "Using base configuration file: ${it.absolutePath}" } } + .map { it.reader().use(configParser::parse) } + .doOnNext { logger.info { "Successfully parsed json file to configuration: $it" } } .cache() .flatMapMany { basePartialConfig -> cbsConfigurationProvider(basePartialConfig, configStateListener, mdc) @@ -70,12 +69,13 @@ class ConfigurationModule { CbsConfigurationProvider( CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), cbsConfigurationFrom(basePartialConfig), + configParser, configStateListener, mdc) - private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = - configValidator.validatedCbsConfiguration(basePartialConfig) - .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") } + private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = configValidator + .validatedCbsConfiguration(basePartialConfig) + .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") } companion object { private val logger = Logger(ConfigurationModule::class) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt index c1807be2..f745d595 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt @@ -37,8 +37,8 @@ data class HvVesConfiguration( data class ServerConfiguration( val listenPort: Int, - val idleTimeout: Duration, - val maxPayloadSizeBytes: Int + val maxPayloadSizeBytes: Int, + val idleTimeout: Duration ) data class CbsConfiguration( diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt index b6462936..4982c732 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt @@ -19,17 +19,14 @@ */ package org.onap.dcae.collectors.veshv.config.impl -import arrow.core.None -import arrow.core.Option -import arrow.core.Some +import arrow.core.toOption import com.google.gson.JsonObject import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.Route -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog +import org.onap.dcae.collectors.veshv.utils.reader import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient @@ -50,26 +47,29 @@ import reactor.retry.Retry */ internal class CbsConfigurationProvider(private val cbsClientMono: Mono, private val cbsConfiguration: CbsConfiguration, + private val configParser: JsonConfigurationParser, private val streamParser: StreamFromGsonParser, private val configurationStateListener: ConfigurationStateListener, - retrySpec: Retry, - private val mdc: MappedDiagnosticContext + private val mdc: MappedDiagnosticContext, + retrySpec: Retry ) { constructor(cbsClientMono: Mono, cbsConfig: CbsConfiguration, + configParser: JsonConfigurationParser, configurationStateListener: ConfigurationStateListener, mdc: MappedDiagnosticContext) : this( cbsClientMono, cbsConfig, + configParser, StreamFromGsonParsers.kafkaSinkParser(), configurationStateListener, + mdc, Retry.any() .retryMax(MAX_RETRIES) .fixedBackoff(cbsConfig.requestInterval) - .jitter(Jitter.random()), - mdc + .jitter(Jitter.random()) ) private val retry = retrySpec.doOnRetry { @@ -92,25 +92,22 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono = try { - val routes = DataStreams.namedSinks(configuration) - .filter(streamOfType(KAFKA)) - .map(streamParser::unsafeParse) - .map { Route(it.name(), it) } - .asIterable() - .toList() - Some(routes) - } catch (e: NullPointerException) { - logger.withWarn(mdc) { - log("Invalid streams configuration", e) - } - None - } + private fun parseConfiguration(json: JsonObject) = + configParser + .parse(json.reader()) + .apply { streamPublishers = extractStreamDefinitions(json).toOption() } + + private fun extractStreamDefinitions(configuration: JsonObject): List = + DataStreams.namedSinks(configuration) + .filter(streamOfType(KAFKA)) + .map(streamParser::unsafeParse) + .asIterable() + .toList() companion object { private const val MAX_RETRIES = 5L diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt index 8e6bafc4..e782a1e7 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt @@ -21,11 +21,8 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.Option -import arrow.core.Some import arrow.core.getOrElse import arrow.core.toOption -import kotlin.reflect.KProperty0 -import kotlin.reflect.KProperty1 /** * @author Pawel Biniek @@ -47,15 +44,11 @@ internal class ConfigurationMerger { trustStoreFile = base.trustStoreFile.updateToGivenOrNone(update.trustStoreFile), trustStorePassword = base.trustStorePassword.updateToGivenOrNone(update.trustStorePassword), - routing = base.routing.updateToGivenOrNone(update.routing), + streamPublishers = base.streamPublishers.updateToGivenOrNone(update.streamPublishers), logLevel = base.logLevel.updateToGivenOrNone(update.logLevel) ) private fun Option.updateToGivenOrNone(update: Option) = update.getOrElse(this::orNull).toOption() - } - - - diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt index cfcc7d76..dddf0bed 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt @@ -23,19 +23,25 @@ import arrow.core.None import arrow.core.Option import arrow.core.Some import arrow.core.getOrElse +import arrow.core.toOption import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration import org.onap.dcae.collectors.veshv.config.api.model.ValidationException import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration -import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityKeysPaths import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding -import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty +import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding import org.onap.dcae.collectors.veshv.utils.logging.LogLevel import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore +import org.onap.dcaegen2.services.sdk.security.ssl.Passwords import java.io.File +import java.nio.file.Path +import java.time.Duration /** * @author Jakub Dudycz @@ -88,16 +94,16 @@ internal class ConfigurationValidator { partial.mapBinding { ServerConfiguration( it.listenPort.bind(), - it.idleTimeoutSec.bind(), - it.maxPayloadSizeBytes.bind() + it.maxPayloadSizeBytes.bind(), + Duration.ofSeconds(it.idleTimeoutSec.bind()) ) } internal fun validatedCbsConfiguration(partial: PartialConfiguration) = partial.mapBinding { CbsConfiguration( - it.firstRequestDelaySec.bind(), - it.requestIntervalSec.bind() + Duration.ofSeconds(it.firstRequestDelaySec.bind()), + Duration.ofSeconds(it.requestIntervalSec.bind()) ) } @@ -113,19 +119,31 @@ internal class ConfigurationValidator { private fun createSecurityConfiguration(partial: PartialConfiguration): Option = partial.mapBinding { SecurityConfiguration( - Option.fromNullable(SecurityKeysPaths( + createSecurityKeys( File(it.keyStoreFile.bind()).toPath(), it.keyStorePassword.bind(), File(it.trustStoreFile.bind()).toPath(), it.trustStorePassword.bind() - ).asImmutableSecurityKeys()) + ).toOption() ) } + private fun createSecurityKeys(keyStorePath: Path, + keyStorePassword: String, + trustStorePath: Path, + trustStorePassword: String) = + ImmutableSecurityKeys.builder() + .keyStore(ImmutableSecurityKeysStore.of(keyStorePath)) + .keyStorePassword(Passwords.fromString(keyStorePassword)) + .trustStore(ImmutableSecurityKeysStore.of(trustStorePath)) + .trustStorePassword(Passwords.fromString(trustStorePassword)) + .build() + + private fun validatedCollectorConfig(partial: PartialConfiguration) = - partial.mapBinding { + partial.mapBinding { config -> CollectorConfiguration( - it.routing.bind() + config.streamPublishers.bind().map { Route(it.name(), it) } ) } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt deleted file mode 100644 index 104ca78c..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.impl - -import arrow.core.Option -import com.google.gson.GsonBuilder -import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.DurationOfSecondsAdapter -import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.OptionAdapter -import org.onap.dcae.collectors.veshv.utils.logging.Logger - -import java.io.Reader -import java.time.Duration - -/** - * @author Pawel Biniek - * @since February 2019 - */ -internal class FileConfigurationReader { - private val gson = GsonBuilder() - .registerTypeAdapter(Option::class.java, OptionAdapter()) - .registerTypeAdapter(Duration::class.java, DurationOfSecondsAdapter()) - .create() - - fun loadConfig(input: Reader): PartialConfiguration = - gson.fromJson(input, PartialConfiguration::class.java) - .also { logger.info { "Successfully read file and parsed json to configuration: $it" } } - - companion object { - private val logger = Logger(FileConfigurationReader::class) - } -} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParser.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParser.kt new file mode 100644 index 00000000..0b3dd0d5 --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParser.kt @@ -0,0 +1,38 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.Option +import com.google.gson.GsonBuilder +import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.OptionAdapter +import java.io.Reader + +/** + * @author Pawel Biniek + * @since February 2019 + */ +internal class JsonConfigurationParser { + private val gson = GsonBuilder() + .registerTypeAdapter(Option::class.java, OptionAdapter()) + .create() + + fun parse(input: Reader): PartialConfiguration = + gson.fromJson(input, PartialConfiguration::class.java) +} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt deleted file mode 100644 index 3bde7089..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.impl.gsonadapters - -import com.google.gson.JsonDeserializationContext -import com.google.gson.JsonDeserializer -import com.google.gson.JsonElement -import java.lang.reflect.Type -import java.time.Duration - -/** - * @author Pawel Biniek - * @since March 2019 - */ -internal class DurationOfSecondsAdapter : JsonDeserializer { - override fun deserialize(json: JsonElement, typeOfT: Type, context: JsonDeserializationContext) = - Duration.ofSeconds(json.asLong) - -} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt index 0be2572d..30f6c3e3 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt @@ -22,9 +22,8 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.None import arrow.core.Option import com.google.gson.annotations.SerializedName -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.utils.logging.LogLevel -import java.time.Duration +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink /** * @author Pawel Biniek @@ -34,14 +33,14 @@ internal data class PartialConfiguration( @SerializedName("server.listenPort") val listenPort: Option = None, @SerializedName("server.idleTimeoutSec") - val idleTimeoutSec: Option = None, + val idleTimeoutSec: Option = None, @SerializedName("server.maxPayloadSizeBytes") val maxPayloadSizeBytes: Option = None, @SerializedName("cbs.firstRequestDelaySec") - val firstRequestDelaySec: Option = None, + val firstRequestDelaySec: Option = None, @SerializedName("cbs.requestIntervalSec") - val requestIntervalSec: Option = None, + val requestIntervalSec: Option = None, @SerializedName("security.sslDisable") val sslDisable: Option = None, @@ -54,9 +53,9 @@ internal data class PartialConfiguration( @SerializedName("security.keys.trustStorePassword") val trustStorePassword: Option = None, - @SerializedName("collector.routing") - val routing: Option = None, - @SerializedName("logLevel") - val logLevel: Option = None + val logLevel: Option = None, + + @Transient + var streamPublishers: Option> = None ) -- cgit 1.2.3-korg