diff options
Diffstat (limited to 'sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt')
-rw-r--r-- | sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt | 155 |
1 files changed, 5 insertions, 150 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 5d215fc5..6a718eea 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -25,6 +25,8 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER @@ -166,9 +168,7 @@ object VesHvSpecification : Spek({ } it("should be able to direct 2 messages from different domains to one topic") { - val (sut, sink) = vesHvWithStoringSink() - - sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting) + val (sut, sink) = vesHvWithStoringSink(twoDomainsToOneTopicRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP), @@ -202,150 +202,6 @@ object VesHvSpecification : Spek({ } } - describe("configuration update") { - - val defaultTimeout = Duration.ofSeconds(10) - - given("successful configuration change") { - - lateinit var sut: Sut - lateinit var sink: StoringSink - - beforeEachTest { - vesHvWithStoringSink().run { - sut = first - sink = second - } - } - - it("should update collector") { - val firstCollector = sut.collector - - sut.configurationProvider.updateConfiguration(alternativeRouting) - val collectorAfterUpdate = sut.collector - - assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) - } - - it("should start routing messages") { - - sut.configurationProvider.updateConfiguration(emptyRouting) - - val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messages).isEmpty() - - sut.configurationProvider.updateConfiguration(basicRouting) - - val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messagesAfterUpdate).hasSize(1) - val message = messagesAfterUpdate[0] - - assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change") - .isEqualTo(PERF3GPP_TOPIC) - assertThat(message.partition).describedAs("routed message partition") - .isEqualTo(None) - } - - it("should change domain routing") { - - val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messages).hasSize(1) - val firstMessage = messages[0] - - assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration") - .isEqualTo(PERF3GPP_TOPIC) - assertThat(firstMessage.partition).describedAs("routed message partition") - .isEqualTo(None) - - - sut.configurationProvider.updateConfiguration(alternativeRouting) - - val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messagesAfterUpdate).hasSize(2) - val secondMessage = messagesAfterUpdate[1] - - assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change") - .isEqualTo(ALTERNATE_PERF3GPP_TOPIC) - assertThat(secondMessage.partition).describedAs("routed message partition") - .isEqualTo(None) - } - - it("should update routing for each client sending one message") { - - val messagesAmount = 10 - val messagesForEachTopic = 5 - - Flux.range(0, messagesAmount).doOnNext { - if (it == messagesForEachTopic) { - sut.configurationProvider.updateConfiguration(alternativeRouting) - } - }.doOnNext { - sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - }.then().block(defaultTimeout) - - - val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC } - val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC } - - assertThat(messages.size).isEqualTo(messagesAmount) - assertThat(messagesForEachTopic) - .describedAs("amount of messages routed to each topic") - .isEqualTo(firstTopicMessagesCount) - .isEqualTo(secondTopicMessagesCount) - } - - it("should not update routing for client sending continuous stream of messages") { - - val messageStreamSize = 10 - val pivot = 5 - - val incomingMessages = Flux.range(0, messageStreamSize) - .doOnNext { - if (it == pivot) { - sut.configurationProvider.updateConfiguration(alternativeRouting) - println("config changed") - } - } - .map { vesWireFrameMessage(PERF3GPP) } - - - sut.collector.handleConnection(incomingMessages).block(defaultTimeout) - - val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC } - val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC } - - assertThat(messages.size).isEqualTo(messageStreamSize) - assertThat(firstTopicMessagesCount) - .describedAs("amount of messages routed to first topic") - .isEqualTo(messageStreamSize) - - assertThat(secondTopicMessagesCount) - .describedAs("amount of messages routed to second topic") - .isEqualTo(0) - } - - it("should mark the application healthy") { - assertThat(sut.healthStateProvider.currentHealth) - .describedAs("application health state") - .isEqualTo(HealthDescription.HEALTHY) - } - } - - given("failed configuration change") { - val (sut, _) = vesHvWithStoringSink() - sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) - sut.configurationProvider.updateConfiguration(basicRouting) - - it("should mark the application unhealthy ") { - assertThat(sut.healthStateProvider.currentHealth) - .describedAs("application health state") - .isEqualTo(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) - } - } - } - describe("request validation") { it("should reject message with payload greater than 1 MiB and all subsequent messages") { val (sut, sink) = vesHvWithStoringSink() @@ -362,9 +218,8 @@ object VesHvSpecification : Spek({ }) -private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> { +private fun vesHvWithStoringSink(routing: Routing = basicRouting): Pair<Sut, StoringSink> { val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + val sut = Sut(CollectorConfiguration(routing), sink) return Pair(sut, sink) } |