From 56c5808a172a1ec8d8f82aafbed67fc020df9fac Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Tue, 21 Aug 2018 14:46:40 +0200 Subject: Align with latest proposal of wire proto frame Change-Id: I8f989a3913f1592b4d740a80ed30b01bc3aceff2 Issue-ID: DCAEGEN2-722 Signed-off-by: Piotr Jaszczyk --- .../collectors/veshv/domain/WireFrameMessages.kt | 107 --------------------- .../org/onap/dcae/collectors/veshv/domain/codec.kt | 12 ++- .../dcae/collectors/veshv/domain/wire_frame.kt | 97 +++++++++++++++++++ .../collectors/veshv/domain/WireFrameCodecsTest.kt | 41 ++++++-- .../src/main/kotlin/messages.kt | 9 +- .../message/generator/impl/MessageGeneratorImpl.kt | 1 + .../impl/impl/MessageGeneratorImplTest.kt | 2 +- 7 files changed, 144 insertions(+), 125 deletions(-) delete mode 100644 hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt create mode 100644 hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt deleted file mode 100644 index 4811a2b4..00000000 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt +++ /dev/null @@ -1,107 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.domain - - -sealed class WireFrameMessage - -/** - * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). - * - * ``` - * ┌─────┬───────────────────────┬───────────────────────┬───────────────────────┐ - * │octet│ 0 │ 1 │ 2 │ - * ├─────┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┤ - * │ bit │ 0│ │ │ │ │ │ │ │ 8│ │ │ │ │ │ │ │16│ │ │ │ │ │ │ │ ... - * ├─────┼──┴──┴──┴──┴──┴──┴──┴──┼──┴──┴──┴──┴──┴──┴──┴──┼──┴──┴──┴──┴──┴──┴──┴──┤ - * │field│ 0xFF │ version │ payload content type │ - * └─────┴───────────────────────┴───────────────────────┴───────────────────────┘ - * ┌─────┬───────────────────────┬───────────────────────┬───────────────────────┬───────────────────────┐ - * │octet│ 3 │ 4 │ 5 │ 6 │ - * ├─────┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┤ - * ... │ bit │24│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ... - * ├─────┼──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┤ - * │field│ payload size │ - * └─────┴───────────────────────────────────────────────────────────────────────────────────────────────┘ - * ┌─────┬─────────────────────── - * │octet│ 7 ... - * ├─────┼──┬──┬──┬──┬──┬──┬──┬── - * ... │ bit │56│ │ │ │ │ │ │... - * ├─────┼──┴──┴──┴──┴──┴──┴──┴── - * │field│ protobuf payload - * └─────┴─────────────────────── - * ``` - * - * @author Piotr Jaszczyk - * @since May 2018 - */ -data class PayloadWireFrameMessage(val payload: ByteData, - val version: Short, - val payloadTypeRaw: Short, - val payloadSize: Int -) : WireFrameMessage() { - - constructor(payload: ByteArray) : this( - ByteData(payload), - SUPPORTED_VERSION, - PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, - payload.size) - - fun isValid(): Boolean = - version == SUPPORTED_VERSION - && PayloadContentType.isValidHexValue(payloadTypeRaw) - && payload.size() == payloadSize - - companion object { - const val MARKER_BYTE: Short = 0xFF - - const val SUPPORTED_VERSION: Short = 1 - - const val HEADER_SIZE = - 3 * java.lang.Byte.BYTES + - 1 * java.lang.Integer.BYTES - - const val MAX_PAYLOAD_SIZE = 1024 * 1024 - } -} - - -/** - * This message type should be used by client to indicate that he has finished sending data to collector. - * - * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). - * - * ``` - * ┌─────┬───────────────────────┐ - * │octet│ 0 │ - * ├─────┼──┬──┬──┬──┬──┬──┬──┬──┤ - * │ bit │ 0│ │ │ │ │ │ │ │ - * ├─────┼──┴──┴──┴──┴──┴──┴──┴──┤ - * │field│ 0xAA │ - * └─────┴───────────────────────┘ - * ``` - * - * @since July 2018 - */ - -object EndOfTransmissionMessage : WireFrameMessage() { - const val MARKER_BYTE: Short = 0xAA -} - diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt index cbc18fd0..b2e42509 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt @@ -25,6 +25,7 @@ import arrow.core.Right import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT /** * @author Piotr Jaszczyk @@ -36,7 +37,9 @@ class WireFrameEncoder(private val allocator: ByteBufAllocator) { val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size()) bb.writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt()) - bb.writeByte(frame.version.toInt()) + bb.writeByte(frame.versionMajor.toInt()) + bb.writeByte(frame.versionMinor.toInt()) + bb.writeZero(RESERVED_BYTE_COUNT) bb.writeByte(frame.payloadTypeRaw.toInt()) bb.writeInt(frame.payloadSize) frame.payload.writeTo(bb) @@ -92,9 +95,10 @@ class WireFrameDecoder { } private fun parsePayloadFrame(byteBuf: ByteBuf): Either { - val version = byteBuf.readUnsignedByte() + val versionMajor = byteBuf.readUnsignedByte() + val versionMinor = byteBuf.readUnsignedByte() + byteBuf.skipBytes(RESERVED_BYTE_COUNT) // reserved val payloadTypeRaw = byteBuf.readUnsignedByte() - val payloadSize = byteBuf.readInt() if (payloadSize > MAX_PAYLOAD_SIZE) { @@ -109,7 +113,7 @@ class WireFrameDecoder { val payload = ByteData.readFrom(byteBuf, payloadSize) - return Right(PayloadWireFrameMessage(payload, version, payloadTypeRaw, payloadSize)) + return Right(PayloadWireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize)) } } diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt new file mode 100644 index 00000000..642179e1 --- /dev/null +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt @@ -0,0 +1,97 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + + +sealed class WireFrameMessage + +/** + * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). + * + * ``` + * -- Precedes every HV-VES message + * Header ::= SEQUENCE { + * magic INTEGER (0..255), – always 0xFF, identifies extended header usage + * versionMajor INTEGER (0..255), – major interface v, forward incompatible with previous major v + * versionMinor INTEGER (0..255), – minor interface v, forward compatible with previous minor v + * reserved BIT STRING (SIZE (16)), – reserved for future use + * messageType INTEGER (0..255), – message payload type: 0x00=undefined, 0x01=protobuf + * messageLength INTEGER (0..4294967295) – message payload length + * } + * ``` + * + * @author Piotr Jaszczyk + * @since May 2018 + */ +data class PayloadWireFrameMessage(val payload: ByteData, + val versionMajor: Short, + val versionMinor: Short, + val payloadTypeRaw: Short, + val payloadSize: Int +) : WireFrameMessage() { + constructor(payload: ByteArray) : this( + ByteData(payload), + SUPPORTED_VERSION_MAJOR, + SUPPORTED_VERSION_MINOR, + PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payload.size) + + fun isValid(): Boolean = + versionMajor == SUPPORTED_VERSION_MAJOR + && PayloadContentType.isValidHexValue(payloadTypeRaw) + && payload.size() == payloadSize + + companion object { + const val MARKER_BYTE: Short = 0xFF + const val RESERVED_BYTE_COUNT: Int = 3 + + const val SUPPORTED_VERSION_MAJOR: Short = 1 + const val SUPPORTED_VERSION_MINOR: Short = 0 + + const val HEADER_SIZE = + 1 * java.lang.Byte.BYTES + // marker + 3 * java.lang.Byte.BYTES + // single byte fields + RESERVED_BYTE_COUNT * java.lang.Byte.BYTES + // reserved bytes + 1 * java.lang.Integer.BYTES // payload length + + const val MAX_PAYLOAD_SIZE = 1024 * 1024 + } +} + + +/** + * This message type should be used by client to indicate that he has finished sending data to collector. + * + * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). + * + * ``` + * -- Sent by the HV-VES data provider, prior to closing the connection to the HV-VES destination + * Eot ::= SEQUENCE { + * magic INTEGER (0..255), – always 0xAA + * } + * ``` + * + * @since July 2018 + */ + +object EndOfTransmissionMessage : WireFrameMessage() { + const val MARKER_BYTE: Short = 0xAA +} + diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt index a1395266..89d1f32e 100644 --- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt +++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt @@ -53,10 +53,11 @@ object WireFrameCodecsTest : Spek({ describe("Wire Frame invariants") { - given("input with unsupported version") { + given("input with unsupported major version") { val input = PayloadWireFrameMessage( payload = ByteData.EMPTY, - version = 100, + versionMajor = 100, + versionMinor = 0, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = 0) @@ -65,10 +66,24 @@ object WireFrameCodecsTest : Spek({ } } + given("input with unsupported minor version") { + val input = PayloadWireFrameMessage( + payload = ByteData.EMPTY, + versionMajor = 1, + versionMinor = 6, + payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadSize = 0) + + it("should pass validation") { + assertThat(input.isValid()).isTrue() + } + } + given("input with unsupported payload type") { val input = PayloadWireFrameMessage( payload = ByteData.EMPTY, - version = 1, + versionMajor = 1, + versionMinor = 0, payloadTypeRaw = 0x69, payloadSize = 0) @@ -80,7 +95,8 @@ object WireFrameCodecsTest : Spek({ given("input with too small payload size") { val input = PayloadWireFrameMessage( payload = ByteData(byteArrayOf(1, 2, 3)), - version = 1, + versionMajor = 1, + versionMinor = 0, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = 1) @@ -92,7 +108,8 @@ object WireFrameCodecsTest : Spek({ given("input with too big payload size") { val input = PayloadWireFrameMessage( payload = ByteData(byteArrayOf(1, 2, 3)), - version = 1, + versionMajor = 1, + versionMinor = 0, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = 8) @@ -105,7 +122,8 @@ object WireFrameCodecsTest : Spek({ val payload = byteArrayOf(6, 9, 8, 6) val input = PayloadWireFrameMessage( payload = ByteData(payload), - version = 1, + versionMajor = 1, + versionMinor = 0, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = payload.size) @@ -125,7 +143,7 @@ object WireFrameCodecsTest : Spek({ val decoded = decoder.decodeFirst(encoded).getPayloadMessageOrFail() it("should decode version") { - assertThat(decoded.version).isEqualTo(frame.version) + assertThat(decoded.versionMajor).isEqualTo(frame.versionMajor) } it("should decode payload type") { @@ -221,7 +239,8 @@ object WireFrameCodecsTest : Spek({ val payload = ByteArray(MAX_PAYLOAD_SIZE) val input = PayloadWireFrameMessage( payload = ByteData(payload), - version = 1, + versionMajor = 1, + versionMinor = 0, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = payload.size) @@ -234,7 +253,8 @@ object WireFrameCodecsTest : Spek({ val payload = ByteArray(MAX_PAYLOAD_SIZE + 1) val input = PayloadWireFrameMessage( payload = ByteData(payload), - version = 1, + versionMajor = 1, + versionMinor = 0, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = payload.size) val buff = encoder.encode(input) @@ -249,7 +269,8 @@ object WireFrameCodecsTest : Spek({ val payload = ByteArray(MAX_PAYLOAD_SIZE) val input = PayloadWireFrameMessage( payload = ByteData(payload), - version = 1, + versionMajor = 1, + versionMinor = 0, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = payload.size) diff --git a/hv-collector-test-utils/src/main/kotlin/messages.kt b/hv-collector-test-utils/src/main/kotlin/messages.kt index f8453c64..c6aa89b2 100644 --- a/hv-collector-test-utils/src/main/kotlin/messages.kt +++ b/hv-collector-test-utils/src/main/kotlin/messages.kt @@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.PooledByteBufAllocator import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import java.util.UUID.randomUUID @@ -31,9 +32,11 @@ import java.util.UUID.randomUUID val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT private fun ByteBuf.writeValidWireFrameHeaders() { - writeByte(0xFF) // always 0xFF - writeByte(0x01) // version - writeByte(0x01) // content type = GPB + writeByte(0xFF) // always 0xFF + writeByte(0x01) // major version + writeByte(0x00) // minor version + writeZero(RESERVED_BYTE_COUNT) // reserved + writeByte(0x01) // content type = GPB } fun vesWireFrameMessage(domain: Domain = Domain.OTHER, diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt index c6e0556b..fec2609e 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt @@ -71,6 +71,7 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa PayloadWireFrameMessage( payload, UNSUPPORTED_VERSION, + UNSUPPORTED_VERSION, PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payload.size()) } diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt index 1b121ec4..f13a33bf 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt @@ -142,7 +142,7 @@ object MessageGeneratorImplTest : Spek({ assertThat(it.isValid()).isFalse() assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS) - assertThat(it.version).isNotEqualTo(PayloadWireFrameMessage.SUPPORTED_VERSION) + assertThat(it.versionMajor).isNotEqualTo(PayloadWireFrameMessage.SUPPORTED_VERSION_MINOR) } .verifyComplete() } -- cgit 1.2.3-korg