diff options
4 files changed, 18 insertions, 4 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 2fa4f545..5e7d9f57 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -25,7 +25,8 @@ import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CON import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -import org.onap.dcae.collectors.veshv.boundary.Sink +import org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG +import org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.KafkaConfiguration @@ -33,6 +34,7 @@ import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderOptions +import java.lang.Integer.max /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -46,17 +48,28 @@ internal class KafkaSinkProvider internal constructor( override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx) companion object { + private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f + private const val BUFFER_MEMORY_MULTIPLIER = 32 + private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 private fun constructKafkaSender(config: KafkaConfiguration) = KafkaSender.create(constructSenderOptions(config)) private fun constructSenderOptions(config: KafkaConfiguration) = SenderOptions.create<CommonEventHeader, VesMessage>() .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers) + .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config)) + .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config)) .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) .producerProperty(RETRIES_CONFIG, 1) .producerProperty(ACKS_CONFIG, "1") .stopOnError(false) + + private fun maxRequestSize(config: KafkaConfiguration) = + (MAXIMUM_REQUEST_SIZE_MULTIPLIER * config.maximalRequestSizeBytes).toInt() + + private fun bufferMemory(config: KafkaConfiguration) = + max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * config.maximalRequestSizeBytes) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt index f65e157d..2aa2791e 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt @@ -23,4 +23,4 @@ package org.onap.dcae.collectors.veshv.model * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since December 2018 */ -data class KafkaConfiguration(val bootstrapServers: String) +data class KafkaConfiguration(val bootstrapServers: String, val maximalRequestSizeBytes: Int) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt index 3a924e48..f23154a4 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt @@ -36,7 +36,8 @@ import org.onap.dcae.collectors.veshv.model.KafkaConfiguration internal object KafkaSinkProviderTest : Spek({ describe("non functional requirements") { given("sample configuration") { - val config = KafkaConfiguration("localhost:9090") + val config = KafkaConfiguration("localhost:9090", + 1024 * 1024) val cut = KafkaSinkProvider(config) on("sample clients") { diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt index 2311b2ba..c97486c6 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt @@ -91,7 +91,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind() ServerConfiguration( serverListenAddress = InetSocketAddress(listenPort), - kafkaConfiguration = KafkaConfiguration(kafkaServers), + kafkaConfiguration = KafkaConfiguration(kafkaServers, maxPayloadSizeBytes), healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort), configurationProviderParams = configurationProviderParams, securityConfiguration = security, |