diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-06-07 11:52:16 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-01 13:06:43 +0200 |
commit | 07bbbf71cd65b29f446a1b475add87f20365db83 (patch) | |
tree | e64fcf12c21e46358043744476d68765634d7f6f /hv-collector-domain | |
parent | 767d0464a19e0949d2919e6df15c9653dec50503 (diff) |
Fix TCP stream framing issue
Because of the nature of TCP protocol we receive consecutive IO buffer
snapshots - not separate messages. That means that we need to join
incomming buffers and then split it into separate WireFrames.
Closes ONAP-312
Change-Id: I84ba0ec58a41ff9026f2fca24d2b15f3adcf0a19
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-domain')
6 files changed, 308 insertions, 30 deletions
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt index 5bd63d8b..8c8b4718 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt @@ -21,6 +21,9 @@ package org.onap.dcae.collectors.veshv.domain import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.domain.exceptions.EmptyWireFrameException +import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException +import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException /** * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). @@ -53,20 +56,20 @@ import io.netty.buffer.ByteBufAllocator * @since May 2018 */ data class WireFrame(val payload: ByteBuf, - val mark: Short, val majorVersion: Short, val minorVersion: Short, val payloadSize: Int) { + constructor(payload: ByteBuf) : this(payload, 1, 0, payload.readableBytes()) + fun isValid(): Boolean = - mark == FF_BYTE - && majorVersion == SUPPORTED_MAJOR_VERSION + majorVersion == SUPPORTED_MAJOR_VERSION && payload.readableBytes() == payloadSize fun encode(allocator: ByteBufAllocator): ByteBuf { val bb = allocator.buffer(HEADER_SIZE + payload.readableBytes()) - bb.writeByte(mark.toInt()) + bb.writeByte(MARKER_BYTE.toInt()) bb.writeByte(majorVersion.toInt()) bb.writeByte(minorVersion.toInt()) bb.writeInt(payloadSize) @@ -76,20 +79,58 @@ data class WireFrame(val payload: ByteBuf, } companion object { - fun decode(byteBuf: ByteBuf): WireFrame { - val mark = byteBuf.readUnsignedByte() + fun decodeFirst(byteBuf: ByteBuf): WireFrame { + verifyNotEmpty(byteBuf) + byteBuf.markReaderIndex() + + verifyMarker(byteBuf) + verifyMinimumSize(byteBuf) + val majorVersion = byteBuf.readUnsignedByte() val minorVersion = byteBuf.readUnsignedByte() - val payloadSize = byteBuf.readInt() - val payload = byteBuf.retainedSlice() + val payloadSize = verifyPayloadSize(byteBuf) + + val payload = byteBuf.retainedSlice(byteBuf.readerIndex(), payloadSize) + byteBuf.readerIndex(byteBuf.readerIndex() + payloadSize) + + return WireFrame(payload, majorVersion, minorVersion, payloadSize) + } + + private fun verifyPayloadSize(byteBuf: ByteBuf): Int = + byteBuf.readInt().let { payloadSize -> + if (byteBuf.readableBytes() < payloadSize) { + byteBuf.resetReaderIndex() + throw MissingWireFrameBytesException("readable bytes < payload size") + } else { + payloadSize + } + } + + private fun verifyMinimumSize(byteBuf: ByteBuf) { + if (byteBuf.readableBytes() < HEADER_SIZE) { + byteBuf.resetReaderIndex() + throw MissingWireFrameBytesException("readable bytes < header size") + } + } + + private fun verifyMarker(byteBuf: ByteBuf) { + val mark = byteBuf.readUnsignedByte() + if (mark != MARKER_BYTE) { + byteBuf.resetReaderIndex() + throw InvalidWireFrameMarkerException(mark) + } + } - return WireFrame(payload, mark, majorVersion, minorVersion, payloadSize) + private fun verifyNotEmpty(byteBuf: ByteBuf) { + if (byteBuf.readableBytes() < 1) { + throw EmptyWireFrameException() + } } - private const val HEADER_SIZE = + const val HEADER_SIZE = 3 * java.lang.Byte.BYTES + - 1 * java.lang.Integer.BYTES - private const val FF_BYTE: Short = 0xFF - private const val SUPPORTED_MAJOR_VERSION: Short = 1 + 1 * java.lang.Integer.BYTES + const val MARKER_BYTE: Short = 0xFF + const val SUPPORTED_MAJOR_VERSION: Short = 1 } } diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt new file mode 100644 index 00000000..6e1ce935 --- /dev/null +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain.exceptions + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +class EmptyWireFrameException : MissingWireFrameBytesException("wire frame was empty (readable bytes == 0)") diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt new file mode 100644 index 00000000..ff452a7a --- /dev/null +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt @@ -0,0 +1,29 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain.exceptions + +import org.onap.dcae.collectors.veshv.domain.WireFrame + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +class InvalidWireFrameMarkerException(actualMarker: Short) : WireFrameDecodingException( + "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker)) diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt new file mode 100644 index 00000000..7e4b3cef --- /dev/null +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain.exceptions + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +open class MissingWireFrameBytesException(msg: String) : WireFrameDecodingException(msg) diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt new file mode 100644 index 00000000..11013834 --- /dev/null +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain.exceptions + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +open class WireFrameDecodingException(msg: String) : Exception(msg) diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt index 5a923c4e..00113267 100644 --- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt +++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt @@ -1,29 +1,113 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ package org.onap.dcae.collectors.veshv.domain -import io.netty.buffer.ByteBufAllocator import io.netty.buffer.Unpooled +import io.netty.buffer.UnpooledByteBufAllocator import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException +import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException +import java.nio.charset.Charset /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ object WireFrameTest : Spek({ - describe("Wire Frame codec") { - describe("encode-decode methods' compatibility") { - val payloadContent = "test" - val payload = Unpooled.wrappedBuffer(payloadContent.toByteArray(Charsets.US_ASCII)) - val frame = WireFrame(payload = payload, - majorVersion = 1, + val payloadAsString = "coffeebabe" + + fun createSampleFrame() = + WireFrame(Unpooled.wrappedBuffer(payloadAsString.toByteArray(Charset.defaultCharset()))) + + fun encodeSampleFrame() = + createSampleFrame().let { + Unpooled.buffer() + .writeBytes(it.encode(UnpooledByteBufAllocator.DEFAULT)) + + } + + describe("Wire Frame invariants") { + + given("input with unsupported major version") { + val input = WireFrame( + payload = Unpooled.EMPTY_BUFFER, + majorVersion = 100, minorVersion = 2, - mark = 0xFF, - payloadSize = payload.readableBytes()) + payloadSize = 0) + + it("should fail validation") { + assertThat(input.isValid()).isFalse() + } + } + + given("input with too small payload size") { + val input = WireFrame( + payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)), + majorVersion = 1, + minorVersion = 0, + payloadSize = 1) + + it("should fail validation") { + assertThat(input.isValid()).isFalse() + } + } + + given("input with too big payload size") { + val input = WireFrame( + payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)), + majorVersion = 1, + minorVersion = 0, + payloadSize = 8) + + it("should fail validation") { + assertThat(input.isValid()).isFalse() + } + } + + given("valid input") { + val payload = byteArrayOf(6, 9, 8, 6) + val input = WireFrame( + payload = Unpooled.wrappedBuffer(payload), + majorVersion = 1, + minorVersion = 0, + payloadSize = payload.size) + + it("should pass validation") { + assertThat(input.isValid()).isTrue() + } + } - val encoded = frame.encode(ByteBufAllocator.DEFAULT) - val decoded = WireFrame.decode(encoded) + + } + + describe("Wire Frame codec") { + + describe("encode-decode methods' compatibility") { + val frame = createSampleFrame() + val encoded = encodeSampleFrame() + val decoded = WireFrame.decodeFirst(encoded) it("should decode major version") { assertThat(decoded.majorVersion).isEqualTo(frame.majorVersion) @@ -33,17 +117,13 @@ object WireFrameTest : Spek({ assertThat(decoded.minorVersion).isEqualTo(frame.minorVersion) } - it("should decode mark") { - assertThat(decoded.mark).isEqualTo(frame.mark) - } - it("should decode payload size") { assertThat(decoded.payloadSize).isEqualTo(frame.payloadSize) } it("should decode payload") { - assertThat(decoded.payload.toString(Charsets.US_ASCII)) - .isEqualTo(payloadContent) + assertThat(decoded.payload.toString(Charset.defaultCharset())) + .isEqualTo(payloadAsString) } it("should retain decoded payload") { @@ -51,5 +131,55 @@ object WireFrameTest : Spek({ assertThat(decoded.payload.refCnt()).isEqualTo(1) } } + + describe("TCP framing") { + // see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11 + + it("should decode message leaving rest unread") { + val buff = Unpooled.buffer() + .writeBytes(encodeSampleFrame()) + .writeByte(0xAA) + val decoded = WireFrame.decodeFirst(buff) + + assertThat(decoded.isValid()).describedAs("should be valid").isTrue() + assertThat(buff.readableBytes()).isEqualTo(1) + } + + it("should throw exception when not even header fits") { + val buff = Unpooled.buffer() + .writeByte(0xFF) + + assertThatExceptionOfType(MissingWireFrameBytesException::class.java) + .isThrownBy { WireFrame.decodeFirst(buff) } + } + + it("should throw exception when first byte is not 0xFF but length looks ok") { + val buff = Unpooled.buffer() + .writeByte(0xAA) + .writeBytes("some garbage".toByteArray()) + + assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java) + .isThrownBy { WireFrame.decodeFirst(buff) } + } + + it("should throw exception when first byte is not 0xFF and length is to short") { + val buff = Unpooled.buffer() + .writeByte(0xAA) + + assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java) + .isThrownBy { WireFrame.decodeFirst(buff) } + } + + it("should throw exception when payload doesn't fit") { + val buff = Unpooled.buffer() + .writeBytes(encodeSampleFrame()) + buff.writerIndex(buff.writerIndex() - 2) + + assertThatExceptionOfType(MissingWireFrameBytesException::class.java) + .isThrownBy { WireFrame.decodeFirst(buff) } + } + + } } + })
\ No newline at end of file |