aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-ct/src
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-ct/src')
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt102
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt53
2 files changed, 95 insertions, 60 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 246fc7ed..5e6e666f 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -37,22 +37,42 @@ object VesHvSpecification : Spek({
describe("VES High Volume Collector") {
it("should handle multiple HV RAN events") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))
assertThat(messages)
.describedAs("should send all events")
.hasSize(2)
}
+
+ it("should not handle messages received from client after end-of-transmission message") {
+ val (sut, sink) = vesHvWithStoringSink()
+ val validMessage = vesMessage(Domain.HVRANMEAS)
+ val anotherValidMessage = vesMessage(Domain.HVRANMEAS)
+ val endOfTransmissionMessage = endOfTransmissionMessage()
+
+ val handledEvents = sut.handleConnection(sink,
+ validMessage,
+ endOfTransmissionMessage,
+ anotherValidMessage
+ )
+
+ assertThat(handledEvents).hasSize(1)
+ assertThat(validMessage.refCnt())
+ .describedAs("first message should be released")
+ .isEqualTo(0)
+ assertThat(endOfTransmissionMessage.refCnt())
+ .describedAs("end-of-transmission message should be released")
+ .isEqualTo(0)
+ assertThat(anotherValidMessage.refCnt())
+ .describedAs("second (not handled) message should not be released")
+ .isEqualTo(1)
+ }
}
describe("Memory management") {
it("should release memory for each handled and dropped message") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val validMessage = vesMessage(Domain.HVRANMEAS)
val msgWithInvalidDomain = vesMessage(Domain.OTHER)
val msgWithInvalidFrame = invalidWireFrame()
@@ -76,13 +96,30 @@ object VesHvSpecification : Spek({
assertThat(msgWithTooBigPayload.refCnt())
.describedAs("message with payload exceeding 1MiB should be released")
.isEqualTo(expectedRefCnt)
+ }
+ it("should release memory for end-of-transmission message") {
+ val (sut, sink) = vesHvWithStoringSink()
+ val validMessage = vesMessage(Domain.HVRANMEAS)
+ val endOfTransmissionMessage = endOfTransmissionMessage()
+ val expectedRefCnt = 0
+
+ val handledEvents = sut.handleConnection(sink,
+ validMessage,
+ endOfTransmissionMessage
+ )
+
+ assertThat(handledEvents).hasSize(1)
+ assertThat(validMessage.refCnt())
+ .describedAs("handled message should be released")
+ .isEqualTo(expectedRefCnt)
+ assertThat(endOfTransmissionMessage.refCnt())
+ .describedAs("end-of-transmission message should be released")
+ .isEqualTo(expectedRefCnt)
}
it("should release memory for each message with invalid payload") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val validMessage = vesMessage(Domain.HVRANMEAS)
val msgWithInvalidPayload = invalidVesMessage()
val expectedRefCnt = 0
@@ -101,9 +138,7 @@ object VesHvSpecification : Spek({
}
it("should release memory for each message with garbage frame") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val validMessage = vesMessage(Domain.HVRANMEAS)
val msgWithGarbageFrame = garbageFrame()
val expectedRefCnt = 0
@@ -124,9 +159,7 @@ object VesHvSpecification : Spek({
describe("message routing") {
it("should direct message to a topic by means of routing configuration") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
assertThat(messages).describedAs("number of routed messages").hasSize(1)
@@ -137,8 +170,7 @@ object VesHvSpecification : Spek({
}
it("should be able to direct 2 messages from different domains to one topic") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
@@ -149,20 +181,18 @@ object VesHvSpecification : Spek({
assertThat(messages).describedAs("number of routed messages").hasSize(3)
- assertThat(messages.get(0).topic).describedAs("first message topic")
+ assertThat(messages[0].topic).describedAs("first message topic")
.isEqualTo(HVRANMEAS_TOPIC)
- assertThat(messages.get(1).topic).describedAs("second message topic")
+ assertThat(messages[1].topic).describedAs("second message topic")
.isEqualTo(HVRANMEAS_TOPIC)
- assertThat(messages.get(2).topic).describedAs("last message topic")
+ assertThat(messages[2].topic).describedAs("last message topic")
.isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
}
it("should drop message if route was not found") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink,
vesMessage(Domain.OTHER, "first"),
vesMessage(Domain.HVRANMEAS, "second"),
@@ -181,8 +211,7 @@ object VesHvSpecification : Spek({
val defaultTimeout = Duration.ofSeconds(10)
it("should update collector on configuration change") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, _) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(basicConfiguration)
val firstCollector = sut.collector
@@ -195,8 +224,7 @@ object VesHvSpecification : Spek({
}
it("should start routing messages on configuration change") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
@@ -216,8 +244,7 @@ object VesHvSpecification : Spek({
}
it("should change domain routing on configuration change") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(basicConfiguration)
@@ -244,8 +271,7 @@ object VesHvSpecification : Spek({
}
it("should update routing for each client sending one message") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(basicConfiguration)
@@ -274,8 +300,7 @@ object VesHvSpecification : Spek({
it("should not update routing for client sending continuous stream of messages") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(basicConfiguration)
@@ -311,9 +336,7 @@ object VesHvSpecification : Spek({
describe("request validation") {
it("should reject message with payload greater than 1 MiB and all subsequent messages") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val handledMessages = sut.handleConnection(sink,
vesMessage(Domain.HVRANMEAS, "first"),
@@ -326,3 +349,10 @@ object VesHvSpecification : Spek({
}
})
+
+private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+ return Pair(sut, sink)
+}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt
index e620e6b9..64b4ba26 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt
@@ -23,9 +23,7 @@ import com.google.protobuf.ByteString
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.PooledByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.*
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
-import org.onap.ves.HVRanMeasFieldsV5
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import org.onap.ves.VesEventV5.VesEvent
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
@@ -33,15 +31,19 @@ import java.util.*
val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT
-fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run {
- writeByte(0xFF) // always 0xFF
- writeByte(0x01) // version
- writeByte(0x01) // content type = GPB
+fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf =
+ allocator.buffer().run {
+ writeByte(0xFF) // always 0xFF
+ writeByte(0x01) // version
+ writeByte(0x01) // content type = GPB
- val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer()
- writeInt(gpb.limit()) // ves event size in bytes
- writeBytes(gpb) // ves event as GPB bytes
-}
+ val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer()
+ writeInt(gpb.limit()) // ves event size in bytes
+ writeBytes(gpb) // ves event as GPB bytes
+ }
+
+fun endOfTransmissionMessage(): ByteBuf =
+ allocator.buffer().writeByte(0xAA)
fun invalidVesMessage(): ByteBuf = allocator.buffer().run {
@@ -65,22 +67,25 @@ fun invalidWireFrame(): ByteBuf = allocator.buffer().run {
writeByte(0x01) // content type = GPB
}
-fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run {
- writeByte(0xFF) // always 0xFF
- writeByte(0x01) // version
- writeByte(0x01) // content type = GPB
+fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf =
+ allocator.buffer().run {
+ writeByte(0xFF) // always 0xFF
+ writeByte(0x01) // version
+ writeByte(0x01) // content type = GPB
- val gpb = vesEvent(
- domain,
- id,
- ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
- ).toByteString().asReadOnlyByteBuffer()
+ val gpb = vesEvent(
+ domain,
+ id,
+ ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
+ ).toByteString().asReadOnlyByteBuffer()
- writeInt(gpb.limit()) // ves event size in bytes
- writeBytes(gpb) // ves event as GPB bytes
-}
+ writeInt(gpb.limit()) // ves event size in bytes
+ writeBytes(gpb) // ves event as GPB bytes
+ }
-fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString(), hvRanMeasFields: ByteString = ByteString.EMPTY) =
+fun vesEvent(domain: Domain = Domain.HVRANMEAS,
+ id: String = UUID.randomUUID().toString(),
+ hvRanMeasFields: ByteString = ByteString.EMPTY) =
VesEvent.newBuilder()
.setCommonEventHeader(
CommonEventHeader.getDefaultInstance().toBuilder()