summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-utils
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-02 15:40:46 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-03 08:51:03 +0200
commit302d27926c76bb99eecc4f74d333d0e8ff240c6e (patch)
treec9b716c649deb8b14d9ace320b3f35ed22604d0e /sources/hv-collector-utils
parent6a00e38550fd1745c3377da2099bf5a615f69053 (diff)
Fix shutting down when new config received bug
When new configuration has been received and at least one client connection has been active the collector used to shut down. Also got rid of some more IO monad usage. Change-Id: I7981ff388ff1264a79d722727ef3005cf39e9f0d Issue-ID: DCAEGEN2-1382 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-utils')
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt12
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt30
2 files changed, 28 insertions, 14 deletions
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt
index 00b814cc..ec654b32 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt
@@ -19,22 +19,18 @@
*/
package org.onap.dcae.collectors.veshv.utils
-import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monadError.monadError
-import arrow.typeclasses.binding
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since January 2019
*/
interface Closeable {
- fun close(): IO<Unit> = IO.unit
+ fun close(): Mono<Void> = Mono.empty()
companion object {
fun closeAll(closeables: Iterable<Closeable>) =
- IO.monadError().binding {
- closeables.forEach { it.close().bind() }
- }.fix()
+ Flux.fromIterable(closeables).flatMap(Closeable::close).then()
}
}
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
index 5b582ed5..670ab4ac 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
@@ -20,8 +20,9 @@
package org.onap.dcae.collectors.veshv.utils
import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
import reactor.netty.DisposableServer
-import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,16 +36,33 @@ abstract class ServerHandle(val host: String, val port: Int) : Closeable {
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) {
- override fun close() = IO {
- ctx.disposeNow(SHUTDOWN_TIMEOUT)
- }
+class NettyServerHandle(private val ctx: DisposableServer,
+ private val closeAction: Mono<Void> = Mono.empty())
+ : ServerHandle(ctx.host(), ctx.port()) {
+
+ override fun close(): Mono<Void> =
+ Mono.just(ctx)
+ .filter { !it.isDisposed }
+ .flatMap {
+ closeAction.thenReturn(it)
+ }
+ .then(dispose())
+
+ private fun dispose(): Mono<Void> =
+ Mono.create { callback ->
+ logger.debug { "About to dispose NettyServer" }
+ ctx.dispose()
+ ctx.onDispose {
+ logger.debug { "Netty server disposed" }
+ callback.success()
+ }
+ }
override fun await() = IO<Unit> {
ctx.channel().closeFuture().sync()
}
companion object {
- private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10)
+ private val logger = Logger(NettyServerHandle::class)
}
}