From 5644fbd17af113c2d65ffbad71548eb26898ee18 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Mon, 4 Jun 2018 13:51:29 +0200 Subject: Fix wire protocol decoder refCnt issue We should use retain + slice because every reactor-netty operator automatically releases the buffer. Change-Id: Ie0282e70fadb56d56fc410a08e036fb0ca10584c Signed-off-by: Piotr Jaszczyk Issue-ID: DCAEGEN2-601 --- .../onap/dcae/collectors/veshv/domain/WireFrame.kt | 2 +- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 17 ++++--- .../dcae/collectors/veshv/domain/WireFrameTest.kt | 55 ++++++++++++++++++++++ .../veshv/tests/component/VesHvSpecification.kt | 9 ++-- 4 files changed, 72 insertions(+), 11 deletions(-) create mode 100644 hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt index 306b7762..ffd59bdc 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt @@ -81,7 +81,7 @@ data class WireFrame(val payload: ByteBuf, val majorVersion = byteBuf.readUnsignedByte() val minorVersion = byteBuf.readUnsignedByte() val payloadSize = byteBuf.readInt() - val payload = byteBuf.slice() + val payload = byteBuf.retainedSlice() return WireFrame(payload, mark, majorVersion, minorVersion, payloadSize) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index af9d0b0a..a3f26ce5 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -32,11 +32,11 @@ import reactor.core.publisher.Mono * @since May 2018 */ internal class VesHvCollector( - val wireDecoder: WireDecoder, - val protobufDecoder: VesDecoder, - val validator: MessageValidator, - val router: Router, - val sink: Sink) : Collector { + private val wireDecoder: WireDecoder, + private val protobufDecoder: VesDecoder, + private val validator: MessageValidator, + private val router: Router, + private val sink: Sink) : Collector { override fun handleConnection(dataStream: Flux): Mono = dataStream .flatMap(this::decodeWire) @@ -47,7 +47,7 @@ internal class VesHvCollector( .doOnNext(this::releaseMemory) .then() - private fun decodeWire(wire: ByteBuf) = releaseWhenNull(wire, wireDecoder::decode) + private fun decodeWire(wire: ByteBuf) = omitWhenNull(wire, wireDecoder::decode) private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode) @@ -71,6 +71,11 @@ internal class VesHvCollector( msg.rawMessage.release() } + + + private fun omitWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono = + Mono.justOrEmpty(mapper(input)) + private fun releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono { val result = mapper(input) return if (result == null) { diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt new file mode 100644 index 00000000..5a923c4e --- /dev/null +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt @@ -0,0 +1,55 @@ +package org.onap.dcae.collectors.veshv.domain + +import io.netty.buffer.ByteBufAllocator +import io.netty.buffer.Unpooled +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +object WireFrameTest : Spek({ + describe("Wire Frame codec") { + describe("encode-decode methods' compatibility") { + val payloadContent = "test" + val payload = Unpooled.wrappedBuffer(payloadContent.toByteArray(Charsets.US_ASCII)) + val frame = WireFrame(payload = payload, + majorVersion = 1, + minorVersion = 2, + mark = 0xFF, + payloadSize = payload.readableBytes()) + + val encoded = frame.encode(ByteBufAllocator.DEFAULT) + val decoded = WireFrame.decode(encoded) + + it("should decode major version") { + assertThat(decoded.majorVersion).isEqualTo(frame.majorVersion) + } + + it("should decode minor version") { + assertThat(decoded.minorVersion).isEqualTo(frame.minorVersion) + } + + it("should decode mark") { + assertThat(decoded.mark).isEqualTo(frame.mark) + } + + it("should decode payload size") { + assertThat(decoded.payloadSize).isEqualTo(frame.payloadSize) + } + + it("should decode payload") { + assertThat(decoded.payload.toString(Charsets.US_ASCII)) + .isEqualTo(payloadContent) + } + + it("should retain decoded payload") { + encoded.release() + assertThat(decoded.payload.refCnt()).isEqualTo(1) + } + } + } +}) \ No newline at end of file diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 2cfb785e..5990fd0a 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -47,21 +47,22 @@ object VesHvSpecification : Spek({ val msgWithInvalidPayload = invalidVesMessage() val msgWithInvalidFrame = invalidWireFrame() val validMessage = vesMessage(Domain.HVRANMEAS) + val refCntBeforeSending = msgWithInvalidDomain.refCnt() sut.handleConnection(msgWithInvalidDomain, msgWithInvalidPayload, msgWithInvalidFrame, validMessage) assertThat(msgWithInvalidDomain.refCnt()) .describedAs("message with invalid domain should be released") - .isEqualTo(0) + .isEqualTo(refCntBeforeSending) assertThat(msgWithInvalidPayload.refCnt()) .describedAs("message with invalid payload should be released") - .isEqualTo(0) + .isEqualTo(refCntBeforeSending) assertThat(msgWithInvalidFrame.refCnt()) .describedAs("message with invalid frame should be released") - .isEqualTo(0) + .isEqualTo(refCntBeforeSending) assertThat(validMessage.refCnt()) .describedAs("handled message should be released") - .isEqualTo(0) + .isEqualTo(refCntBeforeSending) } } -- cgit 1.2.3-korg