From d7532776b9d608632b91a6c658fcd72ca7c70d64 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Tue, 22 Jan 2019 11:43:18 +0100 Subject: Close KafkaSender when handling SIGINT Closing KafkaSender should result in flushing any pending messages. Change-Id: Ib251f5ca3527266831189df542784cc17173d8dc Issue-ID: DCAEGEN2-1065 Signed-off-by: Piotr Jaszczyk --- .../onap/dcae/collectors/veshv/boundary/adapters.kt | 11 ++--------- .../org/onap/dcae/collectors/veshv/boundary/api.kt | 7 ++++--- .../dcae/collectors/veshv/factory/CollectorFactory.kt | 9 +++++++-- .../veshv/impl/adapters/kafka/KafkaSinkProvider.kt | 9 +++++++++ .../collectors/veshv/impl/socket/NettyTcpServer.kt | 5 +++++ .../onap/dcae/collectors/veshv/model/ServiceContext.kt | 2 +- .../veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt | 18 ++++++++++++++++++ 7 files changed, 46 insertions(+), 15 deletions(-) (limited to 'sources/hv-collector-core') diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index e4a73947..6a6e73fb 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -26,6 +26,7 @@ import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.utils.Closeable import reactor.core.publisher.Flux interface Sink { @@ -42,16 +43,8 @@ interface Metrics { fun notifyClientRejected(cause: ClientRejectionCause) } -interface SinkProvider { +interface SinkProvider: Closeable { operator fun invoke(ctx: ClientContext): Sink - - companion object { - fun just(sink: Sink): SinkProvider = - object : SinkProvider { - override fun invoke( - ctx: ClientContext): Sink = sink - } - } } interface ConfigurationProvider { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index 5584d61d..5c64c70b 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -22,18 +22,19 @@ package org.onap.dcae.collectors.veshv.boundary import arrow.core.Option import arrow.effects.IO import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.utils.Closeable import org.onap.dcae.collectors.veshv.utils.ServerHandle import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import java.util.* interface Collector { fun handleConnection(dataStream: Flux): Mono } -typealias CollectorProvider = (ClientContext) -> Option +interface CollectorProvider : Closeable { + operator fun invoke(ctx: ClientContext): Option +} interface Server { fun start(): IO diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 861065c1..535d1baa 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -19,6 +19,8 @@ */ package org.onap.dcae.collectors.veshv.factory +import arrow.core.Option +import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider @@ -60,8 +62,11 @@ class CollectorFactory(val configuration: ConfigurationProvider, } .subscribe(config::set) - return { ctx: ClientContext -> - config.getOption().map { createVesHvCollector(it, ctx) } + return object : CollectorProvider { + override fun invoke(ctx: ClientContext): Option = + config.getOption().map { createVesHvCollector(it, ctx) } + + override fun close() = sinkProvider.close() } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 5e7d9f57..aa76ce3e 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka +import arrow.effects.IO import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG @@ -30,7 +31,9 @@ import org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.KafkaConfiguration +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderOptions @@ -47,7 +50,13 @@ internal class KafkaSinkProvider internal constructor( override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx) + override fun close() = IO { + kafkaSender.close() + logger.info(ServiceContext::mdc) { "KafkaSender flushed and closed" } + } + companion object { + private val logger = Logger(KafkaSinkProvider::class) private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f private const val BUFFER_MEMORY_MULTIPLIER = 32 private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 725622f7..c76233f0 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -33,6 +33,7 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory import org.onap.dcae.collectors.veshv.utils.NettyServerHandle import org.onap.dcae.collectors.veshv.utils.ServerHandle @@ -63,6 +64,10 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, .addressSupplier { serverConfig.serverListenAddress } .configureSsl() .handle(this::handleConnection) + .doOnUnbound { + logger.info(ServiceContext::mdc) { "Netty TCP Server closed" } + collectorProvider.close().unsafeRunSync() + } .let { NettyServerHandle(it.bindNow()) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt index 2407eced..a72ec034 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt @@ -30,7 +30,7 @@ import java.util.* */ object ServiceContext { val instanceId = UUID.randomUUID().toString() - val serverFqdn = getHost().hostName + val serverFqdn = getHost().hostName!! val mdc = mapOf( OnapMdc.INSTANCE_ID to instanceId, diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt index f23154a4..2db6a152 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt @@ -20,6 +20,8 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import arrow.syntax.collections.tail +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe @@ -28,6 +30,9 @@ import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.KafkaConfiguration +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.ves.VesEventOuterClass +import reactor.kafka.sender.KafkaSender /** * @author Piotr Jaszczyk @@ -61,5 +66,18 @@ internal object KafkaSinkProviderTest : Spek({ } } } + + given("dummy KafkaSender") { + val kafkaSender: KafkaSender = mock() + val cut = KafkaSinkProvider(kafkaSender) + + on("close") { + cut.close().unsafeRunSync() + + it("should close KafkaSender") { + verify(kafkaSender).close() + } + } + } } }) -- cgit 1.2.3-korg