From 67702df781ab8acab8cd7375c0ce5ee91fc3debe Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Wed, 8 Aug 2018 09:17:14 +0200 Subject: Implement simple health check mechanism Change-Id: Ic4b8b59ced9dc19c9ebf26131036a9e1a752164f Issue-ID: DCAEGEN2-659 Signed-off-by: Jakub Dudycz --- .../collectors/veshv/factory/CollectorFactory.kt | 18 ++-- .../veshv/impl/adapters/AdapterFactory.kt | 5 +- .../impl/adapters/ConsulConfigurationProvider.kt | 13 ++- .../adapters/ConsulConfigurationProviderTest.kt | 113 +++++++++++++-------- 4 files changed, 93 insertions(+), 56 deletions(-) (limited to 'hv-collector-core/src') diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 7be24d23..3e652b92 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,6 +25,8 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector @@ -39,19 +41,20 @@ import java.util.concurrent.atomic.AtomicReference */ class CollectorFactory(val configuration: ConfigurationProvider, private val sinkProvider: SinkProvider, - private val metrics: Metrics) { + private val metrics: Metrics, + private val healthStateProvider: HealthStateProvider = HealthStateProvider.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { val collector: AtomicReference = AtomicReference() configuration() .map(this::createVesHvCollector) - .doOnNext { logger.info("Using updated configuration for new connections") } + .doOnNext { + logger.info("Using updated configuration for new connections") + healthStateProvider.changeState(HealthState.HEALTHY) + } .doOnError { - logger.error("Shutting down", it) - // TODO: create Health class - // It should monitor all incidents and expose the results for the - // container health check mechanism - System.exit(ERROR_CODE) + logger.error("Failed to acquire configuration from consul") + healthStateProvider.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND) } .subscribe(collector::set) return collector::get @@ -67,7 +70,6 @@ class CollectorFactory(val configuration: ConfigurationProvider, } companion object { - private const val ERROR_CODE = 3 private val logger = Logger(CollectorFactory::class) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 7248db6e..07b5c82e 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -24,7 +24,6 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import reactor.ipc.netty.http.client.HttpClient -import java.time.Duration /** * @author Piotr Jaszczyk @@ -35,9 +34,7 @@ object AdapterFactory { fun loggingSink(): SinkProvider = LoggingSinkProvider() fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = - ConsulConfigurationProvider( - httpAdapter(), - configurationProviderParams) + ConsulConfigurationProvider(httpAdapter(), configurationProviderParams) fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index 6f04c95c..81463039 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -20,11 +20,12 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber -import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.retry.Jitter @@ -45,24 +46,30 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val url: String, private val firstRequestDelay: Duration, private val requestInterval: Duration, + private val healthStateProvider: HealthStateProvider, retrySpec: Retry + ) : ConfigurationProvider { private val lastConfigurationHash: AtomicReference = AtomicReference(0) private val retry = retrySpec .doOnRetry { logger.warn("Could not get fresh configuration", it.exception()) + healthStateProvider.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) } - constructor(http: HttpAdapter, params: ConfigurationProviderParams) : this( + constructor(http: HttpAdapter, + params: ConfigurationProviderParams) : this( http, params.configurationUrl, params.firstRequestDelay, params.requestInterval, + HealthStateProvider.INSTANCE, Retry.any() .retryMax(MAX_RETRIES) .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR)) - .jitter(Jitter.random())) + .jitter(Jitter.random()) + ) override fun invoke(): Flux = Flux.interval(firstRequestDelay, requestInterval) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index 1626c028..f9a9ba60 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -23,9 +23,13 @@ import com.nhaarman.mockito_kotlin.eq import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.whenever import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on import org.mockito.Mockito +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import reactor.core.publisher.Mono import reactor.retry.Retry @@ -40,62 +44,89 @@ import kotlin.test.assertEquals */ internal object ConsulConfigurationProviderTest : Spek({ - val httpAdapterMock: HttpAdapter = mock() - val firstRequestDelay = Duration.ofMillis(1) - val requestInterval = Duration.ofMillis(1) - val retry = Retry.onlyIf { it.iteration() < 2 }.fixedBackoff(Duration.ofNanos(1)) + describe("Consul configuration provider") { - given("valid resource url") { + val httpAdapterMock: HttpAdapter = mock() + val healthStateProvider = HealthStateProvider.INSTANCE - val validUrl = "http://valid-url/" - val consulConfigProvider = ConsulConfigurationProvider( - httpAdapterMock, - validUrl, - firstRequestDelay, - requestInterval, - retry) + given("valid resource url") { + val validUrl = "http://valid-url/" + val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider) - whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap())) - .thenReturn(Mono.just(constructConsulResponse())) + on("call to consul") { + whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap())) + .thenReturn(Mono.just(constructConsulResponse())) - it("should use received configuration") { + it("should use received configuration") { - StepVerifier.create(consulConfigProvider().take(1)) - .consumeNextWith { + StepVerifier.create(consulConfigProvider().take(1)) + .consumeNextWith { - assertEquals("kafka:9093", it.kafkaBootstrapServers) + assertEquals("kafka:9093", it.kafkaBootstrapServers) - val route1 = it.routing.routes[0] - assertEquals(Domain.FAULT, route1.domain) - assertEquals("test-topic-1", route1.targetTopic) + val route1 = it.routing.routes[0] + assertEquals(Domain.FAULT, route1.domain) + assertEquals("test-topic-1", route1.targetTopic) - val route2 = it.routing.routes[1] - assertEquals(Domain.HEARTBEAT, route2.domain) - assertEquals("test-topic-2", route2.targetTopic) + val route2 = it.routing.routes[1] + assertEquals(Domain.HEARTBEAT, route2.domain) + assertEquals("test-topic-2", route2.targetTopic) + + }.verifyComplete() + } + } - }.verifyComplete() + } + given("invalid resource url") { + val invalidUrl = "http://invalid-url/" + + val iterationCount = 3L + val consulConfigProvider = constructConsulConfigProvider( + invalidUrl, httpAdapterMock, healthStateProvider, iterationCount + ) + + on("call to consul") { + whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap())) + .thenReturn(Mono.error(RuntimeException("Test exception"))) + + it("should interrupt the flux") { + + StepVerifier.create(consulConfigProvider()) + .verifyErrorMessage("Test exception") + } + + it("should update the health state"){ + StepVerifier.create(healthStateProvider().take(iterationCount)) + .expectNextCount(iterationCount - 1) + .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) + .verifyComplete() + } + } } } - given("invalid resource url") { - val invalidUrl = "http://invalid-url/" - val consulConfigProvider = ConsulConfigurationProvider( - httpAdapterMock, - invalidUrl, - firstRequestDelay, - requestInterval, - retry) +}) - whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap())) - .thenReturn(Mono.error(RuntimeException("Test exception"))) +private fun constructConsulConfigProvider(url: String, + httpAdapter: HttpAdapter, + healthStateProvider: HealthStateProvider, + iterationCount: Long = 1 +): ConsulConfigurationProvider { - it("should interrupt the flux") { + val firstRequestDelay = Duration.ofMillis(1) + val requestInterval = Duration.ofMillis(1) + val retry = Retry.onlyIf { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1)) + + return ConsulConfigurationProvider( + httpAdapter, + url, + firstRequestDelay, + requestInterval, + healthStateProvider, + retry + ) +} - StepVerifier.create(consulConfigProvider()) - .verifyErrorMessage("Test exception") - } - } -}) fun constructConsulResponse(): String { -- cgit 1.2.3-korg