diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-01-22 11:43:18 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-01-22 14:30:32 +0100 |
commit | d7532776b9d608632b91a6c658fcd72ca7c70d64 (patch) | |
tree | 0d90d7a75a4a1d83dd1cbd7c5af43e71bb6fea6c /sources/hv-collector-main | |
parent | 4c529a33439cc40bf192ea3f8dac57d189d60b9f (diff) |
Close KafkaSender when handling SIGINT
Closing KafkaSender should result in flushing any pending messages.
Change-Id: Ib251f5ca3527266831189df542784cc17173d8dc
Issue-ID: DCAEGEN2-1065
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-main')
3 files changed, 32 insertions, 21 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 16da3721..d865bcf5 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -26,7 +26,11 @@ import arrow.typeclasses.binding import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer import org.onap.dcae.collectors.veshv.main.servers.VesServer import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.model.ServiceContext +import org.onap.dcae.collectors.veshv.utils.Closeable +import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure +import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -37,24 +41,29 @@ private val logger = Logger("$VESHV_PACKAGE.main") private const val PROGRAM_NAME = "java $VESHV_PACKAGE.main.MainKt" fun main(args: Array<String>) = - ArgVesHvConfiguration().parse(args) - .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map(::startAndAwaitServers) - .unsafeRunEitherSync( - { ex -> - logger.withError { log("Failed to start a server", ex) } - ExitFailure(1) - }, - { logger.info { "Finished" } } - ) + ArgVesHvConfiguration().parse(args) + .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) + .map(::startAndAwaitServers) + .unsafeRunEitherSync( + { ex -> + logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) } + ExitFailure(1) + }, + { logger.debug(ServiceContext::mdc) { "Finished" } } + ) private fun startAndAwaitServers(config: ServerConfiguration) = - IO.monad().binding { - Logger.setLogLevel(VESHV_PACKAGE, config.logLevel) - logger.info { "Using configuration: $config" } - HealthCheckServer.start(config).bind() - VesServer.start(config).bind().run { - registerShutdownHook(shutdown()).bind() - await().bind() + IO.monad().binding { + Logger.setLogLevel(VESHV_PACKAGE, config.logLevel) + logger.info { "Using configuration: $config" } + val healthCheckServerHandle = HealthCheckServer.start(config).bind() + VesServer.start(config).bind().let { handle -> + registerShutdownHook(closeServers(handle, healthCheckServerHandle)).bind() + handle.await().bind() + } + }.fix() + +private fun closeServers(vararg handles: ServerHandle): IO<Unit> = + Closeable.closeAll(handles.asIterable()).then { + logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } } - }.fix() diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt index 13b0bc7b..3d1a2a21 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt @@ -21,7 +21,9 @@ package org.onap.dcae.collectors.veshv.main.servers import arrow.effects.IO import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.logging.Logger /** @@ -31,7 +33,7 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger abstract class ServerStarter { fun start(config: ServerConfiguration): IO<ServerHandle> = startServer(config) - .map { logger.info { serverStartedMessage(it) }; it } + .then { logger.info(ServiceContext::mdc) { serverStartedMessage(it) } } protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle> protected abstract fun serverStartedMessage(handle: ServerHandle): String diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml index 5ce34800..40f3c8a0 100644 --- a/sources/hv-collector-main/src/main/resources/logback.xml +++ b/sources/hv-collector-main/src/main/resources/logback.xml @@ -90,9 +90,9 @@ </appender> <logger name="reactor.netty" level="WARN"/> - <logger name="io.netty" level="DEBUG"/> + <logger name="io.netty" level="INFO"/> <logger name="io.netty.util" level="WARN"/> - <logger name="org.apache.kafka" level="WARN"/> + <logger name="org.apache.kafka" level="INFO"/> <root level="INFO"> <appender-ref ref="CONSOLE"/> |