summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt')
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt26
1 files changed, 14 insertions, 12 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
index 18de6fcc..52bcf1e4 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
@@ -37,21 +37,23 @@ internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<Byte
private val topics: Set<String>,
private val metrics: Metrics,
private val dispatcher: CoroutineDispatcher = Dispatchers.IO)
- : MetricsKafkaConsumer{
+ : MetricsKafkaConsumer {
- override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job =
+ override suspend fun start(updateInterval: Long, pollTimeout: Duration): Job =
GlobalScope.launch(dispatcher) {
- kafkaConsumer.assign(topics.map { TopicPartition(it, 0) })
- while (isActive) {
- val topicPartitions = kafkaConsumer.assignment()
+ val topicPartitions = topics.flatMap {
+ listOf(TopicPartition(it, 0), TopicPartition(it, 1), TopicPartition(it, 2))
+ }
+ kafkaConsumer.assign(topicPartitions)
- kafkaConsumer.endOffsets(topicPartitions)
- .forEach { (topicPartition, offset) ->
- update(topicPartition, offset)
- }
- kafkaConsumer.commitSync()
- delay(updateInterval)
- }
+ while (isActive) {
+ kafkaConsumer.endOffsets(kafkaConsumer.assignment())
+ .forEach { (topicPartition, offset) ->
+ update(topicPartition, offset)
+ }
+ kafkaConsumer.commitSync()
+ delay(updateInterval)
+ }
}
private fun update(topicPartition: TopicPartition, offset: Long) {