diff options
Diffstat (limited to 'sources/hv-collector-kafka-consumer')
4 files changed, 65 insertions, 9 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt new file mode 100644 index 00000000..6dddd0f8 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt @@ -0,0 +1,32 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.api + +import kotlinx.coroutines.Job +import java.time.Duration + +interface MetricsKafkaConsumer { + suspend fun start(updateInterval: Long = defaultUpdateInterval, pollTimeout: Duration = defaultPollTimeoutMs): Job + + companion object{ + private const val defaultUpdateInterval = 500L + private val defaultPollTimeoutMs: Duration = Duration.ofMillis(10L) + } +}
\ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt index d105c4a7..18de6fcc 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt @@ -28,19 +28,23 @@ import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.TopicPartition +import org.onap.dcae.collectors.veshv.kafkaconsumer.api.MetricsKafkaConsumer import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.time.Duration internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>, private val topics: Set<String>, private val metrics: Metrics, - private val dispatcher: CoroutineDispatcher = Dispatchers.IO) { + private val dispatcher: CoroutineDispatcher = Dispatchers.IO) + : MetricsKafkaConsumer{ - suspend fun start(updateInterval: Long = 500L): Job = + override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job = GlobalScope.launch(dispatcher) { - kafkaConsumer.subscribe(topics) - val topicPartitions = kafkaConsumer.assignment() + kafkaConsumer.assign(topics.map { TopicPartition(it, 0) }) while (isActive) { + val topicPartitions = kafkaConsumer.assignment() + kafkaConsumer.endOffsets(topicPartitions) .forEach { (topicPartition, offset) -> update(topicPartition, offset) @@ -58,6 +62,6 @@ internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<Byte } companion object { - val logger = Logger(OffsetKafkaConsumer::class) + private val logger = Logger(OffsetKafkaConsumer::class) } } diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt index f47a66d0..7574d61d 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt @@ -29,21 +29,22 @@ import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer +import org.onap.dcae.collectors.veshv.kafkaconsumer.api.MetricsKafkaConsumer import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics -import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventOuterClass import java.time.Duration internal class ProcessingKafkaConsumer (private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>, private val topics: Set<String>, private val metrics: Metrics, - private val dispatcher: CoroutineDispatcher = Dispatchers.IO){ + private val dispatcher: CoroutineDispatcher = Dispatchers.IO) + : MetricsKafkaConsumer{ - suspend fun start(updateInterval: Long = 500L, timeout: Duration): Job = + override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job = GlobalScope.launch(dispatcher){ kafkaConsumer.subscribe(topics) while (isActive){ - kafkaConsumer.poll(timeout).forEach(::update) + kafkaConsumer.poll(pollTimeout).forEach(::update) kafkaConsumer.commitSync() delay(updateInterval) } diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt index 7e77bae9..9bf4310b 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt @@ -19,9 +19,14 @@ */ package org.onap.dcae.collectors.veshv.kafkaconsumer +import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.consumer.KafkaConsumer import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried +import org.onap.dcae.collectors.veshv.kafka.api.KafkaPropertiesFactory import org.onap.dcae.collectors.veshv.kafkaconsumer.config.ArgKafkaConsumerConfiguration import org.onap.dcae.collectors.veshv.kafkaconsumer.config.KafkaConsumerConfiguration +import org.onap.dcae.collectors.veshv.kafkaconsumer.impl.OffsetKafkaConsumer +import org.onap.dcae.collectors.veshv.kafkaconsumer.impl.ProcessingKafkaConsumer import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.http.PrometheusApiServer import org.onap.dcae.collectors.veshv.utils.process.ExitCode @@ -37,6 +42,20 @@ fun main(args: Array<String>): Unit = private fun startApp(config: KafkaConsumerConfiguration): ExitSuccess { + val kafkaConsumer = KafkaConsumer<ByteArray, ByteArray>(KafkaPropertiesFactory.create( + config.kafkaBootstrapServers) + ) + + runBlocking { + if (config.disableProcessing) { + OffsetKafkaConsumer(kafkaConsumer, config.kafkaTopics, MicrometerMetrics.INSTANCE) + .start() + } else { + ProcessingKafkaConsumer(kafkaConsumer, config.kafkaTopics, MicrometerMetrics.INSTANCE) + .start() + } + } + PrometheusApiServer(config.apiAddress, MicrometerMetrics.INSTANCE) .start().block()!!.await().block() // TODO refactor netty server logic |