diff options
3 files changed, 21 insertions, 18 deletions
diff --git a/docker-compose.yml b/docker-compose.yml index f9f52b4e..f37c823a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,4 @@ -version: "3.4" +version: "3.5" services: zookeeper: image: wurstmeister/zookeeper @@ -35,6 +35,8 @@ services: ports: - "6060:6060" - "6061:6061/tcp" + entrypoint: ["java", "-Dio.netty.leakDetection.level=paranoid", + "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"] command: ["--listen-port", "6061", "--health-check-api-port", "6060", "--config-url", "http://consul:8500/v1/kv/veshv-config"] diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 52689162..f608a2b9 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -33,7 +33,6 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.reactivestreams.Publisher import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.SynchronousSink @@ -58,7 +57,7 @@ internal class VesHvCollector( .transform(::decodePayload) .filter(VesMessage::isValid) .transform(::routeMessage) - .doOnTerminate { releaseBuffersMemory(wireDecoder) } + .doFinally { releaseBuffersMemory(wireDecoder) } .onErrorResume(::handleErrors) .then() } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index a34be7cd..b4ad4b7d 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher import reactor.core.publisher.Mono +import reactor.ipc.netty.ByteBufFlux import reactor.ipc.netty.NettyInbound import reactor.ipc.netty.NettyOutbound import reactor.ipc.netty.options.ServerOptions @@ -61,23 +62,24 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, opts.port(serverConfig.listenPort) } - private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> { - logger.info("Handling connection from ${nettyInbound.remoteAddress()}") + private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> = + collectorProvider().fold( + { + logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." } + Mono.empty() + }, + { + logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" } + it.handleConnection(nettyInbound.context().channel().alloc(), createDataStream(nettyInbound)) + } + ) - val dataStream = nettyInbound - .configureIdleTimeout(serverConfig.idleTimeout) - .logConnectionClosed() - .receive() - .retain() - return collectorProvider().fold( - { - logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." } - Mono.empty() - }, - { it.handleConnection(nettyInbound.context().channel().alloc(), dataStream) }) - - } + fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound + .configureIdleTimeout(serverConfig.idleTimeout) + .logConnectionClosed() + .receive() + .retain() private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound { onReadIdle(timeout.toMillis()) { |