diff options
Diffstat (limited to 'hv-collector-core')
3 files changed, 31 insertions, 135 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index f608a2b9..8970e03e 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -25,9 +25,6 @@ import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage -import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage @@ -35,8 +32,6 @@ import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import reactor.core.publisher.SynchronousSink -import java.util.function.BiConsumer /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -53,7 +48,7 @@ internal class VesHvCollector( wireChunkDecoderSupplier(alloc).let { wireDecoder -> dataStream .transform { decodeWireFrame(it, wireDecoder) } - .filter(PayloadWireFrameMessage::isValid) + .filter(WireFrameMessage::isValid) .transform(::decodePayload) .filter(VesMessage::isValid) .transform(::routeMessage) @@ -62,14 +57,13 @@ internal class VesHvCollector( .then() } - private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<PayloadWireFrameMessage> = flux + private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } .concatMap(decoder::decode) - .handle(completeStreamOnEOT) .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } - private fun decodePayload(flux: Flux<PayloadWireFrameMessage>): Flux<VesMessage> = flux - .map(PayloadWireFrameMessage::payload) + private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux + .map(WireFrameMessage::payload) .map(protobufDecoder::decode) .flatMap { omitWhenNone(it) } @@ -95,18 +89,5 @@ internal class VesHvCollector( companion object { private val logger = Logger(VesHvCollector::class) - - private val completeStreamOnEOT by lazy { - BiConsumer<WireFrameMessage, SynchronousSink<PayloadWireFrameMessage>> { frame, sink -> - when (frame) { - is EndOfTransmissionMessage -> { - logger.info("Completing stream because of receiving EOT message") - sink.complete() - } - is PayloadWireFrameMessage -> sink.next(frame) - else -> sink.error(UnknownWireFrameTypeException(frame)) - } - } - } } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index 80f62d1a..0775c652 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -27,8 +27,6 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage -import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.SynchronousSink @@ -76,15 +74,9 @@ internal class WireChunkDecoder( } private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame -> - when (frame) { - is PayloadWireFrameMessage -> IO { - logDecodedWireMessage(frame) - next.next(frame) - } - is EndOfTransmissionMessage -> IO { - logEndOfTransmissionWireMessage() - next.next(frame) - } + IO { + logDecodedWireMessage(frame) + next.next(frame) } } @@ -92,14 +84,10 @@ internal class WireChunkDecoder( logger.trace { "Got message with total size of ${wire.readableBytes()} B" } } - private fun logDecodedWireMessage(wire: PayloadWireFrameMessage) { + private fun logDecodedWireMessage(wire: WireFrameMessage) { logger.trace { "Wire payload size: ${wire.payloadSize} B" } } - private fun logEndOfTransmissionWireMessage() { - logger.trace { "Received end-of-transmission message" } - } - private fun logEndOfData() { logger.trace { "End of data in current TCP buffer" } } diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt index a9364ed3..d214ffcf 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt @@ -27,13 +27,10 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import reactor.test.test -import kotlin.test.fail /** * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> @@ -46,7 +43,7 @@ internal object WireChunkDecoderTest : Spek({ val encoder = WireFrameEncoder(alloc) - fun WireChunkDecoder.decode(frame: PayloadWireFrameMessage) = decode(encoder.encode(frame)) + fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame)) fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc) @@ -101,23 +98,23 @@ internal object WireChunkDecoderTest : Spek({ } given("valid input") { - val input = PayloadWireFrameMessage(samplePayload) + val input = WireFrameMessage(samplePayload) it("should yield decoded input frame") { createInstance().decode(input).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } + .expectNextMatches { it.payloadSize == samplePayload.size } .verifyComplete() } } given("valid input with part of next frame") { val input = Unpooled.buffer() - .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload))) - .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)).slice(0, 3)) + .writeBytes(encoder.encode(WireFrameMessage(samplePayload))) + .writeBytes(encoder.encode(WireFrameMessage(samplePayload)).slice(0, 3)) it("should yield decoded input frame") { createInstance().decode(input).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } + .expectNextMatches { it.payloadSize == samplePayload.size } .verifyComplete() } @@ -126,30 +123,14 @@ internal object WireChunkDecoderTest : Spek({ } } - given("end-of-transmission marker byte with garbage after it") { - val input = Unpooled.buffer() - .writeByte(0xAA) - .writeBytes(Unpooled.wrappedBuffer(samplePayload)) - - it("should yield decoded end-of-transmission frame and error") { - createInstance().decode(input).test() - .expectNextMatches { it is EndOfTransmissionMessage } - .verifyError(WireFrameException::class.java) - } - - it("should leave memory unreleased") { - verifyMemoryNotReleased(input) - } - } - given("valid input with garbage after it") { val input = Unpooled.buffer() - .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload))) + .writeBytes(encoder.encode(WireFrameMessage(samplePayload))) .writeBytes(Unpooled.wrappedBuffer(samplePayload)) it("should yield decoded input frame and error") { createInstance().decode(input).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } + .expectNextMatches { it.payloadSize == samplePayload.size } .verifyError(WireFrameException::class.java) } @@ -159,16 +140,16 @@ internal object WireChunkDecoderTest : Spek({ } given("two inputs containing two separate messages") { - val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) - val input2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) + val input1 = encoder.encode(WireFrameMessage(samplePayload)) + val input2 = encoder.encode(WireFrameMessage(anotherPayload)) it("should yield decoded input frames") { val cut = createInstance() cut.decode(input1).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } + .expectNextMatches { it.payloadSize == samplePayload.size } .verifyComplete() cut.decode(input2).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } + .expectNextMatches { it.payloadSize == anotherPayload.size } .verifyComplete() } @@ -177,57 +158,15 @@ internal object WireChunkDecoderTest : Spek({ } } - given("two payload messages followed by end-of-transmission marker byte") { - val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) - val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) - - val input = Unpooled.buffer() - .writeBytes(frame1) - .writeBytes(frame2) - .writeByte(0xAA) - - it("should yield decoded input frames") { - val cut = createInstance() - cut.decode(input).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } - .expectNextMatches { it is EndOfTransmissionMessage } - .verifyComplete() - } - } - - given("two payload messages separated by end-of-transmission marker byte") { - val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) - val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) - - val input = Unpooled.buffer() - .writeBytes(frame1) - .writeByte(0xAA) - .writeBytes(frame2) - - it("should yield decoded input frames") { - val cut = createInstance() - cut.decode(input).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } - .expectNextMatches { it is EndOfTransmissionMessage } - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } - .verifyComplete() - } - - it("should release memory") { - verifyMemoryReleased(input) - } - } - given("1st input containing 1st frame and 2nd input containing garbage") { - val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) + val input1 = encoder.encode(WireFrameMessage(samplePayload)) val input2 = Unpooled.wrappedBuffer(anotherPayload) it("should yield decoded input frames") { val cut = createInstance() cut.decode(input1) .test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } + .expectNextMatches { it.payloadSize == samplePayload.size } .verifyComplete() cut.decode(input2).test() .verifyError(WireFrameException::class.java) @@ -244,8 +183,8 @@ internal object WireChunkDecoderTest : Spek({ given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") { - val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) - val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) + val frame1 = encoder.encode(WireFrameMessage(samplePayload)) + val frame2 = encoder.encode(WireFrameMessage(anotherPayload)) val input1 = Unpooled.buffer() .writeBytes(frame1) @@ -255,10 +194,10 @@ internal object WireChunkDecoderTest : Spek({ it("should yield decoded input frames") { val cut = createInstance() cut.decode(input1).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } + .expectNextMatches { it.payloadSize == samplePayload.size } .verifyComplete() cut.decode(input2).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } + .expectNextMatches { it.payloadSize == anotherPayload.size } .verifyComplete() } @@ -268,8 +207,8 @@ internal object WireChunkDecoderTest : Spek({ } given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") { - val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) - val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) + val frame1 = encoder.encode(WireFrameMessage(samplePayload)) + val frame2 = encoder.encode(WireFrameMessage(anotherPayload)) val input1 = Unpooled.buffer() .writeBytes(frame1, 5) @@ -282,8 +221,8 @@ internal object WireChunkDecoderTest : Spek({ cut.decode(input1).test() .verifyComplete() cut.decode(input2).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } + .expectNextMatches { it.payloadSize == samplePayload.size } + .expectNextMatches { it.payloadSize == anotherPayload.size } .verifyComplete() } @@ -292,16 +231,4 @@ internal object WireChunkDecoderTest : Spek({ } } } -}) - - -private fun castToPayloadMsgOrFail(msg: WireFrameMessage): PayloadWireFrameMessage = - if (msg is PayloadWireFrameMessage) { - msg - } else { - fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${msg.javaClass}") - } - -private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage = - this as? EndOfTransmissionMessage - ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}")
\ No newline at end of file +})
\ No newline at end of file |