diff options
author | Filip Krzywka <filip.krzywka@nokia.com> | 2019-05-10 11:53:21 +0200 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2019-05-10 11:53:51 +0200 |
commit | dc936d27d761bde31ac5916a84efa2f48ec32b83 (patch) | |
tree | 0c52440545105d137d13c238cb54a2e0c364d167 | |
parent | e3b124ab11ac8c837ff692539696d1e43fcd8247 (diff) |
Enable Kafka consumer offset committing
It appears that reactor-kafka is setting auto.commit property to false,
which makes our CSITs fail nondeterministically due to automatic reset of
consumer offset.
By acknowledging manually, we will mark every message for committing,
which will be performed according to ConsumerConfiguration.
This way, Kafka broker should persist consumer offset.
Change-Id: I0c5156ff8df9bb3341e733e50a3c6866fdd94976
Issue-ID: DCAEGEN2-1495
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
-rw-r--r-- | sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt | 11 |
1 files changed, 6 insertions, 5 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt index b5b692d8..a108eba7 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018,2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,7 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr fun start(): Flux<ReceiverRecord<ByteArray, ByteArray>> = receiver.receive() + .doOnNext { it.receiverOffset().acknowledge() } .also { logger.info { "Started Kafka source" } } companion object { @@ -50,10 +51,8 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;" private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name - fun create(bootstrapServers: String, topics: Set<String>): KafkaSource { - return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics))) - } - + fun create(bootstrapServers: String, topics: Set<String>) = + KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics))) fun createReceiverOptions(bootstrapServers: String, topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? { @@ -64,6 +63,8 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to "3000", + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SASL_PLAINTEXT, SaslConfigs.SASL_MECHANISM to PLAIN_MECHANISM, |