aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-ct
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-ct')
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt1
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt31
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt22
3 files changed, 50 insertions, 4 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 44b3266e..d78463bf 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -29,7 +29,6 @@ import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import java.time.Duration
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index d917c71a..aa5810d3 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -23,9 +23,10 @@ import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.Routing
+import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.dcae.collectors.veshv.tests.fakes.*
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
/**
@@ -137,6 +138,29 @@ object VesHvSpecification : Spek({
assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
}
+ it("should be able to direct 2 messages from different domains to one topic") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+
+ sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
+
+ val messages = sut.handleConnection(sink,
+ vesMessage(Domain.HVRANMEAS),
+ vesMessage(Domain.HEARTBEAT),
+ vesMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))
+
+ assertThat(messages).describedAs("number of routed messages").hasSize(3)
+
+ assertThat(messages.get(0).topic).describedAs("first message topic")
+ .isEqualTo(HVRANMEAS_TOPIC)
+
+ assertThat(messages.get(1).topic).describedAs("second message topic")
+ .isEqualTo(HVRANMEAS_TOPIC)
+
+ assertThat(messages.get(2).topic).describedAs("last message topic")
+ .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+ }
+
it("should drop message if route was not found") {
val sink = StoringSink()
val sut = Sut(sink)
@@ -169,4 +193,5 @@ object VesHvSpecification : Spek({
assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
}
}
+
})
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index 82226dc2..47c31c6f 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -28,6 +28,7 @@ import reactor.core.publisher.UnicastProcessor
const val HVRANMEAS_TOPIC = "ves_hvRanMeas"
+const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "ves_hvMeasForVfScaling"
val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
kafkaBootstrapServers = "localhost:9969",
@@ -40,6 +41,27 @@ val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
}.build()
)
+val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration(
+ kafkaBootstrapServers = "localhost:9969",
+ routing = routing {
+ defineRoute {
+ fromDomain(Domain.HVRANMEAS)
+ toTopic(HVRANMEAS_TOPIC)
+ withFixedPartitioning()
+ }
+ defineRoute {
+ fromDomain(Domain.HEARTBEAT)
+ toTopic(HVRANMEAS_TOPIC)
+ withFixedPartitioning()
+ }
+ defineRoute {
+ fromDomain(Domain.MEASUREMENTS_FOR_VF_SCALING)
+ toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+ withFixedPartitioning()
+ }
+ }.build()
+)
+
class FakeConfigurationProvider : ConfigurationProvider {
private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create()