From e880cde732b6d5b6a2fd22b2245ba7f6ff4517f3 Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Fri, 21 Sep 2018 10:14:03 +0200 Subject: Remove end-of-transmission message from protocol Also update protobuf files definitions to latest version. Change-Id: I0cd5d2d8deec5c787e2d3948d3d905fa672f9fea Issue-ID: DCAEGEN2-775 Signed-off-by: Filip Krzywka --- .../org/onap/dcae/collectors/veshv/domain/codec.kt | 54 +++++-------- .../onap/dcae/collectors/veshv/domain/errors.kt | 10 +-- .../dcae/collectors/veshv/domain/wire_frame.kt | 46 +++-------- .../src/main/proto/event/VesEvent.proto | 63 +++++++-------- .../src/main/proto/measurements/HVMeasFields.proto | 14 +--- .../proto/measurements/MeasDataCollection.proto | 66 +++++---------- .../collectors/veshv/domain/WireFrameCodecsTest.kt | 93 ++++++++-------------- 7 files changed, 124 insertions(+), 222 deletions(-) (limited to 'hv-collector-domain') diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt index c61ab266..4f867f13 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt @@ -24,8 +24,8 @@ import arrow.core.Left import arrow.core.Right import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT /** * @author Piotr Jaszczyk @@ -33,19 +33,19 @@ import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.R */ class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocator.DEFAULT) { - fun encode(frame: PayloadWireFrameMessage): ByteBuf { - val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size()) - - bb.writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt()) - bb.writeByte(frame.versionMajor.toInt()) - bb.writeByte(frame.versionMinor.toInt()) - bb.writeZero(RESERVED_BYTE_COUNT) - bb.writeByte(frame.payloadTypeRaw.toInt()) - bb.writeInt(frame.payloadSize) - frame.payload.writeTo(bb) - - return bb - } + fun encode(frame: WireFrameMessage): ByteBuf = allocator + .buffer(WireFrameMessage.HEADER_SIZE + frame.payload.size()) + .run { + writeByte(WireFrameMessage.MARKER_BYTE.toInt()) + writeByte(frame.versionMajor.toInt()) + writeByte(frame.versionMinor.toInt()) + writeZero(RESERVED_BYTE_COUNT) + writeByte(frame.payloadType.toInt()) + writeInt(frame.payloadSize) + } + .also { + frame.payload.writeTo(it) + } } /** @@ -57,36 +57,20 @@ class WireFrameDecoder { fun decodeFirst(byteBuf: ByteBuf): Either = when { isEmpty(byteBuf) -> Left(EmptyWireFrame) - isSingleByte(byteBuf) -> lookForEOTFrame(byteBuf) headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes) else -> parseWireFrame(byteBuf) } private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1 - private fun isSingleByte(byteBuf: ByteBuf) = byteBuf.readableBytes() == 1 - - private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < PayloadWireFrameMessage.HEADER_SIZE - - private fun lookForEOTFrame(byteBuf: ByteBuf): Either { - byteBuf.markReaderIndex() - val byte = byteBuf.readUnsignedByte() - - return if (byte == EndOfTransmissionMessage.MARKER_BYTE) { - Right(EndOfTransmissionMessage) - } else { - byteBuf.resetReaderIndex() - Left(MissingWireFrameHeaderBytes) - } - } + private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrameMessage.HEADER_SIZE private fun parseWireFrame(byteBuf: ByteBuf): Either { byteBuf.markReaderIndex() val mark = byteBuf.readUnsignedByte() return when (mark) { - EndOfTransmissionMessage.MARKER_BYTE -> Right(EndOfTransmissionMessage) - PayloadWireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf) + WireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf) else -> { byteBuf.resetReaderIndex() Left(InvalidWireFrameMarker(mark)) @@ -94,7 +78,7 @@ class WireFrameDecoder { } } - private fun parsePayloadFrame(byteBuf: ByteBuf): Either { + private fun parsePayloadFrame(byteBuf: ByteBuf): Either { val versionMajor = byteBuf.readUnsignedByte() val versionMinor = byteBuf.readUnsignedByte() byteBuf.skipBytes(RESERVED_BYTE_COUNT) // reserved @@ -113,7 +97,7 @@ class WireFrameDecoder { val payload = ByteData.readFrom(byteBuf, payloadSize) - return Right(PayloadWireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize)) + return Right(WireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize)) } } diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt index d82bb25f..dfadc5b8 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.domain -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE /** * @author Piotr Jaszczyk @@ -35,7 +35,7 @@ sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg) class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame( "Invalid start of frame. Expected 0x%02X, but was 0x%02X" - .format(PayloadWireFrameMessage.MARKER_BYTE, actualMarker) + .format(WireFrameMessage.MARKER_BYTE, actualMarker) ) object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)") @@ -47,9 +47,3 @@ sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg) object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size") object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size") object EmptyWireFrame : MissingWireFrameBytes("empty wire frame") - - -// Other - -class UnknownWireFrameTypeException(frame: WireFrameMessage) - : Throwable("Unexpected wire frame message type: ${frame.javaClass}") diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt index 642179e1..06ca9383 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt @@ -20,10 +20,8 @@ package org.onap.dcae.collectors.veshv.domain -sealed class WireFrameMessage - /** - * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). + * Wire frame structure is presented bellow using ASN.1 notation. All fields are in network byte order (big-endian). * * ``` * -- Precedes every HV-VES message @@ -31,21 +29,22 @@ sealed class WireFrameMessage * magic INTEGER (0..255), – always 0xFF, identifies extended header usage * versionMajor INTEGER (0..255), – major interface v, forward incompatible with previous major v * versionMinor INTEGER (0..255), – minor interface v, forward compatible with previous minor v - * reserved BIT STRING (SIZE (16)), – reserved for future use - * messageType INTEGER (0..255), – message payload type: 0x00=undefined, 0x01=protobuf - * messageLength INTEGER (0..4294967295) – message payload length + * reserved OCTET STRING (SIZE (3)), – reserved for future use + * payloadId INTEGER (0..255), – message payload type: 0x00=undefined, 0x01=protobuf + * payloadLength INTEGER (0..4294967295) – message payload length + * payload OCTET STRING – length as per payloadLength * } * ``` * * @author Piotr Jaszczyk * @since May 2018 */ -data class PayloadWireFrameMessage(val payload: ByteData, - val versionMajor: Short, - val versionMinor: Short, - val payloadTypeRaw: Short, - val payloadSize: Int -) : WireFrameMessage() { +data class WireFrameMessage(val payload: ByteData, + val versionMajor: Short, + val versionMinor: Short, + val payloadType: Short, + val payloadSize: Int +) { constructor(payload: ByteArray) : this( ByteData(payload), SUPPORTED_VERSION_MAJOR, @@ -55,7 +54,7 @@ data class PayloadWireFrameMessage(val payload: ByteData, fun isValid(): Boolean = versionMajor == SUPPORTED_VERSION_MAJOR - && PayloadContentType.isValidHexValue(payloadTypeRaw) + && PayloadContentType.isValidHexValue(payloadType) && payload.size() == payloadSize companion object { @@ -74,24 +73,3 @@ data class PayloadWireFrameMessage(val payload: ByteData, const val MAX_PAYLOAD_SIZE = 1024 * 1024 } } - - -/** - * This message type should be used by client to indicate that he has finished sending data to collector. - * - * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). - * - * ``` - * -- Sent by the HV-VES data provider, prior to closing the connection to the HV-VES destination - * Eot ::= SEQUENCE { - * magic INTEGER (0..255), – always 0xAA - * } - * ``` - * - * @since July 2018 - */ - -object EndOfTransmissionMessage : WireFrameMessage() { - const val MARKER_BYTE: Short = 0xAA -} - diff --git a/hv-collector-domain/src/main/proto/event/VesEvent.proto b/hv-collector-domain/src/main/proto/event/VesEvent.proto index 54a6d149..0f9e5e1f 100644 --- a/hv-collector-domain/src/main/proto/event/VesEvent.proto +++ b/hv-collector-domain/src/main/proto/event/VesEvent.proto @@ -21,54 +21,55 @@ syntax = "proto3"; package org.onap.ves; message VesEvent { - CommonEventHeader commonEventHeader = 1; // required + CommonEventHeader commonEventHeader=1; // required - oneof eventFields // required, payload - { - // each new high-volume domain can add an entry for its own GPB message - // the field can be opaque (bytes) to allow decoding the payload in a separate step - bytes hvMeasFields = 2; // for domain==HVMEAS, GPB message: HVMeasFields - } + bytes eventFields=2; // required, payload + // this field contains a domain-specific GPB message + // the field being opaque (bytes), the decoding of the payload occurs in a separate step + // the name of the GPB message for domain XYZ is XYZFields + // e.g. for domain==HVMEAS, the GPB message is HVMEASFields } // VES CommonEventHeader adapted to GPB (Google Protocol Buffers) -// Aligned with VES 7.0.1 schema, and extending to hvMeas Domain +// Aligned with VES 7.0.1 schema, and extending to hvMeas Domain. -message CommonEventHeader { - string version = 1; // required, "version of the gpb common event header" - string domain = 2; // required, "the eventing domain associated with the event", allowed values: - // FAULT, HEARTBEAT, MEASUREMENT, MOBILE_FLOW, OTHER, PNFREGISTRATION, SIP_SIGNALING, - // STATE_CHANGE, SYSLOG, THRESHOLD_CROSSING_ALERT, VOICE_QUALITY, HVMEAS +message CommonEventHeader +{ + string version = 1; // required, "version of the gpb common event header" + string domain = 2; // required, "the eventing domain associated with the event", allowed values: + // FAULT, HEARTBEAT, MEASUREMENT, MOBILE_FLOW, OTHER, PNFREGISTRATION, SIP_SIGNALING, + // STATE_CHANGE, SYSLOG, THRESHOLD_CROSSING_ALERT, VOICE_QUALITY, HVMEAS - uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed" + uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed" - enum Priority { + enum Priority + { PRIORITY_NOT_PROVIDED = 0; HIGH = 1; MEDIUM = 2; NORMAL = 3; LOW = 4; } - Priority priority = 4; // required, "processing priority" + Priority priority = 4; // required, "processing priority" - string eventId = 5; // required, "event key that is unique to the event source" - string eventName = 6; // required, "unique event name" - string eventType = 7; // "for example - guest05, platform" + string eventId = 5; // required, "event key that is unique to the event source" + string eventName = 6; // required, "unique event name" + string eventType = 7; // "for example - guest05, platform" - uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds" - uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds" + uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds" + uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds" - string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards" - string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards" - string nfVendorName = 12; // " Vendor Name providing the nf " + string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards" + string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards" + string nfVendorName = 12; // " Vendor Name providing the nf " - bytes reportingEntityId = 13; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process" - string reportingEntityName = 14; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName should match A&AI entry" - bytes sourceId = 15; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process" - string sourceName = 16; // required, "name of the entity experiencing the event issued use A&AI entry" - string timeZoneOffset = 17; // "Offset to GMT to indicate local time zone for the device" - string vesEventListenerVersion = 18; // required, "Version of the VesEvent Listener" + bytes reportingEntityId = 13; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process" + string reportingEntityName = 14; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName should match A&AI entry" + bytes sourceId = 15; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process" + string sourceName = 16; // required, "name of the entity experiencing the event issued use A&AI entry" + string timeZoneOffset = 17; // "Offset to GMT to indicate local time zone for the device" + string vesEventListenerVersion = 18; // required, "Version of the VesEvent Listener" - reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources" + reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources" reserved 100; } diff --git a/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto b/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto index 9a8582d5..94b40106 100644 --- a/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto +++ b/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto @@ -24,20 +24,14 @@ import "MeasDataCollection.proto"; // for 3GPP PM format message HVMeasFields { string hvMeasFieldsVersion = 1; - measDataCollection.MeasDataCollection measDataCollection = 2; - // From 3GPP TS 28.550 + MeasDataCollection measDataCollection = 2; + // Based on 3GPP TS 28.550 // Informative: mapping between similar header fields (format may be different) - // 3GPP MeasStreamHeader ONAP/VES CommonEventHeader + // 3GPP MeasHeader ONAP/VES CommonEventHeader // senderName sourceName // senderType nfNamingCode + nfcNamingCode // vendorName nfVendorName // collectionBeginTime startEpochMicrosec // timestamp lastEpochMicrosec - repeated HashMap eventAddlFlds = 3; // optional per-event data + map eventAddlFlds = 3; // optional per-event data (name/value HashMap) } - -message HashMap -{ - string name = 1; - string value = 2; -} \ No newline at end of file diff --git a/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto b/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto index 472dcc43..31f4dfb1 100644 --- a/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto +++ b/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto @@ -18,87 +18,65 @@ * ============LICENSE_END========================================================= */ syntax = "proto3"; -package measDataCollection; +package org.onap.ves; -// Definition for RTPM, structure aligned with 3GPP PM format optimized for RTPM delivery pre-standard TS 28.550 V1.2.2 (2018-08). +// Definition for RTPM, structure aligned with 3GPP PM format optimized for RTPM delivery pre-standard TS 28.550 V2.0.0 (2018-09). // Some field details are taken from 3GPP TS 32.436 V15.0.0 (2018-06) ASN.1 file. -// Note (2018-08): work is in progress for 3GPP TS 28.550 to specify PM streaming format. Changes will be made, if needed, to align with final version. +// Note (2018-09): work is in progress for 3GPP TS 28.550. Changes will be made, if needed, to align with final version. // Differences/additions to 3GPP TS 28.550 are marked with "%%". -message MeasDataCollection // top-level message +message MeasDataCollection // top-level message { - MeasHeader measHeader = 1; - repeated MeasData measData = 2; // %%: use a single instance for RTPM - MeasFooter measFooter = 3; -} - -message MeasHeader -{ - string streamFormatVersion = 1; + // %% Combined messageFileHeader, measData (single instance), messageFileFooter (not needed: timestamp = collectionBeginTime + granularityPeriod). + string formatVersion = 1; string senderName = 2; string senderType = 3; string vendorName = 4; string collectionBeginTime = 5; // in ASN.1 GeneralizedTime format (subset of ISO 8601 basic format) + uint32 granularityPeriod = 6; // duration in seconds, %% moved from MeasInfo (single reporting period per event) + string measuredEntityUserName = 7; // network function user definable name ("userLabel") defined for the measured entity in 3GPP TS 28.622 + string measuredEntityDn = 8; // DN as per 3GPP TS 32.300 + string measuredEntitySoftwareVersion = 9; + repeated string measObjInstIdList = 10; // %%: optional, monitored object LDNs as per 3GPP TS 32.300 and 3GPP TS 32.432 + repeated MeasInfo measInfo = 11; } -message MeasData -{ - string measuredEntityId = 1; // DN as per 3GPP TS 32.300 - string measuredEntityUserName = 2; // network function User Name - string measuredEntitySoftwareVersion = 3; - uint32 granularityPeriod = 4; // in seconds, %% moved from MeasInfo (single reporting period per event) - repeated string measObjInstIdList = 5; // %%: optional, monitored object LDNs as per 3GPP TS 32.300 and 3GPP TS 32.432 - repeated MeasInfo measInfo = 6; -} - - message MeasInfo { oneof MeasInfoId { // measurement group identifier - uint32 iMeasInfoId = 1; // identifier as integer (%%: more compact) - string measInfoId = 2; // identifier as string (more generic) + uint32 iMeasInfoId = 1; // identifier as integer (%%: more compact) + string measInfoId = 2; // identifier as string (more generic) } oneof MeasTypes { // measurement identifiers associated with the measurement results - IMeasTypes iMeasTypes = 3; // identifiers as integers (%%: more compact) - SMeasTypes measTypes = 4; // identifiers as strings (more generic) + IMeasTypes iMeasTypes = 3; // identifiers as integers (%%: more compact) + SMeasTypes measTypes = 4; // identifiers as strings (more generic) } // Needed only because GPB does not support repeated fields directly inside 'oneof' message IMeasTypes { repeated uint32 iMeasType = 1; } message SMeasTypes { repeated string measType = 1; } - string jobIdList = 5; - repeated MeasValue measValues = 6; // performance measurements grouped by measurement groups + string jobId = 5; + repeated MeasValue measValues = 6; // performance measurements grouped by measurement object } message MeasValue { oneof MeasObjInstId { // monitored object LDN as per 3GPP TS 32.300 and 3GPP TS 32.432 - string measObjInstId = 1; // LDN itself - uint32 measObjInstIdListIdx = 2; // %%: index into measObjInstIdList + string measObjInstId = 1; // LDN itself + uint32 measObjInstIdListIdx = 2; // %%: index into measObjInstIdList } repeated MeasResult measResults = 3; bool suspectFlag = 4; - repeated nameValue measObjAddlFlds = 5; // %%: optional per-object data + map measObjAddlFlds = 5; // %%: optional per-object data (name/value HashMap) } message MeasResult { - uint32 p = 1; // Optional index in the MeasTypes array + uint32 p = 1; // Index in the MeasTypes array, needed only if measResults has fewer elements than MeasTypes oneof xValue { sint64 iValue = 2; double rValue = 3; bool isNull = 4; } } - -message MeasFooter -{ - string timestamp = 1; // in ASN.1 GeneralizedTime format, a better name would be "collectionEndTime" -} - -message nameValue // %%: vendor-defined name-value pair -{ - string name = 1; - string value = 2; -} \ No newline at end of file diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt index b992d530..988789d2 100644 --- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt +++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt @@ -28,7 +28,7 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE import java.nio.charset.Charset import kotlin.test.assertTrue import kotlin.test.fail @@ -42,8 +42,7 @@ object WireFrameCodecsTest : Spek({ val encoder = WireFrameEncoder() val decoder = WireFrameDecoder() - fun createSampleFrame() = - PayloadWireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset())) + fun createSampleFrame() = WireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset())) fun encodeSampleFrame() = createSampleFrame().let { @@ -53,11 +52,11 @@ object WireFrameCodecsTest : Spek({ describe("Wire Frame invariants") { given("input with unsupported major version") { - val input = PayloadWireFrameMessage( + val input = WireFrameMessage( payload = ByteData.EMPTY, versionMajor = 100, versionMinor = 0, - payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = 0) it("should fail validation") { @@ -66,11 +65,11 @@ object WireFrameCodecsTest : Spek({ } given("input with unsupported minor version") { - val input = PayloadWireFrameMessage( + val input = WireFrameMessage( payload = ByteData.EMPTY, versionMajor = 1, versionMinor = 6, - payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = 0) it("should pass validation") { @@ -79,11 +78,11 @@ object WireFrameCodecsTest : Spek({ } given("input with unsupported payload type") { - val input = PayloadWireFrameMessage( + val input = WireFrameMessage( payload = ByteData.EMPTY, versionMajor = 1, versionMinor = 0, - payloadTypeRaw = 0x69, + payloadType = 0x69, payloadSize = 0) it("should fail validation") { @@ -92,11 +91,11 @@ object WireFrameCodecsTest : Spek({ } given("input with too small payload size") { - val input = PayloadWireFrameMessage( + val input = WireFrameMessage( payload = ByteData(byteArrayOf(1, 2, 3)), versionMajor = 1, versionMinor = 0, - payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = 1) it("should fail validation") { @@ -105,11 +104,11 @@ object WireFrameCodecsTest : Spek({ } given("input with too big payload size") { - val input = PayloadWireFrameMessage( + val input = WireFrameMessage( payload = ByteData(byteArrayOf(1, 2, 3)), versionMajor = 1, versionMinor = 0, - payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = 8) it("should fail validation") { @@ -119,11 +118,11 @@ object WireFrameCodecsTest : Spek({ given("valid input") { val payload = byteArrayOf(6, 9, 8, 6) - val input = PayloadWireFrameMessage( + val input = WireFrameMessage( payload = ByteData(payload), versionMajor = 1, versionMinor = 0, - payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = payload.size) it("should pass validation") { @@ -139,14 +138,18 @@ object WireFrameCodecsTest : Spek({ describe("encode-decode methods' compatibility") { val frame = createSampleFrame() val encoded = encodeSampleFrame() - val decoded = decoder.decodeFirst(encoded).getPayloadMessageOrFail() + val decoded = decoder.decodeFirst(encoded).getMessageOrFail() - it("should decode version") { + it("should decode major version") { assertThat(decoded.versionMajor).isEqualTo(frame.versionMajor) } + it("should decode minor version") { + assertThat(decoded.versionMinor).isEqualTo(frame.versionMinor) + } + it("should decode payload type") { - assertThat(decoded.payloadTypeRaw).isEqualTo(frame.payloadTypeRaw) + assertThat(decoded.payloadType).isEqualTo(frame.payloadType) } it("should decode payload size") { @@ -170,14 +173,7 @@ object WireFrameCodecsTest : Spek({ assertBufferIntact(buff) } - it("should return end-of-transmission message when given end-of-transmission marker byte") { - val buff = Unpooled.buffer() - .writeByte(0xAA) - - assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff)) - } - - it("should return error when given any single byte other than end-of-transmission marker byte") { + it("should return error when given any single byte other than marker byte") { val buff = Unpooled.buffer() .writeByte(0xEE) @@ -194,7 +190,7 @@ object WireFrameCodecsTest : Spek({ assertBufferIntact(buff) } - it("should return error when length looks ok but first byte is not 0xFF or 0xAA") { + it("should return error when length looks ok but first byte is not 0xFF") { val buff = Unpooled.buffer() .writeByte(0x69) .writeBytes("some garbage".toByteArray()) @@ -203,14 +199,6 @@ object WireFrameCodecsTest : Spek({ assertBufferIntact(buff) } - it("should return end-of-transmission message when length looks ok and first byte is 0xAA") { - val buff = Unpooled.buffer() - .writeByte(0xAA) - .writeBytes("some garbage".toByteArray()) - - assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff)) - } - it("should return error when payload doesn't fit") { val buff = Unpooled.buffer() .writeBytes(encodeSampleFrame()) @@ -223,8 +211,8 @@ object WireFrameCodecsTest : Spek({ it("should decode payload message leaving rest unread") { val buff = Unpooled.buffer() .writeBytes(encodeSampleFrame()) - .writeByte(0xAA) - val decoded = decoder.decodeFirst(buff).getPayloadMessageOrFail() + .writeByte(0xAB) + val decoded = decoder.decodeFirst(buff).getMessageOrFail() assertThat(decoded.isValid()).describedAs("should be valid").isTrue() assertThat(buff.readableBytes()).isEqualTo(1) @@ -236,11 +224,11 @@ object WireFrameCodecsTest : Spek({ it("should decode successfully when payload size is equal 1 MiB") { val payload = ByteArray(MAX_PAYLOAD_SIZE) - val input = PayloadWireFrameMessage( + val input = WireFrameMessage( payload = ByteData(payload), versionMajor = 1, versionMinor = 0, - payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = payload.size) @@ -250,11 +238,11 @@ object WireFrameCodecsTest : Spek({ it("should return error when payload exceeds 1 MiB") { val payload = ByteArray(MAX_PAYLOAD_SIZE + 1) - val input = PayloadWireFrameMessage( + val input = WireFrameMessage( payload = ByteData(payload), versionMajor = 1, versionMinor = 0, - payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = payload.size) val buff = encoder.encode(input) @@ -266,11 +254,11 @@ object WireFrameCodecsTest : Spek({ it("should validate only first message") { val payload = ByteArray(MAX_PAYLOAD_SIZE) - val input = PayloadWireFrameMessage( + val input = WireFrameMessage( payload = ByteData(payload), versionMajor = 1, versionMinor = 0, - payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = payload.size) @@ -289,21 +277,6 @@ private fun Either.assertFailedWithError(assertj: (ObjectAssert) fold({ assertj(assertThat(it)) }, { fail("Error expected") }) } -private fun Either.getPayloadMessageOrFail(): PayloadWireFrameMessage = - fold({ fail(it.message) }, { it.castToPayloadMsgOrFail() }) - -private fun WireFrameMessage.castToPayloadMsgOrFail(): PayloadWireFrameMessage = - this as? PayloadWireFrameMessage - ?: fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${this.javaClass}") - - -private fun assertIsEndOfTransmissionMessage(decoded: Either) { - decoded.getEndOfTransmissionMessageOrFail() -} - -private fun Either.getEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage = - fold({ fail(it.message) }, { it.castToEndOfTransmissionMessageOrFail() }) +private fun Either.getMessageOrFail(): WireFrameMessage = + fold({ fail(it.message) }, { it }) -private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage = - this as? EndOfTransmissionMessage - ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}") -- cgit 1.2.3-korg