diff options
author | Joanna Jeremicz <joanna.jeremicz@nokia.com> | 2020-04-28 13:25:46 +0200 |
---|---|---|
committer | Joanna Jeremicz <joanna.jeremicz@nokia.com> | 2020-04-30 13:16:40 +0200 |
commit | f3b6f88fe48da4808f2521cb0769cc77caf4f3fb (patch) | |
tree | 45e741e4cf4de5a6d6c5eff01a8d98c667516805 /sources/hv-collector-kafka-consumer/src/main | |
parent | 66254ca1c5df0d7971764799088c8f20c79f4ca7 (diff) |
Make offset consumer more generic
Offsets are now correctly read despite of partitions number on topic.
Issue-ID: DCAEGEN2-1576
Change-Id: Id89bebbcf1b6181a2f9fd5a4a4600329d046c5e5
Signed-off-by: Joanna Jeremicz <joanna.jeremicz@nokia.com>
Diffstat (limited to 'sources/hv-collector-kafka-consumer/src/main')
-rw-r--r-- | sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt | 7 |
1 files changed, 4 insertions, 3 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt index 52bcf1e4..18999153 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt @@ -41,9 +41,10 @@ internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<Byte override suspend fun start(updateInterval: Long, pollTimeout: Duration): Job = GlobalScope.launch(dispatcher) { - val topicPartitions = topics.flatMap { - listOf(TopicPartition(it, 0), TopicPartition(it, 1), TopicPartition(it, 2)) - } + + val topicPartitions = topics.flatMap(kafkaConsumer::partitionsFor) + .map { TopicPartition(it.topic(), it.partition()) } + kafkaConsumer.assign(topicPartitions) while (isActive) { |