diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-01-22 11:43:18 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-01-22 14:30:32 +0100 |
commit | d7532776b9d608632b91a6c658fcd72ca7c70d64 (patch) | |
tree | 0d90d7a75a4a1d83dd1cbd7c5af43e71bb6fea6c /sources/hv-collector-utils/src/main/kotlin | |
parent | 4c529a33439cc40bf192ea3f8dac57d189d60b9f (diff) |
Close KafkaSender when handling SIGINT
Closing KafkaSender should result in flushing any pending messages.
Change-Id: Ib251f5ca3527266831189df542784cc17173d8dc
Issue-ID: DCAEGEN2-1065
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-utils/src/main/kotlin')
3 files changed, 52 insertions, 8 deletions
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt new file mode 100644 index 00000000..00b814cc --- /dev/null +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt @@ -0,0 +1,40 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils + +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.instances.io.monadError.monadError +import arrow.typeclasses.binding + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since January 2019 + */ +interface Closeable { + fun close(): IO<Unit> = IO.unit + + companion object { + fun closeAll(closeables: Iterable<Closeable>) = + IO.monadError().binding { + closeables.forEach { it.close().bind() } + }.fix() + } +} diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt index 3c2c64ac..290ef72c 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt @@ -23,6 +23,9 @@ import arrow.core.Either import arrow.core.Left import arrow.core.Right import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.instances.io.monadError.monadError +import arrow.typeclasses.binding import reactor.core.publisher.Flux import reactor.core.publisher.Mono import kotlin.system.exitProcess @@ -46,7 +49,7 @@ object ExitSuccess : ExitCode() { data class ExitFailure(override val code: Int) : ExitCode() -fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) = +inline fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) = flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() }) fun IO<Any>.unit() = map { Unit } @@ -66,3 +69,9 @@ fun <T> Flux<IO<T>>.evaluateIo(): Flux<T> = { Flux.just<T>(it) } ) } + +inline fun <T> IO<T>.then(crossinline block: (T) -> Unit): IO<T> = + map { + block(it) + it + } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt index b8784c64..5b582ed5 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt @@ -20,7 +20,6 @@ package org.onap.dcae.collectors.veshv.utils import arrow.effects.IO -import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.netty.DisposableServer import java.time.Duration @@ -28,8 +27,7 @@ import java.time.Duration * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -abstract class ServerHandle(val host: String, val port: Int) { - abstract fun shutdown(): IO<Unit> +abstract class ServerHandle(val host: String, val port: Int) : Closeable { abstract fun await(): IO<Unit> } @@ -38,10 +36,8 @@ abstract class ServerHandle(val host: String, val port: Int) { * @since August 2018 */ class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) { - override fun shutdown() = IO { - logger.info { "Graceful shutdown" } + override fun close() = IO { ctx.disposeNow(SHUTDOWN_TIMEOUT) - logger.info { "Server disposed" } } override fun await() = IO<Unit> { @@ -49,7 +45,6 @@ class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.ho } companion object { - val logger = Logger(NettyServerHandle::class) private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10) } } |