summaryrefslogtreecommitdiffstats
path: root/sources
diff options
context:
space:
mode:
Diffstat (limited to 'sources')
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt32
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt14
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt9
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt19
4 files changed, 65 insertions, 9 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt
new file mode 100644
index 00000000..6dddd0f8
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt
@@ -0,0 +1,32 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.api
+
+import kotlinx.coroutines.Job
+import java.time.Duration
+
+interface MetricsKafkaConsumer {
+ suspend fun start(updateInterval: Long = defaultUpdateInterval, pollTimeout: Duration = defaultPollTimeoutMs): Job
+
+ companion object{
+ private const val defaultUpdateInterval = 500L
+ private val defaultPollTimeoutMs: Duration = Duration.ofMillis(10L)
+ }
+} \ No newline at end of file
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
index d105c4a7..18de6fcc 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
@@ -28,19 +28,23 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
+import org.onap.dcae.collectors.veshv.kafkaconsumer.api.MetricsKafkaConsumer
import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.time.Duration
internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>,
private val topics: Set<String>,
private val metrics: Metrics,
- private val dispatcher: CoroutineDispatcher = Dispatchers.IO) {
+ private val dispatcher: CoroutineDispatcher = Dispatchers.IO)
+ : MetricsKafkaConsumer{
- suspend fun start(updateInterval: Long = 500L): Job =
+ override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job =
GlobalScope.launch(dispatcher) {
- kafkaConsumer.subscribe(topics)
- val topicPartitions = kafkaConsumer.assignment()
+ kafkaConsumer.assign(topics.map { TopicPartition(it, 0) })
while (isActive) {
+ val topicPartitions = kafkaConsumer.assignment()
+
kafkaConsumer.endOffsets(topicPartitions)
.forEach { (topicPartition, offset) ->
update(topicPartition, offset)
@@ -58,6 +62,6 @@ internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<Byte
}
companion object {
- val logger = Logger(OffsetKafkaConsumer::class)
+ private val logger = Logger(OffsetKafkaConsumer::class)
}
}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt
index f47a66d0..7574d61d 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt
@@ -29,21 +29,22 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.dcae.collectors.veshv.kafkaconsumer.api.MetricsKafkaConsumer
import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventOuterClass
import java.time.Duration
internal class ProcessingKafkaConsumer (private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>,
private val topics: Set<String>,
private val metrics: Metrics,
- private val dispatcher: CoroutineDispatcher = Dispatchers.IO){
+ private val dispatcher: CoroutineDispatcher = Dispatchers.IO)
+ : MetricsKafkaConsumer{
- suspend fun start(updateInterval: Long = 500L, timeout: Duration): Job =
+ override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job =
GlobalScope.launch(dispatcher){
kafkaConsumer.subscribe(topics)
while (isActive){
- kafkaConsumer.poll(timeout).forEach(::update)
+ kafkaConsumer.poll(pollTimeout).forEach(::update)
kafkaConsumer.commitSync()
delay(updateInterval)
}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
index 7e77bae9..9bf4310b 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
@@ -19,9 +19,14 @@
*/
package org.onap.dcae.collectors.veshv.kafkaconsumer
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.consumer.KafkaConsumer
import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
+import org.onap.dcae.collectors.veshv.kafka.api.KafkaPropertiesFactory
import org.onap.dcae.collectors.veshv.kafkaconsumer.config.ArgKafkaConsumerConfiguration
import org.onap.dcae.collectors.veshv.kafkaconsumer.config.KafkaConsumerConfiguration
+import org.onap.dcae.collectors.veshv.kafkaconsumer.impl.OffsetKafkaConsumer
+import org.onap.dcae.collectors.veshv.kafkaconsumer.impl.ProcessingKafkaConsumer
import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.http.PrometheusApiServer
import org.onap.dcae.collectors.veshv.utils.process.ExitCode
@@ -37,6 +42,20 @@ fun main(args: Array<String>): Unit =
private fun startApp(config: KafkaConsumerConfiguration): ExitSuccess {
+ val kafkaConsumer = KafkaConsumer<ByteArray, ByteArray>(KafkaPropertiesFactory.create(
+ config.kafkaBootstrapServers)
+ )
+
+ runBlocking {
+ if (config.disableProcessing) {
+ OffsetKafkaConsumer(kafkaConsumer, config.kafkaTopics, MicrometerMetrics.INSTANCE)
+ .start()
+ } else {
+ ProcessingKafkaConsumer(kafkaConsumer, config.kafkaTopics, MicrometerMetrics.INSTANCE)
+ .start()
+ }
+ }
+
PrometheusApiServer(config.apiAddress, MicrometerMetrics.INSTANCE)
.start().block()!!.await().block() // TODO refactor netty server logic