aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-configuration
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-10 06:23:20 +0000
committerGerrit Code Review <gerrit@onap.org>2019-04-10 06:23:20 +0000
commitc9829d23c12b2824a0d56ee6efbd00ad67b9046e (patch)
tree383f11c46d941f5596e6f775b2c40d680e36aea8 /sources/hv-collector-configuration
parent8809461fd30a83335ede35b342fe4425d2df840d (diff)
parent325387e62a0793871dc1eb97f02a4ae90a977664 (diff)
Merge "Parse whole dynamic configuration"
Diffstat (limited to 'sources/hv-collector-configuration')
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt18
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt4
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt47
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt9
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt38
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParser.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt)14
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt36
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt17
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt68
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt16
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt101
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParserTest.kt (renamed from sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt)31
12 files changed, 193 insertions, 206 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
index ccce62a4..93381572 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
@@ -26,8 +26,8 @@ import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator
-import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader
import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
+import org.onap.dcae.collectors.veshv.config.impl.JsonConfigurationParser
import org.onap.dcae.collectors.veshv.config.impl.PartialConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -40,7 +40,7 @@ import reactor.core.publisher.Mono
class ConfigurationModule {
private val cmd = HvVesCommandLineParser()
- private val configReader = FileConfigurationReader()
+ private val configParser = JsonConfigurationParser()
private val configValidator = ConfigurationValidator()
private val merger = ConfigurationMerger()
@@ -51,10 +51,9 @@ class ConfigurationModule {
mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> =
Mono.just(cmd.getConfigurationFile(args))
.throwOnLeft(::MissingArgumentException)
- .map {
- logger.info { "Using base configuration file: ${it.absolutePath}" }
- it.reader().use(configReader::loadConfig)
- }
+ .doOnNext { logger.info { "Using base configuration file: ${it.absolutePath}" } }
+ .map { it.reader().use(configParser::parse) }
+ .doOnNext { logger.info { "Successfully parsed json file to configuration: $it" } }
.cache()
.flatMapMany { basePartialConfig ->
cbsConfigurationProvider(basePartialConfig, configStateListener, mdc)
@@ -70,12 +69,13 @@ class ConfigurationModule {
CbsConfigurationProvider(
CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
cbsConfigurationFrom(basePartialConfig),
+ configParser,
configStateListener,
mdc)
- private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) =
- configValidator.validatedCbsConfiguration(basePartialConfig)
- .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") }
+ private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = configValidator
+ .validatedCbsConfiguration(basePartialConfig)
+ .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") }
companion object {
private val logger = Logger(ConfigurationModule::class)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
index c1807be2..f745d595 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
@@ -37,8 +37,8 @@ data class HvVesConfiguration(
data class ServerConfiguration(
val listenPort: Int,
- val idleTimeout: Duration,
- val maxPayloadSizeBytes: Int
+ val maxPayloadSizeBytes: Int,
+ val idleTimeout: Duration
)
data class CbsConfiguration(
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
index b6462936..4982c732 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
@@ -19,17 +19,14 @@
*/
package org.onap.dcae.collectors.veshv.config.impl
-import arrow.core.None
-import arrow.core.Option
-import arrow.core.Some
+import arrow.core.toOption
import com.google.gson.JsonObject
import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
+import org.onap.dcae.collectors.veshv.utils.reader
import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
@@ -50,26 +47,29 @@ import reactor.retry.Retry
*/
internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClient>,
private val cbsConfiguration: CbsConfiguration,
+ private val configParser: JsonConfigurationParser,
private val streamParser: StreamFromGsonParser<KafkaSink>,
private val configurationStateListener: ConfigurationStateListener,
- retrySpec: Retry<Any>,
- private val mdc: MappedDiagnosticContext
+ private val mdc: MappedDiagnosticContext,
+ retrySpec: Retry<Any>
) {
constructor(cbsClientMono: Mono<CbsClient>,
cbsConfig: CbsConfiguration,
+ configParser: JsonConfigurationParser,
configurationStateListener: ConfigurationStateListener,
mdc: MappedDiagnosticContext) :
this(
cbsClientMono,
cbsConfig,
+ configParser,
StreamFromGsonParsers.kafkaSinkParser(),
configurationStateListener,
+ mdc,
Retry.any<Any>()
.retryMax(MAX_RETRIES)
.fixedBackoff(cbsConfig.requestInterval)
- .jitter(Jitter.random()),
- mdc
+ .jitter(Jitter.random())
)
private val retry = retrySpec.doOnRetry {
@@ -92,25 +92,22 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClien
cbsConfiguration.firstRequestDelay,
cbsConfiguration.requestInterval)
.doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
- .map(::createRoutingDescription)
+ .map(::parseConfiguration)
+ .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } }
.onErrorLog(logger, mdc) { "Error while creating configuration" }
.retryWhen(retry)
- .map { PartialConfiguration(routing = it) }
- private fun createRoutingDescription(configuration: JsonObject): Option<Routing> = try {
- val routes = DataStreams.namedSinks(configuration)
- .filter(streamOfType(KAFKA))
- .map(streamParser::unsafeParse)
- .map { Route(it.name(), it) }
- .asIterable()
- .toList()
- Some(routes)
- } catch (e: NullPointerException) {
- logger.withWarn(mdc) {
- log("Invalid streams configuration", e)
- }
- None
- }
+ private fun parseConfiguration(json: JsonObject) =
+ configParser
+ .parse(json.reader())
+ .apply { streamPublishers = extractStreamDefinitions(json).toOption() }
+
+ private fun extractStreamDefinitions(configuration: JsonObject): List<KafkaSink> =
+ DataStreams.namedSinks(configuration)
+ .filter(streamOfType(KAFKA))
+ .map(streamParser::unsafeParse)
+ .asIterable()
+ .toList()
companion object {
private const val MAX_RETRIES = 5L
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
index 8e6bafc4..e782a1e7 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
@@ -21,11 +21,8 @@ package org.onap.dcae.collectors.veshv.config.impl
import arrow.core.Option
-import arrow.core.Some
import arrow.core.getOrElse
import arrow.core.toOption
-import kotlin.reflect.KProperty0
-import kotlin.reflect.KProperty1
/**
* @author Pawel Biniek <pawel.biniek@nokia.com>
@@ -47,15 +44,11 @@ internal class ConfigurationMerger {
trustStoreFile = base.trustStoreFile.updateToGivenOrNone(update.trustStoreFile),
trustStorePassword = base.trustStorePassword.updateToGivenOrNone(update.trustStorePassword),
- routing = base.routing.updateToGivenOrNone(update.routing),
+ streamPublishers = base.streamPublishers.updateToGivenOrNone(update.streamPublishers),
logLevel = base.logLevel.updateToGivenOrNone(update.logLevel)
)
private fun <T> Option<T>.updateToGivenOrNone(update: Option<T>) =
update.getOrElse(this::orNull).toOption()
-
}
-
-
-
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
index cfcc7d76..dddf0bed 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
@@ -23,19 +23,25 @@ import arrow.core.None
import arrow.core.Option
import arrow.core.Some
import arrow.core.getOrElse
+import arrow.core.toOption
import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityKeysPaths
import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
-import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding
import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
+import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore
+import org.onap.dcaegen2.services.sdk.security.ssl.Passwords
import java.io.File
+import java.nio.file.Path
+import java.time.Duration
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -88,16 +94,16 @@ internal class ConfigurationValidator {
partial.mapBinding {
ServerConfiguration(
it.listenPort.bind(),
- it.idleTimeoutSec.bind(),
- it.maxPayloadSizeBytes.bind()
+ it.maxPayloadSizeBytes.bind(),
+ Duration.ofSeconds(it.idleTimeoutSec.bind())
)
}
internal fun validatedCbsConfiguration(partial: PartialConfiguration) =
partial.mapBinding {
CbsConfiguration(
- it.firstRequestDelaySec.bind(),
- it.requestIntervalSec.bind()
+ Duration.ofSeconds(it.firstRequestDelaySec.bind()),
+ Duration.ofSeconds(it.requestIntervalSec.bind())
)
}
@@ -113,19 +119,31 @@ internal class ConfigurationValidator {
private fun createSecurityConfiguration(partial: PartialConfiguration): Option<SecurityConfiguration> =
partial.mapBinding {
SecurityConfiguration(
- Option.fromNullable(SecurityKeysPaths(
+ createSecurityKeys(
File(it.keyStoreFile.bind()).toPath(),
it.keyStorePassword.bind(),
File(it.trustStoreFile.bind()).toPath(),
it.trustStorePassword.bind()
- ).asImmutableSecurityKeys())
+ ).toOption()
)
}
+ private fun createSecurityKeys(keyStorePath: Path,
+ keyStorePassword: String,
+ trustStorePath: Path,
+ trustStorePassword: String) =
+ ImmutableSecurityKeys.builder()
+ .keyStore(ImmutableSecurityKeysStore.of(keyStorePath))
+ .keyStorePassword(Passwords.fromString(keyStorePassword))
+ .trustStore(ImmutableSecurityKeysStore.of(trustStorePath))
+ .trustStorePassword(Passwords.fromString(trustStorePassword))
+ .build()
+
+
private fun validatedCollectorConfig(partial: PartialConfiguration) =
- partial.mapBinding {
+ partial.mapBinding { config ->
CollectorConfiguration(
- it.routing.bind()
+ config.streamPublishers.bind().map { Route(it.name(), it) }
)
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParser.kt
index 104ca78c..0b3dd0d5 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParser.kt
@@ -21,28 +21,18 @@ package org.onap.dcae.collectors.veshv.config.impl
import arrow.core.Option
import com.google.gson.GsonBuilder
-import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.DurationOfSecondsAdapter
import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.OptionAdapter
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-
import java.io.Reader
-import java.time.Duration
/**
* @author Pawel Biniek <pawel.biniek@nokia.com>
* @since February 2019
*/
-internal class FileConfigurationReader {
+internal class JsonConfigurationParser {
private val gson = GsonBuilder()
.registerTypeAdapter(Option::class.java, OptionAdapter())
- .registerTypeAdapter(Duration::class.java, DurationOfSecondsAdapter())
.create()
- fun loadConfig(input: Reader): PartialConfiguration =
+ fun parse(input: Reader): PartialConfiguration =
gson.fromJson(input, PartialConfiguration::class.java)
- .also { logger.info { "Successfully read file and parsed json to configuration: $it" } }
-
- companion object {
- private val logger = Logger(FileConfigurationReader::class)
- }
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt
deleted file mode 100644
index 3bde7089..00000000
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.config.impl.gsonadapters
-
-import com.google.gson.JsonDeserializationContext
-import com.google.gson.JsonDeserializer
-import com.google.gson.JsonElement
-import java.lang.reflect.Type
-import java.time.Duration
-
-/**
- * @author Pawel Biniek <pawel.biniek@nokia.com>
- * @since March 2019
- */
-internal class DurationOfSecondsAdapter : JsonDeserializer<Duration> {
- override fun deserialize(json: JsonElement, typeOfT: Type, context: JsonDeserializationContext) =
- Duration.ofSeconds(json.asLong)
-
-}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
index 0be2572d..30f6c3e3 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
@@ -22,9 +22,8 @@ package org.onap.dcae.collectors.veshv.config.impl
import arrow.core.None
import arrow.core.Option
import com.google.gson.annotations.SerializedName
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
-import java.time.Duration
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
/**
* @author Pawel Biniek <pawel.biniek@nokia.com>
@@ -34,14 +33,14 @@ internal data class PartialConfiguration(
@SerializedName("server.listenPort")
val listenPort: Option<Int> = None,
@SerializedName("server.idleTimeoutSec")
- val idleTimeoutSec: Option<Duration> = None,
+ val idleTimeoutSec: Option<Long> = None,
@SerializedName("server.maxPayloadSizeBytes")
val maxPayloadSizeBytes: Option<Int> = None,
@SerializedName("cbs.firstRequestDelaySec")
- val firstRequestDelaySec: Option<Duration> = None,
+ val firstRequestDelaySec: Option<Long> = None,
@SerializedName("cbs.requestIntervalSec")
- val requestIntervalSec: Option<Duration> = None,
+ val requestIntervalSec: Option<Long> = None,
@SerializedName("security.sslDisable")
val sslDisable: Option<Boolean> = None,
@@ -54,9 +53,9 @@ internal data class PartialConfiguration(
@SerializedName("security.keys.trustStorePassword")
val trustStorePassword: Option<String> = None,
- @SerializedName("collector.routing")
- val routing: Option<Routing> = None,
-
@SerializedName("logLevel")
- val logLevel: Option<LogLevel> = None
+ val logLevel: Option<LogLevel> = None,
+
+ @Transient
+ var streamPublishers: Option<List<KafkaSink>> = None
)
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
index d5fe588e..94eb519d 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.config.impl
+import arrow.core.Some
import com.google.gson.JsonParser
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.eq
@@ -51,9 +52,9 @@ internal object CbsConfigurationProviderTest : Spek({
describe("Configuration provider") {
- val cbsClient: CbsClient = mock()
- val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient)
- val configStateListener: ConfigurationStateListener = mock()
+ val cbsClient = mock<CbsClient>()
+ val cbsClientMock = Mono.just(cbsClient)
+ val configStateListener = mock<ConfigurationStateListener>()
given("configuration is never in cbs") {
val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)
@@ -78,29 +79,32 @@ internal object CbsConfigurationProviderTest : Spek({
StepVerifier.create(configProvider().take(1))
.consumeNextWith {
- val routes = it.routing.orNull()!!
- val route1 = routes.elementAt(0)
- val route2 = routes.elementAt(1)
- val receivedSink1 = route1.sink
- val receivedSink2 = route2.sink
-
- assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL)
- assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1)
- assertThat(receivedSink1.bootstrapServers())
+
+ assertThat(it.listenPort).isEqualTo(Some(6061))
+ assertThat(it.idleTimeoutSec).isEqualTo(Some(60L))
+ assertThat(it.maxPayloadSizeBytes).isEqualTo(Some(1048576))
+
+
+ val sinks = it.streamPublishers.orNull()!!
+ val sink1 = sinks[0]
+ val sink2 = sinks[1]
+
+ assertThat(sink1.name()).isEqualTo(PERF3GPP_REGIONAL)
+ assertThat(sink1.aafCredentials()).isEqualTo(aafCredentials1)
+ assertThat(sink1.bootstrapServers())
.isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
- assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
+ assertThat(sink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
- assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL)
- assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2)
- assertThat(receivedSink2.bootstrapServers())
+ assertThat(sink2.name()).isEqualTo(PERF3GPP_CENTRAL)
+ assertThat(sink2.aafCredentials()).isEqualTo(aafCredentials2)
+ assertThat(sink2.bootstrapServers())
.isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
- assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
-
+ assertThat(sink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
}.verifyComplete()
}
}
-
}
+
given("invalid configuration from cbs") {
val iterationCount = 3L
val configProvider = constructConfigurationProvider(
@@ -112,7 +116,8 @@ internal object CbsConfigurationProviderTest : Spek({
.thenReturn(Flux.just(invalidConfiguration))
it("should interrupt the flux") {
- StepVerifier.create(configProvider())
+ StepVerifier
+ .create(configProvider())
.verifyError()
}
@@ -126,8 +131,8 @@ internal object CbsConfigurationProviderTest : Spek({
})
-val PERF3GPP_REGIONAL = "perf3gpp_regional"
-val PERF3GPP_CENTRAL = "perf3gpp_central"
+private const val PERF3GPP_REGIONAL = "perf3gpp_regional"
+private const val PERF3GPP_CENTRAL = "perf3gpp_central"
private val aafCredentials1 = ImmutableAafCredentials.builder()
.username("client")
@@ -141,6 +146,9 @@ private val aafCredentials2 = ImmutableAafCredentials.builder()
private val validConfiguration = JsonParser().parse("""
{
+ "server.listenPort": 6061,
+ "server.idleTimeoutSec": 60,
+ "server.maxPayloadSizeBytes": 1048576,
"streams_publishes": {
"$PERF3GPP_REGIONAL": {
"type": "kafka",
@@ -173,12 +181,12 @@ private val invalidConfiguration = JsonParser().parse("""
"$PERF3GPP_REGIONAL": {
"type": "kafka",
"aaf_credentials": {
- "username": "client",
+ "user": "client",
"password": "very secure password"
},
"kafka_info": {
- "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
- "popic_name": "REG_HVVES_PERF3GPP"
+ "servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
+ "name": "REG_HVVES_PERF3GPP"
}
}
}
@@ -187,20 +195,24 @@ private val invalidConfiguration = JsonParser().parse("""
private val firstRequestDelay = Duration.ofMillis(1)
private val requestInterval = Duration.ofMillis(1)
private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
+private val configParser = JsonConfigurationParser()
private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
configurationStateListener: ConfigurationStateListener,
iterationCount: Long = 1
): CbsConfigurationProvider {
- val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
+ val retry = Retry
+ .onlyIf<Any> { it.iteration() <= iterationCount }
+ .fixedBackoff(Duration.ofNanos(1))
return CbsConfigurationProvider(
cbsClientMono,
CbsConfiguration(firstRequestDelay, requestInterval),
+ configParser,
streamParser,
configurationStateListener,
- retry,
- { mapOf("k" to "v") }
+ { mapOf("k" to "v") },
+ retry
)
}
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt
index bc61b57d..4cd2ba97 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt
@@ -55,9 +55,9 @@ internal object ConfigurationMergerTest : Spek({
}
it("merges single parameter into full config") {
- val actual = FileConfigurationReader().loadConfig(
+ val actual = JsonConfigurationParser().parse(
InputStreamReader(
- FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
+ JsonConfigurationParserTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO))
val result = ConfigurationMerger().merge(actual, diff)
@@ -66,31 +66,31 @@ internal object ConfigurationMergerTest : Spek({
}
it("merges single embedded parameter into full config") {
- val actual = FileConfigurationReader().loadConfig(
+ val actual = JsonConfigurationParser().parse(
InputStreamReader(
- FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
+ JsonConfigurationParserTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
val diff = PartialConfiguration(listenPort = someListenPort)
val result = ConfigurationMerger().merge(actual, diff)
assertThat(result.listenPort).isEqualTo(someListenPort)
assertThat(result.idleTimeoutSec.isEmpty()).isFalse()
- assertThat(result.idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200)))
+ assertThat(result.idleTimeoutSec).isEqualTo(Some(1200L))
assertThat(result.maxPayloadSizeBytes.isEmpty()).isFalse()
assertThat(result.maxPayloadSizeBytes).isEqualTo(Some(1048576))
}
it("merges full config into single parameter") {
val actual = PartialConfiguration(logLevel = Some(LogLevel.INFO))
- val diff = FileConfigurationReader().loadConfig(
+ val diff = JsonConfigurationParser().parse(
InputStreamReader(
- FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
+ JsonConfigurationParserTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
val result = ConfigurationMerger().merge(actual, diff)
assertThat(result.logLevel).isEqualTo(Some(LogLevel.ERROR))
assertThat(result.maxPayloadSizeBytes).isEqualTo(Some(1048576))
- assertThat(result.idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200)))
+ assertThat(result.idleTimeoutSec).isEqualTo(Some(1200L))
assertThat(result.keyStoreFile.isEmpty()).isFalse()
assertThat(result.firstRequestDelaySec.isEmpty()).isFalse()
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
index e43acfa3..5fa1fd62 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
@@ -23,14 +23,17 @@ import arrow.core.None
import arrow.core.Option
import arrow.core.Some
import arrow.core.getOrElse
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.fail
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator.Companion.DEFAULT_LOG_LEVEL
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
import java.io.File
import java.time.Duration
@@ -81,7 +84,7 @@ internal object ConfigurationValidatorTest : Spek({
keyStorePassword = Some(KEYSTORE_PASSWORD),
trustStoreFile = Some(TRUSTSTORE),
trustStorePassword = Some(TRUSTSTORE_PASSWORD),
- routing = Some(emptyRouting),
+ streamPublishers = Some(sampleStreamsDefinition),
logLevel = Some(LogLevel.TRACE)
)
@@ -92,9 +95,12 @@ internal object ConfigurationValidatorTest : Spek({
fail("Configuration should have been created successfully")
},
{
- assertThat(it.server.listenPort).isEqualTo(defaultListenPort)
- assertThat(it.server.idleTimeout).isEqualTo(defaultIdleTimeoutSec)
- assertThat(it.server.maxPayloadSizeBytes).isEqualTo(defaultMaxPayloadSizeBytes)
+ assertThat(it.server.listenPort)
+ .isEqualTo(defaultListenPort)
+ assertThat(it.server.maxPayloadSizeBytes)
+ .isEqualTo(defaultMaxPayloadSizeBytes)
+ assertThat(it.server.idleTimeout)
+ .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec))
val securityKeys = it.security.keys
.getOrElse { fail("Should be immutableSecurityKeys") } as SecurityKeys
@@ -103,10 +109,14 @@ internal object ConfigurationValidatorTest : Spek({
securityKeys.keyStorePassword().use { assertThat(it).isEqualTo(KEYSTORE_PASSWORD.toCharArray()) }
securityKeys.trustStorePassword().use { assertThat(it).isEqualTo(TRUSTSTORE_PASSWORD.toCharArray()) }
- assertThat(it.cbs.firstRequestDelay).isEqualTo(defaultFirstReqDelaySec)
- assertThat(it.cbs.requestInterval).isEqualTo(defaultRequestIntervalSec)
+ assertThat(it.cbs.firstRequestDelay)
+ .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec))
+ assertThat(it.cbs.requestInterval)
+ .isEqualTo(Duration.ofSeconds(defaultRequestIntervalSec))
+
+ assertThat(it.collector.routing)
+ .isEqualTo(sampleRouting)
- assertThat(it.collector.routing).isEqualTo(emptyRouting)
assertThat(it.logLevel).isEqualTo(LogLevel.TRACE)
}
)
@@ -130,16 +140,16 @@ internal object ConfigurationValidatorTest : Spek({
},
{
assertThat(it.server.idleTimeout)
- .isEqualTo(defaultIdleTimeoutSec)
+ .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec))
assertThat(it.security.keys)
.isEqualTo(None)
assertThat(it.cbs.firstRequestDelay)
- .isEqualTo(defaultFirstReqDelaySec)
+ .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec))
assertThat(it.collector.routing)
- .isEqualTo(emptyRouting)
+ .isEqualTo(sampleRouting)
}
)
}
@@ -172,42 +182,47 @@ internal object ConfigurationValidatorTest : Spek({
})
private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPort),
- idleTimeoutSec: Option<Duration> = Some(defaultIdleTimeoutSec),
+ idleTimeoutSec: Option<Long> = Some(defaultIdleTimeoutSec),
maxPayloadSizeBytes: Option<Int> = Some(defaultMaxPayloadSizeBytes),
- firstReqDelaySec: Option<Duration> = Some(defaultFirstReqDelaySec),
- requestIntervalSec: Option<Duration> = Some(defaultRequestIntervalSec),
+ firstReqDelaySec: Option<Long> = Some(defaultFirstReqDelaySec),
+ requestIntervalSec: Option<Long> = Some(defaultRequestIntervalSec),
sslDisable: Option<Boolean> = Some(false),
keyStoreFile: Option<String> = Some(KEYSTORE),
keyStorePassword: Option<String> = Some(KEYSTORE_PASSWORD),
trustStoreFile: Option<String> = Some(TRUSTSTORE),
trustStorePassword: Option<String> = Some(TRUSTSTORE_PASSWORD),
- routing: Option<Routing> = Some(emptyRouting),
+ streamPublishers: Option<List<KafkaSink>> = Some(sampleStreamsDefinition),
logLevel: Option<LogLevel> = Some(LogLevel.INFO)
-) =
- PartialConfiguration(
- listenPort = listenPort,
- idleTimeoutSec = idleTimeoutSec,
- maxPayloadSizeBytes = maxPayloadSizeBytes,
- firstRequestDelaySec = firstReqDelaySec,
- requestIntervalSec = requestIntervalSec,
- sslDisable = sslDisable,
- keyStoreFile = keyStoreFile,
- keyStorePassword = keyStorePassword,
- trustStoreFile = trustStoreFile,
- trustStorePassword = trustStorePassword,
- routing = routing,
- logLevel = logLevel
- )
-
-val defaultListenPort = 1234
-val defaultRequestIntervalSec = Duration.ofSeconds(3)
-val defaultMaxPayloadSizeBytes = 2
-val defaultIdleTimeoutSec = Duration.ofSeconds(10L)
-val defaultFirstReqDelaySec = Duration.ofSeconds(10L)
-
-val KEYSTORE = "test.ks.pkcs12"
-val KEYSTORE_PASSWORD = "changeMe"
-val TRUSTSTORE = "trust.ks.pkcs12"
-val TRUSTSTORE_PASSWORD = "changeMeToo"
-
-val emptyRouting: Routing = emptyList()
+) = PartialConfiguration(
+ listenPort = listenPort,
+ idleTimeoutSec = idleTimeoutSec,
+ maxPayloadSizeBytes = maxPayloadSizeBytes,
+ firstRequestDelaySec = firstReqDelaySec,
+ requestIntervalSec = requestIntervalSec,
+ sslDisable = sslDisable,
+ keyStoreFile = keyStoreFile,
+ keyStorePassword = keyStorePassword,
+ trustStoreFile = trustStoreFile,
+ trustStorePassword = trustStorePassword,
+ streamPublishers = streamPublishers,
+ logLevel = logLevel
+)
+
+const val defaultListenPort = 1234
+const val defaultMaxPayloadSizeBytes = 2
+const val defaultRequestIntervalSec = 3L
+const val defaultIdleTimeoutSec = 10L
+const val defaultFirstReqDelaySec = 10L
+
+const val KEYSTORE = "test.ks.pkcs12"
+const val KEYSTORE_PASSWORD = "changeMe"
+const val TRUSTSTORE = "trust.ks.pkcs12"
+const val TRUSTSTORE_PASSWORD = "changeMeToo"
+
+const val sampleSinkName = "perf3gpp"
+
+private val sampleSink = mock<KafkaSink>().also {
+ whenever(it.name()).thenReturn(sampleSinkName)
+}
+val sampleStreamsDefinition = listOf(sampleSink)
+val sampleRouting = listOf(Route(sampleSink.name(), sampleSink)) \ No newline at end of file
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParserTest.kt
index b4683458..ad38fd51 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParserTest.kt
@@ -35,15 +35,15 @@ import kotlin.test.fail
* @author Pawel Biniek <pawel.biniek@nokia.com>
* @since February 2019
*/
-internal object FileConfigurationReaderTest : Spek({
- describe("A configuration loader utility") {
- val cut = FileConfigurationReader()
+internal object JsonConfigurationParserTest : Spek({
+ describe("A configuration parser utility") {
+ val cut = JsonConfigurationParser()
- describe("partial configuration loading") {
+ describe("partial configuration parsing") {
it("parses enumerations") {
val input = """{"logLevel":"ERROR"}"""
- val config = cut.loadConfig(StringReader(input))
+ val config = cut.parse(StringReader(input))
assertThat(config.logLevel).isEqualTo(Some(LogLevel.ERROR))
}
@@ -53,16 +53,16 @@ internal object FileConfigurationReaderTest : Spek({
"cbs.firstRequestDelaySec": 10
}
""".trimIndent()
- val config = cut.loadConfig(StringReader(input))
+ val config = cut.parse(StringReader(input))
assertThat(config.listenPort).isEqualTo(Some(12003))
- assertThat(config.firstRequestDelaySec).isEqualTo(Some(Duration.ofSeconds(10)))
+ assertThat(config.firstRequestDelaySec).isEqualTo(Some(10L))
}
it("parses disabled security configuration") {
val input = """{
"security.sslDisable": true
}""".trimIndent()
- val config = cut.loadConfig(StringReader(input))
+ val config = cut.parse(StringReader(input))
assertThat(config.sslDisable.getOrElse { fail("Should be Some") }).isTrue()
}
@@ -71,26 +71,26 @@ internal object FileConfigurationReaderTest : Spek({
val input = """{
"logLevel": something
}""".trimMargin()
- val config = cut.loadConfig(input.reader())
+ val config = cut.parse(input.reader())
assertThat(config.logLevel.isEmpty())
}
}
- describe("complete file loading") {
- it("loads actual file") {
- val config = cut.loadConfig(
+ describe("complete json parsing") {
+ it("parses actual json") {
+ val config = cut.parse(
javaClass.resourceAsStream("/sampleConfig.json"))
assertThat(config).isNotNull
assertThat(config.logLevel).isEqualTo(Some(LogLevel.ERROR))
assertThat(config.listenPort).isEqualTo(Some(6000))
- assertThat(config.idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200)))
+ assertThat(config.idleTimeoutSec).isEqualTo(Some(1200L))
assertThat(config.maxPayloadSizeBytes).isEqualTo(Some(1048576))
- assertThat(config.firstRequestDelaySec).isEqualTo(Some(Duration.ofSeconds(7)))
- assertThat(config.requestIntervalSec).isEqualTo(Some(Duration.ofSeconds(900)))
+ assertThat(config.firstRequestDelaySec).isEqualTo(Some(7L))
+ assertThat(config.requestIntervalSec).isEqualTo(Some(900L))
assertThat(config.sslDisable).isEqualTo(Some(false))
assertThat(config.keyStoreFile).isEqualTo(Some("test.ks.pkcs12"))
@@ -101,4 +101,3 @@ internal object FileConfigurationReaderTest : Spek({
}
}
})
-