aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-domain/src/main/kotlin
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2018-09-21 10:14:03 +0200
committerFilip Krzywka <filip.krzywka@nokia.com>2018-09-24 08:22:29 +0200
commite880cde732b6d5b6a2fd22b2245ba7f6ff4517f3 (patch)
tree256bd77a86bf86fce96979643a9fe5fcc0318aba /hv-collector-domain/src/main/kotlin
parent7333951cfec6b79a92b12e70cf679bff2f01825a (diff)
Remove end-of-transmission message from protocol
Also update protobuf files definitions to latest version. Change-Id: I0cd5d2d8deec5c787e2d3948d3d905fa672f9fea Issue-ID: DCAEGEN2-775 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'hv-collector-domain/src/main/kotlin')
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt54
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt10
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt46
3 files changed, 33 insertions, 77 deletions
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
index c61ab266..4f867f13 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
@@ -24,8 +24,8 @@ import arrow.core.Left
import arrow.core.Right
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -33,19 +33,19 @@ import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.R
*/
class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
- fun encode(frame: PayloadWireFrameMessage): ByteBuf {
- val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size())
-
- bb.writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt())
- bb.writeByte(frame.versionMajor.toInt())
- bb.writeByte(frame.versionMinor.toInt())
- bb.writeZero(RESERVED_BYTE_COUNT)
- bb.writeByte(frame.payloadTypeRaw.toInt())
- bb.writeInt(frame.payloadSize)
- frame.payload.writeTo(bb)
-
- return bb
- }
+ fun encode(frame: WireFrameMessage): ByteBuf = allocator
+ .buffer(WireFrameMessage.HEADER_SIZE + frame.payload.size())
+ .run {
+ writeByte(WireFrameMessage.MARKER_BYTE.toInt())
+ writeByte(frame.versionMajor.toInt())
+ writeByte(frame.versionMinor.toInt())
+ writeZero(RESERVED_BYTE_COUNT)
+ writeByte(frame.payloadType.toInt())
+ writeInt(frame.payloadSize)
+ }
+ .also {
+ frame.payload.writeTo(it)
+ }
}
/**
@@ -57,36 +57,20 @@ class WireFrameDecoder {
fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> =
when {
isEmpty(byteBuf) -> Left(EmptyWireFrame)
- isSingleByte(byteBuf) -> lookForEOTFrame(byteBuf)
headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes)
else -> parseWireFrame(byteBuf)
}
private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1
- private fun isSingleByte(byteBuf: ByteBuf) = byteBuf.readableBytes() == 1
-
- private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < PayloadWireFrameMessage.HEADER_SIZE
-
- private fun lookForEOTFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, EndOfTransmissionMessage> {
- byteBuf.markReaderIndex()
- val byte = byteBuf.readUnsignedByte()
-
- return if (byte == EndOfTransmissionMessage.MARKER_BYTE) {
- Right(EndOfTransmissionMessage)
- } else {
- byteBuf.resetReaderIndex()
- Left(MissingWireFrameHeaderBytes)
- }
- }
+ private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrameMessage.HEADER_SIZE
private fun parseWireFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
byteBuf.markReaderIndex()
val mark = byteBuf.readUnsignedByte()
return when (mark) {
- EndOfTransmissionMessage.MARKER_BYTE -> Right(EndOfTransmissionMessage)
- PayloadWireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf)
+ WireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf)
else -> {
byteBuf.resetReaderIndex()
Left(InvalidWireFrameMarker(mark))
@@ -94,7 +78,7 @@ class WireFrameDecoder {
}
}
- private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, PayloadWireFrameMessage> {
+ private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
val versionMajor = byteBuf.readUnsignedByte()
val versionMinor = byteBuf.readUnsignedByte()
byteBuf.skipBytes(RESERVED_BYTE_COUNT) // reserved
@@ -113,7 +97,7 @@ class WireFrameDecoder {
val payload = ByteData.readFrom(byteBuf, payloadSize)
- return Right(PayloadWireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize))
+ return Right(WireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize))
}
}
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
index d82bb25f..dfadc5b8 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
@@ -19,7 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.domain
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,7 +35,7 @@ sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg)
class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame(
"Invalid start of frame. Expected 0x%02X, but was 0x%02X"
- .format(PayloadWireFrameMessage.MARKER_BYTE, actualMarker)
+ .format(WireFrameMessage.MARKER_BYTE, actualMarker)
)
object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)")
@@ -47,9 +47,3 @@ sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg)
object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size")
object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size")
object EmptyWireFrame : MissingWireFrameBytes("empty wire frame")
-
-
-// Other
-
-class UnknownWireFrameTypeException(frame: WireFrameMessage)
- : Throwable("Unexpected wire frame message type: ${frame.javaClass}")
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
index 642179e1..06ca9383 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
@@ -20,10 +20,8 @@
package org.onap.dcae.collectors.veshv.domain
-sealed class WireFrameMessage
-
/**
- * Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
+ * Wire frame structure is presented bellow using ASN.1 notation. All fields are in network byte order (big-endian).
*
* ```
* -- Precedes every HV-VES message
@@ -31,21 +29,22 @@ sealed class WireFrameMessage
* magic INTEGER (0..255), – always 0xFF, identifies extended header usage
* versionMajor INTEGER (0..255), – major interface v, forward incompatible with previous major v
* versionMinor INTEGER (0..255), – minor interface v, forward compatible with previous minor v
- * reserved BIT STRING (SIZE (16)), – reserved for future use
- * messageType INTEGER (0..255), – message payload type: 0x00=undefined, 0x01=protobuf
- * messageLength INTEGER (0..4294967295) – message payload length
+ * reserved OCTET STRING (SIZE (3)), – reserved for future use
+ * payloadId INTEGER (0..255), – message payload type: 0x00=undefined, 0x01=protobuf
+ * payloadLength INTEGER (0..4294967295) – message payload length
+ * payload OCTET STRING – length as per payloadLength
* }
* ```
*
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class PayloadWireFrameMessage(val payload: ByteData,
- val versionMajor: Short,
- val versionMinor: Short,
- val payloadTypeRaw: Short,
- val payloadSize: Int
-) : WireFrameMessage() {
+data class WireFrameMessage(val payload: ByteData,
+ val versionMajor: Short,
+ val versionMinor: Short,
+ val payloadType: Short,
+ val payloadSize: Int
+) {
constructor(payload: ByteArray) : this(
ByteData(payload),
SUPPORTED_VERSION_MAJOR,
@@ -55,7 +54,7 @@ data class PayloadWireFrameMessage(val payload: ByteData,
fun isValid(): Boolean =
versionMajor == SUPPORTED_VERSION_MAJOR
- && PayloadContentType.isValidHexValue(payloadTypeRaw)
+ && PayloadContentType.isValidHexValue(payloadType)
&& payload.size() == payloadSize
companion object {
@@ -74,24 +73,3 @@ data class PayloadWireFrameMessage(val payload: ByteData,
const val MAX_PAYLOAD_SIZE = 1024 * 1024
}
}
-
-
-/**
- * This message type should be used by client to indicate that he has finished sending data to collector.
- *
- * Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
- *
- * ```
- * -- Sent by the HV-VES data provider, prior to closing the connection to the HV-VES destination
- * Eot ::= SEQUENCE {
- * magic INTEGER (0..255), – always 0xAA
- * }
- * ```
- *
- * @since July 2018
- */
-
-object EndOfTransmissionMessage : WireFrameMessage() {
- const val MARKER_BYTE: Short = 0xAA
-}
-