aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/main/kotlin
diff options
context:
space:
mode:
authorJakub Dudycz <jakub.dudycz@nokia.com>2018-08-08 09:17:14 +0200
committerJakub Dudycz <jakub.dudycz@nokia.com>2018-08-09 10:46:48 +0200
commit67702df781ab8acab8cd7375c0ce5ee91fc3debe (patch)
treed4323f6567e23d156ebfa754fd5aa6aeac5eb64a /hv-collector-core/src/main/kotlin
parentdd827e2c1cc984d9ed1fed9914cbef0e985ea625 (diff)
Implement simple health check mechanism
Change-Id: Ic4b8b59ced9dc19c9ebf26131036a9e1a752164f Issue-ID: DCAEGEN2-659 Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Diffstat (limited to 'hv-collector-core/src/main/kotlin')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt18
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt5
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt13
3 files changed, 21 insertions, 15 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 7be24d23..3e652b92 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,6 +25,8 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
@@ -39,19 +41,20 @@ import java.util.concurrent.atomic.AtomicReference
*/
class CollectorFactory(val configuration: ConfigurationProvider,
private val sinkProvider: SinkProvider,
- private val metrics: Metrics) {
+ private val metrics: Metrics,
+ private val healthStateProvider: HealthStateProvider = HealthStateProvider.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
val collector: AtomicReference<Collector> = AtomicReference()
configuration()
.map(this::createVesHvCollector)
- .doOnNext { logger.info("Using updated configuration for new connections") }
+ .doOnNext {
+ logger.info("Using updated configuration for new connections")
+ healthStateProvider.changeState(HealthState.HEALTHY)
+ }
.doOnError {
- logger.error("Shutting down", it)
- // TODO: create Health class
- // It should monitor all incidents and expose the results for the
- // container health check mechanism
- System.exit(ERROR_CODE)
+ logger.error("Failed to acquire configuration from consul")
+ healthStateProvider.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
}
.subscribe(collector::set)
return collector::get
@@ -67,7 +70,6 @@ class CollectorFactory(val configuration: ConfigurationProvider,
}
companion object {
- private const val ERROR_CODE = 3
private val logger = Logger(CollectorFactory::class)
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 7248db6e..07b5c82e 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -24,7 +24,6 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import reactor.ipc.netty.http.client.HttpClient
-import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,9 +34,7 @@ object AdapterFactory {
fun loggingSink(): SinkProvider = LoggingSinkProvider()
fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
- ConsulConfigurationProvider(
- httpAdapter(),
- configurationProviderParams)
+ ConsulConfigurationProvider(httpAdapter(), configurationProviderParams)
fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index 6f04c95c..81463039 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -20,11 +20,12 @@
package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
-import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.retry.Jitter
@@ -45,24 +46,30 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private val url: String,
private val firstRequestDelay: Duration,
private val requestInterval: Duration,
+ private val healthStateProvider: HealthStateProvider,
retrySpec: Retry<Any>
+
) : ConfigurationProvider {
private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
private val retry = retrySpec
.doOnRetry {
logger.warn("Could not get fresh configuration", it.exception())
+ healthStateProvider.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
}
- constructor(http: HttpAdapter, params: ConfigurationProviderParams) : this(
+ constructor(http: HttpAdapter,
+ params: ConfigurationProviderParams) : this(
http,
params.configurationUrl,
params.firstRequestDelay,
params.requestInterval,
+ HealthStateProvider.INSTANCE,
Retry.any<Any>()
.retryMax(MAX_RETRIES)
.fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
- .jitter(Jitter.random()))
+ .jitter(Jitter.random())
+ )
override fun invoke(): Flux<CollectorConfiguration> =
Flux.interval(firstRequestDelay, requestInterval)