aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-kafka-consumer
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-kafka-consumer')
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt24
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt7
2 files changed, 27 insertions, 4 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
index 4d65e916..be7b5cca 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
@@ -24,17 +24,35 @@ import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.commandline.CommandLineOption
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.*
+import org.onap.dcae.collectors.veshv.commandline.hasOption
import org.onap.dcae.collectors.veshv.commandline.intValue
+import org.onap.dcae.collectors.veshv.commandline.stringValue
import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
import java.net.InetSocketAddress
internal class ArgKafkaConsumerConfiguration : ArgBasedConfiguration<KafkaConsumerConfiguration>(DefaultParser()) {
- override val cmdLineOptionsList: List<CommandLineOption> = listOf(LISTEN_PORT)
+ override val cmdLineOptionsList: List<CommandLineOption> = listOf(
+ LISTEN_PORT,
+ KAFKA_TOPICS,
+ KAFKA_SERVERS,
+ DISABLE_PROCESSING
+ )
override fun getConfiguration(cmdLine: CommandLine): Option<KafkaConsumerConfiguration> =
binding {
val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
- KafkaConsumerConfiguration(InetSocketAddress(listenPort))
+ val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS)
+ .map { it.split(',').toSet() }
+ .bind()
+ val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
+ val disableProcessing = cmdLine.hasOption(DISABLE_PROCESSING)
+
+ KafkaConsumerConfiguration(
+ InetSocketAddress(listenPort),
+ kafkaTopics,
+ kafkaBootstrapServers,
+ disableProcessing
+ )
}
}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt
index ef06a0d1..cdd4c30a 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt
@@ -21,4 +21,9 @@ package org.onap.dcae.collectors.veshv.kafkaconsumer.config
import java.net.InetSocketAddress
-internal data class KafkaConsumerConfiguration(val apiAddress: InetSocketAddress)
+internal data class KafkaConsumerConfiguration(
+ val apiAddress: InetSocketAddress,
+ val kafkaTopics: Set<String>,
+ val kafkaBootstrapServers: String,
+ val disableProcessing: Boolean
+)