From 6725abbaa6249e107126ffd5ec58f2a96ce60eee Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Fri, 29 Mar 2019 11:22:24 +0100 Subject: Move ConfigurationProvider to config module Change-Id: Ic6f955f4e777e06e7c7eed6e08c0cac470e9a51d Issue-ID: DCAEGEN2-1347 Signed-off-by: Piotr Jaszczyk --- sources/hv-collector-configuration/pom.xml | 5 +- .../veshv/config/api/ConfigurationModule.kt | 32 +++- .../dcae/collectors/veshv/config/api/adapters.kt | 24 +++ .../veshv/config/api/model/Configuration.kt | 53 ------ .../veshv/config/api/model/Exceptions.kt | 24 --- .../collectors/veshv/config/api/model/Routing.kt | 26 --- .../veshv/config/api/model/configuration.kt | 51 +++++ .../veshv/config/api/model/exceptions.kt | 24 +++ .../collectors/veshv/config/api/model/routing.kt | 26 +++ .../veshv/config/impl/CbsConfigurationProvider.kt | 119 ++++++++++++ .../veshv/config/impl/ConfigurationValidator.kt | 4 +- .../veshv/config/impl/HvVesCommandLineParser.kt | 5 +- .../veshv/config/impl/PartialConfiguration.kt | 59 ------ .../veshv/config/impl/partial_configuration.kt | 59 ++++++ .../config/impl/CbsConfigurationProviderTest.kt | 206 ++++++++++++++++++++ .../dcae/collectors/veshv/boundary/adapters.kt | 3 - .../org/onap/dcae/collectors/veshv/boundary/api.kt | 3 +- .../collectors/veshv/factory/AdapterFactory.kt | 31 +++ .../collectors/veshv/factory/CollectorFactory.kt | 33 +--- .../veshv/impl/adapters/AdapterFactory.kt | 40 ---- .../impl/adapters/ConfigurationProviderImpl.kt | 110 ----------- .../veshv/impl/adapters/ParsingException.kt | 22 --- .../collectors/veshv/impl/socket/NettyTcpServer.kt | 16 +- .../impl/adapters/ConfigurationProviderImplTest.kt | 207 --------------------- .../tests/component/PerformanceSpecification.kt | 7 +- .../dcae/collectors/veshv/tests/component/Sut.kt | 29 +-- .../veshv/tests/component/VesHvSpecification.kt | 155 +-------------- .../veshv/tests/fakes/FakeHealthState.kt | 37 ---- .../collectors/veshv/tests/fakes/configuration.kt | 46 ----- .../org/onap/dcae/collectors/veshv/main/main.kt | 18 +- .../collectors/veshv/main/servers/VesServer.kt | 8 +- 31 files changed, 623 insertions(+), 859 deletions(-) create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt delete mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt delete mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt delete mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt delete mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt create mode 100644 sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt create mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt delete mode 100644 sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt delete mode 100644 sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt delete mode 100644 sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt diff --git a/sources/hv-collector-configuration/pom.xml b/sources/hv-collector-configuration/pom.xml index 792b9eaa..b6ec4ca2 100644 --- a/sources/hv-collector-configuration/pom.xml +++ b/sources/hv-collector-configuration/pom.xml @@ -77,7 +77,10 @@ org.onap.dcaegen2.services.sdk.rest.services cbs-client - + + io.projectreactor.addons + reactor-extra + org.jetbrains.kotlin kotlin-stdlib-jdk8 diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt index dd1b171e..9684484b 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt @@ -22,10 +22,16 @@ package org.onap.dcae.collectors.veshv.config.api import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException import org.onap.dcae.collectors.veshv.config.api.model.ValidationException -import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser +import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider +import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader +import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser +import org.onap.dcae.collectors.veshv.utils.arrow.rightOrThrow import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft +import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties import reactor.core.publisher.Flux class ConfigurationModule { @@ -34,16 +40,28 @@ class ConfigurationModule { private val configReader = FileConfigurationReader() private val configValidator = ConfigurationValidator() - private lateinit var initialConfig: HvVesConfiguration - fun healthCheckPort(args: Array): Int = cmd.getHealthcheckPort(args) - fun hvVesConfigurationUpdates(args: Array): Flux = + fun hvVesConfigurationUpdates(args: Array, + configStateListener: ConfigurationStateListener, + mdc: MappedDiagnosticContext): Flux = Flux.just(cmd.getConfigurationFile(args)) .throwOnLeft { MissingArgumentException(it.message, it.cause) } .map { it.reader().use(configReader::loadConfig) } - .map { configValidator.validate(it) } - .throwOnLeft { ValidationException(it.message) } - .doOnNext { initialConfig = it } + .cache() + .flatMap { basePartialConfig -> + val baseConfig = configValidator.validate(basePartialConfig) + .rightOrThrow { ValidationException(it.message) } + val cbsConfigProvider = CbsConfigurationProvider( + CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), + baseConfig.cbs, + configStateListener, + mdc) + val merger = ConfigurationMerger() + cbsConfigProvider() + .map { merger.merge(basePartialConfig, it) } + .map { configValidator.validate(it) } + .throwOnLeft { ValidationException(it.message) } + } } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt new file mode 100644 index 00000000..9fa6660e --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt @@ -0,0 +1,24 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.api + +interface ConfigurationStateListener { + fun retrying() {} +} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt deleted file mode 100644 index 3375821e..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.api.model - -import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration -import org.onap.dcae.collectors.veshv.utils.logging.LogLevel -import java.time.Duration - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -data class HvVesConfiguration( - val server: ServerConfiguration, - val cbs: CbsConfiguration, - val security: SecurityConfiguration, - val collector: CollectorConfiguration, - val logLevel: LogLevel -) - -data class ServerConfiguration( - val listenPort: Int, - val idleTimeout: Duration, - val maxPayloadSizeBytes: Int -) - -data class CbsConfiguration( - val firstRequestDelay: Duration, - val requestInterval: Duration -) - -data class CollectorConfiguration( - val maxRequestSizeBytes: Int, - val kafkaServers: String, - val routing: Routing -) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt deleted file mode 100644 index 2fc29829..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt +++ /dev/null @@ -1,24 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.api.model - -class MissingArgumentException(message: String, cause: Throwable?) : RuntimeException(message, cause) - -class ValidationException(message: String) : RuntimeException(message) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt deleted file mode 100644 index e5a83ac4..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt +++ /dev/null @@ -1,26 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.api.model - -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink - -data class Route(val domain: String, val sink: KafkaSink) - -typealias Routing = List diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt new file mode 100644 index 00000000..c1807be2 --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt @@ -0,0 +1,51 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.api.model + +import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import java.time.Duration + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +data class HvVesConfiguration( + val server: ServerConfiguration, + val cbs: CbsConfiguration, + val security: SecurityConfiguration, + val collector: CollectorConfiguration, + val logLevel: LogLevel +) + +data class ServerConfiguration( + val listenPort: Int, + val idleTimeout: Duration, + val maxPayloadSizeBytes: Int +) + +data class CbsConfiguration( + val firstRequestDelay: Duration, + val requestInterval: Duration +) + +data class CollectorConfiguration( + val routing: Routing +) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt new file mode 100644 index 00000000..2fc29829 --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt @@ -0,0 +1,24 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.api.model + +class MissingArgumentException(message: String, cause: Throwable?) : RuntimeException(message, cause) + +class ValidationException(message: String) : RuntimeException(message) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt new file mode 100644 index 00000000..e5a83ac4 --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.api.model + +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink + +data class Route(val domain: String, val sink: KafkaSink) + +typealias Routing = List diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt new file mode 100644 index 00000000..2038c31a --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt @@ -0,0 +1,119 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.None +import arrow.core.Option +import arrow.core.Some +import com.google.gson.JsonObject +import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext +import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog +import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.retry.Jitter +import reactor.retry.Retry + +/** + * @author Jakub Dudycz + * @since May 2018 + */ +internal class CbsConfigurationProvider(private val cbsClientMono: Mono, + private val cbsConfiguration: CbsConfiguration, + private val streamParser: StreamFromGsonParser, + private val configurationStateListener: ConfigurationStateListener, + retrySpec: Retry, + private val mdc: MappedDiagnosticContext + +) { + constructor(cbsClientMono: Mono, + cbsConfig: CbsConfiguration, + configurationStateListener: ConfigurationStateListener, + mdc: MappedDiagnosticContext) : + this( + cbsClientMono, + cbsConfig, + StreamFromGsonParsers.kafkaSinkParser(), + configurationStateListener, + Retry.any() + .retryMax(MAX_RETRIES) + .fixedBackoff(cbsConfig.requestInterval) + .jitter(Jitter.random()), + mdc + ) + + private val retry = retrySpec.doOnRetry { + logger.withWarn(mdc) { + log("Exception from configuration provider client, retrying subscription", it.exception()) + } + configurationStateListener.retrying() + } + + operator fun invoke(): Flux = + cbsClientMono + .doOnNext { logger.info(mdc) { "CBS client successfully created" } } + .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" } + .retryWhen(retry) + .doFinally { logger.trace(mdc) { "CBS client subscription finished" } } + .flatMapMany(::handleUpdates) + + private fun handleUpdates(cbsClient: CbsClient) = cbsClient + .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()), + cbsConfiguration.firstRequestDelay, + cbsConfiguration.requestInterval) + .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } } + .map(::createRoutingDescription) + .onErrorLog(logger, mdc) { "Error while creating configuration" } + .retryWhen(retry) + .map { PartialConfiguration(collector = Some(PartialCollectorConfig(routing = it))) } + + private fun createRoutingDescription(configuration: JsonObject): Option = try { + val routes = DataStreams.namedSinks(configuration) + .filter(streamOfType(KAFKA)) + .map(streamParser::unsafeParse) + .map { Route(it.name(), it) } + .asIterable() + .toList() + Some(routes) + } catch (e: NullPointerException) { + logger.withWarn(mdc) { + log("Invalid streams configuration", e) + } + None + } + + companion object { + private const val MAX_RETRIES = 5L + private val logger = Logger(CbsConfigurationProvider::class) + } +} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt index 04bba7e2..3e599b58 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt @@ -63,9 +63,7 @@ internal class ConfigurationValidator { securityConfiguration, // TOD0: swap when ConfigurationMerger is implemented // collectorConfiguration - CollectorConfiguration(-1, - "I do not exist. I'm not even a URL :o", - emptyList()), + CollectorConfiguration(emptyList()), // end TOD0 logLevel ) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt index 3e93a400..c1a98294 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt @@ -19,7 +19,10 @@ */ package org.onap.dcae.collectors.veshv.config.impl -import arrow.core.* +import arrow.core.Either +import arrow.core.Option +import arrow.core.Try +import arrow.core.getOrElse import org.apache.commons.cli.CommandLine import org.apache.commons.cli.CommandLineParser import org.apache.commons.cli.DefaultParser diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt deleted file mode 100644 index a27998e1..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt +++ /dev/null @@ -1,59 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.impl - -import arrow.core.None -import arrow.core.Option -import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.utils.logging.LogLevel -import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys -import java.net.InetSocketAddress -import java.time.Duration - -/** - * @author Pawel Biniek - * @since February 2019 - */ -internal data class PartialConfiguration( - val server: Option = None, - val cbs: Option = None, - val security: Option = None, - val collector: Option = None, - val logLevel: Option = None -) - -internal data class PartialServerConfig( - val listenPort: Option = None, - val idleTimeoutSec: Option = None, - val maxPayloadSizeBytes: Option = None -) - -internal data class PartialCbsConfig( - val firstRequestDelaySec: Option = None, - val requestIntervalSec: Option = None -) - -internal data class PartialSecurityConfig(val keys: Option = None) - -internal data class PartialCollectorConfig( - val maxRequestSizeBytes: Option = None, - val kafkaServers: Option> = None, - val routing: Option = None -) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt new file mode 100644 index 00000000..f3c149cd --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt @@ -0,0 +1,59 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.None +import arrow.core.Option +import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys +import java.net.InetSocketAddress +import java.time.Duration + +/** + * @author Pawel Biniek + * @since February 2019 + */ +internal data class PartialConfiguration( + val server: Option = None, + val cbs: Option = None, + val security: Option = None, + val collector: Option = None, + val logLevel: Option = None +) + +internal data class PartialServerConfig( + val listenPort: Option = None, + val idleTimeoutSec: Option = None, + val maxPayloadSizeBytes: Option = None +) + +internal data class PartialCbsConfig( + val firstRequestDelaySec: Option = None, + val requestIntervalSec: Option = None +) + +internal data class PartialSecurityConfig(val keys: Option = None) + +internal data class PartialCollectorConfig( + val maxRequestSizeBytes: Option = None, + val kafkaServers: Option> = None, // TOD0: remove properties and simplify this part + val routing: Option = None +) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt new file mode 100644 index 00000000..0cbc0e4a --- /dev/null +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt @@ -0,0 +1,206 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import com.google.gson.JsonParser +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.eq +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.times +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.retry.Retry +import reactor.test.StepVerifier +import java.time.Duration + +/** + * @author Jakub Dudycz + * @since May 2018 + */ +internal object CbsConfigurationProviderTest : Spek({ + + describe("Configuration provider") { + + val cbsClient: CbsClient = mock() + val cbsClientMock: Mono = Mono.just(cbsClient) + val configStateListener: ConfigurationStateListener = mock() + + given("configuration is never in cbs") { + val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener) + + on("waiting for configuration") { + val waitTime = Duration.ofMillis(100) + + it("should not get it") { + StepVerifier.create(configProvider().take(1)) + .expectNoEvent(waitTime) + } + } + } + + given("valid configuration from cbs") { + val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener) + + on("new configuration") { + whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) + .thenReturn(Flux.just(validConfiguration)) + it("should use received configuration") { + + StepVerifier.create(configProvider().take(1)) + .consumeNextWith { + val routes = it.collector.orNull()!!.routing.orNull()!! + val route1 = routes.elementAt(0) + val route2 = routes.elementAt(1) + val receivedSink1 = route1.sink + val receivedSink2 = route2.sink + + assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL) + assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1) + assertThat(receivedSink1.bootstrapServers()) + .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060") + assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP") + + assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL) + assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2) + assertThat(receivedSink2.bootstrapServers()) + .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060") + assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP") + + }.verifyComplete() + } + } + + } + given("invalid configuration from cbs") { + val iterationCount = 3L + val configProvider = constructConfigurationProvider( + cbsClientMock, configStateListener, iterationCount + ) + + on("new configuration") { + whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) + .thenReturn(Flux.just(invalidConfiguration)) + + it("should interrupt the flux") { + StepVerifier.create(configProvider()) + .verifyError() + } + + it("should call state listener when retrying") { + verify(configStateListener, times(iterationCount.toInt())).retrying() + } + } + } + } + +}) + + +val PERF3GPP_REGIONAL = "perf3gpp_regional" +val PERF3GPP_CENTRAL = "perf3gpp_central" + +private val aafCredentials1 = ImmutableAafCredentials.builder() + .username("client") + .password("very secure password") + .build() + +private val aafCredentials2 = ImmutableAafCredentials.builder() + .username("other_client") + .password("another very secure password") + .build() + +private val validConfiguration = JsonParser().parse(""" +{ + "streams_publishes": { + "$PERF3GPP_REGIONAL": { + "type": "kafka", + "aaf_credentials": { + "username": "client", + "password": "very secure password" + }, + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", + "topic_name": "REG_HVVES_PERF3GPP" + } + }, + "$PERF3GPP_CENTRAL": { + "type": "kafka", + "aaf_credentials": { + "username": "other_client", + "password": "another very secure password" + }, + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060", + "topic_name": "CEN_HVVES_PERF3GPP" + } + } + } +}""").asJsonObject + +private val invalidConfiguration = JsonParser().parse(""" +{ + "streams_publishes": { + "$PERF3GPP_REGIONAL": { + "type": "kafka", + "aaf_credentials": { + "username": "client", + "password": "very secure password" + }, + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", + "popic_name": "REG_HVVES_PERF3GPP" + } + } + } +}""").asJsonObject + +private val firstRequestDelay = Duration.ofMillis(1) +private val requestInterval = Duration.ofMillis(1) +private val streamParser = StreamFromGsonParsers.kafkaSinkParser() + +private fun constructConfigurationProvider(cbsClientMono: Mono, + configurationStateListener: ConfigurationStateListener, + iterationCount: Long = 1 +): CbsConfigurationProvider { + + val retry = Retry.onlyIf { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1)) + + return CbsConfigurationProvider( + cbsClientMono, + CbsConfiguration(firstRequestDelay, requestInterval), + streamParser, + configurationStateListener, + retry, + { mapOf("k" to "v") } + ) +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 1b92d90c..e3156a0d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.boundary -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext @@ -40,8 +39,6 @@ interface SinkProvider : Closeable { operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy } -typealias ConfigurationProvider = () -> Flux - interface Metrics { fun notifyBytesReceived(size: Int) fun notifyMessageReceived(msg: WireFrameMessage) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index 5c64c70b..ba0a9eee 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.boundary -import arrow.core.Option import arrow.effects.IO import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.model.ClientContext @@ -33,7 +32,7 @@ interface Collector { } interface CollectorProvider : Closeable { - operator fun invoke(ctx: ClientContext): Option + operator fun invoke(ctx: ClientContext): Collector } interface Server { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt new file mode 100644 index 00000000..04e575ae --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.factory + +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +object AdapterFactory { + fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider() +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 2b29acd9..1c79abd2 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -19,64 +19,45 @@ */ package org.onap.dcae.collectors.veshv.factory -import arrow.core.Option import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger -import java.util.concurrent.atomic.AtomicReference /** * @author Piotr Jaszczyk * @since May 2018 */ -class CollectorFactory(private val configuration: ConfigurationProvider, +class CollectorFactory(private val configuration: CollectorConfiguration, private val sinkProvider: SinkProvider, private val metrics: Metrics, - private val maxPayloadSizeBytes: Int, - private val healthState: HealthState = HealthState.INSTANCE) { + private val maxPayloadSizeBytes: Int) { fun createVesHvCollectorProvider(): CollectorProvider { - val config = AtomicReference() - configuration() - .doOnNext { - logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } - healthState.changeState(HealthDescription.HEALTHY) - } - .doOnError { - logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" } - logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" } - healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) - } - .subscribe(config::set) return object : CollectorProvider { - override fun invoke(ctx: ClientContext): Option = - config.getOption().map { createVesHvCollector(it, ctx) } + override fun invoke(ctx: ClientContext): Collector = + createVesHvCollector(ctx) override fun close() = sinkProvider.close() } } - private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector = + private fun createVesHvCollector(ctx: ClientContext): Collector = VesHvCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), - router = Router(routing, sinkProvider, ctx, metrics), + router = Router(configuration.routing, sinkProvider, ctx, metrics), metrics = metrics) companion object { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt deleted file mode 100644 index 20b11753..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -object AdapterFactory { - fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider() - - fun configurationProvider(config: CbsConfiguration): ConfigurationProvider = - ConfigurationProviderImpl( - CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), - config) -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt deleted file mode 100644 index 185693c0..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ /dev/null @@ -1,110 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import com.google.gson.JsonObject -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.Route -import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog -import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.retry.Jitter -import reactor.retry.Retry -import java.time.Duration - -/** - * @author Jakub Dudycz - * @since May 2018 - */ -internal class ConfigurationProviderImpl(private val cbsClientMono: Mono, - private val firstRequestDelay: Duration, - private val requestInterval: Duration, - private val healthState: HealthState, - private val streamParser: StreamFromGsonParser, - retrySpec: Retry - -) : ConfigurationProvider { - constructor(cbsClientMono: Mono, params: CbsConfiguration) : this( - cbsClientMono, - params.firstRequestDelay, - params.requestInterval, - HealthState.INSTANCE, - StreamFromGsonParsers.kafkaSinkParser(), - Retry.any() - .retryMax(MAX_RETRIES) - .fixedBackoff(params.requestInterval) - .jitter(Jitter.random()) - ) - - private val retry = retrySpec.doOnRetry { - logger.withWarn(ServiceContext::mdc) { - log("Exception from configuration provider client, retrying subscription", it.exception()) - } - healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) - } - - override fun invoke(): Flux = - cbsClientMono - .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } - .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } - .retryWhen(retry) - .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } } - .flatMapMany(::handleUpdates) - - private fun handleUpdates(cbsClient: CbsClient) = cbsClient - .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()), - firstRequestDelay, - requestInterval) - .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } } - .map(::createRoutingDescription) - .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" } - .retryWhen(retry) - - private fun createRoutingDescription(configuration: JsonObject): Routing = try { - DataStreams.namedSinks(configuration) - .filter(streamOfType(KAFKA)) - .map(streamParser::unsafeParse) - .map { Route(it.name(), it) } - .asIterable() - .toList() - } catch (e: NullPointerException) { - throw ParsingException("Failed to parse configuration", e) - } - - companion object { - private const val MAX_RETRIES = 5L - private val logger = Logger(ConfigurationProviderImpl::class) - } -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt deleted file mode 100644 index 2b123fc8..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt +++ /dev/null @@ -1,22 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -class ParsingException(message: String, cause: Throwable) : Exception(message, cause) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index fab96560..3e19414d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -113,25 +113,19 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono { metrics.notifyClientConnected() logger.info(clientContext::fullMdc) { "Handling new client connection" } - return collectorProvider(clientContext).fold( - { - logger.warn(clientContext::fullMdc) { "Collector is not ready. Closing connection" } - nettyInbound.closeConnectionAndReturn(Mono.empty()) - }, - handleClient(clientContext, nettyInbound) - ) + val collector = collectorProvider(clientContext) + return collector.handleClient(clientContext, nettyInbound) } - private fun handleClient(clientContext: ClientContext, - nettyInbound: NettyInbound): (Collector) -> Mono = { collector -> + private fun Collector.handleClient(clientContext: ClientContext, + nettyInbound: NettyInbound) = withConnectionFrom(nettyInbound) { connection -> connection .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) .logConnectionClosed(clientContext) }.run { - collector.handleConnection(nettyInbound.createDataStream()) + handleConnection(nettyInbound.createDataStream()) } - } private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection = onReadIdle(timeout.toMillis()) { diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt deleted file mode 100644 index 8616ce03..00000000 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt +++ /dev/null @@ -1,207 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import com.google.gson.JsonParser -import com.nhaarman.mockitokotlin2.any -import com.nhaarman.mockitokotlin2.eq -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.whenever -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers -import reactor.core.publisher.Flux - -import reactor.core.publisher.Mono -import reactor.retry.Retry -import reactor.test.StepVerifier -import java.time.Duration - -/** - * @author Jakub Dudycz - * @since May 2018 - */ -internal object ConfigurationProviderImplTest : Spek({ - - describe("Configuration provider") { - - val cbsClient: CbsClient = mock() - val cbsClientMock: Mono = Mono.just(cbsClient) - val healthStateProvider = HealthState.INSTANCE - - given("configuration is never in cbs") { - val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider) - - on("waiting for configuration") { - val waitTime = Duration.ofMillis(100) - - it("should not get it") { - StepVerifier.create(configProvider().take(1)) - .expectNoEvent(waitTime) - } - } - } - - given("valid configuration from cbs") { - val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider) - - on("new configuration") { - whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) - .thenReturn(Flux.just(validConfiguration)) - it("should use received configuration") { - - StepVerifier.create(configProvider().take(1)) - .consumeNextWith { - val route1 = it.elementAt(0) - val route2 = it.elementAt(1) - val receivedSink1 = route1.sink - val receivedSink2 = route2.sink - - assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL) - assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1) - assertThat(receivedSink1.bootstrapServers()) - .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060") - assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP") - - assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL) - assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2) - assertThat(receivedSink2.bootstrapServers()) - .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060") - assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP") - - }.verifyComplete() - } - } - - } - given("invalid configuration from cbs") { - val iterationCount = 3L - val configProvider = constructConfigurationProvider( - cbsClientMock, healthStateProvider, iterationCount - ) - - on("new configuration") { - whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) - .thenReturn(Flux.just(invalidConfiguration)) - - it("should interrupt the flux") { - StepVerifier.create(configProvider()) - .verifyError() - } - - it("should update the health state") { - StepVerifier.create(healthStateProvider().take(iterationCount)) - .expectNextCount(iterationCount - 1) - .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) - .verifyComplete() - } - } - } - } - -}) - - -val PERF3GPP_REGIONAL = "perf3gpp_regional" -val PERF3GPP_CENTRAL = "perf3gpp_central" - -private val aafCredentials1 = ImmutableAafCredentials.builder() - .username("client") - .password("very secure password") - .build() - -private val aafCredentials2 = ImmutableAafCredentials.builder() - .username("other_client") - .password("another very secure password") - .build() - -private val validConfiguration = JsonParser().parse(""" -{ - "streams_publishes": { - "$PERF3GPP_REGIONAL": { - "type": "kafka", - "aaf_credentials": { - "username": "client", - "password": "very secure password" - }, - "kafka_info": { - "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", - "topic_name": "REG_HVVES_PERF3GPP" - } - }, - "$PERF3GPP_CENTRAL": { - "type": "kafka", - "aaf_credentials": { - "username": "other_client", - "password": "another very secure password" - }, - "kafka_info": { - "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060", - "topic_name": "CEN_HVVES_PERF3GPP" - } - } - } -}""").asJsonObject - -private val invalidConfiguration = JsonParser().parse(""" -{ - "streams_publishes": { - "$PERF3GPP_REGIONAL": { - "type": "kafka", - "aaf_credentials": { - "username": "client", - "password": "very secure password" - }, - "kafka_info": { - "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", - "popic_name": "REG_HVVES_PERF3GPP" - } - } - } -}""").asJsonObject - -private val firstRequestDelay = Duration.ofMillis(1) -private val requestInterval = Duration.ofMillis(1) -private val streamParser = StreamFromGsonParsers.kafkaSinkParser() - -private fun constructConfigurationProvider(cbsClientMono: Mono, - healthState: HealthState, - iterationCount: Long = 1 -): ConfigurationProviderImpl { - - val retry = Retry.onlyIf { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1)) - - return ConfigurationProviderImpl( - cbsClientMono, - firstRequestDelay, - requestInterval, - healthState, - streamParser, - retry - ) -} diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 61a9a356..35dfba8b 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -29,6 +29,7 @@ import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage @@ -56,8 +57,7 @@ object PerformanceSpecification : Spek({ describe("VES High Volume Collector performance") { it("should handle multiple clients in reasonable time") { val sink = CountingSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + val sut = Sut(CollectorConfiguration(basicRouting), sink) val numMessages: Long = 300_000 val runs = 4 @@ -87,8 +87,7 @@ object PerformanceSpecification : Spek({ it("should disconnect on transmission errors") { val sink = CountingSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + val sut = Sut(CollectorConfiguration(basicRouting), sink) val numMessages: Long = 100_000 val timeout = Duration.ofSeconds(30) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index ec540606..1217c471 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.tests.component -import arrow.core.getOrElse import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator @@ -27,6 +26,7 @@ import io.netty.buffer.UnpooledByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.factory.CollectorFactory @@ -34,8 +34,6 @@ import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink -import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider -import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting @@ -49,27 +47,22 @@ import java.util.concurrent.atomic.AtomicBoolean * @author Piotr Jaszczyk * @since May 2018 */ -class Sut(sink: Sink = StoringSink()) : Closeable { - val configurationProvider = FakeConfigurationProvider() - val healthStateProvider = FakeHealthState() +class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : Closeable { val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT val metrics = FakeMetrics() val sinkProvider = DummySinkProvider(sink) private val collectorFactory = CollectorFactory( - configurationProvider, + configuration, sinkProvider, metrics, - MAX_PAYLOAD_SIZE_BYTES, - healthStateProvider + MAX_PAYLOAD_SIZE_BYTES ) private val collectorProvider = collectorFactory.createVesHvCollectorProvider() val collector: Collector - get() = collectorProvider(ClientContext(alloc)).getOrElse { - throw IllegalStateException("Collector not available.") - } + get() = collectorProvider(ClientContext(alloc)) fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List { @@ -107,16 +100,10 @@ class DummySinkProvider(private val sink: Sink) : SinkProvider { private val timeout = Duration.ofSeconds(10) fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut = - Sut(AlwaysSuccessfulSink()).apply { - configurationProvider.updateConfiguration(routing) - } + Sut(CollectorConfiguration(routing), AlwaysSuccessfulSink()) fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut = - Sut(AlwaysFailingSink()).apply { - configurationProvider.updateConfiguration(routing) - } + Sut(CollectorConfiguration(routing), AlwaysFailingSink()) fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut = - Sut(DelayingSink(delay)).apply { - configurationProvider.updateConfiguration(routing) - } + Sut(CollectorConfiguration(routing), DelayingSink(delay)) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 5d215fc5..6a718eea 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -25,6 +25,8 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER @@ -166,9 +168,7 @@ object VesHvSpecification : Spek({ } it("should be able to direct 2 messages from different domains to one topic") { - val (sut, sink) = vesHvWithStoringSink() - - sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting) + val (sut, sink) = vesHvWithStoringSink(twoDomainsToOneTopicRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP), @@ -202,150 +202,6 @@ object VesHvSpecification : Spek({ } } - describe("configuration update") { - - val defaultTimeout = Duration.ofSeconds(10) - - given("successful configuration change") { - - lateinit var sut: Sut - lateinit var sink: StoringSink - - beforeEachTest { - vesHvWithStoringSink().run { - sut = first - sink = second - } - } - - it("should update collector") { - val firstCollector = sut.collector - - sut.configurationProvider.updateConfiguration(alternativeRouting) - val collectorAfterUpdate = sut.collector - - assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) - } - - it("should start routing messages") { - - sut.configurationProvider.updateConfiguration(emptyRouting) - - val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messages).isEmpty() - - sut.configurationProvider.updateConfiguration(basicRouting) - - val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messagesAfterUpdate).hasSize(1) - val message = messagesAfterUpdate[0] - - assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change") - .isEqualTo(PERF3GPP_TOPIC) - assertThat(message.partition).describedAs("routed message partition") - .isEqualTo(None) - } - - it("should change domain routing") { - - val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messages).hasSize(1) - val firstMessage = messages[0] - - assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration") - .isEqualTo(PERF3GPP_TOPIC) - assertThat(firstMessage.partition).describedAs("routed message partition") - .isEqualTo(None) - - - sut.configurationProvider.updateConfiguration(alternativeRouting) - - val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messagesAfterUpdate).hasSize(2) - val secondMessage = messagesAfterUpdate[1] - - assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change") - .isEqualTo(ALTERNATE_PERF3GPP_TOPIC) - assertThat(secondMessage.partition).describedAs("routed message partition") - .isEqualTo(None) - } - - it("should update routing for each client sending one message") { - - val messagesAmount = 10 - val messagesForEachTopic = 5 - - Flux.range(0, messagesAmount).doOnNext { - if (it == messagesForEachTopic) { - sut.configurationProvider.updateConfiguration(alternativeRouting) - } - }.doOnNext { - sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - }.then().block(defaultTimeout) - - - val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC } - val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC } - - assertThat(messages.size).isEqualTo(messagesAmount) - assertThat(messagesForEachTopic) - .describedAs("amount of messages routed to each topic") - .isEqualTo(firstTopicMessagesCount) - .isEqualTo(secondTopicMessagesCount) - } - - it("should not update routing for client sending continuous stream of messages") { - - val messageStreamSize = 10 - val pivot = 5 - - val incomingMessages = Flux.range(0, messageStreamSize) - .doOnNext { - if (it == pivot) { - sut.configurationProvider.updateConfiguration(alternativeRouting) - println("config changed") - } - } - .map { vesWireFrameMessage(PERF3GPP) } - - - sut.collector.handleConnection(incomingMessages).block(defaultTimeout) - - val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC } - val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC } - - assertThat(messages.size).isEqualTo(messageStreamSize) - assertThat(firstTopicMessagesCount) - .describedAs("amount of messages routed to first topic") - .isEqualTo(messageStreamSize) - - assertThat(secondTopicMessagesCount) - .describedAs("amount of messages routed to second topic") - .isEqualTo(0) - } - - it("should mark the application healthy") { - assertThat(sut.healthStateProvider.currentHealth) - .describedAs("application health state") - .isEqualTo(HealthDescription.HEALTHY) - } - } - - given("failed configuration change") { - val (sut, _) = vesHvWithStoringSink() - sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) - sut.configurationProvider.updateConfiguration(basicRouting) - - it("should mark the application unhealthy ") { - assertThat(sut.healthStateProvider.currentHealth) - .describedAs("application health state") - .isEqualTo(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) - } - } - } - describe("request validation") { it("should reject message with payload greater than 1 MiB and all subsequent messages") { val (sut, sink) = vesHvWithStoringSink() @@ -362,9 +218,8 @@ object VesHvSpecification : Spek({ }) -private fun vesHvWithStoringSink(): Pair { +private fun vesHvWithStoringSink(routing: Routing = basicRouting): Pair { val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + val sut = Sut(CollectorConfiguration(routing), sink) return Pair(sut, sink) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt deleted file mode 100644 index c25771b7..00000000 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.tests.fakes - -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import reactor.core.publisher.Flux - -class FakeHealthState : HealthState { - - lateinit var currentHealth: HealthDescription - - override fun changeState(healthDescription: HealthDescription) { - currentHealth = healthDescription - } - - override fun invoke(): Flux { - throw NotImplementedError() - } -} diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt deleted file mode 100644 index c465fd91..00000000 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.tests.fakes - -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.config.api.model.Routing -import reactor.core.publisher.FluxProcessor -import reactor.core.publisher.UnicastProcessor -import reactor.retry.RetryExhaustedException - - -class FakeConfigurationProvider : ConfigurationProvider { - private var shouldThrowException = false - private val configStream: FluxProcessor = UnicastProcessor.create() - - fun updateConfiguration(routing: Routing) = - if (shouldThrowException) { - configStream.onError(RetryExhaustedException("I'm so tired")) - } else { - configStream.onNext(routing) - } - - - fun shouldThrowExceptionOnConfigUpdate(shouldThrowException: Boolean) { - this.shouldThrowException = shouldThrowException - } - - override fun invoke() = configStream -} diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 059e8028..22d8000e 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.main import org.onap.dcae.collectors.veshv.config.api.ConfigurationModule +import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState @@ -41,10 +42,25 @@ private val hvVesServer = AtomicReference() private val configurationModule = ConfigurationModule() fun main(args: Array) { + val configStateListener = object : ConfigurationStateListener { + override fun retrying() { + HealthState.INSTANCE.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) + } + } + HealthCheckServer.start(configurationModule.healthCheckPort(args)) configurationModule - .hvVesConfigurationUpdates(args) + .hvVesConfigurationUpdates(args, configStateListener, ServiceContext::mdc) .publishOn(Schedulers.single(Schedulers.elastic())) + .doOnNext { + logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } + HealthState.INSTANCE.changeState(HealthDescription.HEALTHY) + } + .doOnError { + logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" } + logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" } + HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) + } .doOnNext(::startServer) .doOnError(::logServerStartFailed) .neverComplete() // TODO: remove after merging configuration stream with cbs diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index aed4d928..c079cc59 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -23,8 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.factory.ServerFactory -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory +import org.onap.dcae.collectors.veshv.factory.AdapterFactory import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle @@ -59,11 +58,10 @@ object VesServer { private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory = CollectorFactory( - AdapterFactory.configurationProvider(config.cbs), + config.collector, AdapterFactory.sinkCreatorFactory(), MicrometerMetrics.INSTANCE, - config.server.maxPayloadSizeBytes, - HealthState.INSTANCE + config.server.maxPayloadSizeBytes ) private fun logServerStarted(handle: ServerHandle) = -- cgit 1.2.3-korg