From e880cde732b6d5b6a2fd22b2245ba7f6ff4517f3 Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Fri, 21 Sep 2018 10:14:03 +0200 Subject: Remove end-of-transmission message from protocol Also update protobuf files definitions to latest version. Change-Id: I0cd5d2d8deec5c787e2d3948d3d905fa672f9fea Issue-ID: DCAEGEN2-775 Signed-off-by: Filip Krzywka --- .../org/onap/dcae/collectors/veshv/domain/codec.kt | 54 +++++++----------- .../onap/dcae/collectors/veshv/domain/errors.kt | 10 +--- .../dcae/collectors/veshv/domain/wire_frame.kt | 46 ++++----------- .../src/main/proto/event/VesEvent.proto | 63 +++++++++++---------- .../src/main/proto/measurements/HVMeasFields.proto | 14 ++--- .../proto/measurements/MeasDataCollection.proto | 66 ++++++++-------------- 6 files changed, 91 insertions(+), 162 deletions(-) (limited to 'hv-collector-domain/src/main') diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt index c61ab266..4f867f13 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt @@ -24,8 +24,8 @@ import arrow.core.Left import arrow.core.Right import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT /** * @author Piotr Jaszczyk @@ -33,19 +33,19 @@ import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.R */ class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocator.DEFAULT) { - fun encode(frame: PayloadWireFrameMessage): ByteBuf { - val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size()) - - bb.writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt()) - bb.writeByte(frame.versionMajor.toInt()) - bb.writeByte(frame.versionMinor.toInt()) - bb.writeZero(RESERVED_BYTE_COUNT) - bb.writeByte(frame.payloadTypeRaw.toInt()) - bb.writeInt(frame.payloadSize) - frame.payload.writeTo(bb) - - return bb - } + fun encode(frame: WireFrameMessage): ByteBuf = allocator + .buffer(WireFrameMessage.HEADER_SIZE + frame.payload.size()) + .run { + writeByte(WireFrameMessage.MARKER_BYTE.toInt()) + writeByte(frame.versionMajor.toInt()) + writeByte(frame.versionMinor.toInt()) + writeZero(RESERVED_BYTE_COUNT) + writeByte(frame.payloadType.toInt()) + writeInt(frame.payloadSize) + } + .also { + frame.payload.writeTo(it) + } } /** @@ -57,36 +57,20 @@ class WireFrameDecoder { fun decodeFirst(byteBuf: ByteBuf): Either = when { isEmpty(byteBuf) -> Left(EmptyWireFrame) - isSingleByte(byteBuf) -> lookForEOTFrame(byteBuf) headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes) else -> parseWireFrame(byteBuf) } private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1 - private fun isSingleByte(byteBuf: ByteBuf) = byteBuf.readableBytes() == 1 - - private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < PayloadWireFrameMessage.HEADER_SIZE - - private fun lookForEOTFrame(byteBuf: ByteBuf): Either { - byteBuf.markReaderIndex() - val byte = byteBuf.readUnsignedByte() - - return if (byte == EndOfTransmissionMessage.MARKER_BYTE) { - Right(EndOfTransmissionMessage) - } else { - byteBuf.resetReaderIndex() - Left(MissingWireFrameHeaderBytes) - } - } + private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrameMessage.HEADER_SIZE private fun parseWireFrame(byteBuf: ByteBuf): Either { byteBuf.markReaderIndex() val mark = byteBuf.readUnsignedByte() return when (mark) { - EndOfTransmissionMessage.MARKER_BYTE -> Right(EndOfTransmissionMessage) - PayloadWireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf) + WireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf) else -> { byteBuf.resetReaderIndex() Left(InvalidWireFrameMarker(mark)) @@ -94,7 +78,7 @@ class WireFrameDecoder { } } - private fun parsePayloadFrame(byteBuf: ByteBuf): Either { + private fun parsePayloadFrame(byteBuf: ByteBuf): Either { val versionMajor = byteBuf.readUnsignedByte() val versionMinor = byteBuf.readUnsignedByte() byteBuf.skipBytes(RESERVED_BYTE_COUNT) // reserved @@ -113,7 +97,7 @@ class WireFrameDecoder { val payload = ByteData.readFrom(byteBuf, payloadSize) - return Right(PayloadWireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize)) + return Right(WireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize)) } } diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt index d82bb25f..dfadc5b8 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.domain -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE /** * @author Piotr Jaszczyk @@ -35,7 +35,7 @@ sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg) class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame( "Invalid start of frame. Expected 0x%02X, but was 0x%02X" - .format(PayloadWireFrameMessage.MARKER_BYTE, actualMarker) + .format(WireFrameMessage.MARKER_BYTE, actualMarker) ) object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)") @@ -47,9 +47,3 @@ sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg) object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size") object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size") object EmptyWireFrame : MissingWireFrameBytes("empty wire frame") - - -// Other - -class UnknownWireFrameTypeException(frame: WireFrameMessage) - : Throwable("Unexpected wire frame message type: ${frame.javaClass}") diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt index 642179e1..06ca9383 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt @@ -20,10 +20,8 @@ package org.onap.dcae.collectors.veshv.domain -sealed class WireFrameMessage - /** - * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). + * Wire frame structure is presented bellow using ASN.1 notation. All fields are in network byte order (big-endian). * * ``` * -- Precedes every HV-VES message @@ -31,21 +29,22 @@ sealed class WireFrameMessage * magic INTEGER (0..255), – always 0xFF, identifies extended header usage * versionMajor INTEGER (0..255), – major interface v, forward incompatible with previous major v * versionMinor INTEGER (0..255), – minor interface v, forward compatible with previous minor v - * reserved BIT STRING (SIZE (16)), – reserved for future use - * messageType INTEGER (0..255), – message payload type: 0x00=undefined, 0x01=protobuf - * messageLength INTEGER (0..4294967295) – message payload length + * reserved OCTET STRING (SIZE (3)), – reserved for future use + * payloadId INTEGER (0..255), – message payload type: 0x00=undefined, 0x01=protobuf + * payloadLength INTEGER (0..4294967295) – message payload length + * payload OCTET STRING – length as per payloadLength * } * ``` * * @author Piotr Jaszczyk * @since May 2018 */ -data class PayloadWireFrameMessage(val payload: ByteData, - val versionMajor: Short, - val versionMinor: Short, - val payloadTypeRaw: Short, - val payloadSize: Int -) : WireFrameMessage() { +data class WireFrameMessage(val payload: ByteData, + val versionMajor: Short, + val versionMinor: Short, + val payloadType: Short, + val payloadSize: Int +) { constructor(payload: ByteArray) : this( ByteData(payload), SUPPORTED_VERSION_MAJOR, @@ -55,7 +54,7 @@ data class PayloadWireFrameMessage(val payload: ByteData, fun isValid(): Boolean = versionMajor == SUPPORTED_VERSION_MAJOR - && PayloadContentType.isValidHexValue(payloadTypeRaw) + && PayloadContentType.isValidHexValue(payloadType) && payload.size() == payloadSize companion object { @@ -74,24 +73,3 @@ data class PayloadWireFrameMessage(val payload: ByteData, const val MAX_PAYLOAD_SIZE = 1024 * 1024 } } - - -/** - * This message type should be used by client to indicate that he has finished sending data to collector. - * - * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). - * - * ``` - * -- Sent by the HV-VES data provider, prior to closing the connection to the HV-VES destination - * Eot ::= SEQUENCE { - * magic INTEGER (0..255), – always 0xAA - * } - * ``` - * - * @since July 2018 - */ - -object EndOfTransmissionMessage : WireFrameMessage() { - const val MARKER_BYTE: Short = 0xAA -} - diff --git a/hv-collector-domain/src/main/proto/event/VesEvent.proto b/hv-collector-domain/src/main/proto/event/VesEvent.proto index 54a6d149..0f9e5e1f 100644 --- a/hv-collector-domain/src/main/proto/event/VesEvent.proto +++ b/hv-collector-domain/src/main/proto/event/VesEvent.proto @@ -21,54 +21,55 @@ syntax = "proto3"; package org.onap.ves; message VesEvent { - CommonEventHeader commonEventHeader = 1; // required + CommonEventHeader commonEventHeader=1; // required - oneof eventFields // required, payload - { - // each new high-volume domain can add an entry for its own GPB message - // the field can be opaque (bytes) to allow decoding the payload in a separate step - bytes hvMeasFields = 2; // for domain==HVMEAS, GPB message: HVMeasFields - } + bytes eventFields=2; // required, payload + // this field contains a domain-specific GPB message + // the field being opaque (bytes), the decoding of the payload occurs in a separate step + // the name of the GPB message for domain XYZ is XYZFields + // e.g. for domain==HVMEAS, the GPB message is HVMEASFields } // VES CommonEventHeader adapted to GPB (Google Protocol Buffers) -// Aligned with VES 7.0.1 schema, and extending to hvMeas Domain +// Aligned with VES 7.0.1 schema, and extending to hvMeas Domain. -message CommonEventHeader { - string version = 1; // required, "version of the gpb common event header" - string domain = 2; // required, "the eventing domain associated with the event", allowed values: - // FAULT, HEARTBEAT, MEASUREMENT, MOBILE_FLOW, OTHER, PNFREGISTRATION, SIP_SIGNALING, - // STATE_CHANGE, SYSLOG, THRESHOLD_CROSSING_ALERT, VOICE_QUALITY, HVMEAS +message CommonEventHeader +{ + string version = 1; // required, "version of the gpb common event header" + string domain = 2; // required, "the eventing domain associated with the event", allowed values: + // FAULT, HEARTBEAT, MEASUREMENT, MOBILE_FLOW, OTHER, PNFREGISTRATION, SIP_SIGNALING, + // STATE_CHANGE, SYSLOG, THRESHOLD_CROSSING_ALERT, VOICE_QUALITY, HVMEAS - uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed" + uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed" - enum Priority { + enum Priority + { PRIORITY_NOT_PROVIDED = 0; HIGH = 1; MEDIUM = 2; NORMAL = 3; LOW = 4; } - Priority priority = 4; // required, "processing priority" + Priority priority = 4; // required, "processing priority" - string eventId = 5; // required, "event key that is unique to the event source" - string eventName = 6; // required, "unique event name" - string eventType = 7; // "for example - guest05, platform" + string eventId = 5; // required, "event key that is unique to the event source" + string eventName = 6; // required, "unique event name" + string eventType = 7; // "for example - guest05, platform" - uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds" - uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds" + uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds" + uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds" - string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards" - string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards" - string nfVendorName = 12; // " Vendor Name providing the nf " + string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards" + string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards" + string nfVendorName = 12; // " Vendor Name providing the nf " - bytes reportingEntityId = 13; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process" - string reportingEntityName = 14; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName should match A&AI entry" - bytes sourceId = 15; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process" - string sourceName = 16; // required, "name of the entity experiencing the event issued use A&AI entry" - string timeZoneOffset = 17; // "Offset to GMT to indicate local time zone for the device" - string vesEventListenerVersion = 18; // required, "Version of the VesEvent Listener" + bytes reportingEntityId = 13; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process" + string reportingEntityName = 14; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName should match A&AI entry" + bytes sourceId = 15; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process" + string sourceName = 16; // required, "name of the entity experiencing the event issued use A&AI entry" + string timeZoneOffset = 17; // "Offset to GMT to indicate local time zone for the device" + string vesEventListenerVersion = 18; // required, "Version of the VesEvent Listener" - reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources" + reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources" reserved 100; } diff --git a/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto b/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto index 9a8582d5..94b40106 100644 --- a/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto +++ b/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto @@ -24,20 +24,14 @@ import "MeasDataCollection.proto"; // for 3GPP PM format message HVMeasFields { string hvMeasFieldsVersion = 1; - measDataCollection.MeasDataCollection measDataCollection = 2; - // From 3GPP TS 28.550 + MeasDataCollection measDataCollection = 2; + // Based on 3GPP TS 28.550 // Informative: mapping between similar header fields (format may be different) - // 3GPP MeasStreamHeader ONAP/VES CommonEventHeader + // 3GPP MeasHeader ONAP/VES CommonEventHeader // senderName sourceName // senderType nfNamingCode + nfcNamingCode // vendorName nfVendorName // collectionBeginTime startEpochMicrosec // timestamp lastEpochMicrosec - repeated HashMap eventAddlFlds = 3; // optional per-event data + map eventAddlFlds = 3; // optional per-event data (name/value HashMap) } - -message HashMap -{ - string name = 1; - string value = 2; -} \ No newline at end of file diff --git a/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto b/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto index 472dcc43..31f4dfb1 100644 --- a/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto +++ b/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto @@ -18,87 +18,65 @@ * ============LICENSE_END========================================================= */ syntax = "proto3"; -package measDataCollection; +package org.onap.ves; -// Definition for RTPM, structure aligned with 3GPP PM format optimized for RTPM delivery pre-standard TS 28.550 V1.2.2 (2018-08). +// Definition for RTPM, structure aligned with 3GPP PM format optimized for RTPM delivery pre-standard TS 28.550 V2.0.0 (2018-09). // Some field details are taken from 3GPP TS 32.436 V15.0.0 (2018-06) ASN.1 file. -// Note (2018-08): work is in progress for 3GPP TS 28.550 to specify PM streaming format. Changes will be made, if needed, to align with final version. +// Note (2018-09): work is in progress for 3GPP TS 28.550. Changes will be made, if needed, to align with final version. // Differences/additions to 3GPP TS 28.550 are marked with "%%". -message MeasDataCollection // top-level message +message MeasDataCollection // top-level message { - MeasHeader measHeader = 1; - repeated MeasData measData = 2; // %%: use a single instance for RTPM - MeasFooter measFooter = 3; -} - -message MeasHeader -{ - string streamFormatVersion = 1; + // %% Combined messageFileHeader, measData (single instance), messageFileFooter (not needed: timestamp = collectionBeginTime + granularityPeriod). + string formatVersion = 1; string senderName = 2; string senderType = 3; string vendorName = 4; string collectionBeginTime = 5; // in ASN.1 GeneralizedTime format (subset of ISO 8601 basic format) + uint32 granularityPeriod = 6; // duration in seconds, %% moved from MeasInfo (single reporting period per event) + string measuredEntityUserName = 7; // network function user definable name ("userLabel") defined for the measured entity in 3GPP TS 28.622 + string measuredEntityDn = 8; // DN as per 3GPP TS 32.300 + string measuredEntitySoftwareVersion = 9; + repeated string measObjInstIdList = 10; // %%: optional, monitored object LDNs as per 3GPP TS 32.300 and 3GPP TS 32.432 + repeated MeasInfo measInfo = 11; } -message MeasData -{ - string measuredEntityId = 1; // DN as per 3GPP TS 32.300 - string measuredEntityUserName = 2; // network function User Name - string measuredEntitySoftwareVersion = 3; - uint32 granularityPeriod = 4; // in seconds, %% moved from MeasInfo (single reporting period per event) - repeated string measObjInstIdList = 5; // %%: optional, monitored object LDNs as per 3GPP TS 32.300 and 3GPP TS 32.432 - repeated MeasInfo measInfo = 6; -} - - message MeasInfo { oneof MeasInfoId { // measurement group identifier - uint32 iMeasInfoId = 1; // identifier as integer (%%: more compact) - string measInfoId = 2; // identifier as string (more generic) + uint32 iMeasInfoId = 1; // identifier as integer (%%: more compact) + string measInfoId = 2; // identifier as string (more generic) } oneof MeasTypes { // measurement identifiers associated with the measurement results - IMeasTypes iMeasTypes = 3; // identifiers as integers (%%: more compact) - SMeasTypes measTypes = 4; // identifiers as strings (more generic) + IMeasTypes iMeasTypes = 3; // identifiers as integers (%%: more compact) + SMeasTypes measTypes = 4; // identifiers as strings (more generic) } // Needed only because GPB does not support repeated fields directly inside 'oneof' message IMeasTypes { repeated uint32 iMeasType = 1; } message SMeasTypes { repeated string measType = 1; } - string jobIdList = 5; - repeated MeasValue measValues = 6; // performance measurements grouped by measurement groups + string jobId = 5; + repeated MeasValue measValues = 6; // performance measurements grouped by measurement object } message MeasValue { oneof MeasObjInstId { // monitored object LDN as per 3GPP TS 32.300 and 3GPP TS 32.432 - string measObjInstId = 1; // LDN itself - uint32 measObjInstIdListIdx = 2; // %%: index into measObjInstIdList + string measObjInstId = 1; // LDN itself + uint32 measObjInstIdListIdx = 2; // %%: index into measObjInstIdList } repeated MeasResult measResults = 3; bool suspectFlag = 4; - repeated nameValue measObjAddlFlds = 5; // %%: optional per-object data + map measObjAddlFlds = 5; // %%: optional per-object data (name/value HashMap) } message MeasResult { - uint32 p = 1; // Optional index in the MeasTypes array + uint32 p = 1; // Index in the MeasTypes array, needed only if measResults has fewer elements than MeasTypes oneof xValue { sint64 iValue = 2; double rValue = 3; bool isNull = 4; } } - -message MeasFooter -{ - string timestamp = 1; // in ASN.1 GeneralizedTime format, a better name would be "collectionEndTime" -} - -message nameValue // %%: vendor-defined name-value pair -{ - string name = 1; - string value = 2; -} \ No newline at end of file -- cgit 1.2.3-korg