aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt')
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt155
1 files changed, 5 insertions, 150 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 5d215fc5..6a718eea 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -25,6 +25,8 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
@@ -166,9 +168,7 @@ object VesHvSpecification : Spek({
}
it("should be able to direct 2 messages from different domains to one topic") {
- val (sut, sink) = vesHvWithStoringSink()
-
- sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting)
+ val (sut, sink) = vesHvWithStoringSink(twoDomainsToOneTopicRouting)
val messages = sut.handleConnection(sink,
vesWireFrameMessage(PERF3GPP),
@@ -202,150 +202,6 @@ object VesHvSpecification : Spek({
}
}
- describe("configuration update") {
-
- val defaultTimeout = Duration.ofSeconds(10)
-
- given("successful configuration change") {
-
- lateinit var sut: Sut
- lateinit var sink: StoringSink
-
- beforeEachTest {
- vesHvWithStoringSink().run {
- sut = first
- sink = second
- }
- }
-
- it("should update collector") {
- val firstCollector = sut.collector
-
- sut.configurationProvider.updateConfiguration(alternativeRouting)
- val collectorAfterUpdate = sut.collector
-
- assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
- }
-
- it("should start routing messages") {
-
- sut.configurationProvider.updateConfiguration(emptyRouting)
-
- val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- assertThat(messages).isEmpty()
-
- sut.configurationProvider.updateConfiguration(basicRouting)
-
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- assertThat(messagesAfterUpdate).hasSize(1)
- val message = messagesAfterUpdate[0]
-
- assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
- .isEqualTo(PERF3GPP_TOPIC)
- assertThat(message.partition).describedAs("routed message partition")
- .isEqualTo(None)
- }
-
- it("should change domain routing") {
-
- val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- assertThat(messages).hasSize(1)
- val firstMessage = messages[0]
-
- assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration")
- .isEqualTo(PERF3GPP_TOPIC)
- assertThat(firstMessage.partition).describedAs("routed message partition")
- .isEqualTo(None)
-
-
- sut.configurationProvider.updateConfiguration(alternativeRouting)
-
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- assertThat(messagesAfterUpdate).hasSize(2)
- val secondMessage = messagesAfterUpdate[1]
-
- assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
- .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
- assertThat(secondMessage.partition).describedAs("routed message partition")
- .isEqualTo(None)
- }
-
- it("should update routing for each client sending one message") {
-
- val messagesAmount = 10
- val messagesForEachTopic = 5
-
- Flux.range(0, messagesAmount).doOnNext {
- if (it == messagesForEachTopic) {
- sut.configurationProvider.updateConfiguration(alternativeRouting)
- }
- }.doOnNext {
- sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- }.then().block(defaultTimeout)
-
-
- val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
- val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
-
- assertThat(messages.size).isEqualTo(messagesAmount)
- assertThat(messagesForEachTopic)
- .describedAs("amount of messages routed to each topic")
- .isEqualTo(firstTopicMessagesCount)
- .isEqualTo(secondTopicMessagesCount)
- }
-
- it("should not update routing for client sending continuous stream of messages") {
-
- val messageStreamSize = 10
- val pivot = 5
-
- val incomingMessages = Flux.range(0, messageStreamSize)
- .doOnNext {
- if (it == pivot) {
- sut.configurationProvider.updateConfiguration(alternativeRouting)
- println("config changed")
- }
- }
- .map { vesWireFrameMessage(PERF3GPP) }
-
-
- sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
-
- val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
- val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
-
- assertThat(messages.size).isEqualTo(messageStreamSize)
- assertThat(firstTopicMessagesCount)
- .describedAs("amount of messages routed to first topic")
- .isEqualTo(messageStreamSize)
-
- assertThat(secondTopicMessagesCount)
- .describedAs("amount of messages routed to second topic")
- .isEqualTo(0)
- }
-
- it("should mark the application healthy") {
- assertThat(sut.healthStateProvider.currentHealth)
- .describedAs("application health state")
- .isEqualTo(HealthDescription.HEALTHY)
- }
- }
-
- given("failed configuration change") {
- val (sut, _) = vesHvWithStoringSink()
- sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
- sut.configurationProvider.updateConfiguration(basicRouting)
-
- it("should mark the application unhealthy ") {
- assertThat(sut.healthStateProvider.currentHealth)
- .describedAs("application health state")
- .isEqualTo(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
- }
- }
- }
-
describe("request validation") {
it("should reject message with payload greater than 1 MiB and all subsequent messages") {
val (sut, sink) = vesHvWithStoringSink()
@@ -362,9 +218,8 @@ object VesHvSpecification : Spek({
})
-private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
+private fun vesHvWithStoringSink(routing: Routing = basicRouting): Pair<Sut, StoringSink> {
val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicRouting)
+ val sut = Sut(CollectorConfiguration(routing), sink)
return Pair(sut, sink)
}