aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-ct
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-ct')
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt6
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt88
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt27
3 files changed, 62 insertions, 59 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index ba29844a..8e6103c9 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -29,6 +29,7 @@ import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
@@ -36,7 +37,6 @@ import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
import reactor.core.publisher.Flux
import reactor.math.sum
import java.security.MessageDigest
@@ -62,7 +62,7 @@ object PerformanceSpecification : Spek({
val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
val params = MessageParameters(
- commonEventHeader = commonHeader(HVRANMEAS),
+ commonEventHeader = commonHeader(HVMEAS),
messageType = VALID,
amount = numMessages
)
@@ -92,7 +92,7 @@ object PerformanceSpecification : Spek({
val timeout = Duration.ofSeconds(30)
val params = MessageParameters(
- commonEventHeader = commonHeader(HVRANMEAS),
+ commonEventHeader = commonHeader(HVMEAS),
messageType = VALID,
amount = numMessages
)
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index a9f3e9a4..60e10ee0 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -24,9 +24,13 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVRANMEAS_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVMEAS_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.HVMEAS_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
@@ -39,7 +43,7 @@ import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
import reactor.core.publisher.Flux
import java.time.Duration
@@ -54,8 +58,8 @@ object VesHvSpecification : Spek({
it("should handle multiple HV RAN events") {
val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink,
- vesWireFrameMessage(Domain.HVRANMEAS),
- vesWireFrameMessage(Domain.HVRANMEAS)
+ vesWireFrameMessage(HVMEAS),
+ vesWireFrameMessage(HVMEAS)
)
assertThat(messages)
@@ -65,8 +69,8 @@ object VesHvSpecification : Spek({
it("should not handle messages received from client after end-of-transmission message") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
- val anotherValidMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+ val validMessage = vesWireFrameMessage(HVMEAS)
+ val anotherValidMessage = vesWireFrameMessage(HVMEAS)
val endOfTransmissionMessage = endOfTransmissionWireMessage()
val handledEvents = sut.handleConnection(sink,
@@ -91,23 +95,19 @@ object VesHvSpecification : Spek({
describe("Memory management") {
it("should release memory for each handled and dropped message") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
- val msgWithInvalidDomain = vesWireFrameMessage(Domain.OTHER)
+ val validMessage = vesWireFrameMessage(HVMEAS)
val msgWithInvalidFrame = invalidWireFrame()
- val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS)
+ val msgWithTooBigPayload = vesMessageWithTooBigPayload(HVMEAS)
val expectedRefCnt = 0
val handledEvents = sut.handleConnection(
- sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame, msgWithTooBigPayload)
+ sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
assertThat(handledEvents).hasSize(1)
assertThat(validMessage.refCnt())
.describedAs("handled message should be released")
.isEqualTo(expectedRefCnt)
- assertThat(msgWithInvalidDomain.refCnt())
- .describedAs("message with invalid domain should be released")
- .isEqualTo(expectedRefCnt)
assertThat(msgWithInvalidFrame.refCnt())
.describedAs("message with invalid frame should be released")
.isEqualTo(expectedRefCnt)
@@ -118,7 +118,7 @@ object VesHvSpecification : Spek({
it("should release memory for end-of-transmission message") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+ val validMessage = vesWireFrameMessage(HVMEAS)
val endOfTransmissionMessage = endOfTransmissionWireMessage()
val expectedRefCnt = 0
@@ -138,7 +138,7 @@ object VesHvSpecification : Spek({
it("should release memory for each message with invalid payload") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+ val validMessage = vesWireFrameMessage(HVMEAS)
val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
val expectedRefCnt = 0
@@ -157,7 +157,7 @@ object VesHvSpecification : Spek({
it("should release memory for each message with garbage frame") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+ val validMessage = vesWireFrameMessage(HVMEAS)
val msgWithGarbageFrame = garbageFrame()
val expectedRefCnt = 0
@@ -179,11 +179,11 @@ object VesHvSpecification : Spek({
it("should direct message to a topic by means of routing configuration") {
val (sut, sink) = vesHvWithStoringSink()
- val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC)
assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
}
@@ -193,17 +193,17 @@ object VesHvSpecification : Spek({
sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
val messages = sut.handleConnection(sink,
- vesWireFrameMessage(Domain.HVRANMEAS),
- vesWireFrameMessage(Domain.HEARTBEAT),
- vesWireFrameMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))
+ vesWireFrameMessage(HVMEAS),
+ vesWireFrameMessage(HEARTBEAT),
+ vesWireFrameMessage(MEASUREMENT))
assertThat(messages).describedAs("number of routed messages").hasSize(3)
assertThat(messages[0].topic).describedAs("first message topic")
- .isEqualTo(HVRANMEAS_TOPIC)
+ .isEqualTo(HVMEAS_TOPIC)
assertThat(messages[1].topic).describedAs("second message topic")
- .isEqualTo(HVRANMEAS_TOPIC)
+ .isEqualTo(HVMEAS_TOPIC)
assertThat(messages[2].topic).describedAs("last message topic")
.isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
@@ -212,14 +212,14 @@ object VesHvSpecification : Spek({
it("should drop message if route was not found") {
val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink,
- vesWireFrameMessage(Domain.OTHER, "first"),
- vesWireFrameMessage(Domain.HVRANMEAS, "second"),
- vesWireFrameMessage(Domain.HEARTBEAT, "third"))
+ vesWireFrameMessage(OTHER, "first"),
+ vesWireFrameMessage(HVMEAS, "second"),
+ vesWireFrameMessage(HEARTBEAT, "third"))
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC)
assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
}
}
@@ -253,41 +253,41 @@ object VesHvSpecification : Spek({
sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
- val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
assertThat(messages).isEmpty()
sut.configurationProvider.updateConfiguration(basicConfiguration)
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
assertThat(messagesAfterUpdate).hasSize(1)
val message = messagesAfterUpdate[0]
assertThat(message.topic).describedAs("routed message topic after configuration's change")
- .isEqualTo(HVRANMEAS_TOPIC)
+ .isEqualTo(HVMEAS_TOPIC)
assertThat(message.partition).describedAs("routed message partition")
.isEqualTo(0)
}
it("should change domain routing") {
- val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
assertThat(messages).hasSize(1)
val firstMessage = messages[0]
assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
- .isEqualTo(HVRANMEAS_TOPIC)
+ .isEqualTo(HVMEAS_TOPIC)
assertThat(firstMessage.partition).describedAs("routed message partition")
.isEqualTo(0)
sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
assertThat(messagesAfterUpdate).hasSize(2)
val secondMessage = messagesAfterUpdate[1]
assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
- .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
+ .isEqualTo(ALTERNATE_HVMEAS_TOPIC)
assertThat(secondMessage.partition).describedAs("routed message partition")
.isEqualTo(0)
}
@@ -302,13 +302,13 @@ object VesHvSpecification : Spek({
sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
}
}.doOnNext {
- sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
}.then().block(defaultTimeout)
val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+ val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC }
assertThat(messages.size).isEqualTo(messagesAmount)
assertThat(messagesForEachTopic)
@@ -329,14 +329,14 @@ object VesHvSpecification : Spek({
println("config changed")
}
}
- .map { vesWireFrameMessage(Domain.HVRANMEAS) }
+ .map { vesWireFrameMessage(HVMEAS) }
sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+ val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC }
assertThat(messages.size).isEqualTo(messageStreamSize)
assertThat(firstTopicMessagesCount)
@@ -373,9 +373,9 @@ object VesHvSpecification : Spek({
val (sut, sink) = vesHvWithStoringSink()
val handledMessages = sut.handleConnection(sink,
- vesWireFrameMessage(Domain.HVRANMEAS, "first"),
- vesMessageWithTooBigPayload(Domain.HVRANMEAS),
- vesWireFrameMessage(Domain.HVRANMEAS))
+ vesWireFrameMessage(HVMEAS, "first"),
+ vesMessageWithTooBigPayload(HVMEAS),
+ vesWireFrameMessage(HVMEAS))
assertThat(handledMessages).hasSize(1)
assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index ebeaa69e..688f2758 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -20,24 +20,27 @@
package org.onap.dcae.collectors.veshv.tests.fakes
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.routing
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
import reactor.core.publisher.FluxProcessor
import reactor.core.publisher.UnicastProcessor
import reactor.retry.RetryExhaustedException
-const val HVRANMEAS_TOPIC = "ves_hvRanMeas"
+const val HVMEAS_TOPIC = "ves_hvRanMeas"
const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "ves_hvMeasForVfScaling"
-const val ALTERNATE_HVRANMEAS_TOPIC = "ves_alternateHvRanMeas"
+const val ALTERNATE_HVMEAS_TOPIC = "ves_alternateHvRanMeas"
val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
- fromDomain(Domain.HVRANMEAS)
- toTopic(HVRANMEAS_TOPIC)
+ fromDomain(HVMEAS.name)
+ toTopic(HVMEAS_TOPIC)
withFixedPartitioning()
}
}.build()
@@ -47,17 +50,17 @@ val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfigu
kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
- fromDomain(Domain.HVRANMEAS)
- toTopic(HVRANMEAS_TOPIC)
+ fromDomain(HVMEAS.name)
+ toTopic(HVMEAS_TOPIC)
withFixedPartitioning()
}
defineRoute {
- fromDomain(Domain.HEARTBEAT)
- toTopic(HVRANMEAS_TOPIC)
+ fromDomain(HEARTBEAT.name)
+ toTopic(HVMEAS_TOPIC)
withFixedPartitioning()
}
defineRoute {
- fromDomain(Domain.MEASUREMENTS_FOR_VF_SCALING)
+ fromDomain(MEASUREMENT.name)
toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
withFixedPartitioning()
}
@@ -69,8 +72,8 @@ val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfigu
kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
- fromDomain(Domain.HVRANMEAS)
- toTopic(ALTERNATE_HVRANMEAS_TOPIC)
+ fromDomain(HVMEAS.name)
+ toTopic(ALTERNATE_HVMEAS_TOPIC)
withFixedPartitioning()
}
}.build()