From 7c3b59560f015b65882a56db585b7d4bdd10d434 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Fri, 8 Jun 2018 16:29:31 +0200 Subject: Implement Kafka Sink Closes ONAP-146 Change-Id: I119a8abe70a9042f65a43909e5aa2fbed439e26f Signed-off-by: Piotr Jaszczyk Issue-ID: DCAEGEN2-601 --- .../onap/dcae/collectors/veshv/domain/ByteData.kt | 58 +++++++ .../onap/dcae/collectors/veshv/domain/WireFrame.kt | 75 +-------- .../org/onap/dcae/collectors/veshv/domain/codec.kt | 98 +++++++++++ .../collectors/veshv/domain/WireFrameCodecsTest.kt | 180 ++++++++++++++++++++ .../dcae/collectors/veshv/domain/WireFrameTest.kt | 185 --------------------- 5 files changed, 342 insertions(+), 254 deletions(-) create mode 100644 hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt create mode 100644 hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt create mode 100644 hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt delete mode 100644 hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt (limited to 'hv-collector-domain/src') diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt new file mode 100644 index 00000000..2b84e3f1 --- /dev/null +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt @@ -0,0 +1,58 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +import com.google.protobuf.MessageLite +import io.netty.buffer.ByteBuf +import java.nio.charset.Charset + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +class ByteData(private val data: ByteArray) { + + fun size() = data.size + + /** + * This will expose mutable state of the data. + * + * @return wrapped data buffer (NOT a copy) + */ + fun unsafeAsArray() = data + + fun writeTo(byteBuf: ByteBuf) { + byteBuf.writeBytes(data) + } + + fun asString(charset: Charset = Charset.defaultCharset()) = String(data, charset) + + companion object { + val EMPTY = ByteData(byteArrayOf()) + + fun readFrom(byteBuf: ByteBuf, length: Int): ByteData { + val dataArray = ByteArray(length) + byteBuf.readBytes(dataArray) + return ByteData(dataArray) + } + } +} + +fun MessageLite.toByteData(): ByteData = ByteData(toByteArray()) diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt index 8c8b4718..caf13c53 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt @@ -19,12 +19,6 @@ */ package org.onap.dcae.collectors.veshv.domain -import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.exceptions.EmptyWireFrameException -import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException -import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException - /** * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). * @@ -55,82 +49,25 @@ import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesExc * @author Piotr Jaszczyk * @since May 2018 */ -data class WireFrame(val payload: ByteBuf, +data class WireFrame(val payload: ByteData, val majorVersion: Short, val minorVersion: Short, val payloadSize: Int) { - constructor(payload: ByteBuf) : this(payload, 1, 0, payload.readableBytes()) + constructor(payload: ByteArray) : this(ByteData(payload), 1, 0, payload.size) fun isValid(): Boolean = majorVersion == SUPPORTED_MAJOR_VERSION - && payload.readableBytes() == payloadSize - - fun encode(allocator: ByteBufAllocator): ByteBuf { - val bb = allocator.buffer(HEADER_SIZE + payload.readableBytes()) - - bb.writeByte(MARKER_BYTE.toInt()) - bb.writeByte(majorVersion.toInt()) - bb.writeByte(minorVersion.toInt()) - bb.writeInt(payloadSize) - bb.writeBytes(payload) - - return bb - } + && payload.size() == payloadSize companion object { - fun decodeFirst(byteBuf: ByteBuf): WireFrame { - verifyNotEmpty(byteBuf) - byteBuf.markReaderIndex() - - verifyMarker(byteBuf) - verifyMinimumSize(byteBuf) - - val majorVersion = byteBuf.readUnsignedByte() - val minorVersion = byteBuf.readUnsignedByte() - val payloadSize = verifyPayloadSize(byteBuf) - - val payload = byteBuf.retainedSlice(byteBuf.readerIndex(), payloadSize) - byteBuf.readerIndex(byteBuf.readerIndex() + payloadSize) - - return WireFrame(payload, majorVersion, minorVersion, payloadSize) - } - - private fun verifyPayloadSize(byteBuf: ByteBuf): Int = - byteBuf.readInt().let { payloadSize -> - if (byteBuf.readableBytes() < payloadSize) { - byteBuf.resetReaderIndex() - throw MissingWireFrameBytesException("readable bytes < payload size") - } else { - payloadSize - } - } - - private fun verifyMinimumSize(byteBuf: ByteBuf) { - if (byteBuf.readableBytes() < HEADER_SIZE) { - byteBuf.resetReaderIndex() - throw MissingWireFrameBytesException("readable bytes < header size") - } - } - - private fun verifyMarker(byteBuf: ByteBuf) { - val mark = byteBuf.readUnsignedByte() - if (mark != MARKER_BYTE) { - byteBuf.resetReaderIndex() - throw InvalidWireFrameMarkerException(mark) - } - } - - private fun verifyNotEmpty(byteBuf: ByteBuf) { - if (byteBuf.readableBytes() < 1) { - throw EmptyWireFrameException() - } - } + const val SUPPORTED_MAJOR_VERSION: Short = 1 const val HEADER_SIZE = 3 * java.lang.Byte.BYTES + 1 * java.lang.Integer.BYTES const val MARKER_BYTE: Short = 0xFF - const val SUPPORTED_MAJOR_VERSION: Short = 1 + } + } diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt new file mode 100644 index 00000000..d6804c7d --- /dev/null +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt @@ -0,0 +1,98 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.domain.exceptions.EmptyWireFrameException +import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException +import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +class WireFrameEncoder(val allocator: ByteBufAllocator) { + + fun encode(frame: WireFrame): ByteBuf { + val bb = allocator.buffer(WireFrame.HEADER_SIZE + frame.payload.size()) + + bb.writeByte(WireFrame.MARKER_BYTE.toInt()) + bb.writeByte(frame.majorVersion.toInt()) + bb.writeByte(frame.minorVersion.toInt()) + bb.writeInt(frame.payloadSize) + frame.payload.writeTo(bb) + + return bb + } +} + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +class WireFrameDecoder { + + fun decodeFirst(byteBuf: ByteBuf): WireFrame { + verifyNotEmpty(byteBuf) + byteBuf.markReaderIndex() + + verifyMarker(byteBuf) + verifyMinimumSize(byteBuf) + + val majorVersion = byteBuf.readUnsignedByte() + val minorVersion = byteBuf.readUnsignedByte() + val payloadSize = verifyPayloadSize(byteBuf) + val payload = ByteData.readFrom(byteBuf, payloadSize) + + return WireFrame(payload, majorVersion, minorVersion, payloadSize) + } + + private fun verifyPayloadSize(byteBuf: ByteBuf): Int = + byteBuf.readInt().let { payloadSize -> + if (byteBuf.readableBytes() < payloadSize) { + byteBuf.resetReaderIndex() + throw MissingWireFrameBytesException("readable bytes < payload size") + } else { + payloadSize + } + } + + private fun verifyMinimumSize(byteBuf: ByteBuf) { + if (byteBuf.readableBytes() < WireFrame.HEADER_SIZE) { + byteBuf.resetReaderIndex() + throw MissingWireFrameBytesException("readable bytes < header size") + } + } + + private fun verifyMarker(byteBuf: ByteBuf) { + val mark = byteBuf.readUnsignedByte() + if (mark != WireFrame.MARKER_BYTE) { + byteBuf.resetReaderIndex() + throw InvalidWireFrameMarkerException(mark) + } + } + + private fun verifyNotEmpty(byteBuf: ByteBuf) { + if (byteBuf.readableBytes() < 1) { + throw EmptyWireFrameException() + } + } +} diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt new file mode 100644 index 00000000..ed64f3b3 --- /dev/null +++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt @@ -0,0 +1,180 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +import io.netty.buffer.Unpooled +import io.netty.buffer.UnpooledByteBufAllocator +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatExceptionOfType +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException +import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException +import java.nio.charset.Charset + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +object WireFrameCodecsTest : Spek({ + val payloadAsString = "coffeebabe" + val encoder = WireFrameEncoder(UnpooledByteBufAllocator.DEFAULT) + val decoder = WireFrameDecoder() + + fun createSampleFrame() = + WireFrame(payloadAsString.toByteArray(Charset.defaultCharset())) + + fun encodeSampleFrame() = + createSampleFrame().let { + encoder.encode(it) + } + + describe("Wire Frame invariants") { + + given("input with unsupported major version") { + val input = WireFrame( + payload = ByteData.EMPTY, + majorVersion = 100, + minorVersion = 2, + payloadSize = 0) + + it("should fail validation") { + assertThat(input.isValid()).isFalse() + } + } + + given("input with too small payload size") { + val input = WireFrame( + payload = ByteData(byteArrayOf(1, 2, 3)), + majorVersion = 1, + minorVersion = 0, + payloadSize = 1) + + it("should fail validation") { + assertThat(input.isValid()).isFalse() + } + } + + given("input with too big payload size") { + val input = WireFrame( + payload = ByteData(byteArrayOf(1, 2, 3)), + majorVersion = 1, + minorVersion = 0, + payloadSize = 8) + + it("should fail validation") { + assertThat(input.isValid()).isFalse() + } + } + + given("valid input") { + val payload = byteArrayOf(6, 9, 8, 6) + val input = WireFrame( + payload = ByteData(payload), + majorVersion = 1, + minorVersion = 0, + payloadSize = payload.size) + + it("should pass validation") { + assertThat(input.isValid()).isTrue() + } + } + + + } + + describe("Wire Frame codec") { + + describe("encode-decode methods' compatibility") { + val frame = createSampleFrame() + val encoded = encodeSampleFrame() + val decoded = decoder.decodeFirst(encoded) + + it("should decode major version") { + assertThat(decoded.majorVersion).isEqualTo(frame.majorVersion) + } + + it("should decode minor version") { + assertThat(decoded.minorVersion).isEqualTo(frame.minorVersion) + } + + it("should decode payload size") { + assertThat(decoded.payloadSize).isEqualTo(frame.payloadSize) + } + + it("should decode payload") { + assertThat(decoded.payload.asString()) + .isEqualTo(payloadAsString) + } + } + + describe("TCP framing") { + // see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11 + + it("should decode message leaving rest unread") { + val buff = Unpooled.buffer() + .writeBytes(encodeSampleFrame()) + .writeByte(0xAA) + val decoded = decoder.decodeFirst(buff) + + assertThat(decoded.isValid()).describedAs("should be valid").isTrue() + assertThat(buff.readableBytes()).isEqualTo(1) + } + + it("should throw exception when not even header fits") { + val buff = Unpooled.buffer() + .writeByte(0xFF) + + assertThatExceptionOfType(MissingWireFrameBytesException::class.java) + .isThrownBy { decoder.decodeFirst(buff) } + } + + it("should throw exception when first byte is not 0xFF but length looks ok") { + val buff = Unpooled.buffer() + .writeByte(0xAA) + .writeBytes("some garbage".toByteArray()) + + assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java) + .isThrownBy { decoder.decodeFirst(buff) } + } + + it("should throw exception when first byte is not 0xFF and length is to short") { + val buff = Unpooled.buffer() + .writeByte(0xAA) + + assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java) + .isThrownBy { decoder.decodeFirst(buff) } + } + + it("should throw exception when payload doesn't fit") { + val buff = Unpooled.buffer() + .writeBytes(encodeSampleFrame()) + buff.writerIndex(buff.writerIndex() - 2) + + assertThatExceptionOfType(MissingWireFrameBytesException::class.java) + .isThrownBy { decoder.decodeFirst(buff) } + } + + } + } + +}) \ No newline at end of file diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt deleted file mode 100644 index 00113267..00000000 --- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt +++ /dev/null @@ -1,185 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.domain - -import io.netty.buffer.Unpooled -import io.netty.buffer.UnpooledByteBufAllocator -import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.assertThatExceptionOfType -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException -import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException -import java.nio.charset.Charset - -/** - * @author Piotr Jaszczyk - * @since June 2018 - */ -object WireFrameTest : Spek({ - val payloadAsString = "coffeebabe" - - fun createSampleFrame() = - WireFrame(Unpooled.wrappedBuffer(payloadAsString.toByteArray(Charset.defaultCharset()))) - - fun encodeSampleFrame() = - createSampleFrame().let { - Unpooled.buffer() - .writeBytes(it.encode(UnpooledByteBufAllocator.DEFAULT)) - - } - - describe("Wire Frame invariants") { - - given("input with unsupported major version") { - val input = WireFrame( - payload = Unpooled.EMPTY_BUFFER, - majorVersion = 100, - minorVersion = 2, - payloadSize = 0) - - it("should fail validation") { - assertThat(input.isValid()).isFalse() - } - } - - given("input with too small payload size") { - val input = WireFrame( - payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)), - majorVersion = 1, - minorVersion = 0, - payloadSize = 1) - - it("should fail validation") { - assertThat(input.isValid()).isFalse() - } - } - - given("input with too big payload size") { - val input = WireFrame( - payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)), - majorVersion = 1, - minorVersion = 0, - payloadSize = 8) - - it("should fail validation") { - assertThat(input.isValid()).isFalse() - } - } - - given("valid input") { - val payload = byteArrayOf(6, 9, 8, 6) - val input = WireFrame( - payload = Unpooled.wrappedBuffer(payload), - majorVersion = 1, - minorVersion = 0, - payloadSize = payload.size) - - it("should pass validation") { - assertThat(input.isValid()).isTrue() - } - } - - - } - - describe("Wire Frame codec") { - - describe("encode-decode methods' compatibility") { - val frame = createSampleFrame() - val encoded = encodeSampleFrame() - val decoded = WireFrame.decodeFirst(encoded) - - it("should decode major version") { - assertThat(decoded.majorVersion).isEqualTo(frame.majorVersion) - } - - it("should decode minor version") { - assertThat(decoded.minorVersion).isEqualTo(frame.minorVersion) - } - - it("should decode payload size") { - assertThat(decoded.payloadSize).isEqualTo(frame.payloadSize) - } - - it("should decode payload") { - assertThat(decoded.payload.toString(Charset.defaultCharset())) - .isEqualTo(payloadAsString) - } - - it("should retain decoded payload") { - encoded.release() - assertThat(decoded.payload.refCnt()).isEqualTo(1) - } - } - - describe("TCP framing") { - // see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11 - - it("should decode message leaving rest unread") { - val buff = Unpooled.buffer() - .writeBytes(encodeSampleFrame()) - .writeByte(0xAA) - val decoded = WireFrame.decodeFirst(buff) - - assertThat(decoded.isValid()).describedAs("should be valid").isTrue() - assertThat(buff.readableBytes()).isEqualTo(1) - } - - it("should throw exception when not even header fits") { - val buff = Unpooled.buffer() - .writeByte(0xFF) - - assertThatExceptionOfType(MissingWireFrameBytesException::class.java) - .isThrownBy { WireFrame.decodeFirst(buff) } - } - - it("should throw exception when first byte is not 0xFF but length looks ok") { - val buff = Unpooled.buffer() - .writeByte(0xAA) - .writeBytes("some garbage".toByteArray()) - - assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java) - .isThrownBy { WireFrame.decodeFirst(buff) } - } - - it("should throw exception when first byte is not 0xFF and length is to short") { - val buff = Unpooled.buffer() - .writeByte(0xAA) - - assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java) - .isThrownBy { WireFrame.decodeFirst(buff) } - } - - it("should throw exception when payload doesn't fit") { - val buff = Unpooled.buffer() - .writeBytes(encodeSampleFrame()) - buff.writerIndex(buff.writerIndex() - 2) - - assertThatExceptionOfType(MissingWireFrameBytesException::class.java) - .isThrownBy { WireFrame.decodeFirst(buff) } - } - - } - } - -}) \ No newline at end of file -- cgit 1.2.3-korg