aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src
diff options
context:
space:
mode:
authorfkrzywka <filip.krzywka@nokia.com>2018-07-03 10:14:38 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 13:41:04 +0200
commitf8a9a10a75bf139203fe9ea48a01708c7bda0781 (patch)
tree634321d472c69d67f817cd2e689dc25c10af7c1a /hv-collector-core/src
parent1383775f3df00bd08a7ac14fe1278858bdef6487 (diff)
Enhance wire protocol
Handle new wire frame message type which should allow clients to indicate that all data has been sent to collector Change xNF Simulator to send end-of-transmission message after sending all messages Close ves-hv-collector stream after encountering EOT message Remove duplicated file in project Closes ONAP-391 Change-Id: Idb6afc41d4bb0220a29df10c2aecfd76acd3ad16 Signed-off-by: fkrzywka <filip.krzywka@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core/src')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt1
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt29
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt1
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt35
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt121
5 files changed, 140 insertions, 47 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index d6158481..ff997173 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.boundary
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 3246cf59..ceae78c9 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -25,13 +25,18 @@ import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
+import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import reactor.core.publisher.SynchronousSink
+import java.util.function.BiConsumer
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -50,9 +55,10 @@ internal class VesHvCollector(
dataStream
.doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
.concatMap(wireDecoder::decode)
+ .handle(completeStreamOnEOT)
.doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
- .filter(WireFrame::isValid)
- .map(WireFrame::payload)
+ .filter(PayloadWireFrameMessage::isValid)
+ .map(PayloadWireFrameMessage::payload)
.map(protobufDecoder::decode)
.filter(validator::isValid)
.flatMap(this::findRoute)
@@ -76,11 +82,22 @@ internal class VesHvCollector(
return Flux.empty()
}
- private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) {
- wireChunkDecoder.release()
- }
+ private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
companion object {
private val logger = Logger(VesHvCollector::class)
+
+ private val completeStreamOnEOT by lazy {
+ BiConsumer<WireFrameMessage, SynchronousSink<PayloadWireFrameMessage>> { frame, sink ->
+ when (frame) {
+ is EndOfTransmissionMessage -> {
+ logger.info("Completing stream because of receiving EOT message")
+ sink.complete()
+ }
+ is PayloadWireFrameMessage -> sink.next(frame)
+ else -> sink.error(UnknownWireFrameTypeException(frame))
+ }
+ }
+ }
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 0426ceb1..e9985766 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -75,7 +75,6 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
onReadIdle(timeout.toMillis()) {
logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." }
context().channel().close().addListener {
-
if (it.isSuccess)
logger.debug { "Client disconnected because of idle timeout" }
else
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index 502505c4..fbff769f 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -22,11 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.wire
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
-import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
+import org.onap.dcae.collectors.veshv.domain.*
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
@@ -44,7 +40,7 @@ internal class WireChunkDecoder(
streamBuffer.release()
}
- fun decode(byteBuf: ByteBuf): Flux<WireFrame> = Flux.defer {
+ fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = Flux.defer {
logIncomingMessage(byteBuf)
if (byteBuf.readableBytes() == 0) {
byteBuf.release()
@@ -55,13 +51,13 @@ internal class WireChunkDecoder(
}
}
- private fun generateFrames(): Flux<WireFrame> = Flux.generate { next ->
+ private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next ->
decoder.decodeFirst(streamBuffer)
.fold(onError(next), onSuccess(next))
.unsafeRunSync()
}
- private fun onError(next: SynchronousSink<WireFrame>): (WireFrameDecodingError) -> IO<Unit> = { err ->
+ private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err ->
when (err) {
is InvalidWireFrame -> IO {
next.error(WireFrameException(err))
@@ -73,20 +69,29 @@ internal class WireChunkDecoder(
}
}
- private fun onSuccess(next: SynchronousSink<WireFrame>): (WireFrame) -> IO<Unit> = { frame ->
- IO {
- logDecodedWireMessage(frame)
- next.next(frame)
+ private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame ->
+ when (frame) {
+ is PayloadWireFrameMessage -> IO {
+ logDecodedWireMessage(frame)
+ next.next(frame)
+ }
+ is EndOfTransmissionMessage -> IO {
+ logEndOfTransmissionWireMessage()
+ next.next(frame)
+ }
}
}
-
private fun logIncomingMessage(wire: ByteBuf) {
logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
}
- private fun logDecodedWireMessage(wire: WireFrame) {
- logger.trace { "Wire payload size: ${wire.payloadSize} B." }
+ private fun logDecodedWireMessage(wire: PayloadWireFrameMessage) {
+ logger.trace { "Wire payload size: ${wire.payloadSize} B" }
+ }
+
+ private fun logEndOfTransmissionWireMessage() {
+ logger.trace { "Received end-of-transmission message" }
}
private fun logEndOfData() {
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
index 33f71684..a9364ed3 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
@@ -27,10 +27,13 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import reactor.test.test
+import kotlin.test.fail
/**
* @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
@@ -43,7 +46,7 @@ internal object WireChunkDecoderTest : Spek({
val encoder = WireFrameEncoder(alloc)
- fun WireChunkDecoder.decode(frame: WireFrame) = decode(encoder.encode(frame))
+ fun WireChunkDecoder.decode(frame: PayloadWireFrameMessage) = decode(encoder.encode(frame))
fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc)
@@ -98,23 +101,23 @@ internal object WireChunkDecoderTest : Spek({
}
given("valid input") {
- val input = WireFrame(samplePayload)
+ val input = PayloadWireFrameMessage(samplePayload)
it("should yield decoded input frame") {
createInstance().decode(input).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
}
}
given("valid input with part of next frame") {
val input = Unpooled.buffer()
- .writeBytes(encoder.encode(WireFrame(samplePayload)))
- .writeBytes(encoder.encode(WireFrame(samplePayload)).slice(0, 3))
+ .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)))
+ .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)).slice(0, 3))
it("should yield decoded input frame") {
createInstance().decode(input).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
}
@@ -123,14 +126,30 @@ internal object WireChunkDecoderTest : Spek({
}
}
+ given("end-of-transmission marker byte with garbage after it") {
+ val input = Unpooled.buffer()
+ .writeByte(0xAA)
+ .writeBytes(Unpooled.wrappedBuffer(samplePayload))
+
+ it("should yield decoded end-of-transmission frame and error") {
+ createInstance().decode(input).test()
+ .expectNextMatches { it is EndOfTransmissionMessage }
+ .verifyError(WireFrameException::class.java)
+ }
+
+ it("should leave memory unreleased") {
+ verifyMemoryNotReleased(input)
+ }
+ }
+
given("valid input with garbage after it") {
val input = Unpooled.buffer()
- .writeBytes(encoder.encode(WireFrame(samplePayload)))
+ .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)))
.writeBytes(Unpooled.wrappedBuffer(samplePayload))
it("should yield decoded input frame and error") {
createInstance().decode(input).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyError(WireFrameException::class.java)
}
@@ -140,16 +159,16 @@ internal object WireChunkDecoderTest : Spek({
}
given("two inputs containing two separate messages") {
- val input1 = encoder.encode(WireFrame(samplePayload))
- val input2 = encoder.encode(WireFrame(anotherPayload))
+ val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val input2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
it("should yield decoded input frames") {
val cut = createInstance()
cut.decode(input1).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
- .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
.verifyComplete()
}
@@ -158,15 +177,57 @@ internal object WireChunkDecoderTest : Spek({
}
}
+ given("two payload messages followed by end-of-transmission marker byte") {
+ val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+
+ val input = Unpooled.buffer()
+ .writeBytes(frame1)
+ .writeBytes(frame2)
+ .writeByte(0xAA)
+
+ it("should yield decoded input frames") {
+ val cut = createInstance()
+ cut.decode(input).test()
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+ .expectNextMatches { it is EndOfTransmissionMessage }
+ .verifyComplete()
+ }
+ }
+
+ given("two payload messages separated by end-of-transmission marker byte") {
+ val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+
+ val input = Unpooled.buffer()
+ .writeBytes(frame1)
+ .writeByte(0xAA)
+ .writeBytes(frame2)
+
+ it("should yield decoded input frames") {
+ val cut = createInstance()
+ cut.decode(input).test()
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { it is EndOfTransmissionMessage }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+ .verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input)
+ }
+ }
+
given("1st input containing 1st frame and 2nd input containing garbage") {
- val input1 = encoder.encode(WireFrame(samplePayload))
+ val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
val input2 = Unpooled.wrappedBuffer(anotherPayload)
it("should yield decoded input frames") {
val cut = createInstance()
cut.decode(input1)
.test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
.verifyError(WireFrameException::class.java)
@@ -183,8 +244,8 @@ internal object WireChunkDecoderTest : Spek({
given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
- val frame1 = encoder.encode(WireFrame(samplePayload))
- val frame2 = encoder.encode(WireFrame(anotherPayload))
+ val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
val input1 = Unpooled.buffer()
.writeBytes(frame1)
@@ -194,10 +255,10 @@ internal object WireChunkDecoderTest : Spek({
it("should yield decoded input frames") {
val cut = createInstance()
cut.decode(input1).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
- .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
.verifyComplete()
}
@@ -207,8 +268,8 @@ internal object WireChunkDecoderTest : Spek({
}
given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
- val frame1 = encoder.encode(WireFrame(samplePayload))
- val frame2 = encoder.encode(WireFrame(anotherPayload))
+ val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
val input1 = Unpooled.buffer()
.writeBytes(frame1, 5)
@@ -221,8 +282,8 @@ internal object WireChunkDecoderTest : Spek({
cut.decode(input1).test()
.verifyComplete()
cut.decode(input2).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
- .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
.verifyComplete()
}
@@ -232,3 +293,15 @@ internal object WireChunkDecoderTest : Spek({
}
}
})
+
+
+private fun castToPayloadMsgOrFail(msg: WireFrameMessage): PayloadWireFrameMessage =
+ if (msg is PayloadWireFrameMessage) {
+ msg
+ } else {
+ fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${msg.javaClass}")
+ }
+
+private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
+ this as? EndOfTransmissionMessage
+ ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}") \ No newline at end of file