diff options
Diffstat (limited to 'hv-collector-ct/src')
2 files changed, 156 insertions, 6 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index aa5810d3..246fc7ed 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -23,11 +23,10 @@ import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.Routing -import org.onap.dcae.collectors.veshv.model.routing import org.onap.dcae.collectors.veshv.tests.fakes.* import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import reactor.core.publisher.Flux +import java.time.Duration /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -50,7 +49,6 @@ object VesHvSpecification : Spek({ } describe("Memory management") { - it("should release memory for each handled and dropped message") { val sink = StoringSink() val sut = Sut(sink) @@ -160,7 +158,7 @@ object VesHvSpecification : Spek({ assertThat(messages.get(2).topic).describedAs("last message topic") .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC) } - + it("should drop message if route was not found") { val sink = StoringSink() val sut = Sut(sink) @@ -178,6 +176,139 @@ object VesHvSpecification : Spek({ } } + describe("configuration update") { + + val defaultTimeout = Duration.ofSeconds(10) + + it("should update collector on configuration change") { + val sink = StoringSink() + val sut = Sut(sink) + + sut.configurationProvider.updateConfiguration(basicConfiguration) + val firstCollector = sut.collector + + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + val collectorAfterUpdate = sut.collector + + assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) + + } + + it("should start routing messages on configuration change") { + val sink = StoringSink() + val sut = Sut(sink) + + sut.configurationProvider.updateConfiguration(configurationWithoutRouting) + + val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) + assertThat(messages).isEmpty() + + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) + assertThat(messagesAfterUpdate).hasSize(1) + val message = messagesAfterUpdate[0] + + assertThat(message.topic).describedAs("routed message topic after configuration's change") + .isEqualTo(HVRANMEAS_TOPIC) + assertThat(message.partition).describedAs("routed message partition") + .isEqualTo(0) + } + + it("should change domain routing on configuration change") { + val sink = StoringSink() + val sut = Sut(sink) + + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) + assertThat(messages).hasSize(1) + val firstMessage = messages[0] + + assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration") + .isEqualTo(HVRANMEAS_TOPIC) + assertThat(firstMessage.partition).describedAs("routed message partition") + .isEqualTo(0) + + + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + + val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) + assertThat(messagesAfterUpdate).hasSize(2) + val secondMessage = messagesAfterUpdate[1] + + assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change") + .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC) + assertThat(secondMessage.partition).describedAs("routed message partition") + .isEqualTo(0) + } + + it("should update routing for each client sending one message") { + val sink = StoringSink() + val sut = Sut(sink) + + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val messagesAmount = 10 + val messagesForEachTopic = 5 + + Flux.range(0, messagesAmount).doOnNext { + if (it == messagesForEachTopic) { + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + } + }.doOnNext { + sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) + }.then().block(defaultTimeout) + + + val messages = sink.sentMessages + val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC } + val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC } + + assertThat(messages.size).isEqualTo(messagesAmount) + assertThat(messagesForEachTopic) + .describedAs("amount of messages routed to each topic") + .isEqualTo(firstTopicMessagesCount) + .isEqualTo(secondTopicMessagesCount) + } + + + it("should not update routing for client sending continuous stream of messages") { + val sink = StoringSink() + val sut = Sut(sink) + + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val messageStreamSize = 10 + val pivot = 5 + + val incomingMessages = Flux.range(0, messageStreamSize) + .doOnNext { + if (it == pivot) { + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + println("config changed") + } + } + .map { vesMessage(Domain.HVRANMEAS) } + + + sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout) + + val messages = sink.sentMessages + val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC } + val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC } + + assertThat(messages.size).isEqualTo(messageStreamSize) + assertThat(firstTopicMessagesCount) + .describedAs("amount of messages routed to first topic") + .isEqualTo(messageStreamSize) + + assertThat(secondTopicMessagesCount) + .describedAs("amount of messages routed to second topic") + .isEqualTo(0) + } + } + describe("request validation") { it("should reject message with payload greater than 1 MiB and all subsequent messages") { val sink = StoringSink() diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index 47c31c6f..b89113f4 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -29,6 +29,7 @@ import reactor.core.publisher.UnicastProcessor const val HVRANMEAS_TOPIC = "ves_hvRanMeas" const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "ves_hvMeasForVfScaling" +const val ALTERNATE_HVRANMEAS_TOPIC = "ves_alternateHvRanMeas" val basicConfiguration: CollectorConfiguration = CollectorConfiguration( kafkaBootstrapServers = "localhost:9969", @@ -63,6 +64,24 @@ val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfigu ) +val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration( + kafkaBootstrapServers = "localhost:9969", + routing = routing { + defineRoute { + fromDomain(Domain.HVRANMEAS) + toTopic(ALTERNATE_HVRANMEAS_TOPIC) + withFixedPartitioning() + } + }.build() +) + + +val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration( + kafkaBootstrapServers = "localhost:9969", + routing = routing { + }.build() +) + class FakeConfigurationProvider : ConfigurationProvider { private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create() @@ -71,4 +90,4 @@ class FakeConfigurationProvider : ConfigurationProvider { } override fun invoke() = configStream -}
\ No newline at end of file +} |