summaryrefslogtreecommitdiffstats
path: root/hv-collector-domain
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-08 16:29:31 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 07:06:19 +0200
commit7c3b59560f015b65882a56db585b7d4bdd10d434 (patch)
tree4c15d3657e373d3a681fdd2ab865623aeecc82e7 /hv-collector-domain
parent07bbbf71cd65b29f446a1b475add87f20365db83 (diff)
Implement Kafka Sink
Closes ONAP-146 Change-Id: I119a8abe70a9042f65a43909e5aa2fbed439e26f Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-domain')
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt58
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt75
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt98
-rw-r--r--hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt (renamed from hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt)37
4 files changed, 178 insertions, 90 deletions
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt
new file mode 100644
index 00000000..2b84e3f1
--- /dev/null
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt
@@ -0,0 +1,58 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+import com.google.protobuf.MessageLite
+import io.netty.buffer.ByteBuf
+import java.nio.charset.Charset
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class ByteData(private val data: ByteArray) {
+
+ fun size() = data.size
+
+ /**
+ * This will expose mutable state of the data.
+ *
+ * @return wrapped data buffer (NOT a copy)
+ */
+ fun unsafeAsArray() = data
+
+ fun writeTo(byteBuf: ByteBuf) {
+ byteBuf.writeBytes(data)
+ }
+
+ fun asString(charset: Charset = Charset.defaultCharset()) = String(data, charset)
+
+ companion object {
+ val EMPTY = ByteData(byteArrayOf())
+
+ fun readFrom(byteBuf: ByteBuf, length: Int): ByteData {
+ val dataArray = ByteArray(length)
+ byteBuf.readBytes(dataArray)
+ return ByteData(dataArray)
+ }
+ }
+}
+
+fun MessageLite.toByteData(): ByteData = ByteData(toByteArray())
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
index 8c8b4718..caf13c53 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
@@ -19,12 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.domain
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.exceptions.EmptyWireFrameException
-import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
-import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
-
/**
* Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
*
@@ -55,82 +49,25 @@ import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesExc
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class WireFrame(val payload: ByteBuf,
+data class WireFrame(val payload: ByteData,
val majorVersion: Short,
val minorVersion: Short,
val payloadSize: Int) {
- constructor(payload: ByteBuf) : this(payload, 1, 0, payload.readableBytes())
+ constructor(payload: ByteArray) : this(ByteData(payload), 1, 0, payload.size)
fun isValid(): Boolean =
majorVersion == SUPPORTED_MAJOR_VERSION
- && payload.readableBytes() == payloadSize
-
- fun encode(allocator: ByteBufAllocator): ByteBuf {
- val bb = allocator.buffer(HEADER_SIZE + payload.readableBytes())
-
- bb.writeByte(MARKER_BYTE.toInt())
- bb.writeByte(majorVersion.toInt())
- bb.writeByte(minorVersion.toInt())
- bb.writeInt(payloadSize)
- bb.writeBytes(payload)
-
- return bb
- }
+ && payload.size() == payloadSize
companion object {
- fun decodeFirst(byteBuf: ByteBuf): WireFrame {
- verifyNotEmpty(byteBuf)
- byteBuf.markReaderIndex()
-
- verifyMarker(byteBuf)
- verifyMinimumSize(byteBuf)
-
- val majorVersion = byteBuf.readUnsignedByte()
- val minorVersion = byteBuf.readUnsignedByte()
- val payloadSize = verifyPayloadSize(byteBuf)
-
- val payload = byteBuf.retainedSlice(byteBuf.readerIndex(), payloadSize)
- byteBuf.readerIndex(byteBuf.readerIndex() + payloadSize)
-
- return WireFrame(payload, majorVersion, minorVersion, payloadSize)
- }
-
- private fun verifyPayloadSize(byteBuf: ByteBuf): Int =
- byteBuf.readInt().let { payloadSize ->
- if (byteBuf.readableBytes() < payloadSize) {
- byteBuf.resetReaderIndex()
- throw MissingWireFrameBytesException("readable bytes < payload size")
- } else {
- payloadSize
- }
- }
-
- private fun verifyMinimumSize(byteBuf: ByteBuf) {
- if (byteBuf.readableBytes() < HEADER_SIZE) {
- byteBuf.resetReaderIndex()
- throw MissingWireFrameBytesException("readable bytes < header size")
- }
- }
-
- private fun verifyMarker(byteBuf: ByteBuf) {
- val mark = byteBuf.readUnsignedByte()
- if (mark != MARKER_BYTE) {
- byteBuf.resetReaderIndex()
- throw InvalidWireFrameMarkerException(mark)
- }
- }
-
- private fun verifyNotEmpty(byteBuf: ByteBuf) {
- if (byteBuf.readableBytes() < 1) {
- throw EmptyWireFrameException()
- }
- }
+ const val SUPPORTED_MAJOR_VERSION: Short = 1
const val HEADER_SIZE =
3 * java.lang.Byte.BYTES +
1 * java.lang.Integer.BYTES
const val MARKER_BYTE: Short = 0xFF
- const val SUPPORTED_MAJOR_VERSION: Short = 1
+
}
+
}
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
new file mode 100644
index 00000000..d6804c7d
--- /dev/null
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
@@ -0,0 +1,98 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.exceptions.EmptyWireFrameException
+import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
+import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class WireFrameEncoder(val allocator: ByteBufAllocator) {
+
+ fun encode(frame: WireFrame): ByteBuf {
+ val bb = allocator.buffer(WireFrame.HEADER_SIZE + frame.payload.size())
+
+ bb.writeByte(WireFrame.MARKER_BYTE.toInt())
+ bb.writeByte(frame.majorVersion.toInt())
+ bb.writeByte(frame.minorVersion.toInt())
+ bb.writeInt(frame.payloadSize)
+ frame.payload.writeTo(bb)
+
+ return bb
+ }
+}
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class WireFrameDecoder {
+
+ fun decodeFirst(byteBuf: ByteBuf): WireFrame {
+ verifyNotEmpty(byteBuf)
+ byteBuf.markReaderIndex()
+
+ verifyMarker(byteBuf)
+ verifyMinimumSize(byteBuf)
+
+ val majorVersion = byteBuf.readUnsignedByte()
+ val minorVersion = byteBuf.readUnsignedByte()
+ val payloadSize = verifyPayloadSize(byteBuf)
+ val payload = ByteData.readFrom(byteBuf, payloadSize)
+
+ return WireFrame(payload, majorVersion, minorVersion, payloadSize)
+ }
+
+ private fun verifyPayloadSize(byteBuf: ByteBuf): Int =
+ byteBuf.readInt().let { payloadSize ->
+ if (byteBuf.readableBytes() < payloadSize) {
+ byteBuf.resetReaderIndex()
+ throw MissingWireFrameBytesException("readable bytes < payload size")
+ } else {
+ payloadSize
+ }
+ }
+
+ private fun verifyMinimumSize(byteBuf: ByteBuf) {
+ if (byteBuf.readableBytes() < WireFrame.HEADER_SIZE) {
+ byteBuf.resetReaderIndex()
+ throw MissingWireFrameBytesException("readable bytes < header size")
+ }
+ }
+
+ private fun verifyMarker(byteBuf: ByteBuf) {
+ val mark = byteBuf.readUnsignedByte()
+ if (mark != WireFrame.MARKER_BYTE) {
+ byteBuf.resetReaderIndex()
+ throw InvalidWireFrameMarkerException(mark)
+ }
+ }
+
+ private fun verifyNotEmpty(byteBuf: ByteBuf) {
+ if (byteBuf.readableBytes() < 1) {
+ throw EmptyWireFrameException()
+ }
+ }
+}
diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
index 00113267..ed64f3b3 100644
--- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt
+++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
@@ -35,24 +35,24 @@ import java.nio.charset.Charset
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-object WireFrameTest : Spek({
+object WireFrameCodecsTest : Spek({
val payloadAsString = "coffeebabe"
+ val encoder = WireFrameEncoder(UnpooledByteBufAllocator.DEFAULT)
+ val decoder = WireFrameDecoder()
fun createSampleFrame() =
- WireFrame(Unpooled.wrappedBuffer(payloadAsString.toByteArray(Charset.defaultCharset())))
+ WireFrame(payloadAsString.toByteArray(Charset.defaultCharset()))
fun encodeSampleFrame() =
createSampleFrame().let {
- Unpooled.buffer()
- .writeBytes(it.encode(UnpooledByteBufAllocator.DEFAULT))
-
+ encoder.encode(it)
}
describe("Wire Frame invariants") {
given("input with unsupported major version") {
val input = WireFrame(
- payload = Unpooled.EMPTY_BUFFER,
+ payload = ByteData.EMPTY,
majorVersion = 100,
minorVersion = 2,
payloadSize = 0)
@@ -64,7 +64,7 @@ object WireFrameTest : Spek({
given("input with too small payload size") {
val input = WireFrame(
- payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)),
+ payload = ByteData(byteArrayOf(1, 2, 3)),
majorVersion = 1,
minorVersion = 0,
payloadSize = 1)
@@ -76,7 +76,7 @@ object WireFrameTest : Spek({
given("input with too big payload size") {
val input = WireFrame(
- payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)),
+ payload = ByteData(byteArrayOf(1, 2, 3)),
majorVersion = 1,
minorVersion = 0,
payloadSize = 8)
@@ -89,7 +89,7 @@ object WireFrameTest : Spek({
given("valid input") {
val payload = byteArrayOf(6, 9, 8, 6)
val input = WireFrame(
- payload = Unpooled.wrappedBuffer(payload),
+ payload = ByteData(payload),
majorVersion = 1,
minorVersion = 0,
payloadSize = payload.size)
@@ -107,7 +107,7 @@ object WireFrameTest : Spek({
describe("encode-decode methods' compatibility") {
val frame = createSampleFrame()
val encoded = encodeSampleFrame()
- val decoded = WireFrame.decodeFirst(encoded)
+ val decoded = decoder.decodeFirst(encoded)
it("should decode major version") {
assertThat(decoded.majorVersion).isEqualTo(frame.majorVersion)
@@ -122,14 +122,9 @@ object WireFrameTest : Spek({
}
it("should decode payload") {
- assertThat(decoded.payload.toString(Charset.defaultCharset()))
+ assertThat(decoded.payload.asString())
.isEqualTo(payloadAsString)
}
-
- it("should retain decoded payload") {
- encoded.release()
- assertThat(decoded.payload.refCnt()).isEqualTo(1)
- }
}
describe("TCP framing") {
@@ -139,7 +134,7 @@ object WireFrameTest : Spek({
val buff = Unpooled.buffer()
.writeBytes(encodeSampleFrame())
.writeByte(0xAA)
- val decoded = WireFrame.decodeFirst(buff)
+ val decoded = decoder.decodeFirst(buff)
assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
assertThat(buff.readableBytes()).isEqualTo(1)
@@ -150,7 +145,7 @@ object WireFrameTest : Spek({
.writeByte(0xFF)
assertThatExceptionOfType(MissingWireFrameBytesException::class.java)
- .isThrownBy { WireFrame.decodeFirst(buff) }
+ .isThrownBy { decoder.decodeFirst(buff) }
}
it("should throw exception when first byte is not 0xFF but length looks ok") {
@@ -159,7 +154,7 @@ object WireFrameTest : Spek({
.writeBytes("some garbage".toByteArray())
assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java)
- .isThrownBy { WireFrame.decodeFirst(buff) }
+ .isThrownBy { decoder.decodeFirst(buff) }
}
it("should throw exception when first byte is not 0xFF and length is to short") {
@@ -167,7 +162,7 @@ object WireFrameTest : Spek({
.writeByte(0xAA)
assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java)
- .isThrownBy { WireFrame.decodeFirst(buff) }
+ .isThrownBy { decoder.decodeFirst(buff) }
}
it("should throw exception when payload doesn't fit") {
@@ -176,7 +171,7 @@ object WireFrameTest : Spek({
buff.writerIndex(buff.writerIndex() - 2)
assertThatExceptionOfType(MissingWireFrameBytesException::class.java)
- .isThrownBy { WireFrame.decodeFirst(buff) }
+ .isThrownBy { decoder.decodeFirst(buff) }
}
}