aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-utils
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-01-22 11:43:18 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-01-22 14:30:32 +0100
commitd7532776b9d608632b91a6c658fcd72ca7c70d64 (patch)
tree0d90d7a75a4a1d83dd1cbd7c5af43e71bb6fea6c /sources/hv-collector-utils
parent4c529a33439cc40bf192ea3f8dac57d189d60b9f (diff)
Close KafkaSender when handling SIGINT
Closing KafkaSender should result in flushing any pending messages. Change-Id: Ib251f5ca3527266831189df542784cc17173d8dc Issue-ID: DCAEGEN2-1065 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-utils')
-rw-r--r--sources/hv-collector-utils/pom.xml4
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt40
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt11
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt9
4 files changed, 56 insertions, 8 deletions
diff --git a/sources/hv-collector-utils/pom.xml b/sources/hv-collector-utils/pom.xml
index 5bd24729..6ce74bdb 100644
--- a/sources/hv-collector-utils/pom.xml
+++ b/sources/hv-collector-utils/pom.xml
@@ -82,6 +82,10 @@
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-effects-instances</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
<artifactId>arrow-syntax</artifactId>
</dependency>
<dependency>
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt
new file mode 100644
index 00000000..00b814cc
--- /dev/null
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt
@@ -0,0 +1,40 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils
+
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.instances.io.monadError.monadError
+import arrow.typeclasses.binding
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since January 2019
+ */
+interface Closeable {
+ fun close(): IO<Unit> = IO.unit
+
+ companion object {
+ fun closeAll(closeables: Iterable<Closeable>) =
+ IO.monadError().binding {
+ closeables.forEach { it.close().bind() }
+ }.fix()
+ }
+}
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
index 3c2c64ac..290ef72c 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
@@ -23,6 +23,9 @@ import arrow.core.Either
import arrow.core.Left
import arrow.core.Right
import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.instances.io.monadError.monadError
+import arrow.typeclasses.binding
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import kotlin.system.exitProcess
@@ -46,7 +49,7 @@ object ExitSuccess : ExitCode() {
data class ExitFailure(override val code: Int) : ExitCode()
-fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) =
+inline fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) =
flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() })
fun IO<Any>.unit() = map { Unit }
@@ -66,3 +69,9 @@ fun <T> Flux<IO<T>>.evaluateIo(): Flux<T> =
{ Flux.just<T>(it) }
)
}
+
+inline fun <T> IO<T>.then(crossinline block: (T) -> Unit): IO<T> =
+ map {
+ block(it)
+ it
+ }
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
index b8784c64..5b582ed5 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
@@ -20,7 +20,6 @@
package org.onap.dcae.collectors.veshv.utils
import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.netty.DisposableServer
import java.time.Duration
@@ -28,8 +27,7 @@ import java.time.Duration
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-abstract class ServerHandle(val host: String, val port: Int) {
- abstract fun shutdown(): IO<Unit>
+abstract class ServerHandle(val host: String, val port: Int) : Closeable {
abstract fun await(): IO<Unit>
}
@@ -38,10 +36,8 @@ abstract class ServerHandle(val host: String, val port: Int) {
* @since August 2018
*/
class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) {
- override fun shutdown() = IO {
- logger.info { "Graceful shutdown" }
+ override fun close() = IO {
ctx.disposeNow(SHUTDOWN_TIMEOUT)
- logger.info { "Server disposed" }
}
override fun await() = IO<Unit> {
@@ -49,7 +45,6 @@ class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.ho
}
companion object {
- val logger = Logger(NettyServerHandle::class)
private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10)
}
}