diff options
7 files changed, 57 insertions, 27 deletions
diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt index 9d875571..4047e21a 100644 --- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt +++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt @@ -137,4 +137,4 @@ private fun option(conf: OptionDSL.() -> Unit): Option { .hasArg(dsl.hasArgument) .desc(dsl.desc) .build() -}
\ No newline at end of file +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt index 52bcf1e4..18999153 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt @@ -41,9 +41,10 @@ internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<Byte override suspend fun start(updateInterval: Long, pollTimeout: Duration): Job = GlobalScope.launch(dispatcher) { - val topicPartitions = topics.flatMap { - listOf(TopicPartition(it, 0), TopicPartition(it, 1), TopicPartition(it, 2)) - } + + val topicPartitions = topics.flatMap(kafkaConsumer::partitionsFor) + .map { TopicPartition(it.topic(), it.partition()) } + kafkaConsumer.assign(topicPartitions) while (isActive) { diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfigurationTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfigurationTest.kt index bb0bfe1d..306a5793 100644 --- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfigurationTest.kt +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfigurationTest.kt @@ -137,4 +137,4 @@ internal object ArgKafkaConsumerConfigurationTest : Spek({ } } } -})
\ No newline at end of file +}) diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt index 26616f1c..faa700bb 100644 --- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt @@ -55,11 +55,11 @@ object OffsetKafkaConsumerTest : Spek({ val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher) on("started OffsetKafkaConsumer") { - val topicPartition = createTopicPartition(topicName) + val topicPartition = createTopicPartition(topicName, 0) val topicPartitions = setOf(topicPartition) whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions) whenever(mockedKafkaConsumer.endOffsets(topicPartitions)) - .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset)) + .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset1)) runBlockingTest(testDispatcher) { val job = offsetKafkaConsumer.start(updateIntervalInMs) @@ -67,7 +67,35 @@ object OffsetKafkaConsumerTest : Spek({ } it("should notify offset changed with $topicName") { - verify(mockedMetrics).notifyOffsetChanged(newOffset, topicPartition) + verify(mockedMetrics).notifyOffsetChanged(newOffset1, topicPartition) + } + } + } + + given("single topicName and multiple partitions") { + val topicName = "topicName" + val topics = setOf(topicName) + val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher) + + on("started OffsetKafkaConsumer") { + val topicPartition1 = createTopicPartition(topicName, 0) + val topicPartition2 = createTopicPartition(topicName, 2) + val topicPartition3 = createTopicPartition(topicName, 3) + val topicPartitions = setOf(topicPartition1, topicPartition2, topicPartition3) + whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions) + whenever(mockedKafkaConsumer.endOffsets(topicPartitions)) + .thenReturn(mapOf<TopicPartition, Long>( + topicPartition1 to newOffset1, topicPartition2 to newOffset2, topicPartition3 to newOffset3)) + + runBlockingTest(testDispatcher) { + val job = offsetKafkaConsumer.start(updateIntervalInMs) + job.cancelAndJoin() + } + + it("should notify offset changed with $topicName") { + verify(mockedMetrics).notifyOffsetChanged(newOffset1, topicPartition1) + verify(mockedMetrics).notifyOffsetChanged(newOffset2, topicPartition2) + verify(mockedMetrics).notifyOffsetChanged(newOffset3, topicPartition3) } } } @@ -80,18 +108,18 @@ object OffsetKafkaConsumerTest : Spek({ val offsetArgumentCaptor = argumentCaptor<Long>() val topicPartitionArgumentCaptor = argumentCaptor<TopicPartition>() - val topicPartition1 = createTopicPartition(topicName1) - val topicPartition2 = createTopicPartition(topicName2) + val topicPartition1 = createTopicPartition(topicName1, 0) + val topicPartition2 = createTopicPartition(topicName2, 0) val topicPartitions = setOf(topicPartition1, topicPartition2) whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions) val partitionToOffset1 = - mapOf(topicPartition1 to newOffset, - topicPartition2 to anotherNewOffset) + mapOf(topicPartition1 to newOffset1, + topicPartition2 to newOffset2) val partitionToOffset2 = - mapOf(topicPartition1 to anotherNewOffset, - topicPartition2 to newOffset) + mapOf(topicPartition1 to newOffset2, + topicPartition2 to newOffset1) whenever(mockedKafkaConsumer.endOffsets(topicPartitions)) .thenReturn(partitionToOffset1, partitionToOffset2) @@ -103,13 +131,13 @@ object OffsetKafkaConsumerTest : Spek({ ) it("should notify offset changed with proper arguments - before interval"){ - assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset) + assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset1) assertThat(topicPartitionArgumentCaptor.firstValue.topic()) .isEqualToIgnoringCase(topicPartition1.topic()) assertThat(topicPartitionArgumentCaptor.firstValue.partition()) .isEqualTo(topicPartition1.partition()) - assertThat(offsetArgumentCaptor.secondValue).isEqualTo(anotherNewOffset) + assertThat(offsetArgumentCaptor.secondValue).isEqualTo(newOffset2) assertThat(topicPartitionArgumentCaptor.secondValue.topic()) .isEqualToIgnoringCase(topicPartition2.topic()) assertThat(topicPartitionArgumentCaptor.secondValue.partition()) @@ -126,13 +154,13 @@ object OffsetKafkaConsumerTest : Spek({ ) it("should notify offset changed with proper arguments - after interval") { - assertThat(offsetArgumentCaptor.thirdValue).isEqualTo(anotherNewOffset) + assertThat(offsetArgumentCaptor.thirdValue).isEqualTo(newOffset2) assertThat(topicPartitionArgumentCaptor.thirdValue.topic()) .isEqualToIgnoringCase(topicPartition1.topic()) assertThat(topicPartitionArgumentCaptor.thirdValue.partition()) .isEqualTo(topicPartition1.partition()) - assertThat(offsetArgumentCaptor.lastValue).isEqualTo(newOffset) + assertThat(offsetArgumentCaptor.lastValue).isEqualTo(newOffset1) assertThat(topicPartitionArgumentCaptor.lastValue.topic()) .isEqualToIgnoringCase(topicPartition2.topic()) assertThat(topicPartitionArgumentCaptor.lastValue.partition()) @@ -167,10 +195,10 @@ object OffsetKafkaConsumerTest : Spek({ }) private const val updateIntervalInMs = 10L -private const val partitionNumber = 0 -private const val newOffset = 2L -private const val anotherNewOffset = 10L +private const val newOffset1 = 2L +private const val newOffset2 = 10L +private const val newOffset3 = 125L private const val topicName1 = "topicName1" private const val topicName2 = "topicName2" private const val topicsAmount = 2 -fun createTopicPartition(topic: String) = TopicPartition(topic, partitionNumber)
\ No newline at end of file +fun createTopicPartition(topic: String, number: Int) = TopicPartition(topic, number) diff --git a/tools/performance/cloud/consumer-deployment.yaml b/tools/performance/cloud/consumer-deployment.yaml index c0692ca3..9ce5e650 100755 --- a/tools/performance/cloud/consumer-deployment.yaml +++ b/tools/performance/cloud/consumer-deployment.yaml @@ -42,7 +42,7 @@ spec: spec: containers: - name: kafka-consumer-counting - image: nexus.onap.dyn.nesc.nokia.net:10001/onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-kafka-consumer:latest + image: nexus3.onap.org:10001/onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-kafka-consumer:latest ports: - containerPort: 8080 env: @@ -86,7 +86,7 @@ spec: spec: containers: - name: kafka-processing-consumer - image: nexus.onap.dyn.nesc.nokia.net:10001/onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-kafka-consumer:latest + image: nexus3.onap.org:10001/onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-kafka-consumer:latest ports: - containerPort: 8080 env: diff --git a/tools/performance/cloud/producer-pod.yaml b/tools/performance/cloud/producer-pod.yaml index 5b41c4e0..53821a41 100755 --- a/tools/performance/cloud/producer-pod.yaml +++ b/tools/performance/cloud/producer-pod.yaml @@ -26,7 +26,8 @@ metadata: spec: containers: - name: hv-collector-producer - image: the-a-team-registry-local.esisoj70.emea.nsn-net.net/onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-go-client:latest + imagePullPolicy: IfNotPresent + image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-go-client:latest volumeMounts: - name: ssl-certs mountPath: /ssl @@ -78,4 +79,4 @@ spec: - name: ssl-certs secret: secretName: cert - restartPolicy: Never
\ No newline at end of file + restartPolicy: Never diff --git a/tools/performance/cloud/reboot-test-environment.sh b/tools/performance/cloud/reboot-test-environment.sh index f2dedf50..0fb916f0 100755 --- a/tools/performance/cloud/reboot-test-environment.sh +++ b/tools/performance/cloud/reboot-test-environment.sh @@ -20,7 +20,7 @@ ONAP_NAMESPACE=onap HVVES_POD_NAME=$(kubectl -n ${ONAP_NAMESPACE} get pods --no-headers=true -o custom-columns=:metadata.name | grep hv-ves-collector) HVVES_CONTAINER_NAME=dep-dcae-hv-ves-collector -HV_VES_IMAGE="nexus.onap.dyn.nesc.nokia.net:10001/onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest" +HV_VES_IMAGE="nexus3.onap.org:10001/onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest" KAFKA_ROUTER_0_POD_NAME=$(kubectl -n ${ONAP_NAMESPACE} get pods --no-headers=true -o custom-columns=:metadata.name | grep router-kafka-0) KAFKA_TOPIC_RESET_CMD='kafka-topics --delete --zookeeper message-router-zookeeper:2181 --topic HV_VES_PERF3GPP' HIDE_OUTPUT='grep abc | grep 123' |