diff options
Diffstat (limited to 'sources/hv-collector-main/src')
4 files changed, 26 insertions, 19 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index dc207ef8..8b0a38bb 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -30,7 +30,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.registerShutdownHook +import reactor.core.publisher.Mono import reactor.core.scheduler.Schedulers +import java.time.Duration import java.util.concurrent.atomic.AtomicReference @@ -39,6 +41,7 @@ private val logger = Logger("$VES_HV_PACKAGE.main") private val hvVesServer = AtomicReference<ServerHandle>() private val configurationModule = ConfigurationModule() +private val maxCloseTime = Duration.ofSeconds(10) fun main(args: Array<String>) { val configStateListener = object : ConfigurationStateListener { @@ -60,30 +63,36 @@ fun main(args: Array<String>) { logger.withDebug(ServiceContext::mdc) { log("Detailed stack trace: ", it) } HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) } - .doOnNext(::startServer) + .flatMap(::startServer) .doOnError(::logServerStartFailed) .then() .block() } -private fun startServer(config: HvVesConfiguration) { - stopRunningServer() +private fun startServer(config: HvVesConfiguration): Mono<ServerHandle> = + stopRunningServer() + .timeout(maxCloseTime) + .then(deferredVesServer(config)) + .doOnNext { + registerShutdownHook { shutdownGracefully(it) } + hvVesServer.set(it) + } + +private fun deferredVesServer(config: HvVesConfiguration) = Mono.defer { Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel) logger.debug(ServiceContext::mdc) { "Configuration: $config" } - - VesServer.start(config).let { - registerShutdownHook { shutdownGracefully(it) } - hvVesServer.set(it) - } + VesServer.start(config) } -private fun stopRunningServer() = hvVesServer.get()?.close()?.unsafeRunSync() +private fun stopRunningServer() = Mono.defer { + hvVesServer.get()?.close() ?: Mono.empty() +} internal fun shutdownGracefully(serverHandle: ServerHandle, healthState: HealthState = HealthState.INSTANCE) { logger.debug(ServiceContext::mdc) { "Graceful shutdown started" } healthState.changeState(HealthDescription.SHUTTING_DOWN) - serverHandle.close().unsafeRunSync() + serverHandle.close().block(maxCloseTime) logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } } diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index c079cc59..fc4d8662 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -29,6 +29,7 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -38,11 +39,10 @@ object VesServer { private val logger = Logger(VesServer::class) - fun start(config: HvVesConfiguration): ServerHandle = + fun start(config: HvVesConfiguration): Mono<ServerHandle> = createVesServer(config) .start() - .then(::logServerStarted) - .unsafeRunSync() + .doOnNext(::logServerStarted) private fun createVesServer(config: HvVesConfiguration): Server = initializeCollectorFactory(config) diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml index 21c1fa31..539f7c2c 100644 --- a/sources/hv-collector-main/src/main/resources/logback.xml +++ b/sources/hv-collector-main/src/main/resources/logback.xml @@ -91,6 +91,7 @@ </appender> <logger name="reactor.netty" level="WARN"/> + <logger name="reactor.netty.tcp.TcpServer" level="OFF"/> <logger name="io.netty" level="INFO"/> <logger name="io.netty.util" level="WARN"/> <logger name="org.apache.kafka" level="INFO"/> diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt index d8de9f25..a967fba0 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.main -import arrow.effects.IO import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.inOrder import com.nhaarman.mockitokotlin2.mock @@ -34,6 +33,7 @@ import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.utils.ServerHandle +import reactor.core.publisher.Mono /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -42,12 +42,9 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle internal object MainTest : Spek({ describe("closeServer shutdown hook") { given("server handles and health state") { - val handle = mock<ServerHandle>() + val handle: ServerHandle = mock() var closed = false - val handleClose = IO { - closed = true - } - whenever(handle.close()).thenReturn(handleClose) + whenever(handle.close()).thenReturn(Mono.empty<Void>().doOnSuccess { closed = true }) val healthState: HealthState = mock() on("shutdownGracefully") { |