diff options
author | fkrzywka <filip.krzywka@nokia.com> | 2018-07-03 10:14:38 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-02 13:41:04 +0200 |
commit | f8a9a10a75bf139203fe9ea48a01708c7bda0781 (patch) | |
tree | 634321d472c69d67f817cd2e689dc25c10af7c1a /hv-collector-domain/src/test | |
parent | 1383775f3df00bd08a7ac14fe1278858bdef6487 (diff) |
Enhance wire protocol
Handle new wire frame message type which should allow clients to
indicate that all data has been sent to collector
Change xNF Simulator to send end-of-transmission message
after sending all messages
Close ves-hv-collector stream after encountering EOT message
Remove duplicated file in project
Closes ONAP-391
Change-Id: Idb6afc41d4bb0220a29df10c2aecfd76acd3ad16
Signed-off-by: fkrzywka <filip.krzywka@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-domain/src/test')
-rw-r--r-- | hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt | 90 |
1 files changed, 63 insertions, 27 deletions
diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt index 4d6f0716..a5242e0f 100644 --- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt +++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt @@ -20,19 +20,18 @@ package org.onap.dcae.collectors.veshv.domain import arrow.core.Either -import arrow.core.identity import io.netty.buffer.Unpooled import io.netty.buffer.UnpooledByteBufAllocator import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.fail import org.assertj.core.api.ObjectAssert import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE import java.nio.charset.Charset import kotlin.test.assertTrue +import kotlin.test.fail /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -44,7 +43,7 @@ object WireFrameCodecsTest : Spek({ val decoder = WireFrameDecoder() fun createSampleFrame() = - WireFrame(payloadAsString.toByteArray(Charset.defaultCharset())) + PayloadWireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset())) fun encodeSampleFrame() = createSampleFrame().let { @@ -54,7 +53,7 @@ object WireFrameCodecsTest : Spek({ describe("Wire Frame invariants") { given("input with unsupported version") { - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData.EMPTY, version = 100, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -66,7 +65,7 @@ object WireFrameCodecsTest : Spek({ } given("input with unsupported payload type") { - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData.EMPTY, version = 1, payloadTypeRaw = 0x69, @@ -78,7 +77,7 @@ object WireFrameCodecsTest : Spek({ } given("input with too small payload size") { - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(byteArrayOf(1, 2, 3)), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -90,7 +89,7 @@ object WireFrameCodecsTest : Spek({ } given("input with too big payload size") { - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(byteArrayOf(1, 2, 3)), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -103,7 +102,7 @@ object WireFrameCodecsTest : Spek({ given("valid input") { val payload = byteArrayOf(6, 9, 8, 6) - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(payload), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -122,7 +121,7 @@ object WireFrameCodecsTest : Spek({ describe("encode-decode methods' compatibility") { val frame = createSampleFrame() val encoded = encodeSampleFrame() - val decoded = decoder.decodeFirst(encoded).getOrFail() + val decoded = decoder.decodeFirst(encoded).getPayloadMessageOrFail() it("should decode version") { assertThat(decoded.version).isEqualTo(frame.version) @@ -142,40 +141,52 @@ object WireFrameCodecsTest : Spek({ } } + describe("TCP framing") { // see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11 - it("should decode message leaving rest unread") { + it("should return error when buffer is empty") { + val buff = Unpooled.buffer() + + decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(EmptyWireFrame::class.java) } + } + + it("should return end-of-transmission message when given end-of-transmission marker byte") { val buff = Unpooled.buffer() - .writeBytes(encodeSampleFrame()) .writeByte(0xAA) - val decoded = decoder.decodeFirst(buff).getOrFail() - assertThat(decoded.isValid()).describedAs("should be valid").isTrue() - assertThat(buff.readableBytes()).isEqualTo(1) + assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff)) } - it("should return error when not even header fits") { + it("should return error when given any single byte other than end-of-transmission marker byte") { val buff = Unpooled.buffer() - .writeByte(0xFF) + .writeByte(0xEE) decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) } + } + it("should return error when payload message header does not fit") { + val buff = Unpooled.buffer() + .writeByte(0xFF) + .writeBytes("MOMOM".toByteArray()) + + decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) } } - it("should return error when first byte is not 0xFF but length looks ok") { + it("should return error when length looks ok but first byte is not 0xFF or 0xAA") { val buff = Unpooled.buffer() - .writeByte(0xAA) + .writeByte(0x69) .writeBytes("some garbage".toByteArray()) decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) } } - it("should return error when first byte is not 0xFF and length is to short") { + it("should return end-of-transmission message when length looks ok and first byte is 0xAA") { val buff = Unpooled.buffer() .writeByte(0xAA) + .writeBytes("some garbage".toByteArray()) - decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) } + assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff)) } it("should return error when payload doesn't fit") { @@ -186,14 +197,23 @@ object WireFrameCodecsTest : Spek({ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFramePayloadBytes::class.java) } } + it("should decode payload message leaving rest unread") { + val buff = Unpooled.buffer() + .writeBytes(encodeSampleFrame()) + .writeByte(0xAA) + val decoded = decoder.decodeFirst(buff).getPayloadMessageOrFail() + + assertThat(decoded.isValid()).describedAs("should be valid").isTrue() + assertThat(buff.readableBytes()).isEqualTo(1) + } } - describe("payload size limit"){ + describe("payload size limit") { it("should decode successfully when payload size is equal 1 MiB") { val payload = ByteArray(MAX_PAYLOAD_SIZE) - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(payload), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -206,7 +226,7 @@ object WireFrameCodecsTest : Spek({ it("should return error when payload exceeds 1 MiB") { val payload = ByteArray(MAX_PAYLOAD_SIZE + 1) - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(payload), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -220,7 +240,7 @@ object WireFrameCodecsTest : Spek({ it("should validate only first message") { val payload = ByteArray(MAX_PAYLOAD_SIZE) - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(payload), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -237,5 +257,21 @@ private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>) fold({ assertj(assertThat(it)) }, { fail("Error expected") }) } -private fun Either<WireFrameDecodingError, WireFrame>.getOrFail(): WireFrame = - fold({ fail(it.message) }, ::identity) as WireFrame +private fun Either<WireFrameDecodingError, WireFrameMessage>.getPayloadMessageOrFail(): PayloadWireFrameMessage = + fold({ fail(it.message) }, { it.castToPayloadMsgOrFail() }) + +private fun WireFrameMessage.castToPayloadMsgOrFail(): PayloadWireFrameMessage = + this as? PayloadWireFrameMessage + ?: fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${this.javaClass}") + + +private fun assertIsEndOfTransmissionMessage(decoded: Either<WireFrameDecodingError, WireFrameMessage>) { + decoded.getEndOfTransmissionMessageOrFail() +} + +private fun Either<WireFrameDecodingError, WireFrameMessage>.getEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage = + fold({ fail(it.message) }, { it.castToEndOfTransmissionMessageOrFail() }) + +private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage = + this as? EndOfTransmissionMessage + ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}") |