From 583bb983ae956f366c034fdad20df79ccef196fd Mon Sep 17 00:00:00 2001 From: Izabela Zawadzka Date: Thu, 4 Jul 2019 08:59:04 +0200 Subject: Measure message travel time Issue-ID: DCAEGEN2-1654 Signed-off-by: Izabela Zawadzka Change-Id: Ifd6597209c5be51d5b4ff5faf7b3b1b1a2871403 --- .../veshv/kafkaconsumer/impl/KafkaSource.kt | 48 ----------------- .../kafkaconsumer/impl/OffsetKafkaConsumer.kt | 63 ++++++++++++++++++++++ .../kafkaconsumer/impl/ProcessingKafkaConsumer.kt | 56 +++++++++++++++++++ .../veshv/kafkaconsumer/state/OffsetConsumer.kt | 39 -------------- 4 files changed, 119 insertions(+), 87 deletions(-) delete mode 100644 sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt create mode 100644 sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt create mode 100644 sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt delete mode 100644 sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt (limited to 'sources/hv-collector-kafka-consumer/src/main/kotlin') diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt deleted file mode 100644 index dd24345d..00000000 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.kafkaconsumer.impl - -import kotlinx.coroutines.CoroutineDispatcher -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.delay -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer - -internal class KafkaSource(private val kafkaConsumer: KafkaConsumer, - private val topics: Set, - private val dispatcher: CoroutineDispatcher = Dispatchers.IO) { - suspend fun start(offsetConsumer: OffsetConsumer, updateInterval: Long = 500L): Job = - GlobalScope.launch(dispatcher) { - kafkaConsumer.subscribe(topics) - val topicPartitions = kafkaConsumer.assignment() - while (isActive) { - kafkaConsumer.endOffsets(topicPartitions) - .forEach { (topicPartition, offset) -> - offsetConsumer.update(topicPartition, offset) - } - kafkaConsumer.commitSync() - delay(updateInterval) - } - } -} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt new file mode 100644 index 00000000..d105c4a7 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt @@ -0,0 +1,63 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.impl + +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.TopicPartition +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics +import org.onap.dcae.collectors.veshv.utils.logging.Logger + +internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer, + private val topics: Set, + private val metrics: Metrics, + private val dispatcher: CoroutineDispatcher = Dispatchers.IO) { + + suspend fun start(updateInterval: Long = 500L): Job = + GlobalScope.launch(dispatcher) { + kafkaConsumer.subscribe(topics) + val topicPartitions = kafkaConsumer.assignment() + while (isActive) { + kafkaConsumer.endOffsets(topicPartitions) + .forEach { (topicPartition, offset) -> + update(topicPartition, offset) + } + kafkaConsumer.commitSync() + delay(updateInterval) + } + } + + private fun update(topicPartition: TopicPartition, offset: Long) { + logger.trace { + "Current consumer offset $offset for topic partition $topicPartition" + } + metrics.notifyOffsetChanged(offset, topicPartition) + } + + companion object { + val logger = Logger(OffsetKafkaConsumer::class) + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt new file mode 100644 index 00000000..f47a66d0 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.impl + + +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.ves.VesEventOuterClass +import java.time.Duration + +internal class ProcessingKafkaConsumer (private val kafkaConsumer: KafkaConsumer, + private val topics: Set, + private val metrics: Metrics, + private val dispatcher: CoroutineDispatcher = Dispatchers.IO){ + + suspend fun start(updateInterval: Long = 500L, timeout: Duration): Job = + GlobalScope.launch(dispatcher){ + kafkaConsumer.subscribe(topics) + while (isActive){ + kafkaConsumer.poll(timeout).forEach(::update) + kafkaConsumer.commitSync() + delay(updateInterval) + } + } + + private fun update(record: ConsumerRecord) { + val vesEvent = VesEventOuterClass.VesEvent.parseFrom(record.value()) + metrics.notifyMessageTravelTime(vesEvent.commonEventHeader.lastEpochMicrosec) + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt deleted file mode 100644 index 57a5f33f..00000000 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.kafkaconsumer.state - -import org.apache.kafka.common.TopicPartition -import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics -import org.onap.dcae.collectors.veshv.utils.logging.Logger - - -internal class OffsetConsumer(private val metrics: Metrics) { - - fun update(topicPartition: TopicPartition, offset: Long) { - logger.trace { - "Current consumer offset $offset for topic partition $topicPartition" - } - metrics.notifyOffsetChanged(offset, topicPartition) - } - - companion object { - val logger = Logger(OffsetConsumer::class) - } -} -- cgit 1.2.3-korg