aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-domain/src
diff options
context:
space:
mode:
authorfkrzywka <filip.krzywka@nokia.com>2018-07-03 10:14:38 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 13:41:04 +0200
commitf8a9a10a75bf139203fe9ea48a01708c7bda0781 (patch)
tree634321d472c69d67f817cd2e689dc25c10af7c1a /hv-collector-domain/src
parent1383775f3df00bd08a7ac14fe1278858bdef6487 (diff)
Enhance wire protocol
Handle new wire frame message type which should allow clients to indicate that all data has been sent to collector Change xNF Simulator to send end-of-transmission message after sending all messages Close ves-hv-collector stream after encountering EOT message Remove duplicated file in project Closes ONAP-391 Change-Id: Idb6afc41d4bb0220a29df10c2aecfd76acd3ad16 Signed-off-by: fkrzywka <filip.krzywka@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-domain/src')
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt (renamed from hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt)41
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt50
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt15
-rw-r--r--hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt90
4 files changed, 145 insertions, 51 deletions
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt
index db6e1070..4811a2b4 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt
@@ -19,6 +19,9 @@
*/
package org.onap.dcae.collectors.veshv.domain
+
+sealed class WireFrameMessage
+
/**
* Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
*
@@ -49,10 +52,11 @@ package org.onap.dcae.collectors.veshv.domain
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class WireFrame(val payload: ByteData,
- val version: Short,
- val payloadTypeRaw: Short,
- val payloadSize: Int) {
+data class PayloadWireFrameMessage(val payload: ByteData,
+ val version: Short,
+ val payloadTypeRaw: Short,
+ val payloadSize: Int
+) : WireFrameMessage() {
constructor(payload: ByteArray) : this(
ByteData(payload),
@@ -66,11 +70,38 @@ data class WireFrame(val payload: ByteData,
&& payload.size() == payloadSize
companion object {
+ const val MARKER_BYTE: Short = 0xFF
+
const val SUPPORTED_VERSION: Short = 1
const val HEADER_SIZE =
3 * java.lang.Byte.BYTES +
1 * java.lang.Integer.BYTES
- const val MARKER_BYTE: Short = 0xFF
+
+ const val MAX_PAYLOAD_SIZE = 1024 * 1024
}
}
+
+
+/**
+ * This message type should be used by client to indicate that he has finished sending data to collector.
+ *
+ * Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
+ *
+ * ```
+ * ┌─────┬───────────────────────┐
+ * │octet│ 0 │
+ * ├─────┼──┬──┬──┬──┬──┬──┬──┬──┤
+ * │ bit │ 0│ │ │ │ │ │ │ │
+ * ├─────┼──┴──┴──┴──┴──┴──┴──┴──┤
+ * │field│ 0xAA │
+ * └─────┴───────────────────────┘
+ * ```
+ *
+ * @since July 2018
+ */
+
+object EndOfTransmissionMessage : WireFrameMessage() {
+ const val MARKER_BYTE: Short = 0xAA
+}
+
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
index 39841d6a..ab82dc04 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
@@ -24,6 +24,7 @@ import arrow.core.Left
import arrow.core.Right
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -31,10 +32,10 @@ import io.netty.buffer.ByteBufAllocator
*/
class WireFrameEncoder(val allocator: ByteBufAllocator) {
- fun encode(frame: WireFrame): ByteBuf {
- val bb = allocator.buffer(WireFrame.HEADER_SIZE + frame.payload.size())
+ fun encode(frame: PayloadWireFrameMessage): ByteBuf {
+ val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size())
- bb.writeByte(WireFrame.MARKER_BYTE.toInt())
+ bb.writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt())
bb.writeByte(frame.version.toInt())
bb.writeByte(frame.payloadTypeRaw.toInt())
bb.writeInt(frame.payloadSize)
@@ -50,32 +51,54 @@ class WireFrameEncoder(val allocator: ByteBufAllocator) {
*/
class WireFrameDecoder {
- fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> =
+ fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> =
when {
isEmpty(byteBuf) -> Left(EmptyWireFrame)
+ isSingleByte(byteBuf) -> lookForEOTFrame(byteBuf)
headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes)
- else -> parseFrame(byteBuf)
+ else -> parseWireFrame(byteBuf)
}
- private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrame.HEADER_SIZE
-
private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1
- private fun parseFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> {
+ private fun isSingleByte(byteBuf: ByteBuf) = byteBuf.readableBytes() == 1
+
+ private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < PayloadWireFrameMessage.HEADER_SIZE
+
+ private fun lookForEOTFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, EndOfTransmissionMessage> {
byteBuf.markReaderIndex()
+ val byte = byteBuf.readUnsignedByte()
- val mark = byteBuf.readUnsignedByte()
- if (mark != WireFrame.MARKER_BYTE) {
+ return if (byte == EndOfTransmissionMessage.MARKER_BYTE) {
+ Right(EndOfTransmissionMessage)
+ } else {
byteBuf.resetReaderIndex()
- return Left(InvalidWireFrameMarker(mark))
+ Left(MissingWireFrameHeaderBytes)
+ }
+ }
+
+ private fun parseWireFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
+ byteBuf.markReaderIndex()
+
+ val mark = byteBuf.readUnsignedByte()
+ return when (mark) {
+ EndOfTransmissionMessage.MARKER_BYTE -> Right(EndOfTransmissionMessage)
+ PayloadWireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf)
+ else -> {
+ byteBuf.resetReaderIndex()
+ Left(InvalidWireFrameMarker(mark))
+ }
}
+ }
+ private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, PayloadWireFrameMessage> {
val version = byteBuf.readUnsignedByte()
val payloadTypeRaw = byteBuf.readUnsignedByte()
val payloadSize = byteBuf.readInt()
if (payloadSize > MAX_PAYLOAD_SIZE) {
+ byteBuf.resetReaderIndex()
return Left(PayloadSizeExceeded)
}
@@ -86,10 +109,7 @@ class WireFrameDecoder {
val payload = ByteData.readFrom(byteBuf, payloadSize)
- return Right(WireFrame(payload, version, payloadTypeRaw, payloadSize))
- }
+ return Right(PayloadWireFrameMessage(payload, version, payloadTypeRaw, payloadSize))
- companion object {
- const val MAX_PAYLOAD_SIZE = 1024 * 1024
}
}
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
index 626bf329..d82bb25f 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
@@ -19,7 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.domain
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -33,9 +33,10 @@ sealed class WireFrameDecodingError(val message: String)
sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg)
-class InvalidWireFrameMarker(actualMarker: Short)
- : InvalidWireFrame(
- "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker))
+class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame(
+ "Invalid start of frame. Expected 0x%02X, but was 0x%02X"
+ .format(PayloadWireFrameMessage.MARKER_BYTE, actualMarker)
+)
object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)")
@@ -46,3 +47,9 @@ sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg)
object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size")
object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size")
object EmptyWireFrame : MissingWireFrameBytes("empty wire frame")
+
+
+// Other
+
+class UnknownWireFrameTypeException(frame: WireFrameMessage)
+ : Throwable("Unexpected wire frame message type: ${frame.javaClass}")
diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
index 4d6f0716..a5242e0f 100644
--- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
+++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
@@ -20,19 +20,18 @@
package org.onap.dcae.collectors.veshv.domain
import arrow.core.Either
-import arrow.core.identity
import io.netty.buffer.Unpooled
import io.netty.buffer.UnpooledByteBufAllocator
import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.fail
import org.assertj.core.api.ObjectAssert
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import java.nio.charset.Charset
import kotlin.test.assertTrue
+import kotlin.test.fail
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -44,7 +43,7 @@ object WireFrameCodecsTest : Spek({
val decoder = WireFrameDecoder()
fun createSampleFrame() =
- WireFrame(payloadAsString.toByteArray(Charset.defaultCharset()))
+ PayloadWireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset()))
fun encodeSampleFrame() =
createSampleFrame().let {
@@ -54,7 +53,7 @@ object WireFrameCodecsTest : Spek({
describe("Wire Frame invariants") {
given("input with unsupported version") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData.EMPTY,
version = 100,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -66,7 +65,7 @@ object WireFrameCodecsTest : Spek({
}
given("input with unsupported payload type") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData.EMPTY,
version = 1,
payloadTypeRaw = 0x69,
@@ -78,7 +77,7 @@ object WireFrameCodecsTest : Spek({
}
given("input with too small payload size") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(byteArrayOf(1, 2, 3)),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -90,7 +89,7 @@ object WireFrameCodecsTest : Spek({
}
given("input with too big payload size") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(byteArrayOf(1, 2, 3)),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -103,7 +102,7 @@ object WireFrameCodecsTest : Spek({
given("valid input") {
val payload = byteArrayOf(6, 9, 8, 6)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -122,7 +121,7 @@ object WireFrameCodecsTest : Spek({
describe("encode-decode methods' compatibility") {
val frame = createSampleFrame()
val encoded = encodeSampleFrame()
- val decoded = decoder.decodeFirst(encoded).getOrFail()
+ val decoded = decoder.decodeFirst(encoded).getPayloadMessageOrFail()
it("should decode version") {
assertThat(decoded.version).isEqualTo(frame.version)
@@ -142,40 +141,52 @@ object WireFrameCodecsTest : Spek({
}
}
+
describe("TCP framing") {
// see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11
- it("should decode message leaving rest unread") {
+ it("should return error when buffer is empty") {
+ val buff = Unpooled.buffer()
+
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(EmptyWireFrame::class.java) }
+ }
+
+ it("should return end-of-transmission message when given end-of-transmission marker byte") {
val buff = Unpooled.buffer()
- .writeBytes(encodeSampleFrame())
.writeByte(0xAA)
- val decoded = decoder.decodeFirst(buff).getOrFail()
- assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
- assertThat(buff.readableBytes()).isEqualTo(1)
+ assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
}
- it("should return error when not even header fits") {
+ it("should return error when given any single byte other than end-of-transmission marker byte") {
val buff = Unpooled.buffer()
- .writeByte(0xFF)
+ .writeByte(0xEE)
decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
+ }
+ it("should return error when payload message header does not fit") {
+ val buff = Unpooled.buffer()
+ .writeByte(0xFF)
+ .writeBytes("MOMOM".toByteArray())
+
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
}
- it("should return error when first byte is not 0xFF but length looks ok") {
+ it("should return error when length looks ok but first byte is not 0xFF or 0xAA") {
val buff = Unpooled.buffer()
- .writeByte(0xAA)
+ .writeByte(0x69)
.writeBytes("some garbage".toByteArray())
decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) }
}
- it("should return error when first byte is not 0xFF and length is to short") {
+ it("should return end-of-transmission message when length looks ok and first byte is 0xAA") {
val buff = Unpooled.buffer()
.writeByte(0xAA)
+ .writeBytes("some garbage".toByteArray())
- decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
+ assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
}
it("should return error when payload doesn't fit") {
@@ -186,14 +197,23 @@ object WireFrameCodecsTest : Spek({
decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFramePayloadBytes::class.java) }
}
+ it("should decode payload message leaving rest unread") {
+ val buff = Unpooled.buffer()
+ .writeBytes(encodeSampleFrame())
+ .writeByte(0xAA)
+ val decoded = decoder.decodeFirst(buff).getPayloadMessageOrFail()
+
+ assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
+ assertThat(buff.readableBytes()).isEqualTo(1)
+ }
}
- describe("payload size limit"){
+ describe("payload size limit") {
it("should decode successfully when payload size is equal 1 MiB") {
val payload = ByteArray(MAX_PAYLOAD_SIZE)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -206,7 +226,7 @@ object WireFrameCodecsTest : Spek({
it("should return error when payload exceeds 1 MiB") {
val payload = ByteArray(MAX_PAYLOAD_SIZE + 1)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -220,7 +240,7 @@ object WireFrameCodecsTest : Spek({
it("should validate only first message") {
val payload = ByteArray(MAX_PAYLOAD_SIZE)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -237,5 +257,21 @@ private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>)
fold({ assertj(assertThat(it)) }, { fail("Error expected") })
}
-private fun Either<WireFrameDecodingError, WireFrame>.getOrFail(): WireFrame =
- fold({ fail(it.message) }, ::identity) as WireFrame
+private fun Either<WireFrameDecodingError, WireFrameMessage>.getPayloadMessageOrFail(): PayloadWireFrameMessage =
+ fold({ fail(it.message) }, { it.castToPayloadMsgOrFail() })
+
+private fun WireFrameMessage.castToPayloadMsgOrFail(): PayloadWireFrameMessage =
+ this as? PayloadWireFrameMessage
+ ?: fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${this.javaClass}")
+
+
+private fun assertIsEndOfTransmissionMessage(decoded: Either<WireFrameDecodingError, WireFrameMessage>) {
+ decoded.getEndOfTransmissionMessageOrFail()
+}
+
+private fun Either<WireFrameDecodingError, WireFrameMessage>.getEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
+ fold({ fail(it.message) }, { it.castToEndOfTransmissionMessageOrFail() })
+
+private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
+ this as? EndOfTransmissionMessage
+ ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}")