diff options
8 files changed, 50 insertions, 30 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt index 6dddd0f8..acbcbddf 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt @@ -29,4 +29,4 @@ interface MetricsKafkaConsumer { private const val defaultUpdateInterval = 500L private val defaultPollTimeoutMs: Duration = Duration.ofMillis(10L) } -}
\ No newline at end of file +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt index 18de6fcc..52bcf1e4 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt @@ -37,21 +37,23 @@ internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<Byte private val topics: Set<String>, private val metrics: Metrics, private val dispatcher: CoroutineDispatcher = Dispatchers.IO) - : MetricsKafkaConsumer{ + : MetricsKafkaConsumer { - override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job = + override suspend fun start(updateInterval: Long, pollTimeout: Duration): Job = GlobalScope.launch(dispatcher) { - kafkaConsumer.assign(topics.map { TopicPartition(it, 0) }) - while (isActive) { - val topicPartitions = kafkaConsumer.assignment() + val topicPartitions = topics.flatMap { + listOf(TopicPartition(it, 0), TopicPartition(it, 1), TopicPartition(it, 2)) + } + kafkaConsumer.assign(topicPartitions) - kafkaConsumer.endOffsets(topicPartitions) - .forEach { (topicPartition, offset) -> - update(topicPartition, offset) - } - kafkaConsumer.commitSync() - delay(updateInterval) - } + while (isActive) { + kafkaConsumer.endOffsets(kafkaConsumer.assignment()) + .forEach { (topicPartition, offset) -> + update(topicPartition, offset) + } + kafkaConsumer.commitSync() + delay(updateInterval) + } } private fun update(topicPartition: TopicPartition, offset: Long) { diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt index 0af2cb22..da6a4676 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt @@ -35,8 +35,10 @@ internal class MicrometerMetrics constructor( private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) ) : Metrics { - private val currentOffsetByTopicPartition = { topic: String -> - registry.gauge(name(CONSUMER, OFFSET, TOPIC), listOf(Tag.of(TOPIC, topic)), AtomicLong(0)) + private val currentOffsetByTopicPartition = { topicPartition: String -> + registry.gauge(name(OFFSET, PARTITION, topicPartition.toLowerCase()), + listOf(Tag.of(PARTITION, topicPartition)), + AtomicLong(0)) }.memoize<String, AtomicLong>() private val travelTime = Timer.builder(name(TRAVEL,TIME)) @@ -58,9 +60,8 @@ internal class MicrometerMetrics constructor( companion object { val INSTANCE by lazy { MicrometerMetrics() } - private const val CONSUMER = "consumer" private const val OFFSET = "offset" - private const val TOPIC = "topic" + private const val PARTITION = "partition" private const val TRAVEL = "travel" private const val TIME = "time" private const val PREFIX = "hv-kafka-consumer" diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt index 3bb9ebae..26616f1c 100644 --- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt @@ -72,7 +72,7 @@ object OffsetKafkaConsumerTest : Spek({ } } - given("two topics with partition") { + given("two topics with one partition each") { val topics = setOf(topicName1, topicName2) val kafkaSource = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher) @@ -147,7 +147,7 @@ object OffsetKafkaConsumerTest : Spek({ val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, emptyTopics, mockedMetrics, testDispatcher) on("call of function start") { - val emptyTopicPartitions = setOf(null) + val emptyTopicPartitions = emptySet<TopicPartition>() whenever(mockedKafkaConsumer.assignment()).thenReturn(emptyTopicPartitions) whenever(mockedKafkaConsumer.endOffsets(emptyTopicPartitions)) .thenReturn(emptyMap()) diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt index 93a39ae8..cfe67df2 100644 --- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt @@ -69,17 +69,34 @@ object MicrometerMetricsTest : Spek({ } describe("Gauges") { - val gaugeName = "$PREFIX.consumer.offset.topic" + val gaugeName1 = "$PREFIX.offset.partition.sample_topic-0" + val gaugeName2 = "$PREFIX.offset.partition.sample_topic-1" + val offset1 = 966L + val offset2 = 967L + val topicPartition1 = TopicPartition("sample_topic", 0) + val topicPartition2 = TopicPartition("sample_topic", 1) on("notifyOffsetChanged") { - val offset = 966L - val topicPartition = TopicPartition("sample_topic", 1) + it("should update $gaugeName1") { + cut.notifyOffsetChanged(offset1, topicPartition1) - it("should update $gaugeName") { - cut.notifyOffsetChanged(offset, topicPartition) + registry.verifyGauge(gaugeName1) { + assertThat(it.value()).isCloseTo(offset1.toDouble(), doublePrecision) + } + } + } + + on("two partition update") { + it("should update $gaugeName1") { + cut.notifyOffsetChanged(offset1, topicPartition1) + cut.notifyOffsetChanged(offset2, topicPartition2) + + registry.verifyGauge(gaugeName1) { + assertThat(it.value()).isCloseTo(offset1.toDouble(), doublePrecision) + } - registry.verifyGauge(gaugeName) { - assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision) + registry.verifyGauge(gaugeName2) { + assertThat(it.value()).isCloseTo(offset2.toDouble(), doublePrecision) } } } diff --git a/tools/performance/cloud/cloud-based-performance-test.sh b/tools/performance/cloud/cloud-based-performance-test.sh index cc08f4b0..300bf203 100755 --- a/tools/performance/cloud/cloud-based-performance-test.sh +++ b/tools/performance/cloud/cloud-based-performance-test.sh @@ -77,7 +77,7 @@ function usage() { echo "./cloud-based-performance-test.sh setup" echo "./cloud-based-performance-test.sh start" echo "./cloud-based-performance-test.sh start --containers 10" - echo "./cloud-based-performance-test.sh start --containers 10" + echo "./cloud-based-performance-test.sh start --properties-file ~/other_test.properties" echo "./cloud-based-performance-test.sh clean" exit 1 } diff --git a/tools/performance/cloud/test.properties b/tools/performance/cloud/test.properties index bd746a15..04169e3a 100644 --- a/tools/performance/cloud/test.properties +++ b/tools/performance/cloud/test.properties @@ -14,6 +14,6 @@ producer.message.interval=0 # CONSUMER CONFIGURATION # Addresses of Kafka services to consume from -consumer.kafka.bootstrapServers=message-router-kafka-0:9093,message-router-kafka-1:9093,message-router-kafka-2:9093 +consumer.kafka.bootstrapServers=message-router-kafka:9092 # Kafka topics to subscribe to consumer.kafka.topics=HV_VES_PERF3GPP diff --git a/tools/performance/local/grafana/dashboards/performance_tests.json b/tools/performance/local/grafana/dashboards/performance_tests.json index 3784a961..654fdc36 100644 --- a/tools/performance/local/grafana/dashboards/performance_tests.json +++ b/tools/performance/local/grafana/dashboards/performance_tests.json @@ -191,7 +191,7 @@ "tableColumn": "", "targets": [ { - "expr": "hv_kafka_consumer_consumer_offset_topic", + "expr": "hv_kafka_consumer_offset_partition_hv_ves_perf3gpp_0", "format": "time_series", "intervalFactor": 1, "refId": "A" @@ -200,7 +200,7 @@ "thresholds": "", "timeFrom": null, "timeShift": null, - "title": "Current offset", + "title": "Current offset on partition 0", "type": "singlestat", "valueFontSize": "80%", "valueMaps": [ |