author     Joanna Jeremicz <joanna.jeremicz@nokia.com>  2020-04-28 13:25:46 +0200
committer  Joanna Jeremicz <joanna.jeremicz@nokia.com>  2020-04-30 13:16:40 +0200
commit     f3b6f88fe48da4808f2521cb0769cc77caf4f3fb (patch)
tree       45e741e4cf4de5a6d6c5eff01a8d98c667516805
parent     66254ca1c5df0d7971764799088c8f20c79f4ca7 (diff)
Make offset consumer more generic
Offsets are now read correctly regardless of the number of partitions on a topic.

Issue-ID: DCAEGEN2-1576
Change-Id: Id89bebbcf1b6181a2f9fd5a4a4600329d046c5e5
Signed-off-by: Joanna Jeremicz <joanna.jeremicz@nokia.com>
-rw-r--r--  sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt  2
-rw-r--r--  sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt  7
-rw-r--r--  sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfigurationTest.kt  2
-rw-r--r--  sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt  62
4 files changed, 51 insertions, 22 deletions
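
The core of the change is in the OffsetKafkaConsumer.kt hunk below: instead of assuming every topic has exactly partitions 0, 1 and 2, the consumer now asks Kafka for the actual partition layout. A minimal sketch of that approach, assuming a KafkaConsumer keyed and valued by ByteArray (the generic parameters are truncated in the hunk header, so this is an assumption) and an arbitrary set of topic names; assignAllPartitions is an illustrative helper name, not part of the patch:

    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    // Discover every partition of the given topics and assign them all,
    // rather than hard-coding partitions 0, 1 and 2 per topic.
    fun assignAllPartitions(consumer: KafkaConsumer<ByteArray, ByteArray>, topics: Set<String>) {
        val topicPartitions = topics
                .flatMap(consumer::partitionsFor)                   // one PartitionInfo per partition
                .map { TopicPartition(it.topic(), it.partition()) }
        consumer.assign(topicPartitions)
    }
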
diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
index 9d875571..4047e21a 100644
--- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
+++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
@@ -137,4 +137,4 @@ private fun option(conf: OptionDSL.() -> Unit): Option {
.hasArg(dsl.hasArgument)
.desc(dsl.desc)
.build()
-}
\ No newline at end of file
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
index 52bcf1e4..18999153 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
@@ -41,9 +41,10 @@ internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<Byte
override suspend fun start(updateInterval: Long, pollTimeout: Duration): Job =
GlobalScope.launch(dispatcher) {
- val topicPartitions = topics.flatMap {
- listOf(TopicPartition(it, 0), TopicPartition(it, 1), TopicPartition(it, 2))
- }
+
+ val topicPartitions = topics.flatMap(kafkaConsumer::partitionsFor)
+ .map { TopicPartition(it.topic(), it.partition()) }
+
kafkaConsumer.assign(topicPartitions)
while (isActive) {
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfigurationTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfigurationTest.kt
index bb0bfe1d..306a5793 100644
--- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfigurationTest.kt
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfigurationTest.kt
@@ -137,4 +137,4 @@ internal object ArgKafkaConsumerConfigurationTest : Spek({
}
}
}
-})
\ No newline at end of file
+})
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt
index 26616f1c..faa700bb 100644
--- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt
@@ -55,11 +55,11 @@ object OffsetKafkaConsumerTest : Spek({
val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher)
on("started OffsetKafkaConsumer") {
- val topicPartition = createTopicPartition(topicName)
+ val topicPartition = createTopicPartition(topicName, 0)
val topicPartitions = setOf(topicPartition)
whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
- .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset))
+ .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset1))
runBlockingTest(testDispatcher) {
val job = offsetKafkaConsumer.start(updateIntervalInMs)
@@ -67,7 +67,35 @@ object OffsetKafkaConsumerTest : Spek({
}
it("should notify offset changed with $topicName") {
- verify(mockedMetrics).notifyOffsetChanged(newOffset, topicPartition)
+ verify(mockedMetrics).notifyOffsetChanged(newOffset1, topicPartition)
+ }
+ }
+ }
+
+ given("single topicName and multiple partitions") {
+ val topicName = "topicName"
+ val topics = setOf(topicName)
+ val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher)
+
+ on("started OffsetKafkaConsumer") {
+ val topicPartition1 = createTopicPartition(topicName, 0)
+ val topicPartition2 = createTopicPartition(topicName, 2)
+ val topicPartition3 = createTopicPartition(topicName, 3)
+ val topicPartitions = setOf(topicPartition1, topicPartition2, topicPartition3)
+ whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
+ whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
+ .thenReturn(mapOf<TopicPartition, Long>(
+ topicPartition1 to newOffset1, topicPartition2 to newOffset2, topicPartition3 to newOffset3))
+
+ runBlockingTest(testDispatcher) {
+ val job = offsetKafkaConsumer.start(updateIntervalInMs)
+ job.cancelAndJoin()
+ }
+
+ it("should notify offset changed with $topicName") {
+ verify(mockedMetrics).notifyOffsetChanged(newOffset1, topicPartition1)
+ verify(mockedMetrics).notifyOffsetChanged(newOffset2, topicPartition2)
+ verify(mockedMetrics).notifyOffsetChanged(newOffset3, topicPartition3)
}
}
}
@@ -80,18 +108,18 @@ object OffsetKafkaConsumerTest : Spek({
val offsetArgumentCaptor = argumentCaptor<Long>()
val topicPartitionArgumentCaptor = argumentCaptor<TopicPartition>()
- val topicPartition1 = createTopicPartition(topicName1)
- val topicPartition2 = createTopicPartition(topicName2)
+ val topicPartition1 = createTopicPartition(topicName1, 0)
+ val topicPartition2 = createTopicPartition(topicName2, 0)
val topicPartitions = setOf(topicPartition1, topicPartition2)
whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
val partitionToOffset1 =
- mapOf(topicPartition1 to newOffset,
- topicPartition2 to anotherNewOffset)
+ mapOf(topicPartition1 to newOffset1,
+ topicPartition2 to newOffset2)
val partitionToOffset2 =
- mapOf(topicPartition1 to anotherNewOffset,
- topicPartition2 to newOffset)
+ mapOf(topicPartition1 to newOffset2,
+ topicPartition2 to newOffset1)
whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
.thenReturn(partitionToOffset1, partitionToOffset2)
@@ -103,13 +131,13 @@ object OffsetKafkaConsumerTest : Spek({
)
it("should notify offset changed with proper arguments - before interval"){
- assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset)
+ assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset1)
assertThat(topicPartitionArgumentCaptor.firstValue.topic())
.isEqualToIgnoringCase(topicPartition1.topic())
assertThat(topicPartitionArgumentCaptor.firstValue.partition())
.isEqualTo(topicPartition1.partition())
- assertThat(offsetArgumentCaptor.secondValue).isEqualTo(anotherNewOffset)
+ assertThat(offsetArgumentCaptor.secondValue).isEqualTo(newOffset2)
assertThat(topicPartitionArgumentCaptor.secondValue.topic())
.isEqualToIgnoringCase(topicPartition2.topic())
assertThat(topicPartitionArgumentCaptor.secondValue.partition())
@@ -126,13 +154,13 @@ object OffsetKafkaConsumerTest : Spek({
)
it("should notify offset changed with proper arguments - after interval") {
- assertThat(offsetArgumentCaptor.thirdValue).isEqualTo(anotherNewOffset)
+ assertThat(offsetArgumentCaptor.thirdValue).isEqualTo(newOffset2)
assertThat(topicPartitionArgumentCaptor.thirdValue.topic())
.isEqualToIgnoringCase(topicPartition1.topic())
assertThat(topicPartitionArgumentCaptor.thirdValue.partition())
.isEqualTo(topicPartition1.partition())
- assertThat(offsetArgumentCaptor.lastValue).isEqualTo(newOffset)
+ assertThat(offsetArgumentCaptor.lastValue).isEqualTo(newOffset1)
assertThat(topicPartitionArgumentCaptor.lastValue.topic())
.isEqualToIgnoringCase(topicPartition2.topic())
assertThat(topicPartitionArgumentCaptor.lastValue.partition())
@@ -167,10 +195,10 @@ object OffsetKafkaConsumerTest : Spek({
})
private const val updateIntervalInMs = 10L
-private const val partitionNumber = 0
-private const val newOffset = 2L
-private const val anotherNewOffset = 10L
+private const val newOffset1 = 2L
+private const val newOffset2 = 10L
+private const val newOffset3 = 125L
private const val topicName1 = "topicName1"
private const val topicName2 = "topicName2"
private const val topicsAmount = 2
-fun createTopicPartition(topic: String) = TopicPartition(topic, partitionNumber)
\ No newline at end of file
+fun createTopicPartition(topic: String, number: Int) = TopicPartition(topic, number)
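
Because start() now calls partitionsFor on the consumer, a test double has to describe the partition layout it should report. A hypothetical stub using the mockito-kotlin helpers already present in these tests; the topic name, partition count and the exact mockito-kotlin package are illustrative assumptions, not taken from the patch:

    import com.nhaarman.mockitokotlin2.mock
    import com.nhaarman.mockitokotlin2.whenever
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.PartitionInfo

    // Make the mocked consumer report two partitions for "sample-topic",
    // so the generic assignment logic picks both of them up.
    val mockedKafkaConsumer = mock<KafkaConsumer<ByteArray, ByteArray>>()
    whenever(mockedKafkaConsumer.partitionsFor("sample-topic")).thenReturn(
            listOf(
                    PartitionInfo("sample-topic", 0, null, emptyArray(), emptyArray()),
                    PartitionInfo("sample-topic", 1, null, emptyArray(), emptyArray())
            )
    )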