summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-main
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-01-22 11:43:18 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-01-22 14:30:32 +0100
commitd7532776b9d608632b91a6c658fcd72ca7c70d64 (patch)
tree0d90d7a75a4a1d83dd1cbd7c5af43e71bb6fea6c /sources/hv-collector-main
parent4c529a33439cc40bf192ea3f8dac57d189d60b9f (diff)
Close KafkaSender when handling SIGINT
Closing KafkaSender should result in flushing any pending messages. Change-Id: Ib251f5ca3527266831189df542784cc17173d8dc Issue-ID: DCAEGEN2-1065 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-main')
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt45
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt4
-rw-r--r--sources/hv-collector-main/src/main/resources/logback.xml4
3 files changed, 32 insertions, 21 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 16da3721..d865bcf5 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -26,7 +26,11 @@ import arrow.typeclasses.binding
import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer
import org.onap.dcae.collectors.veshv.main.servers.VesServer
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.utils.Closeable
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
+import org.onap.dcae.collectors.veshv.utils.arrow.then
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -37,24 +41,29 @@ private val logger = Logger("$VESHV_PACKAGE.main")
private const val PROGRAM_NAME = "java $VESHV_PACKAGE.main.MainKt"
fun main(args: Array<String>) =
- ArgVesHvConfiguration().parse(args)
- .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map(::startAndAwaitServers)
- .unsafeRunEitherSync(
- { ex ->
- logger.withError { log("Failed to start a server", ex) }
- ExitFailure(1)
- },
- { logger.info { "Finished" } }
- )
+ ArgVesHvConfiguration().parse(args)
+ .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
+ .map(::startAndAwaitServers)
+ .unsafeRunEitherSync(
+ { ex ->
+ logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) }
+ ExitFailure(1)
+ },
+ { logger.debug(ServiceContext::mdc) { "Finished" } }
+ )
private fun startAndAwaitServers(config: ServerConfiguration) =
- IO.monad().binding {
- Logger.setLogLevel(VESHV_PACKAGE, config.logLevel)
- logger.info { "Using configuration: $config" }
- HealthCheckServer.start(config).bind()
- VesServer.start(config).bind().run {
- registerShutdownHook(shutdown()).bind()
- await().bind()
+ IO.monad().binding {
+ Logger.setLogLevel(VESHV_PACKAGE, config.logLevel)
+ logger.info { "Using configuration: $config" }
+ val healthCheckServerHandle = HealthCheckServer.start(config).bind()
+ VesServer.start(config).bind().let { handle ->
+ registerShutdownHook(closeServers(handle, healthCheckServerHandle)).bind()
+ handle.await().bind()
+ }
+ }.fix()
+
+private fun closeServers(vararg handles: ServerHandle): IO<Unit> =
+ Closeable.closeAll(handles.asIterable()).then {
+ logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
}
- }.fix()
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
index 13b0bc7b..3d1a2a21 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
@@ -21,7 +21,9 @@ package org.onap.dcae.collectors.veshv.main.servers
import arrow.effects.IO
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.arrow.then
import org.onap.dcae.collectors.veshv.utils.logging.Logger
/**
@@ -31,7 +33,7 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger
abstract class ServerStarter {
fun start(config: ServerConfiguration): IO<ServerHandle> =
startServer(config)
- .map { logger.info { serverStartedMessage(it) }; it }
+ .then { logger.info(ServiceContext::mdc) { serverStartedMessage(it) } }
protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle>
protected abstract fun serverStartedMessage(handle: ServerHandle): String
diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml
index 5ce34800..40f3c8a0 100644
--- a/sources/hv-collector-main/src/main/resources/logback.xml
+++ b/sources/hv-collector-main/src/main/resources/logback.xml
@@ -90,9 +90,9 @@
</appender>
<logger name="reactor.netty" level="WARN"/>
- <logger name="io.netty" level="DEBUG"/>
+ <logger name="io.netty" level="INFO"/>
<logger name="io.netty.util" level="WARN"/>
- <logger name="org.apache.kafka" level="WARN"/>
+ <logger name="org.apache.kafka" level="INFO"/>
<root level="INFO">
<appender-ref ref="CONSOLE"/>