From d7532776b9d608632b91a6c658fcd72ca7c70d64 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Tue, 22 Jan 2019 11:43:18 +0100 Subject: Close KafkaSender when handling SIGINT Closing KafkaSender should result in flushing any pending messages. Change-Id: Ib251f5ca3527266831189df542784cc17173d8dc Issue-ID: DCAEGEN2-1065 Signed-off-by: Piotr Jaszczyk --- .../org/onap/dcae/collectors/veshv/main/main.kt | 45 +++++++++++++--------- .../collectors/veshv/main/servers/ServerStarter.kt | 4 +- .../src/main/resources/logback.xml | 4 +- 3 files changed, 32 insertions(+), 21 deletions(-) (limited to 'sources/hv-collector-main/src') diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 16da3721..d865bcf5 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -26,7 +26,11 @@ import arrow.typeclasses.binding import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer import org.onap.dcae.collectors.veshv.main.servers.VesServer import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.model.ServiceContext +import org.onap.dcae.collectors.veshv.utils.Closeable +import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure +import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -37,24 +41,29 @@ private val logger = Logger("$VESHV_PACKAGE.main") private const val PROGRAM_NAME = "java $VESHV_PACKAGE.main.MainKt" fun main(args: Array) = - ArgVesHvConfiguration().parse(args) - .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map(::startAndAwaitServers) - .unsafeRunEitherSync( - { ex -> - logger.withError { log("Failed to start a server", ex) } - ExitFailure(1) - }, - { logger.info { "Finished" } } - ) + ArgVesHvConfiguration().parse(args) + .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) + .map(::startAndAwaitServers) + .unsafeRunEitherSync( + { ex -> + logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) } + ExitFailure(1) + }, + { logger.debug(ServiceContext::mdc) { "Finished" } } + ) private fun startAndAwaitServers(config: ServerConfiguration) = - IO.monad().binding { - Logger.setLogLevel(VESHV_PACKAGE, config.logLevel) - logger.info { "Using configuration: $config" } - HealthCheckServer.start(config).bind() - VesServer.start(config).bind().run { - registerShutdownHook(shutdown()).bind() - await().bind() + IO.monad().binding { + Logger.setLogLevel(VESHV_PACKAGE, config.logLevel) + logger.info { "Using configuration: $config" } + val healthCheckServerHandle = HealthCheckServer.start(config).bind() + VesServer.start(config).bind().let { handle -> + registerShutdownHook(closeServers(handle, healthCheckServerHandle)).bind() + handle.await().bind() + } + }.fix() + +private fun closeServers(vararg handles: ServerHandle): IO = + Closeable.closeAll(handles.asIterable()).then { + logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } } - }.fix() diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt index 13b0bc7b..3d1a2a21 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt @@ -21,7 +21,9 @@ package org.onap.dcae.collectors.veshv.main.servers import arrow.effects.IO import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.logging.Logger /** @@ -31,7 +33,7 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger abstract class ServerStarter { fun start(config: ServerConfiguration): IO = startServer(config) - .map { logger.info { serverStartedMessage(it) }; it } + .then { logger.info(ServiceContext::mdc) { serverStartedMessage(it) } } protected abstract fun startServer(config: ServerConfiguration): IO protected abstract fun serverStartedMessage(handle: ServerHandle): String diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml index 5ce34800..40f3c8a0 100644 --- a/sources/hv-collector-main/src/main/resources/logback.xml +++ b/sources/hv-collector-main/src/main/resources/logback.xml @@ -90,9 +90,9 @@ - + - + -- cgit 1.2.3-korg