From dc936d27d761bde31ac5916a84efa2f48ec32b83 Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Fri, 10 May 2019 11:53:21 +0200 Subject: Enable Kafka consumer offset committing It appears that reactor-kafka is setting auto.commit property to false, which makes our CSITs fail nondeterministically due to automatic reset of consumer offset. By acknowledging manually, we will mark every message for committing, which will be performed according to ConsumerConfiguration. This way, Kafka broker should persist consumer offset. Change-Id: I0c5156ff8df9bb3341e733e50a3c6866fdd94976 Issue-ID: DCAEGEN2-1495 Signed-off-by: Filip Krzywka --- .../veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'sources') diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt index b5b692d8..a108eba7 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018,2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,7 @@ internal class KafkaSource(private val receiver: KafkaReceiver> = receiver.receive() + .doOnNext { it.receiverOffset().acknowledge() } .also { logger.info { "Started Kafka source" } } companion object { @@ -50,10 +51,8 @@ internal class KafkaSource(private val receiver: KafkaReceiver).name - fun create(bootstrapServers: String, topics: Set): KafkaSource { - return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics))) - } - + fun create(bootstrapServers: String, topics: Set) = + KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics))) fun createReceiverOptions(bootstrapServers: String, topics: Set): ReceiverOptions? { @@ -64,6 +63,8 @@ internal class KafkaSource(private val receiver: KafkaReceiver