diff options
author | Jakub Dudycz <jakub.dudycz@nokia.com> | 2018-08-08 09:17:14 +0200 |
---|---|---|
committer | Jakub Dudycz <jakub.dudycz@nokia.com> | 2018-08-09 10:46:48 +0200 |
commit | 67702df781ab8acab8cd7375c0ce5ee91fc3debe (patch) | |
tree | d4323f6567e23d156ebfa754fd5aa6aeac5eb64a /hv-collector-core/src/main | |
parent | dd827e2c1cc984d9ed1fed9914cbef0e985ea625 (diff) |
Implement simple health check mechanism
Change-Id: Ic4b8b59ced9dc19c9ebf26131036a9e1a752164f
Issue-ID: DCAEGEN2-659
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Diffstat (limited to 'hv-collector-core/src/main')
3 files changed, 21 insertions, 15 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 7be24d23..3e652b92 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,6 +25,8 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector @@ -39,19 +41,20 @@ import java.util.concurrent.atomic.AtomicReference */ class CollectorFactory(val configuration: ConfigurationProvider, private val sinkProvider: SinkProvider, - private val metrics: Metrics) { + private val metrics: Metrics, + private val healthStateProvider: HealthStateProvider = HealthStateProvider.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { val collector: AtomicReference<Collector> = AtomicReference() configuration() .map(this::createVesHvCollector) - .doOnNext { logger.info("Using updated configuration for new connections") } + .doOnNext { + logger.info("Using updated configuration for new connections") + healthStateProvider.changeState(HealthState.HEALTHY) + } .doOnError { - logger.error("Shutting down", it) - // TODO: create Health class - // It should monitor all incidents and expose the results for the - // container health check mechanism - System.exit(ERROR_CODE) + logger.error("Failed to acquire configuration from consul") + healthStateProvider.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND) } .subscribe(collector::set) return collector::get @@ -67,7 +70,6 @@ class CollectorFactory(val configuration: ConfigurationProvider, } companion object { - private const val ERROR_CODE = 3 private val logger = Logger(CollectorFactory::class) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 7248db6e..07b5c82e 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -24,7 +24,6 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import reactor.ipc.netty.http.client.HttpClient -import java.time.Duration /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -35,9 +34,7 @@ object AdapterFactory { fun loggingSink(): SinkProvider = LoggingSinkProvider() fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = - ConsulConfigurationProvider( - httpAdapter(), - configurationProviderParams) + ConsulConfigurationProvider(httpAdapter(), configurationProviderParams) fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index 6f04c95c..81463039 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -20,11 +20,12 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber -import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.retry.Jitter @@ -45,24 +46,30 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val url: String, private val firstRequestDelay: Duration, private val requestInterval: Duration, + private val healthStateProvider: HealthStateProvider, retrySpec: Retry<Any> + ) : ConfigurationProvider { private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0) private val retry = retrySpec .doOnRetry { logger.warn("Could not get fresh configuration", it.exception()) + healthStateProvider.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) } - constructor(http: HttpAdapter, params: ConfigurationProviderParams) : this( + constructor(http: HttpAdapter, + params: ConfigurationProviderParams) : this( http, params.configurationUrl, params.firstRequestDelay, params.requestInterval, + HealthStateProvider.INSTANCE, Retry.any<Any>() .retryMax(MAX_RETRIES) .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR)) - .jitter(Jitter.random())) + .jitter(Jitter.random()) + ) override fun invoke(): Flux<CollectorConfiguration> = Flux.interval(firstRequestDelay, requestInterval) |