summaryrefslogtreecommitdiffstats
path: root/hv-collector-domain
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-07 11:52:16 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-01 13:06:43 +0200
commit07bbbf71cd65b29f446a1b475add87f20365db83 (patch)
treee64fcf12c21e46358043744476d68765634d7f6f /hv-collector-domain
parent767d0464a19e0949d2919e6df15c9653dec50503 (diff)
Fix TCP stream framing issue
Because of the nature of TCP protocol we receive consecutive IO buffer snapshots - not separate messages. That means that we need to join incomming buffers and then split it into separate WireFrames. Closes ONAP-312 Change-Id: I84ba0ec58a41ff9026f2fca24d2b15f3adcf0a19 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-domain')
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt67
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt26
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt29
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt26
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt26
-rw-r--r--hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt164
6 files changed, 308 insertions, 30 deletions
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
index 5bd63d8b..8c8b4718 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
@@ -21,6 +21,9 @@ package org.onap.dcae.collectors.veshv.domain
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.exceptions.EmptyWireFrameException
+import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
+import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
/**
* Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
@@ -53,20 +56,20 @@ import io.netty.buffer.ByteBufAllocator
* @since May 2018
*/
data class WireFrame(val payload: ByteBuf,
- val mark: Short,
val majorVersion: Short,
val minorVersion: Short,
val payloadSize: Int) {
+ constructor(payload: ByteBuf) : this(payload, 1, 0, payload.readableBytes())
+
fun isValid(): Boolean =
- mark == FF_BYTE
- && majorVersion == SUPPORTED_MAJOR_VERSION
+ majorVersion == SUPPORTED_MAJOR_VERSION
&& payload.readableBytes() == payloadSize
fun encode(allocator: ByteBufAllocator): ByteBuf {
val bb = allocator.buffer(HEADER_SIZE + payload.readableBytes())
- bb.writeByte(mark.toInt())
+ bb.writeByte(MARKER_BYTE.toInt())
bb.writeByte(majorVersion.toInt())
bb.writeByte(minorVersion.toInt())
bb.writeInt(payloadSize)
@@ -76,20 +79,58 @@ data class WireFrame(val payload: ByteBuf,
}
companion object {
- fun decode(byteBuf: ByteBuf): WireFrame {
- val mark = byteBuf.readUnsignedByte()
+ fun decodeFirst(byteBuf: ByteBuf): WireFrame {
+ verifyNotEmpty(byteBuf)
+ byteBuf.markReaderIndex()
+
+ verifyMarker(byteBuf)
+ verifyMinimumSize(byteBuf)
+
val majorVersion = byteBuf.readUnsignedByte()
val minorVersion = byteBuf.readUnsignedByte()
- val payloadSize = byteBuf.readInt()
- val payload = byteBuf.retainedSlice()
+ val payloadSize = verifyPayloadSize(byteBuf)
+
+ val payload = byteBuf.retainedSlice(byteBuf.readerIndex(), payloadSize)
+ byteBuf.readerIndex(byteBuf.readerIndex() + payloadSize)
+
+ return WireFrame(payload, majorVersion, minorVersion, payloadSize)
+ }
+
+ private fun verifyPayloadSize(byteBuf: ByteBuf): Int =
+ byteBuf.readInt().let { payloadSize ->
+ if (byteBuf.readableBytes() < payloadSize) {
+ byteBuf.resetReaderIndex()
+ throw MissingWireFrameBytesException("readable bytes < payload size")
+ } else {
+ payloadSize
+ }
+ }
+
+ private fun verifyMinimumSize(byteBuf: ByteBuf) {
+ if (byteBuf.readableBytes() < HEADER_SIZE) {
+ byteBuf.resetReaderIndex()
+ throw MissingWireFrameBytesException("readable bytes < header size")
+ }
+ }
+
+ private fun verifyMarker(byteBuf: ByteBuf) {
+ val mark = byteBuf.readUnsignedByte()
+ if (mark != MARKER_BYTE) {
+ byteBuf.resetReaderIndex()
+ throw InvalidWireFrameMarkerException(mark)
+ }
+ }
- return WireFrame(payload, mark, majorVersion, minorVersion, payloadSize)
+ private fun verifyNotEmpty(byteBuf: ByteBuf) {
+ if (byteBuf.readableBytes() < 1) {
+ throw EmptyWireFrameException()
+ }
}
- private const val HEADER_SIZE =
+ const val HEADER_SIZE =
3 * java.lang.Byte.BYTES +
- 1 * java.lang.Integer.BYTES
- private const val FF_BYTE: Short = 0xFF
- private const val SUPPORTED_MAJOR_VERSION: Short = 1
+ 1 * java.lang.Integer.BYTES
+ const val MARKER_BYTE: Short = 0xFF
+ const val SUPPORTED_MAJOR_VERSION: Short = 1
}
}
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt
new file mode 100644
index 00000000..6e1ce935
--- /dev/null
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain.exceptions
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class EmptyWireFrameException : MissingWireFrameBytesException("wire frame was empty (readable bytes == 0)")
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt
new file mode 100644
index 00000000..ff452a7a
--- /dev/null
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt
@@ -0,0 +1,29 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain.exceptions
+
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class InvalidWireFrameMarkerException(actualMarker: Short) : WireFrameDecodingException(
+ "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker))
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt
new file mode 100644
index 00000000..7e4b3cef
--- /dev/null
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain.exceptions
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+open class MissingWireFrameBytesException(msg: String) : WireFrameDecodingException(msg)
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt
new file mode 100644
index 00000000..11013834
--- /dev/null
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain.exceptions
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+open class WireFrameDecodingException(msg: String) : Exception(msg)
diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt
index 5a923c4e..00113267 100644
--- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt
+++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt
@@ -1,29 +1,113 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
package org.onap.dcae.collectors.veshv.domain
-import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.Unpooled
+import io.netty.buffer.UnpooledByteBufAllocator
import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
+import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
+import java.nio.charset.Charset
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
object WireFrameTest : Spek({
- describe("Wire Frame codec") {
- describe("encode-decode methods' compatibility") {
- val payloadContent = "test"
- val payload = Unpooled.wrappedBuffer(payloadContent.toByteArray(Charsets.US_ASCII))
- val frame = WireFrame(payload = payload,
- majorVersion = 1,
+ val payloadAsString = "coffeebabe"
+
+ fun createSampleFrame() =
+ WireFrame(Unpooled.wrappedBuffer(payloadAsString.toByteArray(Charset.defaultCharset())))
+
+ fun encodeSampleFrame() =
+ createSampleFrame().let {
+ Unpooled.buffer()
+ .writeBytes(it.encode(UnpooledByteBufAllocator.DEFAULT))
+
+ }
+
+ describe("Wire Frame invariants") {
+
+ given("input with unsupported major version") {
+ val input = WireFrame(
+ payload = Unpooled.EMPTY_BUFFER,
+ majorVersion = 100,
minorVersion = 2,
- mark = 0xFF,
- payloadSize = payload.readableBytes())
+ payloadSize = 0)
+
+ it("should fail validation") {
+ assertThat(input.isValid()).isFalse()
+ }
+ }
+
+ given("input with too small payload size") {
+ val input = WireFrame(
+ payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)),
+ majorVersion = 1,
+ minorVersion = 0,
+ payloadSize = 1)
+
+ it("should fail validation") {
+ assertThat(input.isValid()).isFalse()
+ }
+ }
+
+ given("input with too big payload size") {
+ val input = WireFrame(
+ payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)),
+ majorVersion = 1,
+ minorVersion = 0,
+ payloadSize = 8)
+
+ it("should fail validation") {
+ assertThat(input.isValid()).isFalse()
+ }
+ }
+
+ given("valid input") {
+ val payload = byteArrayOf(6, 9, 8, 6)
+ val input = WireFrame(
+ payload = Unpooled.wrappedBuffer(payload),
+ majorVersion = 1,
+ minorVersion = 0,
+ payloadSize = payload.size)
+
+ it("should pass validation") {
+ assertThat(input.isValid()).isTrue()
+ }
+ }
- val encoded = frame.encode(ByteBufAllocator.DEFAULT)
- val decoded = WireFrame.decode(encoded)
+
+ }
+
+ describe("Wire Frame codec") {
+
+ describe("encode-decode methods' compatibility") {
+ val frame = createSampleFrame()
+ val encoded = encodeSampleFrame()
+ val decoded = WireFrame.decodeFirst(encoded)
it("should decode major version") {
assertThat(decoded.majorVersion).isEqualTo(frame.majorVersion)
@@ -33,17 +117,13 @@ object WireFrameTest : Spek({
assertThat(decoded.minorVersion).isEqualTo(frame.minorVersion)
}
- it("should decode mark") {
- assertThat(decoded.mark).isEqualTo(frame.mark)
- }
-
it("should decode payload size") {
assertThat(decoded.payloadSize).isEqualTo(frame.payloadSize)
}
it("should decode payload") {
- assertThat(decoded.payload.toString(Charsets.US_ASCII))
- .isEqualTo(payloadContent)
+ assertThat(decoded.payload.toString(Charset.defaultCharset()))
+ .isEqualTo(payloadAsString)
}
it("should retain decoded payload") {
@@ -51,5 +131,55 @@ object WireFrameTest : Spek({
assertThat(decoded.payload.refCnt()).isEqualTo(1)
}
}
+
+ describe("TCP framing") {
+ // see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11
+
+ it("should decode message leaving rest unread") {
+ val buff = Unpooled.buffer()
+ .writeBytes(encodeSampleFrame())
+ .writeByte(0xAA)
+ val decoded = WireFrame.decodeFirst(buff)
+
+ assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
+ assertThat(buff.readableBytes()).isEqualTo(1)
+ }
+
+ it("should throw exception when not even header fits") {
+ val buff = Unpooled.buffer()
+ .writeByte(0xFF)
+
+ assertThatExceptionOfType(MissingWireFrameBytesException::class.java)
+ .isThrownBy { WireFrame.decodeFirst(buff) }
+ }
+
+ it("should throw exception when first byte is not 0xFF but length looks ok") {
+ val buff = Unpooled.buffer()
+ .writeByte(0xAA)
+ .writeBytes("some garbage".toByteArray())
+
+ assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java)
+ .isThrownBy { WireFrame.decodeFirst(buff) }
+ }
+
+ it("should throw exception when first byte is not 0xFF and length is to short") {
+ val buff = Unpooled.buffer()
+ .writeByte(0xAA)
+
+ assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java)
+ .isThrownBy { WireFrame.decodeFirst(buff) }
+ }
+
+ it("should throw exception when payload doesn't fit") {
+ val buff = Unpooled.buffer()
+ .writeBytes(encodeSampleFrame())
+ buff.writerIndex(buff.writerIndex() - 2)
+
+ assertThatExceptionOfType(MissingWireFrameBytesException::class.java)
+ .isThrownBy { WireFrame.decodeFirst(buff) }
+ }
+
+ }
}
+
}) \ No newline at end of file