From f8a9a10a75bf139203fe9ea48a01708c7bda0781 Mon Sep 17 00:00:00 2001 From: fkrzywka Date: Tue, 3 Jul 2018 10:14:38 +0200 Subject: Enhance wire protocol Handle new wire frame message type which should allow clients to indicate that all data has been sent to collector Change xNF Simulator to send end-of-transmission message after sending all messages Close ves-hv-collector stream after encountering EOT message Remove duplicated file in project Closes ONAP-391 Change-Id: Idb6afc41d4bb0220a29df10c2aecfd76acd3ad16 Signed-off-by: fkrzywka Issue-ID: DCAEGEN2-601 --- .../collectors/veshv/domain/WireFrameCodecsTest.kt | 90 +++++++++++++++------- 1 file changed, 63 insertions(+), 27 deletions(-) (limited to 'hv-collector-domain/src/test') diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt index 4d6f0716..a5242e0f 100644 --- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt +++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt @@ -20,19 +20,18 @@ package org.onap.dcae.collectors.veshv.domain import arrow.core.Either -import arrow.core.identity import io.netty.buffer.Unpooled import io.netty.buffer.UnpooledByteBufAllocator import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.fail import org.assertj.core.api.ObjectAssert import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE import java.nio.charset.Charset import kotlin.test.assertTrue +import kotlin.test.fail /** * @author Piotr Jaszczyk @@ -44,7 +43,7 @@ object WireFrameCodecsTest : Spek({ val decoder = WireFrameDecoder() fun createSampleFrame() = - WireFrame(payloadAsString.toByteArray(Charset.defaultCharset())) + PayloadWireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset())) fun encodeSampleFrame() = createSampleFrame().let { @@ -54,7 +53,7 @@ object WireFrameCodecsTest : Spek({ describe("Wire Frame invariants") { given("input with unsupported version") { - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData.EMPTY, version = 100, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -66,7 +65,7 @@ object WireFrameCodecsTest : Spek({ } given("input with unsupported payload type") { - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData.EMPTY, version = 1, payloadTypeRaw = 0x69, @@ -78,7 +77,7 @@ object WireFrameCodecsTest : Spek({ } given("input with too small payload size") { - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(byteArrayOf(1, 2, 3)), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -90,7 +89,7 @@ object WireFrameCodecsTest : Spek({ } given("input with too big payload size") { - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(byteArrayOf(1, 2, 3)), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -103,7 +102,7 @@ object WireFrameCodecsTest : Spek({ given("valid input") { val payload = byteArrayOf(6, 9, 8, 6) - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(payload), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -122,7 +121,7 @@ object WireFrameCodecsTest : Spek({ describe("encode-decode methods' compatibility") { val frame = createSampleFrame() val encoded = encodeSampleFrame() - val decoded = decoder.decodeFirst(encoded).getOrFail() + val decoded = decoder.decodeFirst(encoded).getPayloadMessageOrFail() it("should decode version") { assertThat(decoded.version).isEqualTo(frame.version) @@ -142,40 +141,52 @@ object WireFrameCodecsTest : Spek({ } } + describe("TCP framing") { // see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11 - it("should decode message leaving rest unread") { + it("should return error when buffer is empty") { + val buff = Unpooled.buffer() + + decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(EmptyWireFrame::class.java) } + } + + it("should return end-of-transmission message when given end-of-transmission marker byte") { val buff = Unpooled.buffer() - .writeBytes(encodeSampleFrame()) .writeByte(0xAA) - val decoded = decoder.decodeFirst(buff).getOrFail() - assertThat(decoded.isValid()).describedAs("should be valid").isTrue() - assertThat(buff.readableBytes()).isEqualTo(1) + assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff)) } - it("should return error when not even header fits") { + it("should return error when given any single byte other than end-of-transmission marker byte") { val buff = Unpooled.buffer() - .writeByte(0xFF) + .writeByte(0xEE) decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) } + } + it("should return error when payload message header does not fit") { + val buff = Unpooled.buffer() + .writeByte(0xFF) + .writeBytes("MOMOM".toByteArray()) + + decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) } } - it("should return error when first byte is not 0xFF but length looks ok") { + it("should return error when length looks ok but first byte is not 0xFF or 0xAA") { val buff = Unpooled.buffer() - .writeByte(0xAA) + .writeByte(0x69) .writeBytes("some garbage".toByteArray()) decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) } } - it("should return error when first byte is not 0xFF and length is to short") { + it("should return end-of-transmission message when length looks ok and first byte is 0xAA") { val buff = Unpooled.buffer() .writeByte(0xAA) + .writeBytes("some garbage".toByteArray()) - decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) } + assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff)) } it("should return error when payload doesn't fit") { @@ -186,14 +197,23 @@ object WireFrameCodecsTest : Spek({ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFramePayloadBytes::class.java) } } + it("should decode payload message leaving rest unread") { + val buff = Unpooled.buffer() + .writeBytes(encodeSampleFrame()) + .writeByte(0xAA) + val decoded = decoder.decodeFirst(buff).getPayloadMessageOrFail() + + assertThat(decoded.isValid()).describedAs("should be valid").isTrue() + assertThat(buff.readableBytes()).isEqualTo(1) + } } - describe("payload size limit"){ + describe("payload size limit") { it("should decode successfully when payload size is equal 1 MiB") { val payload = ByteArray(MAX_PAYLOAD_SIZE) - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(payload), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -206,7 +226,7 @@ object WireFrameCodecsTest : Spek({ it("should return error when payload exceeds 1 MiB") { val payload = ByteArray(MAX_PAYLOAD_SIZE + 1) - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(payload), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -220,7 +240,7 @@ object WireFrameCodecsTest : Spek({ it("should validate only first message") { val payload = ByteArray(MAX_PAYLOAD_SIZE) - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(payload), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -237,5 +257,21 @@ private fun Either.assertFailedWithError(assertj: (ObjectAssert) fold({ assertj(assertThat(it)) }, { fail("Error expected") }) } -private fun Either.getOrFail(): WireFrame = - fold({ fail(it.message) }, ::identity) as WireFrame +private fun Either.getPayloadMessageOrFail(): PayloadWireFrameMessage = + fold({ fail(it.message) }, { it.castToPayloadMsgOrFail() }) + +private fun WireFrameMessage.castToPayloadMsgOrFail(): PayloadWireFrameMessage = + this as? PayloadWireFrameMessage + ?: fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${this.javaClass}") + + +private fun assertIsEndOfTransmissionMessage(decoded: Either) { + decoded.getEndOfTransmissionMessageOrFail() +} + +private fun Either.getEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage = + fold({ fail(it.message) }, { it.castToEndOfTransmissionMessageOrFail() }) + +private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage = + this as? EndOfTransmissionMessage + ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}") -- cgit 1.2.3-korg