From 67702df781ab8acab8cd7375c0ce5ee91fc3debe Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Wed, 8 Aug 2018 09:17:14 +0200 Subject: Implement simple health check mechanism Change-Id: Ic4b8b59ced9dc19c9ebf26131036a9e1a752164f Issue-ID: DCAEGEN2-659 Signed-off-by: Jakub Dudycz --- .../dcae/collectors/veshv/factory/CollectorFactory.kt | 18 ++++++++++-------- .../collectors/veshv/impl/adapters/AdapterFactory.kt | 5 +---- .../veshv/impl/adapters/ConsulConfigurationProvider.kt | 13 ++++++++++--- 3 files changed, 21 insertions(+), 15 deletions(-) (limited to 'hv-collector-core/src/main/kotlin') diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 7be24d23..3e652b92 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,6 +25,8 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector @@ -39,19 +41,20 @@ import java.util.concurrent.atomic.AtomicReference */ class CollectorFactory(val configuration: ConfigurationProvider, private val sinkProvider: SinkProvider, - private val metrics: Metrics) { + private val metrics: Metrics, + private val healthStateProvider: HealthStateProvider = HealthStateProvider.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { val collector: AtomicReference = AtomicReference() configuration() .map(this::createVesHvCollector) - .doOnNext { logger.info("Using updated configuration for new connections") } + .doOnNext { + logger.info("Using updated configuration for new connections") + healthStateProvider.changeState(HealthState.HEALTHY) + } .doOnError { - logger.error("Shutting down", it) - // TODO: create Health class - // It should monitor all incidents and expose the results for the - // container health check mechanism - System.exit(ERROR_CODE) + logger.error("Failed to acquire configuration from consul") + healthStateProvider.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND) } .subscribe(collector::set) return collector::get @@ -67,7 +70,6 @@ class CollectorFactory(val configuration: ConfigurationProvider, } companion object { - private const val ERROR_CODE = 3 private val logger = Logger(CollectorFactory::class) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 7248db6e..07b5c82e 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -24,7 +24,6 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import reactor.ipc.netty.http.client.HttpClient -import java.time.Duration /** * @author Piotr Jaszczyk @@ -35,9 +34,7 @@ object AdapterFactory { fun loggingSink(): SinkProvider = LoggingSinkProvider() fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = - ConsulConfigurationProvider( - httpAdapter(), - configurationProviderParams) + ConsulConfigurationProvider(httpAdapter(), configurationProviderParams) fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index 6f04c95c..81463039 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -20,11 +20,12 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber -import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.retry.Jitter @@ -45,24 +46,30 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val url: String, private val firstRequestDelay: Duration, private val requestInterval: Duration, + private val healthStateProvider: HealthStateProvider, retrySpec: Retry + ) : ConfigurationProvider { private val lastConfigurationHash: AtomicReference = AtomicReference(0) private val retry = retrySpec .doOnRetry { logger.warn("Could not get fresh configuration", it.exception()) + healthStateProvider.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) } - constructor(http: HttpAdapter, params: ConfigurationProviderParams) : this( + constructor(http: HttpAdapter, + params: ConfigurationProviderParams) : this( http, params.configurationUrl, params.firstRequestDelay, params.requestInterval, + HealthStateProvider.INSTANCE, Retry.any() .retryMax(MAX_RETRIES) .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR)) - .jitter(Jitter.random())) + .jitter(Jitter.random()) + ) override fun invoke(): Flux = Flux.interval(firstRequestDelay, requestInterval) -- cgit 1.2.3-korg