diff options
author | fkrzywka <filip.krzywka@nokia.com> | 2018-07-03 10:14:38 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-02 13:41:04 +0200 |
commit | f8a9a10a75bf139203fe9ea48a01708c7bda0781 (patch) | |
tree | 634321d472c69d67f817cd2e689dc25c10af7c1a /hv-collector-domain | |
parent | 1383775f3df00bd08a7ac14fe1278858bdef6487 (diff) |
Enhance wire protocol
Handle new wire frame message type which should allow clients to
indicate that all data has been sent to collector
Change xNF Simulator to send end-of-transmission message
after sending all messages
Close ves-hv-collector stream after encountering EOT message
Remove duplicated file in project
Closes ONAP-391
Change-Id: Idb6afc41d4bb0220a29df10c2aecfd76acd3ad16
Signed-off-by: fkrzywka <filip.krzywka@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-domain')
-rw-r--r-- | hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt (renamed from hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt) | 41 | ||||
-rw-r--r-- | hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt | 50 | ||||
-rw-r--r-- | hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt | 15 | ||||
-rw-r--r-- | hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt | 90 |
4 files changed, 145 insertions, 51 deletions
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt index db6e1070..4811a2b4 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt @@ -19,6 +19,9 @@ */ package org.onap.dcae.collectors.veshv.domain + +sealed class WireFrameMessage + /** * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). * @@ -49,10 +52,11 @@ package org.onap.dcae.collectors.veshv.domain * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -data class WireFrame(val payload: ByteData, - val version: Short, - val payloadTypeRaw: Short, - val payloadSize: Int) { +data class PayloadWireFrameMessage(val payload: ByteData, + val version: Short, + val payloadTypeRaw: Short, + val payloadSize: Int +) : WireFrameMessage() { constructor(payload: ByteArray) : this( ByteData(payload), @@ -66,11 +70,38 @@ data class WireFrame(val payload: ByteData, && payload.size() == payloadSize companion object { + const val MARKER_BYTE: Short = 0xFF + const val SUPPORTED_VERSION: Short = 1 const val HEADER_SIZE = 3 * java.lang.Byte.BYTES + 1 * java.lang.Integer.BYTES - const val MARKER_BYTE: Short = 0xFF + + const val MAX_PAYLOAD_SIZE = 1024 * 1024 } } + + +/** + * This message type should be used by client to indicate that he has finished sending data to collector. + * + * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). + * + * ``` + * ┌─────┬───────────────────────┐ + * │octet│ 0 │ + * ├─────┼──┬──┬──┬──┬──┬──┬──┬──┤ + * │ bit │ 0│ │ │ │ │ │ │ │ + * ├─────┼──┴──┴──┴──┴──┴──┴──┴──┤ + * │field│ 0xAA │ + * └─────┴───────────────────────┘ + * ``` + * + * @since July 2018 + */ + +object EndOfTransmissionMessage : WireFrameMessage() { + const val MARKER_BYTE: Short = 0xAA +} + diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt index 39841d6a..ab82dc04 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt @@ -24,6 +24,7 @@ import arrow.core.Left import arrow.core.Right import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -31,10 +32,10 @@ import io.netty.buffer.ByteBufAllocator */ class WireFrameEncoder(val allocator: ByteBufAllocator) { - fun encode(frame: WireFrame): ByteBuf { - val bb = allocator.buffer(WireFrame.HEADER_SIZE + frame.payload.size()) + fun encode(frame: PayloadWireFrameMessage): ByteBuf { + val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size()) - bb.writeByte(WireFrame.MARKER_BYTE.toInt()) + bb.writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt()) bb.writeByte(frame.version.toInt()) bb.writeByte(frame.payloadTypeRaw.toInt()) bb.writeInt(frame.payloadSize) @@ -50,32 +51,54 @@ class WireFrameEncoder(val allocator: ByteBufAllocator) { */ class WireFrameDecoder { - fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> = + fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> = when { isEmpty(byteBuf) -> Left(EmptyWireFrame) + isSingleByte(byteBuf) -> lookForEOTFrame(byteBuf) headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes) - else -> parseFrame(byteBuf) + else -> parseWireFrame(byteBuf) } - private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrame.HEADER_SIZE - private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1 - private fun parseFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> { + private fun isSingleByte(byteBuf: ByteBuf) = byteBuf.readableBytes() == 1 + + private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < PayloadWireFrameMessage.HEADER_SIZE + + private fun lookForEOTFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, EndOfTransmissionMessage> { byteBuf.markReaderIndex() + val byte = byteBuf.readUnsignedByte() - val mark = byteBuf.readUnsignedByte() - if (mark != WireFrame.MARKER_BYTE) { + return if (byte == EndOfTransmissionMessage.MARKER_BYTE) { + Right(EndOfTransmissionMessage) + } else { byteBuf.resetReaderIndex() - return Left(InvalidWireFrameMarker(mark)) + Left(MissingWireFrameHeaderBytes) + } + } + + private fun parseWireFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> { + byteBuf.markReaderIndex() + + val mark = byteBuf.readUnsignedByte() + return when (mark) { + EndOfTransmissionMessage.MARKER_BYTE -> Right(EndOfTransmissionMessage) + PayloadWireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf) + else -> { + byteBuf.resetReaderIndex() + Left(InvalidWireFrameMarker(mark)) + } } + } + private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, PayloadWireFrameMessage> { val version = byteBuf.readUnsignedByte() val payloadTypeRaw = byteBuf.readUnsignedByte() val payloadSize = byteBuf.readInt() if (payloadSize > MAX_PAYLOAD_SIZE) { + byteBuf.resetReaderIndex() return Left(PayloadSizeExceeded) } @@ -86,10 +109,7 @@ class WireFrameDecoder { val payload = ByteData.readFrom(byteBuf, payloadSize) - return Right(WireFrame(payload, version, payloadTypeRaw, payloadSize)) - } + return Right(PayloadWireFrameMessage(payload, version, payloadTypeRaw, payloadSize)) - companion object { - const val MAX_PAYLOAD_SIZE = 1024 * 1024 } } diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt index 626bf329..d82bb25f 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.domain -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -33,9 +33,10 @@ sealed class WireFrameDecodingError(val message: String) sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg) -class InvalidWireFrameMarker(actualMarker: Short) - : InvalidWireFrame( - "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker)) +class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame( + "Invalid start of frame. Expected 0x%02X, but was 0x%02X" + .format(PayloadWireFrameMessage.MARKER_BYTE, actualMarker) +) object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)") @@ -46,3 +47,9 @@ sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg) object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size") object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size") object EmptyWireFrame : MissingWireFrameBytes("empty wire frame") + + +// Other + +class UnknownWireFrameTypeException(frame: WireFrameMessage) + : Throwable("Unexpected wire frame message type: ${frame.javaClass}") diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt index 4d6f0716..a5242e0f 100644 --- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt +++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt @@ -20,19 +20,18 @@ package org.onap.dcae.collectors.veshv.domain import arrow.core.Either -import arrow.core.identity import io.netty.buffer.Unpooled import io.netty.buffer.UnpooledByteBufAllocator import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.fail import org.assertj.core.api.ObjectAssert import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE import java.nio.charset.Charset import kotlin.test.assertTrue +import kotlin.test.fail /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -44,7 +43,7 @@ object WireFrameCodecsTest : Spek({ val decoder = WireFrameDecoder() fun createSampleFrame() = - WireFrame(payloadAsString.toByteArray(Charset.defaultCharset())) + PayloadWireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset())) fun encodeSampleFrame() = createSampleFrame().let { @@ -54,7 +53,7 @@ object WireFrameCodecsTest : Spek({ describe("Wire Frame invariants") { given("input with unsupported version") { - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData.EMPTY, version = 100, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -66,7 +65,7 @@ object WireFrameCodecsTest : Spek({ } given("input with unsupported payload type") { - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData.EMPTY, version = 1, payloadTypeRaw = 0x69, @@ -78,7 +77,7 @@ object WireFrameCodecsTest : Spek({ } given("input with too small payload size") { - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(byteArrayOf(1, 2, 3)), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -90,7 +89,7 @@ object WireFrameCodecsTest : Spek({ } given("input with too big payload size") { - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(byteArrayOf(1, 2, 3)), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -103,7 +102,7 @@ object WireFrameCodecsTest : Spek({ given("valid input") { val payload = byteArrayOf(6, 9, 8, 6) - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(payload), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -122,7 +121,7 @@ object WireFrameCodecsTest : Spek({ describe("encode-decode methods' compatibility") { val frame = createSampleFrame() val encoded = encodeSampleFrame() - val decoded = decoder.decodeFirst(encoded).getOrFail() + val decoded = decoder.decodeFirst(encoded).getPayloadMessageOrFail() it("should decode version") { assertThat(decoded.version).isEqualTo(frame.version) @@ -142,40 +141,52 @@ object WireFrameCodecsTest : Spek({ } } + describe("TCP framing") { // see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11 - it("should decode message leaving rest unread") { + it("should return error when buffer is empty") { + val buff = Unpooled.buffer() + + decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(EmptyWireFrame::class.java) } + } + + it("should return end-of-transmission message when given end-of-transmission marker byte") { val buff = Unpooled.buffer() - .writeBytes(encodeSampleFrame()) .writeByte(0xAA) - val decoded = decoder.decodeFirst(buff).getOrFail() - assertThat(decoded.isValid()).describedAs("should be valid").isTrue() - assertThat(buff.readableBytes()).isEqualTo(1) + assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff)) } - it("should return error when not even header fits") { + it("should return error when given any single byte other than end-of-transmission marker byte") { val buff = Unpooled.buffer() - .writeByte(0xFF) + .writeByte(0xEE) decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) } + } + it("should return error when payload message header does not fit") { + val buff = Unpooled.buffer() + .writeByte(0xFF) + .writeBytes("MOMOM".toByteArray()) + + decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) } } - it("should return error when first byte is not 0xFF but length looks ok") { + it("should return error when length looks ok but first byte is not 0xFF or 0xAA") { val buff = Unpooled.buffer() - .writeByte(0xAA) + .writeByte(0x69) .writeBytes("some garbage".toByteArray()) decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) } } - it("should return error when first byte is not 0xFF and length is to short") { + it("should return end-of-transmission message when length looks ok and first byte is 0xAA") { val buff = Unpooled.buffer() .writeByte(0xAA) + .writeBytes("some garbage".toByteArray()) - decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) } + assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff)) } it("should return error when payload doesn't fit") { @@ -186,14 +197,23 @@ object WireFrameCodecsTest : Spek({ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFramePayloadBytes::class.java) } } + it("should decode payload message leaving rest unread") { + val buff = Unpooled.buffer() + .writeBytes(encodeSampleFrame()) + .writeByte(0xAA) + val decoded = decoder.decodeFirst(buff).getPayloadMessageOrFail() + + assertThat(decoded.isValid()).describedAs("should be valid").isTrue() + assertThat(buff.readableBytes()).isEqualTo(1) + } } - describe("payload size limit"){ + describe("payload size limit") { it("should decode successfully when payload size is equal 1 MiB") { val payload = ByteArray(MAX_PAYLOAD_SIZE) - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(payload), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -206,7 +226,7 @@ object WireFrameCodecsTest : Spek({ it("should return error when payload exceeds 1 MiB") { val payload = ByteArray(MAX_PAYLOAD_SIZE + 1) - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(payload), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -220,7 +240,7 @@ object WireFrameCodecsTest : Spek({ it("should validate only first message") { val payload = ByteArray(MAX_PAYLOAD_SIZE) - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(payload), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -237,5 +257,21 @@ private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>) fold({ assertj(assertThat(it)) }, { fail("Error expected") }) } -private fun Either<WireFrameDecodingError, WireFrame>.getOrFail(): WireFrame = - fold({ fail(it.message) }, ::identity) as WireFrame +private fun Either<WireFrameDecodingError, WireFrameMessage>.getPayloadMessageOrFail(): PayloadWireFrameMessage = + fold({ fail(it.message) }, { it.castToPayloadMsgOrFail() }) + +private fun WireFrameMessage.castToPayloadMsgOrFail(): PayloadWireFrameMessage = + this as? PayloadWireFrameMessage + ?: fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${this.javaClass}") + + +private fun assertIsEndOfTransmissionMessage(decoded: Either<WireFrameDecodingError, WireFrameMessage>) { + decoded.getEndOfTransmissionMessageOrFail() +} + +private fun Either<WireFrameDecodingError, WireFrameMessage>.getEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage = + fold({ fail(it.message) }, { it.castToEndOfTransmissionMessageOrFail() }) + +private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage = + this as? EndOfTransmissionMessage + ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}") |