aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src
diff options
context:
space:
mode:
authorJakub Dudycz <jakub.dudycz@nokia.com>2018-08-08 09:17:14 +0200
committerJakub Dudycz <jakub.dudycz@nokia.com>2018-08-09 10:46:48 +0200
commit67702df781ab8acab8cd7375c0ce5ee91fc3debe (patch)
treed4323f6567e23d156ebfa754fd5aa6aeac5eb64a /hv-collector-core/src
parentdd827e2c1cc984d9ed1fed9914cbef0e985ea625 (diff)
Implement simple health check mechanism
Change-Id: Ic4b8b59ced9dc19c9ebf26131036a9e1a752164f Issue-ID: DCAEGEN2-659 Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Diffstat (limited to 'hv-collector-core/src')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt18
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt5
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt13
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt113
4 files changed, 93 insertions, 56 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 7be24d23..3e652b92 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,6 +25,8 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
@@ -39,19 +41,20 @@ import java.util.concurrent.atomic.AtomicReference
*/
class CollectorFactory(val configuration: ConfigurationProvider,
private val sinkProvider: SinkProvider,
- private val metrics: Metrics) {
+ private val metrics: Metrics,
+ private val healthStateProvider: HealthStateProvider = HealthStateProvider.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
val collector: AtomicReference<Collector> = AtomicReference()
configuration()
.map(this::createVesHvCollector)
- .doOnNext { logger.info("Using updated configuration for new connections") }
+ .doOnNext {
+ logger.info("Using updated configuration for new connections")
+ healthStateProvider.changeState(HealthState.HEALTHY)
+ }
.doOnError {
- logger.error("Shutting down", it)
- // TODO: create Health class
- // It should monitor all incidents and expose the results for the
- // container health check mechanism
- System.exit(ERROR_CODE)
+ logger.error("Failed to acquire configuration from consul")
+ healthStateProvider.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
}
.subscribe(collector::set)
return collector::get
@@ -67,7 +70,6 @@ class CollectorFactory(val configuration: ConfigurationProvider,
}
companion object {
- private const val ERROR_CODE = 3
private val logger = Logger(CollectorFactory::class)
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 7248db6e..07b5c82e 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -24,7 +24,6 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import reactor.ipc.netty.http.client.HttpClient
-import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,9 +34,7 @@ object AdapterFactory {
fun loggingSink(): SinkProvider = LoggingSinkProvider()
fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
- ConsulConfigurationProvider(
- httpAdapter(),
- configurationProviderParams)
+ ConsulConfigurationProvider(httpAdapter(), configurationProviderParams)
fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index 6f04c95c..81463039 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -20,11 +20,12 @@
package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
-import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.retry.Jitter
@@ -45,24 +46,30 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private val url: String,
private val firstRequestDelay: Duration,
private val requestInterval: Duration,
+ private val healthStateProvider: HealthStateProvider,
retrySpec: Retry<Any>
+
) : ConfigurationProvider {
private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
private val retry = retrySpec
.doOnRetry {
logger.warn("Could not get fresh configuration", it.exception())
+ healthStateProvider.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
}
- constructor(http: HttpAdapter, params: ConfigurationProviderParams) : this(
+ constructor(http: HttpAdapter,
+ params: ConfigurationProviderParams) : this(
http,
params.configurationUrl,
params.firstRequestDelay,
params.requestInterval,
+ HealthStateProvider.INSTANCE,
Retry.any<Any>()
.retryMax(MAX_RETRIES)
.fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
- .jitter(Jitter.random()))
+ .jitter(Jitter.random())
+ )
override fun invoke(): Flux<CollectorConfiguration> =
Flux.interval(firstRequestDelay, requestInterval)
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index 1626c028..f9a9ba60 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -23,9 +23,13 @@ import com.nhaarman.mockito_kotlin.eq
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.whenever
import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
import org.mockito.Mockito
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import reactor.core.publisher.Mono
import reactor.retry.Retry
@@ -40,62 +44,89 @@ import kotlin.test.assertEquals
*/
internal object ConsulConfigurationProviderTest : Spek({
- val httpAdapterMock: HttpAdapter = mock()
- val firstRequestDelay = Duration.ofMillis(1)
- val requestInterval = Duration.ofMillis(1)
- val retry = Retry.onlyIf<Any> { it.iteration() < 2 }.fixedBackoff(Duration.ofNanos(1))
+ describe("Consul configuration provider") {
- given("valid resource url") {
+ val httpAdapterMock: HttpAdapter = mock()
+ val healthStateProvider = HealthStateProvider.INSTANCE
- val validUrl = "http://valid-url/"
- val consulConfigProvider = ConsulConfigurationProvider(
- httpAdapterMock,
- validUrl,
- firstRequestDelay,
- requestInterval,
- retry)
+ given("valid resource url") {
+ val validUrl = "http://valid-url/"
+ val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)
- whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
- .thenReturn(Mono.just(constructConsulResponse()))
+ on("call to consul") {
+ whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
+ .thenReturn(Mono.just(constructConsulResponse()))
- it("should use received configuration") {
+ it("should use received configuration") {
- StepVerifier.create(consulConfigProvider().take(1))
- .consumeNextWith {
+ StepVerifier.create(consulConfigProvider().take(1))
+ .consumeNextWith {
- assertEquals("kafka:9093", it.kafkaBootstrapServers)
+ assertEquals("kafka:9093", it.kafkaBootstrapServers)
- val route1 = it.routing.routes[0]
- assertEquals(Domain.FAULT, route1.domain)
- assertEquals("test-topic-1", route1.targetTopic)
+ val route1 = it.routing.routes[0]
+ assertEquals(Domain.FAULT, route1.domain)
+ assertEquals("test-topic-1", route1.targetTopic)
- val route2 = it.routing.routes[1]
- assertEquals(Domain.HEARTBEAT, route2.domain)
- assertEquals("test-topic-2", route2.targetTopic)
+ val route2 = it.routing.routes[1]
+ assertEquals(Domain.HEARTBEAT, route2.domain)
+ assertEquals("test-topic-2", route2.targetTopic)
+
+ }.verifyComplete()
+ }
+ }
- }.verifyComplete()
+ }
+ given("invalid resource url") {
+ val invalidUrl = "http://invalid-url/"
+
+ val iterationCount = 3L
+ val consulConfigProvider = constructConsulConfigProvider(
+ invalidUrl, httpAdapterMock, healthStateProvider, iterationCount
+ )
+
+ on("call to consul") {
+ whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
+ .thenReturn(Mono.error(RuntimeException("Test exception")))
+
+ it("should interrupt the flux") {
+
+ StepVerifier.create(consulConfigProvider())
+ .verifyErrorMessage("Test exception")
+ }
+
+ it("should update the health state"){
+ StepVerifier.create(healthStateProvider().take(iterationCount))
+ .expectNextCount(iterationCount - 1)
+ .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+ .verifyComplete()
+ }
+ }
}
}
- given("invalid resource url") {
- val invalidUrl = "http://invalid-url/"
- val consulConfigProvider = ConsulConfigurationProvider(
- httpAdapterMock,
- invalidUrl,
- firstRequestDelay,
- requestInterval,
- retry)
+})
- whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
- .thenReturn(Mono.error(RuntimeException("Test exception")))
+private fun constructConsulConfigProvider(url: String,
+ httpAdapter: HttpAdapter,
+ healthStateProvider: HealthStateProvider,
+ iterationCount: Long = 1
+): ConsulConfigurationProvider {
- it("should interrupt the flux") {
+ val firstRequestDelay = Duration.ofMillis(1)
+ val requestInterval = Duration.ofMillis(1)
+ val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
+
+ return ConsulConfigurationProvider(
+ httpAdapter,
+ url,
+ firstRequestDelay,
+ requestInterval,
+ healthStateProvider,
+ retry
+ )
+}
- StepVerifier.create(consulConfigProvider())
- .verifyErrorMessage("Test exception")
- }
- }
-})
fun constructConsulResponse(): String {