aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt')
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt24
1 files changed, 21 insertions, 3 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
index 4d65e916..be7b5cca 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
@@ -24,17 +24,35 @@ import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.commandline.CommandLineOption
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.*
+import org.onap.dcae.collectors.veshv.commandline.hasOption
import org.onap.dcae.collectors.veshv.commandline.intValue
+import org.onap.dcae.collectors.veshv.commandline.stringValue
import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
import java.net.InetSocketAddress
internal class ArgKafkaConsumerConfiguration : ArgBasedConfiguration<KafkaConsumerConfiguration>(DefaultParser()) {
- override val cmdLineOptionsList: List<CommandLineOption> = listOf(LISTEN_PORT)
+ override val cmdLineOptionsList: List<CommandLineOption> = listOf(
+ LISTEN_PORT,
+ KAFKA_TOPICS,
+ KAFKA_SERVERS,
+ DISABLE_PROCESSING
+ )
override fun getConfiguration(cmdLine: CommandLine): Option<KafkaConsumerConfiguration> =
binding {
val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
- KafkaConsumerConfiguration(InetSocketAddress(listenPort))
+ val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS)
+ .map { it.split(',').toSet() }
+ .bind()
+ val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
+ val disableProcessing = cmdLine.hasOption(DISABLE_PROCESSING)
+
+ KafkaConsumerConfiguration(
+ InetSocketAddress(listenPort),
+ kafkaTopics,
+ kafkaBootstrapServers,
+ disableProcessing
+ )
}
}