summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt8
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt13
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt1
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt31
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt22
5 files changed, 59 insertions, 16 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
index 12e1c1e6..3f355e34 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
@@ -27,20 +27,18 @@ internal class MessageValidator {
val requiredFieldDescriptors = listOf(
"version",
"eventName",
- "domain",
+ // "domain", TODO to be restored back when GPB schema will include default value
"eventId",
"sourceName",
"reportingEntityName",
- "priority",
+ // "priority", TODO to be restored back when GPB schema will include default value
"startEpochMicrosec",
"lastEpochMicrosec",
"sequence")
.map { fieldName -> CommonEventHeader.getDescriptor().findFieldByName(fieldName)}
fun isValid(message: VesMessage): Boolean {
- val header = message.header
- val ret = allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS
- return ret
+ return allMandatoryFieldsArePresent(message.header)
}
private fun allMandatoryFieldsArePresent(header: CommonEventHeader) =
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
index 017187a4..4d1b879a 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
@@ -67,17 +67,16 @@ internal object MessageValidatorTest : Spek({
assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue()
}
- it("should reject message with domain other than HVRANMEAS") {
- Domain.values()
- .filter { it != Domain.HVRANMEAS && it != Domain.UNRECOGNIZED }
- .forEach { domain ->
+ Domain.values()
+ .filter { it != Domain.UNRECOGNIZED }
+ .forEach {domain ->
+ it("should accept message with $domain domain"){
val header = newBuilder(commonHeader).setDomain(domain).build()
val vesMessage = VesMessage(header, vesMessageBytes(header))
assertThat(cut.isValid(vesMessage))
- .describedAs("message with $domain domain")
- .isFalse()
+ .isTrue()
}
- }
+ }
}
on("ves hv message bytes") {
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 44b3266e..d78463bf 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -29,7 +29,6 @@ import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import java.time.Duration
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index d917c71a..aa5810d3 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -23,9 +23,10 @@ import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.Routing
+import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.dcae.collectors.veshv.tests.fakes.*
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
/**
@@ -137,6 +138,29 @@ object VesHvSpecification : Spek({
assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
}
+ it("should be able to direct 2 messages from different domains to one topic") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+
+ sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
+
+ val messages = sut.handleConnection(sink,
+ vesMessage(Domain.HVRANMEAS),
+ vesMessage(Domain.HEARTBEAT),
+ vesMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))
+
+ assertThat(messages).describedAs("number of routed messages").hasSize(3)
+
+ assertThat(messages.get(0).topic).describedAs("first message topic")
+ .isEqualTo(HVRANMEAS_TOPIC)
+
+ assertThat(messages.get(1).topic).describedAs("second message topic")
+ .isEqualTo(HVRANMEAS_TOPIC)
+
+ assertThat(messages.get(2).topic).describedAs("last message topic")
+ .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+ }
+
it("should drop message if route was not found") {
val sink = StoringSink()
val sut = Sut(sink)
@@ -169,4 +193,5 @@ object VesHvSpecification : Spek({
assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
}
}
+
})
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index 82226dc2..47c31c6f 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -28,6 +28,7 @@ import reactor.core.publisher.UnicastProcessor
const val HVRANMEAS_TOPIC = "ves_hvRanMeas"
+const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "ves_hvMeasForVfScaling"
val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
kafkaBootstrapServers = "localhost:9969",
@@ -40,6 +41,27 @@ val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
}.build()
)
+val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration(
+ kafkaBootstrapServers = "localhost:9969",
+ routing = routing {
+ defineRoute {
+ fromDomain(Domain.HVRANMEAS)
+ toTopic(HVRANMEAS_TOPIC)
+ withFixedPartitioning()
+ }
+ defineRoute {
+ fromDomain(Domain.HEARTBEAT)
+ toTopic(HVRANMEAS_TOPIC)
+ withFixedPartitioning()
+ }
+ defineRoute {
+ fromDomain(Domain.MEASUREMENTS_FOR_VF_SCALING)
+ toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+ withFixedPartitioning()
+ }
+ }.build()
+)
+
class FakeConfigurationProvider : ConfigurationProvider {
private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create()