diff options
author | 2018-09-21 10:14:03 +0200 | |
---|---|---|
committer | 2018-09-24 08:22:29 +0200 | |
commit | e880cde732b6d5b6a2fd22b2245ba7f6ff4517f3 (patch) | |
tree | 256bd77a86bf86fce96979643a9fe5fcc0318aba /hv-collector-domain/src/main/kotlin | |
parent | 7333951cfec6b79a92b12e70cf679bff2f01825a (diff) |
Remove end-of-transmission message from protocol
Also update protobuf files definitions to latest version.
Change-Id: I0cd5d2d8deec5c787e2d3948d3d905fa672f9fea
Issue-ID: DCAEGEN2-775
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'hv-collector-domain/src/main/kotlin')
3 files changed, 33 insertions, 77 deletions
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt index c61ab266..4f867f13 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt @@ -24,8 +24,8 @@ import arrow.core.Left import arrow.core.Right import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -33,19 +33,19 @@ import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.R */ class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocator.DEFAULT) { - fun encode(frame: PayloadWireFrameMessage): ByteBuf { - val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size()) - - bb.writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt()) - bb.writeByte(frame.versionMajor.toInt()) - bb.writeByte(frame.versionMinor.toInt()) - bb.writeZero(RESERVED_BYTE_COUNT) - bb.writeByte(frame.payloadTypeRaw.toInt()) - bb.writeInt(frame.payloadSize) - frame.payload.writeTo(bb) - - return bb - } + fun encode(frame: WireFrameMessage): ByteBuf = allocator + .buffer(WireFrameMessage.HEADER_SIZE + frame.payload.size()) + .run { + writeByte(WireFrameMessage.MARKER_BYTE.toInt()) + writeByte(frame.versionMajor.toInt()) + writeByte(frame.versionMinor.toInt()) + writeZero(RESERVED_BYTE_COUNT) + writeByte(frame.payloadType.toInt()) + writeInt(frame.payloadSize) + } + .also { + frame.payload.writeTo(it) + } } /** @@ -57,36 +57,20 @@ class WireFrameDecoder { fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> = when { isEmpty(byteBuf) -> Left(EmptyWireFrame) - isSingleByte(byteBuf) -> lookForEOTFrame(byteBuf) headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes) else -> parseWireFrame(byteBuf) } private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1 - private fun isSingleByte(byteBuf: ByteBuf) = byteBuf.readableBytes() == 1 - - private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < PayloadWireFrameMessage.HEADER_SIZE - - private fun lookForEOTFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, EndOfTransmissionMessage> { - byteBuf.markReaderIndex() - val byte = byteBuf.readUnsignedByte() - - return if (byte == EndOfTransmissionMessage.MARKER_BYTE) { - Right(EndOfTransmissionMessage) - } else { - byteBuf.resetReaderIndex() - Left(MissingWireFrameHeaderBytes) - } - } + private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrameMessage.HEADER_SIZE private fun parseWireFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> { byteBuf.markReaderIndex() val mark = byteBuf.readUnsignedByte() return when (mark) { - EndOfTransmissionMessage.MARKER_BYTE -> Right(EndOfTransmissionMessage) - PayloadWireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf) + WireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf) else -> { byteBuf.resetReaderIndex() Left(InvalidWireFrameMarker(mark)) @@ -94,7 +78,7 @@ class WireFrameDecoder { } } - private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, PayloadWireFrameMessage> { + private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> { val versionMajor = byteBuf.readUnsignedByte() val versionMinor = byteBuf.readUnsignedByte() byteBuf.skipBytes(RESERVED_BYTE_COUNT) // reserved @@ -113,7 +97,7 @@ class WireFrameDecoder { val payload = ByteData.readFrom(byteBuf, payloadSize) - return Right(PayloadWireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize)) + return Right(WireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize)) } } diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt index d82bb25f..dfadc5b8 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.domain -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -35,7 +35,7 @@ sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg) class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame( "Invalid start of frame. Expected 0x%02X, but was 0x%02X" - .format(PayloadWireFrameMessage.MARKER_BYTE, actualMarker) + .format(WireFrameMessage.MARKER_BYTE, actualMarker) ) object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)") @@ -47,9 +47,3 @@ sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg) object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size") object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size") object EmptyWireFrame : MissingWireFrameBytes("empty wire frame") - - -// Other - -class UnknownWireFrameTypeException(frame: WireFrameMessage) - : Throwable("Unexpected wire frame message type: ${frame.javaClass}") diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt index 642179e1..06ca9383 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt @@ -20,10 +20,8 @@ package org.onap.dcae.collectors.veshv.domain -sealed class WireFrameMessage - /** - * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). + * Wire frame structure is presented bellow using ASN.1 notation. All fields are in network byte order (big-endian). * * ``` * -- Precedes every HV-VES message @@ -31,21 +29,22 @@ sealed class WireFrameMessage * magic INTEGER (0..255), – always 0xFF, identifies extended header usage * versionMajor INTEGER (0..255), – major interface v, forward incompatible with previous major v * versionMinor INTEGER (0..255), – minor interface v, forward compatible with previous minor v - * reserved BIT STRING (SIZE (16)), – reserved for future use - * messageType INTEGER (0..255), – message payload type: 0x00=undefined, 0x01=protobuf - * messageLength INTEGER (0..4294967295) – message payload length + * reserved OCTET STRING (SIZE (3)), – reserved for future use + * payloadId INTEGER (0..255), – message payload type: 0x00=undefined, 0x01=protobuf + * payloadLength INTEGER (0..4294967295) – message payload length + * payload OCTET STRING – length as per payloadLength * } * ``` * * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -data class PayloadWireFrameMessage(val payload: ByteData, - val versionMajor: Short, - val versionMinor: Short, - val payloadTypeRaw: Short, - val payloadSize: Int -) : WireFrameMessage() { +data class WireFrameMessage(val payload: ByteData, + val versionMajor: Short, + val versionMinor: Short, + val payloadType: Short, + val payloadSize: Int +) { constructor(payload: ByteArray) : this( ByteData(payload), SUPPORTED_VERSION_MAJOR, @@ -55,7 +54,7 @@ data class PayloadWireFrameMessage(val payload: ByteData, fun isValid(): Boolean = versionMajor == SUPPORTED_VERSION_MAJOR - && PayloadContentType.isValidHexValue(payloadTypeRaw) + && PayloadContentType.isValidHexValue(payloadType) && payload.size() == payloadSize companion object { @@ -74,24 +73,3 @@ data class PayloadWireFrameMessage(val payload: ByteData, const val MAX_PAYLOAD_SIZE = 1024 * 1024 } } - - -/** - * This message type should be used by client to indicate that he has finished sending data to collector. - * - * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). - * - * ``` - * -- Sent by the HV-VES data provider, prior to closing the connection to the HV-VES destination - * Eot ::= SEQUENCE { - * magic INTEGER (0..255), – always 0xAA - * } - * ``` - * - * @since July 2018 - */ - -object EndOfTransmissionMessage : WireFrameMessage() { - const val MARKER_BYTE: Short = 0xAA -} - |