diff options
author | fkrzywka <filip.krzywka@nokia.com> | 2018-07-03 10:14:38 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-02 13:41:04 +0200 |
commit | f8a9a10a75bf139203fe9ea48a01708c7bda0781 (patch) | |
tree | 634321d472c69d67f817cd2e689dc25c10af7c1a /hv-collector-domain/src/main | |
parent | 1383775f3df00bd08a7ac14fe1278858bdef6487 (diff) |
Enhance wire protocol
Handle new wire frame message type which should allow clients to
indicate that all data has been sent to collector
Change xNF Simulator to send end-of-transmission message
after sending all messages
Close ves-hv-collector stream after encountering EOT message
Remove duplicated file in project
Closes ONAP-391
Change-Id: Idb6afc41d4bb0220a29df10c2aecfd76acd3ad16
Signed-off-by: fkrzywka <filip.krzywka@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-domain/src/main')
-rw-r--r-- | hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt (renamed from hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt) | 41 | ||||
-rw-r--r-- | hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt | 50 | ||||
-rw-r--r-- | hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt | 15 |
3 files changed, 82 insertions, 24 deletions
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt index db6e1070..4811a2b4 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt @@ -19,6 +19,9 @@ */ package org.onap.dcae.collectors.veshv.domain + +sealed class WireFrameMessage + /** * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). * @@ -49,10 +52,11 @@ package org.onap.dcae.collectors.veshv.domain * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -data class WireFrame(val payload: ByteData, - val version: Short, - val payloadTypeRaw: Short, - val payloadSize: Int) { +data class PayloadWireFrameMessage(val payload: ByteData, + val version: Short, + val payloadTypeRaw: Short, + val payloadSize: Int +) : WireFrameMessage() { constructor(payload: ByteArray) : this( ByteData(payload), @@ -66,11 +70,38 @@ data class WireFrame(val payload: ByteData, && payload.size() == payloadSize companion object { + const val MARKER_BYTE: Short = 0xFF + const val SUPPORTED_VERSION: Short = 1 const val HEADER_SIZE = 3 * java.lang.Byte.BYTES + 1 * java.lang.Integer.BYTES - const val MARKER_BYTE: Short = 0xFF + + const val MAX_PAYLOAD_SIZE = 1024 * 1024 } } + + +/** + * This message type should be used by client to indicate that he has finished sending data to collector. + * + * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). + * + * ``` + * ┌─────┬───────────────────────┐ + * │octet│ 0 │ + * ├─────┼──┬──┬──┬──┬──┬──┬──┬──┤ + * │ bit │ 0│ │ │ │ │ │ │ │ + * ├─────┼──┴──┴──┴──┴──┴──┴──┴──┤ + * │field│ 0xAA │ + * └─────┴───────────────────────┘ + * ``` + * + * @since July 2018 + */ + +object EndOfTransmissionMessage : WireFrameMessage() { + const val MARKER_BYTE: Short = 0xAA +} + diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt index 39841d6a..ab82dc04 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt @@ -24,6 +24,7 @@ import arrow.core.Left import arrow.core.Right import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -31,10 +32,10 @@ import io.netty.buffer.ByteBufAllocator */ class WireFrameEncoder(val allocator: ByteBufAllocator) { - fun encode(frame: WireFrame): ByteBuf { - val bb = allocator.buffer(WireFrame.HEADER_SIZE + frame.payload.size()) + fun encode(frame: PayloadWireFrameMessage): ByteBuf { + val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size()) - bb.writeByte(WireFrame.MARKER_BYTE.toInt()) + bb.writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt()) bb.writeByte(frame.version.toInt()) bb.writeByte(frame.payloadTypeRaw.toInt()) bb.writeInt(frame.payloadSize) @@ -50,32 +51,54 @@ class WireFrameEncoder(val allocator: ByteBufAllocator) { */ class WireFrameDecoder { - fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> = + fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> = when { isEmpty(byteBuf) -> Left(EmptyWireFrame) + isSingleByte(byteBuf) -> lookForEOTFrame(byteBuf) headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes) - else -> parseFrame(byteBuf) + else -> parseWireFrame(byteBuf) } - private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrame.HEADER_SIZE - private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1 - private fun parseFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> { + private fun isSingleByte(byteBuf: ByteBuf) = byteBuf.readableBytes() == 1 + + private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < PayloadWireFrameMessage.HEADER_SIZE + + private fun lookForEOTFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, EndOfTransmissionMessage> { byteBuf.markReaderIndex() + val byte = byteBuf.readUnsignedByte() - val mark = byteBuf.readUnsignedByte() - if (mark != WireFrame.MARKER_BYTE) { + return if (byte == EndOfTransmissionMessage.MARKER_BYTE) { + Right(EndOfTransmissionMessage) + } else { byteBuf.resetReaderIndex() - return Left(InvalidWireFrameMarker(mark)) + Left(MissingWireFrameHeaderBytes) + } + } + + private fun parseWireFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> { + byteBuf.markReaderIndex() + + val mark = byteBuf.readUnsignedByte() + return when (mark) { + EndOfTransmissionMessage.MARKER_BYTE -> Right(EndOfTransmissionMessage) + PayloadWireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf) + else -> { + byteBuf.resetReaderIndex() + Left(InvalidWireFrameMarker(mark)) + } } + } + private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, PayloadWireFrameMessage> { val version = byteBuf.readUnsignedByte() val payloadTypeRaw = byteBuf.readUnsignedByte() val payloadSize = byteBuf.readInt() if (payloadSize > MAX_PAYLOAD_SIZE) { + byteBuf.resetReaderIndex() return Left(PayloadSizeExceeded) } @@ -86,10 +109,7 @@ class WireFrameDecoder { val payload = ByteData.readFrom(byteBuf, payloadSize) - return Right(WireFrame(payload, version, payloadTypeRaw, payloadSize)) - } + return Right(PayloadWireFrameMessage(payload, version, payloadTypeRaw, payloadSize)) - companion object { - const val MAX_PAYLOAD_SIZE = 1024 * 1024 } } diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt index 626bf329..d82bb25f 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.domain -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -33,9 +33,10 @@ sealed class WireFrameDecodingError(val message: String) sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg) -class InvalidWireFrameMarker(actualMarker: Short) - : InvalidWireFrame( - "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker)) +class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame( + "Invalid start of frame. Expected 0x%02X, but was 0x%02X" + .format(PayloadWireFrameMessage.MARKER_BYTE, actualMarker) +) object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)") @@ -46,3 +47,9 @@ sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg) object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size") object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size") object EmptyWireFrame : MissingWireFrameBytes("empty wire frame") + + +// Other + +class UnknownWireFrameTypeException(frame: WireFrameMessage) + : Throwable("Unexpected wire frame message type: ${frame.javaClass}") |