aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-configuration/src/main/kotlin/org/onap
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-configuration/src/main/kotlin/org/onap')
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt18
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt4
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt47
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt9
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt38
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParser.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt)14
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt36
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt17
8 files changed, 72 insertions, 111 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
index ccce62a4..93381572 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
@@ -26,8 +26,8 @@ import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator
-import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader
import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
+import org.onap.dcae.collectors.veshv.config.impl.JsonConfigurationParser
import org.onap.dcae.collectors.veshv.config.impl.PartialConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -40,7 +40,7 @@ import reactor.core.publisher.Mono
class ConfigurationModule {
private val cmd = HvVesCommandLineParser()
- private val configReader = FileConfigurationReader()
+ private val configParser = JsonConfigurationParser()
private val configValidator = ConfigurationValidator()
private val merger = ConfigurationMerger()
@@ -51,10 +51,9 @@ class ConfigurationModule {
mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> =
Mono.just(cmd.getConfigurationFile(args))
.throwOnLeft(::MissingArgumentException)
- .map {
- logger.info { "Using base configuration file: ${it.absolutePath}" }
- it.reader().use(configReader::loadConfig)
- }
+ .doOnNext { logger.info { "Using base configuration file: ${it.absolutePath}" } }
+ .map { it.reader().use(configParser::parse) }
+ .doOnNext { logger.info { "Successfully parsed json file to configuration: $it" } }
.cache()
.flatMapMany { basePartialConfig ->
cbsConfigurationProvider(basePartialConfig, configStateListener, mdc)
@@ -70,12 +69,13 @@ class ConfigurationModule {
CbsConfigurationProvider(
CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
cbsConfigurationFrom(basePartialConfig),
+ configParser,
configStateListener,
mdc)
- private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) =
- configValidator.validatedCbsConfiguration(basePartialConfig)
- .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") }
+ private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = configValidator
+ .validatedCbsConfiguration(basePartialConfig)
+ .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") }
companion object {
private val logger = Logger(ConfigurationModule::class)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
index c1807be2..f745d595 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
@@ -37,8 +37,8 @@ data class HvVesConfiguration(
data class ServerConfiguration(
val listenPort: Int,
- val idleTimeout: Duration,
- val maxPayloadSizeBytes: Int
+ val maxPayloadSizeBytes: Int,
+ val idleTimeout: Duration
)
data class CbsConfiguration(
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
index b6462936..4982c732 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
@@ -19,17 +19,14 @@
*/
package org.onap.dcae.collectors.veshv.config.impl
-import arrow.core.None
-import arrow.core.Option
-import arrow.core.Some
+import arrow.core.toOption
import com.google.gson.JsonObject
import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
+import org.onap.dcae.collectors.veshv.utils.reader
import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
@@ -50,26 +47,29 @@ import reactor.retry.Retry
*/
internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClient>,
private val cbsConfiguration: CbsConfiguration,
+ private val configParser: JsonConfigurationParser,
private val streamParser: StreamFromGsonParser<KafkaSink>,
private val configurationStateListener: ConfigurationStateListener,
- retrySpec: Retry<Any>,
- private val mdc: MappedDiagnosticContext
+ private val mdc: MappedDiagnosticContext,
+ retrySpec: Retry<Any>
) {
constructor(cbsClientMono: Mono<CbsClient>,
cbsConfig: CbsConfiguration,
+ configParser: JsonConfigurationParser,
configurationStateListener: ConfigurationStateListener,
mdc: MappedDiagnosticContext) :
this(
cbsClientMono,
cbsConfig,
+ configParser,
StreamFromGsonParsers.kafkaSinkParser(),
configurationStateListener,
+ mdc,
Retry.any<Any>()
.retryMax(MAX_RETRIES)
.fixedBackoff(cbsConfig.requestInterval)
- .jitter(Jitter.random()),
- mdc
+ .jitter(Jitter.random())
)
private val retry = retrySpec.doOnRetry {
@@ -92,25 +92,22 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClien
cbsConfiguration.firstRequestDelay,
cbsConfiguration.requestInterval)
.doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
- .map(::createRoutingDescription)
+ .map(::parseConfiguration)
+ .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } }
.onErrorLog(logger, mdc) { "Error while creating configuration" }
.retryWhen(retry)
- .map { PartialConfiguration(routing = it) }
- private fun createRoutingDescription(configuration: JsonObject): Option<Routing> = try {
- val routes = DataStreams.namedSinks(configuration)
- .filter(streamOfType(KAFKA))
- .map(streamParser::unsafeParse)
- .map { Route(it.name(), it) }
- .asIterable()
- .toList()
- Some(routes)
- } catch (e: NullPointerException) {
- logger.withWarn(mdc) {
- log("Invalid streams configuration", e)
- }
- None
- }
+ private fun parseConfiguration(json: JsonObject) =
+ configParser
+ .parse(json.reader())
+ .apply { streamPublishers = extractStreamDefinitions(json).toOption() }
+
+ private fun extractStreamDefinitions(configuration: JsonObject): List<KafkaSink> =
+ DataStreams.namedSinks(configuration)
+ .filter(streamOfType(KAFKA))
+ .map(streamParser::unsafeParse)
+ .asIterable()
+ .toList()
companion object {
private const val MAX_RETRIES = 5L
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
index 8e6bafc4..e782a1e7 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
@@ -21,11 +21,8 @@ package org.onap.dcae.collectors.veshv.config.impl
import arrow.core.Option
-import arrow.core.Some
import arrow.core.getOrElse
import arrow.core.toOption
-import kotlin.reflect.KProperty0
-import kotlin.reflect.KProperty1
/**
* @author Pawel Biniek <pawel.biniek@nokia.com>
@@ -47,15 +44,11 @@ internal class ConfigurationMerger {
trustStoreFile = base.trustStoreFile.updateToGivenOrNone(update.trustStoreFile),
trustStorePassword = base.trustStorePassword.updateToGivenOrNone(update.trustStorePassword),
- routing = base.routing.updateToGivenOrNone(update.routing),
+ streamPublishers = base.streamPublishers.updateToGivenOrNone(update.streamPublishers),
logLevel = base.logLevel.updateToGivenOrNone(update.logLevel)
)
private fun <T> Option<T>.updateToGivenOrNone(update: Option<T>) =
update.getOrElse(this::orNull).toOption()
-
}
-
-
-
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
index cfcc7d76..dddf0bed 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
@@ -23,19 +23,25 @@ import arrow.core.None
import arrow.core.Option
import arrow.core.Some
import arrow.core.getOrElse
+import arrow.core.toOption
import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityKeysPaths
import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
-import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding
import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
+import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore
+import org.onap.dcaegen2.services.sdk.security.ssl.Passwords
import java.io.File
+import java.nio.file.Path
+import java.time.Duration
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -88,16 +94,16 @@ internal class ConfigurationValidator {
partial.mapBinding {
ServerConfiguration(
it.listenPort.bind(),
- it.idleTimeoutSec.bind(),
- it.maxPayloadSizeBytes.bind()
+ it.maxPayloadSizeBytes.bind(),
+ Duration.ofSeconds(it.idleTimeoutSec.bind())
)
}
internal fun validatedCbsConfiguration(partial: PartialConfiguration) =
partial.mapBinding {
CbsConfiguration(
- it.firstRequestDelaySec.bind(),
- it.requestIntervalSec.bind()
+ Duration.ofSeconds(it.firstRequestDelaySec.bind()),
+ Duration.ofSeconds(it.requestIntervalSec.bind())
)
}
@@ -113,19 +119,31 @@ internal class ConfigurationValidator {
private fun createSecurityConfiguration(partial: PartialConfiguration): Option<SecurityConfiguration> =
partial.mapBinding {
SecurityConfiguration(
- Option.fromNullable(SecurityKeysPaths(
+ createSecurityKeys(
File(it.keyStoreFile.bind()).toPath(),
it.keyStorePassword.bind(),
File(it.trustStoreFile.bind()).toPath(),
it.trustStorePassword.bind()
- ).asImmutableSecurityKeys())
+ ).toOption()
)
}
+ private fun createSecurityKeys(keyStorePath: Path,
+ keyStorePassword: String,
+ trustStorePath: Path,
+ trustStorePassword: String) =
+ ImmutableSecurityKeys.builder()
+ .keyStore(ImmutableSecurityKeysStore.of(keyStorePath))
+ .keyStorePassword(Passwords.fromString(keyStorePassword))
+ .trustStore(ImmutableSecurityKeysStore.of(trustStorePath))
+ .trustStorePassword(Passwords.fromString(trustStorePassword))
+ .build()
+
+
private fun validatedCollectorConfig(partial: PartialConfiguration) =
- partial.mapBinding {
+ partial.mapBinding { config ->
CollectorConfiguration(
- it.routing.bind()
+ config.streamPublishers.bind().map { Route(it.name(), it) }
)
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParser.kt
index 104ca78c..0b3dd0d5 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParser.kt
@@ -21,28 +21,18 @@ package org.onap.dcae.collectors.veshv.config.impl
import arrow.core.Option
import com.google.gson.GsonBuilder
-import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.DurationOfSecondsAdapter
import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.OptionAdapter
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-
import java.io.Reader
-import java.time.Duration
/**
* @author Pawel Biniek <pawel.biniek@nokia.com>
* @since February 2019
*/
-internal class FileConfigurationReader {
+internal class JsonConfigurationParser {
private val gson = GsonBuilder()
.registerTypeAdapter(Option::class.java, OptionAdapter())
- .registerTypeAdapter(Duration::class.java, DurationOfSecondsAdapter())
.create()
- fun loadConfig(input: Reader): PartialConfiguration =
+ fun parse(input: Reader): PartialConfiguration =
gson.fromJson(input, PartialConfiguration::class.java)
- .also { logger.info { "Successfully read file and parsed json to configuration: $it" } }
-
- companion object {
- private val logger = Logger(FileConfigurationReader::class)
- }
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt
deleted file mode 100644
index 3bde7089..00000000
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.config.impl.gsonadapters
-
-import com.google.gson.JsonDeserializationContext
-import com.google.gson.JsonDeserializer
-import com.google.gson.JsonElement
-import java.lang.reflect.Type
-import java.time.Duration
-
-/**
- * @author Pawel Biniek <pawel.biniek@nokia.com>
- * @since March 2019
- */
-internal class DurationOfSecondsAdapter : JsonDeserializer<Duration> {
- override fun deserialize(json: JsonElement, typeOfT: Type, context: JsonDeserializationContext) =
- Duration.ofSeconds(json.asLong)
-
-}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
index 0be2572d..30f6c3e3 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
@@ -22,9 +22,8 @@ package org.onap.dcae.collectors.veshv.config.impl
import arrow.core.None
import arrow.core.Option
import com.google.gson.annotations.SerializedName
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
-import java.time.Duration
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
/**
* @author Pawel Biniek <pawel.biniek@nokia.com>
@@ -34,14 +33,14 @@ internal data class PartialConfiguration(
@SerializedName("server.listenPort")
val listenPort: Option<Int> = None,
@SerializedName("server.idleTimeoutSec")
- val idleTimeoutSec: Option<Duration> = None,
+ val idleTimeoutSec: Option<Long> = None,
@SerializedName("server.maxPayloadSizeBytes")
val maxPayloadSizeBytes: Option<Int> = None,
@SerializedName("cbs.firstRequestDelaySec")
- val firstRequestDelaySec: Option<Duration> = None,
+ val firstRequestDelaySec: Option<Long> = None,
@SerializedName("cbs.requestIntervalSec")
- val requestIntervalSec: Option<Duration> = None,
+ val requestIntervalSec: Option<Long> = None,
@SerializedName("security.sslDisable")
val sslDisable: Option<Boolean> = None,
@@ -54,9 +53,9 @@ internal data class PartialConfiguration(
@SerializedName("security.keys.trustStorePassword")
val trustStorePassword: Option<String> = None,
- @SerializedName("collector.routing")
- val routing: Option<Routing> = None,
-
@SerializedName("logLevel")
- val logLevel: Option<LogLevel> = None
+ val logLevel: Option<LogLevel> = None,
+
+ @Transient
+ var streamPublishers: Option<List<KafkaSink>> = None
)