aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/test/kotlin
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-07 11:52:16 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-01 13:06:43 +0200
commit07bbbf71cd65b29f446a1b475add87f20365db83 (patch)
treee64fcf12c21e46358043744476d68765634d7f6f /hv-collector-core/src/test/kotlin
parent767d0464a19e0949d2919e6df15c9653dec50503 (diff)
Fix TCP stream framing issue
Because of the nature of TCP protocol we receive consecutive IO buffer snapshots - not separate messages. That means that we need to join incomming buffers and then split it into separate WireFrames. Closes ONAP-312 Change-Id: I84ba0ec58a41ff9026f2fca24d2b15f3adcf0a19 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core/src/test/kotlin')
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt7
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt104
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt233
3 files changed, 238 insertions, 106 deletions
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
index 8d9e4962..263ad441 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
@@ -20,8 +20,10 @@
package org.onap.dcae.collectors.veshv.impl
import com.google.protobuf.ByteString
+import com.google.protobuf.InvalidProtocolBufferException
import io.netty.buffer.Unpooled.wrappedBuffer
import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
@@ -60,8 +62,9 @@ internal object VesDecoderTest : Spek({
on("invalid ves hv message bytes") {
val rawMessageBytes = wrappedBuffer("ala ma kota".toByteArray(Charset.defaultCharset()))
- it("should return empty result") {
- assertThat(cut.decode(rawMessageBytes)).isNull()
+ it("should throw error") {
+ assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
+ .isThrownBy { cut.decode(rawMessageBytes) }
}
}
}
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt
deleted file mode 100644
index 81706ce4..00000000
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt
+++ /dev/null
@@ -1,104 +0,0 @@
-package org.onap.dcae.collectors.veshv.impl
-
-import io.netty.buffer.Unpooled
-import io.netty.buffer.UnpooledByteBufAllocator
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
- * @since May 2018
- */
-internal object WireDecoderTest : Spek({
- describe("decoding wire protocol") {
- val cut = WireDecoder()
-
- fun decode(frame: WireFrame) =
- cut.decode(
- frame.encode(UnpooledByteBufAllocator.DEFAULT))
-
- given("empty input") {
- val input = Unpooled.EMPTY_BUFFER
-
- it("should yield empty result") {
- assertThat(cut.decode(input)).isNull()
- }
- }
-
- given("input without 0xFF first byte") {
- val input = WireFrame(
- payload = Unpooled.EMPTY_BUFFER,
- mark = 0x10,
- majorVersion = 1,
- minorVersion = 2,
- payloadSize = 0)
-
- it("should yield empty result") {
- assertThat(decode(input)).isNull()
- }
- }
-
- given("input with unsupported major version") {
- val input = WireFrame(
- payload = Unpooled.EMPTY_BUFFER,
- mark = 0xFF,
- majorVersion = 100,
- minorVersion = 2,
- payloadSize = 0)
-
- it("should yield empty result") {
- assertThat(decode(input)).isNull()
- }
- }
-
- given("input with too small payload size") {
- val input = WireFrame(
- payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)),
- mark = 0xFF,
- majorVersion = 1,
- minorVersion = 0,
- payloadSize = 1)
-
- it("should yield empty result") {
- assertThat(decode(input)).isNull()
- }
- }
-
- given("input with too big payload size") {
- val input = WireFrame(
- payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)),
- mark = 0xFF,
- majorVersion = 1,
- minorVersion = 0,
- payloadSize = 8)
-
- it("should yield empty result") {
- assertThat(decode(input)).isNull()
- }
- }
-
- given("valid input") {
- val payload = byteArrayOf(6, 9, 8, 6)
- val input = WireFrame(
- payload = Unpooled.wrappedBuffer(payload),
- mark = 0xFF,
- majorVersion = 1,
- minorVersion = 0,
- payloadSize = payload.size)
-
-
- it("should yield Google Protocol Buffers payload") {
- val result = decode(input)!!
-
- val actualPayload = ByteArray(result.readableBytes())
- result.readBytes(actualPayload)
-
- assertThat(actualPayload).containsExactly(*payload)
- }
- }
- }
-})
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt
new file mode 100644
index 00000000..0a10aa1f
--- /dev/null
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt
@@ -0,0 +1,233 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.wire
+
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.Unpooled
+import io.netty.buffer.UnpooledByteBufAllocator
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
+import reactor.test.test
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
+ * @since May 2018
+ */
+internal object WireDecoderTest : Spek({
+ val alloc = UnpooledByteBufAllocator.DEFAULT
+ val samplePayload = "konstantynopolitanczykowianeczka".toByteArray()
+ val anotherPayload = "ala ma kota a kot ma ale".toByteArray()
+
+ fun WireDecoder.decode(frame: WireFrame) = decode(frame.encode(alloc))
+
+ fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
+ for (bb in byteBuffers) {
+ assertThat(bb.refCnt())
+ .describedAs("should be released: $bb ref count")
+ .isEqualTo(0)
+ }
+ }
+
+ fun verifyMemoryNotReleased(vararg byteBuffers: ByteBuf) {
+ for (bb in byteBuffers) {
+ assertThat(bb.refCnt())
+ .describedAs("should not be released: $bb ref count")
+ .isEqualTo(1)
+ }
+ }
+
+ describe("decoding wire protocol") {
+ given("empty input") {
+ val input = Unpooled.EMPTY_BUFFER
+
+ it("should yield empty result") {
+ WireDecoder().decode(input).test().verifyComplete()
+ }
+ }
+
+ given("input with no readable bytes") {
+ val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1)
+
+ it("should yield empty result") {
+ WireDecoder().decode(input).test().verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input)
+ }
+ }
+
+ given("invalid input (not starting with marker)") {
+ val input = Unpooled.wrappedBuffer(samplePayload)
+
+ it("should yield error") {
+ WireDecoder().decode(input).test()
+ .verifyError(InvalidWireFrameMarkerException::class.java)
+ }
+
+ it("should leave memory unreleased") {
+ verifyMemoryNotReleased(input)
+ }
+ }
+
+ given("valid input") {
+ val input = WireFrame(Unpooled.wrappedBuffer(samplePayload))
+
+ it("should yield decoded input frame") {
+ WireDecoder().decode(input).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ }
+ }
+
+ given("valid input with part of next frame") {
+ val input = Unpooled.buffer()
+ .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc))
+ .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc).slice(0, 3))
+
+ it("should yield decoded input frame") {
+ WireDecoder().decode(input).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ }
+
+ it("should leave memory unreleased") {
+ verifyMemoryNotReleased(input)
+ }
+ }
+
+ given("valid input with garbage after it") {
+ val input = Unpooled.buffer()
+ .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc))
+ .writeBytes(Unpooled.wrappedBuffer(samplePayload))
+
+ it("should yield decoded input frame and error") {
+ WireDecoder().decode(input).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyError(InvalidWireFrameMarkerException::class.java)
+ }
+
+ it("should leave memory unreleased") {
+ verifyMemoryNotReleased(input)
+ }
+ }
+
+ given("two inputs containing two separate messages") {
+ val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
+ val input2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc)
+
+ it("should yield decoded input frames") {
+ val cut = WireDecoder()
+ cut.decode(input1).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ cut.decode(input2).test()
+ .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input1, input2)
+ }
+ }
+
+ given("1st input containing 1st frame and 2nd input containing garbage") {
+ val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
+ val input2 = Unpooled.wrappedBuffer(anotherPayload)
+
+ it("should yield decoded input frames") {
+ val cut = WireDecoder()
+ cut.decode(input1)
+ .doOnNext {
+ // releasing retained payload
+ it.payload.release()
+ }
+ .test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ cut.decode(input2).test()
+ .verifyError(InvalidWireFrameMarkerException::class.java)
+ }
+
+ it("should release memory for 1st input") {
+ verifyMemoryReleased(input1)
+ }
+
+ it("should leave memory unreleased for 2nd input") {
+ verifyMemoryNotReleased(input2)
+ }
+ }
+
+
+ given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
+ val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
+ val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc)
+
+ val input1 = Unpooled.buffer()
+ .writeBytes(frame1)
+ .writeBytes(frame2, 3)
+ val input2 = Unpooled.buffer().writeBytes(frame2)
+
+ it("should yield decoded input frames") {
+ val cut = WireDecoder()
+ cut.decode(input1).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ cut.decode(input2).test()
+ .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input1, input2)
+ }
+ }
+
+ given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
+ val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
+ val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc)
+
+ val input1 = Unpooled.buffer()
+ .writeBytes(frame1, 5)
+ val input2 = Unpooled.buffer()
+ .writeBytes(frame1)
+ .writeBytes(frame2)
+
+ it("should yield decoded input frames") {
+ val cut = WireDecoder()
+ cut.decode(input1).test()
+ .verifyComplete()
+ cut.decode(input2).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input1, input2)
+ }
+ }
+ }
+})