summaryrefslogtreecommitdiffstats
path: root/hv-collector-domain
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-domain')
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt54
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt10
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt46
-rw-r--r--hv-collector-domain/src/main/proto/event/VesEvent.proto63
-rw-r--r--hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto14
-rw-r--r--hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto66
-rw-r--r--hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt93
7 files changed, 124 insertions, 222 deletions
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
index c61ab266..4f867f13 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
@@ -24,8 +24,8 @@ import arrow.core.Left
import arrow.core.Right
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -33,19 +33,19 @@ import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.R
*/
class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
- fun encode(frame: PayloadWireFrameMessage): ByteBuf {
- val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size())
-
- bb.writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt())
- bb.writeByte(frame.versionMajor.toInt())
- bb.writeByte(frame.versionMinor.toInt())
- bb.writeZero(RESERVED_BYTE_COUNT)
- bb.writeByte(frame.payloadTypeRaw.toInt())
- bb.writeInt(frame.payloadSize)
- frame.payload.writeTo(bb)
-
- return bb
- }
+ fun encode(frame: WireFrameMessage): ByteBuf = allocator
+ .buffer(WireFrameMessage.HEADER_SIZE + frame.payload.size())
+ .run {
+ writeByte(WireFrameMessage.MARKER_BYTE.toInt())
+ writeByte(frame.versionMajor.toInt())
+ writeByte(frame.versionMinor.toInt())
+ writeZero(RESERVED_BYTE_COUNT)
+ writeByte(frame.payloadType.toInt())
+ writeInt(frame.payloadSize)
+ }
+ .also {
+ frame.payload.writeTo(it)
+ }
}
/**
@@ -57,36 +57,20 @@ class WireFrameDecoder {
fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> =
when {
isEmpty(byteBuf) -> Left(EmptyWireFrame)
- isSingleByte(byteBuf) -> lookForEOTFrame(byteBuf)
headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes)
else -> parseWireFrame(byteBuf)
}
private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1
- private fun isSingleByte(byteBuf: ByteBuf) = byteBuf.readableBytes() == 1
-
- private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < PayloadWireFrameMessage.HEADER_SIZE
-
- private fun lookForEOTFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, EndOfTransmissionMessage> {
- byteBuf.markReaderIndex()
- val byte = byteBuf.readUnsignedByte()
-
- return if (byte == EndOfTransmissionMessage.MARKER_BYTE) {
- Right(EndOfTransmissionMessage)
- } else {
- byteBuf.resetReaderIndex()
- Left(MissingWireFrameHeaderBytes)
- }
- }
+ private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrameMessage.HEADER_SIZE
private fun parseWireFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
byteBuf.markReaderIndex()
val mark = byteBuf.readUnsignedByte()
return when (mark) {
- EndOfTransmissionMessage.MARKER_BYTE -> Right(EndOfTransmissionMessage)
- PayloadWireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf)
+ WireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf)
else -> {
byteBuf.resetReaderIndex()
Left(InvalidWireFrameMarker(mark))
@@ -94,7 +78,7 @@ class WireFrameDecoder {
}
}
- private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, PayloadWireFrameMessage> {
+ private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
val versionMajor = byteBuf.readUnsignedByte()
val versionMinor = byteBuf.readUnsignedByte()
byteBuf.skipBytes(RESERVED_BYTE_COUNT) // reserved
@@ -113,7 +97,7 @@ class WireFrameDecoder {
val payload = ByteData.readFrom(byteBuf, payloadSize)
- return Right(PayloadWireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize))
+ return Right(WireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize))
}
}
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
index d82bb25f..dfadc5b8 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
@@ -19,7 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.domain
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,7 +35,7 @@ sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg)
class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame(
"Invalid start of frame. Expected 0x%02X, but was 0x%02X"
- .format(PayloadWireFrameMessage.MARKER_BYTE, actualMarker)
+ .format(WireFrameMessage.MARKER_BYTE, actualMarker)
)
object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)")
@@ -47,9 +47,3 @@ sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg)
object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size")
object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size")
object EmptyWireFrame : MissingWireFrameBytes("empty wire frame")
-
-
-// Other
-
-class UnknownWireFrameTypeException(frame: WireFrameMessage)
- : Throwable("Unexpected wire frame message type: ${frame.javaClass}")
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
index 642179e1..06ca9383 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
@@ -20,10 +20,8 @@
package org.onap.dcae.collectors.veshv.domain
-sealed class WireFrameMessage
-
/**
- * Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
+ * Wire frame structure is presented bellow using ASN.1 notation. All fields are in network byte order (big-endian).
*
* ```
* -- Precedes every HV-VES message
@@ -31,21 +29,22 @@ sealed class WireFrameMessage
* magic INTEGER (0..255), – always 0xFF, identifies extended header usage
* versionMajor INTEGER (0..255), – major interface v, forward incompatible with previous major v
* versionMinor INTEGER (0..255), – minor interface v, forward compatible with previous minor v
- * reserved BIT STRING (SIZE (16)), – reserved for future use
- * messageType INTEGER (0..255), – message payload type: 0x00=undefined, 0x01=protobuf
- * messageLength INTEGER (0..4294967295) – message payload length
+ * reserved OCTET STRING (SIZE (3)), – reserved for future use
+ * payloadId INTEGER (0..255), – message payload type: 0x00=undefined, 0x01=protobuf
+ * payloadLength INTEGER (0..4294967295) – message payload length
+ * payload OCTET STRING – length as per payloadLength
* }
* ```
*
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class PayloadWireFrameMessage(val payload: ByteData,
- val versionMajor: Short,
- val versionMinor: Short,
- val payloadTypeRaw: Short,
- val payloadSize: Int
-) : WireFrameMessage() {
+data class WireFrameMessage(val payload: ByteData,
+ val versionMajor: Short,
+ val versionMinor: Short,
+ val payloadType: Short,
+ val payloadSize: Int
+) {
constructor(payload: ByteArray) : this(
ByteData(payload),
SUPPORTED_VERSION_MAJOR,
@@ -55,7 +54,7 @@ data class PayloadWireFrameMessage(val payload: ByteData,
fun isValid(): Boolean =
versionMajor == SUPPORTED_VERSION_MAJOR
- && PayloadContentType.isValidHexValue(payloadTypeRaw)
+ && PayloadContentType.isValidHexValue(payloadType)
&& payload.size() == payloadSize
companion object {
@@ -74,24 +73,3 @@ data class PayloadWireFrameMessage(val payload: ByteData,
const val MAX_PAYLOAD_SIZE = 1024 * 1024
}
}
-
-
-/**
- * This message type should be used by client to indicate that he has finished sending data to collector.
- *
- * Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
- *
- * ```
- * -- Sent by the HV-VES data provider, prior to closing the connection to the HV-VES destination
- * Eot ::= SEQUENCE {
- * magic INTEGER (0..255), – always 0xAA
- * }
- * ```
- *
- * @since July 2018
- */
-
-object EndOfTransmissionMessage : WireFrameMessage() {
- const val MARKER_BYTE: Short = 0xAA
-}
-
diff --git a/hv-collector-domain/src/main/proto/event/VesEvent.proto b/hv-collector-domain/src/main/proto/event/VesEvent.proto
index 54a6d149..0f9e5e1f 100644
--- a/hv-collector-domain/src/main/proto/event/VesEvent.proto
+++ b/hv-collector-domain/src/main/proto/event/VesEvent.proto
@@ -21,54 +21,55 @@ syntax = "proto3";
package org.onap.ves;
message VesEvent {
- CommonEventHeader commonEventHeader = 1; // required
+ CommonEventHeader commonEventHeader=1; // required
- oneof eventFields // required, payload
- {
- // each new high-volume domain can add an entry for its own GPB message
- // the field can be opaque (bytes) to allow decoding the payload in a separate step
- bytes hvMeasFields = 2; // for domain==HVMEAS, GPB message: HVMeasFields
- }
+ bytes eventFields=2; // required, payload
+ // this field contains a domain-specific GPB message
+ // the field being opaque (bytes), the decoding of the payload occurs in a separate step
+ // the name of the GPB message for domain XYZ is XYZFields
+ // e.g. for domain==HVMEAS, the GPB message is HVMEASFields
}
// VES CommonEventHeader adapted to GPB (Google Protocol Buffers)
-// Aligned with VES 7.0.1 schema, and extending to hvMeas Domain
+// Aligned with VES 7.0.1 schema, and extending to hvMeas Domain.
-message CommonEventHeader {
- string version = 1; // required, "version of the gpb common event header"
- string domain = 2; // required, "the eventing domain associated with the event", allowed values:
- // FAULT, HEARTBEAT, MEASUREMENT, MOBILE_FLOW, OTHER, PNFREGISTRATION, SIP_SIGNALING,
- // STATE_CHANGE, SYSLOG, THRESHOLD_CROSSING_ALERT, VOICE_QUALITY, HVMEAS
+message CommonEventHeader
+{
+ string version = 1; // required, "version of the gpb common event header"
+ string domain = 2; // required, "the eventing domain associated with the event", allowed values:
+ // FAULT, HEARTBEAT, MEASUREMENT, MOBILE_FLOW, OTHER, PNFREGISTRATION, SIP_SIGNALING,
+ // STATE_CHANGE, SYSLOG, THRESHOLD_CROSSING_ALERT, VOICE_QUALITY, HVMEAS
- uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed"
+ uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed"
- enum Priority {
+ enum Priority
+ {
PRIORITY_NOT_PROVIDED = 0;
HIGH = 1;
MEDIUM = 2;
NORMAL = 3;
LOW = 4;
}
- Priority priority = 4; // required, "processing priority"
+ Priority priority = 4; // required, "processing priority"
- string eventId = 5; // required, "event key that is unique to the event source"
- string eventName = 6; // required, "unique event name"
- string eventType = 7; // "for example - guest05, platform"
+ string eventId = 5; // required, "event key that is unique to the event source"
+ string eventName = 6; // required, "unique event name"
+ string eventType = 7; // "for example - guest05, platform"
- uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
- uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
+ uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
+ uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
- string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards"
- string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards"
- string nfVendorName = 12; // " Vendor Name providing the nf "
+ string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards"
+ string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards"
+ string nfVendorName = 12; // " Vendor Name providing the nf "
- bytes reportingEntityId = 13; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process"
- string reportingEntityName = 14; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName should match A&AI entry"
- bytes sourceId = 15; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process"
- string sourceName = 16; // required, "name of the entity experiencing the event issued use A&AI entry"
- string timeZoneOffset = 17; // "Offset to GMT to indicate local time zone for the device"
- string vesEventListenerVersion = 18; // required, "Version of the VesEvent Listener"
+ bytes reportingEntityId = 13; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process"
+ string reportingEntityName = 14; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName should match A&AI entry"
+ bytes sourceId = 15; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process"
+ string sourceName = 16; // required, "name of the entity experiencing the event issued use A&AI entry"
+ string timeZoneOffset = 17; // "Offset to GMT to indicate local time zone for the device"
+ string vesEventListenerVersion = 18; // required, "Version of the VesEvent Listener"
- reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources"
+ reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources"
reserved 100;
}
diff --git a/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto b/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto
index 9a8582d5..94b40106 100644
--- a/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto
+++ b/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto
@@ -24,20 +24,14 @@ import "MeasDataCollection.proto"; // for 3GPP PM format
message HVMeasFields
{
string hvMeasFieldsVersion = 1;
- measDataCollection.MeasDataCollection measDataCollection = 2;
- // From 3GPP TS 28.550
+ MeasDataCollection measDataCollection = 2;
+ // Based on 3GPP TS 28.550
// Informative: mapping between similar header fields (format may be different)
- // 3GPP MeasStreamHeader ONAP/VES CommonEventHeader
+ // 3GPP MeasHeader ONAP/VES CommonEventHeader
// senderName sourceName
// senderType nfNamingCode + nfcNamingCode
// vendorName nfVendorName
// collectionBeginTime startEpochMicrosec
// timestamp lastEpochMicrosec
- repeated HashMap eventAddlFlds = 3; // optional per-event data
+ map<string, string> eventAddlFlds = 3; // optional per-event data (name/value HashMap)
}
-
-message HashMap
-{
- string name = 1;
- string value = 2;
-} \ No newline at end of file
diff --git a/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto b/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto
index 472dcc43..31f4dfb1 100644
--- a/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto
+++ b/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto
@@ -18,87 +18,65 @@
* ============LICENSE_END=========================================================
*/
syntax = "proto3";
-package measDataCollection;
+package org.onap.ves;
-// Definition for RTPM, structure aligned with 3GPP PM format optimized for RTPM delivery pre-standard TS 28.550 V1.2.2 (2018-08).
+// Definition for RTPM, structure aligned with 3GPP PM format optimized for RTPM delivery pre-standard TS 28.550 V2.0.0 (2018-09).
// Some field details are taken from 3GPP TS 32.436 V15.0.0 (2018-06) ASN.1 file.
-// Note (2018-08): work is in progress for 3GPP TS 28.550 to specify PM streaming format. Changes will be made, if needed, to align with final version.
+// Note (2018-09): work is in progress for 3GPP TS 28.550. Changes will be made, if needed, to align with final version.
// Differences/additions to 3GPP TS 28.550 are marked with "%%".
-message MeasDataCollection // top-level message
+message MeasDataCollection // top-level message
{
- MeasHeader measHeader = 1;
- repeated MeasData measData = 2; // %%: use a single instance for RTPM
- MeasFooter measFooter = 3;
-}
-
-message MeasHeader
-{
- string streamFormatVersion = 1;
+ // %% Combined messageFileHeader, measData (single instance), messageFileFooter (not needed: timestamp = collectionBeginTime + granularityPeriod).
+ string formatVersion = 1;
string senderName = 2;
string senderType = 3;
string vendorName = 4;
string collectionBeginTime = 5; // in ASN.1 GeneralizedTime format (subset of ISO 8601 basic format)
+ uint32 granularityPeriod = 6; // duration in seconds, %% moved from MeasInfo (single reporting period per event)
+ string measuredEntityUserName = 7; // network function user definable name ("userLabel") defined for the measured entity in 3GPP TS 28.622
+ string measuredEntityDn = 8; // DN as per 3GPP TS 32.300
+ string measuredEntitySoftwareVersion = 9;
+ repeated string measObjInstIdList = 10; // %%: optional, monitored object LDNs as per 3GPP TS 32.300 and 3GPP TS 32.432
+ repeated MeasInfo measInfo = 11;
}
-message MeasData
-{
- string measuredEntityId = 1; // DN as per 3GPP TS 32.300
- string measuredEntityUserName = 2; // network function User Name
- string measuredEntitySoftwareVersion = 3;
- uint32 granularityPeriod = 4; // in seconds, %% moved from MeasInfo (single reporting period per event)
- repeated string measObjInstIdList = 5; // %%: optional, monitored object LDNs as per 3GPP TS 32.300 and 3GPP TS 32.432
- repeated MeasInfo measInfo = 6;
-}
-
-
message MeasInfo
{
oneof MeasInfoId { // measurement group identifier
- uint32 iMeasInfoId = 1; // identifier as integer (%%: more compact)
- string measInfoId = 2; // identifier as string (more generic)
+ uint32 iMeasInfoId = 1; // identifier as integer (%%: more compact)
+ string measInfoId = 2; // identifier as string (more generic)
}
oneof MeasTypes { // measurement identifiers associated with the measurement results
- IMeasTypes iMeasTypes = 3; // identifiers as integers (%%: more compact)
- SMeasTypes measTypes = 4; // identifiers as strings (more generic)
+ IMeasTypes iMeasTypes = 3; // identifiers as integers (%%: more compact)
+ SMeasTypes measTypes = 4; // identifiers as strings (more generic)
}
// Needed only because GPB does not support repeated fields directly inside 'oneof'
message IMeasTypes { repeated uint32 iMeasType = 1; }
message SMeasTypes { repeated string measType = 1; }
- string jobIdList = 5;
- repeated MeasValue measValues = 6; // performance measurements grouped by measurement groups
+ string jobId = 5;
+ repeated MeasValue measValues = 6; // performance measurements grouped by measurement object
}
message MeasValue
{
oneof MeasObjInstId { // monitored object LDN as per 3GPP TS 32.300 and 3GPP TS 32.432
- string measObjInstId = 1; // LDN itself
- uint32 measObjInstIdListIdx = 2; // %%: index into measObjInstIdList
+ string measObjInstId = 1; // LDN itself
+ uint32 measObjInstIdListIdx = 2; // %%: index into measObjInstIdList
}
repeated MeasResult measResults = 3;
bool suspectFlag = 4;
- repeated nameValue measObjAddlFlds = 5; // %%: optional per-object data
+ map<string, string> measObjAddlFlds = 5; // %%: optional per-object data (name/value HashMap)
}
message MeasResult
{
- uint32 p = 1; // Optional index in the MeasTypes array
+ uint32 p = 1; // Index in the MeasTypes array, needed only if measResults has fewer elements than MeasTypes
oneof xValue {
sint64 iValue = 2;
double rValue = 3;
bool isNull = 4;
}
}
-
-message MeasFooter
-{
- string timestamp = 1; // in ASN.1 GeneralizedTime format, a better name would be "collectionEndTime"
-}
-
-message nameValue // %%: vendor-defined name-value pair
-{
- string name = 1;
- string value = 2;
-} \ No newline at end of file
diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
index b992d530..988789d2 100644
--- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
+++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
@@ -28,7 +28,7 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import java.nio.charset.Charset
import kotlin.test.assertTrue
import kotlin.test.fail
@@ -42,8 +42,7 @@ object WireFrameCodecsTest : Spek({
val encoder = WireFrameEncoder()
val decoder = WireFrameDecoder()
- fun createSampleFrame() =
- PayloadWireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset()))
+ fun createSampleFrame() = WireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset()))
fun encodeSampleFrame() =
createSampleFrame().let {
@@ -53,11 +52,11 @@ object WireFrameCodecsTest : Spek({
describe("Wire Frame invariants") {
given("input with unsupported major version") {
- val input = PayloadWireFrameMessage(
+ val input = WireFrameMessage(
payload = ByteData.EMPTY,
versionMajor = 100,
versionMinor = 0,
- payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
payloadSize = 0)
it("should fail validation") {
@@ -66,11 +65,11 @@ object WireFrameCodecsTest : Spek({
}
given("input with unsupported minor version") {
- val input = PayloadWireFrameMessage(
+ val input = WireFrameMessage(
payload = ByteData.EMPTY,
versionMajor = 1,
versionMinor = 6,
- payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
payloadSize = 0)
it("should pass validation") {
@@ -79,11 +78,11 @@ object WireFrameCodecsTest : Spek({
}
given("input with unsupported payload type") {
- val input = PayloadWireFrameMessage(
+ val input = WireFrameMessage(
payload = ByteData.EMPTY,
versionMajor = 1,
versionMinor = 0,
- payloadTypeRaw = 0x69,
+ payloadType = 0x69,
payloadSize = 0)
it("should fail validation") {
@@ -92,11 +91,11 @@ object WireFrameCodecsTest : Spek({
}
given("input with too small payload size") {
- val input = PayloadWireFrameMessage(
+ val input = WireFrameMessage(
payload = ByteData(byteArrayOf(1, 2, 3)),
versionMajor = 1,
versionMinor = 0,
- payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
payloadSize = 1)
it("should fail validation") {
@@ -105,11 +104,11 @@ object WireFrameCodecsTest : Spek({
}
given("input with too big payload size") {
- val input = PayloadWireFrameMessage(
+ val input = WireFrameMessage(
payload = ByteData(byteArrayOf(1, 2, 3)),
versionMajor = 1,
versionMinor = 0,
- payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
payloadSize = 8)
it("should fail validation") {
@@ -119,11 +118,11 @@ object WireFrameCodecsTest : Spek({
given("valid input") {
val payload = byteArrayOf(6, 9, 8, 6)
- val input = PayloadWireFrameMessage(
+ val input = WireFrameMessage(
payload = ByteData(payload),
versionMajor = 1,
versionMinor = 0,
- payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
payloadSize = payload.size)
it("should pass validation") {
@@ -139,14 +138,18 @@ object WireFrameCodecsTest : Spek({
describe("encode-decode methods' compatibility") {
val frame = createSampleFrame()
val encoded = encodeSampleFrame()
- val decoded = decoder.decodeFirst(encoded).getPayloadMessageOrFail()
+ val decoded = decoder.decodeFirst(encoded).getMessageOrFail()
- it("should decode version") {
+ it("should decode major version") {
assertThat(decoded.versionMajor).isEqualTo(frame.versionMajor)
}
+ it("should decode minor version") {
+ assertThat(decoded.versionMinor).isEqualTo(frame.versionMinor)
+ }
+
it("should decode payload type") {
- assertThat(decoded.payloadTypeRaw).isEqualTo(frame.payloadTypeRaw)
+ assertThat(decoded.payloadType).isEqualTo(frame.payloadType)
}
it("should decode payload size") {
@@ -170,14 +173,7 @@ object WireFrameCodecsTest : Spek({
assertBufferIntact(buff)
}
- it("should return end-of-transmission message when given end-of-transmission marker byte") {
- val buff = Unpooled.buffer()
- .writeByte(0xAA)
-
- assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
- }
-
- it("should return error when given any single byte other than end-of-transmission marker byte") {
+ it("should return error when given any single byte other than marker byte") {
val buff = Unpooled.buffer()
.writeByte(0xEE)
@@ -194,7 +190,7 @@ object WireFrameCodecsTest : Spek({
assertBufferIntact(buff)
}
- it("should return error when length looks ok but first byte is not 0xFF or 0xAA") {
+ it("should return error when length looks ok but first byte is not 0xFF") {
val buff = Unpooled.buffer()
.writeByte(0x69)
.writeBytes("some garbage".toByteArray())
@@ -203,14 +199,6 @@ object WireFrameCodecsTest : Spek({
assertBufferIntact(buff)
}
- it("should return end-of-transmission message when length looks ok and first byte is 0xAA") {
- val buff = Unpooled.buffer()
- .writeByte(0xAA)
- .writeBytes("some garbage".toByteArray())
-
- assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
- }
-
it("should return error when payload doesn't fit") {
val buff = Unpooled.buffer()
.writeBytes(encodeSampleFrame())
@@ -223,8 +211,8 @@ object WireFrameCodecsTest : Spek({
it("should decode payload message leaving rest unread") {
val buff = Unpooled.buffer()
.writeBytes(encodeSampleFrame())
- .writeByte(0xAA)
- val decoded = decoder.decodeFirst(buff).getPayloadMessageOrFail()
+ .writeByte(0xAB)
+ val decoded = decoder.decodeFirst(buff).getMessageOrFail()
assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
assertThat(buff.readableBytes()).isEqualTo(1)
@@ -236,11 +224,11 @@ object WireFrameCodecsTest : Spek({
it("should decode successfully when payload size is equal 1 MiB") {
val payload = ByteArray(MAX_PAYLOAD_SIZE)
- val input = PayloadWireFrameMessage(
+ val input = WireFrameMessage(
payload = ByteData(payload),
versionMajor = 1,
versionMinor = 0,
- payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
payloadSize = payload.size)
@@ -250,11 +238,11 @@ object WireFrameCodecsTest : Spek({
it("should return error when payload exceeds 1 MiB") {
val payload = ByteArray(MAX_PAYLOAD_SIZE + 1)
- val input = PayloadWireFrameMessage(
+ val input = WireFrameMessage(
payload = ByteData(payload),
versionMajor = 1,
versionMinor = 0,
- payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
payloadSize = payload.size)
val buff = encoder.encode(input)
@@ -266,11 +254,11 @@ object WireFrameCodecsTest : Spek({
it("should validate only first message") {
val payload = ByteArray(MAX_PAYLOAD_SIZE)
- val input = PayloadWireFrameMessage(
+ val input = WireFrameMessage(
payload = ByteData(payload),
versionMajor = 1,
versionMinor = 0,
- payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
payloadSize = payload.size)
@@ -289,21 +277,6 @@ private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>)
fold({ assertj(assertThat(it)) }, { fail("Error expected") })
}
-private fun Either<WireFrameDecodingError, WireFrameMessage>.getPayloadMessageOrFail(): PayloadWireFrameMessage =
- fold({ fail(it.message) }, { it.castToPayloadMsgOrFail() })
-
-private fun WireFrameMessage.castToPayloadMsgOrFail(): PayloadWireFrameMessage =
- this as? PayloadWireFrameMessage
- ?: fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${this.javaClass}")
-
-
-private fun assertIsEndOfTransmissionMessage(decoded: Either<WireFrameDecodingError, WireFrameMessage>) {
- decoded.getEndOfTransmissionMessageOrFail()
-}
-
-private fun Either<WireFrameDecodingError, WireFrameMessage>.getEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
- fold({ fail(it.message) }, { it.castToEndOfTransmissionMessageOrFail() })
+private fun Either<WireFrameDecodingError, WireFrameMessage>.getMessageOrFail(): WireFrameMessage =
+ fold({ fail(it.message) }, { it })
-private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
- this as? EndOfTransmissionMessage
- ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}")