From 6725abbaa6249e107126ffd5ec58f2a96ce60eee Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Fri, 29 Mar 2019 11:22:24 +0100 Subject: Move ConfigurationProvider to config module Change-Id: Ic6f955f4e777e06e7c7eed6e08c0cac470e9a51d Issue-ID: DCAEGEN2-1347 Signed-off-by: Piotr Jaszczyk --- .../veshv/config/api/ConfigurationModule.kt | 32 +++- .../dcae/collectors/veshv/config/api/adapters.kt | 24 +++ .../veshv/config/api/model/Configuration.kt | 53 ------ .../veshv/config/api/model/Exceptions.kt | 24 --- .../collectors/veshv/config/api/model/Routing.kt | 26 --- .../veshv/config/api/model/configuration.kt | 51 +++++ .../veshv/config/api/model/exceptions.kt | 24 +++ .../collectors/veshv/config/api/model/routing.kt | 26 +++ .../veshv/config/impl/CbsConfigurationProvider.kt | 119 ++++++++++++ .../veshv/config/impl/ConfigurationValidator.kt | 4 +- .../veshv/config/impl/HvVesCommandLineParser.kt | 5 +- .../veshv/config/impl/PartialConfiguration.kt | 59 ------ .../veshv/config/impl/partial_configuration.kt | 59 ++++++ .../config/impl/CbsConfigurationProviderTest.kt | 206 +++++++++++++++++++++ 14 files changed, 539 insertions(+), 173 deletions(-) create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt delete mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt delete mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt delete mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt delete mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt create mode 100644 sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt (limited to 'sources/hv-collector-configuration/src') diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt index dd1b171e..9684484b 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt @@ -22,10 +22,16 @@ package org.onap.dcae.collectors.veshv.config.api import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException import org.onap.dcae.collectors.veshv.config.api.model.ValidationException -import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser +import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider +import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader +import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser +import org.onap.dcae.collectors.veshv.utils.arrow.rightOrThrow import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft +import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties import reactor.core.publisher.Flux class ConfigurationModule { @@ -34,16 +40,28 @@ class ConfigurationModule { private val configReader = FileConfigurationReader() private val configValidator = ConfigurationValidator() - private lateinit var initialConfig: HvVesConfiguration - fun healthCheckPort(args: Array): Int = cmd.getHealthcheckPort(args) - fun hvVesConfigurationUpdates(args: Array): Flux = + fun hvVesConfigurationUpdates(args: Array, + configStateListener: ConfigurationStateListener, + mdc: MappedDiagnosticContext): Flux = Flux.just(cmd.getConfigurationFile(args)) .throwOnLeft { MissingArgumentException(it.message, it.cause) } .map { it.reader().use(configReader::loadConfig) } - .map { configValidator.validate(it) } - .throwOnLeft { ValidationException(it.message) } - .doOnNext { initialConfig = it } + .cache() + .flatMap { basePartialConfig -> + val baseConfig = configValidator.validate(basePartialConfig) + .rightOrThrow { ValidationException(it.message) } + val cbsConfigProvider = CbsConfigurationProvider( + CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), + baseConfig.cbs, + configStateListener, + mdc) + val merger = ConfigurationMerger() + cbsConfigProvider() + .map { merger.merge(basePartialConfig, it) } + .map { configValidator.validate(it) } + .throwOnLeft { ValidationException(it.message) } + } } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt new file mode 100644 index 00000000..9fa6660e --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt @@ -0,0 +1,24 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.api + +interface ConfigurationStateListener { + fun retrying() {} +} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt deleted file mode 100644 index 3375821e..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.api.model - -import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration -import org.onap.dcae.collectors.veshv.utils.logging.LogLevel -import java.time.Duration - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -data class HvVesConfiguration( - val server: ServerConfiguration, - val cbs: CbsConfiguration, - val security: SecurityConfiguration, - val collector: CollectorConfiguration, - val logLevel: LogLevel -) - -data class ServerConfiguration( - val listenPort: Int, - val idleTimeout: Duration, - val maxPayloadSizeBytes: Int -) - -data class CbsConfiguration( - val firstRequestDelay: Duration, - val requestInterval: Duration -) - -data class CollectorConfiguration( - val maxRequestSizeBytes: Int, - val kafkaServers: String, - val routing: Routing -) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt deleted file mode 100644 index 2fc29829..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt +++ /dev/null @@ -1,24 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.api.model - -class MissingArgumentException(message: String, cause: Throwable?) : RuntimeException(message, cause) - -class ValidationException(message: String) : RuntimeException(message) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt deleted file mode 100644 index e5a83ac4..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt +++ /dev/null @@ -1,26 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.api.model - -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink - -data class Route(val domain: String, val sink: KafkaSink) - -typealias Routing = List diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt new file mode 100644 index 00000000..c1807be2 --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt @@ -0,0 +1,51 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.api.model + +import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import java.time.Duration + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +data class HvVesConfiguration( + val server: ServerConfiguration, + val cbs: CbsConfiguration, + val security: SecurityConfiguration, + val collector: CollectorConfiguration, + val logLevel: LogLevel +) + +data class ServerConfiguration( + val listenPort: Int, + val idleTimeout: Duration, + val maxPayloadSizeBytes: Int +) + +data class CbsConfiguration( + val firstRequestDelay: Duration, + val requestInterval: Duration +) + +data class CollectorConfiguration( + val routing: Routing +) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt new file mode 100644 index 00000000..2fc29829 --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt @@ -0,0 +1,24 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.api.model + +class MissingArgumentException(message: String, cause: Throwable?) : RuntimeException(message, cause) + +class ValidationException(message: String) : RuntimeException(message) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt new file mode 100644 index 00000000..e5a83ac4 --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.api.model + +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink + +data class Route(val domain: String, val sink: KafkaSink) + +typealias Routing = List diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt new file mode 100644 index 00000000..2038c31a --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt @@ -0,0 +1,119 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.None +import arrow.core.Option +import arrow.core.Some +import com.google.gson.JsonObject +import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext +import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog +import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.retry.Jitter +import reactor.retry.Retry + +/** + * @author Jakub Dudycz + * @since May 2018 + */ +internal class CbsConfigurationProvider(private val cbsClientMono: Mono, + private val cbsConfiguration: CbsConfiguration, + private val streamParser: StreamFromGsonParser, + private val configurationStateListener: ConfigurationStateListener, + retrySpec: Retry, + private val mdc: MappedDiagnosticContext + +) { + constructor(cbsClientMono: Mono, + cbsConfig: CbsConfiguration, + configurationStateListener: ConfigurationStateListener, + mdc: MappedDiagnosticContext) : + this( + cbsClientMono, + cbsConfig, + StreamFromGsonParsers.kafkaSinkParser(), + configurationStateListener, + Retry.any() + .retryMax(MAX_RETRIES) + .fixedBackoff(cbsConfig.requestInterval) + .jitter(Jitter.random()), + mdc + ) + + private val retry = retrySpec.doOnRetry { + logger.withWarn(mdc) { + log("Exception from configuration provider client, retrying subscription", it.exception()) + } + configurationStateListener.retrying() + } + + operator fun invoke(): Flux = + cbsClientMono + .doOnNext { logger.info(mdc) { "CBS client successfully created" } } + .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" } + .retryWhen(retry) + .doFinally { logger.trace(mdc) { "CBS client subscription finished" } } + .flatMapMany(::handleUpdates) + + private fun handleUpdates(cbsClient: CbsClient) = cbsClient + .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()), + cbsConfiguration.firstRequestDelay, + cbsConfiguration.requestInterval) + .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } } + .map(::createRoutingDescription) + .onErrorLog(logger, mdc) { "Error while creating configuration" } + .retryWhen(retry) + .map { PartialConfiguration(collector = Some(PartialCollectorConfig(routing = it))) } + + private fun createRoutingDescription(configuration: JsonObject): Option = try { + val routes = DataStreams.namedSinks(configuration) + .filter(streamOfType(KAFKA)) + .map(streamParser::unsafeParse) + .map { Route(it.name(), it) } + .asIterable() + .toList() + Some(routes) + } catch (e: NullPointerException) { + logger.withWarn(mdc) { + log("Invalid streams configuration", e) + } + None + } + + companion object { + private const val MAX_RETRIES = 5L + private val logger = Logger(CbsConfigurationProvider::class) + } +} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt index 04bba7e2..3e599b58 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt @@ -63,9 +63,7 @@ internal class ConfigurationValidator { securityConfiguration, // TOD0: swap when ConfigurationMerger is implemented // collectorConfiguration - CollectorConfiguration(-1, - "I do not exist. I'm not even a URL :o", - emptyList()), + CollectorConfiguration(emptyList()), // end TOD0 logLevel ) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt index 3e93a400..c1a98294 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt @@ -19,7 +19,10 @@ */ package org.onap.dcae.collectors.veshv.config.impl -import arrow.core.* +import arrow.core.Either +import arrow.core.Option +import arrow.core.Try +import arrow.core.getOrElse import org.apache.commons.cli.CommandLine import org.apache.commons.cli.CommandLineParser import org.apache.commons.cli.DefaultParser diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt deleted file mode 100644 index a27998e1..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt +++ /dev/null @@ -1,59 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.impl - -import arrow.core.None -import arrow.core.Option -import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.utils.logging.LogLevel -import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys -import java.net.InetSocketAddress -import java.time.Duration - -/** - * @author Pawel Biniek - * @since February 2019 - */ -internal data class PartialConfiguration( - val server: Option = None, - val cbs: Option = None, - val security: Option = None, - val collector: Option = None, - val logLevel: Option = None -) - -internal data class PartialServerConfig( - val listenPort: Option = None, - val idleTimeoutSec: Option = None, - val maxPayloadSizeBytes: Option = None -) - -internal data class PartialCbsConfig( - val firstRequestDelaySec: Option = None, - val requestIntervalSec: Option = None -) - -internal data class PartialSecurityConfig(val keys: Option = None) - -internal data class PartialCollectorConfig( - val maxRequestSizeBytes: Option = None, - val kafkaServers: Option> = None, - val routing: Option = None -) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt new file mode 100644 index 00000000..f3c149cd --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt @@ -0,0 +1,59 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.None +import arrow.core.Option +import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys +import java.net.InetSocketAddress +import java.time.Duration + +/** + * @author Pawel Biniek + * @since February 2019 + */ +internal data class PartialConfiguration( + val server: Option = None, + val cbs: Option = None, + val security: Option = None, + val collector: Option = None, + val logLevel: Option = None +) + +internal data class PartialServerConfig( + val listenPort: Option = None, + val idleTimeoutSec: Option = None, + val maxPayloadSizeBytes: Option = None +) + +internal data class PartialCbsConfig( + val firstRequestDelaySec: Option = None, + val requestIntervalSec: Option = None +) + +internal data class PartialSecurityConfig(val keys: Option = None) + +internal data class PartialCollectorConfig( + val maxRequestSizeBytes: Option = None, + val kafkaServers: Option> = None, // TOD0: remove properties and simplify this part + val routing: Option = None +) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt new file mode 100644 index 00000000..0cbc0e4a --- /dev/null +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt @@ -0,0 +1,206 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import com.google.gson.JsonParser +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.eq +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.times +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.retry.Retry +import reactor.test.StepVerifier +import java.time.Duration + +/** + * @author Jakub Dudycz + * @since May 2018 + */ +internal object CbsConfigurationProviderTest : Spek({ + + describe("Configuration provider") { + + val cbsClient: CbsClient = mock() + val cbsClientMock: Mono = Mono.just(cbsClient) + val configStateListener: ConfigurationStateListener = mock() + + given("configuration is never in cbs") { + val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener) + + on("waiting for configuration") { + val waitTime = Duration.ofMillis(100) + + it("should not get it") { + StepVerifier.create(configProvider().take(1)) + .expectNoEvent(waitTime) + } + } + } + + given("valid configuration from cbs") { + val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener) + + on("new configuration") { + whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) + .thenReturn(Flux.just(validConfiguration)) + it("should use received configuration") { + + StepVerifier.create(configProvider().take(1)) + .consumeNextWith { + val routes = it.collector.orNull()!!.routing.orNull()!! + val route1 = routes.elementAt(0) + val route2 = routes.elementAt(1) + val receivedSink1 = route1.sink + val receivedSink2 = route2.sink + + assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL) + assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1) + assertThat(receivedSink1.bootstrapServers()) + .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060") + assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP") + + assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL) + assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2) + assertThat(receivedSink2.bootstrapServers()) + .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060") + assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP") + + }.verifyComplete() + } + } + + } + given("invalid configuration from cbs") { + val iterationCount = 3L + val configProvider = constructConfigurationProvider( + cbsClientMock, configStateListener, iterationCount + ) + + on("new configuration") { + whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) + .thenReturn(Flux.just(invalidConfiguration)) + + it("should interrupt the flux") { + StepVerifier.create(configProvider()) + .verifyError() + } + + it("should call state listener when retrying") { + verify(configStateListener, times(iterationCount.toInt())).retrying() + } + } + } + } + +}) + + +val PERF3GPP_REGIONAL = "perf3gpp_regional" +val PERF3GPP_CENTRAL = "perf3gpp_central" + +private val aafCredentials1 = ImmutableAafCredentials.builder() + .username("client") + .password("very secure password") + .build() + +private val aafCredentials2 = ImmutableAafCredentials.builder() + .username("other_client") + .password("another very secure password") + .build() + +private val validConfiguration = JsonParser().parse(""" +{ + "streams_publishes": { + "$PERF3GPP_REGIONAL": { + "type": "kafka", + "aaf_credentials": { + "username": "client", + "password": "very secure password" + }, + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", + "topic_name": "REG_HVVES_PERF3GPP" + } + }, + "$PERF3GPP_CENTRAL": { + "type": "kafka", + "aaf_credentials": { + "username": "other_client", + "password": "another very secure password" + }, + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060", + "topic_name": "CEN_HVVES_PERF3GPP" + } + } + } +}""").asJsonObject + +private val invalidConfiguration = JsonParser().parse(""" +{ + "streams_publishes": { + "$PERF3GPP_REGIONAL": { + "type": "kafka", + "aaf_credentials": { + "username": "client", + "password": "very secure password" + }, + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", + "popic_name": "REG_HVVES_PERF3GPP" + } + } + } +}""").asJsonObject + +private val firstRequestDelay = Duration.ofMillis(1) +private val requestInterval = Duration.ofMillis(1) +private val streamParser = StreamFromGsonParsers.kafkaSinkParser() + +private fun constructConfigurationProvider(cbsClientMono: Mono, + configurationStateListener: ConfigurationStateListener, + iterationCount: Long = 1 +): CbsConfigurationProvider { + + val retry = Retry.onlyIf { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1)) + + return CbsConfigurationProvider( + cbsClientMono, + CbsConfiguration(firstRequestDelay, requestInterval), + streamParser, + configurationStateListener, + retry, + { mapOf("k" to "v") } + ) +} -- cgit 1.2.3-korg