aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-ct
diff options
context:
space:
mode:
authorfkrzywka <filip.krzywka@nokia.com>2018-07-03 10:14:38 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 13:41:04 +0200
commitf8a9a10a75bf139203fe9ea48a01708c7bda0781 (patch)
tree634321d472c69d67f817cd2e689dc25c10af7c1a /hv-collector-ct
parent1383775f3df00bd08a7ac14fe1278858bdef6487 (diff)
Enhance wire protocol
Handle new wire frame message type which should allow clients to indicate that all data has been sent to collector Change xNF Simulator to send end-of-transmission message after sending all messages Close ves-hv-collector stream after encountering EOT message Remove duplicated file in project Closes ONAP-391 Change-Id: Idb6afc41d4bb0220a29df10c2aecfd76acd3ad16 Signed-off-by: fkrzywka <filip.krzywka@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-ct')
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt102
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt53
2 files changed, 95 insertions, 60 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 246fc7ed..5e6e666f 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -37,22 +37,42 @@ object VesHvSpecification : Spek({
describe("VES High Volume Collector") {
it("should handle multiple HV RAN events") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))
assertThat(messages)
.describedAs("should send all events")
.hasSize(2)
}
+
+ it("should not handle messages received from client after end-of-transmission message") {
+ val (sut, sink) = vesHvWithStoringSink()
+ val validMessage = vesMessage(Domain.HVRANMEAS)
+ val anotherValidMessage = vesMessage(Domain.HVRANMEAS)
+ val endOfTransmissionMessage = endOfTransmissionMessage()
+
+ val handledEvents = sut.handleConnection(sink,
+ validMessage,
+ endOfTransmissionMessage,
+ anotherValidMessage
+ )
+
+ assertThat(handledEvents).hasSize(1)
+ assertThat(validMessage.refCnt())
+ .describedAs("first message should be released")
+ .isEqualTo(0)
+ assertThat(endOfTransmissionMessage.refCnt())
+ .describedAs("end-of-transmission message should be released")
+ .isEqualTo(0)
+ assertThat(anotherValidMessage.refCnt())
+ .describedAs("second (not handled) message should not be released")
+ .isEqualTo(1)
+ }
}
describe("Memory management") {
it("should release memory for each handled and dropped message") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val validMessage = vesMessage(Domain.HVRANMEAS)
val msgWithInvalidDomain = vesMessage(Domain.OTHER)
val msgWithInvalidFrame = invalidWireFrame()
@@ -76,13 +96,30 @@ object VesHvSpecification : Spek({
assertThat(msgWithTooBigPayload.refCnt())
.describedAs("message with payload exceeding 1MiB should be released")
.isEqualTo(expectedRefCnt)
+ }
+ it("should release memory for end-of-transmission message") {
+ val (sut, sink) = vesHvWithStoringSink()
+ val validMessage = vesMessage(Domain.HVRANMEAS)
+ val endOfTransmissionMessage = endOfTransmissionMessage()
+ val expectedRefCnt = 0
+
+ val handledEvents = sut.handleConnection(sink,
+ validMessage,
+ endOfTransmissionMessage
+ )
+
+ assertThat(handledEvents).hasSize(1)
+ assertThat(validMessage.refCnt())
+ .describedAs("handled message should be released")
+ .isEqualTo(expectedRefCnt)
+ assertThat(endOfTransmissionMessage.refCnt())
+ .describedAs("end-of-transmission message should be released")
+ .isEqualTo(expectedRefCnt)
}
it("should release memory for each message with invalid payload") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val validMessage = vesMessage(Domain.HVRANMEAS)
val msgWithInvalidPayload = invalidVesMessage()
val expectedRefCnt = 0
@@ -101,9 +138,7 @@ object VesHvSpecification : Spek({
}
it("should release memory for each message with garbage frame") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val validMessage = vesMessage(Domain.HVRANMEAS)
val msgWithGarbageFrame = garbageFrame()
val expectedRefCnt = 0
@@ -124,9 +159,7 @@ object VesHvSpecification : Spek({
describe("message routing") {
it("should direct message to a topic by means of routing configuration") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
assertThat(messages).describedAs("number of routed messages").hasSize(1)
@@ -137,8 +170,7 @@ object VesHvSpecification : Spek({
}
it("should be able to direct 2 messages from different domains to one topic") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
@@ -149,20 +181,18 @@ object VesHvSpecification : Spek({
assertThat(messages).describedAs("number of routed messages").hasSize(3)
- assertThat(messages.get(0).topic).describedAs("first message topic")
+ assertThat(messages[0].topic).describedAs("first message topic")
.isEqualTo(HVRANMEAS_TOPIC)
- assertThat(messages.get(1).topic).describedAs("second message topic")
+ assertThat(messages[1].topic).describedAs("second message topic")
.isEqualTo(HVRANMEAS_TOPIC)
- assertThat(messages.get(2).topic).describedAs("last message topic")
+ assertThat(messages[2].topic).describedAs("last message topic")
.isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
}
it("should drop message if route was not found") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink,
vesMessage(Domain.OTHER, "first"),
vesMessage(Domain.HVRANMEAS, "second"),
@@ -181,8 +211,7 @@ object VesHvSpecification : Spek({
val defaultTimeout = Duration.ofSeconds(10)
it("should update collector on configuration change") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, _) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(basicConfiguration)
val firstCollector = sut.collector
@@ -195,8 +224,7 @@ object VesHvSpecification : Spek({
}
it("should start routing messages on configuration change") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
@@ -216,8 +244,7 @@ object VesHvSpecification : Spek({
}
it("should change domain routing on configuration change") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(basicConfiguration)
@@ -244,8 +271,7 @@ object VesHvSpecification : Spek({
}
it("should update routing for each client sending one message") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(basicConfiguration)
@@ -274,8 +300,7 @@ object VesHvSpecification : Spek({
it("should not update routing for client sending continuous stream of messages") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(basicConfiguration)
@@ -311,9 +336,7 @@ object VesHvSpecification : Spek({
describe("request validation") {
it("should reject message with payload greater than 1 MiB and all subsequent messages") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val handledMessages = sut.handleConnection(sink,
vesMessage(Domain.HVRANMEAS, "first"),
@@ -326,3 +349,10 @@ object VesHvSpecification : Spek({
}
})
+
+private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+ return Pair(sut, sink)
+}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt
index e620e6b9..64b4ba26 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt
@@ -23,9 +23,7 @@ import com.google.protobuf.ByteString
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.PooledByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.*
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
-import org.onap.ves.HVRanMeasFieldsV5
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import org.onap.ves.VesEventV5.VesEvent
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
@@ -33,15 +31,19 @@ import java.util.*
val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT
-fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run {
- writeByte(0xFF) // always 0xFF
- writeByte(0x01) // version
- writeByte(0x01) // content type = GPB
+fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf =
+ allocator.buffer().run {
+ writeByte(0xFF) // always 0xFF
+ writeByte(0x01) // version
+ writeByte(0x01) // content type = GPB
- val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer()
- writeInt(gpb.limit()) // ves event size in bytes
- writeBytes(gpb) // ves event as GPB bytes
-}
+ val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer()
+ writeInt(gpb.limit()) // ves event size in bytes
+ writeBytes(gpb) // ves event as GPB bytes
+ }
+
+fun endOfTransmissionMessage(): ByteBuf =
+ allocator.buffer().writeByte(0xAA)
fun invalidVesMessage(): ByteBuf = allocator.buffer().run {
@@ -65,22 +67,25 @@ fun invalidWireFrame(): ByteBuf = allocator.buffer().run {
writeByte(0x01) // content type = GPB
}
-fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run {
- writeByte(0xFF) // always 0xFF
- writeByte(0x01) // version
- writeByte(0x01) // content type = GPB
+fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf =
+ allocator.buffer().run {
+ writeByte(0xFF) // always 0xFF
+ writeByte(0x01) // version
+ writeByte(0x01) // content type = GPB
- val gpb = vesEvent(
- domain,
- id,
- ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
- ).toByteString().asReadOnlyByteBuffer()
+ val gpb = vesEvent(
+ domain,
+ id,
+ ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
+ ).toByteString().asReadOnlyByteBuffer()
- writeInt(gpb.limit()) // ves event size in bytes
- writeBytes(gpb) // ves event as GPB bytes
-}
+ writeInt(gpb.limit()) // ves event size in bytes
+ writeBytes(gpb) // ves event as GPB bytes
+ }
-fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString(), hvRanMeasFields: ByteString = ByteString.EMPTY) =
+fun vesEvent(domain: Domain = Domain.HVRANMEAS,
+ id: String = UUID.randomUUID().toString(),
+ hvRanMeasFields: ByteString = ByteString.EMPTY) =
VesEvent.newBuilder()
.setCommonEventHeader(
CommonEventHeader.getDefaultInstance().toBuilder()