diff options
Diffstat (limited to 'sources')
20 files changed, 131 insertions, 113 deletions
diff --git a/sources/hv-collector-commandline/pom.xml b/sources/hv-collector-commandline/pom.xml index 7f8643de..078a3cb5 100644 --- a/sources/hv-collector-commandline/pom.xml +++ b/sources/hv-collector-commandline/pom.xml @@ -41,10 +41,6 @@ <artifactId>kotlin-reflect</artifactId> </dependency> <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects</artifactId> - </dependency> - <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <scope>test</scope> diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml index e7134e18..e15592f3 100644 --- a/sources/hv-collector-core/pom.xml +++ b/sources/hv-collector-core/pom.xml @@ -97,10 +97,6 @@ </dependency> <dependency> <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects</artifactId> - </dependency> - <dependency> - <groupId>io.arrow-kt</groupId> <artifactId>arrow-core</artifactId> </dependency> <dependency> diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index ba0a9eee..0039ef62 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.boundary -import arrow.effects.IO import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.utils.Closeable @@ -36,5 +35,5 @@ interface CollectorProvider : Closeable { } interface Server { - fun start(): IO<ServerHandle> + fun start(): Mono<ServerHandle> } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 1c79abd2..8fb4e80d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,10 +25,9 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder -import org.onap.dcae.collectors.veshv.impl.VesHvCollector +import org.onap.dcae.collectors.veshv.impl.HvVesCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -53,7 +52,7 @@ class CollectorFactory(private val configuration: CollectorConfiguration, } private fun createVesHvCollector(ctx: ClientContext): Collector = - VesHvCollector( + HvVesCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt index 618b818f..7d8f0cb1 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt @@ -44,7 +44,7 @@ import reactor.core.publisher.Mono * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class VesHvCollector( +internal class HvVesCollector( private val clientContext: ClientContext, private val wireChunkDecoder: WireChunkDecoder, private val protobufDecoder: VesDecoder, @@ -116,6 +116,6 @@ internal class VesHvCollector( filterFailedWithLog(logger, clientContext::fullMdc, predicate) companion object { - private val logger = Logger(VesHvCollector::class) + private val logger = Logger(HvVesCollector::class) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index 6e2e20f7..b03b89e1 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -53,20 +53,27 @@ class Router internal constructor(private val routing: Routing, fun route(message: VesMessage): Flux<ConsumedMessage> = routeFor(message.header) - .fold({ - metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND) - logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" } - logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" } - Flux.empty<Route>() - }, { - logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" } - Flux.just(it) - }) + .fold({ routeNotFound(message) }, { routeFound(message, it) }) .flatMap { val sinkTopic = it.sink.topicName() messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION)) } + private fun routeNotFound(message: VesMessage): Flux<Route> { + metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND) + logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" } + logger.trace(ctx::fullMdc) { "Routing available for client: $routing" } + return Flux.empty<Route>() + } + + private fun routeFound(message: VesMessage, route: Route): Flux<Route> { + logger.trace(ctx::fullMdc) { + "Found route for message ${message.header}: $route. Assigned partition: $NONE_PARTITION" + } + return Flux.just(route) + } + + private fun routeFor(header: CommonEventHeader) = routing.find { it.domain == header.domain }.toOption() diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt index 2ce0f42f..7b726ab4 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt @@ -19,16 +19,15 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka -import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause -import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage -import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.Marker import org.onap.ves.VesEventOuterClass.CommonEventHeader diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 7a498652..86980832 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka -import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.impl.createKafkaSender @@ -28,6 +27,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers import reactor.kafka.sender.KafkaSender import java.util.Collections.synchronizedMap @@ -46,10 +48,14 @@ internal class KafkaSinkProvider : SinkProvider { } } - override fun close() = IO { - messageSinks.values.forEach { it.close() } - logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" } - } + override fun close(): Mono<Void> = + Flux.fromIterable(messageSinks.values) + .publishOn(Schedulers.elastic()) + .doOnNext(KafkaSender<CommonEventHeader, VesMessage>::close) + .then() + .doOnSuccess { + logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" } + } companion object { private val logger = Logger(KafkaSinkProvider::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 3e19414d..a208384a 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -21,7 +21,6 @@ package org.onap.dcae.collectors.veshv.impl.socket import arrow.core.Option import arrow.core.getOrElse -import arrow.effects.IO import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider @@ -55,17 +54,23 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private val collectorProvider: CollectorProvider, private val metrics: Metrics) : Server { - override fun start(): IO<ServerHandle> = IO { - TcpServer.create() - .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) } - .configureSsl() - .handle(this::handleConnection) - .doOnUnbound { - logger.info(ServiceContext::mdc) { "Netty TCP Server closed" } - collectorProvider.close().unsafeRunSync() - } - .let { NettyServerHandle(it.bindNow()) } - } + override fun start(): Mono<ServerHandle> = + Mono.defer { + TcpServer.create() + .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) } + .configureSsl() + .handle(this::handleConnection) + .bind() + .map { + NettyServerHandle(it, closeAction()) + } + } + + private fun closeAction(): Mono<Void> = + collectorProvider.close().doOnSuccess { + logger.info(ServiceContext::mdc) { "Netty TCP Server closed" } + } + private fun TcpServer.configureSsl() = sslContext @@ -86,7 +91,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = withNewClientContextFrom(nettyInbound, nettyOutbound) { clientContext -> - logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" } + logger.debug(clientContext::fullMdc) { "Client connection request received" } clientContext.clientAddress .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) } @@ -112,20 +117,20 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> { metrics.notifyClientConnected() - logger.info(clientContext::fullMdc) { "Handling new client connection" } + logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" } val collector = collectorProvider(clientContext) return collector.handleClient(clientContext, nettyInbound) } private fun Collector.handleClient(clientContext: ClientContext, - nettyInbound: NettyInbound) = - withConnectionFrom(nettyInbound) { connection -> - connection - .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) - .logConnectionClosed(clientContext) - }.run { - handleConnection(nettyInbound.createDataStream()) - } + nettyInbound: NettyInbound) = + withConnectionFrom(nettyInbound) { connection -> + connection + .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) + .logConnectionClosed(clientContext) + }.run { + handleConnection(nettyInbound.createDataStream()) + } private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection = onReadIdle(timeout.toMillis()) { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index b735138d..ca9d28ae 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -19,17 +19,16 @@ */ package org.onap.dcae.collectors.veshv.impl.wire -import arrow.effects.IO import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Flux.defer import reactor.core.publisher.SynchronousSink @@ -63,26 +62,22 @@ internal class WireChunkDecoder( private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next -> decoder.decodeFirst(streamBuffer) .fold(onError(next), onSuccess(next)) - .unsafeRunSync() } - private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err -> + private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> Unit = { err -> when (err) { - is InvalidWireFrame -> IO { + is InvalidWireFrame -> next.error(WireFrameException(err)) - } - is MissingWireFrameBytes -> IO { + is MissingWireFrameBytes -> { logEndOfData() next.complete() } } } - private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame -> - IO { - logDecodedWireMessage(frame) - next.next(frame) - } + private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> Unit = { frame -> + logDecodedWireMessage(frame) + next.next(frame) } private fun logIncomingMessage(wire: ByteBuf) { diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index f79c2e46..95b9159e 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -40,6 +40,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import org.onap.dcae.collectors.veshv.utils.Closeable import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean @@ -93,7 +94,7 @@ class DummySinkProvider(private val sink: Sink) : SinkProvider { if (sinkInitialized.get()) { sink.close() } else { - IO.unit + Mono.empty() } } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 2430c74f..d845f7c4 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -67,7 +67,7 @@ object VesHvSpecification : Spek({ // just connecting should not create sink sut.handleConnection() - sut.close().unsafeRunSync() + sut.close().block() // then assertThat(sink.closed).isFalse() @@ -80,7 +80,7 @@ object VesHvSpecification : Spek({ sut.handleConnection(vesWireFrameMessage(PERF3GPP)) // when - sut.close().unsafeRunSync() + sut.close().block() // then assertThat(sink.closed).isTrue() diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index 160defdb..f1b1ba2d 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.reactivestreams.Publisher import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentLinkedDeque @@ -50,12 +51,9 @@ class StoringSink : Sink { return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage) } - /* - * TOD0: if the code would look like: - * ```IO { active.set(false) }``` - * the tests wouldn't pass even though `.unsafeRunSync()` is called (see HvVesSpec) - */ - override fun close() = active.set(false).run { IO.unit } + override fun close(): Mono<Void> = Mono.fromRunnable { + active.set(false) + } } /** diff --git a/sources/hv-collector-main/pom.xml b/sources/hv-collector-main/pom.xml index edbdaa36..57f21a66 100644 --- a/sources/hv-collector-main/pom.xml +++ b/sources/hv-collector-main/pom.xml @@ -97,10 +97,6 @@ </dependency> <dependency> <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects-instances</artifactId> - </dependency> - <dependency> - <groupId>io.arrow-kt</groupId> <artifactId>arrow-syntax</artifactId> </dependency> <dependency> diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index dc207ef8..8b0a38bb 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -30,7 +30,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.registerShutdownHook +import reactor.core.publisher.Mono import reactor.core.scheduler.Schedulers +import java.time.Duration import java.util.concurrent.atomic.AtomicReference @@ -39,6 +41,7 @@ private val logger = Logger("$VES_HV_PACKAGE.main") private val hvVesServer = AtomicReference<ServerHandle>() private val configurationModule = ConfigurationModule() +private val maxCloseTime = Duration.ofSeconds(10) fun main(args: Array<String>) { val configStateListener = object : ConfigurationStateListener { @@ -60,30 +63,36 @@ fun main(args: Array<String>) { logger.withDebug(ServiceContext::mdc) { log("Detailed stack trace: ", it) } HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) } - .doOnNext(::startServer) + .flatMap(::startServer) .doOnError(::logServerStartFailed) .then() .block() } -private fun startServer(config: HvVesConfiguration) { - stopRunningServer() +private fun startServer(config: HvVesConfiguration): Mono<ServerHandle> = + stopRunningServer() + .timeout(maxCloseTime) + .then(deferredVesServer(config)) + .doOnNext { + registerShutdownHook { shutdownGracefully(it) } + hvVesServer.set(it) + } + +private fun deferredVesServer(config: HvVesConfiguration) = Mono.defer { Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel) logger.debug(ServiceContext::mdc) { "Configuration: $config" } - - VesServer.start(config).let { - registerShutdownHook { shutdownGracefully(it) } - hvVesServer.set(it) - } + VesServer.start(config) } -private fun stopRunningServer() = hvVesServer.get()?.close()?.unsafeRunSync() +private fun stopRunningServer() = Mono.defer { + hvVesServer.get()?.close() ?: Mono.empty() +} internal fun shutdownGracefully(serverHandle: ServerHandle, healthState: HealthState = HealthState.INSTANCE) { logger.debug(ServiceContext::mdc) { "Graceful shutdown started" } healthState.changeState(HealthDescription.SHUTTING_DOWN) - serverHandle.close().unsafeRunSync() + serverHandle.close().block(maxCloseTime) logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } } diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index c079cc59..fc4d8662 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -29,6 +29,7 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -38,11 +39,10 @@ object VesServer { private val logger = Logger(VesServer::class) - fun start(config: HvVesConfiguration): ServerHandle = + fun start(config: HvVesConfiguration): Mono<ServerHandle> = createVesServer(config) .start() - .then(::logServerStarted) - .unsafeRunSync() + .doOnNext(::logServerStarted) private fun createVesServer(config: HvVesConfiguration): Server = initializeCollectorFactory(config) diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml index 21c1fa31..539f7c2c 100644 --- a/sources/hv-collector-main/src/main/resources/logback.xml +++ b/sources/hv-collector-main/src/main/resources/logback.xml @@ -91,6 +91,7 @@ </appender> <logger name="reactor.netty" level="WARN"/> + <logger name="reactor.netty.tcp.TcpServer" level="OFF"/> <logger name="io.netty" level="INFO"/> <logger name="io.netty.util" level="WARN"/> <logger name="org.apache.kafka" level="INFO"/> diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt index d8de9f25..a967fba0 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.main -import arrow.effects.IO import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.inOrder import com.nhaarman.mockitokotlin2.mock @@ -34,6 +33,7 @@ import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.utils.ServerHandle +import reactor.core.publisher.Mono /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -42,12 +42,9 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle internal object MainTest : Spek({ describe("closeServer shutdown hook") { given("server handles and health state") { - val handle = mock<ServerHandle>() + val handle: ServerHandle = mock() var closed = false - val handleClose = IO { - closed = true - } - whenever(handle.close()).thenReturn(handleClose) + whenever(handle.close()).thenReturn(Mono.empty<Void>().doOnSuccess { closed = true }) val healthState: HealthState = mock() on("shutdownGracefully") { diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt index 00b814cc..ec654b32 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt @@ -19,22 +19,18 @@ */ package org.onap.dcae.collectors.veshv.utils -import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monadError.monadError -import arrow.typeclasses.binding +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since January 2019 */ interface Closeable { - fun close(): IO<Unit> = IO.unit + fun close(): Mono<Void> = Mono.empty() companion object { fun closeAll(closeables: Iterable<Closeable>) = - IO.monadError().binding { - closeables.forEach { it.close().bind() } - }.fix() + Flux.fromIterable(closeables).flatMap(Closeable::close).then() } } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt index 5b582ed5..670ab4ac 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt @@ -20,8 +20,9 @@ package org.onap.dcae.collectors.veshv.utils import arrow.effects.IO +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono import reactor.netty.DisposableServer -import java.time.Duration /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -35,16 +36,33 @@ abstract class ServerHandle(val host: String, val port: Int) : Closeable { * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) { - override fun close() = IO { - ctx.disposeNow(SHUTDOWN_TIMEOUT) - } +class NettyServerHandle(private val ctx: DisposableServer, + private val closeAction: Mono<Void> = Mono.empty()) + : ServerHandle(ctx.host(), ctx.port()) { + + override fun close(): Mono<Void> = + Mono.just(ctx) + .filter { !it.isDisposed } + .flatMap { + closeAction.thenReturn(it) + } + .then(dispose()) + + private fun dispose(): Mono<Void> = + Mono.create { callback -> + logger.debug { "About to dispose NettyServer" } + ctx.dispose() + ctx.onDispose { + logger.debug { "Netty server disposed" } + callback.success() + } + } override fun await() = IO<Unit> { ctx.channel().closeFuture().sync() } companion object { - private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10) + private val logger = Logger(NettyServerHandle::class) } } |