aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-domain
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-domain')
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt (renamed from hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt)41
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt50
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt15
-rw-r--r--hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt90
4 files changed, 145 insertions, 51 deletions
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt
index db6e1070..4811a2b4 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt
@@ -19,6 +19,9 @@
*/
package org.onap.dcae.collectors.veshv.domain
+
+sealed class WireFrameMessage
+
/**
* Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
*
@@ -49,10 +52,11 @@ package org.onap.dcae.collectors.veshv.domain
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class WireFrame(val payload: ByteData,
- val version: Short,
- val payloadTypeRaw: Short,
- val payloadSize: Int) {
+data class PayloadWireFrameMessage(val payload: ByteData,
+ val version: Short,
+ val payloadTypeRaw: Short,
+ val payloadSize: Int
+) : WireFrameMessage() {
constructor(payload: ByteArray) : this(
ByteData(payload),
@@ -66,11 +70,38 @@ data class WireFrame(val payload: ByteData,
&& payload.size() == payloadSize
companion object {
+ const val MARKER_BYTE: Short = 0xFF
+
const val SUPPORTED_VERSION: Short = 1
const val HEADER_SIZE =
3 * java.lang.Byte.BYTES +
1 * java.lang.Integer.BYTES
- const val MARKER_BYTE: Short = 0xFF
+
+ const val MAX_PAYLOAD_SIZE = 1024 * 1024
}
}
+
+
+/**
+ * This message type should be used by client to indicate that he has finished sending data to collector.
+ *
+ * Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
+ *
+ * ```
+ * ┌─────┬───────────────────────┐
+ * │octet│ 0 │
+ * ├─────┼──┬──┬──┬──┬──┬──┬──┬──┤
+ * │ bit │ 0│ │ │ │ │ │ │ │
+ * ├─────┼──┴──┴──┴──┴──┴──┴──┴──┤
+ * │field│ 0xAA │
+ * └─────┴───────────────────────┘
+ * ```
+ *
+ * @since July 2018
+ */
+
+object EndOfTransmissionMessage : WireFrameMessage() {
+ const val MARKER_BYTE: Short = 0xAA
+}
+
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
index 39841d6a..ab82dc04 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
@@ -24,6 +24,7 @@ import arrow.core.Left
import arrow.core.Right
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -31,10 +32,10 @@ import io.netty.buffer.ByteBufAllocator
*/
class WireFrameEncoder(val allocator: ByteBufAllocator) {
- fun encode(frame: WireFrame): ByteBuf {
- val bb = allocator.buffer(WireFrame.HEADER_SIZE + frame.payload.size())
+ fun encode(frame: PayloadWireFrameMessage): ByteBuf {
+ val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size())
- bb.writeByte(WireFrame.MARKER_BYTE.toInt())
+ bb.writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt())
bb.writeByte(frame.version.toInt())
bb.writeByte(frame.payloadTypeRaw.toInt())
bb.writeInt(frame.payloadSize)
@@ -50,32 +51,54 @@ class WireFrameEncoder(val allocator: ByteBufAllocator) {
*/
class WireFrameDecoder {
- fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> =
+ fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> =
when {
isEmpty(byteBuf) -> Left(EmptyWireFrame)
+ isSingleByte(byteBuf) -> lookForEOTFrame(byteBuf)
headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes)
- else -> parseFrame(byteBuf)
+ else -> parseWireFrame(byteBuf)
}
- private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrame.HEADER_SIZE
-
private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1
- private fun parseFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> {
+ private fun isSingleByte(byteBuf: ByteBuf) = byteBuf.readableBytes() == 1
+
+ private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < PayloadWireFrameMessage.HEADER_SIZE
+
+ private fun lookForEOTFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, EndOfTransmissionMessage> {
byteBuf.markReaderIndex()
+ val byte = byteBuf.readUnsignedByte()
- val mark = byteBuf.readUnsignedByte()
- if (mark != WireFrame.MARKER_BYTE) {
+ return if (byte == EndOfTransmissionMessage.MARKER_BYTE) {
+ Right(EndOfTransmissionMessage)
+ } else {
byteBuf.resetReaderIndex()
- return Left(InvalidWireFrameMarker(mark))
+ Left(MissingWireFrameHeaderBytes)
+ }
+ }
+
+ private fun parseWireFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
+ byteBuf.markReaderIndex()
+
+ val mark = byteBuf.readUnsignedByte()
+ return when (mark) {
+ EndOfTransmissionMessage.MARKER_BYTE -> Right(EndOfTransmissionMessage)
+ PayloadWireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf)
+ else -> {
+ byteBuf.resetReaderIndex()
+ Left(InvalidWireFrameMarker(mark))
+ }
}
+ }
+ private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, PayloadWireFrameMessage> {
val version = byteBuf.readUnsignedByte()
val payloadTypeRaw = byteBuf.readUnsignedByte()
val payloadSize = byteBuf.readInt()
if (payloadSize > MAX_PAYLOAD_SIZE) {
+ byteBuf.resetReaderIndex()
return Left(PayloadSizeExceeded)
}
@@ -86,10 +109,7 @@ class WireFrameDecoder {
val payload = ByteData.readFrom(byteBuf, payloadSize)
- return Right(WireFrame(payload, version, payloadTypeRaw, payloadSize))
- }
+ return Right(PayloadWireFrameMessage(payload, version, payloadTypeRaw, payloadSize))
- companion object {
- const val MAX_PAYLOAD_SIZE = 1024 * 1024
}
}
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
index 626bf329..d82bb25f 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
@@ -19,7 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.domain
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -33,9 +33,10 @@ sealed class WireFrameDecodingError(val message: String)
sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg)
-class InvalidWireFrameMarker(actualMarker: Short)
- : InvalidWireFrame(
- "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker))
+class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame(
+ "Invalid start of frame. Expected 0x%02X, but was 0x%02X"
+ .format(PayloadWireFrameMessage.MARKER_BYTE, actualMarker)
+)
object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)")
@@ -46,3 +47,9 @@ sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg)
object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size")
object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size")
object EmptyWireFrame : MissingWireFrameBytes("empty wire frame")
+
+
+// Other
+
+class UnknownWireFrameTypeException(frame: WireFrameMessage)
+ : Throwable("Unexpected wire frame message type: ${frame.javaClass}")
diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
index 4d6f0716..a5242e0f 100644
--- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
+++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
@@ -20,19 +20,18 @@
package org.onap.dcae.collectors.veshv.domain
import arrow.core.Either
-import arrow.core.identity
import io.netty.buffer.Unpooled
import io.netty.buffer.UnpooledByteBufAllocator
import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.fail
import org.assertj.core.api.ObjectAssert
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import java.nio.charset.Charset
import kotlin.test.assertTrue
+import kotlin.test.fail
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -44,7 +43,7 @@ object WireFrameCodecsTest : Spek({
val decoder = WireFrameDecoder()
fun createSampleFrame() =
- WireFrame(payloadAsString.toByteArray(Charset.defaultCharset()))
+ PayloadWireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset()))
fun encodeSampleFrame() =
createSampleFrame().let {
@@ -54,7 +53,7 @@ object WireFrameCodecsTest : Spek({
describe("Wire Frame invariants") {
given("input with unsupported version") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData.EMPTY,
version = 100,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -66,7 +65,7 @@ object WireFrameCodecsTest : Spek({
}
given("input with unsupported payload type") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData.EMPTY,
version = 1,
payloadTypeRaw = 0x69,
@@ -78,7 +77,7 @@ object WireFrameCodecsTest : Spek({
}
given("input with too small payload size") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(byteArrayOf(1, 2, 3)),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -90,7 +89,7 @@ object WireFrameCodecsTest : Spek({
}
given("input with too big payload size") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(byteArrayOf(1, 2, 3)),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -103,7 +102,7 @@ object WireFrameCodecsTest : Spek({
given("valid input") {
val payload = byteArrayOf(6, 9, 8, 6)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -122,7 +121,7 @@ object WireFrameCodecsTest : Spek({
describe("encode-decode methods' compatibility") {
val frame = createSampleFrame()
val encoded = encodeSampleFrame()
- val decoded = decoder.decodeFirst(encoded).getOrFail()
+ val decoded = decoder.decodeFirst(encoded).getPayloadMessageOrFail()
it("should decode version") {
assertThat(decoded.version).isEqualTo(frame.version)
@@ -142,40 +141,52 @@ object WireFrameCodecsTest : Spek({
}
}
+
describe("TCP framing") {
// see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11
- it("should decode message leaving rest unread") {
+ it("should return error when buffer is empty") {
+ val buff = Unpooled.buffer()
+
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(EmptyWireFrame::class.java) }
+ }
+
+ it("should return end-of-transmission message when given end-of-transmission marker byte") {
val buff = Unpooled.buffer()
- .writeBytes(encodeSampleFrame())
.writeByte(0xAA)
- val decoded = decoder.decodeFirst(buff).getOrFail()
- assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
- assertThat(buff.readableBytes()).isEqualTo(1)
+ assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
}
- it("should return error when not even header fits") {
+ it("should return error when given any single byte other than end-of-transmission marker byte") {
val buff = Unpooled.buffer()
- .writeByte(0xFF)
+ .writeByte(0xEE)
decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
+ }
+ it("should return error when payload message header does not fit") {
+ val buff = Unpooled.buffer()
+ .writeByte(0xFF)
+ .writeBytes("MOMOM".toByteArray())
+
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
}
- it("should return error when first byte is not 0xFF but length looks ok") {
+ it("should return error when length looks ok but first byte is not 0xFF or 0xAA") {
val buff = Unpooled.buffer()
- .writeByte(0xAA)
+ .writeByte(0x69)
.writeBytes("some garbage".toByteArray())
decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) }
}
- it("should return error when first byte is not 0xFF and length is to short") {
+ it("should return end-of-transmission message when length looks ok and first byte is 0xAA") {
val buff = Unpooled.buffer()
.writeByte(0xAA)
+ .writeBytes("some garbage".toByteArray())
- decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
+ assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
}
it("should return error when payload doesn't fit") {
@@ -186,14 +197,23 @@ object WireFrameCodecsTest : Spek({
decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFramePayloadBytes::class.java) }
}
+ it("should decode payload message leaving rest unread") {
+ val buff = Unpooled.buffer()
+ .writeBytes(encodeSampleFrame())
+ .writeByte(0xAA)
+ val decoded = decoder.decodeFirst(buff).getPayloadMessageOrFail()
+
+ assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
+ assertThat(buff.readableBytes()).isEqualTo(1)
+ }
}
- describe("payload size limit"){
+ describe("payload size limit") {
it("should decode successfully when payload size is equal 1 MiB") {
val payload = ByteArray(MAX_PAYLOAD_SIZE)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -206,7 +226,7 @@ object WireFrameCodecsTest : Spek({
it("should return error when payload exceeds 1 MiB") {
val payload = ByteArray(MAX_PAYLOAD_SIZE + 1)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -220,7 +240,7 @@ object WireFrameCodecsTest : Spek({
it("should validate only first message") {
val payload = ByteArray(MAX_PAYLOAD_SIZE)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -237,5 +257,21 @@ private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>)
fold({ assertj(assertThat(it)) }, { fail("Error expected") })
}
-private fun Either<WireFrameDecodingError, WireFrame>.getOrFail(): WireFrame =
- fold({ fail(it.message) }, ::identity) as WireFrame
+private fun Either<WireFrameDecodingError, WireFrameMessage>.getPayloadMessageOrFail(): PayloadWireFrameMessage =
+ fold({ fail(it.message) }, { it.castToPayloadMsgOrFail() })
+
+private fun WireFrameMessage.castToPayloadMsgOrFail(): PayloadWireFrameMessage =
+ this as? PayloadWireFrameMessage
+ ?: fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${this.javaClass}")
+
+
+private fun assertIsEndOfTransmissionMessage(decoded: Either<WireFrameDecodingError, WireFrameMessage>) {
+ decoded.getEndOfTransmissionMessageOrFail()
+}
+
+private fun Either<WireFrameDecodingError, WireFrameMessage>.getEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
+ fold({ fail(it.message) }, { it.castToEndOfTransmissionMessageOrFail() })
+
+private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
+ this as? EndOfTransmissionMessage
+ ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}")