From 1f54619b7e8f22bf1b0474c5ec6437f9716138cd Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Tue, 31 Jul 2018 09:28:29 +0200 Subject: Fix NPE when getting Consul configuration No initial value for AtomicReference was provided hence we had a little race condition. Retry when consul returns error. Change-Id: Ie38ca7fbf445123e98ee94703eba501bb5233fab Signed-off-by: Piotr Jaszczyk Issue-ID: DCAEGEN2-601 --- .../collectors/veshv/factory/CollectorFactory.kt | 37 +++++++++--- .../veshv/impl/adapters/AdapterFactory.kt | 5 +- .../impl/adapters/ConsulConfigurationProvider.kt | 65 +++++++++++----------- 3 files changed, 64 insertions(+), 43 deletions(-) (limited to 'hv-collector-core/src/main/kotlin') diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index b52f959f..7ce49a82 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -28,9 +28,12 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector +import org.onap.dcae.collectors.veshv.impl.adapters.ConsulConfigurationProvider import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import reactor.core.publisher.Flux +import org.onap.dcae.collectors.veshv.model.routing +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.ves.VesEventV5 import java.util.concurrent.atomic.AtomicReference /** @@ -42,15 +45,32 @@ class CollectorFactory(val configuration: ConfigurationProvider, private val metrics: Metrics) { fun createVesHvCollectorProvider(): CollectorProvider { - val collector: AtomicReference = AtomicReference() - createVesHvCollector().subscribe(collector::set) + val initialValue = createVesHvCollector(defaultConfiguration()) + val collector: AtomicReference = AtomicReference(initialValue) + configuration() + .map(this::createVesHvCollector) + .doOnNext { logger.info("Using updated configuration for new connections") } + .doOnError { + logger.error("Shutting down", it) + // TODO: create Health class + // It should monitor all incidents and expose the results for the + // container health check mechanism + System.exit(ERROR_CODE) + } + .subscribe(collector::set) return collector::get } - private fun createVesHvCollector(): Flux = - configuration() - .doOnError { System.exit(ERROR_CODE) } - .map(this::createVesHvCollector) + private fun defaultConfiguration() = + CollectorConfiguration( + kafkaBootstrapServers = "kafka:9092", + routing = routing { + defineRoute { + fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS) + toTopic("ves_hvRanMeas") + withFixedPartitioning() + } + }.build()) private fun createVesHvCollector(config: CollectorConfiguration): Collector { return VesHvCollector( @@ -62,7 +82,8 @@ class CollectorFactory(val configuration: ConfigurationProvider, } companion object { - const val ERROR_CODE = 3 + private const val ERROR_CODE = 3 + private val logger = Logger(CollectorFactory::class) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 11a0e9bd..7248db6e 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -36,11 +36,8 @@ object AdapterFactory { fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = ConsulConfigurationProvider( - configurationProviderParams.configurationUrl, httpAdapter(), - configurationProviderParams.firstRequestDelay, - configurationProviderParams.requestInterval - ) + configurationProviderParams) fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index aca0e7e9..6f04c95c 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -21,12 +21,14 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.routing -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS +import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import reactor.retry.Jitter +import reactor.retry.Retry import java.io.StringReader import java.time.Duration import java.util.* @@ -39,41 +41,40 @@ import javax.json.JsonObject * @author Jakub Dudycz * @since May 2018 */ -internal class ConsulConfigurationProvider(private val url: String, - private val http: HttpAdapter, +internal class ConsulConfigurationProvider(private val http: HttpAdapter, + private val url: String, private val firstRequestDelay: Duration, - private val requestInterval: Duration + private val requestInterval: Duration, + retrySpec: Retry ) : ConfigurationProvider { private val lastConfigurationHash: AtomicReference = AtomicReference(0) + private val retry = retrySpec + .doOnRetry { + logger.warn("Could not get fresh configuration", it.exception()) + } + + constructor(http: HttpAdapter, params: ConfigurationProviderParams) : this( + http, + params.configurationUrl, + params.firstRequestDelay, + params.requestInterval, + Retry.any() + .retryMax(MAX_RETRIES) + .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR)) + .jitter(Jitter.random())) override fun invoke(): Flux = - Flux.concat(createDefaultConfigurationFlux(), createConsulFlux()) + Flux.interval(firstRequestDelay, requestInterval) + .flatMap { askForConfig() } + .map(::parseJsonResponse) + .map(::extractEncodedConfiguration) + .flatMap(::filterDifferentValues) + .map(::decodeConfiguration) + .map(::createCollectorConfiguration) + .retryWhen(retry) - private fun createDefaultConfigurationFlux(): Mono = Mono.just( - CollectorConfiguration( - kafkaBootstrapServers = "kafka:9092", - routing = routing { - defineRoute { - fromDomain(HVRANMEAS) - toTopic("ves_hvRanMeas") - withFixedPartitioning() - } - }.build()) - ).doOnNext { logger.info("Applied default configuration") } - - private fun createConsulFlux(): Flux = Flux - .interval(firstRequestDelay, requestInterval) - .flatMap { http.get(url) } - .doOnError { - logger.error("Encountered an error " + - "when trying to acquire configuration from consul. Shutting down..") - } - .map(::parseJsonResponse) - .map(::extractEncodedConfiguration) - .flatMap(::filterDifferentValues) - .map(::decodeConfiguration) - .map(::createCollectorConfiguration) + private fun askForConfig(): Mono = http.get(url) private fun parseJsonResponse(responseString: String): JsonObject = Json.createReader(StringReader(responseString)).readArray().first().asJsonObject() @@ -118,7 +119,9 @@ internal class ConsulConfigurationProvider(private val url: String, } companion object { - private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java) + private const val MAX_RETRIES = 5 + private const val BACKOFF_INTERVAL_FACTOR = 30L + private val logger = Logger(ConsulConfigurationProvider::class) } } -- cgit 1.2.3-korg