diff options
20 files changed, 488 insertions, 157 deletions
diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml index 06687b7d..784b2476 100644 --- a/hv-collector-core/pom.xml +++ b/hv-collector-core/pom.xml @@ -69,6 +69,11 @@ </dependency> <dependency> <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-health-check</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> <artifactId>hv-collector-domain</artifactId> <version>${project.parent.version}</version> <scope>compile</scope> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 7be24d23..3e652b92 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,6 +25,8 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector @@ -39,19 +41,20 @@ import java.util.concurrent.atomic.AtomicReference */ class CollectorFactory(val configuration: ConfigurationProvider, private val sinkProvider: SinkProvider, - private val metrics: Metrics) { + private val metrics: Metrics, + private val healthStateProvider: HealthStateProvider = HealthStateProvider.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { val collector: AtomicReference<Collector> = AtomicReference() configuration() .map(this::createVesHvCollector) - .doOnNext { logger.info("Using updated configuration for new connections") } + .doOnNext { + logger.info("Using updated configuration for new connections") + healthStateProvider.changeState(HealthState.HEALTHY) + } .doOnError { - logger.error("Shutting down", it) - // TODO: create Health class - // It should monitor all incidents and expose the results for the - // container health check mechanism - System.exit(ERROR_CODE) + logger.error("Failed to acquire configuration from consul") + healthStateProvider.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND) } .subscribe(collector::set) return collector::get @@ -67,7 +70,6 @@ class CollectorFactory(val configuration: ConfigurationProvider, } companion object { - private const val ERROR_CODE = 3 private val logger = Logger(CollectorFactory::class) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 7248db6e..07b5c82e 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -24,7 +24,6 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import reactor.ipc.netty.http.client.HttpClient -import java.time.Duration /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -35,9 +34,7 @@ object AdapterFactory { fun loggingSink(): SinkProvider = LoggingSinkProvider() fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = - ConsulConfigurationProvider( - httpAdapter(), - configurationProviderParams) + ConsulConfigurationProvider(httpAdapter(), configurationProviderParams) fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index 6f04c95c..81463039 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -20,11 +20,12 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber -import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.retry.Jitter @@ -45,24 +46,30 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val url: String, private val firstRequestDelay: Duration, private val requestInterval: Duration, + private val healthStateProvider: HealthStateProvider, retrySpec: Retry<Any> + ) : ConfigurationProvider { private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0) private val retry = retrySpec .doOnRetry { logger.warn("Could not get fresh configuration", it.exception()) + healthStateProvider.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) } - constructor(http: HttpAdapter, params: ConfigurationProviderParams) : this( + constructor(http: HttpAdapter, + params: ConfigurationProviderParams) : this( http, params.configurationUrl, params.firstRequestDelay, params.requestInterval, + HealthStateProvider.INSTANCE, Retry.any<Any>() .retryMax(MAX_RETRIES) .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR)) - .jitter(Jitter.random())) + .jitter(Jitter.random()) + ) override fun invoke(): Flux<CollectorConfiguration> = Flux.interval(firstRequestDelay, requestInterval) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index 1626c028..f9a9ba60 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -23,9 +23,13 @@ import com.nhaarman.mockito_kotlin.eq import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.whenever import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on import org.mockito.Mockito +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import reactor.core.publisher.Mono import reactor.retry.Retry @@ -40,62 +44,89 @@ import kotlin.test.assertEquals */ internal object ConsulConfigurationProviderTest : Spek({ - val httpAdapterMock: HttpAdapter = mock() - val firstRequestDelay = Duration.ofMillis(1) - val requestInterval = Duration.ofMillis(1) - val retry = Retry.onlyIf<Any> { it.iteration() < 2 }.fixedBackoff(Duration.ofNanos(1)) + describe("Consul configuration provider") { - given("valid resource url") { + val httpAdapterMock: HttpAdapter = mock() + val healthStateProvider = HealthStateProvider.INSTANCE - val validUrl = "http://valid-url/" - val consulConfigProvider = ConsulConfigurationProvider( - httpAdapterMock, - validUrl, - firstRequestDelay, - requestInterval, - retry) + given("valid resource url") { + val validUrl = "http://valid-url/" + val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider) - whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap())) - .thenReturn(Mono.just(constructConsulResponse())) + on("call to consul") { + whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap())) + .thenReturn(Mono.just(constructConsulResponse())) - it("should use received configuration") { + it("should use received configuration") { - StepVerifier.create(consulConfigProvider().take(1)) - .consumeNextWith { + StepVerifier.create(consulConfigProvider().take(1)) + .consumeNextWith { - assertEquals("kafka:9093", it.kafkaBootstrapServers) + assertEquals("kafka:9093", it.kafkaBootstrapServers) - val route1 = it.routing.routes[0] - assertEquals(Domain.FAULT, route1.domain) - assertEquals("test-topic-1", route1.targetTopic) + val route1 = it.routing.routes[0] + assertEquals(Domain.FAULT, route1.domain) + assertEquals("test-topic-1", route1.targetTopic) - val route2 = it.routing.routes[1] - assertEquals(Domain.HEARTBEAT, route2.domain) - assertEquals("test-topic-2", route2.targetTopic) + val route2 = it.routing.routes[1] + assertEquals(Domain.HEARTBEAT, route2.domain) + assertEquals("test-topic-2", route2.targetTopic) + + }.verifyComplete() + } + } - }.verifyComplete() + } + given("invalid resource url") { + val invalidUrl = "http://invalid-url/" + + val iterationCount = 3L + val consulConfigProvider = constructConsulConfigProvider( + invalidUrl, httpAdapterMock, healthStateProvider, iterationCount + ) + + on("call to consul") { + whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap())) + .thenReturn(Mono.error(RuntimeException("Test exception"))) + + it("should interrupt the flux") { + + StepVerifier.create(consulConfigProvider()) + .verifyErrorMessage("Test exception") + } + + it("should update the health state"){ + StepVerifier.create(healthStateProvider().take(iterationCount)) + .expectNextCount(iterationCount - 1) + .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) + .verifyComplete() + } + } } } - given("invalid resource url") { - val invalidUrl = "http://invalid-url/" - val consulConfigProvider = ConsulConfigurationProvider( - httpAdapterMock, - invalidUrl, - firstRequestDelay, - requestInterval, - retry) +}) - whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap())) - .thenReturn(Mono.error(RuntimeException("Test exception"))) +private fun constructConsulConfigProvider(url: String, + httpAdapter: HttpAdapter, + healthStateProvider: HealthStateProvider, + iterationCount: Long = 1 +): ConsulConfigurationProvider { - it("should interrupt the flux") { + val firstRequestDelay = Duration.ofMillis(1) + val requestInterval = Duration.ofMillis(1) + val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1)) + + return ConsulConfigurationProvider( + httpAdapter, + url, + firstRequestDelay, + requestInterval, + healthStateProvider, + retry + ) +} - StepVerifier.create(consulConfigProvider()) - .verifyErrorMessage("Test exception") - } - } -}) fun constructConsulResponse(): String { diff --git a/hv-collector-ct/pom.xml b/hv-collector-ct/pom.xml index 347bbbe0..f4150c2a 100644 --- a/hv-collector-ct/pom.xml +++ b/hv-collector-ct/pom.xml @@ -105,6 +105,10 @@ <groupId>org.jetbrains.spek</groupId> <artifactId>spek-junit-platform-engine</artifactId> </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-test</artifactId> + </dependency> </dependencies> diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index aaadcc7d..e7b7770d 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider +import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthStateProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import reactor.core.publisher.Flux @@ -39,10 +40,11 @@ import java.time.Duration */ class Sut(sink: Sink = StoringSink()) { val configurationProvider = FakeConfigurationProvider() + val healthStateProvider = FakeHealthStateProvider() val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT private val metrics = FakeMetrics() - private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics) + private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics, healthStateProvider) private val collectorProvider = collectorFactory.createVesHvCollectorProvider() val collector: Collector diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 1f07c233..493517ba 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -22,15 +22,24 @@ package org.onap.dcae.collectors.veshv.tests.component import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.tests.fakes.* +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVRANMEAS_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink +import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting +import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration import org.onap.dcae.collectors.veshv.tests.utils.endOfTransmissionWireMessage import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame -import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain -import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload +import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage +import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import reactor.core.publisher.Flux import java.time.Duration @@ -219,127 +228,143 @@ object VesHvSpecification : Spek({ val defaultTimeout = Duration.ofSeconds(10) - it("should update collector on configuration change") { - val (sut, _) = vesHvWithStoringSink() + given("successful configuration change") { - sut.configurationProvider.updateConfiguration(basicConfiguration) - val firstCollector = sut.collector + lateinit var sut: Sut + lateinit var sink: StoringSink - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) - val collectorAfterUpdate = sut.collector + beforeEachTest { + vesHvWithStoringSink().run { + sut = first + sink = second + } + } - assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) + it("should update collector") { + val firstCollector = sut.collector - } + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + val collectorAfterUpdate = sut.collector - it("should start routing messages on configuration change") { - val (sut, sink) = vesHvWithStoringSink() + assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) + } - sut.configurationProvider.updateConfiguration(configurationWithoutRouting) + it("should start routing messages") { - val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) - assertThat(messages).isEmpty() + sut.configurationProvider.updateConfiguration(configurationWithoutRouting) - sut.configurationProvider.updateConfiguration(basicConfiguration) + val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + assertThat(messages).isEmpty() - val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) - assertThat(messagesAfterUpdate).hasSize(1) - val message = messagesAfterUpdate[0] + sut.configurationProvider.updateConfiguration(basicConfiguration) - assertThat(message.topic).describedAs("routed message topic after configuration's change") - .isEqualTo(HVRANMEAS_TOPIC) - assertThat(message.partition).describedAs("routed message partition") - .isEqualTo(0) - } + val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + assertThat(messagesAfterUpdate).hasSize(1) + val message = messagesAfterUpdate[0] - it("should change domain routing on configuration change") { - val (sut, sink) = vesHvWithStoringSink() + assertThat(message.topic).describedAs("routed message topic after configuration's change") + .isEqualTo(HVRANMEAS_TOPIC) + assertThat(message.partition).describedAs("routed message partition") + .isEqualTo(0) + } - sut.configurationProvider.updateConfiguration(basicConfiguration) + it("should change domain routing") { - val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) - assertThat(messages).hasSize(1) - val firstMessage = messages[0] + val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + assertThat(messages).hasSize(1) + val firstMessage = messages[0] - assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration") - .isEqualTo(HVRANMEAS_TOPIC) - assertThat(firstMessage.partition).describedAs("routed message partition") - .isEqualTo(0) + assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration") + .isEqualTo(HVRANMEAS_TOPIC) + assertThat(firstMessage.partition).describedAs("routed message partition") + .isEqualTo(0) - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) - val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) - assertThat(messagesAfterUpdate).hasSize(2) - val secondMessage = messagesAfterUpdate[1] + val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + assertThat(messagesAfterUpdate).hasSize(2) + val secondMessage = messagesAfterUpdate[1] - assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change") - .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC) - assertThat(secondMessage.partition).describedAs("routed message partition") - .isEqualTo(0) - } + assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change") + .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC) + assertThat(secondMessage.partition).describedAs("routed message partition") + .isEqualTo(0) + } - it("should update routing for each client sending one message") { - val (sut, sink) = vesHvWithStoringSink() + it("should update routing for each client sending one message") { - sut.configurationProvider.updateConfiguration(basicConfiguration) + val messagesAmount = 10 + val messagesForEachTopic = 5 - val messagesAmount = 10 - val messagesForEachTopic = 5 + Flux.range(0, messagesAmount).doOnNext { + if (it == messagesForEachTopic) { + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + } + }.doOnNext { + sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + }.then().block(defaultTimeout) - Flux.range(0, messagesAmount).doOnNext { - if (it == messagesForEachTopic) { - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) - } - }.doOnNext { - sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) - }.then().block(defaultTimeout) + val messages = sink.sentMessages + val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC } + val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC } - val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC } - val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC } + assertThat(messages.size).isEqualTo(messagesAmount) + assertThat(messagesForEachTopic) + .describedAs("amount of messages routed to each topic") + .isEqualTo(firstTopicMessagesCount) + .isEqualTo(secondTopicMessagesCount) + } - assertThat(messages.size).isEqualTo(messagesAmount) - assertThat(messagesForEachTopic) - .describedAs("amount of messages routed to each topic") - .isEqualTo(firstTopicMessagesCount) - .isEqualTo(secondTopicMessagesCount) - } + it("should not update routing for client sending continuous stream of messages") { + val messageStreamSize = 10 + val pivot = 5 - it("should not update routing for client sending continuous stream of messages") { - val (sut, sink) = vesHvWithStoringSink() + val incomingMessages = Flux.range(0, messageStreamSize) + .doOnNext { + if (it == pivot) { + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + println("config changed") + } + } + .map { vesWireFrameMessage(Domain.HVRANMEAS) } - sut.configurationProvider.updateConfiguration(basicConfiguration) - val messageStreamSize = 10 - val pivot = 5 + sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout) - val incomingMessages = Flux.range(0, messageStreamSize) - .doOnNext { - if (it == pivot) { - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) - println("config changed") - } - } - .map { vesWireFrameMessage(Domain.HVRANMEAS) } + val messages = sink.sentMessages + val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC } + val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC } + assertThat(messages.size).isEqualTo(messageStreamSize) + assertThat(firstTopicMessagesCount) + .describedAs("amount of messages routed to first topic") + .isEqualTo(messageStreamSize) - sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout) + assertThat(secondTopicMessagesCount) + .describedAs("amount of messages routed to second topic") + .isEqualTo(0) + } - val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC } - val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC } + it("should mark the application healthy") { + assertThat(sut.healthStateProvider.currentHealth) + .describedAs("application health state") + .isEqualTo(HealthState.HEALTHY) + } + } - assertThat(messages.size).isEqualTo(messageStreamSize) - assertThat(firstTopicMessagesCount) - .describedAs("amount of messages routed to first topic") - .isEqualTo(messageStreamSize) + given("failed configuration change") { + val (sut, _) = vesHvWithStoringSink() + sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) + sut.configurationProvider.updateConfiguration(basicConfiguration) - assertThat(secondTopicMessagesCount) - .describedAs("amount of messages routed to second topic") - .isEqualTo(0) + it("should mark the application unhealthy ") { + assertThat(sut.healthStateProvider.currentHealth) + .describedAs("application health state") + .isEqualTo(HealthState.CONSUL_CONFIGURATION_NOT_FOUND) + } } } diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt new file mode 100644 index 00000000..09fd232c --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt @@ -0,0 +1,18 @@ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider +import reactor.core.publisher.Flux + +class FakeHealthStateProvider : HealthStateProvider { + + lateinit var currentHealth: HealthState + + override fun changeState(healthState: HealthState) { + currentHealth = healthState + } + + override fun invoke(): Flux<HealthState> { + throw NotImplementedError() + } +} diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index b89113f4..ebeaa69e 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.model.routing import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import reactor.core.publisher.FluxProcessor import reactor.core.publisher.UnicastProcessor +import reactor.retry.RetryExhaustedException const val HVRANMEAS_TOPIC = "ves_hvRanMeas" @@ -83,10 +84,19 @@ val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration ) class FakeConfigurationProvider : ConfigurationProvider { + private var shouldThrowException = false private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create() - fun updateConfiguration(collectorConfiguration: CollectorConfiguration) { - configStream.onNext(collectorConfiguration) + fun updateConfiguration(collectorConfiguration: CollectorConfiguration) = + if (shouldThrowException) { + configStream.onError(RetryExhaustedException("I'm so tired")) + } else { + configStream.onNext(collectorConfiguration) + } + + + fun shouldThrowExceptionOnConfigUpdate(shouldThrowException: Boolean) { + this.shouldThrowException = shouldThrowException } override fun invoke() = configStream diff --git a/hv-collector-health-check/pom.xml b/hv-collector-health-check/pom.xml index 4072ec24..1e77adb0 100644 --- a/hv-collector-health-check/pom.xml +++ b/hv-collector-health-check/pom.xml @@ -14,7 +14,6 @@ <properties> <skipAnalysis>false</skipAnalysis> - <failIfMissingUnitTests>false</failIfMissingUnitTests> </properties> <parent> @@ -42,6 +41,11 @@ <dependencies> <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-utils</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> <groupId>org.jetbrains.kotlin</groupId> <artifactId>kotlin-stdlib-jdk8</artifactId> </dependency> @@ -53,5 +57,25 @@ <groupId>io.arrow-kt</groupId> <artifactId>arrow-effects</artifactId> </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-junit-platform-engine</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-test</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project>
\ No newline at end of file diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/http/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt index 9cab031a..b21d1871 100644 --- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/http/HealthCheckApiServer.kt +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt @@ -17,21 +17,24 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.healthcheck.http +package org.onap.dcae.collectors.veshv.healthcheck.api import arrow.effects.IO import ratpack.handling.Chain -import ratpack.http.Status import ratpack.server.RatpackServer import ratpack.server.ServerConfig +import java.util.concurrent.atomic.AtomicReference /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since August 2018 */ -class HealthCheckApiServer { +class HealthCheckApiServer(private val healthStateProvider: HealthStateProvider) { + + private val healthState: AtomicReference<HealthState> = AtomicReference(HealthState.STARTING) fun start(port: Int): IO<RatpackServer> = IO { + healthStateProvider().subscribe(healthState::set) RatpackServer .start { it @@ -43,7 +46,9 @@ class HealthCheckApiServer { private fun configureHandlers(chain: Chain) { chain .get("healthcheck") { ctx -> - ctx.response.status(Status.OK).send() + healthState.get().run { + ctx.response.status(responseCode).send(message) + } } } }
\ No newline at end of file diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt new file mode 100644 index 00000000..3dddf1e7 --- /dev/null +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.healthcheck.api + +import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.OK +import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.SERVICE_UNAVAILABLE + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since August 2018 + */ +enum class HealthState(val message: String, val responseCode: Int) { + HEALTHY("Healthy", OK), + STARTING("Collector is starting", SERVICE_UNAVAILABLE), + WAITING_FOR_CONSUL_CONFIGURATION("Waiting for consul configuration", SERVICE_UNAVAILABLE), + CONSUL_CONFIGURATION_NOT_FOUND("Consul configuration not found", SERVICE_UNAVAILABLE) +} diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt new file mode 100644 index 00000000..5cc09ccc --- /dev/null +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.healthcheck.api + +import org.onap.dcae.collectors.veshv.healthcheck.impl.HealthStateProviderImpl +import reactor.core.publisher.Flux + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since August 2018 + */ +interface HealthStateProvider { + + operator fun invoke(): Flux<HealthState> + fun changeState(healthState: HealthState) + + companion object { + val INSTANCE: HealthStateProvider by lazy { + HealthStateProviderImpl() + } + } +} diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt new file mode 100644 index 00000000..5056d2da --- /dev/null +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.healthcheck.impl + +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import reactor.core.publisher.Flux +import reactor.core.publisher.FluxProcessor +import reactor.core.publisher.UnicastProcessor + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since August 2018 + */ +internal class HealthStateProviderImpl : HealthStateProvider { + + private val healthStateStream: FluxProcessor<HealthState, HealthState> = UnicastProcessor.create() + + override fun invoke(): Flux<HealthState> = healthStateStream + + override fun changeState(healthState: HealthState) = healthStateStream.onNext(healthState) +} diff --git a/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt b/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt new file mode 100644 index 00000000..e9c487bf --- /dev/null +++ b/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.healthcheck.impl + +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import reactor.test.StepVerifier + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since August 2018 + */ +object HealthStateProviderImplTest : Spek({ + describe("Health state provider") { + val healthStateProviderImpl = HealthStateProviderImpl() + on("health state update") { + healthStateProviderImpl.changeState(HealthState.HEALTHY) + healthStateProviderImpl.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) + healthStateProviderImpl.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) + healthStateProviderImpl.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND) + + it("should push new health state to the subscriber") { + StepVerifier + .create(healthStateProviderImpl().take(4)) + .expectNext(HealthState.HEALTHY) + .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) + .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) + .expectNext(HealthState.CONSUL_CONFIGURATION_NOT_FOUND) + .verifyComplete() + } + } + } +})
\ No newline at end of file diff --git a/hv-collector-main/Dockerfile b/hv-collector-main/Dockerfile index c077440e..fb7c7ae6 100644 --- a/hv-collector-main/Dockerfile +++ b/hv-collector-main/Dockerfile @@ -5,6 +5,10 @@ LABEL license.name="The Apache Software License, Version 2.0" LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0" LABEL maintainer="Nokia Wroclaw ONAP Team" +RUN apt-get update \ + && apt-get install -y --no-install-recommends curl \ + && apt-get clean + WORKDIR /opt/ves-hv-collector ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"] COPY target/libs/external/* ./ diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 23d7d2e2..dc92228f 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -23,7 +23,8 @@ import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.boundary.ServerHandle import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.factory.ServerFactory -import org.onap.dcae.collectors.veshv.healthcheck.http.HealthCheckApiServer +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthCheckApiServer +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure @@ -68,7 +69,7 @@ private fun logServerStarted(handle: ServerHandle): ServerHandle = handle.also { } private fun startHealthCheckApiServer(config: ServerConfiguration): ServerConfiguration = config.apply { - HealthCheckApiServer() + HealthCheckApiServer(HealthStateProvider.INSTANCE) .start(healthCheckApiPort) .unsafeRunSync() .also { logger.info("Health check api server started on port ${it.bindPort}") } diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt new file mode 100644 index 00000000..d20ffaca --- /dev/null +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.http + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since August 2018 + */ +class Status{ + companion object { + const val OK = 200 + const val SERVICE_UNAVAILABLE = 503 + } +} @@ -658,7 +658,6 @@ <version>${spek.version}</version> <scope>test</scope> </dependency> - <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> |