aboutsummaryrefslogtreecommitdiffstats
path: root/sources
diff options
context:
space:
mode:
Diffstat (limited to 'sources')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt15
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt2
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt3
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt2
4 files changed, 18 insertions, 4 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index 2fa4f545..5e7d9f57 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -25,7 +25,8 @@ import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CON
import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG
import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
-import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
@@ -33,6 +34,7 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderOptions
+import java.lang.Integer.max
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -46,17 +48,28 @@ internal class KafkaSinkProvider internal constructor(
override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx)
companion object {
+ private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f
+ private const val BUFFER_MEMORY_MULTIPLIER = 32
+ private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
private fun constructKafkaSender(config: KafkaConfiguration) =
KafkaSender.create(constructSenderOptions(config))
private fun constructSenderOptions(config: KafkaConfiguration) =
SenderOptions.create<CommonEventHeader, VesMessage>()
.producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
+ .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config))
+ .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config))
.producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
.producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
.producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
.producerProperty(RETRIES_CONFIG, 1)
.producerProperty(ACKS_CONFIG, "1")
.stopOnError(false)
+
+ private fun maxRequestSize(config: KafkaConfiguration) =
+ (MAXIMUM_REQUEST_SIZE_MULTIPLIER * config.maximalRequestSizeBytes).toInt()
+
+ private fun bufferMemory(config: KafkaConfiguration) =
+ max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * config.maximalRequestSizeBytes)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt
index f65e157d..2aa2791e 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt
@@ -23,4 +23,4 @@ package org.onap.dcae.collectors.veshv.model
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since December 2018
*/
-data class KafkaConfiguration(val bootstrapServers: String)
+data class KafkaConfiguration(val bootstrapServers: String, val maximalRequestSizeBytes: Int)
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
index 3a924e48..f23154a4 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
@@ -36,7 +36,8 @@ import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
internal object KafkaSinkProviderTest : Spek({
describe("non functional requirements") {
given("sample configuration") {
- val config = KafkaConfiguration("localhost:9090")
+ val config = KafkaConfiguration("localhost:9090",
+ 1024 * 1024)
val cut = KafkaSinkProvider(config)
on("sample clients") {
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
index 2311b2ba..c97486c6 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
@@ -91,7 +91,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
ServerConfiguration(
serverListenAddress = InetSocketAddress(listenPort),
- kafkaConfiguration = KafkaConfiguration(kafkaServers),
+ kafkaConfiguration = KafkaConfiguration(kafkaServers, maxPayloadSizeBytes),
healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort),
configurationProviderParams = configurationProviderParams,
securityConfiguration = security,