aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-kafka-consumer/src
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-06-25 11:32:36 +0200
committerFilip Krzywka <filip.krzywka@nokia.com>2019-06-26 08:57:37 +0200
commit006be7f70368ce91986037ae7a032ba00836c6c2 (patch)
treef24bd7eeca2e8245dd30fda05c433debb67068bb /sources/hv-collector-kafka-consumer/src
parent7808010c1a18531ee9b618f934d31816193cac38 (diff)
Add environment configuration to kafka consumer
- HV-VES-specific environment prefix moved inside HvVes modules to allow simpler no-prefix API for other modules - created OptionDSL for brevity Change-Id: I2fabbda1280cc0f913f8a0a04b4a055f39ed1fae Issue-ID: DCAEGEN2-1626 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-kafka-consumer/src')
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt24
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt7
2 files changed, 27 insertions, 4 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
index 4d65e916..be7b5cca 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
@@ -24,17 +24,35 @@ import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.commandline.CommandLineOption
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.*
+import org.onap.dcae.collectors.veshv.commandline.hasOption
import org.onap.dcae.collectors.veshv.commandline.intValue
+import org.onap.dcae.collectors.veshv.commandline.stringValue
import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
import java.net.InetSocketAddress
internal class ArgKafkaConsumerConfiguration : ArgBasedConfiguration<KafkaConsumerConfiguration>(DefaultParser()) {
- override val cmdLineOptionsList: List<CommandLineOption> = listOf(LISTEN_PORT)
+ override val cmdLineOptionsList: List<CommandLineOption> = listOf(
+ LISTEN_PORT,
+ KAFKA_TOPICS,
+ KAFKA_SERVERS,
+ DISABLE_PROCESSING
+ )
override fun getConfiguration(cmdLine: CommandLine): Option<KafkaConsumerConfiguration> =
binding {
val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
- KafkaConsumerConfiguration(InetSocketAddress(listenPort))
+ val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS)
+ .map { it.split(',').toSet() }
+ .bind()
+ val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
+ val disableProcessing = cmdLine.hasOption(DISABLE_PROCESSING)
+
+ KafkaConsumerConfiguration(
+ InetSocketAddress(listenPort),
+ kafkaTopics,
+ kafkaBootstrapServers,
+ disableProcessing
+ )
}
}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt
index ef06a0d1..cdd4c30a 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt
@@ -21,4 +21,9 @@ package org.onap.dcae.collectors.veshv.kafkaconsumer.config
import java.net.InetSocketAddress
-internal data class KafkaConsumerConfiguration(val apiAddress: InetSocketAddress)
+internal data class KafkaConsumerConfiguration(
+ val apiAddress: InetSocketAddress,
+ val kafkaTopics: Set<String>,
+ val kafkaBootstrapServers: String,
+ val disableProcessing: Boolean
+)