diff options
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt')
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt | 9 |
1 files changed, 9 insertions, 0 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 5e7d9f57..aa76ce3e 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka +import arrow.effects.IO import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG @@ -30,7 +31,9 @@ import org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.KafkaConfiguration +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderOptions @@ -47,7 +50,13 @@ internal class KafkaSinkProvider internal constructor( override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx) + override fun close() = IO { + kafkaSender.close() + logger.info(ServiceContext::mdc) { "KafkaSender flushed and closed" } + } + companion object { + private val logger = Logger(KafkaSinkProvider::class) private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f private const val BUFFER_MEMORY_MULTIPLIER = 32 private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 |