diff options
author | Jakub Dudycz <jdudycz@nokia.com> | 2018-07-02 15:52:08 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-02 12:12:37 +0200 |
commit | 72b60289c3aa91f91893193011a01ea11bee2375 (patch) | |
tree | 055183094a14ea7782223d9dbbbba853beb9d21a /hv-collector-ct/src | |
parent | e31d59f63ebe5536d2d2d868703eb8896924b63d (diff) |
Component tests for consul configuration updates
Added few component test cases for updating configuration
Closes ONAP-464
Change-Id: Id8dba1d1cf4bf641a65e27d2a257fb5c26ee2bbc
Signed-off-by: Jakub Dudycz <jdudycz@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-ct/src')
2 files changed, 156 insertions, 6 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index aa5810d3..246fc7ed 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -23,11 +23,10 @@ import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.Routing -import org.onap.dcae.collectors.veshv.model.routing import org.onap.dcae.collectors.veshv.tests.fakes.* import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import reactor.core.publisher.Flux +import java.time.Duration /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -50,7 +49,6 @@ object VesHvSpecification : Spek({ } describe("Memory management") { - it("should release memory for each handled and dropped message") { val sink = StoringSink() val sut = Sut(sink) @@ -160,7 +158,7 @@ object VesHvSpecification : Spek({ assertThat(messages.get(2).topic).describedAs("last message topic") .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC) } - + it("should drop message if route was not found") { val sink = StoringSink() val sut = Sut(sink) @@ -178,6 +176,139 @@ object VesHvSpecification : Spek({ } } + describe("configuration update") { + + val defaultTimeout = Duration.ofSeconds(10) + + it("should update collector on configuration change") { + val sink = StoringSink() + val sut = Sut(sink) + + sut.configurationProvider.updateConfiguration(basicConfiguration) + val firstCollector = sut.collector + + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + val collectorAfterUpdate = sut.collector + + assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) + + } + + it("should start routing messages on configuration change") { + val sink = StoringSink() + val sut = Sut(sink) + + sut.configurationProvider.updateConfiguration(configurationWithoutRouting) + + val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) + assertThat(messages).isEmpty() + + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) + assertThat(messagesAfterUpdate).hasSize(1) + val message = messagesAfterUpdate[0] + + assertThat(message.topic).describedAs("routed message topic after configuration's change") + .isEqualTo(HVRANMEAS_TOPIC) + assertThat(message.partition).describedAs("routed message partition") + .isEqualTo(0) + } + + it("should change domain routing on configuration change") { + val sink = StoringSink() + val sut = Sut(sink) + + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) + assertThat(messages).hasSize(1) + val firstMessage = messages[0] + + assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration") + .isEqualTo(HVRANMEAS_TOPIC) + assertThat(firstMessage.partition).describedAs("routed message partition") + .isEqualTo(0) + + + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + + val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) + assertThat(messagesAfterUpdate).hasSize(2) + val secondMessage = messagesAfterUpdate[1] + + assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change") + .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC) + assertThat(secondMessage.partition).describedAs("routed message partition") + .isEqualTo(0) + } + + it("should update routing for each client sending one message") { + val sink = StoringSink() + val sut = Sut(sink) + + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val messagesAmount = 10 + val messagesForEachTopic = 5 + + Flux.range(0, messagesAmount).doOnNext { + if (it == messagesForEachTopic) { + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + } + }.doOnNext { + sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) + }.then().block(defaultTimeout) + + + val messages = sink.sentMessages + val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC } + val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC } + + assertThat(messages.size).isEqualTo(messagesAmount) + assertThat(messagesForEachTopic) + .describedAs("amount of messages routed to each topic") + .isEqualTo(firstTopicMessagesCount) + .isEqualTo(secondTopicMessagesCount) + } + + + it("should not update routing for client sending continuous stream of messages") { + val sink = StoringSink() + val sut = Sut(sink) + + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val messageStreamSize = 10 + val pivot = 5 + + val incomingMessages = Flux.range(0, messageStreamSize) + .doOnNext { + if (it == pivot) { + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + println("config changed") + } + } + .map { vesMessage(Domain.HVRANMEAS) } + + + sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout) + + val messages = sink.sentMessages + val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC } + val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC } + + assertThat(messages.size).isEqualTo(messageStreamSize) + assertThat(firstTopicMessagesCount) + .describedAs("amount of messages routed to first topic") + .isEqualTo(messageStreamSize) + + assertThat(secondTopicMessagesCount) + .describedAs("amount of messages routed to second topic") + .isEqualTo(0) + } + } + describe("request validation") { it("should reject message with payload greater than 1 MiB and all subsequent messages") { val sink = StoringSink() diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index 47c31c6f..b89113f4 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -29,6 +29,7 @@ import reactor.core.publisher.UnicastProcessor const val HVRANMEAS_TOPIC = "ves_hvRanMeas" const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "ves_hvMeasForVfScaling" +const val ALTERNATE_HVRANMEAS_TOPIC = "ves_alternateHvRanMeas" val basicConfiguration: CollectorConfiguration = CollectorConfiguration( kafkaBootstrapServers = "localhost:9969", @@ -63,6 +64,24 @@ val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfigu ) +val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration( + kafkaBootstrapServers = "localhost:9969", + routing = routing { + defineRoute { + fromDomain(Domain.HVRANMEAS) + toTopic(ALTERNATE_HVRANMEAS_TOPIC) + withFixedPartitioning() + } + }.build() +) + + +val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration( + kafkaBootstrapServers = "localhost:9969", + routing = routing { + }.build() +) + class FakeConfigurationProvider : ConfigurationProvider { private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create() @@ -71,4 +90,4 @@ class FakeConfigurationProvider : ConfigurationProvider { } override fun invoke() = configStream -}
\ No newline at end of file +} |