From d7532776b9d608632b91a6c658fcd72ca7c70d64 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Tue, 22 Jan 2019 11:43:18 +0100 Subject: Close KafkaSender when handling SIGINT Closing KafkaSender should result in flushing any pending messages. Change-Id: Ib251f5ca3527266831189df542784cc17173d8dc Issue-ID: DCAEGEN2-1065 Signed-off-by: Piotr Jaszczyk --- .../onap/dcae/collectors/veshv/utils/Closeable.kt | 40 ++++++++++++++++++++++ .../dcae/collectors/veshv/utils/arrow/effects.kt | 11 +++++- .../dcae/collectors/veshv/utils/server_handle.kt | 9 ++--- 3 files changed, 52 insertions(+), 8 deletions(-) create mode 100644 sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt (limited to 'sources/hv-collector-utils/src/main/kotlin/org') diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt new file mode 100644 index 00000000..00b814cc --- /dev/null +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt @@ -0,0 +1,40 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils + +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.instances.io.monadError.monadError +import arrow.typeclasses.binding + +/** + * @author Piotr Jaszczyk + * @since January 2019 + */ +interface Closeable { + fun close(): IO = IO.unit + + companion object { + fun closeAll(closeables: Iterable) = + IO.monadError().binding { + closeables.forEach { it.close().bind() } + }.fix() + } +} diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt index 3c2c64ac..290ef72c 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt @@ -23,6 +23,9 @@ import arrow.core.Either import arrow.core.Left import arrow.core.Right import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.instances.io.monadError.monadError +import arrow.typeclasses.binding import reactor.core.publisher.Flux import reactor.core.publisher.Mono import kotlin.system.exitProcess @@ -46,7 +49,7 @@ object ExitSuccess : ExitCode() { data class ExitFailure(override val code: Int) : ExitCode() -fun Either, IO>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) = +inline fun Either, IO>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) = flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() }) fun IO.unit() = map { Unit } @@ -66,3 +69,9 @@ fun Flux>.evaluateIo(): Flux = { Flux.just(it) } ) } + +inline fun IO.then(crossinline block: (T) -> Unit): IO = + map { + block(it) + it + } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt index b8784c64..5b582ed5 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt @@ -20,7 +20,6 @@ package org.onap.dcae.collectors.veshv.utils import arrow.effects.IO -import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.netty.DisposableServer import java.time.Duration @@ -28,8 +27,7 @@ import java.time.Duration * @author Piotr Jaszczyk * @since August 2018 */ -abstract class ServerHandle(val host: String, val port: Int) { - abstract fun shutdown(): IO +abstract class ServerHandle(val host: String, val port: Int) : Closeable { abstract fun await(): IO } @@ -38,10 +36,8 @@ abstract class ServerHandle(val host: String, val port: Int) { * @since August 2018 */ class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) { - override fun shutdown() = IO { - logger.info { "Graceful shutdown" } + override fun close() = IO { ctx.disposeNow(SHUTDOWN_TIMEOUT) - logger.info { "Server disposed" } } override fun await() = IO { @@ -49,7 +45,6 @@ class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.ho } companion object { - val logger = Logger(NettyServerHandle::class) private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10) } } -- cgit 1.2.3-korg