summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-main
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-02 15:40:46 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-03 08:51:03 +0200
commit302d27926c76bb99eecc4f74d333d0e8ff240c6e (patch)
treec9b716c649deb8b14d9ace320b3f35ed22604d0e /sources/hv-collector-main
parent6a00e38550fd1745c3377da2099bf5a615f69053 (diff)
Fix shutting down when new config received bug
When new configuration has been received and at least one client connection has been active the collector used to shut down. Also got rid of some more IO monad usage. Change-Id: I7981ff388ff1264a79d722727ef3005cf39e9f0d Issue-ID: DCAEGEN2-1382 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-main')
-rw-r--r--sources/hv-collector-main/pom.xml4
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt29
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt6
-rw-r--r--sources/hv-collector-main/src/main/resources/logback.xml1
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt9
5 files changed, 26 insertions, 23 deletions
diff --git a/sources/hv-collector-main/pom.xml b/sources/hv-collector-main/pom.xml
index edbdaa36..57f21a66 100644
--- a/sources/hv-collector-main/pom.xml
+++ b/sources/hv-collector-main/pom.xml
@@ -97,10 +97,6 @@
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects-instances</artifactId>
- </dependency>
- <dependency>
- <groupId>io.arrow-kt</groupId>
<artifactId>arrow-syntax</artifactId>
</dependency>
<dependency>
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index dc207ef8..8b0a38bb 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -30,7 +30,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.registerShutdownHook
+import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
+import java.time.Duration
import java.util.concurrent.atomic.AtomicReference
@@ -39,6 +41,7 @@ private val logger = Logger("$VES_HV_PACKAGE.main")
private val hvVesServer = AtomicReference<ServerHandle>()
private val configurationModule = ConfigurationModule()
+private val maxCloseTime = Duration.ofSeconds(10)
fun main(args: Array<String>) {
val configStateListener = object : ConfigurationStateListener {
@@ -60,30 +63,36 @@ fun main(args: Array<String>) {
logger.withDebug(ServiceContext::mdc) { log("Detailed stack trace: ", it) }
HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
}
- .doOnNext(::startServer)
+ .flatMap(::startServer)
.doOnError(::logServerStartFailed)
.then()
.block()
}
-private fun startServer(config: HvVesConfiguration) {
- stopRunningServer()
+private fun startServer(config: HvVesConfiguration): Mono<ServerHandle> =
+ stopRunningServer()
+ .timeout(maxCloseTime)
+ .then(deferredVesServer(config))
+ .doOnNext {
+ registerShutdownHook { shutdownGracefully(it) }
+ hvVesServer.set(it)
+ }
+
+private fun deferredVesServer(config: HvVesConfiguration) = Mono.defer {
Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel)
logger.debug(ServiceContext::mdc) { "Configuration: $config" }
-
- VesServer.start(config).let {
- registerShutdownHook { shutdownGracefully(it) }
- hvVesServer.set(it)
- }
+ VesServer.start(config)
}
-private fun stopRunningServer() = hvVesServer.get()?.close()?.unsafeRunSync()
+private fun stopRunningServer() = Mono.defer {
+ hvVesServer.get()?.close() ?: Mono.empty()
+}
internal fun shutdownGracefully(serverHandle: ServerHandle,
healthState: HealthState = HealthState.INSTANCE) {
logger.debug(ServiceContext::mdc) { "Graceful shutdown started" }
healthState.changeState(HealthDescription.SHUTTING_DOWN)
- serverHandle.close().unsafeRunSync()
+ serverHandle.close().block(maxCloseTime)
logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index c079cc59..fc4d8662 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -29,6 +29,7 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.arrow.then
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -38,11 +39,10 @@ object VesServer {
private val logger = Logger(VesServer::class)
- fun start(config: HvVesConfiguration): ServerHandle =
+ fun start(config: HvVesConfiguration): Mono<ServerHandle> =
createVesServer(config)
.start()
- .then(::logServerStarted)
- .unsafeRunSync()
+ .doOnNext(::logServerStarted)
private fun createVesServer(config: HvVesConfiguration): Server =
initializeCollectorFactory(config)
diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml
index 21c1fa31..539f7c2c 100644
--- a/sources/hv-collector-main/src/main/resources/logback.xml
+++ b/sources/hv-collector-main/src/main/resources/logback.xml
@@ -91,6 +91,7 @@
</appender>
<logger name="reactor.netty" level="WARN"/>
+ <logger name="reactor.netty.tcp.TcpServer" level="OFF"/>
<logger name="io.netty" level="INFO"/>
<logger name="io.netty.util" level="WARN"/>
<logger name="org.apache.kafka" level="INFO"/>
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
index d8de9f25..a967fba0 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.main
-import arrow.effects.IO
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.inOrder
import com.nhaarman.mockitokotlin2.mock
@@ -34,6 +33,7 @@ import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import reactor.core.publisher.Mono
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -42,12 +42,9 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
internal object MainTest : Spek({
describe("closeServer shutdown hook") {
given("server handles and health state") {
- val handle = mock<ServerHandle>()
+ val handle: ServerHandle = mock()
var closed = false
- val handleClose = IO {
- closed = true
- }
- whenever(handle.close()).thenReturn(handleClose)
+ whenever(handle.close()).thenReturn(Mono.empty<Void>().doOnSuccess { closed = true })
val healthState: HealthState = mock()
on("shutdownGracefully") {