summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-main
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-main')
-rw-r--r--sources/hv-collector-main/pom.xml4
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt29
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt6
-rw-r--r--sources/hv-collector-main/src/main/resources/logback.xml1
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt9
5 files changed, 26 insertions, 23 deletions
diff --git a/sources/hv-collector-main/pom.xml b/sources/hv-collector-main/pom.xml
index edbdaa36..57f21a66 100644
--- a/sources/hv-collector-main/pom.xml
+++ b/sources/hv-collector-main/pom.xml
@@ -97,10 +97,6 @@
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects-instances</artifactId>
- </dependency>
- <dependency>
- <groupId>io.arrow-kt</groupId>
<artifactId>arrow-syntax</artifactId>
</dependency>
<dependency>
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index dc207ef8..8b0a38bb 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -30,7 +30,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.registerShutdownHook
+import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
+import java.time.Duration
import java.util.concurrent.atomic.AtomicReference
@@ -39,6 +41,7 @@ private val logger = Logger("$VES_HV_PACKAGE.main")
private val hvVesServer = AtomicReference<ServerHandle>()
private val configurationModule = ConfigurationModule()
+private val maxCloseTime = Duration.ofSeconds(10)
fun main(args: Array<String>) {
val configStateListener = object : ConfigurationStateListener {
@@ -60,30 +63,36 @@ fun main(args: Array<String>) {
logger.withDebug(ServiceContext::mdc) { log("Detailed stack trace: ", it) }
HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
}
- .doOnNext(::startServer)
+ .flatMap(::startServer)
.doOnError(::logServerStartFailed)
.then()
.block()
}
-private fun startServer(config: HvVesConfiguration) {
- stopRunningServer()
+private fun startServer(config: HvVesConfiguration): Mono<ServerHandle> =
+ stopRunningServer()
+ .timeout(maxCloseTime)
+ .then(deferredVesServer(config))
+ .doOnNext {
+ registerShutdownHook { shutdownGracefully(it) }
+ hvVesServer.set(it)
+ }
+
+private fun deferredVesServer(config: HvVesConfiguration) = Mono.defer {
Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel)
logger.debug(ServiceContext::mdc) { "Configuration: $config" }
-
- VesServer.start(config).let {
- registerShutdownHook { shutdownGracefully(it) }
- hvVesServer.set(it)
- }
+ VesServer.start(config)
}
-private fun stopRunningServer() = hvVesServer.get()?.close()?.unsafeRunSync()
+private fun stopRunningServer() = Mono.defer {
+ hvVesServer.get()?.close() ?: Mono.empty()
+}
internal fun shutdownGracefully(serverHandle: ServerHandle,
healthState: HealthState = HealthState.INSTANCE) {
logger.debug(ServiceContext::mdc) { "Graceful shutdown started" }
healthState.changeState(HealthDescription.SHUTTING_DOWN)
- serverHandle.close().unsafeRunSync()
+ serverHandle.close().block(maxCloseTime)
logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index c079cc59..fc4d8662 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -29,6 +29,7 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.arrow.then
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -38,11 +39,10 @@ object VesServer {
private val logger = Logger(VesServer::class)
- fun start(config: HvVesConfiguration): ServerHandle =
+ fun start(config: HvVesConfiguration): Mono<ServerHandle> =
createVesServer(config)
.start()
- .then(::logServerStarted)
- .unsafeRunSync()
+ .doOnNext(::logServerStarted)
private fun createVesServer(config: HvVesConfiguration): Server =
initializeCollectorFactory(config)
diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml
index 21c1fa31..539f7c2c 100644
--- a/sources/hv-collector-main/src/main/resources/logback.xml
+++ b/sources/hv-collector-main/src/main/resources/logback.xml
@@ -91,6 +91,7 @@
</appender>
<logger name="reactor.netty" level="WARN"/>
+ <logger name="reactor.netty.tcp.TcpServer" level="OFF"/>
<logger name="io.netty" level="INFO"/>
<logger name="io.netty.util" level="WARN"/>
<logger name="org.apache.kafka" level="INFO"/>
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
index d8de9f25..a967fba0 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.main
-import arrow.effects.IO
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.inOrder
import com.nhaarman.mockitokotlin2.mock
@@ -34,6 +33,7 @@ import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import reactor.core.publisher.Mono
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -42,12 +42,9 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
internal object MainTest : Spek({
describe("closeServer shutdown hook") {
given("server handles and health state") {
- val handle = mock<ServerHandle>()
+ val handle: ServerHandle = mock()
var closed = false
- val handleClose = IO {
- closed = true
- }
- whenever(handle.close()).thenReturn(handleClose)
+ whenever(handle.close()).thenReturn(Mono.empty<Void>().doOnSuccess { closed = true })
val healthState: HealthState = mock()
on("shutdownGracefully") {