aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/test/kotlin
diff options
context:
space:
mode:
authorfkrzywka <filip.krzywka@nokia.com>2018-07-03 10:14:38 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 13:41:04 +0200
commitf8a9a10a75bf139203fe9ea48a01708c7bda0781 (patch)
tree634321d472c69d67f817cd2e689dc25c10af7c1a /hv-collector-core/src/test/kotlin
parent1383775f3df00bd08a7ac14fe1278858bdef6487 (diff)
Enhance wire protocol
Handle new wire frame message type which should allow clients to indicate that all data has been sent to collector Change xNF Simulator to send end-of-transmission message after sending all messages Close ves-hv-collector stream after encountering EOT message Remove duplicated file in project Closes ONAP-391 Change-Id: Idb6afc41d4bb0220a29df10c2aecfd76acd3ad16 Signed-off-by: fkrzywka <filip.krzywka@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core/src/test/kotlin')
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt121
1 files changed, 97 insertions, 24 deletions
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
index 33f71684..a9364ed3 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
@@ -27,10 +27,13 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import reactor.test.test
+import kotlin.test.fail
/**
* @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
@@ -43,7 +46,7 @@ internal object WireChunkDecoderTest : Spek({
val encoder = WireFrameEncoder(alloc)
- fun WireChunkDecoder.decode(frame: WireFrame) = decode(encoder.encode(frame))
+ fun WireChunkDecoder.decode(frame: PayloadWireFrameMessage) = decode(encoder.encode(frame))
fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc)
@@ -98,23 +101,23 @@ internal object WireChunkDecoderTest : Spek({
}
given("valid input") {
- val input = WireFrame(samplePayload)
+ val input = PayloadWireFrameMessage(samplePayload)
it("should yield decoded input frame") {
createInstance().decode(input).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
}
}
given("valid input with part of next frame") {
val input = Unpooled.buffer()
- .writeBytes(encoder.encode(WireFrame(samplePayload)))
- .writeBytes(encoder.encode(WireFrame(samplePayload)).slice(0, 3))
+ .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)))
+ .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)).slice(0, 3))
it("should yield decoded input frame") {
createInstance().decode(input).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
}
@@ -123,14 +126,30 @@ internal object WireChunkDecoderTest : Spek({
}
}
+ given("end-of-transmission marker byte with garbage after it") {
+ val input = Unpooled.buffer()
+ .writeByte(0xAA)
+ .writeBytes(Unpooled.wrappedBuffer(samplePayload))
+
+ it("should yield decoded end-of-transmission frame and error") {
+ createInstance().decode(input).test()
+ .expectNextMatches { it is EndOfTransmissionMessage }
+ .verifyError(WireFrameException::class.java)
+ }
+
+ it("should leave memory unreleased") {
+ verifyMemoryNotReleased(input)
+ }
+ }
+
given("valid input with garbage after it") {
val input = Unpooled.buffer()
- .writeBytes(encoder.encode(WireFrame(samplePayload)))
+ .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)))
.writeBytes(Unpooled.wrappedBuffer(samplePayload))
it("should yield decoded input frame and error") {
createInstance().decode(input).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyError(WireFrameException::class.java)
}
@@ -140,16 +159,16 @@ internal object WireChunkDecoderTest : Spek({
}
given("two inputs containing two separate messages") {
- val input1 = encoder.encode(WireFrame(samplePayload))
- val input2 = encoder.encode(WireFrame(anotherPayload))
+ val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val input2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
it("should yield decoded input frames") {
val cut = createInstance()
cut.decode(input1).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
- .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
.verifyComplete()
}
@@ -158,15 +177,57 @@ internal object WireChunkDecoderTest : Spek({
}
}
+ given("two payload messages followed by end-of-transmission marker byte") {
+ val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+
+ val input = Unpooled.buffer()
+ .writeBytes(frame1)
+ .writeBytes(frame2)
+ .writeByte(0xAA)
+
+ it("should yield decoded input frames") {
+ val cut = createInstance()
+ cut.decode(input).test()
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+ .expectNextMatches { it is EndOfTransmissionMessage }
+ .verifyComplete()
+ }
+ }
+
+ given("two payload messages separated by end-of-transmission marker byte") {
+ val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+
+ val input = Unpooled.buffer()
+ .writeBytes(frame1)
+ .writeByte(0xAA)
+ .writeBytes(frame2)
+
+ it("should yield decoded input frames") {
+ val cut = createInstance()
+ cut.decode(input).test()
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { it is EndOfTransmissionMessage }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+ .verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input)
+ }
+ }
+
given("1st input containing 1st frame and 2nd input containing garbage") {
- val input1 = encoder.encode(WireFrame(samplePayload))
+ val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
val input2 = Unpooled.wrappedBuffer(anotherPayload)
it("should yield decoded input frames") {
val cut = createInstance()
cut.decode(input1)
.test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
.verifyError(WireFrameException::class.java)
@@ -183,8 +244,8 @@ internal object WireChunkDecoderTest : Spek({
given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
- val frame1 = encoder.encode(WireFrame(samplePayload))
- val frame2 = encoder.encode(WireFrame(anotherPayload))
+ val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
val input1 = Unpooled.buffer()
.writeBytes(frame1)
@@ -194,10 +255,10 @@ internal object WireChunkDecoderTest : Spek({
it("should yield decoded input frames") {
val cut = createInstance()
cut.decode(input1).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
- .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
.verifyComplete()
}
@@ -207,8 +268,8 @@ internal object WireChunkDecoderTest : Spek({
}
given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
- val frame1 = encoder.encode(WireFrame(samplePayload))
- val frame2 = encoder.encode(WireFrame(anotherPayload))
+ val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
val input1 = Unpooled.buffer()
.writeBytes(frame1, 5)
@@ -221,8 +282,8 @@ internal object WireChunkDecoderTest : Spek({
cut.decode(input1).test()
.verifyComplete()
cut.decode(input2).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
- .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
.verifyComplete()
}
@@ -232,3 +293,15 @@ internal object WireChunkDecoderTest : Spek({
}
}
})
+
+
+private fun castToPayloadMsgOrFail(msg: WireFrameMessage): PayloadWireFrameMessage =
+ if (msg is PayloadWireFrameMessage) {
+ msg
+ } else {
+ fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${msg.javaClass}")
+ }
+
+private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
+ this as? EndOfTransmissionMessage
+ ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}") \ No newline at end of file