diff options
author | fkrzywka <filip.krzywka@nokia.com> | 2018-07-03 10:14:38 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-02 13:41:04 +0200 |
commit | f8a9a10a75bf139203fe9ea48a01708c7bda0781 (patch) | |
tree | 634321d472c69d67f817cd2e689dc25c10af7c1a | |
parent | 1383775f3df00bd08a7ac14fe1278858bdef6487 (diff) |
Enhance wire protocol
Handle new wire frame message type which should allow clients to
indicate that all data has been sent to collector
Change xNF Simulator to send end-of-transmission message
after sending all messages
Close ves-hv-collector stream after encountering EOT message
Remove duplicated file in project
Closes ONAP-391
Change-Id: Idb6afc41d4bb0220a29df10c2aecfd76acd3ad16
Signed-off-by: fkrzywka <filip.krzywka@nokia.com>
Issue-ID: DCAEGEN2-601
16 files changed, 407 insertions, 210 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index d6158481..ff997173 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.boundary import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.model.ServerConfiguration import reactor.core.publisher.Flux import reactor.core.publisher.Mono diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 3246cf59..ceae78c9 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -25,13 +25,18 @@ import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage +import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import reactor.core.publisher.SynchronousSink +import java.util.function.BiConsumer /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -50,9 +55,10 @@ internal class VesHvCollector( dataStream .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } .concatMap(wireDecoder::decode) + .handle(completeStreamOnEOT) .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } - .filter(WireFrame::isValid) - .map(WireFrame::payload) + .filter(PayloadWireFrameMessage::isValid) + .map(PayloadWireFrameMessage::payload) .map(protobufDecoder::decode) .filter(validator::isValid) .flatMap(this::findRoute) @@ -76,11 +82,22 @@ internal class VesHvCollector( return Flux.empty() } - private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) { - wireChunkDecoder.release() - } + private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() companion object { private val logger = Logger(VesHvCollector::class) + + private val completeStreamOnEOT by lazy { + BiConsumer<WireFrameMessage, SynchronousSink<PayloadWireFrameMessage>> { frame, sink -> + when (frame) { + is EndOfTransmissionMessage -> { + logger.info("Completing stream because of receiving EOT message") + sink.complete() + } + is PayloadWireFrameMessage -> sink.next(frame) + else -> sink.error(UnknownWireFrameTypeException(frame)) + } + } + } } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 0426ceb1..e9985766 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -75,7 +75,6 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, onReadIdle(timeout.toMillis()) { logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." } context().channel().close().addListener { - if (it.isSuccess) logger.debug { "Client disconnected because of idle timeout" } else diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index 502505c4..fbff769f 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -22,11 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.wire import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame -import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes -import org.onap.dcae.collectors.veshv.domain.WireFrame -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError +import org.onap.dcae.collectors.veshv.domain.* import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.SynchronousSink @@ -44,7 +40,7 @@ internal class WireChunkDecoder( streamBuffer.release() } - fun decode(byteBuf: ByteBuf): Flux<WireFrame> = Flux.defer { + fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = Flux.defer { logIncomingMessage(byteBuf) if (byteBuf.readableBytes() == 0) { byteBuf.release() @@ -55,13 +51,13 @@ internal class WireChunkDecoder( } } - private fun generateFrames(): Flux<WireFrame> = Flux.generate { next -> + private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next -> decoder.decodeFirst(streamBuffer) .fold(onError(next), onSuccess(next)) .unsafeRunSync() } - private fun onError(next: SynchronousSink<WireFrame>): (WireFrameDecodingError) -> IO<Unit> = { err -> + private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err -> when (err) { is InvalidWireFrame -> IO { next.error(WireFrameException(err)) @@ -73,20 +69,29 @@ internal class WireChunkDecoder( } } - private fun onSuccess(next: SynchronousSink<WireFrame>): (WireFrame) -> IO<Unit> = { frame -> - IO { - logDecodedWireMessage(frame) - next.next(frame) + private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame -> + when (frame) { + is PayloadWireFrameMessage -> IO { + logDecodedWireMessage(frame) + next.next(frame) + } + is EndOfTransmissionMessage -> IO { + logEndOfTransmissionWireMessage() + next.next(frame) + } } } - private fun logIncomingMessage(wire: ByteBuf) { logger.trace { "Got message with total size of ${wire.readableBytes()} B" } } - private fun logDecodedWireMessage(wire: WireFrame) { - logger.trace { "Wire payload size: ${wire.payloadSize} B." } + private fun logDecodedWireMessage(wire: PayloadWireFrameMessage) { + logger.trace { "Wire payload size: ${wire.payloadSize} B" } + } + + private fun logEndOfTransmissionWireMessage() { + logger.trace { "Received end-of-transmission message" } } private fun logEndOfData() { diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt index 33f71684..a9364ed3 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt @@ -27,10 +27,13 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.WireFrame -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import reactor.test.test +import kotlin.test.fail /** * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> @@ -43,7 +46,7 @@ internal object WireChunkDecoderTest : Spek({ val encoder = WireFrameEncoder(alloc) - fun WireChunkDecoder.decode(frame: WireFrame) = decode(encoder.encode(frame)) + fun WireChunkDecoder.decode(frame: PayloadWireFrameMessage) = decode(encoder.encode(frame)) fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc) @@ -98,23 +101,23 @@ internal object WireChunkDecoderTest : Spek({ } given("valid input") { - val input = WireFrame(samplePayload) + val input = PayloadWireFrameMessage(samplePayload) it("should yield decoded input frame") { createInstance().decode(input).test() - .expectNextMatches { it.payloadSize == samplePayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } .verifyComplete() } } given("valid input with part of next frame") { val input = Unpooled.buffer() - .writeBytes(encoder.encode(WireFrame(samplePayload))) - .writeBytes(encoder.encode(WireFrame(samplePayload)).slice(0, 3)) + .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload))) + .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)).slice(0, 3)) it("should yield decoded input frame") { createInstance().decode(input).test() - .expectNextMatches { it.payloadSize == samplePayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } .verifyComplete() } @@ -123,14 +126,30 @@ internal object WireChunkDecoderTest : Spek({ } } + given("end-of-transmission marker byte with garbage after it") { + val input = Unpooled.buffer() + .writeByte(0xAA) + .writeBytes(Unpooled.wrappedBuffer(samplePayload)) + + it("should yield decoded end-of-transmission frame and error") { + createInstance().decode(input).test() + .expectNextMatches { it is EndOfTransmissionMessage } + .verifyError(WireFrameException::class.java) + } + + it("should leave memory unreleased") { + verifyMemoryNotReleased(input) + } + } + given("valid input with garbage after it") { val input = Unpooled.buffer() - .writeBytes(encoder.encode(WireFrame(samplePayload))) + .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload))) .writeBytes(Unpooled.wrappedBuffer(samplePayload)) it("should yield decoded input frame and error") { createInstance().decode(input).test() - .expectNextMatches { it.payloadSize == samplePayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } .verifyError(WireFrameException::class.java) } @@ -140,16 +159,16 @@ internal object WireChunkDecoderTest : Spek({ } given("two inputs containing two separate messages") { - val input1 = encoder.encode(WireFrame(samplePayload)) - val input2 = encoder.encode(WireFrame(anotherPayload)) + val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) + val input2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) it("should yield decoded input frames") { val cut = createInstance() cut.decode(input1).test() - .expectNextMatches { it.payloadSize == samplePayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } .verifyComplete() cut.decode(input2).test() - .expectNextMatches { it.payloadSize == anotherPayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } .verifyComplete() } @@ -158,15 +177,57 @@ internal object WireChunkDecoderTest : Spek({ } } + given("two payload messages followed by end-of-transmission marker byte") { + val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) + val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) + + val input = Unpooled.buffer() + .writeBytes(frame1) + .writeBytes(frame2) + .writeByte(0xAA) + + it("should yield decoded input frames") { + val cut = createInstance() + cut.decode(input).test() + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } + .expectNextMatches { it is EndOfTransmissionMessage } + .verifyComplete() + } + } + + given("two payload messages separated by end-of-transmission marker byte") { + val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) + val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) + + val input = Unpooled.buffer() + .writeBytes(frame1) + .writeByte(0xAA) + .writeBytes(frame2) + + it("should yield decoded input frames") { + val cut = createInstance() + cut.decode(input).test() + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } + .expectNextMatches { it is EndOfTransmissionMessage } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } + .verifyComplete() + } + + it("should release memory") { + verifyMemoryReleased(input) + } + } + given("1st input containing 1st frame and 2nd input containing garbage") { - val input1 = encoder.encode(WireFrame(samplePayload)) + val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) val input2 = Unpooled.wrappedBuffer(anotherPayload) it("should yield decoded input frames") { val cut = createInstance() cut.decode(input1) .test() - .expectNextMatches { it.payloadSize == samplePayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } .verifyComplete() cut.decode(input2).test() .verifyError(WireFrameException::class.java) @@ -183,8 +244,8 @@ internal object WireChunkDecoderTest : Spek({ given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") { - val frame1 = encoder.encode(WireFrame(samplePayload)) - val frame2 = encoder.encode(WireFrame(anotherPayload)) + val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) + val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) val input1 = Unpooled.buffer() .writeBytes(frame1) @@ -194,10 +255,10 @@ internal object WireChunkDecoderTest : Spek({ it("should yield decoded input frames") { val cut = createInstance() cut.decode(input1).test() - .expectNextMatches { it.payloadSize == samplePayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } .verifyComplete() cut.decode(input2).test() - .expectNextMatches { it.payloadSize == anotherPayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } .verifyComplete() } @@ -207,8 +268,8 @@ internal object WireChunkDecoderTest : Spek({ } given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") { - val frame1 = encoder.encode(WireFrame(samplePayload)) - val frame2 = encoder.encode(WireFrame(anotherPayload)) + val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) + val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) val input1 = Unpooled.buffer() .writeBytes(frame1, 5) @@ -221,8 +282,8 @@ internal object WireChunkDecoderTest : Spek({ cut.decode(input1).test() .verifyComplete() cut.decode(input2).test() - .expectNextMatches { it.payloadSize == samplePayload.size } - .expectNextMatches { it.payloadSize == anotherPayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } + .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } .verifyComplete() } @@ -232,3 +293,15 @@ internal object WireChunkDecoderTest : Spek({ } } }) + + +private fun castToPayloadMsgOrFail(msg: WireFrameMessage): PayloadWireFrameMessage = + if (msg is PayloadWireFrameMessage) { + msg + } else { + fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${msg.javaClass}") + } + +private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage = + this as? EndOfTransmissionMessage + ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}")
\ No newline at end of file diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 246fc7ed..5e6e666f 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -37,22 +37,42 @@ object VesHvSpecification : Spek({ describe("VES High Volume Collector") { it("should handle multiple HV RAN events") { - val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + val (sut, sink) = vesHvWithStoringSink() val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS)) assertThat(messages) .describedAs("should send all events") .hasSize(2) } + + it("should not handle messages received from client after end-of-transmission message") { + val (sut, sink) = vesHvWithStoringSink() + val validMessage = vesMessage(Domain.HVRANMEAS) + val anotherValidMessage = vesMessage(Domain.HVRANMEAS) + val endOfTransmissionMessage = endOfTransmissionMessage() + + val handledEvents = sut.handleConnection(sink, + validMessage, + endOfTransmissionMessage, + anotherValidMessage + ) + + assertThat(handledEvents).hasSize(1) + assertThat(validMessage.refCnt()) + .describedAs("first message should be released") + .isEqualTo(0) + assertThat(endOfTransmissionMessage.refCnt()) + .describedAs("end-of-transmission message should be released") + .isEqualTo(0) + assertThat(anotherValidMessage.refCnt()) + .describedAs("second (not handled) message should not be released") + .isEqualTo(1) + } } describe("Memory management") { it("should release memory for each handled and dropped message") { - val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + val (sut, sink) = vesHvWithStoringSink() val validMessage = vesMessage(Domain.HVRANMEAS) val msgWithInvalidDomain = vesMessage(Domain.OTHER) val msgWithInvalidFrame = invalidWireFrame() @@ -76,13 +96,30 @@ object VesHvSpecification : Spek({ assertThat(msgWithTooBigPayload.refCnt()) .describedAs("message with payload exceeding 1MiB should be released") .isEqualTo(expectedRefCnt) + } + it("should release memory for end-of-transmission message") { + val (sut, sink) = vesHvWithStoringSink() + val validMessage = vesMessage(Domain.HVRANMEAS) + val endOfTransmissionMessage = endOfTransmissionMessage() + val expectedRefCnt = 0 + + val handledEvents = sut.handleConnection(sink, + validMessage, + endOfTransmissionMessage + ) + + assertThat(handledEvents).hasSize(1) + assertThat(validMessage.refCnt()) + .describedAs("handled message should be released") + .isEqualTo(expectedRefCnt) + assertThat(endOfTransmissionMessage.refCnt()) + .describedAs("end-of-transmission message should be released") + .isEqualTo(expectedRefCnt) } it("should release memory for each message with invalid payload") { - val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + val (sut, sink) = vesHvWithStoringSink() val validMessage = vesMessage(Domain.HVRANMEAS) val msgWithInvalidPayload = invalidVesMessage() val expectedRefCnt = 0 @@ -101,9 +138,7 @@ object VesHvSpecification : Spek({ } it("should release memory for each message with garbage frame") { - val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + val (sut, sink) = vesHvWithStoringSink() val validMessage = vesMessage(Domain.HVRANMEAS) val msgWithGarbageFrame = garbageFrame() val expectedRefCnt = 0 @@ -124,9 +159,7 @@ object VesHvSpecification : Spek({ describe("message routing") { it("should direct message to a topic by means of routing configuration") { - val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + val (sut, sink) = vesHvWithStoringSink() val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) assertThat(messages).describedAs("number of routed messages").hasSize(1) @@ -137,8 +170,7 @@ object VesHvSpecification : Spek({ } it("should be able to direct 2 messages from different domains to one topic") { - val sink = StoringSink() - val sut = Sut(sink) + val (sut, sink) = vesHvWithStoringSink() sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration) @@ -149,20 +181,18 @@ object VesHvSpecification : Spek({ assertThat(messages).describedAs("number of routed messages").hasSize(3) - assertThat(messages.get(0).topic).describedAs("first message topic") + assertThat(messages[0].topic).describedAs("first message topic") .isEqualTo(HVRANMEAS_TOPIC) - assertThat(messages.get(1).topic).describedAs("second message topic") + assertThat(messages[1].topic).describedAs("second message topic") .isEqualTo(HVRANMEAS_TOPIC) - assertThat(messages.get(2).topic).describedAs("last message topic") + assertThat(messages[2].topic).describedAs("last message topic") .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC) } it("should drop message if route was not found") { - val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + val (sut, sink) = vesHvWithStoringSink() val messages = sut.handleConnection(sink, vesMessage(Domain.OTHER, "first"), vesMessage(Domain.HVRANMEAS, "second"), @@ -181,8 +211,7 @@ object VesHvSpecification : Spek({ val defaultTimeout = Duration.ofSeconds(10) it("should update collector on configuration change") { - val sink = StoringSink() - val sut = Sut(sink) + val (sut, _) = vesHvWithStoringSink() sut.configurationProvider.updateConfiguration(basicConfiguration) val firstCollector = sut.collector @@ -195,8 +224,7 @@ object VesHvSpecification : Spek({ } it("should start routing messages on configuration change") { - val sink = StoringSink() - val sut = Sut(sink) + val (sut, sink) = vesHvWithStoringSink() sut.configurationProvider.updateConfiguration(configurationWithoutRouting) @@ -216,8 +244,7 @@ object VesHvSpecification : Spek({ } it("should change domain routing on configuration change") { - val sink = StoringSink() - val sut = Sut(sink) + val (sut, sink) = vesHvWithStoringSink() sut.configurationProvider.updateConfiguration(basicConfiguration) @@ -244,8 +271,7 @@ object VesHvSpecification : Spek({ } it("should update routing for each client sending one message") { - val sink = StoringSink() - val sut = Sut(sink) + val (sut, sink) = vesHvWithStoringSink() sut.configurationProvider.updateConfiguration(basicConfiguration) @@ -274,8 +300,7 @@ object VesHvSpecification : Spek({ it("should not update routing for client sending continuous stream of messages") { - val sink = StoringSink() - val sut = Sut(sink) + val (sut, sink) = vesHvWithStoringSink() sut.configurationProvider.updateConfiguration(basicConfiguration) @@ -311,9 +336,7 @@ object VesHvSpecification : Spek({ describe("request validation") { it("should reject message with payload greater than 1 MiB and all subsequent messages") { - val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + val (sut, sink) = vesHvWithStoringSink() val handledMessages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS, "first"), @@ -326,3 +349,10 @@ object VesHvSpecification : Spek({ } }) + +private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> { + val sink = StoringSink() + val sut = Sut(sink) + sut.configurationProvider.updateConfiguration(basicConfiguration) + return Pair(sut, sink) +} diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt index e620e6b9..64b4ba26 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt @@ -23,9 +23,7 @@ import com.google.protobuf.ByteString import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.PooledByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.* -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE -import org.onap.ves.HVRanMeasFieldsV5 +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE import org.onap.ves.VesEventV5.VesEvent import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain @@ -33,15 +31,19 @@ import java.util.* val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT -fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run { - writeByte(0xFF) // always 0xFF - writeByte(0x01) // version - writeByte(0x01) // content type = GPB +fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = + allocator.buffer().run { + writeByte(0xFF) // always 0xFF + writeByte(0x01) // version + writeByte(0x01) // content type = GPB - val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer() - writeInt(gpb.limit()) // ves event size in bytes - writeBytes(gpb) // ves event as GPB bytes -} + val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer() + writeInt(gpb.limit()) // ves event size in bytes + writeBytes(gpb) // ves event as GPB bytes + } + +fun endOfTransmissionMessage(): ByteBuf = + allocator.buffer().writeByte(0xAA) fun invalidVesMessage(): ByteBuf = allocator.buffer().run { @@ -65,22 +67,25 @@ fun invalidWireFrame(): ByteBuf = allocator.buffer().run { writeByte(0x01) // content type = GPB } -fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run { - writeByte(0xFF) // always 0xFF - writeByte(0x01) // version - writeByte(0x01) // content type = GPB +fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = + allocator.buffer().run { + writeByte(0xFF) // always 0xFF + writeByte(0x01) // version + writeByte(0x01) // content type = GPB - val gpb = vesEvent( - domain, - id, - ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE)) - ).toByteString().asReadOnlyByteBuffer() + val gpb = vesEvent( + domain, + id, + ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE)) + ).toByteString().asReadOnlyByteBuffer() - writeInt(gpb.limit()) // ves event size in bytes - writeBytes(gpb) // ves event as GPB bytes -} + writeInt(gpb.limit()) // ves event size in bytes + writeBytes(gpb) // ves event as GPB bytes + } -fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString(), hvRanMeasFields: ByteString = ByteString.EMPTY) = +fun vesEvent(domain: Domain = Domain.HVRANMEAS, + id: String = UUID.randomUUID().toString(), + hvRanMeasFields: ByteString = ByteString.EMPTY) = VesEvent.newBuilder() .setCommonEventHeader( CommonEventHeader.getDefaultInstance().toBuilder() diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt index db6e1070..4811a2b4 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt @@ -19,6 +19,9 @@ */ package org.onap.dcae.collectors.veshv.domain + +sealed class WireFrameMessage + /** * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). * @@ -49,10 +52,11 @@ package org.onap.dcae.collectors.veshv.domain * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -data class WireFrame(val payload: ByteData, - val version: Short, - val payloadTypeRaw: Short, - val payloadSize: Int) { +data class PayloadWireFrameMessage(val payload: ByteData, + val version: Short, + val payloadTypeRaw: Short, + val payloadSize: Int +) : WireFrameMessage() { constructor(payload: ByteArray) : this( ByteData(payload), @@ -66,11 +70,38 @@ data class WireFrame(val payload: ByteData, && payload.size() == payloadSize companion object { + const val MARKER_BYTE: Short = 0xFF + const val SUPPORTED_VERSION: Short = 1 const val HEADER_SIZE = 3 * java.lang.Byte.BYTES + 1 * java.lang.Integer.BYTES - const val MARKER_BYTE: Short = 0xFF + + const val MAX_PAYLOAD_SIZE = 1024 * 1024 } } + + +/** + * This message type should be used by client to indicate that he has finished sending data to collector. + * + * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). + * + * ``` + * ┌─────┬───────────────────────┐ + * │octet│ 0 │ + * ├─────┼──┬──┬──┬──┬──┬──┬──┬──┤ + * │ bit │ 0│ │ │ │ │ │ │ │ + * ├─────┼──┴──┴──┴──┴──┴──┴──┴──┤ + * │field│ 0xAA │ + * └─────┴───────────────────────┘ + * ``` + * + * @since July 2018 + */ + +object EndOfTransmissionMessage : WireFrameMessage() { + const val MARKER_BYTE: Short = 0xAA +} + diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt index 39841d6a..ab82dc04 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt @@ -24,6 +24,7 @@ import arrow.core.Left import arrow.core.Right import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -31,10 +32,10 @@ import io.netty.buffer.ByteBufAllocator */ class WireFrameEncoder(val allocator: ByteBufAllocator) { - fun encode(frame: WireFrame): ByteBuf { - val bb = allocator.buffer(WireFrame.HEADER_SIZE + frame.payload.size()) + fun encode(frame: PayloadWireFrameMessage): ByteBuf { + val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size()) - bb.writeByte(WireFrame.MARKER_BYTE.toInt()) + bb.writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt()) bb.writeByte(frame.version.toInt()) bb.writeByte(frame.payloadTypeRaw.toInt()) bb.writeInt(frame.payloadSize) @@ -50,32 +51,54 @@ class WireFrameEncoder(val allocator: ByteBufAllocator) { */ class WireFrameDecoder { - fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> = + fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> = when { isEmpty(byteBuf) -> Left(EmptyWireFrame) + isSingleByte(byteBuf) -> lookForEOTFrame(byteBuf) headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes) - else -> parseFrame(byteBuf) + else -> parseWireFrame(byteBuf) } - private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrame.HEADER_SIZE - private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1 - private fun parseFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> { + private fun isSingleByte(byteBuf: ByteBuf) = byteBuf.readableBytes() == 1 + + private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < PayloadWireFrameMessage.HEADER_SIZE + + private fun lookForEOTFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, EndOfTransmissionMessage> { byteBuf.markReaderIndex() + val byte = byteBuf.readUnsignedByte() - val mark = byteBuf.readUnsignedByte() - if (mark != WireFrame.MARKER_BYTE) { + return if (byte == EndOfTransmissionMessage.MARKER_BYTE) { + Right(EndOfTransmissionMessage) + } else { byteBuf.resetReaderIndex() - return Left(InvalidWireFrameMarker(mark)) + Left(MissingWireFrameHeaderBytes) + } + } + + private fun parseWireFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> { + byteBuf.markReaderIndex() + + val mark = byteBuf.readUnsignedByte() + return when (mark) { + EndOfTransmissionMessage.MARKER_BYTE -> Right(EndOfTransmissionMessage) + PayloadWireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf) + else -> { + byteBuf.resetReaderIndex() + Left(InvalidWireFrameMarker(mark)) + } } + } + private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, PayloadWireFrameMessage> { val version = byteBuf.readUnsignedByte() val payloadTypeRaw = byteBuf.readUnsignedByte() val payloadSize = byteBuf.readInt() if (payloadSize > MAX_PAYLOAD_SIZE) { + byteBuf.resetReaderIndex() return Left(PayloadSizeExceeded) } @@ -86,10 +109,7 @@ class WireFrameDecoder { val payload = ByteData.readFrom(byteBuf, payloadSize) - return Right(WireFrame(payload, version, payloadTypeRaw, payloadSize)) - } + return Right(PayloadWireFrameMessage(payload, version, payloadTypeRaw, payloadSize)) - companion object { - const val MAX_PAYLOAD_SIZE = 1024 * 1024 } } diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt index 626bf329..d82bb25f 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.domain -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -33,9 +33,10 @@ sealed class WireFrameDecodingError(val message: String) sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg) -class InvalidWireFrameMarker(actualMarker: Short) - : InvalidWireFrame( - "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker)) +class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame( + "Invalid start of frame. Expected 0x%02X, but was 0x%02X" + .format(PayloadWireFrameMessage.MARKER_BYTE, actualMarker) +) object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)") @@ -46,3 +47,9 @@ sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg) object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size") object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size") object EmptyWireFrame : MissingWireFrameBytes("empty wire frame") + + +// Other + +class UnknownWireFrameTypeException(frame: WireFrameMessage) + : Throwable("Unexpected wire frame message type: ${frame.javaClass}") diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt index 4d6f0716..a5242e0f 100644 --- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt +++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt @@ -20,19 +20,18 @@ package org.onap.dcae.collectors.veshv.domain import arrow.core.Either -import arrow.core.identity import io.netty.buffer.Unpooled import io.netty.buffer.UnpooledByteBufAllocator import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.fail import org.assertj.core.api.ObjectAssert import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE import java.nio.charset.Charset import kotlin.test.assertTrue +import kotlin.test.fail /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -44,7 +43,7 @@ object WireFrameCodecsTest : Spek({ val decoder = WireFrameDecoder() fun createSampleFrame() = - WireFrame(payloadAsString.toByteArray(Charset.defaultCharset())) + PayloadWireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset())) fun encodeSampleFrame() = createSampleFrame().let { @@ -54,7 +53,7 @@ object WireFrameCodecsTest : Spek({ describe("Wire Frame invariants") { given("input with unsupported version") { - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData.EMPTY, version = 100, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -66,7 +65,7 @@ object WireFrameCodecsTest : Spek({ } given("input with unsupported payload type") { - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData.EMPTY, version = 1, payloadTypeRaw = 0x69, @@ -78,7 +77,7 @@ object WireFrameCodecsTest : Spek({ } given("input with too small payload size") { - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(byteArrayOf(1, 2, 3)), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -90,7 +89,7 @@ object WireFrameCodecsTest : Spek({ } given("input with too big payload size") { - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(byteArrayOf(1, 2, 3)), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -103,7 +102,7 @@ object WireFrameCodecsTest : Spek({ given("valid input") { val payload = byteArrayOf(6, 9, 8, 6) - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(payload), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -122,7 +121,7 @@ object WireFrameCodecsTest : Spek({ describe("encode-decode methods' compatibility") { val frame = createSampleFrame() val encoded = encodeSampleFrame() - val decoded = decoder.decodeFirst(encoded).getOrFail() + val decoded = decoder.decodeFirst(encoded).getPayloadMessageOrFail() it("should decode version") { assertThat(decoded.version).isEqualTo(frame.version) @@ -142,40 +141,52 @@ object WireFrameCodecsTest : Spek({ } } + describe("TCP framing") { // see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11 - it("should decode message leaving rest unread") { + it("should return error when buffer is empty") { + val buff = Unpooled.buffer() + + decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(EmptyWireFrame::class.java) } + } + + it("should return end-of-transmission message when given end-of-transmission marker byte") { val buff = Unpooled.buffer() - .writeBytes(encodeSampleFrame()) .writeByte(0xAA) - val decoded = decoder.decodeFirst(buff).getOrFail() - assertThat(decoded.isValid()).describedAs("should be valid").isTrue() - assertThat(buff.readableBytes()).isEqualTo(1) + assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff)) } - it("should return error when not even header fits") { + it("should return error when given any single byte other than end-of-transmission marker byte") { val buff = Unpooled.buffer() - .writeByte(0xFF) + .writeByte(0xEE) decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) } + } + it("should return error when payload message header does not fit") { + val buff = Unpooled.buffer() + .writeByte(0xFF) + .writeBytes("MOMOM".toByteArray()) + + decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) } } - it("should return error when first byte is not 0xFF but length looks ok") { + it("should return error when length looks ok but first byte is not 0xFF or 0xAA") { val buff = Unpooled.buffer() - .writeByte(0xAA) + .writeByte(0x69) .writeBytes("some garbage".toByteArray()) decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) } } - it("should return error when first byte is not 0xFF and length is to short") { + it("should return end-of-transmission message when length looks ok and first byte is 0xAA") { val buff = Unpooled.buffer() .writeByte(0xAA) + .writeBytes("some garbage".toByteArray()) - decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) } + assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff)) } it("should return error when payload doesn't fit") { @@ -186,14 +197,23 @@ object WireFrameCodecsTest : Spek({ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFramePayloadBytes::class.java) } } + it("should decode payload message leaving rest unread") { + val buff = Unpooled.buffer() + .writeBytes(encodeSampleFrame()) + .writeByte(0xAA) + val decoded = decoder.decodeFirst(buff).getPayloadMessageOrFail() + + assertThat(decoded.isValid()).describedAs("should be valid").isTrue() + assertThat(buff.readableBytes()).isEqualTo(1) + } } - describe("payload size limit"){ + describe("payload size limit") { it("should decode successfully when payload size is equal 1 MiB") { val payload = ByteArray(MAX_PAYLOAD_SIZE) - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(payload), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -206,7 +226,7 @@ object WireFrameCodecsTest : Spek({ it("should return error when payload exceeds 1 MiB") { val payload = ByteArray(MAX_PAYLOAD_SIZE + 1) - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(payload), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -220,7 +240,7 @@ object WireFrameCodecsTest : Spek({ it("should validate only first message") { val payload = ByteArray(MAX_PAYLOAD_SIZE) - val input = WireFrame( + val input = PayloadWireFrameMessage( payload = ByteData(payload), version = 1, payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, @@ -237,5 +257,21 @@ private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>) fold({ assertj(assertThat(it)) }, { fail("Error expected") }) } -private fun Either<WireFrameDecodingError, WireFrame>.getOrFail(): WireFrame = - fold({ fail(it.message) }, ::identity) as WireFrame +private fun Either<WireFrameDecodingError, WireFrameMessage>.getPayloadMessageOrFail(): PayloadWireFrameMessage = + fold({ fail(it.message) }, { it.castToPayloadMsgOrFail() }) + +private fun WireFrameMessage.castToPayloadMsgOrFail(): PayloadWireFrameMessage = + this as? PayloadWireFrameMessage + ?: fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${this.javaClass}") + + +private fun assertIsEndOfTransmissionMessage(decoded: Either<WireFrameDecodingError, WireFrameMessage>) { + decoded.getEndOfTransmissionMessageOrFail() +} + +private fun Either<WireFrameDecodingError, WireFrameMessage>.getEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage = + fold({ fail(it.message) }, { it.castToEndOfTransmissionMessageOrFail() }) + +private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage = + this as? EndOfTransmissionMessage + ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}") diff --git a/hv-collector-xnf-simulator/sample-request.json b/hv-collector-xnf-simulator/sample-request.json deleted file mode 100644 index ca8bd885..00000000 --- a/hv-collector-xnf-simulator/sample-request.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "commonEventHeader": { - "version": "sample-version", - "domain": 10, - "sequence": 1, - "priority": 1, - "eventId": "sample-event-id", - "eventName": "sample-event-name", - "eventType": "sample-event-type", - "startEpochMicrosec": 120034455, - "lastEpochMicrosec": 120034455, - "nfNamingCode": "sample-nf-naming-code", - "nfcNamingCode": "sample-nfc-naming-code", - "reportingEntityId": "sample-reporting-entity-id", - "reportingEntityName": "sample-reporting-entity-name", - "sourceId": "sample-source-id", - "sourceName": "sample-source-name" - }, - "messagesAmount": 25000 -} diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt index f4c92fd4..a6d6af84 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.api -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters import reactor.core.publisher.Flux @@ -28,5 +28,5 @@ import reactor.core.publisher.Flux * @since June 2018 */ interface MessageGenerator { - fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> + fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage> } diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt index b67bc644..6346b648 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt @@ -20,7 +20,7 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import arrow.effects.IO -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters import org.onap.dcae.collectors.veshv.utils.logging.Logger import ratpack.exec.Promise @@ -65,7 +65,7 @@ internal class HttpServer(private val vesClient: XnfSimulator) { } } - private fun createMessageFlux(ctx: Context): Promise<Flux<WireFrame>> { + private fun createMessageFlux(ctx: Context): Promise<Flux<PayloadWireFrameMessage>> { return ctx.request.body .map { Json.createReader(it.inputStream).readObject() } .map { extractMessageParameters(it) } diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt index 0d28bad0..baff967a 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt @@ -20,7 +20,7 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import com.google.protobuf.ByteString -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.api.MessageGenerator import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters import org.onap.ves.VesEventV5.VesEvent @@ -35,7 +35,7 @@ import javax.json.JsonObject */ internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerator) : MessageGenerator { - override fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> = + override fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage> = Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let { if (messageParameters.amount < 0) it.repeat() @@ -62,8 +62,8 @@ internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerat .build() - private fun createMessage(commonHeader: CommonEventHeader): WireFrame = - WireFrame(vesMessageBytes(commonHeader)) + private fun createMessage(commonHeader: CommonEventHeader): PayloadWireFrameMessage = + PayloadWireFrameMessage(vesMessageBytes(commonHeader)) private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray = diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 6487888e..2f9e0b59 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -20,13 +20,13 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import arrow.effects.IO -import io.netty.buffer.Unpooled import io.netty.handler.ssl.ClientAuth import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.SslProvider +import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -52,11 +52,11 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) { } .build() - fun sendIo(messages: Flux<WireFrame>) = IO<Unit> { + fun sendIo(messages: Flux<PayloadWireFrameMessage>) = IO<Unit> { sendRx(messages).block() } - fun sendRx(messages: Flux<WireFrame>): Mono<Void> { + fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> { val complete = ReplayProcessor.create<Void>(1) client .newHandler { _, output -> handler(complete, messages, output) } @@ -71,34 +71,21 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) { return complete.then() } - private fun handler(complete: ReplayProcessor<Void>, messages: Flux<WireFrame>, nettyOutbound: NettyOutbound): + private fun handler(complete: ReplayProcessor<Void>, + messages: Flux<PayloadWireFrameMessage>, + nettyOutbound: NettyOutbound): Publisher<Void> { - val encoder = WireFrameEncoder(nettyOutbound.alloc()) - val context = nettyOutbound.context() - - context.onClose { - logger.info { "Connection to ${context.address()} has been closed" } - } - - // TODO: Close channel after all messages have been sent - // The code bellow doesn't work because it closes the channel earlier and not all are consumed... -// complete.subscribe { -// context.channel().disconnect().addListener { -// if (it.isSuccess) -// logger.info { "Connection closed" } -// else -// logger.warn("Failed to close the connection", it.cause()) -// } -// } - + val allocator = nettyOutbound.alloc() + val encoder = WireFrameEncoder(allocator) val frames = messages .map(encoder::encode) .window(MAX_BATCH_SIZE) return nettyOutbound + .logConnectionClosed() .options { it.flushOnBoundary() } .sendGroups(frames) - .send(Mono.just(Unpooled.EMPTY_BUFFER)) + .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt()))) .then { logger.info("Messages have been sent") complete.onComplete() @@ -114,8 +101,16 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) { .clientAuth(ClientAuth.REQUIRE) .build() + private fun NettyOutbound.logConnectionClosed(): NettyOutbound { + context().onClose { + logger.info { "Connection to ${context().address()} has been closed" } + } + return this + } + companion object { private const val MAX_BATCH_SIZE = 128 + private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE private val logger = Logger(XnfSimulator::class) } } |