diff options
5 files changed, 59 insertions, 16 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt index 12e1c1e6..3f355e34 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt @@ -27,20 +27,18 @@ internal class MessageValidator { val requiredFieldDescriptors = listOf( "version", "eventName", - "domain", + // "domain", TODO to be restored back when GPB schema will include default value "eventId", "sourceName", "reportingEntityName", - "priority", + // "priority", TODO to be restored back when GPB schema will include default value "startEpochMicrosec", "lastEpochMicrosec", "sequence") .map { fieldName -> CommonEventHeader.getDescriptor().findFieldByName(fieldName)} fun isValid(message: VesMessage): Boolean { - val header = message.header - val ret = allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS - return ret + return allMandatoryFieldsArePresent(message.header) } private fun allMandatoryFieldsArePresent(header: CommonEventHeader) = diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt index 017187a4..4d1b879a 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt @@ -67,17 +67,16 @@ internal object MessageValidatorTest : Spek({ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue() } - it("should reject message with domain other than HVRANMEAS") { - Domain.values() - .filter { it != Domain.HVRANMEAS && it != Domain.UNRECOGNIZED } - .forEach { domain -> + Domain.values() + .filter { it != Domain.UNRECOGNIZED } + .forEach {domain -> + it("should accept message with $domain domain"){ val header = newBuilder(commonHeader).setDomain(domain).build() val vesMessage = VesMessage(header, vesMessageBytes(header)) assertThat(cut.isValid(vesMessage)) - .describedAs("message with $domain domain") - .isFalse() + .isTrue() } - } + } } on("ves hv message bytes") { diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 44b3266e..d78463bf 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -29,7 +29,6 @@ import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import java.time.Duration diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index d917c71a..aa5810d3 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -23,9 +23,10 @@ import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.Routing +import org.onap.dcae.collectors.veshv.model.routing +import org.onap.dcae.collectors.veshv.tests.fakes.* import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain /** @@ -137,6 +138,29 @@ object VesHvSpecification : Spek({ assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0) } + it("should be able to direct 2 messages from different domains to one topic") { + val sink = StoringSink() + val sut = Sut(sink) + + sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration) + + val messages = sut.handleConnection(sink, + vesMessage(Domain.HVRANMEAS), + vesMessage(Domain.HEARTBEAT), + vesMessage(Domain.MEASUREMENTS_FOR_VF_SCALING)) + + assertThat(messages).describedAs("number of routed messages").hasSize(3) + + assertThat(messages.get(0).topic).describedAs("first message topic") + .isEqualTo(HVRANMEAS_TOPIC) + + assertThat(messages.get(1).topic).describedAs("second message topic") + .isEqualTo(HVRANMEAS_TOPIC) + + assertThat(messages.get(2).topic).describedAs("last message topic") + .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC) + } + it("should drop message if route was not found") { val sink = StoringSink() val sut = Sut(sink) @@ -169,4 +193,5 @@ object VesHvSpecification : Spek({ assertThat(handledMessages.first().message.header.eventId).isEqualTo("first") } } + }) diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index 82226dc2..47c31c6f 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -28,6 +28,7 @@ import reactor.core.publisher.UnicastProcessor const val HVRANMEAS_TOPIC = "ves_hvRanMeas" +const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "ves_hvMeasForVfScaling" val basicConfiguration: CollectorConfiguration = CollectorConfiguration( kafkaBootstrapServers = "localhost:9969", @@ -40,6 +41,27 @@ val basicConfiguration: CollectorConfiguration = CollectorConfiguration( }.build() ) +val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration( + kafkaBootstrapServers = "localhost:9969", + routing = routing { + defineRoute { + fromDomain(Domain.HVRANMEAS) + toTopic(HVRANMEAS_TOPIC) + withFixedPartitioning() + } + defineRoute { + fromDomain(Domain.HEARTBEAT) + toTopic(HVRANMEAS_TOPIC) + withFixedPartitioning() + } + defineRoute { + fromDomain(Domain.MEASUREMENTS_FOR_VF_SCALING) + toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC) + withFixedPartitioning() + } + }.build() +) + class FakeConfigurationProvider : ConfigurationProvider { private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create() |