summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt141
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt21
2 files changed, 156 insertions, 6 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index aa5810d3..246fc7ed 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -23,11 +23,10 @@ import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.Routing
-import org.onap.dcae.collectors.veshv.model.routing
import org.onap.dcae.collectors.veshv.tests.fakes.*
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import reactor.core.publisher.Flux
+import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -50,7 +49,6 @@ object VesHvSpecification : Spek({
}
describe("Memory management") {
-
it("should release memory for each handled and dropped message") {
val sink = StoringSink()
val sut = Sut(sink)
@@ -160,7 +158,7 @@ object VesHvSpecification : Spek({
assertThat(messages.get(2).topic).describedAs("last message topic")
.isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
}
-
+
it("should drop message if route was not found") {
val sink = StoringSink()
val sut = Sut(sink)
@@ -178,6 +176,139 @@ object VesHvSpecification : Spek({
}
}
+ describe("configuration update") {
+
+ val defaultTimeout = Duration.ofSeconds(10)
+
+ it("should update collector on configuration change") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val firstCollector = sut.collector
+
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ val collectorAfterUpdate = sut.collector
+
+ assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
+
+ }
+
+ it("should start routing messages on configuration change") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+
+ sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
+
+ val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ assertThat(messages).isEmpty()
+
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ assertThat(messagesAfterUpdate).hasSize(1)
+ val message = messagesAfterUpdate[0]
+
+ assertThat(message.topic).describedAs("routed message topic after configuration's change")
+ .isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(message.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+ }
+
+ it("should change domain routing on configuration change") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ assertThat(messages).hasSize(1)
+ val firstMessage = messages[0]
+
+ assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
+ .isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(firstMessage.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+
+
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+
+ val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ assertThat(messagesAfterUpdate).hasSize(2)
+ val secondMessage = messagesAfterUpdate[1]
+
+ assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
+ .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
+ assertThat(secondMessage.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+ }
+
+ it("should update routing for each client sending one message") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val messagesAmount = 10
+ val messagesForEachTopic = 5
+
+ Flux.range(0, messagesAmount).doOnNext {
+ if (it == messagesForEachTopic) {
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ }
+ }.doOnNext {
+ sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ }.then().block(defaultTimeout)
+
+
+ val messages = sink.sentMessages
+ val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+
+ assertThat(messages.size).isEqualTo(messagesAmount)
+ assertThat(messagesForEachTopic)
+ .describedAs("amount of messages routed to each topic")
+ .isEqualTo(firstTopicMessagesCount)
+ .isEqualTo(secondTopicMessagesCount)
+ }
+
+
+ it("should not update routing for client sending continuous stream of messages") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val messageStreamSize = 10
+ val pivot = 5
+
+ val incomingMessages = Flux.range(0, messageStreamSize)
+ .doOnNext {
+ if (it == pivot) {
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ println("config changed")
+ }
+ }
+ .map { vesMessage(Domain.HVRANMEAS) }
+
+
+ sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
+
+ val messages = sink.sentMessages
+ val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+
+ assertThat(messages.size).isEqualTo(messageStreamSize)
+ assertThat(firstTopicMessagesCount)
+ .describedAs("amount of messages routed to first topic")
+ .isEqualTo(messageStreamSize)
+
+ assertThat(secondTopicMessagesCount)
+ .describedAs("amount of messages routed to second topic")
+ .isEqualTo(0)
+ }
+ }
+
describe("request validation") {
it("should reject message with payload greater than 1 MiB and all subsequent messages") {
val sink = StoringSink()
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index 47c31c6f..b89113f4 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -29,6 +29,7 @@ import reactor.core.publisher.UnicastProcessor
const val HVRANMEAS_TOPIC = "ves_hvRanMeas"
const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "ves_hvMeasForVfScaling"
+const val ALTERNATE_HVRANMEAS_TOPIC = "ves_alternateHvRanMeas"
val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
kafkaBootstrapServers = "localhost:9969",
@@ -63,6 +64,24 @@ val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfigu
)
+val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration(
+ kafkaBootstrapServers = "localhost:9969",
+ routing = routing {
+ defineRoute {
+ fromDomain(Domain.HVRANMEAS)
+ toTopic(ALTERNATE_HVRANMEAS_TOPIC)
+ withFixedPartitioning()
+ }
+ }.build()
+)
+
+
+val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration(
+ kafkaBootstrapServers = "localhost:9969",
+ routing = routing {
+ }.build()
+)
+
class FakeConfigurationProvider : ConfigurationProvider {
private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create()
@@ -71,4 +90,4 @@ class FakeConfigurationProvider : ConfigurationProvider {
}
override fun invoke() = configStream
-} \ No newline at end of file
+}