aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/main/kotlin
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-07-31 09:28:29 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-03 10:49:23 +0200
commit1f54619b7e8f22bf1b0474c5ec6437f9716138cd (patch)
tree1020a8df4b9d79b3467500d087118c1ea0d0c90a /hv-collector-core/src/main/kotlin
parentd76905b9c98ec32f17bb9568ff80c04068aa213e (diff)
Fix NPE when getting Consul configuration
No initial value for AtomicReference was provided hence we had a little race condition. Retry when consul returns error. Change-Id: Ie38ca7fbf445123e98ee94703eba501bb5233fab Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core/src/main/kotlin')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt37
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt5
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt65
3 files changed, 64 insertions, 43 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index b52f959f..7ce49a82 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -28,9 +28,12 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
+import org.onap.dcae.collectors.veshv.impl.adapters.ConsulConfigurationProvider
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import reactor.core.publisher.Flux
+import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.ves.VesEventV5
import java.util.concurrent.atomic.AtomicReference
/**
@@ -42,15 +45,32 @@ class CollectorFactory(val configuration: ConfigurationProvider,
private val metrics: Metrics) {
fun createVesHvCollectorProvider(): CollectorProvider {
- val collector: AtomicReference<Collector> = AtomicReference()
- createVesHvCollector().subscribe(collector::set)
+ val initialValue = createVesHvCollector(defaultConfiguration())
+ val collector: AtomicReference<Collector> = AtomicReference(initialValue)
+ configuration()
+ .map(this::createVesHvCollector)
+ .doOnNext { logger.info("Using updated configuration for new connections") }
+ .doOnError {
+ logger.error("Shutting down", it)
+ // TODO: create Health class
+ // It should monitor all incidents and expose the results for the
+ // container health check mechanism
+ System.exit(ERROR_CODE)
+ }
+ .subscribe(collector::set)
return collector::get
}
- private fun createVesHvCollector(): Flux<Collector> =
- configuration()
- .doOnError { System.exit(ERROR_CODE) }
- .map(this::createVesHvCollector)
+ private fun defaultConfiguration() =
+ CollectorConfiguration(
+ kafkaBootstrapServers = "kafka:9092",
+ routing = routing {
+ defineRoute {
+ fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS)
+ toTopic("ves_hvRanMeas")
+ withFixedPartitioning()
+ }
+ }.build())
private fun createVesHvCollector(config: CollectorConfiguration): Collector {
return VesHvCollector(
@@ -62,7 +82,8 @@ class CollectorFactory(val configuration: ConfigurationProvider,
}
companion object {
- const val ERROR_CODE = 3
+ private const val ERROR_CODE = 3
+ private val logger = Logger(CollectorFactory::class)
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 11a0e9bd..7248db6e 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -36,11 +36,8 @@ object AdapterFactory {
fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
ConsulConfigurationProvider(
- configurationProviderParams.configurationUrl,
httpAdapter(),
- configurationProviderParams.firstRequestDelay,
- configurationProviderParams.requestInterval
- )
+ configurationProviderParams)
fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index aca0e7e9..6f04c95c 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -21,12 +21,14 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.routing
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import reactor.retry.Jitter
+import reactor.retry.Retry
import java.io.StringReader
import java.time.Duration
import java.util.*
@@ -39,41 +41,40 @@ import javax.json.JsonObject
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since May 2018
*/
-internal class ConsulConfigurationProvider(private val url: String,
- private val http: HttpAdapter,
+internal class ConsulConfigurationProvider(private val http: HttpAdapter,
+ private val url: String,
private val firstRequestDelay: Duration,
- private val requestInterval: Duration
+ private val requestInterval: Duration,
+ retrySpec: Retry<Any>
) : ConfigurationProvider {
private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
+ private val retry = retrySpec
+ .doOnRetry {
+ logger.warn("Could not get fresh configuration", it.exception())
+ }
+
+ constructor(http: HttpAdapter, params: ConfigurationProviderParams) : this(
+ http,
+ params.configurationUrl,
+ params.firstRequestDelay,
+ params.requestInterval,
+ Retry.any<Any>()
+ .retryMax(MAX_RETRIES)
+ .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
+ .jitter(Jitter.random()))
override fun invoke(): Flux<CollectorConfiguration> =
- Flux.concat(createDefaultConfigurationFlux(), createConsulFlux())
+ Flux.interval(firstRequestDelay, requestInterval)
+ .flatMap { askForConfig() }
+ .map(::parseJsonResponse)
+ .map(::extractEncodedConfiguration)
+ .flatMap(::filterDifferentValues)
+ .map(::decodeConfiguration)
+ .map(::createCollectorConfiguration)
+ .retryWhen(retry)
- private fun createDefaultConfigurationFlux(): Mono<CollectorConfiguration> = Mono.just(
- CollectorConfiguration(
- kafkaBootstrapServers = "kafka:9092",
- routing = routing {
- defineRoute {
- fromDomain(HVRANMEAS)
- toTopic("ves_hvRanMeas")
- withFixedPartitioning()
- }
- }.build())
- ).doOnNext { logger.info("Applied default configuration") }
-
- private fun createConsulFlux(): Flux<CollectorConfiguration> = Flux
- .interval(firstRequestDelay, requestInterval)
- .flatMap { http.get(url) }
- .doOnError {
- logger.error("Encountered an error " +
- "when trying to acquire configuration from consul. Shutting down..")
- }
- .map(::parseJsonResponse)
- .map(::extractEncodedConfiguration)
- .flatMap(::filterDifferentValues)
- .map(::decodeConfiguration)
- .map(::createCollectorConfiguration)
+ private fun askForConfig(): Mono<String> = http.get(url)
private fun parseJsonResponse(responseString: String): JsonObject =
Json.createReader(StringReader(responseString)).readArray().first().asJsonObject()
@@ -118,7 +119,9 @@ internal class ConsulConfigurationProvider(private val url: String,
}
companion object {
- private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)
+ private const val MAX_RETRIES = 5
+ private const val BACKOFF_INTERVAL_FACTOR = 30L
+ private val logger = Logger(ConsulConfigurationProvider::class)
}
}