aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-04 13:51:29 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-01 09:06:13 +0200
commit5644fbd17af113c2d65ffbad71548eb26898ee18 (patch)
tree2a4a4b4515658cf9d1467dc5c9f136eb4454d8de
parent4b8cfb3e5bafc0cb078e37f64d0f21e8dfb0916a (diff)
Fix wire protocol decoder refCnt issue
We should use retain + slice because every reactor-netty operator automatically releases the buffer. Change-Id: Ie0282e70fadb56d56fc410a08e036fb0ca10584c Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt17
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt55
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt9
4 files changed, 72 insertions, 11 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
index 306b7762..ffd59bdc 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
@@ -81,7 +81,7 @@ data class WireFrame(val payload: ByteBuf,
val majorVersion = byteBuf.readUnsignedByte()
val minorVersion = byteBuf.readUnsignedByte()
val payloadSize = byteBuf.readInt()
- val payload = byteBuf.slice()
+ val payload = byteBuf.retainedSlice()
return WireFrame(payload, mark, majorVersion, minorVersion, payloadSize)
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index af9d0b0a..a3f26ce5 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -32,11 +32,11 @@ import reactor.core.publisher.Mono
* @since May 2018
*/
internal class VesHvCollector(
- val wireDecoder: WireDecoder,
- val protobufDecoder: VesDecoder,
- val validator: MessageValidator,
- val router: Router,
- val sink: Sink) : Collector {
+ private val wireDecoder: WireDecoder,
+ private val protobufDecoder: VesDecoder,
+ private val validator: MessageValidator,
+ private val router: Router,
+ private val sink: Sink) : Collector {
override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
dataStream
.flatMap(this::decodeWire)
@@ -47,7 +47,7 @@ internal class VesHvCollector(
.doOnNext(this::releaseMemory)
.then()
- private fun decodeWire(wire: ByteBuf) = releaseWhenNull(wire, wireDecoder::decode)
+ private fun decodeWire(wire: ByteBuf) = omitWhenNull(wire, wireDecoder::decode)
private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode)
@@ -71,6 +71,11 @@ internal class VesHvCollector(
msg.rawMessage.release()
}
+
+
+ private fun <T>omitWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> =
+ Mono.justOrEmpty(mapper(input))
+
private fun <T>releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> {
val result = mapper(input)
return if (result == null) {
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt
new file mode 100644
index 00000000..5a923c4e
--- /dev/null
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt
@@ -0,0 +1,55 @@
+package org.onap.dcae.collectors.veshv.domain
+
+import io.netty.buffer.ByteBufAllocator
+import io.netty.buffer.Unpooled
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+object WireFrameTest : Spek({
+ describe("Wire Frame codec") {
+ describe("encode-decode methods' compatibility") {
+ val payloadContent = "test"
+ val payload = Unpooled.wrappedBuffer(payloadContent.toByteArray(Charsets.US_ASCII))
+ val frame = WireFrame(payload = payload,
+ majorVersion = 1,
+ minorVersion = 2,
+ mark = 0xFF,
+ payloadSize = payload.readableBytes())
+
+ val encoded = frame.encode(ByteBufAllocator.DEFAULT)
+ val decoded = WireFrame.decode(encoded)
+
+ it("should decode major version") {
+ assertThat(decoded.majorVersion).isEqualTo(frame.majorVersion)
+ }
+
+ it("should decode minor version") {
+ assertThat(decoded.minorVersion).isEqualTo(frame.minorVersion)
+ }
+
+ it("should decode mark") {
+ assertThat(decoded.mark).isEqualTo(frame.mark)
+ }
+
+ it("should decode payload size") {
+ assertThat(decoded.payloadSize).isEqualTo(frame.payloadSize)
+ }
+
+ it("should decode payload") {
+ assertThat(decoded.payload.toString(Charsets.US_ASCII))
+ .isEqualTo(payloadContent)
+ }
+
+ it("should retain decoded payload") {
+ encoded.release()
+ assertThat(decoded.payload.refCnt()).isEqualTo(1)
+ }
+ }
+ }
+}) \ No newline at end of file
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 2cfb785e..5990fd0a 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -47,21 +47,22 @@ object VesHvSpecification : Spek({
val msgWithInvalidPayload = invalidVesMessage()
val msgWithInvalidFrame = invalidWireFrame()
val validMessage = vesMessage(Domain.HVRANMEAS)
+ val refCntBeforeSending = msgWithInvalidDomain.refCnt()
sut.handleConnection(msgWithInvalidDomain, msgWithInvalidPayload, msgWithInvalidFrame, validMessage)
assertThat(msgWithInvalidDomain.refCnt())
.describedAs("message with invalid domain should be released")
- .isEqualTo(0)
+ .isEqualTo(refCntBeforeSending)
assertThat(msgWithInvalidPayload.refCnt())
.describedAs("message with invalid payload should be released")
- .isEqualTo(0)
+ .isEqualTo(refCntBeforeSending)
assertThat(msgWithInvalidFrame.refCnt())
.describedAs("message with invalid frame should be released")
- .isEqualTo(0)
+ .isEqualTo(refCntBeforeSending)
assertThat(validMessage.refCnt())
.describedAs("handled message should be released")
- .isEqualTo(0)
+ .isEqualTo(refCntBeforeSending)
}
}