diff options
author | Filip Krzywka <filip.krzywka@nokia.com> | 2019-06-25 11:32:36 +0200 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2019-06-26 08:57:37 +0200 |
commit | 006be7f70368ce91986037ae7a032ba00836c6c2 (patch) | |
tree | f24bd7eeca2e8245dd30fda05c433debb67068bb /sources/hv-collector-kafka-consumer | |
parent | 7808010c1a18531ee9b618f934d31816193cac38 (diff) |
Add environment configuration to kafka consumer
- HV-VES-specific environment prefix moved inside HvVes modules to
allow simpler no-prefix API for other modules
- created OptionDSL for brevity
Change-Id: I2fabbda1280cc0f913f8a0a04b4a055f39ed1fae
Issue-ID: DCAEGEN2-1626
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-kafka-consumer')
2 files changed, 27 insertions, 4 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt index 4d65e916..be7b5cca 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt @@ -24,17 +24,35 @@ import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration import org.onap.dcae.collectors.veshv.commandline.CommandLineOption -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.LISTEN_PORT +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.* +import org.onap.dcae.collectors.veshv.commandline.hasOption import org.onap.dcae.collectors.veshv.commandline.intValue +import org.onap.dcae.collectors.veshv.commandline.stringValue import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding import java.net.InetSocketAddress internal class ArgKafkaConsumerConfiguration : ArgBasedConfiguration<KafkaConsumerConfiguration>(DefaultParser()) { - override val cmdLineOptionsList: List<CommandLineOption> = listOf(LISTEN_PORT) + override val cmdLineOptionsList: List<CommandLineOption> = listOf( + LISTEN_PORT, + KAFKA_TOPICS, + KAFKA_SERVERS, + DISABLE_PROCESSING + ) override fun getConfiguration(cmdLine: CommandLine): Option<KafkaConsumerConfiguration> = binding { val listenPort = cmdLine.intValue(LISTEN_PORT).bind() - KafkaConsumerConfiguration(InetSocketAddress(listenPort)) + val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS) + .map { it.split(',').toSet() } + .bind() + val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS).bind() + val disableProcessing = cmdLine.hasOption(DISABLE_PROCESSING) + + KafkaConsumerConfiguration( + InetSocketAddress(listenPort), + kafkaTopics, + kafkaBootstrapServers, + disableProcessing + ) } } diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt index ef06a0d1..cdd4c30a 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt @@ -21,4 +21,9 @@ package org.onap.dcae.collectors.veshv.kafkaconsumer.config import java.net.InetSocketAddress -internal data class KafkaConsumerConfiguration(val apiAddress: InetSocketAddress) +internal data class KafkaConsumerConfiguration( + val apiAddress: InetSocketAddress, + val kafkaTopics: Set<String>, + val kafkaBootstrapServers: String, + val disableProcessing: Boolean +) |