aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-domain/src/test
diff options
context:
space:
mode:
authorfkrzywka <filip.krzywka@nokia.com>2018-07-03 10:14:38 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 13:41:04 +0200
commitf8a9a10a75bf139203fe9ea48a01708c7bda0781 (patch)
tree634321d472c69d67f817cd2e689dc25c10af7c1a /hv-collector-domain/src/test
parent1383775f3df00bd08a7ac14fe1278858bdef6487 (diff)
Enhance wire protocol
Handle new wire frame message type which should allow clients to indicate that all data has been sent to collector Change xNF Simulator to send end-of-transmission message after sending all messages Close ves-hv-collector stream after encountering EOT message Remove duplicated file in project Closes ONAP-391 Change-Id: Idb6afc41d4bb0220a29df10c2aecfd76acd3ad16 Signed-off-by: fkrzywka <filip.krzywka@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-domain/src/test')
-rw-r--r--hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt90
1 files changed, 63 insertions, 27 deletions
diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
index 4d6f0716..a5242e0f 100644
--- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
+++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
@@ -20,19 +20,18 @@
package org.onap.dcae.collectors.veshv.domain
import arrow.core.Either
-import arrow.core.identity
import io.netty.buffer.Unpooled
import io.netty.buffer.UnpooledByteBufAllocator
import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.fail
import org.assertj.core.api.ObjectAssert
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import java.nio.charset.Charset
import kotlin.test.assertTrue
+import kotlin.test.fail
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -44,7 +43,7 @@ object WireFrameCodecsTest : Spek({
val decoder = WireFrameDecoder()
fun createSampleFrame() =
- WireFrame(payloadAsString.toByteArray(Charset.defaultCharset()))
+ PayloadWireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset()))
fun encodeSampleFrame() =
createSampleFrame().let {
@@ -54,7 +53,7 @@ object WireFrameCodecsTest : Spek({
describe("Wire Frame invariants") {
given("input with unsupported version") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData.EMPTY,
version = 100,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -66,7 +65,7 @@ object WireFrameCodecsTest : Spek({
}
given("input with unsupported payload type") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData.EMPTY,
version = 1,
payloadTypeRaw = 0x69,
@@ -78,7 +77,7 @@ object WireFrameCodecsTest : Spek({
}
given("input with too small payload size") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(byteArrayOf(1, 2, 3)),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -90,7 +89,7 @@ object WireFrameCodecsTest : Spek({
}
given("input with too big payload size") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(byteArrayOf(1, 2, 3)),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -103,7 +102,7 @@ object WireFrameCodecsTest : Spek({
given("valid input") {
val payload = byteArrayOf(6, 9, 8, 6)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -122,7 +121,7 @@ object WireFrameCodecsTest : Spek({
describe("encode-decode methods' compatibility") {
val frame = createSampleFrame()
val encoded = encodeSampleFrame()
- val decoded = decoder.decodeFirst(encoded).getOrFail()
+ val decoded = decoder.decodeFirst(encoded).getPayloadMessageOrFail()
it("should decode version") {
assertThat(decoded.version).isEqualTo(frame.version)
@@ -142,40 +141,52 @@ object WireFrameCodecsTest : Spek({
}
}
+
describe("TCP framing") {
// see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11
- it("should decode message leaving rest unread") {
+ it("should return error when buffer is empty") {
+ val buff = Unpooled.buffer()
+
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(EmptyWireFrame::class.java) }
+ }
+
+ it("should return end-of-transmission message when given end-of-transmission marker byte") {
val buff = Unpooled.buffer()
- .writeBytes(encodeSampleFrame())
.writeByte(0xAA)
- val decoded = decoder.decodeFirst(buff).getOrFail()
- assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
- assertThat(buff.readableBytes()).isEqualTo(1)
+ assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
}
- it("should return error when not even header fits") {
+ it("should return error when given any single byte other than end-of-transmission marker byte") {
val buff = Unpooled.buffer()
- .writeByte(0xFF)
+ .writeByte(0xEE)
decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
+ }
+ it("should return error when payload message header does not fit") {
+ val buff = Unpooled.buffer()
+ .writeByte(0xFF)
+ .writeBytes("MOMOM".toByteArray())
+
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
}
- it("should return error when first byte is not 0xFF but length looks ok") {
+ it("should return error when length looks ok but first byte is not 0xFF or 0xAA") {
val buff = Unpooled.buffer()
- .writeByte(0xAA)
+ .writeByte(0x69)
.writeBytes("some garbage".toByteArray())
decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) }
}
- it("should return error when first byte is not 0xFF and length is to short") {
+ it("should return end-of-transmission message when length looks ok and first byte is 0xAA") {
val buff = Unpooled.buffer()
.writeByte(0xAA)
+ .writeBytes("some garbage".toByteArray())
- decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
+ assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
}
it("should return error when payload doesn't fit") {
@@ -186,14 +197,23 @@ object WireFrameCodecsTest : Spek({
decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFramePayloadBytes::class.java) }
}
+ it("should decode payload message leaving rest unread") {
+ val buff = Unpooled.buffer()
+ .writeBytes(encodeSampleFrame())
+ .writeByte(0xAA)
+ val decoded = decoder.decodeFirst(buff).getPayloadMessageOrFail()
+
+ assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
+ assertThat(buff.readableBytes()).isEqualTo(1)
+ }
}
- describe("payload size limit"){
+ describe("payload size limit") {
it("should decode successfully when payload size is equal 1 MiB") {
val payload = ByteArray(MAX_PAYLOAD_SIZE)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -206,7 +226,7 @@ object WireFrameCodecsTest : Spek({
it("should return error when payload exceeds 1 MiB") {
val payload = ByteArray(MAX_PAYLOAD_SIZE + 1)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -220,7 +240,7 @@ object WireFrameCodecsTest : Spek({
it("should validate only first message") {
val payload = ByteArray(MAX_PAYLOAD_SIZE)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -237,5 +257,21 @@ private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>)
fold({ assertj(assertThat(it)) }, { fail("Error expected") })
}
-private fun Either<WireFrameDecodingError, WireFrame>.getOrFail(): WireFrame =
- fold({ fail(it.message) }, ::identity) as WireFrame
+private fun Either<WireFrameDecodingError, WireFrameMessage>.getPayloadMessageOrFail(): PayloadWireFrameMessage =
+ fold({ fail(it.message) }, { it.castToPayloadMsgOrFail() })
+
+private fun WireFrameMessage.castToPayloadMsgOrFail(): PayloadWireFrameMessage =
+ this as? PayloadWireFrameMessage
+ ?: fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${this.javaClass}")
+
+
+private fun assertIsEndOfTransmissionMessage(decoded: Either<WireFrameDecodingError, WireFrameMessage>) {
+ decoded.getEndOfTransmissionMessageOrFail()
+}
+
+private fun Either<WireFrameDecodingError, WireFrameMessage>.getEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
+ fold({ fail(it.message) }, { it.castToEndOfTransmissionMessageOrFail() })
+
+private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
+ this as? EndOfTransmissionMessage
+ ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}")