aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hv-collector-core/pom.xml5
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt18
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt5
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt13
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt113
-rw-r--r--hv-collector-ct/pom.xml4
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt4
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt205
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt18
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt14
-rw-r--r--hv-collector-health-check/pom.xml26
-rw-r--r--hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt (renamed from hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/http/HealthCheckApiServer.kt)13
-rw-r--r--hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt34
-rw-r--r--hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt39
-rw-r--r--hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt39
-rw-r--r--hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt54
-rw-r--r--hv-collector-main/Dockerfile4
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt5
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt31
-rw-r--r--pom.xml1
20 files changed, 488 insertions, 157 deletions
diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml
index 06687b7d..784b2476 100644
--- a/hv-collector-core/pom.xml
+++ b/hv-collector-core/pom.xml
@@ -69,6 +69,11 @@
</dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-health-check</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
<artifactId>hv-collector-domain</artifactId>
<version>${project.parent.version}</version>
<scope>compile</scope>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 7be24d23..3e652b92 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,6 +25,8 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
@@ -39,19 +41,20 @@ import java.util.concurrent.atomic.AtomicReference
*/
class CollectorFactory(val configuration: ConfigurationProvider,
private val sinkProvider: SinkProvider,
- private val metrics: Metrics) {
+ private val metrics: Metrics,
+ private val healthStateProvider: HealthStateProvider = HealthStateProvider.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
val collector: AtomicReference<Collector> = AtomicReference()
configuration()
.map(this::createVesHvCollector)
- .doOnNext { logger.info("Using updated configuration for new connections") }
+ .doOnNext {
+ logger.info("Using updated configuration for new connections")
+ healthStateProvider.changeState(HealthState.HEALTHY)
+ }
.doOnError {
- logger.error("Shutting down", it)
- // TODO: create Health class
- // It should monitor all incidents and expose the results for the
- // container health check mechanism
- System.exit(ERROR_CODE)
+ logger.error("Failed to acquire configuration from consul")
+ healthStateProvider.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
}
.subscribe(collector::set)
return collector::get
@@ -67,7 +70,6 @@ class CollectorFactory(val configuration: ConfigurationProvider,
}
companion object {
- private const val ERROR_CODE = 3
private val logger = Logger(CollectorFactory::class)
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 7248db6e..07b5c82e 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -24,7 +24,6 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import reactor.ipc.netty.http.client.HttpClient
-import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,9 +34,7 @@ object AdapterFactory {
fun loggingSink(): SinkProvider = LoggingSinkProvider()
fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
- ConsulConfigurationProvider(
- httpAdapter(),
- configurationProviderParams)
+ ConsulConfigurationProvider(httpAdapter(), configurationProviderParams)
fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index 6f04c95c..81463039 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -20,11 +20,12 @@
package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
-import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.retry.Jitter
@@ -45,24 +46,30 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private val url: String,
private val firstRequestDelay: Duration,
private val requestInterval: Duration,
+ private val healthStateProvider: HealthStateProvider,
retrySpec: Retry<Any>
+
) : ConfigurationProvider {
private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
private val retry = retrySpec
.doOnRetry {
logger.warn("Could not get fresh configuration", it.exception())
+ healthStateProvider.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
}
- constructor(http: HttpAdapter, params: ConfigurationProviderParams) : this(
+ constructor(http: HttpAdapter,
+ params: ConfigurationProviderParams) : this(
http,
params.configurationUrl,
params.firstRequestDelay,
params.requestInterval,
+ HealthStateProvider.INSTANCE,
Retry.any<Any>()
.retryMax(MAX_RETRIES)
.fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
- .jitter(Jitter.random()))
+ .jitter(Jitter.random())
+ )
override fun invoke(): Flux<CollectorConfiguration> =
Flux.interval(firstRequestDelay, requestInterval)
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index 1626c028..f9a9ba60 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -23,9 +23,13 @@ import com.nhaarman.mockito_kotlin.eq
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.whenever
import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
import org.mockito.Mockito
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import reactor.core.publisher.Mono
import reactor.retry.Retry
@@ -40,62 +44,89 @@ import kotlin.test.assertEquals
*/
internal object ConsulConfigurationProviderTest : Spek({
- val httpAdapterMock: HttpAdapter = mock()
- val firstRequestDelay = Duration.ofMillis(1)
- val requestInterval = Duration.ofMillis(1)
- val retry = Retry.onlyIf<Any> { it.iteration() < 2 }.fixedBackoff(Duration.ofNanos(1))
+ describe("Consul configuration provider") {
- given("valid resource url") {
+ val httpAdapterMock: HttpAdapter = mock()
+ val healthStateProvider = HealthStateProvider.INSTANCE
- val validUrl = "http://valid-url/"
- val consulConfigProvider = ConsulConfigurationProvider(
- httpAdapterMock,
- validUrl,
- firstRequestDelay,
- requestInterval,
- retry)
+ given("valid resource url") {
+ val validUrl = "http://valid-url/"
+ val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)
- whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
- .thenReturn(Mono.just(constructConsulResponse()))
+ on("call to consul") {
+ whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
+ .thenReturn(Mono.just(constructConsulResponse()))
- it("should use received configuration") {
+ it("should use received configuration") {
- StepVerifier.create(consulConfigProvider().take(1))
- .consumeNextWith {
+ StepVerifier.create(consulConfigProvider().take(1))
+ .consumeNextWith {
- assertEquals("kafka:9093", it.kafkaBootstrapServers)
+ assertEquals("kafka:9093", it.kafkaBootstrapServers)
- val route1 = it.routing.routes[0]
- assertEquals(Domain.FAULT, route1.domain)
- assertEquals("test-topic-1", route1.targetTopic)
+ val route1 = it.routing.routes[0]
+ assertEquals(Domain.FAULT, route1.domain)
+ assertEquals("test-topic-1", route1.targetTopic)
- val route2 = it.routing.routes[1]
- assertEquals(Domain.HEARTBEAT, route2.domain)
- assertEquals("test-topic-2", route2.targetTopic)
+ val route2 = it.routing.routes[1]
+ assertEquals(Domain.HEARTBEAT, route2.domain)
+ assertEquals("test-topic-2", route2.targetTopic)
+
+ }.verifyComplete()
+ }
+ }
- }.verifyComplete()
+ }
+ given("invalid resource url") {
+ val invalidUrl = "http://invalid-url/"
+
+ val iterationCount = 3L
+ val consulConfigProvider = constructConsulConfigProvider(
+ invalidUrl, httpAdapterMock, healthStateProvider, iterationCount
+ )
+
+ on("call to consul") {
+ whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
+ .thenReturn(Mono.error(RuntimeException("Test exception")))
+
+ it("should interrupt the flux") {
+
+ StepVerifier.create(consulConfigProvider())
+ .verifyErrorMessage("Test exception")
+ }
+
+ it("should update the health state"){
+ StepVerifier.create(healthStateProvider().take(iterationCount))
+ .expectNextCount(iterationCount - 1)
+ .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+ .verifyComplete()
+ }
+ }
}
}
- given("invalid resource url") {
- val invalidUrl = "http://invalid-url/"
- val consulConfigProvider = ConsulConfigurationProvider(
- httpAdapterMock,
- invalidUrl,
- firstRequestDelay,
- requestInterval,
- retry)
+})
- whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
- .thenReturn(Mono.error(RuntimeException("Test exception")))
+private fun constructConsulConfigProvider(url: String,
+ httpAdapter: HttpAdapter,
+ healthStateProvider: HealthStateProvider,
+ iterationCount: Long = 1
+): ConsulConfigurationProvider {
- it("should interrupt the flux") {
+ val firstRequestDelay = Duration.ofMillis(1)
+ val requestInterval = Duration.ofMillis(1)
+ val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
+
+ return ConsulConfigurationProvider(
+ httpAdapter,
+ url,
+ firstRequestDelay,
+ requestInterval,
+ healthStateProvider,
+ retry
+ )
+}
- StepVerifier.create(consulConfigProvider())
- .verifyErrorMessage("Test exception")
- }
- }
-})
fun constructConsulResponse(): String {
diff --git a/hv-collector-ct/pom.xml b/hv-collector-ct/pom.xml
index 347bbbe0..f4150c2a 100644
--- a/hv-collector-ct/pom.xml
+++ b/hv-collector-ct/pom.xml
@@ -105,6 +105,10 @@
<groupId>org.jetbrains.spek</groupId>
<artifactId>spek-junit-platform-engine</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ </dependency>
</dependencies>
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index aaadcc7d..e7b7770d 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthStateProvider
import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
import reactor.core.publisher.Flux
@@ -39,10 +40,11 @@ import java.time.Duration
*/
class Sut(sink: Sink = StoringSink()) {
val configurationProvider = FakeConfigurationProvider()
+ val healthStateProvider = FakeHealthStateProvider()
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
private val metrics = FakeMetrics()
- private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics)
+ private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics, healthStateProvider)
private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 1f07c233..493517ba 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -22,15 +22,24 @@ package org.onap.dcae.collectors.veshv.tests.component
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.tests.fakes.*
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVRANMEAS_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
+import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
import org.onap.dcae.collectors.veshv.tests.utils.endOfTransmissionWireMessage
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
-import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
-import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
+import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
+import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import reactor.core.publisher.Flux
import java.time.Duration
@@ -219,127 +228,143 @@ object VesHvSpecification : Spek({
val defaultTimeout = Duration.ofSeconds(10)
- it("should update collector on configuration change") {
- val (sut, _) = vesHvWithStoringSink()
+ given("successful configuration change") {
- sut.configurationProvider.updateConfiguration(basicConfiguration)
- val firstCollector = sut.collector
+ lateinit var sut: Sut
+ lateinit var sink: StoringSink
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
- val collectorAfterUpdate = sut.collector
+ beforeEachTest {
+ vesHvWithStoringSink().run {
+ sut = first
+ sink = second
+ }
+ }
- assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
+ it("should update collector") {
+ val firstCollector = sut.collector
- }
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ val collectorAfterUpdate = sut.collector
- it("should start routing messages on configuration change") {
- val (sut, sink) = vesHvWithStoringSink()
+ assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
+ }
- sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
+ it("should start routing messages") {
- val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
- assertThat(messages).isEmpty()
+ sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ assertThat(messages).isEmpty()
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
- assertThat(messagesAfterUpdate).hasSize(1)
- val message = messagesAfterUpdate[0]
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
- assertThat(message.topic).describedAs("routed message topic after configuration's change")
- .isEqualTo(HVRANMEAS_TOPIC)
- assertThat(message.partition).describedAs("routed message partition")
- .isEqualTo(0)
- }
+ val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ assertThat(messagesAfterUpdate).hasSize(1)
+ val message = messagesAfterUpdate[0]
- it("should change domain routing on configuration change") {
- val (sut, sink) = vesHvWithStoringSink()
+ assertThat(message.topic).describedAs("routed message topic after configuration's change")
+ .isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(message.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+ }
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ it("should change domain routing") {
- val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
- assertThat(messages).hasSize(1)
- val firstMessage = messages[0]
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ assertThat(messages).hasSize(1)
+ val firstMessage = messages[0]
- assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
- .isEqualTo(HVRANMEAS_TOPIC)
- assertThat(firstMessage.partition).describedAs("routed message partition")
- .isEqualTo(0)
+ assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
+ .isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(firstMessage.partition).describedAs("routed message partition")
+ .isEqualTo(0)
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
- assertThat(messagesAfterUpdate).hasSize(2)
- val secondMessage = messagesAfterUpdate[1]
+ val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ assertThat(messagesAfterUpdate).hasSize(2)
+ val secondMessage = messagesAfterUpdate[1]
- assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
- .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
- assertThat(secondMessage.partition).describedAs("routed message partition")
- .isEqualTo(0)
- }
+ assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
+ .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
+ assertThat(secondMessage.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+ }
- it("should update routing for each client sending one message") {
- val (sut, sink) = vesHvWithStoringSink()
+ it("should update routing for each client sending one message") {
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val messagesAmount = 10
+ val messagesForEachTopic = 5
- val messagesAmount = 10
- val messagesForEachTopic = 5
+ Flux.range(0, messagesAmount).doOnNext {
+ if (it == messagesForEachTopic) {
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ }
+ }.doOnNext {
+ sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ }.then().block(defaultTimeout)
- Flux.range(0, messagesAmount).doOnNext {
- if (it == messagesForEachTopic) {
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
- }
- }.doOnNext {
- sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
- }.then().block(defaultTimeout)
+ val messages = sink.sentMessages
+ val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
- val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+ assertThat(messages.size).isEqualTo(messagesAmount)
+ assertThat(messagesForEachTopic)
+ .describedAs("amount of messages routed to each topic")
+ .isEqualTo(firstTopicMessagesCount)
+ .isEqualTo(secondTopicMessagesCount)
+ }
- assertThat(messages.size).isEqualTo(messagesAmount)
- assertThat(messagesForEachTopic)
- .describedAs("amount of messages routed to each topic")
- .isEqualTo(firstTopicMessagesCount)
- .isEqualTo(secondTopicMessagesCount)
- }
+ it("should not update routing for client sending continuous stream of messages") {
+ val messageStreamSize = 10
+ val pivot = 5
- it("should not update routing for client sending continuous stream of messages") {
- val (sut, sink) = vesHvWithStoringSink()
+ val incomingMessages = Flux.range(0, messageStreamSize)
+ .doOnNext {
+ if (it == pivot) {
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ println("config changed")
+ }
+ }
+ .map { vesWireFrameMessage(Domain.HVRANMEAS) }
- sut.configurationProvider.updateConfiguration(basicConfiguration)
- val messageStreamSize = 10
- val pivot = 5
+ sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
- val incomingMessages = Flux.range(0, messageStreamSize)
- .doOnNext {
- if (it == pivot) {
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
- println("config changed")
- }
- }
- .map { vesWireFrameMessage(Domain.HVRANMEAS) }
+ val messages = sink.sentMessages
+ val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+ assertThat(messages.size).isEqualTo(messageStreamSize)
+ assertThat(firstTopicMessagesCount)
+ .describedAs("amount of messages routed to first topic")
+ .isEqualTo(messageStreamSize)
- sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
+ assertThat(secondTopicMessagesCount)
+ .describedAs("amount of messages routed to second topic")
+ .isEqualTo(0)
+ }
- val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+ it("should mark the application healthy") {
+ assertThat(sut.healthStateProvider.currentHealth)
+ .describedAs("application health state")
+ .isEqualTo(HealthState.HEALTHY)
+ }
+ }
- assertThat(messages.size).isEqualTo(messageStreamSize)
- assertThat(firstTopicMessagesCount)
- .describedAs("amount of messages routed to first topic")
- .isEqualTo(messageStreamSize)
+ given("failed configuration change") {
+ val (sut, _) = vesHvWithStoringSink()
+ sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
- assertThat(secondTopicMessagesCount)
- .describedAs("amount of messages routed to second topic")
- .isEqualTo(0)
+ it("should mark the application unhealthy ") {
+ assertThat(sut.healthStateProvider.currentHealth)
+ .describedAs("application health state")
+ .isEqualTo(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+ }
}
}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt
new file mode 100644
index 00000000..09fd232c
--- /dev/null
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt
@@ -0,0 +1,18 @@
+package org.onap.dcae.collectors.veshv.tests.fakes
+
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
+import reactor.core.publisher.Flux
+
+class FakeHealthStateProvider : HealthStateProvider {
+
+ lateinit var currentHealth: HealthState
+
+ override fun changeState(healthState: HealthState) {
+ currentHealth = healthState
+ }
+
+ override fun invoke(): Flux<HealthState> {
+ throw NotImplementedError()
+ }
+}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index b89113f4..ebeaa69e 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.model.routing
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import reactor.core.publisher.FluxProcessor
import reactor.core.publisher.UnicastProcessor
+import reactor.retry.RetryExhaustedException
const val HVRANMEAS_TOPIC = "ves_hvRanMeas"
@@ -83,10 +84,19 @@ val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration
)
class FakeConfigurationProvider : ConfigurationProvider {
+ private var shouldThrowException = false
private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create()
- fun updateConfiguration(collectorConfiguration: CollectorConfiguration) {
- configStream.onNext(collectorConfiguration)
+ fun updateConfiguration(collectorConfiguration: CollectorConfiguration) =
+ if (shouldThrowException) {
+ configStream.onError(RetryExhaustedException("I'm so tired"))
+ } else {
+ configStream.onNext(collectorConfiguration)
+ }
+
+
+ fun shouldThrowExceptionOnConfigUpdate(shouldThrowException: Boolean) {
+ this.shouldThrowException = shouldThrowException
}
override fun invoke() = configStream
diff --git a/hv-collector-health-check/pom.xml b/hv-collector-health-check/pom.xml
index 4072ec24..1e77adb0 100644
--- a/hv-collector-health-check/pom.xml
+++ b/hv-collector-health-check/pom.xml
@@ -14,7 +14,6 @@
<properties>
<skipAnalysis>false</skipAnalysis>
- <failIfMissingUnitTests>false</failIfMissingUnitTests>
</properties>
<parent>
@@ -42,6 +41,11 @@
<dependencies>
<dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-utils</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
@@ -53,5 +57,25 @@
<groupId>io.arrow-kt</groupId>
<artifactId>arrow-effects</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.spek</groupId>
+ <artifactId>spek-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.spek</groupId>
+ <artifactId>spek-junit-platform-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project> \ No newline at end of file
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/http/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt
index 9cab031a..b21d1871 100644
--- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/http/HealthCheckApiServer.kt
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt
@@ -17,21 +17,24 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.healthcheck.http
+package org.onap.dcae.collectors.veshv.healthcheck.api
import arrow.effects.IO
import ratpack.handling.Chain
-import ratpack.http.Status
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
+import java.util.concurrent.atomic.AtomicReference
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since August 2018
*/
-class HealthCheckApiServer {
+class HealthCheckApiServer(private val healthStateProvider: HealthStateProvider) {
+
+ private val healthState: AtomicReference<HealthState> = AtomicReference(HealthState.STARTING)
fun start(port: Int): IO<RatpackServer> = IO {
+ healthStateProvider().subscribe(healthState::set)
RatpackServer
.start {
it
@@ -43,7 +46,9 @@ class HealthCheckApiServer {
private fun configureHandlers(chain: Chain) {
chain
.get("healthcheck") { ctx ->
- ctx.response.status(Status.OK).send()
+ healthState.get().run {
+ ctx.response.status(responseCode).send(message)
+ }
}
}
} \ No newline at end of file
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt
new file mode 100644
index 00000000..3dddf1e7
--- /dev/null
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.healthcheck.api
+
+import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.OK
+import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.SERVICE_UNAVAILABLE
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since August 2018
+ */
+enum class HealthState(val message: String, val responseCode: Int) {
+ HEALTHY("Healthy", OK),
+ STARTING("Collector is starting", SERVICE_UNAVAILABLE),
+ WAITING_FOR_CONSUL_CONFIGURATION("Waiting for consul configuration", SERVICE_UNAVAILABLE),
+ CONSUL_CONFIGURATION_NOT_FOUND("Consul configuration not found", SERVICE_UNAVAILABLE)
+}
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt
new file mode 100644
index 00000000..5cc09ccc
--- /dev/null
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.healthcheck.api
+
+import org.onap.dcae.collectors.veshv.healthcheck.impl.HealthStateProviderImpl
+import reactor.core.publisher.Flux
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since August 2018
+ */
+interface HealthStateProvider {
+
+ operator fun invoke(): Flux<HealthState>
+ fun changeState(healthState: HealthState)
+
+ companion object {
+ val INSTANCE: HealthStateProvider by lazy {
+ HealthStateProviderImpl()
+ }
+ }
+}
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt
new file mode 100644
index 00000000..5056d2da
--- /dev/null
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.healthcheck.impl
+
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import reactor.core.publisher.Flux
+import reactor.core.publisher.FluxProcessor
+import reactor.core.publisher.UnicastProcessor
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since August 2018
+ */
+internal class HealthStateProviderImpl : HealthStateProvider {
+
+ private val healthStateStream: FluxProcessor<HealthState, HealthState> = UnicastProcessor.create()
+
+ override fun invoke(): Flux<HealthState> = healthStateStream
+
+ override fun changeState(healthState: HealthState) = healthStateStream.onNext(healthState)
+}
diff --git a/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt b/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt
new file mode 100644
index 00000000..e9c487bf
--- /dev/null
+++ b/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.healthcheck.impl
+
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import reactor.test.StepVerifier
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since August 2018
+ */
+object HealthStateProviderImplTest : Spek({
+ describe("Health state provider") {
+ val healthStateProviderImpl = HealthStateProviderImpl()
+ on("health state update") {
+ healthStateProviderImpl.changeState(HealthState.HEALTHY)
+ healthStateProviderImpl.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+ healthStateProviderImpl.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+ healthStateProviderImpl.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+
+ it("should push new health state to the subscriber") {
+ StepVerifier
+ .create(healthStateProviderImpl().take(4))
+ .expectNext(HealthState.HEALTHY)
+ .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+ .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+ .expectNext(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+ .verifyComplete()
+ }
+ }
+ }
+}) \ No newline at end of file
diff --git a/hv-collector-main/Dockerfile b/hv-collector-main/Dockerfile
index c077440e..fb7c7ae6 100644
--- a/hv-collector-main/Dockerfile
+++ b/hv-collector-main/Dockerfile
@@ -5,6 +5,10 @@ LABEL license.name="The Apache Software License, Version 2.0"
LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0"
LABEL maintainer="Nokia Wroclaw ONAP Team"
+RUN apt-get update \
+ && apt-get install -y --no-install-recommends curl \
+ && apt-get clean
+
WORKDIR /opt/ves-hv-collector
ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
COPY target/libs/external/* ./
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 23d7d2e2..dc92228f 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -23,7 +23,8 @@ import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.boundary.ServerHandle
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.factory.ServerFactory
-import org.onap.dcae.collectors.veshv.healthcheck.http.HealthCheckApiServer
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthCheckApiServer
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
@@ -68,7 +69,7 @@ private fun logServerStarted(handle: ServerHandle): ServerHandle = handle.also {
}
private fun startHealthCheckApiServer(config: ServerConfiguration): ServerConfiguration = config.apply {
- HealthCheckApiServer()
+ HealthCheckApiServer(HealthStateProvider.INSTANCE)
.start(healthCheckApiPort)
.unsafeRunSync()
.also { logger.info("Health check api server started on port ${it.bindPort}") }
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt
new file mode 100644
index 00000000..d20ffaca
--- /dev/null
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt
@@ -0,0 +1,31 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.http
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since August 2018
+ */
+class Status{
+ companion object {
+ const val OK = 200
+ const val SERVICE_UNAVAILABLE = 503
+ }
+}
diff --git a/pom.xml b/pom.xml
index 19b98927..ac5e1f29 100644
--- a/pom.xml
+++ b/pom.xml
@@ -658,7 +658,6 @@
<version>${spek.version}</version>
<scope>test</scope>
</dependency>
-
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>