summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-05-10 11:53:21 +0200
committerFilip Krzywka <filip.krzywka@nokia.com>2019-05-10 11:53:51 +0200
commitdc936d27d761bde31ac5916a84efa2f48ec32b83 (patch)
tree0c52440545105d137d13c238cb54a2e0c364d167
parente3b124ab11ac8c837ff692539696d1e43fcd8247 (diff)
Enable Kafka consumer offset committing
It appears that reactor-kafka is setting auto.commit property to false, which makes our CSITs fail nondeterministically due to automatic reset of consumer offset. By acknowledging manually, we will mark every message for committing, which will be performed according to ConsumerConfiguration. This way, Kafka broker should persist consumer offset. Change-Id: I0c5156ff8df9bb3341e733e50a3c6866fdd94976 Issue-ID: DCAEGEN2-1495 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt11
1 files changed, 6 insertions, 5 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
index b5b692d8..a108eba7 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018,2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -39,6 +39,7 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr
fun start(): Flux<ReceiverRecord<ByteArray, ByteArray>> =
receiver.receive()
+ .doOnNext { it.receiverOffset().acknowledge() }
.also { logger.info { "Started Kafka source" } }
companion object {
@@ -50,10 +51,8 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr
private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;"
private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name
- fun create(bootstrapServers: String, topics: Set<String>): KafkaSource {
- return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics)))
- }
-
+ fun create(bootstrapServers: String, topics: Set<String>) =
+ KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics)))
fun createReceiverOptions(bootstrapServers: String,
topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? {
@@ -64,6 +63,8 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
+ ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to "3000",
+
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SASL_PLAINTEXT,
SaslConfigs.SASL_MECHANISM to PLAIN_MECHANISM,