diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-06-04 13:51:29 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-01 09:06:13 +0200 |
commit | 5644fbd17af113c2d65ffbad71548eb26898ee18 (patch) | |
tree | 2a4a4b4515658cf9d1467dc5c9f136eb4454d8de /hv-collector-core/src/main | |
parent | 4b8cfb3e5bafc0cb078e37f64d0f21e8dfb0916a (diff) |
Fix wire protocol decoder refCnt issue
We should use retain + slice because every reactor-netty operator
automatically releases the buffer.
Change-Id: Ie0282e70fadb56d56fc410a08e036fb0ca10584c
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core/src/main')
-rw-r--r-- | hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt | 2 | ||||
-rw-r--r-- | hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt | 17 |
2 files changed, 12 insertions, 7 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt index 306b7762..ffd59bdc 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt @@ -81,7 +81,7 @@ data class WireFrame(val payload: ByteBuf, val majorVersion = byteBuf.readUnsignedByte() val minorVersion = byteBuf.readUnsignedByte() val payloadSize = byteBuf.readInt() - val payload = byteBuf.slice() + val payload = byteBuf.retainedSlice() return WireFrame(payload, mark, majorVersion, minorVersion, payloadSize) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index af9d0b0a..a3f26ce5 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -32,11 +32,11 @@ import reactor.core.publisher.Mono * @since May 2018 */ internal class VesHvCollector( - val wireDecoder: WireDecoder, - val protobufDecoder: VesDecoder, - val validator: MessageValidator, - val router: Router, - val sink: Sink) : Collector { + private val wireDecoder: WireDecoder, + private val protobufDecoder: VesDecoder, + private val validator: MessageValidator, + private val router: Router, + private val sink: Sink) : Collector { override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> = dataStream .flatMap(this::decodeWire) @@ -47,7 +47,7 @@ internal class VesHvCollector( .doOnNext(this::releaseMemory) .then() - private fun decodeWire(wire: ByteBuf) = releaseWhenNull(wire, wireDecoder::decode) + private fun decodeWire(wire: ByteBuf) = omitWhenNull(wire, wireDecoder::decode) private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode) @@ -71,6 +71,11 @@ internal class VesHvCollector( msg.rawMessage.release() } + + + private fun <T>omitWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> = + Mono.justOrEmpty(mapper(input)) + private fun <T>releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> { val result = mapper(input) return if (result == null) { |