diff options
author | fkrzywka <filip.krzywka@nokia.com> | 2018-07-03 10:14:38 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-02 13:41:04 +0200 |
commit | f8a9a10a75bf139203fe9ea48a01708c7bda0781 (patch) | |
tree | 634321d472c69d67f817cd2e689dc25c10af7c1a /hv-collector-core | |
parent | 1383775f3df00bd08a7ac14fe1278858bdef6487 (diff) |
Enhance wire protocol
Handle new wire frame message type which should allow clients to
indicate that all data has been sent to collector
Change xNF Simulator to send end-of-transmission message
after sending all messages
Close ves-hv-collector stream after encountering EOT message
Remove duplicated file in project
Closes ONAP-391
Change-Id: Idb6afc41d4bb0220a29df10c2aecfd76acd3ad16
Signed-off-by: fkrzywka <filip.krzywka@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core')
5 files changed, 140 insertions, 47 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index d6158481..ff997173 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.boundary import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.model.ServerConfiguration import reactor.core.publisher.Flux import reactor.core.publisher.Mono diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 3246cf59..ceae78c9 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -25,13 +25,18 @@ import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage +import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import reactor.core.publisher.SynchronousSink +import java.util.function.BiConsumer /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -50,9 +55,10 @@ internal class VesHvCollector( dataStream .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } .concatMap(wireDecoder::decode) + .handle(completeStreamOnEOT) .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } - .filter(WireFrame::isValid) - .map(WireFrame::payload) + .filter(PayloadWireFrameMessage::isValid) + .map(PayloadWireFrameMessage::payload) .map(protobufDecoder::decode) .filter(validator::isValid) .flatMap(this::findRoute) @@ -76,11 +82,22 @@ internal class VesHvCollector( return Flux.empty() } - private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) { - wireChunkDecoder.release() - } + private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() companion object { private val logger = Logger(VesHvCollector::class) + + private val completeStreamOnEOT by lazy { + BiConsumer<WireFrameMessage, SynchronousSink<PayloadWireFrameMessage>> { frame, sink -> + when (frame) { + is EndOfTransmissionMessage -> { + logger.info("Completing stream because of receiving EOT message") + sink.complete() + } + is PayloadWireFrameMessage -> sink.next(frame) + else -> sink.error(UnknownWireFrameTypeException(frame)) + } + } + } } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 0426ceb1..e9985766 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -75,7 +75,6 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, onReadIdle(timeout.toMillis()) { logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." } context().channel().close().addListener { - if (it.isSuccess) logger.debug { "Client disconnected because of idle timeout" } else diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index 502505c4..fbff769f 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -22,11 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.wire import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame -import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes -import org.onap.dcae.collectors.veshv.domain.WireFrame -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError +import org.onap.dcae.collectors.veshv.domain.* import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.SynchronousSink @@ -44,7 +40,7 @@ internal class WireChunkDecoder( streamBuffer.release() } - fun decode(byteBuf: ByteBuf): Flux<WireFrame> = Flux.defer { + fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = Flux.defer { logIncomingMessage(byteBuf) if (byteBuf.readableBytes() == 0) { byteBuf.release() @@ -55,13 +51,13 @@ internal class WireChunkDecoder( } } - private fun generateFrames(): Flux<WireFrame> = Flux.generate { next -> + private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next -> decoder.decodeFirst(streamBuffer) .fold(onError(next), onSuccess(next)) .unsafeRunSync() } - private fun onError(next: SynchronousSink<WireFrame>): (WireFrameDecodingError) -> IO<Unit> = { err -> + private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err -> when (err) { is InvalidWireFrame -> IO { next.error(WireFrameException(err)) @@ -73,20 +69,29 @@ internal class WireChunkDecoder( } } - private fun onSuccess(next: SynchronousSink<WireFrame>): (WireFrame) -> IO<Unit> = { frame -> - IO { - logDecodedWireMessage(frame) - next.next(frame) + private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame -> + when (frame) { + is PayloadWireFrameMessage -> IO { + logDecodedWireMessage(frame) + next.next(frame) + } + is EndOfTransmissionMessage -> IO { + logEndOfTransmissionWireMessage() + next.next(frame) + } } } - private fun logIncomingMessage(wire: ByteBuf) { logger.trace { "Got message with total size of ${wire.readableBytes()} B" } } - private fun logDecodedWireMessage(wire: WireFrame) { - logger.trace { "Wire payload size: ${wire.payloadSize} B." } + private fun logDecodedWireMessage(wire: PayloadWireFrameMessage) { + logger.trace { "Wire payload size: ${wire.payloadSize} B" } + } + + private fun logEndOfTransmissionWireMessage() { + logger.trace { "Received end-of-transmission message" } } private fun logEndOfData() { diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt index 33f71684..a9364ed3 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt @@ -27,10 +27,13 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.WireFrame -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import reactor.test.test +import kotlin.test.fail /** * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> @@ -43,7 +46,7 @@ internal object WireChunkDecoderTest : Spek({ val encoder = WireFrameEncoder(alloc) - fun WireChunkDecoder.decode(frame: WireFrame) = decode(encoder.encode(frame)) + fun WireChunkDecoder.decode(frame: PayloadWireFrameMessage) = decode(encoder.encode(frame)) fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc) @@ -98,23 +101,23 @@ internal object WireChunkDecoderTest : Spek({ } given("valid input") { - val input = WireFrame(samplePayload) + val input = PayloadWireFrameMessage(samplePayload) it("should yield decoded input frame") { createInstance().decode(input).test() - .expectNextMatches { it.payloadSize == samplePayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } .verifyComplete() } } given("valid input with part of next frame") { val input = Unpooled.buffer() - .writeBytes(encoder.encode(WireFrame(samplePayload))) - .writeBytes(encoder.encode(WireFrame(samplePayload)).slice(0, 3)) + .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload))) + .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)).slice(0, 3)) it("should yield decoded input frame") { createInstance().decode(input).test() - .expectNextMatches { it.payloadSize == samplePayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } .verifyComplete() } @@ -123,14 +126,30 @@ internal object WireChunkDecoderTest : Spek({ } } + given("end-of-transmission marker byte with garbage after it") { + val input = Unpooled.buffer() + .writeByte(0xAA) + .writeBytes(Unpooled.wrappedBuffer(samplePayload)) + + it("should yield decoded end-of-transmission frame and error") { + createInstance().decode(input).test() + .expectNextMatches { it is EndOfTransmissionMessage } + .verifyError(WireFrameException::class.java) + } + + it("should leave memory unreleased") { + verifyMemoryNotReleased(input) + } + } + given("valid input with garbage after it") { val input = Unpooled.buffer() - .writeBytes(encoder.encode(WireFrame(samplePayload))) + .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload))) .writeBytes(Unpooled.wrappedBuffer(samplePayload)) it("should yield decoded input frame and error") { createInstance().decode(input).test() - .expectNextMatches { it.payloadSize == samplePayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } .verifyError(WireFrameException::class.java) } @@ -140,16 +159,16 @@ internal object WireChunkDecoderTest : Spek({ } given("two inputs containing two separate messages") { - val input1 = encoder.encode(WireFrame(samplePayload)) - val input2 = encoder.encode(WireFrame(anotherPayload)) + val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) + val input2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) it("should yield decoded input frames") { val cut = createInstance() cut.decode(input1).test() - .expectNextMatches { it.payloadSize == samplePayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } .verifyComplete() cut.decode(input2).test() - .expectNextMatches { it.payloadSize == anotherPayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } .verifyComplete() } @@ -158,15 +177,57 @@ internal object WireChunkDecoderTest : Spek({ } } + given("two payload messages followed by end-of-transmission marker byte") { + val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) + val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) + + val input = Unpooled.buffer() + .writeBytes(frame1) + .writeBytes(frame2) + .writeByte(0xAA) + + it("should yield decoded input frames") { + val cut = createInstance() + cut.decode(input).test() + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } + .expectNextMatches { it is EndOfTransmissionMessage } + .verifyComplete() + } + } + + given("two payload messages separated by end-of-transmission marker byte") { + val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) + val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) + + val input = Unpooled.buffer() + .writeBytes(frame1) + .writeByte(0xAA) + .writeBytes(frame2) + + it("should yield decoded input frames") { + val cut = createInstance() + cut.decode(input).test() + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } + .expectNextMatches { it is EndOfTransmissionMessage } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } + .verifyComplete() + } + + it("should release memory") { + verifyMemoryReleased(input) + } + } + given("1st input containing 1st frame and 2nd input containing garbage") { - val input1 = encoder.encode(WireFrame(samplePayload)) + val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) val input2 = Unpooled.wrappedBuffer(anotherPayload) it("should yield decoded input frames") { val cut = createInstance() cut.decode(input1) .test() - .expectNextMatches { it.payloadSize == samplePayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } .verifyComplete() cut.decode(input2).test() .verifyError(WireFrameException::class.java) @@ -183,8 +244,8 @@ internal object WireChunkDecoderTest : Spek({ given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") { - val frame1 = encoder.encode(WireFrame(samplePayload)) - val frame2 = encoder.encode(WireFrame(anotherPayload)) + val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) + val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) val input1 = Unpooled.buffer() .writeBytes(frame1) @@ -194,10 +255,10 @@ internal object WireChunkDecoderTest : Spek({ it("should yield decoded input frames") { val cut = createInstance() cut.decode(input1).test() - .expectNextMatches { it.payloadSize == samplePayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } .verifyComplete() cut.decode(input2).test() - .expectNextMatches { it.payloadSize == anotherPayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } .verifyComplete() } @@ -207,8 +268,8 @@ internal object WireChunkDecoderTest : Spek({ } given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") { - val frame1 = encoder.encode(WireFrame(samplePayload)) - val frame2 = encoder.encode(WireFrame(anotherPayload)) + val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) + val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) val input1 = Unpooled.buffer() .writeBytes(frame1, 5) @@ -221,8 +282,8 @@ internal object WireChunkDecoderTest : Spek({ cut.decode(input1).test() .verifyComplete() cut.decode(input2).test() - .expectNextMatches { it.payloadSize == samplePayload.size } - .expectNextMatches { it.payloadSize == anotherPayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } .verifyComplete() } @@ -232,3 +293,15 @@ internal object WireChunkDecoderTest : Spek({ } } }) + + +private fun castToPayloadMsgOrFail(msg: WireFrameMessage): PayloadWireFrameMessage = + if (msg is PayloadWireFrameMessage) { + msg + } else { + fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${msg.javaClass}") + } + +private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage = + this as? EndOfTransmissionMessage + ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}")
\ No newline at end of file |