diff options
author | Filip Krzywka <filip.krzywka@nokia.com> | 2018-09-20 12:17:46 +0200 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2018-09-20 14:47:23 +0200 |
commit | 7333951cfec6b79a92b12e70cf679bff2f01825a (patch) | |
tree | 52aa940c4408826679ec2d11d7b51a696519a9f6 /hv-collector-core | |
parent | 0b9207babf9185f2235ebe9fdfa1c936b9997d99 (diff) |
Enhance releasing memory
- Some buffers may be emitted as cancelled and thus they would not be
handled by doOnTerminate method
- Moved data stream creation for Netty inbound to time when
collector is fully functional
Change-Id: If2f2195fadeca957679f6be696802f48a616f48d
Issue-ID: DCAEGEN2-815
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'hv-collector-core')
2 files changed, 18 insertions, 17 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 52689162..f608a2b9 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -33,7 +33,6 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.reactivestreams.Publisher import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.SynchronousSink @@ -58,7 +57,7 @@ internal class VesHvCollector( .transform(::decodePayload) .filter(VesMessage::isValid) .transform(::routeMessage) - .doOnTerminate { releaseBuffersMemory(wireDecoder) } + .doFinally { releaseBuffersMemory(wireDecoder) } .onErrorResume(::handleErrors) .then() } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index a34be7cd..b4ad4b7d 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher import reactor.core.publisher.Mono +import reactor.ipc.netty.ByteBufFlux import reactor.ipc.netty.NettyInbound import reactor.ipc.netty.NettyOutbound import reactor.ipc.netty.options.ServerOptions @@ -61,23 +62,24 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, opts.port(serverConfig.listenPort) } - private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> { - logger.info("Handling connection from ${nettyInbound.remoteAddress()}") + private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> = + collectorProvider().fold( + { + logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." } + Mono.empty() + }, + { + logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" } + it.handleConnection(nettyInbound.context().channel().alloc(), createDataStream(nettyInbound)) + } + ) - val dataStream = nettyInbound - .configureIdleTimeout(serverConfig.idleTimeout) - .logConnectionClosed() - .receive() - .retain() - return collectorProvider().fold( - { - logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." } - Mono.empty() - }, - { it.handleConnection(nettyInbound.context().channel().alloc(), dataStream) }) - - } + fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound + .configureIdleTimeout(serverConfig.idleTimeout) + .logConnectionClosed() + .receive() + .retain() private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound { onReadIdle(timeout.toMillis()) { |