diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-01-22 11:43:18 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-01-22 14:30:32 +0100 |
commit | d7532776b9d608632b91a6c658fcd72ca7c70d64 (patch) | |
tree | 0d90d7a75a4a1d83dd1cbd7c5af43e71bb6fea6c /sources | |
parent | 4c529a33439cc40bf192ea3f8dac57d189d60b9f (diff) |
Close KafkaSender when handling SIGINT
Closing KafkaSender should result in flushing any pending messages.
Change-Id: Ib251f5ca3527266831189df542784cc17173d8dc
Issue-ID: DCAEGEN2-1065
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources')
18 files changed, 174 insertions, 48 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index e4a73947..6a6e73fb 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -26,6 +26,7 @@ import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.utils.Closeable import reactor.core.publisher.Flux interface Sink { @@ -42,16 +43,8 @@ interface Metrics { fun notifyClientRejected(cause: ClientRejectionCause) } -interface SinkProvider { +interface SinkProvider: Closeable { operator fun invoke(ctx: ClientContext): Sink - - companion object { - fun just(sink: Sink): SinkProvider = - object : SinkProvider { - override fun invoke( - ctx: ClientContext): Sink = sink - } - } } interface ConfigurationProvider { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index 5584d61d..5c64c70b 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -22,18 +22,19 @@ package org.onap.dcae.collectors.veshv.boundary import arrow.core.Option import arrow.effects.IO import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.utils.Closeable import org.onap.dcae.collectors.veshv.utils.ServerHandle import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import java.util.* interface Collector { fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> } -typealias CollectorProvider = (ClientContext) -> Option<Collector> +interface CollectorProvider : Closeable { + operator fun invoke(ctx: ClientContext): Option<Collector> +} interface Server { fun start(): IO<ServerHandle> diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 861065c1..535d1baa 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -19,6 +19,8 @@ */ package org.onap.dcae.collectors.veshv.factory +import arrow.core.Option +import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider @@ -60,8 +62,11 @@ class CollectorFactory(val configuration: ConfigurationProvider, } .subscribe(config::set) - return { ctx: ClientContext -> - config.getOption().map { createVesHvCollector(it, ctx) } + return object : CollectorProvider { + override fun invoke(ctx: ClientContext): Option<Collector> = + config.getOption().map { createVesHvCollector(it, ctx) } + + override fun close() = sinkProvider.close() } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 5e7d9f57..aa76ce3e 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka +import arrow.effects.IO import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG @@ -30,7 +31,9 @@ import org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.KafkaConfiguration +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderOptions @@ -47,7 +50,13 @@ internal class KafkaSinkProvider internal constructor( override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx) + override fun close() = IO { + kafkaSender.close() + logger.info(ServiceContext::mdc) { "KafkaSender flushed and closed" } + } + companion object { + private val logger = Logger(KafkaSinkProvider::class) private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f private const val BUFFER_MEMORY_MULTIPLIER = 32 private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 725622f7..c76233f0 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -33,6 +33,7 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory import org.onap.dcae.collectors.veshv.utils.NettyServerHandle import org.onap.dcae.collectors.veshv.utils.ServerHandle @@ -63,6 +64,10 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, .addressSupplier { serverConfig.serverListenAddress } .configureSsl() .handle(this::handleConnection) + .doOnUnbound { + logger.info(ServiceContext::mdc) { "Netty TCP Server closed" } + collectorProvider.close().unsafeRunSync() + } .let { NettyServerHandle(it.bindNow()) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt index 2407eced..a72ec034 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt @@ -30,7 +30,7 @@ import java.util.* */ object ServiceContext { val instanceId = UUID.randomUUID().toString() - val serverFqdn = getHost().hostName + val serverFqdn = getHost().hostName!! val mdc = mapOf( OnapMdc.INSTANCE_ID to instanceId, diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt index f23154a4..2db6a152 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt @@ -20,6 +20,8 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import arrow.syntax.collections.tail +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe @@ -28,6 +30,9 @@ import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.KafkaConfiguration +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.ves.VesEventOuterClass +import reactor.kafka.sender.KafkaSender /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -61,5 +66,18 @@ internal object KafkaSinkProviderTest : Spek({ } } } + + given("dummy KafkaSender") { + val kafkaSender: KafkaSender<VesEventOuterClass.CommonEventHeader, VesMessage> = mock() + val cut = KafkaSinkProvider(kafkaSender) + + on("close") { + cut.close().unsafeRunSync() + + it("should close KafkaSender") { + verify(kafkaSender).close() + } + } + } } }) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index c3e4a581..30661e84 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.tests.component import arrow.core.getOrElse +import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.UnpooledByteBufAllocator @@ -33,20 +34,22 @@ import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.tests.fakes.* import reactor.core.publisher.Flux import java.time.Duration +import java.util.concurrent.atomic.AtomicBoolean /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class Sut(sink: Sink = StoringSink()) { +class Sut(sink: Sink = StoringSink()): AutoCloseable { val configurationProvider = FakeConfigurationProvider() val healthStateProvider = FakeHealthState() val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT val metrics = FakeMetrics() + val sinkProvider = DummySinkProvider(sink) private val collectorFactory = CollectorFactory( configurationProvider, - SinkProvider.just(sink), + sinkProvider, metrics, MAX_PAYLOAD_SIZE_BYTES, healthStateProvider) @@ -57,11 +60,28 @@ class Sut(sink: Sink = StoringSink()) { throw IllegalStateException("Collector not available.") } + override fun close() { + collectorProvider.close().unsafeRunSync() + } + companion object { const val MAX_PAYLOAD_SIZE_BYTES = 1024 } } +class DummySinkProvider(private val sink: Sink) : SinkProvider { + private val active = AtomicBoolean(true) + + override fun invoke(ctx: ClientContext) = sink + + override fun close() = IO { + active.set(false) + } + + val closed get() = !active.get() + +} + private val timeout = Duration.ofSeconds(10) fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 75e7cf0e..ed46b119 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -61,6 +61,14 @@ object VesHvSpecification : Spek({ .describedAs("should send all events") .hasSize(2) } + + it("should close sink when closing collector provider") { + val (sut, _) = vesHvWithStoringSink() + + sut.close() + + assertThat(sut.sinkProvider.closed).isTrue() + } } describe("Memory management") { diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index b4ce6499..2e7065b2 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.tests.fakes -import arrow.core.identity import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage diff --git a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt index 7aade34b..32486009 100644 --- a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt +++ b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt @@ -29,6 +29,7 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import reactor.netty.DisposableServer import reactor.netty.http.server.HttpServer import reactor.netty.http.server.HttpServerRequest import reactor.netty.http.server.HttpServerResponse @@ -48,7 +49,10 @@ class HealthCheckApiServer(private val healthState: HealthState, fun start(): IO<ServerHandle> = IO { healthState().subscribe(healthDescription::set) val ctx = HttpServer.create() - .tcpConfiguration { it.addressSupplier { listenAddress } } + .tcpConfiguration { + it.addressSupplier { listenAddress } + .doOnUnbound { logClose() } + } .route { routes -> routes.get("/health/ready", ::readinessHandler) routes.get("/health/alive", ::livenessHandler) @@ -70,8 +74,13 @@ class HealthCheckApiServer(private val healthState: HealthState, private fun monitoringHandler(_req: HttpServerRequest, resp: HttpServerResponse) = resp.sendString(monitoring.lastStatus()) + private fun logClose() { + logger.info { "Health Check API closed" } + } + companion object { private val logger = Logger(HealthCheckApiServer::class) + } } diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 16da3721..d865bcf5 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -26,7 +26,11 @@ import arrow.typeclasses.binding import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer import org.onap.dcae.collectors.veshv.main.servers.VesServer import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.model.ServiceContext +import org.onap.dcae.collectors.veshv.utils.Closeable +import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure +import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -37,24 +41,29 @@ private val logger = Logger("$VESHV_PACKAGE.main") private const val PROGRAM_NAME = "java $VESHV_PACKAGE.main.MainKt" fun main(args: Array<String>) = - ArgVesHvConfiguration().parse(args) - .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map(::startAndAwaitServers) - .unsafeRunEitherSync( - { ex -> - logger.withError { log("Failed to start a server", ex) } - ExitFailure(1) - }, - { logger.info { "Finished" } } - ) + ArgVesHvConfiguration().parse(args) + .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) + .map(::startAndAwaitServers) + .unsafeRunEitherSync( + { ex -> + logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) } + ExitFailure(1) + }, + { logger.debug(ServiceContext::mdc) { "Finished" } } + ) private fun startAndAwaitServers(config: ServerConfiguration) = - IO.monad().binding { - Logger.setLogLevel(VESHV_PACKAGE, config.logLevel) - logger.info { "Using configuration: $config" } - HealthCheckServer.start(config).bind() - VesServer.start(config).bind().run { - registerShutdownHook(shutdown()).bind() - await().bind() + IO.monad().binding { + Logger.setLogLevel(VESHV_PACKAGE, config.logLevel) + logger.info { "Using configuration: $config" } + val healthCheckServerHandle = HealthCheckServer.start(config).bind() + VesServer.start(config).bind().let { handle -> + registerShutdownHook(closeServers(handle, healthCheckServerHandle)).bind() + handle.await().bind() + } + }.fix() + +private fun closeServers(vararg handles: ServerHandle): IO<Unit> = + Closeable.closeAll(handles.asIterable()).then { + logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } } - }.fix() diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt index 13b0bc7b..3d1a2a21 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt @@ -21,7 +21,9 @@ package org.onap.dcae.collectors.veshv.main.servers import arrow.effects.IO import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.logging.Logger /** @@ -31,7 +33,7 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger abstract class ServerStarter { fun start(config: ServerConfiguration): IO<ServerHandle> = startServer(config) - .map { logger.info { serverStartedMessage(it) }; it } + .then { logger.info(ServiceContext::mdc) { serverStartedMessage(it) } } protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle> protected abstract fun serverStartedMessage(handle: ServerHandle): String diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml index 5ce34800..40f3c8a0 100644 --- a/sources/hv-collector-main/src/main/resources/logback.xml +++ b/sources/hv-collector-main/src/main/resources/logback.xml @@ -90,9 +90,9 @@ </appender> <logger name="reactor.netty" level="WARN"/> - <logger name="io.netty" level="DEBUG"/> + <logger name="io.netty" level="INFO"/> <logger name="io.netty.util" level="WARN"/> - <logger name="org.apache.kafka" level="WARN"/> + <logger name="org.apache.kafka" level="INFO"/> <root level="INFO"> <appender-ref ref="CONSOLE"/> diff --git a/sources/hv-collector-utils/pom.xml b/sources/hv-collector-utils/pom.xml index 5bd24729..6ce74bdb 100644 --- a/sources/hv-collector-utils/pom.xml +++ b/sources/hv-collector-utils/pom.xml @@ -82,6 +82,10 @@ </dependency> <dependency> <groupId>io.arrow-kt</groupId> + <artifactId>arrow-effects-instances</artifactId> + </dependency> + <dependency> + <groupId>io.arrow-kt</groupId> <artifactId>arrow-syntax</artifactId> </dependency> <dependency> diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt new file mode 100644 index 00000000..00b814cc --- /dev/null +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt @@ -0,0 +1,40 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils + +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.instances.io.monadError.monadError +import arrow.typeclasses.binding + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since January 2019 + */ +interface Closeable { + fun close(): IO<Unit> = IO.unit + + companion object { + fun closeAll(closeables: Iterable<Closeable>) = + IO.monadError().binding { + closeables.forEach { it.close().bind() } + }.fix() + } +} diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt index 3c2c64ac..290ef72c 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt @@ -23,6 +23,9 @@ import arrow.core.Either import arrow.core.Left import arrow.core.Right import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.instances.io.monadError.monadError +import arrow.typeclasses.binding import reactor.core.publisher.Flux import reactor.core.publisher.Mono import kotlin.system.exitProcess @@ -46,7 +49,7 @@ object ExitSuccess : ExitCode() { data class ExitFailure(override val code: Int) : ExitCode() -fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) = +inline fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) = flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() }) fun IO<Any>.unit() = map { Unit } @@ -66,3 +69,9 @@ fun <T> Flux<IO<T>>.evaluateIo(): Flux<T> = { Flux.just<T>(it) } ) } + +inline fun <T> IO<T>.then(crossinline block: (T) -> Unit): IO<T> = + map { + block(it) + it + } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt index b8784c64..5b582ed5 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt @@ -20,7 +20,6 @@ package org.onap.dcae.collectors.veshv.utils import arrow.effects.IO -import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.netty.DisposableServer import java.time.Duration @@ -28,8 +27,7 @@ import java.time.Duration * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -abstract class ServerHandle(val host: String, val port: Int) { - abstract fun shutdown(): IO<Unit> +abstract class ServerHandle(val host: String, val port: Int) : Closeable { abstract fun await(): IO<Unit> } @@ -38,10 +36,8 @@ abstract class ServerHandle(val host: String, val port: Int) { * @since August 2018 */ class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) { - override fun shutdown() = IO { - logger.info { "Graceful shutdown" } + override fun close() = IO { ctx.disposeNow(SHUTDOWN_TIMEOUT) - logger.info { "Server disposed" } } override fun await() = IO<Unit> { @@ -49,7 +45,6 @@ class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.ho } companion object { - val logger = Logger(NettyServerHandle::class) private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10) } } |