summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkjaniak <kornel.janiak@nokia.com>2019-09-24 09:46:24 +0200
committerkjaniak <kornel.janiak@nokia.com>2019-09-26 14:34:14 +0200
commit728ca6351b290a15cb7fec07f56e0752f7addd70 (patch)
tree62314472bf5e5180f671e8f8a45239d4bf46c674
parent6d815b760e927bace06451cf72972a505a080b6c (diff)
Add partition offset metric to each topic partition
Before this commit offset consumer was able to fetch offset just from one partition. This commit solve this. Change-Id: I2c2c300219e43ab422b237094ad775ca8795169e Issue-ID: DCAEGEN2-1783 Signed-off-by: kjaniak <kornel.janiak@nokia.com>
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt2
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt26
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt9
-rw-r--r--sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt4
-rw-r--r--sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt31
-rwxr-xr-xtools/performance/cloud/cloud-based-performance-test.sh2
-rw-r--r--tools/performance/cloud/test.properties2
-rw-r--r--tools/performance/local/grafana/dashboards/performance_tests.json4
8 files changed, 50 insertions, 30 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt
index 6dddd0f8..acbcbddf 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt
@@ -29,4 +29,4 @@ interface MetricsKafkaConsumer {
private const val defaultUpdateInterval = 500L
private val defaultPollTimeoutMs: Duration = Duration.ofMillis(10L)
}
-} \ No newline at end of file
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
index 18de6fcc..52bcf1e4 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
@@ -37,21 +37,23 @@ internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<Byte
private val topics: Set<String>,
private val metrics: Metrics,
private val dispatcher: CoroutineDispatcher = Dispatchers.IO)
- : MetricsKafkaConsumer{
+ : MetricsKafkaConsumer {
- override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job =
+ override suspend fun start(updateInterval: Long, pollTimeout: Duration): Job =
GlobalScope.launch(dispatcher) {
- kafkaConsumer.assign(topics.map { TopicPartition(it, 0) })
- while (isActive) {
- val topicPartitions = kafkaConsumer.assignment()
+ val topicPartitions = topics.flatMap {
+ listOf(TopicPartition(it, 0), TopicPartition(it, 1), TopicPartition(it, 2))
+ }
+ kafkaConsumer.assign(topicPartitions)
- kafkaConsumer.endOffsets(topicPartitions)
- .forEach { (topicPartition, offset) ->
- update(topicPartition, offset)
- }
- kafkaConsumer.commitSync()
- delay(updateInterval)
- }
+ while (isActive) {
+ kafkaConsumer.endOffsets(kafkaConsumer.assignment())
+ .forEach { (topicPartition, offset) ->
+ update(topicPartition, offset)
+ }
+ kafkaConsumer.commitSync()
+ delay(updateInterval)
+ }
}
private fun update(topicPartition: TopicPartition, offset: Long) {
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
index 0af2cb22..da6a4676 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
@@ -35,8 +35,10 @@ internal class MicrometerMetrics constructor(
private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
) : Metrics {
- private val currentOffsetByTopicPartition = { topic: String ->
- registry.gauge(name(CONSUMER, OFFSET, TOPIC), listOf(Tag.of(TOPIC, topic)), AtomicLong(0))
+ private val currentOffsetByTopicPartition = { topicPartition: String ->
+ registry.gauge(name(OFFSET, PARTITION, topicPartition.toLowerCase()),
+ listOf(Tag.of(PARTITION, topicPartition)),
+ AtomicLong(0))
}.memoize<String, AtomicLong>()
private val travelTime = Timer.builder(name(TRAVEL,TIME))
@@ -58,9 +60,8 @@ internal class MicrometerMetrics constructor(
companion object {
val INSTANCE by lazy { MicrometerMetrics() }
- private const val CONSUMER = "consumer"
private const val OFFSET = "offset"
- private const val TOPIC = "topic"
+ private const val PARTITION = "partition"
private const val TRAVEL = "travel"
private const val TIME = "time"
private const val PREFIX = "hv-kafka-consumer"
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt
index 3bb9ebae..26616f1c 100644
--- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt
@@ -72,7 +72,7 @@ object OffsetKafkaConsumerTest : Spek({
}
}
- given("two topics with partition") {
+ given("two topics with one partition each") {
val topics = setOf(topicName1, topicName2)
val kafkaSource = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher)
@@ -147,7 +147,7 @@ object OffsetKafkaConsumerTest : Spek({
val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, emptyTopics, mockedMetrics, testDispatcher)
on("call of function start") {
- val emptyTopicPartitions = setOf(null)
+ val emptyTopicPartitions = emptySet<TopicPartition>()
whenever(mockedKafkaConsumer.assignment()).thenReturn(emptyTopicPartitions)
whenever(mockedKafkaConsumer.endOffsets(emptyTopicPartitions))
.thenReturn(emptyMap())
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
index 93a39ae8..cfe67df2 100644
--- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
@@ -69,17 +69,34 @@ object MicrometerMetricsTest : Spek({
}
describe("Gauges") {
- val gaugeName = "$PREFIX.consumer.offset.topic"
+ val gaugeName1 = "$PREFIX.offset.partition.sample_topic-0"
+ val gaugeName2 = "$PREFIX.offset.partition.sample_topic-1"
+ val offset1 = 966L
+ val offset2 = 967L
+ val topicPartition1 = TopicPartition("sample_topic", 0)
+ val topicPartition2 = TopicPartition("sample_topic", 1)
on("notifyOffsetChanged") {
- val offset = 966L
- val topicPartition = TopicPartition("sample_topic", 1)
+ it("should update $gaugeName1") {
+ cut.notifyOffsetChanged(offset1, topicPartition1)
- it("should update $gaugeName") {
- cut.notifyOffsetChanged(offset, topicPartition)
+ registry.verifyGauge(gaugeName1) {
+ assertThat(it.value()).isCloseTo(offset1.toDouble(), doublePrecision)
+ }
+ }
+ }
+
+ on("two partition update") {
+ it("should update $gaugeName1") {
+ cut.notifyOffsetChanged(offset1, topicPartition1)
+ cut.notifyOffsetChanged(offset2, topicPartition2)
+
+ registry.verifyGauge(gaugeName1) {
+ assertThat(it.value()).isCloseTo(offset1.toDouble(), doublePrecision)
+ }
- registry.verifyGauge(gaugeName) {
- assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision)
+ registry.verifyGauge(gaugeName2) {
+ assertThat(it.value()).isCloseTo(offset2.toDouble(), doublePrecision)
}
}
}
diff --git a/tools/performance/cloud/cloud-based-performance-test.sh b/tools/performance/cloud/cloud-based-performance-test.sh
index cc08f4b0..300bf203 100755
--- a/tools/performance/cloud/cloud-based-performance-test.sh
+++ b/tools/performance/cloud/cloud-based-performance-test.sh
@@ -77,7 +77,7 @@ function usage() {
echo "./cloud-based-performance-test.sh setup"
echo "./cloud-based-performance-test.sh start"
echo "./cloud-based-performance-test.sh start --containers 10"
- echo "./cloud-based-performance-test.sh start --containers 10"
+ echo "./cloud-based-performance-test.sh start --properties-file ~/other_test.properties"
echo "./cloud-based-performance-test.sh clean"
exit 1
}
diff --git a/tools/performance/cloud/test.properties b/tools/performance/cloud/test.properties
index bd746a15..04169e3a 100644
--- a/tools/performance/cloud/test.properties
+++ b/tools/performance/cloud/test.properties
@@ -14,6 +14,6 @@ producer.message.interval=0
# CONSUMER CONFIGURATION
# Addresses of Kafka services to consume from
-consumer.kafka.bootstrapServers=message-router-kafka-0:9093,message-router-kafka-1:9093,message-router-kafka-2:9093
+consumer.kafka.bootstrapServers=message-router-kafka:9092
# Kafka topics to subscribe to
consumer.kafka.topics=HV_VES_PERF3GPP
diff --git a/tools/performance/local/grafana/dashboards/performance_tests.json b/tools/performance/local/grafana/dashboards/performance_tests.json
index 3784a961..654fdc36 100644
--- a/tools/performance/local/grafana/dashboards/performance_tests.json
+++ b/tools/performance/local/grafana/dashboards/performance_tests.json
@@ -191,7 +191,7 @@
"tableColumn": "",
"targets": [
{
- "expr": "hv_kafka_consumer_consumer_offset_topic",
+ "expr": "hv_kafka_consumer_offset_partition_hv_ves_perf3gpp_0",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
@@ -200,7 +200,7 @@
"thresholds": "",
"timeFrom": null,
"timeShift": null,
- "title": "Current offset",
+ "title": "Current offset on partition 0",
"type": "singlestat",
"valueFontSize": "80%",
"valueMaps": [