diff options
author | Jakub Dudycz <jakub.dudycz@nokia.com> | 2018-08-08 09:17:14 +0200 |
---|---|---|
committer | Jakub Dudycz <jakub.dudycz@nokia.com> | 2018-08-09 10:46:48 +0200 |
commit | 67702df781ab8acab8cd7375c0ce5ee91fc3debe (patch) | |
tree | d4323f6567e23d156ebfa754fd5aa6aeac5eb64a /hv-collector-ct | |
parent | dd827e2c1cc984d9ed1fed9914cbef0e985ea625 (diff) |
Implement simple health check mechanism
Change-Id: Ic4b8b59ced9dc19c9ebf26131036a9e1a752164f
Issue-ID: DCAEGEN2-659
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Diffstat (limited to 'hv-collector-ct')
5 files changed, 152 insertions, 93 deletions
diff --git a/hv-collector-ct/pom.xml b/hv-collector-ct/pom.xml index 347bbbe0..f4150c2a 100644 --- a/hv-collector-ct/pom.xml +++ b/hv-collector-ct/pom.xml @@ -105,6 +105,10 @@ <groupId>org.jetbrains.spek</groupId> <artifactId>spek-junit-platform-engine</artifactId> </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-test</artifactId> + </dependency> </dependencies> diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index aaadcc7d..e7b7770d 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider +import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthStateProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import reactor.core.publisher.Flux @@ -39,10 +40,11 @@ import java.time.Duration */ class Sut(sink: Sink = StoringSink()) { val configurationProvider = FakeConfigurationProvider() + val healthStateProvider = FakeHealthStateProvider() val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT private val metrics = FakeMetrics() - private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics) + private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics, healthStateProvider) private val collectorProvider = collectorFactory.createVesHvCollectorProvider() val collector: Collector diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 1f07c233..493517ba 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -22,15 +22,24 @@ package org.onap.dcae.collectors.veshv.tests.component import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.tests.fakes.* +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVRANMEAS_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink +import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting +import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration import org.onap.dcae.collectors.veshv.tests.utils.endOfTransmissionWireMessage import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame -import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain -import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload +import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage +import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import reactor.core.publisher.Flux import java.time.Duration @@ -219,127 +228,143 @@ object VesHvSpecification : Spek({ val defaultTimeout = Duration.ofSeconds(10) - it("should update collector on configuration change") { - val (sut, _) = vesHvWithStoringSink() + given("successful configuration change") { - sut.configurationProvider.updateConfiguration(basicConfiguration) - val firstCollector = sut.collector + lateinit var sut: Sut + lateinit var sink: StoringSink - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) - val collectorAfterUpdate = sut.collector + beforeEachTest { + vesHvWithStoringSink().run { + sut = first + sink = second + } + } - assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) + it("should update collector") { + val firstCollector = sut.collector - } + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + val collectorAfterUpdate = sut.collector - it("should start routing messages on configuration change") { - val (sut, sink) = vesHvWithStoringSink() + assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) + } - sut.configurationProvider.updateConfiguration(configurationWithoutRouting) + it("should start routing messages") { - val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) - assertThat(messages).isEmpty() + sut.configurationProvider.updateConfiguration(configurationWithoutRouting) - sut.configurationProvider.updateConfiguration(basicConfiguration) + val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + assertThat(messages).isEmpty() - val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) - assertThat(messagesAfterUpdate).hasSize(1) - val message = messagesAfterUpdate[0] + sut.configurationProvider.updateConfiguration(basicConfiguration) - assertThat(message.topic).describedAs("routed message topic after configuration's change") - .isEqualTo(HVRANMEAS_TOPIC) - assertThat(message.partition).describedAs("routed message partition") - .isEqualTo(0) - } + val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + assertThat(messagesAfterUpdate).hasSize(1) + val message = messagesAfterUpdate[0] - it("should change domain routing on configuration change") { - val (sut, sink) = vesHvWithStoringSink() + assertThat(message.topic).describedAs("routed message topic after configuration's change") + .isEqualTo(HVRANMEAS_TOPIC) + assertThat(message.partition).describedAs("routed message partition") + .isEqualTo(0) + } - sut.configurationProvider.updateConfiguration(basicConfiguration) + it("should change domain routing") { - val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) - assertThat(messages).hasSize(1) - val firstMessage = messages[0] + val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + assertThat(messages).hasSize(1) + val firstMessage = messages[0] - assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration") - .isEqualTo(HVRANMEAS_TOPIC) - assertThat(firstMessage.partition).describedAs("routed message partition") - .isEqualTo(0) + assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration") + .isEqualTo(HVRANMEAS_TOPIC) + assertThat(firstMessage.partition).describedAs("routed message partition") + .isEqualTo(0) - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) - val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) - assertThat(messagesAfterUpdate).hasSize(2) - val secondMessage = messagesAfterUpdate[1] + val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + assertThat(messagesAfterUpdate).hasSize(2) + val secondMessage = messagesAfterUpdate[1] - assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change") - .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC) - assertThat(secondMessage.partition).describedAs("routed message partition") - .isEqualTo(0) - } + assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change") + .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC) + assertThat(secondMessage.partition).describedAs("routed message partition") + .isEqualTo(0) + } - it("should update routing for each client sending one message") { - val (sut, sink) = vesHvWithStoringSink() + it("should update routing for each client sending one message") { - sut.configurationProvider.updateConfiguration(basicConfiguration) + val messagesAmount = 10 + val messagesForEachTopic = 5 - val messagesAmount = 10 - val messagesForEachTopic = 5 + Flux.range(0, messagesAmount).doOnNext { + if (it == messagesForEachTopic) { + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + } + }.doOnNext { + sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + }.then().block(defaultTimeout) - Flux.range(0, messagesAmount).doOnNext { - if (it == messagesForEachTopic) { - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) - } - }.doOnNext { - sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) - }.then().block(defaultTimeout) + val messages = sink.sentMessages + val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC } + val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC } - val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC } - val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC } + assertThat(messages.size).isEqualTo(messagesAmount) + assertThat(messagesForEachTopic) + .describedAs("amount of messages routed to each topic") + .isEqualTo(firstTopicMessagesCount) + .isEqualTo(secondTopicMessagesCount) + } - assertThat(messages.size).isEqualTo(messagesAmount) - assertThat(messagesForEachTopic) - .describedAs("amount of messages routed to each topic") - .isEqualTo(firstTopicMessagesCount) - .isEqualTo(secondTopicMessagesCount) - } + it("should not update routing for client sending continuous stream of messages") { + val messageStreamSize = 10 + val pivot = 5 - it("should not update routing for client sending continuous stream of messages") { - val (sut, sink) = vesHvWithStoringSink() + val incomingMessages = Flux.range(0, messageStreamSize) + .doOnNext { + if (it == pivot) { + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + println("config changed") + } + } + .map { vesWireFrameMessage(Domain.HVRANMEAS) } - sut.configurationProvider.updateConfiguration(basicConfiguration) - val messageStreamSize = 10 - val pivot = 5 + sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout) - val incomingMessages = Flux.range(0, messageStreamSize) - .doOnNext { - if (it == pivot) { - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) - println("config changed") - } - } - .map { vesWireFrameMessage(Domain.HVRANMEAS) } + val messages = sink.sentMessages + val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC } + val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC } + assertThat(messages.size).isEqualTo(messageStreamSize) + assertThat(firstTopicMessagesCount) + .describedAs("amount of messages routed to first topic") + .isEqualTo(messageStreamSize) - sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout) + assertThat(secondTopicMessagesCount) + .describedAs("amount of messages routed to second topic") + .isEqualTo(0) + } - val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC } - val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC } + it("should mark the application healthy") { + assertThat(sut.healthStateProvider.currentHealth) + .describedAs("application health state") + .isEqualTo(HealthState.HEALTHY) + } + } - assertThat(messages.size).isEqualTo(messageStreamSize) - assertThat(firstTopicMessagesCount) - .describedAs("amount of messages routed to first topic") - .isEqualTo(messageStreamSize) + given("failed configuration change") { + val (sut, _) = vesHvWithStoringSink() + sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) + sut.configurationProvider.updateConfiguration(basicConfiguration) - assertThat(secondTopicMessagesCount) - .describedAs("amount of messages routed to second topic") - .isEqualTo(0) + it("should mark the application unhealthy ") { + assertThat(sut.healthStateProvider.currentHealth) + .describedAs("application health state") + .isEqualTo(HealthState.CONSUL_CONFIGURATION_NOT_FOUND) + } } } diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt new file mode 100644 index 00000000..09fd232c --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt @@ -0,0 +1,18 @@ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider +import reactor.core.publisher.Flux + +class FakeHealthStateProvider : HealthStateProvider { + + lateinit var currentHealth: HealthState + + override fun changeState(healthState: HealthState) { + currentHealth = healthState + } + + override fun invoke(): Flux<HealthState> { + throw NotImplementedError() + } +} diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index b89113f4..ebeaa69e 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.model.routing import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import reactor.core.publisher.FluxProcessor import reactor.core.publisher.UnicastProcessor +import reactor.retry.RetryExhaustedException const val HVRANMEAS_TOPIC = "ves_hvRanMeas" @@ -83,10 +84,19 @@ val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration ) class FakeConfigurationProvider : ConfigurationProvider { + private var shouldThrowException = false private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create() - fun updateConfiguration(collectorConfiguration: CollectorConfiguration) { - configStream.onNext(collectorConfiguration) + fun updateConfiguration(collectorConfiguration: CollectorConfiguration) = + if (shouldThrowException) { + configStream.onError(RetryExhaustedException("I'm so tired")) + } else { + configStream.onNext(collectorConfiguration) + } + + + fun shouldThrowExceptionOnConfigUpdate(shouldThrowException: Boolean) { + this.shouldThrowException = shouldThrowException } override fun invoke() = configStream |