diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-04-02 15:40:46 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-04-03 08:51:03 +0200 |
commit | 302d27926c76bb99eecc4f74d333d0e8ff240c6e (patch) | |
tree | c9b716c649deb8b14d9ace320b3f35ed22604d0e /sources/hv-collector-main/src/main | |
parent | 6a00e38550fd1745c3377da2099bf5a615f69053 (diff) |
Fix shutting down when new config received bug
When new configuration has been received and at least one client
connection has been active the collector used to shut down.
Also got rid of some more IO monad usage.
Change-Id: I7981ff388ff1264a79d722727ef3005cf39e9f0d
Issue-ID: DCAEGEN2-1382
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-main/src/main')
3 files changed, 23 insertions, 13 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index dc207ef8..8b0a38bb 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -30,7 +30,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.registerShutdownHook +import reactor.core.publisher.Mono import reactor.core.scheduler.Schedulers +import java.time.Duration import java.util.concurrent.atomic.AtomicReference @@ -39,6 +41,7 @@ private val logger = Logger("$VES_HV_PACKAGE.main") private val hvVesServer = AtomicReference<ServerHandle>() private val configurationModule = ConfigurationModule() +private val maxCloseTime = Duration.ofSeconds(10) fun main(args: Array<String>) { val configStateListener = object : ConfigurationStateListener { @@ -60,30 +63,36 @@ fun main(args: Array<String>) { logger.withDebug(ServiceContext::mdc) { log("Detailed stack trace: ", it) } HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) } - .doOnNext(::startServer) + .flatMap(::startServer) .doOnError(::logServerStartFailed) .then() .block() } -private fun startServer(config: HvVesConfiguration) { - stopRunningServer() +private fun startServer(config: HvVesConfiguration): Mono<ServerHandle> = + stopRunningServer() + .timeout(maxCloseTime) + .then(deferredVesServer(config)) + .doOnNext { + registerShutdownHook { shutdownGracefully(it) } + hvVesServer.set(it) + } + +private fun deferredVesServer(config: HvVesConfiguration) = Mono.defer { Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel) logger.debug(ServiceContext::mdc) { "Configuration: $config" } - - VesServer.start(config).let { - registerShutdownHook { shutdownGracefully(it) } - hvVesServer.set(it) - } + VesServer.start(config) } -private fun stopRunningServer() = hvVesServer.get()?.close()?.unsafeRunSync() +private fun stopRunningServer() = Mono.defer { + hvVesServer.get()?.close() ?: Mono.empty() +} internal fun shutdownGracefully(serverHandle: ServerHandle, healthState: HealthState = HealthState.INSTANCE) { logger.debug(ServiceContext::mdc) { "Graceful shutdown started" } healthState.changeState(HealthDescription.SHUTTING_DOWN) - serverHandle.close().unsafeRunSync() + serverHandle.close().block(maxCloseTime) logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } } diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index c079cc59..fc4d8662 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -29,6 +29,7 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -38,11 +39,10 @@ object VesServer { private val logger = Logger(VesServer::class) - fun start(config: HvVesConfiguration): ServerHandle = + fun start(config: HvVesConfiguration): Mono<ServerHandle> = createVesServer(config) .start() - .then(::logServerStarted) - .unsafeRunSync() + .doOnNext(::logServerStarted) private fun createVesServer(config: HvVesConfiguration): Server = initializeCollectorFactory(config) diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml index 21c1fa31..539f7c2c 100644 --- a/sources/hv-collector-main/src/main/resources/logback.xml +++ b/sources/hv-collector-main/src/main/resources/logback.xml @@ -91,6 +91,7 @@ </appender> <logger name="reactor.netty" level="WARN"/> + <logger name="reactor.netty.tcp.TcpServer" level="OFF"/> <logger name="io.netty" level="INFO"/> <logger name="io.netty.util" level="WARN"/> <logger name="org.apache.kafka" level="INFO"/> |