diff options
author | Izabela Zawadzka <izabela.zawadzka@nokia.com> | 2019-07-04 08:59:04 +0200 |
---|---|---|
committer | Izabela Zawadzka <izabela.zawadzka@nokia.com> | 2019-07-04 15:42:08 +0200 |
commit | 583bb983ae956f366c034fdad20df79ccef196fd (patch) | |
tree | e82e39ea78955e726eb89f32ef8729a685ea2daf /sources/hv-collector-kafka-consumer/src/main | |
parent | 3a4020ae3e810dad7979597feadc13d8b56b3bb1 (diff) |
Measure message travel time
Issue-ID: DCAEGEN2-1654
Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com>
Change-Id: Ifd6597209c5be51d5b4ff5faf7b3b1b1a2871403
Diffstat (limited to 'sources/hv-collector-kafka-consumer/src/main')
-rw-r--r-- | sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt | 63 | ||||
-rw-r--r-- | sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt (renamed from sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt) | 32 | ||||
-rw-r--r-- | sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt | 39 |
3 files changed, 83 insertions, 51 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt new file mode 100644 index 00000000..d105c4a7 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt @@ -0,0 +1,63 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.impl + +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.TopicPartition +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics +import org.onap.dcae.collectors.veshv.utils.logging.Logger + +internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>, + private val topics: Set<String>, + private val metrics: Metrics, + private val dispatcher: CoroutineDispatcher = Dispatchers.IO) { + + suspend fun start(updateInterval: Long = 500L): Job = + GlobalScope.launch(dispatcher) { + kafkaConsumer.subscribe(topics) + val topicPartitions = kafkaConsumer.assignment() + while (isActive) { + kafkaConsumer.endOffsets(topicPartitions) + .forEach { (topicPartition, offset) -> + update(topicPartition, offset) + } + kafkaConsumer.commitSync() + delay(updateInterval) + } + } + + private fun update(topicPartition: TopicPartition, offset: Long) { + logger.trace { + "Current consumer offset $offset for topic partition $topicPartition" + } + metrics.notifyOffsetChanged(offset, topicPartition) + } + + companion object { + val logger = Logger(OffsetKafkaConsumer::class) + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt index dd24345d..f47a66d0 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.kafkaconsumer.impl + import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope @@ -26,23 +27,30 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer -import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.ves.VesEventOuterClass +import java.time.Duration + +internal class ProcessingKafkaConsumer (private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>, + private val topics: Set<String>, + private val metrics: Metrics, + private val dispatcher: CoroutineDispatcher = Dispatchers.IO){ -internal class KafkaSource(private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>, - private val topics: Set<String>, - private val dispatcher: CoroutineDispatcher = Dispatchers.IO) { - suspend fun start(offsetConsumer: OffsetConsumer, updateInterval: Long = 500L): Job = - GlobalScope.launch(dispatcher) { + suspend fun start(updateInterval: Long = 500L, timeout: Duration): Job = + GlobalScope.launch(dispatcher){ kafkaConsumer.subscribe(topics) - val topicPartitions = kafkaConsumer.assignment() - while (isActive) { - kafkaConsumer.endOffsets(topicPartitions) - .forEach { (topicPartition, offset) -> - offsetConsumer.update(topicPartition, offset) - } + while (isActive){ + kafkaConsumer.poll(timeout).forEach(::update) kafkaConsumer.commitSync() delay(updateInterval) } } + + private fun update(record: ConsumerRecord<ByteArray, ByteArray>) { + val vesEvent = VesEventOuterClass.VesEvent.parseFrom(record.value()) + metrics.notifyMessageTravelTime(vesEvent.commonEventHeader.lastEpochMicrosec) + } } diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt deleted file mode 100644 index 57a5f33f..00000000 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.kafkaconsumer.state - -import org.apache.kafka.common.TopicPartition -import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics -import org.onap.dcae.collectors.veshv.utils.logging.Logger - - -internal class OffsetConsumer(private val metrics: Metrics) { - - fun update(topicPartition: TopicPartition, offset: Long) { - logger.trace { - "Current consumer offset $offset for topic partition $topicPartition" - } - metrics.notifyOffsetChanged(offset, topicPartition) - } - - companion object { - val logger = Logger(OffsetConsumer::class) - } -} |