diff options
author | fkrzywka <filip.krzywka@nokia.com> | 2018-07-03 10:14:38 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-02 13:41:04 +0200 |
commit | f8a9a10a75bf139203fe9ea48a01708c7bda0781 (patch) | |
tree | 634321d472c69d67f817cd2e689dc25c10af7c1a /hv-collector-ct | |
parent | 1383775f3df00bd08a7ac14fe1278858bdef6487 (diff) |
Enhance wire protocol
Handle new wire frame message type which should allow clients to
indicate that all data has been sent to collector
Change xNF Simulator to send end-of-transmission message
after sending all messages
Close ves-hv-collector stream after encountering EOT message
Remove duplicated file in project
Closes ONAP-391
Change-Id: Idb6afc41d4bb0220a29df10c2aecfd76acd3ad16
Signed-off-by: fkrzywka <filip.krzywka@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-ct')
2 files changed, 95 insertions, 60 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 246fc7ed..5e6e666f 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -37,22 +37,42 @@ object VesHvSpecification : Spek({ describe("VES High Volume Collector") { it("should handle multiple HV RAN events") { - val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + val (sut, sink) = vesHvWithStoringSink() val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS)) assertThat(messages) .describedAs("should send all events") .hasSize(2) } + + it("should not handle messages received from client after end-of-transmission message") { + val (sut, sink) = vesHvWithStoringSink() + val validMessage = vesMessage(Domain.HVRANMEAS) + val anotherValidMessage = vesMessage(Domain.HVRANMEAS) + val endOfTransmissionMessage = endOfTransmissionMessage() + + val handledEvents = sut.handleConnection(sink, + validMessage, + endOfTransmissionMessage, + anotherValidMessage + ) + + assertThat(handledEvents).hasSize(1) + assertThat(validMessage.refCnt()) + .describedAs("first message should be released") + .isEqualTo(0) + assertThat(endOfTransmissionMessage.refCnt()) + .describedAs("end-of-transmission message should be released") + .isEqualTo(0) + assertThat(anotherValidMessage.refCnt()) + .describedAs("second (not handled) message should not be released") + .isEqualTo(1) + } } describe("Memory management") { it("should release memory for each handled and dropped message") { - val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + val (sut, sink) = vesHvWithStoringSink() val validMessage = vesMessage(Domain.HVRANMEAS) val msgWithInvalidDomain = vesMessage(Domain.OTHER) val msgWithInvalidFrame = invalidWireFrame() @@ -76,13 +96,30 @@ object VesHvSpecification : Spek({ assertThat(msgWithTooBigPayload.refCnt()) .describedAs("message with payload exceeding 1MiB should be released") .isEqualTo(expectedRefCnt) + } + it("should release memory for end-of-transmission message") { + val (sut, sink) = vesHvWithStoringSink() + val validMessage = vesMessage(Domain.HVRANMEAS) + val endOfTransmissionMessage = endOfTransmissionMessage() + val expectedRefCnt = 0 + + val handledEvents = sut.handleConnection(sink, + validMessage, + endOfTransmissionMessage + ) + + assertThat(handledEvents).hasSize(1) + assertThat(validMessage.refCnt()) + .describedAs("handled message should be released") + .isEqualTo(expectedRefCnt) + assertThat(endOfTransmissionMessage.refCnt()) + .describedAs("end-of-transmission message should be released") + .isEqualTo(expectedRefCnt) } it("should release memory for each message with invalid payload") { - val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + val (sut, sink) = vesHvWithStoringSink() val validMessage = vesMessage(Domain.HVRANMEAS) val msgWithInvalidPayload = invalidVesMessage() val expectedRefCnt = 0 @@ -101,9 +138,7 @@ object VesHvSpecification : Spek({ } it("should release memory for each message with garbage frame") { - val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + val (sut, sink) = vesHvWithStoringSink() val validMessage = vesMessage(Domain.HVRANMEAS) val msgWithGarbageFrame = garbageFrame() val expectedRefCnt = 0 @@ -124,9 +159,7 @@ object VesHvSpecification : Spek({ describe("message routing") { it("should direct message to a topic by means of routing configuration") { - val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + val (sut, sink) = vesHvWithStoringSink() val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) assertThat(messages).describedAs("number of routed messages").hasSize(1) @@ -137,8 +170,7 @@ object VesHvSpecification : Spek({ } it("should be able to direct 2 messages from different domains to one topic") { - val sink = StoringSink() - val sut = Sut(sink) + val (sut, sink) = vesHvWithStoringSink() sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration) @@ -149,20 +181,18 @@ object VesHvSpecification : Spek({ assertThat(messages).describedAs("number of routed messages").hasSize(3) - assertThat(messages.get(0).topic).describedAs("first message topic") + assertThat(messages[0].topic).describedAs("first message topic") .isEqualTo(HVRANMEAS_TOPIC) - assertThat(messages.get(1).topic).describedAs("second message topic") + assertThat(messages[1].topic).describedAs("second message topic") .isEqualTo(HVRANMEAS_TOPIC) - assertThat(messages.get(2).topic).describedAs("last message topic") + assertThat(messages[2].topic).describedAs("last message topic") .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC) } it("should drop message if route was not found") { - val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + val (sut, sink) = vesHvWithStoringSink() val messages = sut.handleConnection(sink, vesMessage(Domain.OTHER, "first"), vesMessage(Domain.HVRANMEAS, "second"), @@ -181,8 +211,7 @@ object VesHvSpecification : Spek({ val defaultTimeout = Duration.ofSeconds(10) it("should update collector on configuration change") { - val sink = StoringSink() - val sut = Sut(sink) + val (sut, _) = vesHvWithStoringSink() sut.configurationProvider.updateConfiguration(basicConfiguration) val firstCollector = sut.collector @@ -195,8 +224,7 @@ object VesHvSpecification : Spek({ } it("should start routing messages on configuration change") { - val sink = StoringSink() - val sut = Sut(sink) + val (sut, sink) = vesHvWithStoringSink() sut.configurationProvider.updateConfiguration(configurationWithoutRouting) @@ -216,8 +244,7 @@ object VesHvSpecification : Spek({ } it("should change domain routing on configuration change") { - val sink = StoringSink() - val sut = Sut(sink) + val (sut, sink) = vesHvWithStoringSink() sut.configurationProvider.updateConfiguration(basicConfiguration) @@ -244,8 +271,7 @@ object VesHvSpecification : Spek({ } it("should update routing for each client sending one message") { - val sink = StoringSink() - val sut = Sut(sink) + val (sut, sink) = vesHvWithStoringSink() sut.configurationProvider.updateConfiguration(basicConfiguration) @@ -274,8 +300,7 @@ object VesHvSpecification : Spek({ it("should not update routing for client sending continuous stream of messages") { - val sink = StoringSink() - val sut = Sut(sink) + val (sut, sink) = vesHvWithStoringSink() sut.configurationProvider.updateConfiguration(basicConfiguration) @@ -311,9 +336,7 @@ object VesHvSpecification : Spek({ describe("request validation") { it("should reject message with payload greater than 1 MiB and all subsequent messages") { - val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + val (sut, sink) = vesHvWithStoringSink() val handledMessages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS, "first"), @@ -326,3 +349,10 @@ object VesHvSpecification : Spek({ } }) + +private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> { + val sink = StoringSink() + val sut = Sut(sink) + sut.configurationProvider.updateConfiguration(basicConfiguration) + return Pair(sut, sink) +} diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt index e620e6b9..64b4ba26 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt @@ -23,9 +23,7 @@ import com.google.protobuf.ByteString import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.PooledByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.* -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE -import org.onap.ves.HVRanMeasFieldsV5 +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE import org.onap.ves.VesEventV5.VesEvent import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain @@ -33,15 +31,19 @@ import java.util.* val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT -fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run { - writeByte(0xFF) // always 0xFF - writeByte(0x01) // version - writeByte(0x01) // content type = GPB +fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = + allocator.buffer().run { + writeByte(0xFF) // always 0xFF + writeByte(0x01) // version + writeByte(0x01) // content type = GPB - val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer() - writeInt(gpb.limit()) // ves event size in bytes - writeBytes(gpb) // ves event as GPB bytes -} + val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer() + writeInt(gpb.limit()) // ves event size in bytes + writeBytes(gpb) // ves event as GPB bytes + } + +fun endOfTransmissionMessage(): ByteBuf = + allocator.buffer().writeByte(0xAA) fun invalidVesMessage(): ByteBuf = allocator.buffer().run { @@ -65,22 +67,25 @@ fun invalidWireFrame(): ByteBuf = allocator.buffer().run { writeByte(0x01) // content type = GPB } -fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run { - writeByte(0xFF) // always 0xFF - writeByte(0x01) // version - writeByte(0x01) // content type = GPB +fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = + allocator.buffer().run { + writeByte(0xFF) // always 0xFF + writeByte(0x01) // version + writeByte(0x01) // content type = GPB - val gpb = vesEvent( - domain, - id, - ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE)) - ).toByteString().asReadOnlyByteBuffer() + val gpb = vesEvent( + domain, + id, + ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE)) + ).toByteString().asReadOnlyByteBuffer() - writeInt(gpb.limit()) // ves event size in bytes - writeBytes(gpb) // ves event as GPB bytes -} + writeInt(gpb.limit()) // ves event size in bytes + writeBytes(gpb) // ves event as GPB bytes + } -fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString(), hvRanMeasFields: ByteString = ByteString.EMPTY) = +fun vesEvent(domain: Domain = Domain.HVRANMEAS, + id: String = UUID.randomUUID().toString(), + hvRanMeasFields: ByteString = ByteString.EMPTY) = VesEvent.newBuilder() .setCommonEventHeader( CommonEventHeader.getDefaultInstance().toBuilder() |