diff options
Diffstat (limited to 'hv-collector-ct')
3 files changed, 50 insertions, 4 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 44b3266e..d78463bf 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -29,7 +29,6 @@ import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import java.time.Duration diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index d917c71a..aa5810d3 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -23,9 +23,10 @@ import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.Routing +import org.onap.dcae.collectors.veshv.model.routing +import org.onap.dcae.collectors.veshv.tests.fakes.* import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain /** @@ -137,6 +138,29 @@ object VesHvSpecification : Spek({ assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0) } + it("should be able to direct 2 messages from different domains to one topic") { + val sink = StoringSink() + val sut = Sut(sink) + + sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration) + + val messages = sut.handleConnection(sink, + vesMessage(Domain.HVRANMEAS), + vesMessage(Domain.HEARTBEAT), + vesMessage(Domain.MEASUREMENTS_FOR_VF_SCALING)) + + assertThat(messages).describedAs("number of routed messages").hasSize(3) + + assertThat(messages.get(0).topic).describedAs("first message topic") + .isEqualTo(HVRANMEAS_TOPIC) + + assertThat(messages.get(1).topic).describedAs("second message topic") + .isEqualTo(HVRANMEAS_TOPIC) + + assertThat(messages.get(2).topic).describedAs("last message topic") + .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC) + } + it("should drop message if route was not found") { val sink = StoringSink() val sut = Sut(sink) @@ -169,4 +193,5 @@ object VesHvSpecification : Spek({ assertThat(handledMessages.first().message.header.eventId).isEqualTo("first") } } + }) diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index 82226dc2..47c31c6f 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -28,6 +28,7 @@ import reactor.core.publisher.UnicastProcessor const val HVRANMEAS_TOPIC = "ves_hvRanMeas" +const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "ves_hvMeasForVfScaling" val basicConfiguration: CollectorConfiguration = CollectorConfiguration( kafkaBootstrapServers = "localhost:9969", @@ -40,6 +41,27 @@ val basicConfiguration: CollectorConfiguration = CollectorConfiguration( }.build() ) +val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration( + kafkaBootstrapServers = "localhost:9969", + routing = routing { + defineRoute { + fromDomain(Domain.HVRANMEAS) + toTopic(HVRANMEAS_TOPIC) + withFixedPartitioning() + } + defineRoute { + fromDomain(Domain.HEARTBEAT) + toTopic(HVRANMEAS_TOPIC) + withFixedPartitioning() + } + defineRoute { + fromDomain(Domain.MEASUREMENTS_FOR_VF_SCALING) + toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC) + withFixedPartitioning() + } + }.build() +) + class FakeConfigurationProvider : ConfigurationProvider { private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create() |