From 302d27926c76bb99eecc4f74d333d0e8ff240c6e Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Tue, 2 Apr 2019 15:40:46 +0200 Subject: Fix shutting down when new config received bug When new configuration has been received and at least one client connection has been active the collector used to shut down. Also got rid of some more IO monad usage. Change-Id: I7981ff388ff1264a79d722727ef3005cf39e9f0d Issue-ID: DCAEGEN2-1382 Signed-off-by: Piotr Jaszczyk --- .../onap/dcae/collectors/veshv/utils/Closeable.kt | 12 +++------ .../dcae/collectors/veshv/utils/server_handle.kt | 30 +++++++++++++++++----- 2 files changed, 28 insertions(+), 14 deletions(-) (limited to 'sources/hv-collector-utils') diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt index 00b814cc..ec654b32 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt @@ -19,22 +19,18 @@ */ package org.onap.dcae.collectors.veshv.utils -import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monadError.monadError -import arrow.typeclasses.binding +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono /** * @author Piotr Jaszczyk * @since January 2019 */ interface Closeable { - fun close(): IO = IO.unit + fun close(): Mono = Mono.empty() companion object { fun closeAll(closeables: Iterable) = - IO.monadError().binding { - closeables.forEach { it.close().bind() } - }.fix() + Flux.fromIterable(closeables).flatMap(Closeable::close).then() } } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt index 5b582ed5..670ab4ac 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt @@ -20,8 +20,9 @@ package org.onap.dcae.collectors.veshv.utils import arrow.effects.IO +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono import reactor.netty.DisposableServer -import java.time.Duration /** * @author Piotr Jaszczyk @@ -35,16 +36,33 @@ abstract class ServerHandle(val host: String, val port: Int) : Closeable { * @author Piotr Jaszczyk * @since August 2018 */ -class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) { - override fun close() = IO { - ctx.disposeNow(SHUTDOWN_TIMEOUT) - } +class NettyServerHandle(private val ctx: DisposableServer, + private val closeAction: Mono = Mono.empty()) + : ServerHandle(ctx.host(), ctx.port()) { + + override fun close(): Mono = + Mono.just(ctx) + .filter { !it.isDisposed } + .flatMap { + closeAction.thenReturn(it) + } + .then(dispose()) + + private fun dispose(): Mono = + Mono.create { callback -> + logger.debug { "About to dispose NettyServer" } + ctx.dispose() + ctx.onDispose { + logger.debug { "Netty server disposed" } + callback.success() + } + } override fun await() = IO { ctx.channel().closeFuture().sync() } companion object { - private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10) + private val logger = Logger(NettyServerHandle::class) } } -- cgit 1.2.3-korg