aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-core')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt37
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt5
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt65
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt62
4 files changed, 92 insertions, 77 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index b52f959f..7ce49a82 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -28,9 +28,12 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
+import org.onap.dcae.collectors.veshv.impl.adapters.ConsulConfigurationProvider
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import reactor.core.publisher.Flux
+import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.ves.VesEventV5
import java.util.concurrent.atomic.AtomicReference
/**
@@ -42,15 +45,32 @@ class CollectorFactory(val configuration: ConfigurationProvider,
private val metrics: Metrics) {
fun createVesHvCollectorProvider(): CollectorProvider {
- val collector: AtomicReference<Collector> = AtomicReference()
- createVesHvCollector().subscribe(collector::set)
+ val initialValue = createVesHvCollector(defaultConfiguration())
+ val collector: AtomicReference<Collector> = AtomicReference(initialValue)
+ configuration()
+ .map(this::createVesHvCollector)
+ .doOnNext { logger.info("Using updated configuration for new connections") }
+ .doOnError {
+ logger.error("Shutting down", it)
+ // TODO: create Health class
+ // It should monitor all incidents and expose the results for the
+ // container health check mechanism
+ System.exit(ERROR_CODE)
+ }
+ .subscribe(collector::set)
return collector::get
}
- private fun createVesHvCollector(): Flux<Collector> =
- configuration()
- .doOnError { System.exit(ERROR_CODE) }
- .map(this::createVesHvCollector)
+ private fun defaultConfiguration() =
+ CollectorConfiguration(
+ kafkaBootstrapServers = "kafka:9092",
+ routing = routing {
+ defineRoute {
+ fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS)
+ toTopic("ves_hvRanMeas")
+ withFixedPartitioning()
+ }
+ }.build())
private fun createVesHvCollector(config: CollectorConfiguration): Collector {
return VesHvCollector(
@@ -62,7 +82,8 @@ class CollectorFactory(val configuration: ConfigurationProvider,
}
companion object {
- const val ERROR_CODE = 3
+ private const val ERROR_CODE = 3
+ private val logger = Logger(CollectorFactory::class)
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 11a0e9bd..7248db6e 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -36,11 +36,8 @@ object AdapterFactory {
fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
ConsulConfigurationProvider(
- configurationProviderParams.configurationUrl,
httpAdapter(),
- configurationProviderParams.firstRequestDelay,
- configurationProviderParams.requestInterval
- )
+ configurationProviderParams)
fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index aca0e7e9..6f04c95c 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -21,12 +21,14 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.routing
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import reactor.retry.Jitter
+import reactor.retry.Retry
import java.io.StringReader
import java.time.Duration
import java.util.*
@@ -39,41 +41,40 @@ import javax.json.JsonObject
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since May 2018
*/
-internal class ConsulConfigurationProvider(private val url: String,
- private val http: HttpAdapter,
+internal class ConsulConfigurationProvider(private val http: HttpAdapter,
+ private val url: String,
private val firstRequestDelay: Duration,
- private val requestInterval: Duration
+ private val requestInterval: Duration,
+ retrySpec: Retry<Any>
) : ConfigurationProvider {
private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
+ private val retry = retrySpec
+ .doOnRetry {
+ logger.warn("Could not get fresh configuration", it.exception())
+ }
+
+ constructor(http: HttpAdapter, params: ConfigurationProviderParams) : this(
+ http,
+ params.configurationUrl,
+ params.firstRequestDelay,
+ params.requestInterval,
+ Retry.any<Any>()
+ .retryMax(MAX_RETRIES)
+ .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
+ .jitter(Jitter.random()))
override fun invoke(): Flux<CollectorConfiguration> =
- Flux.concat(createDefaultConfigurationFlux(), createConsulFlux())
+ Flux.interval(firstRequestDelay, requestInterval)
+ .flatMap { askForConfig() }
+ .map(::parseJsonResponse)
+ .map(::extractEncodedConfiguration)
+ .flatMap(::filterDifferentValues)
+ .map(::decodeConfiguration)
+ .map(::createCollectorConfiguration)
+ .retryWhen(retry)
- private fun createDefaultConfigurationFlux(): Mono<CollectorConfiguration> = Mono.just(
- CollectorConfiguration(
- kafkaBootstrapServers = "kafka:9092",
- routing = routing {
- defineRoute {
- fromDomain(HVRANMEAS)
- toTopic("ves_hvRanMeas")
- withFixedPartitioning()
- }
- }.build())
- ).doOnNext { logger.info("Applied default configuration") }
-
- private fun createConsulFlux(): Flux<CollectorConfiguration> = Flux
- .interval(firstRequestDelay, requestInterval)
- .flatMap { http.get(url) }
- .doOnError {
- logger.error("Encountered an error " +
- "when trying to acquire configuration from consul. Shutting down..")
- }
- .map(::parseJsonResponse)
- .map(::extractEncodedConfiguration)
- .flatMap(::filterDifferentValues)
- .map(::decodeConfiguration)
- .map(::createCollectorConfiguration)
+ private fun askForConfig(): Mono<String> = http.get(url)
private fun parseJsonResponse(responseString: String): JsonObject =
Json.createReader(StringReader(responseString)).readArray().first().asJsonObject()
@@ -118,7 +119,9 @@ internal class ConsulConfigurationProvider(private val url: String,
}
companion object {
- private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)
+ private const val MAX_RETRIES = 5
+ private const val BACKOFF_INTERVAL_FACTOR = 30L
+ private val logger = Logger(ConsulConfigurationProvider::class)
}
}
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index f4c527a4..1626c028 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -28,6 +28,7 @@ import org.jetbrains.spek.api.dsl.it
import org.mockito.Mockito
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import reactor.core.publisher.Mono
+import reactor.retry.Retry
import reactor.test.StepVerifier
import java.time.Duration
import java.util.*
@@ -42,27 +43,24 @@ internal object ConsulConfigurationProviderTest : Spek({
val httpAdapterMock: HttpAdapter = mock()
val firstRequestDelay = Duration.ofMillis(1)
val requestInterval = Duration.ofMillis(1)
+ val retry = Retry.onlyIf<Any> { it.iteration() < 2 }.fixedBackoff(Duration.ofNanos(1))
given("valid resource url") {
val validUrl = "http://valid-url/"
- val consulConfigProvider = ConsulConfigurationProvider(validUrl, httpAdapterMock, firstRequestDelay, requestInterval)
+ val consulConfigProvider = ConsulConfigurationProvider(
+ httpAdapterMock,
+ validUrl,
+ firstRequestDelay,
+ requestInterval,
+ retry)
whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
.thenReturn(Mono.just(constructConsulResponse()))
- it("should use default configuration at the beginning, " +
- "then apply received configuration") {
+ it("should use received configuration") {
- StepVerifier.create(consulConfigProvider().take(2))
- .consumeNextWith {
-
- assertEquals("kafka:9092", it.kafkaBootstrapServers)
-
- val route1 = it.routing.routes[0]
- assertEquals(Domain.HVRANMEAS, route1.domain)
- assertEquals("ves_hvRanMeas", route1.targetTopic)
- }
+ StepVerifier.create(consulConfigProvider().take(1))
.consumeNextWith {
assertEquals("kafka:9093", it.kafkaBootstrapServers)
@@ -81,23 +79,19 @@ internal object ConsulConfigurationProviderTest : Spek({
given("invalid resource url") {
val invalidUrl = "http://invalid-url/"
- val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, httpAdapterMock, firstRequestDelay, requestInterval)
+ val consulConfigProvider = ConsulConfigurationProvider(
+ httpAdapterMock,
+ invalidUrl,
+ firstRequestDelay,
+ requestInterval,
+ retry)
whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
.thenReturn(Mono.error(RuntimeException("Test exception")))
- it("should use default configuration at the beginning, then should interrupt the flux") {
+ it("should interrupt the flux") {
StepVerifier.create(consulConfigProvider())
- .consumeNextWith {
-
-
- assertEquals("kafka:9092", it.kafkaBootstrapServers)
-
- val route1 = it.routing.routes[0]
- assertEquals(Domain.HVRANMEAS, route1.domain)
- assertEquals("ves_hvRanMeas", route1.targetTopic)
- }
.verifyErrorMessage("Test exception")
}
}
@@ -106,18 +100,18 @@ internal object ConsulConfigurationProviderTest : Spek({
fun constructConsulResponse(): String {
val config = """{
- "kafkaBootstrapServers": "kafka:9093",
- "routing": [
- {
- "fromDomain": 1,
- "toTopic": "test-topic-1"
- },
- {
- "fromDomain": 2,
- "toTopic": "test-topic-2"
- }
+ "kafkaBootstrapServers": "kafka:9093",
+ "routing": [
+ {
+ "fromDomain": 1,
+ "toTopic": "test-topic-1"
+ },
+ {
+ "fromDomain": 2,
+ "toTopic": "test-topic-2"
+ }
]
- }"""
+}"""
val encodedValue = String(Base64.getEncoder().encode(config.toByteArray()))