diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-04-10 06:23:20 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-04-10 06:23:20 +0000 |
commit | c9829d23c12b2824a0d56ee6efbd00ad67b9046e (patch) | |
tree | 383f11c46d941f5596e6f775b2c40d680e36aea8 /sources/hv-collector-configuration/src | |
parent | 8809461fd30a83335ede35b342fe4425d2df840d (diff) | |
parent | 325387e62a0793871dc1eb97f02a4ae90a977664 (diff) |
Merge "Parse whole dynamic configuration"
Diffstat (limited to 'sources/hv-collector-configuration/src')
12 files changed, 193 insertions, 206 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt index ccce62a4..93381572 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt @@ -26,8 +26,8 @@ import org.onap.dcae.collectors.veshv.config.api.model.ValidationException import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator -import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser +import org.onap.dcae.collectors.veshv.config.impl.JsonConfigurationParser import org.onap.dcae.collectors.veshv.config.impl.PartialConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -40,7 +40,7 @@ import reactor.core.publisher.Mono class ConfigurationModule { private val cmd = HvVesCommandLineParser() - private val configReader = FileConfigurationReader() + private val configParser = JsonConfigurationParser() private val configValidator = ConfigurationValidator() private val merger = ConfigurationMerger() @@ -51,10 +51,9 @@ class ConfigurationModule { mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> = Mono.just(cmd.getConfigurationFile(args)) .throwOnLeft(::MissingArgumentException) - .map { - logger.info { "Using base configuration file: ${it.absolutePath}" } - it.reader().use(configReader::loadConfig) - } + .doOnNext { logger.info { "Using base configuration file: ${it.absolutePath}" } } + .map { it.reader().use(configParser::parse) } + .doOnNext { logger.info { "Successfully parsed json file to configuration: $it" } } .cache() .flatMapMany { basePartialConfig -> cbsConfigurationProvider(basePartialConfig, configStateListener, mdc) @@ -70,12 +69,13 @@ class ConfigurationModule { CbsConfigurationProvider( CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), cbsConfigurationFrom(basePartialConfig), + configParser, configStateListener, mdc) - private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = - configValidator.validatedCbsConfiguration(basePartialConfig) - .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") } + private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = configValidator + .validatedCbsConfiguration(basePartialConfig) + .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") } companion object { private val logger = Logger(ConfigurationModule::class) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt index c1807be2..f745d595 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt @@ -37,8 +37,8 @@ data class HvVesConfiguration( data class ServerConfiguration( val listenPort: Int, - val idleTimeout: Duration, - val maxPayloadSizeBytes: Int + val maxPayloadSizeBytes: Int, + val idleTimeout: Duration ) data class CbsConfiguration( diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt index b6462936..4982c732 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt @@ -19,17 +19,14 @@ */ package org.onap.dcae.collectors.veshv.config.impl -import arrow.core.None -import arrow.core.Option -import arrow.core.Some +import arrow.core.toOption import com.google.gson.JsonObject import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.Route -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog +import org.onap.dcae.collectors.veshv.utils.reader import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient @@ -50,26 +47,29 @@ import reactor.retry.Retry */ internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClient>, private val cbsConfiguration: CbsConfiguration, + private val configParser: JsonConfigurationParser, private val streamParser: StreamFromGsonParser<KafkaSink>, private val configurationStateListener: ConfigurationStateListener, - retrySpec: Retry<Any>, - private val mdc: MappedDiagnosticContext + private val mdc: MappedDiagnosticContext, + retrySpec: Retry<Any> ) { constructor(cbsClientMono: Mono<CbsClient>, cbsConfig: CbsConfiguration, + configParser: JsonConfigurationParser, configurationStateListener: ConfigurationStateListener, mdc: MappedDiagnosticContext) : this( cbsClientMono, cbsConfig, + configParser, StreamFromGsonParsers.kafkaSinkParser(), configurationStateListener, + mdc, Retry.any<Any>() .retryMax(MAX_RETRIES) .fixedBackoff(cbsConfig.requestInterval) - .jitter(Jitter.random()), - mdc + .jitter(Jitter.random()) ) private val retry = retrySpec.doOnRetry { @@ -92,25 +92,22 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClien cbsConfiguration.firstRequestDelay, cbsConfiguration.requestInterval) .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } } - .map(::createRoutingDescription) + .map(::parseConfiguration) + .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } } .onErrorLog(logger, mdc) { "Error while creating configuration" } .retryWhen(retry) - .map { PartialConfiguration(routing = it) } - private fun createRoutingDescription(configuration: JsonObject): Option<Routing> = try { - val routes = DataStreams.namedSinks(configuration) - .filter(streamOfType(KAFKA)) - .map(streamParser::unsafeParse) - .map { Route(it.name(), it) } - .asIterable() - .toList() - Some(routes) - } catch (e: NullPointerException) { - logger.withWarn(mdc) { - log("Invalid streams configuration", e) - } - None - } + private fun parseConfiguration(json: JsonObject) = + configParser + .parse(json.reader()) + .apply { streamPublishers = extractStreamDefinitions(json).toOption() } + + private fun extractStreamDefinitions(configuration: JsonObject): List<KafkaSink> = + DataStreams.namedSinks(configuration) + .filter(streamOfType(KAFKA)) + .map(streamParser::unsafeParse) + .asIterable() + .toList() companion object { private const val MAX_RETRIES = 5L diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt index 8e6bafc4..e782a1e7 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt @@ -21,11 +21,8 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.Option -import arrow.core.Some import arrow.core.getOrElse import arrow.core.toOption -import kotlin.reflect.KProperty0 -import kotlin.reflect.KProperty1 /** * @author Pawel Biniek <pawel.biniek@nokia.com> @@ -47,15 +44,11 @@ internal class ConfigurationMerger { trustStoreFile = base.trustStoreFile.updateToGivenOrNone(update.trustStoreFile), trustStorePassword = base.trustStorePassword.updateToGivenOrNone(update.trustStorePassword), - routing = base.routing.updateToGivenOrNone(update.routing), + streamPublishers = base.streamPublishers.updateToGivenOrNone(update.streamPublishers), logLevel = base.logLevel.updateToGivenOrNone(update.logLevel) ) private fun <T> Option<T>.updateToGivenOrNone(update: Option<T>) = update.getOrElse(this::orNull).toOption() - } - - - diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt index cfcc7d76..dddf0bed 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt @@ -23,19 +23,25 @@ import arrow.core.None import arrow.core.Option import arrow.core.Some import arrow.core.getOrElse +import arrow.core.toOption import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration import org.onap.dcae.collectors.veshv.config.api.model.ValidationException import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration -import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityKeysPaths import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding -import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty +import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding import org.onap.dcae.collectors.veshv.utils.logging.LogLevel import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore +import org.onap.dcaegen2.services.sdk.security.ssl.Passwords import java.io.File +import java.nio.file.Path +import java.time.Duration /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -88,16 +94,16 @@ internal class ConfigurationValidator { partial.mapBinding { ServerConfiguration( it.listenPort.bind(), - it.idleTimeoutSec.bind(), - it.maxPayloadSizeBytes.bind() + it.maxPayloadSizeBytes.bind(), + Duration.ofSeconds(it.idleTimeoutSec.bind()) ) } internal fun validatedCbsConfiguration(partial: PartialConfiguration) = partial.mapBinding { CbsConfiguration( - it.firstRequestDelaySec.bind(), - it.requestIntervalSec.bind() + Duration.ofSeconds(it.firstRequestDelaySec.bind()), + Duration.ofSeconds(it.requestIntervalSec.bind()) ) } @@ -113,19 +119,31 @@ internal class ConfigurationValidator { private fun createSecurityConfiguration(partial: PartialConfiguration): Option<SecurityConfiguration> = partial.mapBinding { SecurityConfiguration( - Option.fromNullable(SecurityKeysPaths( + createSecurityKeys( File(it.keyStoreFile.bind()).toPath(), it.keyStorePassword.bind(), File(it.trustStoreFile.bind()).toPath(), it.trustStorePassword.bind() - ).asImmutableSecurityKeys()) + ).toOption() ) } + private fun createSecurityKeys(keyStorePath: Path, + keyStorePassword: String, + trustStorePath: Path, + trustStorePassword: String) = + ImmutableSecurityKeys.builder() + .keyStore(ImmutableSecurityKeysStore.of(keyStorePath)) + .keyStorePassword(Passwords.fromString(keyStorePassword)) + .trustStore(ImmutableSecurityKeysStore.of(trustStorePath)) + .trustStorePassword(Passwords.fromString(trustStorePassword)) + .build() + + private fun validatedCollectorConfig(partial: PartialConfiguration) = - partial.mapBinding { + partial.mapBinding { config -> CollectorConfiguration( - it.routing.bind() + config.streamPublishers.bind().map { Route(it.name(), it) } ) } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParser.kt index 104ca78c..0b3dd0d5 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParser.kt @@ -21,28 +21,18 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.Option import com.google.gson.GsonBuilder -import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.DurationOfSecondsAdapter import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.OptionAdapter -import org.onap.dcae.collectors.veshv.utils.logging.Logger - import java.io.Reader -import java.time.Duration /** * @author Pawel Biniek <pawel.biniek@nokia.com> * @since February 2019 */ -internal class FileConfigurationReader { +internal class JsonConfigurationParser { private val gson = GsonBuilder() .registerTypeAdapter(Option::class.java, OptionAdapter()) - .registerTypeAdapter(Duration::class.java, DurationOfSecondsAdapter()) .create() - fun loadConfig(input: Reader): PartialConfiguration = + fun parse(input: Reader): PartialConfiguration = gson.fromJson(input, PartialConfiguration::class.java) - .also { logger.info { "Successfully read file and parsed json to configuration: $it" } } - - companion object { - private val logger = Logger(FileConfigurationReader::class) - } } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt deleted file mode 100644 index 3bde7089..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.impl.gsonadapters - -import com.google.gson.JsonDeserializationContext -import com.google.gson.JsonDeserializer -import com.google.gson.JsonElement -import java.lang.reflect.Type -import java.time.Duration - -/** - * @author Pawel Biniek <pawel.biniek@nokia.com> - * @since March 2019 - */ -internal class DurationOfSecondsAdapter : JsonDeserializer<Duration> { - override fun deserialize(json: JsonElement, typeOfT: Type, context: JsonDeserializationContext) = - Duration.ofSeconds(json.asLong) - -} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt index 0be2572d..30f6c3e3 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt @@ -22,9 +22,8 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.None import arrow.core.Option import com.google.gson.annotations.SerializedName -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.utils.logging.LogLevel -import java.time.Duration +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink /** * @author Pawel Biniek <pawel.biniek@nokia.com> @@ -34,14 +33,14 @@ internal data class PartialConfiguration( @SerializedName("server.listenPort") val listenPort: Option<Int> = None, @SerializedName("server.idleTimeoutSec") - val idleTimeoutSec: Option<Duration> = None, + val idleTimeoutSec: Option<Long> = None, @SerializedName("server.maxPayloadSizeBytes") val maxPayloadSizeBytes: Option<Int> = None, @SerializedName("cbs.firstRequestDelaySec") - val firstRequestDelaySec: Option<Duration> = None, + val firstRequestDelaySec: Option<Long> = None, @SerializedName("cbs.requestIntervalSec") - val requestIntervalSec: Option<Duration> = None, + val requestIntervalSec: Option<Long> = None, @SerializedName("security.sslDisable") val sslDisable: Option<Boolean> = None, @@ -54,9 +53,9 @@ internal data class PartialConfiguration( @SerializedName("security.keys.trustStorePassword") val trustStorePassword: Option<String> = None, - @SerializedName("collector.routing") - val routing: Option<Routing> = None, - @SerializedName("logLevel") - val logLevel: Option<LogLevel> = None + val logLevel: Option<LogLevel> = None, + + @Transient + var streamPublishers: Option<List<KafkaSink>> = None ) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt index d5fe588e..94eb519d 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.config.impl +import arrow.core.Some import com.google.gson.JsonParser import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.eq @@ -51,9 +52,9 @@ internal object CbsConfigurationProviderTest : Spek({ describe("Configuration provider") { - val cbsClient: CbsClient = mock() - val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient) - val configStateListener: ConfigurationStateListener = mock() + val cbsClient = mock<CbsClient>() + val cbsClientMock = Mono.just(cbsClient) + val configStateListener = mock<ConfigurationStateListener>() given("configuration is never in cbs") { val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener) @@ -78,29 +79,32 @@ internal object CbsConfigurationProviderTest : Spek({ StepVerifier.create(configProvider().take(1)) .consumeNextWith { - val routes = it.routing.orNull()!! - val route1 = routes.elementAt(0) - val route2 = routes.elementAt(1) - val receivedSink1 = route1.sink - val receivedSink2 = route2.sink - - assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL) - assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1) - assertThat(receivedSink1.bootstrapServers()) + + assertThat(it.listenPort).isEqualTo(Some(6061)) + assertThat(it.idleTimeoutSec).isEqualTo(Some(60L)) + assertThat(it.maxPayloadSizeBytes).isEqualTo(Some(1048576)) + + + val sinks = it.streamPublishers.orNull()!! + val sink1 = sinks[0] + val sink2 = sinks[1] + + assertThat(sink1.name()).isEqualTo(PERF3GPP_REGIONAL) + assertThat(sink1.aafCredentials()).isEqualTo(aafCredentials1) + assertThat(sink1.bootstrapServers()) .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060") - assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP") + assertThat(sink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP") - assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL) - assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2) - assertThat(receivedSink2.bootstrapServers()) + assertThat(sink2.name()).isEqualTo(PERF3GPP_CENTRAL) + assertThat(sink2.aafCredentials()).isEqualTo(aafCredentials2) + assertThat(sink2.bootstrapServers()) .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060") - assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP") - + assertThat(sink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP") }.verifyComplete() } } - } + given("invalid configuration from cbs") { val iterationCount = 3L val configProvider = constructConfigurationProvider( @@ -112,7 +116,8 @@ internal object CbsConfigurationProviderTest : Spek({ .thenReturn(Flux.just(invalidConfiguration)) it("should interrupt the flux") { - StepVerifier.create(configProvider()) + StepVerifier + .create(configProvider()) .verifyError() } @@ -126,8 +131,8 @@ internal object CbsConfigurationProviderTest : Spek({ }) -val PERF3GPP_REGIONAL = "perf3gpp_regional" -val PERF3GPP_CENTRAL = "perf3gpp_central" +private const val PERF3GPP_REGIONAL = "perf3gpp_regional" +private const val PERF3GPP_CENTRAL = "perf3gpp_central" private val aafCredentials1 = ImmutableAafCredentials.builder() .username("client") @@ -141,6 +146,9 @@ private val aafCredentials2 = ImmutableAafCredentials.builder() private val validConfiguration = JsonParser().parse(""" { + "server.listenPort": 6061, + "server.idleTimeoutSec": 60, + "server.maxPayloadSizeBytes": 1048576, "streams_publishes": { "$PERF3GPP_REGIONAL": { "type": "kafka", @@ -173,12 +181,12 @@ private val invalidConfiguration = JsonParser().parse(""" "$PERF3GPP_REGIONAL": { "type": "kafka", "aaf_credentials": { - "username": "client", + "user": "client", "password": "very secure password" }, "kafka_info": { - "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", - "popic_name": "REG_HVVES_PERF3GPP" + "servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", + "name": "REG_HVVES_PERF3GPP" } } } @@ -187,20 +195,24 @@ private val invalidConfiguration = JsonParser().parse(""" private val firstRequestDelay = Duration.ofMillis(1) private val requestInterval = Duration.ofMillis(1) private val streamParser = StreamFromGsonParsers.kafkaSinkParser() +private val configParser = JsonConfigurationParser() private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>, configurationStateListener: ConfigurationStateListener, iterationCount: Long = 1 ): CbsConfigurationProvider { - val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1)) + val retry = Retry + .onlyIf<Any> { it.iteration() <= iterationCount } + .fixedBackoff(Duration.ofNanos(1)) return CbsConfigurationProvider( cbsClientMono, CbsConfiguration(firstRequestDelay, requestInterval), + configParser, streamParser, configurationStateListener, - retry, - { mapOf("k" to "v") } + { mapOf("k" to "v") }, + retry ) } diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt index bc61b57d..4cd2ba97 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt @@ -55,9 +55,9 @@ internal object ConfigurationMergerTest : Spek({ } it("merges single parameter into full config") { - val actual = FileConfigurationReader().loadConfig( + val actual = JsonConfigurationParser().parse( InputStreamReader( - FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader) + JsonConfigurationParserTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader) val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO)) val result = ConfigurationMerger().merge(actual, diff) @@ -66,31 +66,31 @@ internal object ConfigurationMergerTest : Spek({ } it("merges single embedded parameter into full config") { - val actual = FileConfigurationReader().loadConfig( + val actual = JsonConfigurationParser().parse( InputStreamReader( - FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader) + JsonConfigurationParserTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader) val diff = PartialConfiguration(listenPort = someListenPort) val result = ConfigurationMerger().merge(actual, diff) assertThat(result.listenPort).isEqualTo(someListenPort) assertThat(result.idleTimeoutSec.isEmpty()).isFalse() - assertThat(result.idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200))) + assertThat(result.idleTimeoutSec).isEqualTo(Some(1200L)) assertThat(result.maxPayloadSizeBytes.isEmpty()).isFalse() assertThat(result.maxPayloadSizeBytes).isEqualTo(Some(1048576)) } it("merges full config into single parameter") { val actual = PartialConfiguration(logLevel = Some(LogLevel.INFO)) - val diff = FileConfigurationReader().loadConfig( + val diff = JsonConfigurationParser().parse( InputStreamReader( - FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader) + JsonConfigurationParserTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader) val result = ConfigurationMerger().merge(actual, diff) assertThat(result.logLevel).isEqualTo(Some(LogLevel.ERROR)) assertThat(result.maxPayloadSizeBytes).isEqualTo(Some(1048576)) - assertThat(result.idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200))) + assertThat(result.idleTimeoutSec).isEqualTo(Some(1200L)) assertThat(result.keyStoreFile.isEmpty()).isFalse() assertThat(result.firstRequestDelaySec.isEmpty()).isFalse() diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt index e43acfa3..5fa1fd62 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt @@ -23,14 +23,17 @@ import arrow.core.None import arrow.core.Option import arrow.core.Some import arrow.core.getOrElse +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.fail import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator.Companion.DEFAULT_LOG_LEVEL import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys import java.io.File import java.time.Duration @@ -81,7 +84,7 @@ internal object ConfigurationValidatorTest : Spek({ keyStorePassword = Some(KEYSTORE_PASSWORD), trustStoreFile = Some(TRUSTSTORE), trustStorePassword = Some(TRUSTSTORE_PASSWORD), - routing = Some(emptyRouting), + streamPublishers = Some(sampleStreamsDefinition), logLevel = Some(LogLevel.TRACE) ) @@ -92,9 +95,12 @@ internal object ConfigurationValidatorTest : Spek({ fail("Configuration should have been created successfully") }, { - assertThat(it.server.listenPort).isEqualTo(defaultListenPort) - assertThat(it.server.idleTimeout).isEqualTo(defaultIdleTimeoutSec) - assertThat(it.server.maxPayloadSizeBytes).isEqualTo(defaultMaxPayloadSizeBytes) + assertThat(it.server.listenPort) + .isEqualTo(defaultListenPort) + assertThat(it.server.maxPayloadSizeBytes) + .isEqualTo(defaultMaxPayloadSizeBytes) + assertThat(it.server.idleTimeout) + .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec)) val securityKeys = it.security.keys .getOrElse { fail("Should be immutableSecurityKeys") } as SecurityKeys @@ -103,10 +109,14 @@ internal object ConfigurationValidatorTest : Spek({ securityKeys.keyStorePassword().use { assertThat(it).isEqualTo(KEYSTORE_PASSWORD.toCharArray()) } securityKeys.trustStorePassword().use { assertThat(it).isEqualTo(TRUSTSTORE_PASSWORD.toCharArray()) } - assertThat(it.cbs.firstRequestDelay).isEqualTo(defaultFirstReqDelaySec) - assertThat(it.cbs.requestInterval).isEqualTo(defaultRequestIntervalSec) + assertThat(it.cbs.firstRequestDelay) + .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec)) + assertThat(it.cbs.requestInterval) + .isEqualTo(Duration.ofSeconds(defaultRequestIntervalSec)) + + assertThat(it.collector.routing) + .isEqualTo(sampleRouting) - assertThat(it.collector.routing).isEqualTo(emptyRouting) assertThat(it.logLevel).isEqualTo(LogLevel.TRACE) } ) @@ -130,16 +140,16 @@ internal object ConfigurationValidatorTest : Spek({ }, { assertThat(it.server.idleTimeout) - .isEqualTo(defaultIdleTimeoutSec) + .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec)) assertThat(it.security.keys) .isEqualTo(None) assertThat(it.cbs.firstRequestDelay) - .isEqualTo(defaultFirstReqDelaySec) + .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec)) assertThat(it.collector.routing) - .isEqualTo(emptyRouting) + .isEqualTo(sampleRouting) } ) } @@ -172,42 +182,47 @@ internal object ConfigurationValidatorTest : Spek({ }) private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPort), - idleTimeoutSec: Option<Duration> = Some(defaultIdleTimeoutSec), + idleTimeoutSec: Option<Long> = Some(defaultIdleTimeoutSec), maxPayloadSizeBytes: Option<Int> = Some(defaultMaxPayloadSizeBytes), - firstReqDelaySec: Option<Duration> = Some(defaultFirstReqDelaySec), - requestIntervalSec: Option<Duration> = Some(defaultRequestIntervalSec), + firstReqDelaySec: Option<Long> = Some(defaultFirstReqDelaySec), + requestIntervalSec: Option<Long> = Some(defaultRequestIntervalSec), sslDisable: Option<Boolean> = Some(false), keyStoreFile: Option<String> = Some(KEYSTORE), keyStorePassword: Option<String> = Some(KEYSTORE_PASSWORD), trustStoreFile: Option<String> = Some(TRUSTSTORE), trustStorePassword: Option<String> = Some(TRUSTSTORE_PASSWORD), - routing: Option<Routing> = Some(emptyRouting), + streamPublishers: Option<List<KafkaSink>> = Some(sampleStreamsDefinition), logLevel: Option<LogLevel> = Some(LogLevel.INFO) -) = - PartialConfiguration( - listenPort = listenPort, - idleTimeoutSec = idleTimeoutSec, - maxPayloadSizeBytes = maxPayloadSizeBytes, - firstRequestDelaySec = firstReqDelaySec, - requestIntervalSec = requestIntervalSec, - sslDisable = sslDisable, - keyStoreFile = keyStoreFile, - keyStorePassword = keyStorePassword, - trustStoreFile = trustStoreFile, - trustStorePassword = trustStorePassword, - routing = routing, - logLevel = logLevel - ) - -val defaultListenPort = 1234 -val defaultRequestIntervalSec = Duration.ofSeconds(3) -val defaultMaxPayloadSizeBytes = 2 -val defaultIdleTimeoutSec = Duration.ofSeconds(10L) -val defaultFirstReqDelaySec = Duration.ofSeconds(10L) - -val KEYSTORE = "test.ks.pkcs12" -val KEYSTORE_PASSWORD = "changeMe" -val TRUSTSTORE = "trust.ks.pkcs12" -val TRUSTSTORE_PASSWORD = "changeMeToo" - -val emptyRouting: Routing = emptyList() +) = PartialConfiguration( + listenPort = listenPort, + idleTimeoutSec = idleTimeoutSec, + maxPayloadSizeBytes = maxPayloadSizeBytes, + firstRequestDelaySec = firstReqDelaySec, + requestIntervalSec = requestIntervalSec, + sslDisable = sslDisable, + keyStoreFile = keyStoreFile, + keyStorePassword = keyStorePassword, + trustStoreFile = trustStoreFile, + trustStorePassword = trustStorePassword, + streamPublishers = streamPublishers, + logLevel = logLevel +) + +const val defaultListenPort = 1234 +const val defaultMaxPayloadSizeBytes = 2 +const val defaultRequestIntervalSec = 3L +const val defaultIdleTimeoutSec = 10L +const val defaultFirstReqDelaySec = 10L + +const val KEYSTORE = "test.ks.pkcs12" +const val KEYSTORE_PASSWORD = "changeMe" +const val TRUSTSTORE = "trust.ks.pkcs12" +const val TRUSTSTORE_PASSWORD = "changeMeToo" + +const val sampleSinkName = "perf3gpp" + +private val sampleSink = mock<KafkaSink>().also { + whenever(it.name()).thenReturn(sampleSinkName) +} +val sampleStreamsDefinition = listOf(sampleSink) +val sampleRouting = listOf(Route(sampleSink.name(), sampleSink))
\ No newline at end of file diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParserTest.kt index b4683458..ad38fd51 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParserTest.kt @@ -35,15 +35,15 @@ import kotlin.test.fail * @author Pawel Biniek <pawel.biniek@nokia.com> * @since February 2019 */ -internal object FileConfigurationReaderTest : Spek({ - describe("A configuration loader utility") { - val cut = FileConfigurationReader() +internal object JsonConfigurationParserTest : Spek({ + describe("A configuration parser utility") { + val cut = JsonConfigurationParser() - describe("partial configuration loading") { + describe("partial configuration parsing") { it("parses enumerations") { val input = """{"logLevel":"ERROR"}""" - val config = cut.loadConfig(StringReader(input)) + val config = cut.parse(StringReader(input)) assertThat(config.logLevel).isEqualTo(Some(LogLevel.ERROR)) } @@ -53,16 +53,16 @@ internal object FileConfigurationReaderTest : Spek({ "cbs.firstRequestDelaySec": 10 } """.trimIndent() - val config = cut.loadConfig(StringReader(input)) + val config = cut.parse(StringReader(input)) assertThat(config.listenPort).isEqualTo(Some(12003)) - assertThat(config.firstRequestDelaySec).isEqualTo(Some(Duration.ofSeconds(10))) + assertThat(config.firstRequestDelaySec).isEqualTo(Some(10L)) } it("parses disabled security configuration") { val input = """{ "security.sslDisable": true }""".trimIndent() - val config = cut.loadConfig(StringReader(input)) + val config = cut.parse(StringReader(input)) assertThat(config.sslDisable.getOrElse { fail("Should be Some") }).isTrue() } @@ -71,26 +71,26 @@ internal object FileConfigurationReaderTest : Spek({ val input = """{ "logLevel": something }""".trimMargin() - val config = cut.loadConfig(input.reader()) + val config = cut.parse(input.reader()) assertThat(config.logLevel.isEmpty()) } } - describe("complete file loading") { - it("loads actual file") { - val config = cut.loadConfig( + describe("complete json parsing") { + it("parses actual json") { + val config = cut.parse( javaClass.resourceAsStream("/sampleConfig.json")) assertThat(config).isNotNull assertThat(config.logLevel).isEqualTo(Some(LogLevel.ERROR)) assertThat(config.listenPort).isEqualTo(Some(6000)) - assertThat(config.idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200))) + assertThat(config.idleTimeoutSec).isEqualTo(Some(1200L)) assertThat(config.maxPayloadSizeBytes).isEqualTo(Some(1048576)) - assertThat(config.firstRequestDelaySec).isEqualTo(Some(Duration.ofSeconds(7))) - assertThat(config.requestIntervalSec).isEqualTo(Some(Duration.ofSeconds(900))) + assertThat(config.firstRequestDelaySec).isEqualTo(Some(7L)) + assertThat(config.requestIntervalSec).isEqualTo(Some(900L)) assertThat(config.sslDisable).isEqualTo(Some(false)) assertThat(config.keyStoreFile).isEqualTo(Some("test.ks.pkcs12")) @@ -101,4 +101,3 @@ internal object FileConfigurationReaderTest : Spek({ } } }) - |