diff options
5 files changed, 92 insertions, 78 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index b52f959f..7ce49a82 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -28,9 +28,12 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector +import org.onap.dcae.collectors.veshv.impl.adapters.ConsulConfigurationProvider import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import reactor.core.publisher.Flux +import org.onap.dcae.collectors.veshv.model.routing +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.ves.VesEventV5 import java.util.concurrent.atomic.AtomicReference /** @@ -42,15 +45,32 @@ class CollectorFactory(val configuration: ConfigurationProvider, private val metrics: Metrics) { fun createVesHvCollectorProvider(): CollectorProvider { - val collector: AtomicReference<Collector> = AtomicReference() - createVesHvCollector().subscribe(collector::set) + val initialValue = createVesHvCollector(defaultConfiguration()) + val collector: AtomicReference<Collector> = AtomicReference(initialValue) + configuration() + .map(this::createVesHvCollector) + .doOnNext { logger.info("Using updated configuration for new connections") } + .doOnError { + logger.error("Shutting down", it) + // TODO: create Health class + // It should monitor all incidents and expose the results for the + // container health check mechanism + System.exit(ERROR_CODE) + } + .subscribe(collector::set) return collector::get } - private fun createVesHvCollector(): Flux<Collector> = - configuration() - .doOnError { System.exit(ERROR_CODE) } - .map(this::createVesHvCollector) + private fun defaultConfiguration() = + CollectorConfiguration( + kafkaBootstrapServers = "kafka:9092", + routing = routing { + defineRoute { + fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS) + toTopic("ves_hvRanMeas") + withFixedPartitioning() + } + }.build()) private fun createVesHvCollector(config: CollectorConfiguration): Collector { return VesHvCollector( @@ -62,7 +82,8 @@ class CollectorFactory(val configuration: ConfigurationProvider, } companion object { - const val ERROR_CODE = 3 + private const val ERROR_CODE = 3 + private val logger = Logger(CollectorFactory::class) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 11a0e9bd..7248db6e 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -36,11 +36,8 @@ object AdapterFactory { fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = ConsulConfigurationProvider( - configurationProviderParams.configurationUrl, httpAdapter(), - configurationProviderParams.firstRequestDelay, - configurationProviderParams.requestInterval - ) + configurationProviderParams) fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index aca0e7e9..6f04c95c 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -21,12 +21,14 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.routing -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS +import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import reactor.retry.Jitter +import reactor.retry.Retry import java.io.StringReader import java.time.Duration import java.util.* @@ -39,41 +41,40 @@ import javax.json.JsonObject * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since May 2018 */ -internal class ConsulConfigurationProvider(private val url: String, - private val http: HttpAdapter, +internal class ConsulConfigurationProvider(private val http: HttpAdapter, + private val url: String, private val firstRequestDelay: Duration, - private val requestInterval: Duration + private val requestInterval: Duration, + retrySpec: Retry<Any> ) : ConfigurationProvider { private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0) + private val retry = retrySpec + .doOnRetry { + logger.warn("Could not get fresh configuration", it.exception()) + } + + constructor(http: HttpAdapter, params: ConfigurationProviderParams) : this( + http, + params.configurationUrl, + params.firstRequestDelay, + params.requestInterval, + Retry.any<Any>() + .retryMax(MAX_RETRIES) + .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR)) + .jitter(Jitter.random())) override fun invoke(): Flux<CollectorConfiguration> = - Flux.concat(createDefaultConfigurationFlux(), createConsulFlux()) + Flux.interval(firstRequestDelay, requestInterval) + .flatMap { askForConfig() } + .map(::parseJsonResponse) + .map(::extractEncodedConfiguration) + .flatMap(::filterDifferentValues) + .map(::decodeConfiguration) + .map(::createCollectorConfiguration) + .retryWhen(retry) - private fun createDefaultConfigurationFlux(): Mono<CollectorConfiguration> = Mono.just( - CollectorConfiguration( - kafkaBootstrapServers = "kafka:9092", - routing = routing { - defineRoute { - fromDomain(HVRANMEAS) - toTopic("ves_hvRanMeas") - withFixedPartitioning() - } - }.build()) - ).doOnNext { logger.info("Applied default configuration") } - - private fun createConsulFlux(): Flux<CollectorConfiguration> = Flux - .interval(firstRequestDelay, requestInterval) - .flatMap { http.get(url) } - .doOnError { - logger.error("Encountered an error " + - "when trying to acquire configuration from consul. Shutting down..") - } - .map(::parseJsonResponse) - .map(::extractEncodedConfiguration) - .flatMap(::filterDifferentValues) - .map(::decodeConfiguration) - .map(::createCollectorConfiguration) + private fun askForConfig(): Mono<String> = http.get(url) private fun parseJsonResponse(responseString: String): JsonObject = Json.createReader(StringReader(responseString)).readArray().first().asJsonObject() @@ -118,7 +119,9 @@ internal class ConsulConfigurationProvider(private val url: String, } companion object { - private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java) + private const val MAX_RETRIES = 5 + private const val BACKOFF_INTERVAL_FACTOR = 30L + private val logger = Logger(ConsulConfigurationProvider::class) } } diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index f4c527a4..1626c028 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -28,6 +28,7 @@ import org.jetbrains.spek.api.dsl.it import org.mockito.Mockito import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import reactor.core.publisher.Mono +import reactor.retry.Retry import reactor.test.StepVerifier import java.time.Duration import java.util.* @@ -42,27 +43,24 @@ internal object ConsulConfigurationProviderTest : Spek({ val httpAdapterMock: HttpAdapter = mock() val firstRequestDelay = Duration.ofMillis(1) val requestInterval = Duration.ofMillis(1) + val retry = Retry.onlyIf<Any> { it.iteration() < 2 }.fixedBackoff(Duration.ofNanos(1)) given("valid resource url") { val validUrl = "http://valid-url/" - val consulConfigProvider = ConsulConfigurationProvider(validUrl, httpAdapterMock, firstRequestDelay, requestInterval) + val consulConfigProvider = ConsulConfigurationProvider( + httpAdapterMock, + validUrl, + firstRequestDelay, + requestInterval, + retry) whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap())) .thenReturn(Mono.just(constructConsulResponse())) - it("should use default configuration at the beginning, " + - "then apply received configuration") { + it("should use received configuration") { - StepVerifier.create(consulConfigProvider().take(2)) - .consumeNextWith { - - assertEquals("kafka:9092", it.kafkaBootstrapServers) - - val route1 = it.routing.routes[0] - assertEquals(Domain.HVRANMEAS, route1.domain) - assertEquals("ves_hvRanMeas", route1.targetTopic) - } + StepVerifier.create(consulConfigProvider().take(1)) .consumeNextWith { assertEquals("kafka:9093", it.kafkaBootstrapServers) @@ -81,23 +79,19 @@ internal object ConsulConfigurationProviderTest : Spek({ given("invalid resource url") { val invalidUrl = "http://invalid-url/" - val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, httpAdapterMock, firstRequestDelay, requestInterval) + val consulConfigProvider = ConsulConfigurationProvider( + httpAdapterMock, + invalidUrl, + firstRequestDelay, + requestInterval, + retry) whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap())) .thenReturn(Mono.error(RuntimeException("Test exception"))) - it("should use default configuration at the beginning, then should interrupt the flux") { + it("should interrupt the flux") { StepVerifier.create(consulConfigProvider()) - .consumeNextWith { - - - assertEquals("kafka:9092", it.kafkaBootstrapServers) - - val route1 = it.routing.routes[0] - assertEquals(Domain.HVRANMEAS, route1.domain) - assertEquals("ves_hvRanMeas", route1.targetTopic) - } .verifyErrorMessage("Test exception") } } @@ -106,18 +100,18 @@ internal object ConsulConfigurationProviderTest : Spek({ fun constructConsulResponse(): String { val config = """{ - "kafkaBootstrapServers": "kafka:9093", - "routing": [ - { - "fromDomain": 1, - "toTopic": "test-topic-1" - }, - { - "fromDomain": 2, - "toTopic": "test-topic-2" - } + "kafkaBootstrapServers": "kafka:9093", + "routing": [ + { + "fromDomain": 1, + "toTopic": "test-topic-1" + }, + { + "fromDomain": 2, + "toTopic": "test-topic-2" + } ] - }""" +}""" val encodedValue = String(Base64.getEncoder().encode(config.toByteArray())) diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt index cfc44bcd..aa3fdc39 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt @@ -34,5 +34,4 @@ class FakeMetrics: Metrics { override fun notifyMessageSent(topic: String) { } - }
\ No newline at end of file |