diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-04-02 15:40:46 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-04-03 08:51:03 +0200 |
commit | 302d27926c76bb99eecc4f74d333d0e8ff240c6e (patch) | |
tree | c9b716c649deb8b14d9ace320b3f35ed22604d0e | |
parent | 6a00e38550fd1745c3377da2099bf5a615f69053 (diff) |
Fix shutting down when new config received bug
When new configuration has been received and at least one client
connection has been active the collector used to shut down.
Also got rid of some more IO monad usage.
Change-Id: I7981ff388ff1264a79d722727ef3005cf39e9f0d
Issue-ID: DCAEGEN2-1382
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
21 files changed, 151 insertions, 113 deletions
diff --git a/development/configuration/local.json b/development/configuration/local.json new file mode 100644 index 00000000..a1a8b533 --- /dev/null +++ b/development/configuration/local.json @@ -0,0 +1,20 @@ +{ + "logLevel": "DEBUG", + "server": { + "listenPort": 8061, + "idleTimeoutSec": 60, + "maxPayloadSizeBytes": 1048576 + }, + "cbs": { + "firstRequestDelaySec": 10, + "requestIntervalSec": 5 + }, + "security": { + "keys": { + "keyStoreFile": "development/ssl/server.p12", + "keyStorePassword": "onaponap", + "trustStoreFile": "development/ssl/trust.p12", + "trustStorePassword": "onaponap" + } + } +}
\ No newline at end of file diff --git a/sources/hv-collector-commandline/pom.xml b/sources/hv-collector-commandline/pom.xml index 7f8643de..078a3cb5 100644 --- a/sources/hv-collector-commandline/pom.xml +++ b/sources/hv-collector-commandline/pom.xml @@ -41,10 +41,6 @@ <artifactId>kotlin-reflect</artifactId> </dependency> <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects</artifactId> - </dependency> - <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <scope>test</scope> diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml index e7134e18..e15592f3 100644 --- a/sources/hv-collector-core/pom.xml +++ b/sources/hv-collector-core/pom.xml @@ -97,10 +97,6 @@ </dependency> <dependency> <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects</artifactId> - </dependency> - <dependency> - <groupId>io.arrow-kt</groupId> <artifactId>arrow-core</artifactId> </dependency> <dependency> diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index ba0a9eee..0039ef62 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.boundary -import arrow.effects.IO import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.utils.Closeable @@ -36,5 +35,5 @@ interface CollectorProvider : Closeable { } interface Server { - fun start(): IO<ServerHandle> + fun start(): Mono<ServerHandle> } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 1c79abd2..8fb4e80d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,10 +25,9 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder -import org.onap.dcae.collectors.veshv.impl.VesHvCollector +import org.onap.dcae.collectors.veshv.impl.HvVesCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -53,7 +52,7 @@ class CollectorFactory(private val configuration: CollectorConfiguration, } private fun createVesHvCollector(ctx: ClientContext): Collector = - VesHvCollector( + HvVesCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt index 618b818f..7d8f0cb1 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt @@ -44,7 +44,7 @@ import reactor.core.publisher.Mono * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class VesHvCollector( +internal class HvVesCollector( private val clientContext: ClientContext, private val wireChunkDecoder: WireChunkDecoder, private val protobufDecoder: VesDecoder, @@ -116,6 +116,6 @@ internal class VesHvCollector( filterFailedWithLog(logger, clientContext::fullMdc, predicate) companion object { - private val logger = Logger(VesHvCollector::class) + private val logger = Logger(HvVesCollector::class) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index 6e2e20f7..b03b89e1 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -53,20 +53,27 @@ class Router internal constructor(private val routing: Routing, fun route(message: VesMessage): Flux<ConsumedMessage> = routeFor(message.header) - .fold({ - metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND) - logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" } - logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" } - Flux.empty<Route>() - }, { - logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" } - Flux.just(it) - }) + .fold({ routeNotFound(message) }, { routeFound(message, it) }) .flatMap { val sinkTopic = it.sink.topicName() messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION)) } + private fun routeNotFound(message: VesMessage): Flux<Route> { + metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND) + logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" } + logger.trace(ctx::fullMdc) { "Routing available for client: $routing" } + return Flux.empty<Route>() + } + + private fun routeFound(message: VesMessage, route: Route): Flux<Route> { + logger.trace(ctx::fullMdc) { + "Found route for message ${message.header}: $route. Assigned partition: $NONE_PARTITION" + } + return Flux.just(route) + } + + private fun routeFor(header: CommonEventHeader) = routing.find { it.domain == header.domain }.toOption() diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt index 2ce0f42f..7b726ab4 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt @@ -19,16 +19,15 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka -import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause -import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage -import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.Marker import org.onap.ves.VesEventOuterClass.CommonEventHeader diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 7a498652..86980832 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka -import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.impl.createKafkaSender @@ -28,6 +27,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers import reactor.kafka.sender.KafkaSender import java.util.Collections.synchronizedMap @@ -46,10 +48,14 @@ internal class KafkaSinkProvider : SinkProvider { } } - override fun close() = IO { - messageSinks.values.forEach { it.close() } - logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" } - } + override fun close(): Mono<Void> = + Flux.fromIterable(messageSinks.values) + .publishOn(Schedulers.elastic()) + .doOnNext(KafkaSender<CommonEventHeader, VesMessage>::close) + .then() + .doOnSuccess { + logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" } + } companion object { private val logger = Logger(KafkaSinkProvider::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 3e19414d..a208384a 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -21,7 +21,6 @@ package org.onap.dcae.collectors.veshv.impl.socket import arrow.core.Option import arrow.core.getOrElse -import arrow.effects.IO import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider @@ -55,17 +54,23 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private val collectorProvider: CollectorProvider, private val metrics: Metrics) : Server { - override fun start(): IO<ServerHandle> = IO { - TcpServer.create() - .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) } - .configureSsl() - .handle(this::handleConnection) - .doOnUnbound { - logger.info(ServiceContext::mdc) { "Netty TCP Server closed" } - collectorProvider.close().unsafeRunSync() - } - .let { NettyServerHandle(it.bindNow()) } - } + override fun start(): Mono<ServerHandle> = + Mono.defer { + TcpServer.create() + .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) } + .configureSsl() + .handle(this::handleConnection) + .bind() + .map { + NettyServerHandle(it, closeAction()) + } + } + + private fun closeAction(): Mono<Void> = + collectorProvider.close().doOnSuccess { + logger.info(ServiceContext::mdc) { "Netty TCP Server closed" } + } + private fun TcpServer.configureSsl() = sslContext @@ -86,7 +91,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = withNewClientContextFrom(nettyInbound, nettyOutbound) { clientContext -> - logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" } + logger.debug(clientContext::fullMdc) { "Client connection request received" } clientContext.clientAddress .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) } @@ -112,20 +117,20 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> { metrics.notifyClientConnected() - logger.info(clientContext::fullMdc) { "Handling new client connection" } + logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" } val collector = collectorProvider(clientContext) return collector.handleClient(clientContext, nettyInbound) } private fun Collector.handleClient(clientContext: ClientContext, - nettyInbound: NettyInbound) = - withConnectionFrom(nettyInbound) { connection -> - connection - .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) - .logConnectionClosed(clientContext) - }.run { - handleConnection(nettyInbound.createDataStream()) - } + nettyInbound: NettyInbound) = + withConnectionFrom(nettyInbound) { connection -> + connection + .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) + .logConnectionClosed(clientContext) + }.run { + handleConnection(nettyInbound.createDataStream()) + } private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection = onReadIdle(timeout.toMillis()) { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index b735138d..ca9d28ae 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -19,17 +19,16 @@ */ package org.onap.dcae.collectors.veshv.impl.wire -import arrow.effects.IO import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Flux.defer import reactor.core.publisher.SynchronousSink @@ -63,26 +62,22 @@ internal class WireChunkDecoder( private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next -> decoder.decodeFirst(streamBuffer) .fold(onError(next), onSuccess(next)) - .unsafeRunSync() } - private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err -> + private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> Unit = { err -> when (err) { - is InvalidWireFrame -> IO { + is InvalidWireFrame -> next.error(WireFrameException(err)) - } - is MissingWireFrameBytes -> IO { + is MissingWireFrameBytes -> { logEndOfData() next.complete() } } } - private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame -> - IO { - logDecodedWireMessage(frame) - next.next(frame) - } + private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> Unit = { frame -> + logDecodedWireMessage(frame) + next.next(frame) } private fun logIncomingMessage(wire: ByteBuf) { diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index f79c2e46..95b9159e 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -40,6 +40,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import org.onap.dcae.collectors.veshv.utils.Closeable import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean @@ -93,7 +94,7 @@ class DummySinkProvider(private val sink: Sink) : SinkProvider { if (sinkInitialized.get()) { sink.close() } else { - IO.unit + Mono.empty() } } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 2430c74f..d845f7c4 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -67,7 +67,7 @@ object VesHvSpecification : Spek({ // just connecting should not create sink sut.handleConnection() - sut.close().unsafeRunSync() + sut.close().block() // then assertThat(sink.closed).isFalse() @@ -80,7 +80,7 @@ object VesHvSpecification : Spek({ sut.handleConnection(vesWireFrameMessage(PERF3GPP)) // when - sut.close().unsafeRunSync() + sut.close().block() // then assertThat(sink.closed).isTrue() diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index 160defdb..f1b1ba2d 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.reactivestreams.Publisher import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentLinkedDeque @@ -50,12 +51,9 @@ class StoringSink : Sink { return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage) } - /* - * TOD0: if the code would look like: - * ```IO { active.set(false) }``` - * the tests wouldn't pass even though `.unsafeRunSync()` is called (see HvVesSpec) - */ - override fun close() = active.set(false).run { IO.unit } + override fun close(): Mono<Void> = Mono.fromRunnable { + active.set(false) + } } /** diff --git a/sources/hv-collector-main/pom.xml b/sources/hv-collector-main/pom.xml index edbdaa36..57f21a66 100644 --- a/sources/hv-collector-main/pom.xml +++ b/sources/hv-collector-main/pom.xml @@ -97,10 +97,6 @@ </dependency> <dependency> <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects-instances</artifactId> - </dependency> - <dependency> - <groupId>io.arrow-kt</groupId> <artifactId>arrow-syntax</artifactId> </dependency> <dependency> diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index dc207ef8..8b0a38bb 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -30,7 +30,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.registerShutdownHook +import reactor.core.publisher.Mono import reactor.core.scheduler.Schedulers +import java.time.Duration import java.util.concurrent.atomic.AtomicReference @@ -39,6 +41,7 @@ private val logger = Logger("$VES_HV_PACKAGE.main") private val hvVesServer = AtomicReference<ServerHandle>() private val configurationModule = ConfigurationModule() +private val maxCloseTime = Duration.ofSeconds(10) fun main(args: Array<String>) { val configStateListener = object : ConfigurationStateListener { @@ -60,30 +63,36 @@ fun main(args: Array<String>) { logger.withDebug(ServiceContext::mdc) { log("Detailed stack trace: ", it) } HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) } - .doOnNext(::startServer) + .flatMap(::startServer) .doOnError(::logServerStartFailed) .then() .block() } -private fun startServer(config: HvVesConfiguration) { - stopRunningServer() +private fun startServer(config: HvVesConfiguration): Mono<ServerHandle> = + stopRunningServer() + .timeout(maxCloseTime) + .then(deferredVesServer(config)) + .doOnNext { + registerShutdownHook { shutdownGracefully(it) } + hvVesServer.set(it) + } + +private fun deferredVesServer(config: HvVesConfiguration) = Mono.defer { Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel) logger.debug(ServiceContext::mdc) { "Configuration: $config" } - - VesServer.start(config).let { - registerShutdownHook { shutdownGracefully(it) } - hvVesServer.set(it) - } + VesServer.start(config) } -private fun stopRunningServer() = hvVesServer.get()?.close()?.unsafeRunSync() +private fun stopRunningServer() = Mono.defer { + hvVesServer.get()?.close() ?: Mono.empty() +} internal fun shutdownGracefully(serverHandle: ServerHandle, healthState: HealthState = HealthState.INSTANCE) { logger.debug(ServiceContext::mdc) { "Graceful shutdown started" } healthState.changeState(HealthDescription.SHUTTING_DOWN) - serverHandle.close().unsafeRunSync() + serverHandle.close().block(maxCloseTime) logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } } diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index c079cc59..fc4d8662 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -29,6 +29,7 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -38,11 +39,10 @@ object VesServer { private val logger = Logger(VesServer::class) - fun start(config: HvVesConfiguration): ServerHandle = + fun start(config: HvVesConfiguration): Mono<ServerHandle> = createVesServer(config) .start() - .then(::logServerStarted) - .unsafeRunSync() + .doOnNext(::logServerStarted) private fun createVesServer(config: HvVesConfiguration): Server = initializeCollectorFactory(config) diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml index 21c1fa31..539f7c2c 100644 --- a/sources/hv-collector-main/src/main/resources/logback.xml +++ b/sources/hv-collector-main/src/main/resources/logback.xml @@ -91,6 +91,7 @@ </appender> <logger name="reactor.netty" level="WARN"/> + <logger name="reactor.netty.tcp.TcpServer" level="OFF"/> <logger name="io.netty" level="INFO"/> <logger name="io.netty.util" level="WARN"/> <logger name="org.apache.kafka" level="INFO"/> diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt index d8de9f25..a967fba0 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.main -import arrow.effects.IO import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.inOrder import com.nhaarman.mockitokotlin2.mock @@ -34,6 +33,7 @@ import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.utils.ServerHandle +import reactor.core.publisher.Mono /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -42,12 +42,9 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle internal object MainTest : Spek({ describe("closeServer shutdown hook") { given("server handles and health state") { - val handle = mock<ServerHandle>() + val handle: ServerHandle = mock() var closed = false - val handleClose = IO { - closed = true - } - whenever(handle.close()).thenReturn(handleClose) + whenever(handle.close()).thenReturn(Mono.empty<Void>().doOnSuccess { closed = true }) val healthState: HealthState = mock() on("shutdownGracefully") { diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt index 00b814cc..ec654b32 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt @@ -19,22 +19,18 @@ */ package org.onap.dcae.collectors.veshv.utils -import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monadError.monadError -import arrow.typeclasses.binding +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since January 2019 */ interface Closeable { - fun close(): IO<Unit> = IO.unit + fun close(): Mono<Void> = Mono.empty() companion object { fun closeAll(closeables: Iterable<Closeable>) = - IO.monadError().binding { - closeables.forEach { it.close().bind() } - }.fix() + Flux.fromIterable(closeables).flatMap(Closeable::close).then() } } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt index 5b582ed5..670ab4ac 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt @@ -20,8 +20,9 @@ package org.onap.dcae.collectors.veshv.utils import arrow.effects.IO +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono import reactor.netty.DisposableServer -import java.time.Duration /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -35,16 +36,33 @@ abstract class ServerHandle(val host: String, val port: Int) : Closeable { * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) { - override fun close() = IO { - ctx.disposeNow(SHUTDOWN_TIMEOUT) - } +class NettyServerHandle(private val ctx: DisposableServer, + private val closeAction: Mono<Void> = Mono.empty()) + : ServerHandle(ctx.host(), ctx.port()) { + + override fun close(): Mono<Void> = + Mono.just(ctx) + .filter { !it.isDisposed } + .flatMap { + closeAction.thenReturn(it) + } + .then(dispose()) + + private fun dispose(): Mono<Void> = + Mono.create { callback -> + logger.debug { "About to dispose NettyServer" } + ctx.dispose() + ctx.onDispose { + logger.debug { "Netty server disposed" } + callback.success() + } + } override fun await() = IO<Unit> { ctx.channel().closeFuture().sync() } companion object { - private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10) + private val logger = Logger(NettyServerHandle::class) } } |