diff options
23 files changed, 159 insertions, 395 deletions
diff --git a/sources/hv-collector-configuration/pom.xml b/sources/hv-collector-configuration/pom.xml index 792b9eaa..b6ec4ca2 100644 --- a/sources/hv-collector-configuration/pom.xml +++ b/sources/hv-collector-configuration/pom.xml @@ -77,7 +77,10 @@ <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId> <artifactId>cbs-client</artifactId> </dependency> - + <dependency> + <groupId>io.projectreactor.addons</groupId> + <artifactId>reactor-extra</artifactId> + </dependency> <dependency> <groupId>org.jetbrains.kotlin</groupId> <artifactId>kotlin-stdlib-jdk8</artifactId> diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt index dd1b171e..9684484b 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt @@ -22,10 +22,16 @@ package org.onap.dcae.collectors.veshv.config.api import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException import org.onap.dcae.collectors.veshv.config.api.model.ValidationException -import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser +import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider +import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader +import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser +import org.onap.dcae.collectors.veshv.utils.arrow.rightOrThrow import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft +import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties import reactor.core.publisher.Flux class ConfigurationModule { @@ -34,16 +40,28 @@ class ConfigurationModule { private val configReader = FileConfigurationReader() private val configValidator = ConfigurationValidator() - private lateinit var initialConfig: HvVesConfiguration - fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args) - fun hvVesConfigurationUpdates(args: Array<String>): Flux<HvVesConfiguration> = + fun hvVesConfigurationUpdates(args: Array<String>, + configStateListener: ConfigurationStateListener, + mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> = Flux.just(cmd.getConfigurationFile(args)) .throwOnLeft { MissingArgumentException(it.message, it.cause) } .map { it.reader().use(configReader::loadConfig) } - .map { configValidator.validate(it) } - .throwOnLeft { ValidationException(it.message) } - .doOnNext { initialConfig = it } + .cache() + .flatMap { basePartialConfig -> + val baseConfig = configValidator.validate(basePartialConfig) + .rightOrThrow { ValidationException(it.message) } + val cbsConfigProvider = CbsConfigurationProvider( + CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), + baseConfig.cbs, + configStateListener, + mdc) + val merger = ConfigurationMerger() + cbsConfigProvider() + .map { merger.merge(basePartialConfig, it) } + .map { configValidator.validate(it) } + .throwOnLeft { ValidationException(it.message) } + } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt index 2b123fc8..9fa6660e 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,8 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.impl.adapters +package org.onap.dcae.collectors.veshv.config.api -class ParsingException(message: String, cause: Throwable) : Exception(message, cause) +interface ConfigurationStateListener { + fun retrying() {} +} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt index 3375821e..c1807be2 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt @@ -47,7 +47,5 @@ data class CbsConfiguration( ) data class CollectorConfiguration( - val maxRequestSizeBytes: Int, - val kafkaServers: String, val routing: Routing ) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt index 2fc29829..2fc29829 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt index e5a83ac4..e5a83ac4 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt index 185693c0..2038c31a 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt @@ -17,17 +17,18 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.impl.adapters +package org.onap.dcae.collectors.veshv.config.impl +import arrow.core.None +import arrow.core.Option +import arrow.core.Some import com.google.gson.JsonObject -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink @@ -42,69 +43,77 @@ import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.retry.Jitter import reactor.retry.Retry -import java.time.Duration /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since May 2018 */ -internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClient>, - private val firstRequestDelay: Duration, - private val requestInterval: Duration, - private val healthState: HealthState, - private val streamParser: StreamFromGsonParser<KafkaSink>, - retrySpec: Retry<Any> +internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClient>, + private val cbsConfiguration: CbsConfiguration, + private val streamParser: StreamFromGsonParser<KafkaSink>, + private val configurationStateListener: ConfigurationStateListener, + retrySpec: Retry<Any>, + private val mdc: MappedDiagnosticContext -) : ConfigurationProvider { - constructor(cbsClientMono: Mono<CbsClient>, params: CbsConfiguration) : this( - cbsClientMono, - params.firstRequestDelay, - params.requestInterval, - HealthState.INSTANCE, - StreamFromGsonParsers.kafkaSinkParser(), - Retry.any<Any>() - .retryMax(MAX_RETRIES) - .fixedBackoff(params.requestInterval) - .jitter(Jitter.random()) - ) +) { + constructor(cbsClientMono: Mono<CbsClient>, + cbsConfig: CbsConfiguration, + configurationStateListener: ConfigurationStateListener, + mdc: MappedDiagnosticContext) : + this( + cbsClientMono, + cbsConfig, + StreamFromGsonParsers.kafkaSinkParser(), + configurationStateListener, + Retry.any<Any>() + .retryMax(MAX_RETRIES) + .fixedBackoff(cbsConfig.requestInterval) + .jitter(Jitter.random()), + mdc + ) private val retry = retrySpec.doOnRetry { - logger.withWarn(ServiceContext::mdc) { + logger.withWarn(mdc) { log("Exception from configuration provider client, retrying subscription", it.exception()) } - healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) + configurationStateListener.retrying() } - override fun invoke(): Flux<Routing> = + operator fun invoke(): Flux<PartialConfiguration> = cbsClientMono - .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } - .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } + .doOnNext { logger.info(mdc) { "CBS client successfully created" } } + .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" } .retryWhen(retry) - .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } } + .doFinally { logger.trace(mdc) { "CBS client subscription finished" } } .flatMapMany(::handleUpdates) private fun handleUpdates(cbsClient: CbsClient) = cbsClient .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()), - firstRequestDelay, - requestInterval) - .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } } + cbsConfiguration.firstRequestDelay, + cbsConfiguration.requestInterval) + .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } } .map(::createRoutingDescription) - .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" } + .onErrorLog(logger, mdc) { "Error while creating configuration" } .retryWhen(retry) + .map { PartialConfiguration(collector = Some(PartialCollectorConfig(routing = it))) } - private fun createRoutingDescription(configuration: JsonObject): Routing = try { - DataStreams.namedSinks(configuration) + private fun createRoutingDescription(configuration: JsonObject): Option<Routing> = try { + val routes = DataStreams.namedSinks(configuration) .filter(streamOfType(KAFKA)) .map(streamParser::unsafeParse) .map { Route(it.name(), it) } .asIterable() .toList() + Some(routes) } catch (e: NullPointerException) { - throw ParsingException("Failed to parse configuration", e) + logger.withWarn(mdc) { + log("Invalid streams configuration", e) + } + None } companion object { private const val MAX_RETRIES = 5L - private val logger = Logger(ConfigurationProviderImpl::class) + private val logger = Logger(CbsConfigurationProvider::class) } } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt index 04bba7e2..3e599b58 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt @@ -63,9 +63,7 @@ internal class ConfigurationValidator { securityConfiguration, // TOD0: swap when ConfigurationMerger is implemented // collectorConfiguration - CollectorConfiguration(-1, - "I do not exist. I'm not even a URL :o", - emptyList()), + CollectorConfiguration(emptyList()), // end TOD0 logLevel ) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt index 3e93a400..c1a98294 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt @@ -19,7 +19,10 @@ */ package org.onap.dcae.collectors.veshv.config.impl -import arrow.core.* +import arrow.core.Either +import arrow.core.Option +import arrow.core.Try +import arrow.core.getOrElse import org.apache.commons.cli.CommandLine import org.apache.commons.cli.CommandLineParser import org.apache.commons.cli.DefaultParser diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt index a27998e1..f3c149cd 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt @@ -54,6 +54,6 @@ internal data class PartialSecurityConfig(val keys: Option<SecurityKeys> = None) internal data class PartialCollectorConfig( val maxRequestSizeBytes: Option<Int> = None, - val kafkaServers: Option<List<InetSocketAddress>> = None, + val kafkaServers: Option<List<InetSocketAddress>> = None, // TOD0: remove properties and simplify this part val routing: Option<Routing> = None ) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt index 8616ce03..0cbc0e4a 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt @@ -17,12 +17,14 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.impl.adapters +package org.onap.dcae.collectors.veshv.config.impl import com.google.gson.JsonParser import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.eq import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.times +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek @@ -30,13 +32,12 @@ import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers import reactor.core.publisher.Flux - import reactor.core.publisher.Mono import reactor.retry.Retry import reactor.test.StepVerifier @@ -46,16 +47,16 @@ import java.time.Duration * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since May 2018 */ -internal object ConfigurationProviderImplTest : Spek({ +internal object CbsConfigurationProviderTest : Spek({ describe("Configuration provider") { val cbsClient: CbsClient = mock() val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient) - val healthStateProvider = HealthState.INSTANCE + val configStateListener: ConfigurationStateListener = mock() given("configuration is never in cbs") { - val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider) + val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener) on("waiting for configuration") { val waitTime = Duration.ofMillis(100) @@ -68,7 +69,7 @@ internal object ConfigurationProviderImplTest : Spek({ } given("valid configuration from cbs") { - val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider) + val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener) on("new configuration") { whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) @@ -77,8 +78,9 @@ internal object ConfigurationProviderImplTest : Spek({ StepVerifier.create(configProvider().take(1)) .consumeNextWith { - val route1 = it.elementAt(0) - val route2 = it.elementAt(1) + val routes = it.collector.orNull()!!.routing.orNull()!! + val route1 = routes.elementAt(0) + val route2 = routes.elementAt(1) val receivedSink1 = route1.sink val receivedSink2 = route2.sink @@ -102,7 +104,7 @@ internal object ConfigurationProviderImplTest : Spek({ given("invalid configuration from cbs") { val iterationCount = 3L val configProvider = constructConfigurationProvider( - cbsClientMock, healthStateProvider, iterationCount + cbsClientMock, configStateListener, iterationCount ) on("new configuration") { @@ -114,11 +116,8 @@ internal object ConfigurationProviderImplTest : Spek({ .verifyError() } - it("should update the health state") { - StepVerifier.create(healthStateProvider().take(iterationCount)) - .expectNextCount(iterationCount - 1) - .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) - .verifyComplete() + it("should call state listener when retrying") { + verify(configStateListener, times(iterationCount.toInt())).retrying() } } } @@ -190,18 +189,18 @@ private val requestInterval = Duration.ofMillis(1) private val streamParser = StreamFromGsonParsers.kafkaSinkParser() private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>, - healthState: HealthState, + configurationStateListener: ConfigurationStateListener, iterationCount: Long = 1 -): ConfigurationProviderImpl { +): CbsConfigurationProvider { val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1)) - return ConfigurationProviderImpl( + return CbsConfigurationProvider( cbsClientMono, - firstRequestDelay, - requestInterval, - healthState, + CbsConfiguration(firstRequestDelay, requestInterval), streamParser, - retry + configurationStateListener, + retry, + { mapOf("k" to "v") } ) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 1b92d90c..e3156a0d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.boundary -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext @@ -40,8 +39,6 @@ interface SinkProvider : Closeable { operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink> } -typealias ConfigurationProvider = () -> Flux<Routing> - interface Metrics { fun notifyBytesReceived(size: Int) fun notifyMessageReceived(msg: WireFrameMessage) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index 5c64c70b..ba0a9eee 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.boundary -import arrow.core.Option import arrow.effects.IO import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.model.ClientContext @@ -33,7 +32,7 @@ interface Collector { } interface CollectorProvider : Closeable { - operator fun invoke(ctx: ClientContext): Option<Collector> + operator fun invoke(ctx: ClientContext): Collector } interface Server { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt index 20b11753..04e575ae 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt @@ -17,14 +17,10 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.impl.adapters +package org.onap.dcae.collectors.veshv.factory -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -32,9 +28,4 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti */ object AdapterFactory { fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider() - - fun configurationProvider(config: CbsConfiguration): ConfigurationProvider = - ConfigurationProviderImpl( - CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), - config) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 2b29acd9..1c79abd2 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -19,64 +19,45 @@ */ package org.onap.dcae.collectors.veshv.factory -import arrow.core.Option import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger -import java.util.concurrent.atomic.AtomicReference /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class CollectorFactory(private val configuration: ConfigurationProvider, +class CollectorFactory(private val configuration: CollectorConfiguration, private val sinkProvider: SinkProvider, private val metrics: Metrics, - private val maxPayloadSizeBytes: Int, - private val healthState: HealthState = HealthState.INSTANCE) { + private val maxPayloadSizeBytes: Int) { fun createVesHvCollectorProvider(): CollectorProvider { - val config = AtomicReference<Routing>() - configuration() - .doOnNext { - logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } - healthState.changeState(HealthDescription.HEALTHY) - } - .doOnError { - logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" } - logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" } - healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) - } - .subscribe(config::set) return object : CollectorProvider { - override fun invoke(ctx: ClientContext): Option<Collector> = - config.getOption().map { createVesHvCollector(it, ctx) } + override fun invoke(ctx: ClientContext): Collector = + createVesHvCollector(ctx) override fun close() = sinkProvider.close() } } - private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector = + private fun createVesHvCollector(ctx: ClientContext): Collector = VesHvCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), - router = Router(routing, sinkProvider, ctx, metrics), + router = Router(configuration.routing, sinkProvider, ctx, metrics), metrics = metrics) companion object { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index fab96560..3e19414d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -113,25 +113,19 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> { metrics.notifyClientConnected() logger.info(clientContext::fullMdc) { "Handling new client connection" } - return collectorProvider(clientContext).fold( - { - logger.warn(clientContext::fullMdc) { "Collector is not ready. Closing connection" } - nettyInbound.closeConnectionAndReturn(Mono.empty<Void>()) - }, - handleClient(clientContext, nettyInbound) - ) + val collector = collectorProvider(clientContext) + return collector.handleClient(clientContext, nettyInbound) } - private fun handleClient(clientContext: ClientContext, - nettyInbound: NettyInbound): (Collector) -> Mono<Void> = { collector -> + private fun Collector.handleClient(clientContext: ClientContext, + nettyInbound: NettyInbound) = withConnectionFrom(nettyInbound) { connection -> connection .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) .logConnectionClosed(clientContext) }.run { - collector.handleConnection(nettyInbound.createDataStream()) + handleConnection(nettyInbound.createDataStream()) } - } private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection = onReadIdle(timeout.toMillis()) { diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 61a9a356..35dfba8b 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -29,6 +29,7 @@ import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage @@ -56,8 +57,7 @@ object PerformanceSpecification : Spek({ describe("VES High Volume Collector performance") { it("should handle multiple clients in reasonable time") { val sink = CountingSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + val sut = Sut(CollectorConfiguration(basicRouting), sink) val numMessages: Long = 300_000 val runs = 4 @@ -87,8 +87,7 @@ object PerformanceSpecification : Spek({ it("should disconnect on transmission errors") { val sink = CountingSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + val sut = Sut(CollectorConfiguration(basicRouting), sink) val numMessages: Long = 100_000 val timeout = Duration.ofSeconds(30) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index ec540606..1217c471 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.tests.component -import arrow.core.getOrElse import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator @@ -27,6 +26,7 @@ import io.netty.buffer.UnpooledByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.factory.CollectorFactory @@ -34,8 +34,6 @@ import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink -import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider -import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting @@ -49,27 +47,22 @@ import java.util.concurrent.atomic.AtomicBoolean * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class Sut(sink: Sink = StoringSink()) : Closeable { - val configurationProvider = FakeConfigurationProvider() - val healthStateProvider = FakeHealthState() +class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : Closeable { val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT val metrics = FakeMetrics() val sinkProvider = DummySinkProvider(sink) private val collectorFactory = CollectorFactory( - configurationProvider, + configuration, sinkProvider, metrics, - MAX_PAYLOAD_SIZE_BYTES, - healthStateProvider + MAX_PAYLOAD_SIZE_BYTES ) private val collectorProvider = collectorFactory.createVesHvCollectorProvider() val collector: Collector - get() = collectorProvider(ClientContext(alloc)).getOrElse { - throw IllegalStateException("Collector not available.") - } + get() = collectorProvider(ClientContext(alloc)) fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { @@ -107,16 +100,10 @@ class DummySinkProvider(private val sink: Sink) : SinkProvider { private val timeout = Duration.ofSeconds(10) fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut = - Sut(AlwaysSuccessfulSink()).apply { - configurationProvider.updateConfiguration(routing) - } + Sut(CollectorConfiguration(routing), AlwaysSuccessfulSink()) fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut = - Sut(AlwaysFailingSink()).apply { - configurationProvider.updateConfiguration(routing) - } + Sut(CollectorConfiguration(routing), AlwaysFailingSink()) fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut = - Sut(DelayingSink(delay)).apply { - configurationProvider.updateConfiguration(routing) - } + Sut(CollectorConfiguration(routing), DelayingSink(delay)) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 5d215fc5..6a718eea 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -25,6 +25,8 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER @@ -166,9 +168,7 @@ object VesHvSpecification : Spek({ } it("should be able to direct 2 messages from different domains to one topic") { - val (sut, sink) = vesHvWithStoringSink() - - sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting) + val (sut, sink) = vesHvWithStoringSink(twoDomainsToOneTopicRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP), @@ -202,150 +202,6 @@ object VesHvSpecification : Spek({ } } - describe("configuration update") { - - val defaultTimeout = Duration.ofSeconds(10) - - given("successful configuration change") { - - lateinit var sut: Sut - lateinit var sink: StoringSink - - beforeEachTest { - vesHvWithStoringSink().run { - sut = first - sink = second - } - } - - it("should update collector") { - val firstCollector = sut.collector - - sut.configurationProvider.updateConfiguration(alternativeRouting) - val collectorAfterUpdate = sut.collector - - assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) - } - - it("should start routing messages") { - - sut.configurationProvider.updateConfiguration(emptyRouting) - - val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messages).isEmpty() - - sut.configurationProvider.updateConfiguration(basicRouting) - - val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messagesAfterUpdate).hasSize(1) - val message = messagesAfterUpdate[0] - - assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change") - .isEqualTo(PERF3GPP_TOPIC) - assertThat(message.partition).describedAs("routed message partition") - .isEqualTo(None) - } - - it("should change domain routing") { - - val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messages).hasSize(1) - val firstMessage = messages[0] - - assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration") - .isEqualTo(PERF3GPP_TOPIC) - assertThat(firstMessage.partition).describedAs("routed message partition") - .isEqualTo(None) - - - sut.configurationProvider.updateConfiguration(alternativeRouting) - - val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messagesAfterUpdate).hasSize(2) - val secondMessage = messagesAfterUpdate[1] - - assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change") - .isEqualTo(ALTERNATE_PERF3GPP_TOPIC) - assertThat(secondMessage.partition).describedAs("routed message partition") - .isEqualTo(None) - } - - it("should update routing for each client sending one message") { - - val messagesAmount = 10 - val messagesForEachTopic = 5 - - Flux.range(0, messagesAmount).doOnNext { - if (it == messagesForEachTopic) { - sut.configurationProvider.updateConfiguration(alternativeRouting) - } - }.doOnNext { - sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - }.then().block(defaultTimeout) - - - val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC } - val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC } - - assertThat(messages.size).isEqualTo(messagesAmount) - assertThat(messagesForEachTopic) - .describedAs("amount of messages routed to each topic") - .isEqualTo(firstTopicMessagesCount) - .isEqualTo(secondTopicMessagesCount) - } - - it("should not update routing for client sending continuous stream of messages") { - - val messageStreamSize = 10 - val pivot = 5 - - val incomingMessages = Flux.range(0, messageStreamSize) - .doOnNext { - if (it == pivot) { - sut.configurationProvider.updateConfiguration(alternativeRouting) - println("config changed") - } - } - .map { vesWireFrameMessage(PERF3GPP) } - - - sut.collector.handleConnection(incomingMessages).block(defaultTimeout) - - val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC } - val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC } - - assertThat(messages.size).isEqualTo(messageStreamSize) - assertThat(firstTopicMessagesCount) - .describedAs("amount of messages routed to first topic") - .isEqualTo(messageStreamSize) - - assertThat(secondTopicMessagesCount) - .describedAs("amount of messages routed to second topic") - .isEqualTo(0) - } - - it("should mark the application healthy") { - assertThat(sut.healthStateProvider.currentHealth) - .describedAs("application health state") - .isEqualTo(HealthDescription.HEALTHY) - } - } - - given("failed configuration change") { - val (sut, _) = vesHvWithStoringSink() - sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) - sut.configurationProvider.updateConfiguration(basicRouting) - - it("should mark the application unhealthy ") { - assertThat(sut.healthStateProvider.currentHealth) - .describedAs("application health state") - .isEqualTo(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) - } - } - } - describe("request validation") { it("should reject message with payload greater than 1 MiB and all subsequent messages") { val (sut, sink) = vesHvWithStoringSink() @@ -362,9 +218,8 @@ object VesHvSpecification : Spek({ }) -private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> { +private fun vesHvWithStoringSink(routing: Routing = basicRouting): Pair<Sut, StoringSink> { val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + val sut = Sut(CollectorConfiguration(routing), sink) return Pair(sut, sink) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt deleted file mode 100644 index c25771b7..00000000 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.tests.fakes - -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import reactor.core.publisher.Flux - -class FakeHealthState : HealthState { - - lateinit var currentHealth: HealthDescription - - override fun changeState(healthDescription: HealthDescription) { - currentHealth = healthDescription - } - - override fun invoke(): Flux<HealthDescription> { - throw NotImplementedError() - } -} diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt deleted file mode 100644 index c465fd91..00000000 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.tests.fakes - -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.config.api.model.Routing -import reactor.core.publisher.FluxProcessor -import reactor.core.publisher.UnicastProcessor -import reactor.retry.RetryExhaustedException - - -class FakeConfigurationProvider : ConfigurationProvider { - private var shouldThrowException = false - private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create() - - fun updateConfiguration(routing: Routing) = - if (shouldThrowException) { - configStream.onError(RetryExhaustedException("I'm so tired")) - } else { - configStream.onNext(routing) - } - - - fun shouldThrowExceptionOnConfigUpdate(shouldThrowException: Boolean) { - this.shouldThrowException = shouldThrowException - } - - override fun invoke() = configStream -} diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 059e8028..22d8000e 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.main import org.onap.dcae.collectors.veshv.config.api.ConfigurationModule +import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState @@ -41,10 +42,25 @@ private val hvVesServer = AtomicReference<ServerHandle>() private val configurationModule = ConfigurationModule() fun main(args: Array<String>) { + val configStateListener = object : ConfigurationStateListener { + override fun retrying() { + HealthState.INSTANCE.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) + } + } + HealthCheckServer.start(configurationModule.healthCheckPort(args)) configurationModule - .hvVesConfigurationUpdates(args) + .hvVesConfigurationUpdates(args, configStateListener, ServiceContext::mdc) .publishOn(Schedulers.single(Schedulers.elastic())) + .doOnNext { + logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } + HealthState.INSTANCE.changeState(HealthDescription.HEALTHY) + } + .doOnError { + logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" } + logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" } + HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) + } .doOnNext(::startServer) .doOnError(::logServerStartFailed) .neverComplete() // TODO: remove after merging configuration stream with cbs diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index aed4d928..c079cc59 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -23,8 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.factory.ServerFactory -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory +import org.onap.dcae.collectors.veshv.factory.AdapterFactory import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle @@ -59,11 +58,10 @@ object VesServer { private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory = CollectorFactory( - AdapterFactory.configurationProvider(config.cbs), + config.collector, AdapterFactory.sinkCreatorFactory(), MicrometerMetrics.INSTANCE, - config.server.maxPayloadSizeBytes, - HealthState.INSTANCE + config.server.maxPayloadSizeBytes ) private fun logServerStarted(handle: ServerHandle) = |