From 7333951cfec6b79a92b12e70cf679bff2f01825a Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Thu, 20 Sep 2018 12:17:46 +0200 Subject: Enhance releasing memory - Some buffers may be emitted as cancelled and thus they would not be handled by doOnTerminate method - Moved data stream creation for Netty inbound to time when collector is fully functional Change-Id: If2f2195fadeca957679f6be696802f48a616f48d Issue-ID: DCAEGEN2-815 Signed-off-by: Filip Krzywka --- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 3 +- .../collectors/veshv/impl/socket/NettyTcpServer.kt | 32 ++++++++++++---------- 2 files changed, 18 insertions(+), 17 deletions(-) (limited to 'hv-collector-core') diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 52689162..f608a2b9 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -33,7 +33,6 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.reactivestreams.Publisher import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.SynchronousSink @@ -58,7 +57,7 @@ internal class VesHvCollector( .transform(::decodePayload) .filter(VesMessage::isValid) .transform(::routeMessage) - .doOnTerminate { releaseBuffersMemory(wireDecoder) } + .doFinally { releaseBuffersMemory(wireDecoder) } .onErrorResume(::handleErrors) .then() } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index a34be7cd..b4ad4b7d 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher import reactor.core.publisher.Mono +import reactor.ipc.netty.ByteBufFlux import reactor.ipc.netty.NettyInbound import reactor.ipc.netty.NettyOutbound import reactor.ipc.netty.options.ServerOptions @@ -61,23 +62,24 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, opts.port(serverConfig.listenPort) } - private fun handleConnection(nettyInbound: NettyInbound): Mono { - logger.info("Handling connection from ${nettyInbound.remoteAddress()}") + private fun handleConnection(nettyInbound: NettyInbound): Mono = + collectorProvider().fold( + { + logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." } + Mono.empty() + }, + { + logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" } + it.handleConnection(nettyInbound.context().channel().alloc(), createDataStream(nettyInbound)) + } + ) - val dataStream = nettyInbound - .configureIdleTimeout(serverConfig.idleTimeout) - .logConnectionClosed() - .receive() - .retain() - return collectorProvider().fold( - { - logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." } - Mono.empty() - }, - { it.handleConnection(nettyInbound.context().channel().alloc(), dataStream) }) - - } + fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound + .configureIdleTimeout(serverConfig.idleTimeout) + .logConnectionClosed() + .receive() + .retain() private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound { onReadIdle(timeout.toMillis()) { -- cgit 1.2.3-korg