summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-kafka-consumer/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-kafka-consumer/src/main')
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt7
1 files changed, 4 insertions, 3 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
index 52bcf1e4..18999153 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
@@ -41,9 +41,10 @@ internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<Byte
override suspend fun start(updateInterval: Long, pollTimeout: Duration): Job =
GlobalScope.launch(dispatcher) {
- val topicPartitions = topics.flatMap {
- listOf(TopicPartition(it, 0), TopicPartition(it, 1), TopicPartition(it, 2))
- }
+
+ val topicPartitions = topics.flatMap(kafkaConsumer::partitionsFor)
+ .map { TopicPartition(it.topic(), it.partition()) }
+
kafkaConsumer.assign(topicPartitions)
while (isActive) {