aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/main
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2018-09-20 12:17:46 +0200
committerFilip Krzywka <filip.krzywka@nokia.com>2018-09-20 14:47:23 +0200
commit7333951cfec6b79a92b12e70cf679bff2f01825a (patch)
tree52aa940c4408826679ec2d11d7b51a696519a9f6 /hv-collector-core/src/main
parent0b9207babf9185f2235ebe9fdfa1c936b9997d99 (diff)
Enhance releasing memory
- Some buffers may be emitted as cancelled and thus they would not be handled by doOnTerminate method - Moved data stream creation for Netty inbound to time when collector is fully functional Change-Id: If2f2195fadeca957679f6be696802f48a616f48d Issue-ID: DCAEGEN2-815 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'hv-collector-core/src/main')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt32
2 files changed, 18 insertions, 17 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 52689162..f608a2b9 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -33,7 +33,6 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.SynchronousSink
@@ -58,7 +57,7 @@ internal class VesHvCollector(
.transform(::decodePayload)
.filter(VesMessage::isValid)
.transform(::routeMessage)
- .doOnTerminate { releaseBuffersMemory(wireDecoder) }
+ .doFinally { releaseBuffersMemory(wireDecoder) }
.onErrorResume(::handleErrors)
.then()
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index a34be7cd..b4ad4b7d 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
+import reactor.ipc.netty.ByteBufFlux
import reactor.ipc.netty.NettyInbound
import reactor.ipc.netty.NettyOutbound
import reactor.ipc.netty.options.ServerOptions
@@ -61,23 +62,24 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
opts.port(serverConfig.listenPort)
}
- private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> {
- logger.info("Handling connection from ${nettyInbound.remoteAddress()}")
+ private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> =
+ collectorProvider().fold(
+ {
+ logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
+ Mono.empty()
+ },
+ {
+ logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" }
+ it.handleConnection(nettyInbound.context().channel().alloc(), createDataStream(nettyInbound))
+ }
+ )
- val dataStream = nettyInbound
- .configureIdleTimeout(serverConfig.idleTimeout)
- .logConnectionClosed()
- .receive()
- .retain()
- return collectorProvider().fold(
- {
- logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
- Mono.empty()
- },
- { it.handleConnection(nettyInbound.context().channel().alloc(), dataStream) })
-
- }
+ fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
+ .configureIdleTimeout(serverConfig.idleTimeout)
+ .logConnectionClosed()
+ .receive()
+ .retain()
private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
onReadIdle(timeout.toMillis()) {