diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-04-02 15:40:46 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-04-03 08:51:03 +0200 |
commit | 302d27926c76bb99eecc4f74d333d0e8ff240c6e (patch) | |
tree | c9b716c649deb8b14d9ace320b3f35ed22604d0e /sources/hv-collector-utils | |
parent | 6a00e38550fd1745c3377da2099bf5a615f69053 (diff) |
Fix shutting down when new config received bug
When new configuration has been received and at least one client
connection has been active the collector used to shut down.
Also got rid of some more IO monad usage.
Change-Id: I7981ff388ff1264a79d722727ef3005cf39e9f0d
Issue-ID: DCAEGEN2-1382
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-utils')
2 files changed, 28 insertions, 14 deletions
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt index 00b814cc..ec654b32 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt @@ -19,22 +19,18 @@ */ package org.onap.dcae.collectors.veshv.utils -import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monadError.monadError -import arrow.typeclasses.binding +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since January 2019 */ interface Closeable { - fun close(): IO<Unit> = IO.unit + fun close(): Mono<Void> = Mono.empty() companion object { fun closeAll(closeables: Iterable<Closeable>) = - IO.monadError().binding { - closeables.forEach { it.close().bind() } - }.fix() + Flux.fromIterable(closeables).flatMap(Closeable::close).then() } } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt index 5b582ed5..670ab4ac 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt @@ -20,8 +20,9 @@ package org.onap.dcae.collectors.veshv.utils import arrow.effects.IO +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono import reactor.netty.DisposableServer -import java.time.Duration /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -35,16 +36,33 @@ abstract class ServerHandle(val host: String, val port: Int) : Closeable { * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) { - override fun close() = IO { - ctx.disposeNow(SHUTDOWN_TIMEOUT) - } +class NettyServerHandle(private val ctx: DisposableServer, + private val closeAction: Mono<Void> = Mono.empty()) + : ServerHandle(ctx.host(), ctx.port()) { + + override fun close(): Mono<Void> = + Mono.just(ctx) + .filter { !it.isDisposed } + .flatMap { + closeAction.thenReturn(it) + } + .then(dispose()) + + private fun dispose(): Mono<Void> = + Mono.create { callback -> + logger.debug { "About to dispose NettyServer" } + ctx.dispose() + ctx.onDispose { + logger.debug { "Netty server disposed" } + callback.success() + } + } override fun await() = IO<Unit> { ctx.channel().closeFuture().sync() } companion object { - private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10) + private val logger = Logger(NettyServerHandle::class) } } |