aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-ct
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-ct')
-rw-r--r--hv-collector-ct/pom.xml4
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt4
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt205
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt18
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt14
5 files changed, 152 insertions, 93 deletions
diff --git a/hv-collector-ct/pom.xml b/hv-collector-ct/pom.xml
index 347bbbe0..f4150c2a 100644
--- a/hv-collector-ct/pom.xml
+++ b/hv-collector-ct/pom.xml
@@ -105,6 +105,10 @@
<groupId>org.jetbrains.spek</groupId>
<artifactId>spek-junit-platform-engine</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ </dependency>
</dependencies>
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index aaadcc7d..e7b7770d 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthStateProvider
import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
import reactor.core.publisher.Flux
@@ -39,10 +40,11 @@ import java.time.Duration
*/
class Sut(sink: Sink = StoringSink()) {
val configurationProvider = FakeConfigurationProvider()
+ val healthStateProvider = FakeHealthStateProvider()
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
private val metrics = FakeMetrics()
- private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics)
+ private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics, healthStateProvider)
private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 1f07c233..493517ba 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -22,15 +22,24 @@ package org.onap.dcae.collectors.veshv.tests.component
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.tests.fakes.*
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVRANMEAS_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
+import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
import org.onap.dcae.collectors.veshv.tests.utils.endOfTransmissionWireMessage
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
-import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
-import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
+import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
+import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import reactor.core.publisher.Flux
import java.time.Duration
@@ -219,127 +228,143 @@ object VesHvSpecification : Spek({
val defaultTimeout = Duration.ofSeconds(10)
- it("should update collector on configuration change") {
- val (sut, _) = vesHvWithStoringSink()
+ given("successful configuration change") {
- sut.configurationProvider.updateConfiguration(basicConfiguration)
- val firstCollector = sut.collector
+ lateinit var sut: Sut
+ lateinit var sink: StoringSink
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
- val collectorAfterUpdate = sut.collector
+ beforeEachTest {
+ vesHvWithStoringSink().run {
+ sut = first
+ sink = second
+ }
+ }
- assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
+ it("should update collector") {
+ val firstCollector = sut.collector
- }
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ val collectorAfterUpdate = sut.collector
- it("should start routing messages on configuration change") {
- val (sut, sink) = vesHvWithStoringSink()
+ assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
+ }
- sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
+ it("should start routing messages") {
- val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
- assertThat(messages).isEmpty()
+ sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ assertThat(messages).isEmpty()
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
- assertThat(messagesAfterUpdate).hasSize(1)
- val message = messagesAfterUpdate[0]
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
- assertThat(message.topic).describedAs("routed message topic after configuration's change")
- .isEqualTo(HVRANMEAS_TOPIC)
- assertThat(message.partition).describedAs("routed message partition")
- .isEqualTo(0)
- }
+ val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ assertThat(messagesAfterUpdate).hasSize(1)
+ val message = messagesAfterUpdate[0]
- it("should change domain routing on configuration change") {
- val (sut, sink) = vesHvWithStoringSink()
+ assertThat(message.topic).describedAs("routed message topic after configuration's change")
+ .isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(message.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+ }
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ it("should change domain routing") {
- val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
- assertThat(messages).hasSize(1)
- val firstMessage = messages[0]
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ assertThat(messages).hasSize(1)
+ val firstMessage = messages[0]
- assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
- .isEqualTo(HVRANMEAS_TOPIC)
- assertThat(firstMessage.partition).describedAs("routed message partition")
- .isEqualTo(0)
+ assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
+ .isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(firstMessage.partition).describedAs("routed message partition")
+ .isEqualTo(0)
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
- assertThat(messagesAfterUpdate).hasSize(2)
- val secondMessage = messagesAfterUpdate[1]
+ val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ assertThat(messagesAfterUpdate).hasSize(2)
+ val secondMessage = messagesAfterUpdate[1]
- assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
- .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
- assertThat(secondMessage.partition).describedAs("routed message partition")
- .isEqualTo(0)
- }
+ assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
+ .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
+ assertThat(secondMessage.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+ }
- it("should update routing for each client sending one message") {
- val (sut, sink) = vesHvWithStoringSink()
+ it("should update routing for each client sending one message") {
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val messagesAmount = 10
+ val messagesForEachTopic = 5
- val messagesAmount = 10
- val messagesForEachTopic = 5
+ Flux.range(0, messagesAmount).doOnNext {
+ if (it == messagesForEachTopic) {
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ }
+ }.doOnNext {
+ sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ }.then().block(defaultTimeout)
- Flux.range(0, messagesAmount).doOnNext {
- if (it == messagesForEachTopic) {
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
- }
- }.doOnNext {
- sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
- }.then().block(defaultTimeout)
+ val messages = sink.sentMessages
+ val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
- val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+ assertThat(messages.size).isEqualTo(messagesAmount)
+ assertThat(messagesForEachTopic)
+ .describedAs("amount of messages routed to each topic")
+ .isEqualTo(firstTopicMessagesCount)
+ .isEqualTo(secondTopicMessagesCount)
+ }
- assertThat(messages.size).isEqualTo(messagesAmount)
- assertThat(messagesForEachTopic)
- .describedAs("amount of messages routed to each topic")
- .isEqualTo(firstTopicMessagesCount)
- .isEqualTo(secondTopicMessagesCount)
- }
+ it("should not update routing for client sending continuous stream of messages") {
+ val messageStreamSize = 10
+ val pivot = 5
- it("should not update routing for client sending continuous stream of messages") {
- val (sut, sink) = vesHvWithStoringSink()
+ val incomingMessages = Flux.range(0, messageStreamSize)
+ .doOnNext {
+ if (it == pivot) {
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ println("config changed")
+ }
+ }
+ .map { vesWireFrameMessage(Domain.HVRANMEAS) }
- sut.configurationProvider.updateConfiguration(basicConfiguration)
- val messageStreamSize = 10
- val pivot = 5
+ sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
- val incomingMessages = Flux.range(0, messageStreamSize)
- .doOnNext {
- if (it == pivot) {
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
- println("config changed")
- }
- }
- .map { vesWireFrameMessage(Domain.HVRANMEAS) }
+ val messages = sink.sentMessages
+ val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+ assertThat(messages.size).isEqualTo(messageStreamSize)
+ assertThat(firstTopicMessagesCount)
+ .describedAs("amount of messages routed to first topic")
+ .isEqualTo(messageStreamSize)
- sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
+ assertThat(secondTopicMessagesCount)
+ .describedAs("amount of messages routed to second topic")
+ .isEqualTo(0)
+ }
- val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+ it("should mark the application healthy") {
+ assertThat(sut.healthStateProvider.currentHealth)
+ .describedAs("application health state")
+ .isEqualTo(HealthState.HEALTHY)
+ }
+ }
- assertThat(messages.size).isEqualTo(messageStreamSize)
- assertThat(firstTopicMessagesCount)
- .describedAs("amount of messages routed to first topic")
- .isEqualTo(messageStreamSize)
+ given("failed configuration change") {
+ val (sut, _) = vesHvWithStoringSink()
+ sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
- assertThat(secondTopicMessagesCount)
- .describedAs("amount of messages routed to second topic")
- .isEqualTo(0)
+ it("should mark the application unhealthy ") {
+ assertThat(sut.healthStateProvider.currentHealth)
+ .describedAs("application health state")
+ .isEqualTo(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+ }
}
}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt
new file mode 100644
index 00000000..09fd232c
--- /dev/null
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt
@@ -0,0 +1,18 @@
+package org.onap.dcae.collectors.veshv.tests.fakes
+
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
+import reactor.core.publisher.Flux
+
+class FakeHealthStateProvider : HealthStateProvider {
+
+ lateinit var currentHealth: HealthState
+
+ override fun changeState(healthState: HealthState) {
+ currentHealth = healthState
+ }
+
+ override fun invoke(): Flux<HealthState> {
+ throw NotImplementedError()
+ }
+}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index b89113f4..ebeaa69e 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.model.routing
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import reactor.core.publisher.FluxProcessor
import reactor.core.publisher.UnicastProcessor
+import reactor.retry.RetryExhaustedException
const val HVRANMEAS_TOPIC = "ves_hvRanMeas"
@@ -83,10 +84,19 @@ val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration
)
class FakeConfigurationProvider : ConfigurationProvider {
+ private var shouldThrowException = false
private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create()
- fun updateConfiguration(collectorConfiguration: CollectorConfiguration) {
- configStream.onNext(collectorConfiguration)
+ fun updateConfiguration(collectorConfiguration: CollectorConfiguration) =
+ if (shouldThrowException) {
+ configStream.onError(RetryExhaustedException("I'm so tired"))
+ } else {
+ configStream.onNext(collectorConfiguration)
+ }
+
+
+ fun shouldThrowExceptionOnConfigUpdate(shouldThrowException: Boolean) {
+ this.shouldThrowException = shouldThrowException
}
override fun invoke() = configStream