diff options
Diffstat (limited to 'sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt')
-rw-r--r-- | sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt | 26 |
1 files changed, 14 insertions, 12 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt index 18de6fcc..52bcf1e4 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt @@ -37,21 +37,23 @@ internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<Byte private val topics: Set<String>, private val metrics: Metrics, private val dispatcher: CoroutineDispatcher = Dispatchers.IO) - : MetricsKafkaConsumer{ + : MetricsKafkaConsumer { - override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job = + override suspend fun start(updateInterval: Long, pollTimeout: Duration): Job = GlobalScope.launch(dispatcher) { - kafkaConsumer.assign(topics.map { TopicPartition(it, 0) }) - while (isActive) { - val topicPartitions = kafkaConsumer.assignment() + val topicPartitions = topics.flatMap { + listOf(TopicPartition(it, 0), TopicPartition(it, 1), TopicPartition(it, 2)) + } + kafkaConsumer.assign(topicPartitions) - kafkaConsumer.endOffsets(topicPartitions) - .forEach { (topicPartition, offset) -> - update(topicPartition, offset) - } - kafkaConsumer.commitSync() - delay(updateInterval) - } + while (isActive) { + kafkaConsumer.endOffsets(kafkaConsumer.assignment()) + .forEach { (topicPartition, offset) -> + update(topicPartition, offset) + } + kafkaConsumer.commitSync() + delay(updateInterval) + } } private fun update(topicPartition: TopicPartition, offset: Long) { |