aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-utils
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-utils')
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt12
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt30
2 files changed, 28 insertions, 14 deletions
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt
index 00b814cc..ec654b32 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt
@@ -19,22 +19,18 @@
*/
package org.onap.dcae.collectors.veshv.utils
-import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monadError.monadError
-import arrow.typeclasses.binding
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since January 2019
*/
interface Closeable {
- fun close(): IO<Unit> = IO.unit
+ fun close(): Mono<Void> = Mono.empty()
companion object {
fun closeAll(closeables: Iterable<Closeable>) =
- IO.monadError().binding {
- closeables.forEach { it.close().bind() }
- }.fix()
+ Flux.fromIterable(closeables).flatMap(Closeable::close).then()
}
}
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
index 5b582ed5..670ab4ac 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
@@ -20,8 +20,9 @@
package org.onap.dcae.collectors.veshv.utils
import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
import reactor.netty.DisposableServer
-import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,16 +36,33 @@ abstract class ServerHandle(val host: String, val port: Int) : Closeable {
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) {
- override fun close() = IO {
- ctx.disposeNow(SHUTDOWN_TIMEOUT)
- }
+class NettyServerHandle(private val ctx: DisposableServer,
+ private val closeAction: Mono<Void> = Mono.empty())
+ : ServerHandle(ctx.host(), ctx.port()) {
+
+ override fun close(): Mono<Void> =
+ Mono.just(ctx)
+ .filter { !it.isDisposed }
+ .flatMap {
+ closeAction.thenReturn(it)
+ }
+ .then(dispose())
+
+ private fun dispose(): Mono<Void> =
+ Mono.create { callback ->
+ logger.debug { "About to dispose NettyServer" }
+ ctx.dispose()
+ ctx.onDispose {
+ logger.debug { "Netty server disposed" }
+ callback.success()
+ }
+ }
override fun await() = IO<Unit> {
ctx.channel().closeFuture().sync()
}
companion object {
- private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10)
+ private val logger = Logger(NettyServerHandle::class)
}
}