aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-06-25 11:32:36 +0200
committerFilip Krzywka <filip.krzywka@nokia.com>2019-06-26 08:57:37 +0200
commit006be7f70368ce91986037ae7a032ba00836c6c2 (patch)
treef24bd7eeca2e8245dd30fda05c433debb67068bb
parent7808010c1a18531ee9b618f934d31816193cac38 (diff)
Add environment configuration to kafka consumer
- HV-VES-specific environment prefix moved inside HvVes modules to allow simpler no-prefix API for other modules - created OptionDSL for brevity Change-Id: I2fabbda1280cc0f913f8a0a04b4a055f39ed1fae Issue-ID: DCAEGEN2-1626 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
-rw-r--r--sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt211
-rw-r--r--sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt47
-rw-r--r--sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt4
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt6
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt24
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt7
6 files changed, 174 insertions, 125 deletions
diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
index d08f6c09..9d875571 100644
--- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
+++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
@@ -23,109 +23,118 @@ import org.apache.commons.cli.Option
enum class CommandLineOption(val option: Option, val required: Boolean = false) {
- HEALTH_CHECK_API_PORT(
- Option.builder("H")
- .longOpt("health-check-api-port")
- .hasArg()
- .desc("Health check rest api listen port")
- .build()
- ),
- CONFIGURATION_FILE(
- Option.builder("c")
- .longOpt("configuration-file")
- .hasArg()
- .desc("Json file containing HV-VES configuration")
- .build(),
- required = true
- ),
- LISTEN_PORT(
- Option.builder("p")
- .longOpt("listen-port")
- .hasArg()
- .desc("Listen port")
- .build(),
- required = true
- ),
- VES_HV_PORT(
- Option.builder("v")
- .longOpt("ves-port")
- .hasArg()
- .desc("VesHvCollector port")
- .build(),
- required = true
- ),
- VES_HV_HOST(
- Option.builder("h")
- .longOpt("ves-host")
- .hasArg()
- .desc("VesHvCollector host")
- .build(),
- required = true
- ),
- KAFKA_SERVERS(
- Option.builder("s")
- .longOpt("kafka-bootstrap-servers")
- .hasArg()
- .desc("Comma-separated Kafka bootstrap servers in <host>:<port> format")
- .build(),
- required = true
- ),
- KAFKA_TOPICS(
- Option.builder("f")
- .longOpt("kafka-topics")
- .hasArg()
- .desc("Comma-separated Kafka topics")
- .build(),
- required = true
- ),
- SSL_DISABLE(
- Option.builder("l")
- .longOpt("ssl-disable")
- .desc("Disable SSL encryption")
- .build()
- ),
- KEY_STORE_FILE(
- Option.builder("k")
- .longOpt("key-store")
- .hasArg()
- .desc("Key store in PKCS12 format")
- .build()
- ),
- KEY_STORE_PASSWORD_FILE(
- Option.builder("kp")
- .longOpt("key-store-password-file")
- .hasArg()
- .desc("File with key store password")
- .build()
- ),
- TRUST_STORE_FILE(
- Option.builder("t")
- .longOpt("trust-store")
- .hasArg()
- .desc("File with trusted certificate bundle in PKCS12 format")
- .build()
- ),
- TRUST_STORE_PASSWORD_FILE(
- Option.builder("tp")
- .longOpt("trust-store-password-file")
- .hasArg()
- .desc("File with trust store password")
- .build()
- ),
- MAXIMUM_PAYLOAD_SIZE_BYTES(
- Option.builder("m")
- .longOpt("max-payload-size")
- .hasArg()
- .desc("Maximum supported payload size in bytes")
- .build()
- );
+ CONFIGURATION_FILE(required = true,
+ option = option {
+ shortOpt = "c"
+ longOpt = "configuration-file"
+ desc = "Json file containing HV-VES configuration"
+ hasArgument = true
+ }),
+ LISTEN_PORT(required = true,
+ option = option {
+ shortOpt = "p"
+ longOpt = "listen-port"
+ desc = "Listen port"
+ hasArgument = true
+ }),
+ VES_HV_PORT(required = true,
+ option = option {
+ shortOpt = "v"
+ longOpt = "ves-port"
+ desc = "VesHvCollector port"
+ hasArgument = true
+ }),
+ VES_HV_HOST(required = true,
+ option = option {
+ shortOpt = "h"
+ longOpt = "ves-host"
+ desc = "VesHvCollector host"
+ hasArgument = true
+ }),
+ KAFKA_SERVERS(required = true,
+ option = option {
+ shortOpt = "s"
+ longOpt = "kafka-bootstrap-servers"
+ desc = "Comma-separated Kafka bootstrap servers in <host>:<port> format"
+ hasArgument = true
+ }),
+ KAFKA_TOPICS(required = true,
+ option = option {
+ shortOpt = "f"
+ longOpt = "kafka-topics"
+ desc = "Comma-separated Kafka topics"
+ hasArgument = true
+ }),
+ HEALTH_CHECK_API_PORT(option {
+ shortOpt = "H"
+ longOpt = "health-check-api-port"
+ desc = "Health check rest api listen port"
+ hasArgument = true
+ }),
+ SSL_DISABLE(option {
+ shortOpt = "l"
+ longOpt = "ssl-disable"
+ desc = "Disable SSL encryption"
+ }),
+ KEY_STORE_FILE(option {
+ shortOpt = "k"
+ longOpt = "key-store"
+ desc = "Key store in PKCS12 format"
+ hasArgument = true
+ }),
+ KEY_STORE_PASSWORD_FILE(option {
+ shortOpt = "kp"
+ longOpt = "key-store-password-file"
+ desc = "File with key store password"
+ hasArgument = true
+ }),
+ TRUST_STORE_FILE(option {
+ shortOpt = "t"
+ longOpt = "trust-store"
+ desc = "File with trusted certificate bundle in PKCS12 format"
+ hasArgument = true
+ }),
+ TRUST_STORE_PASSWORD_FILE(option {
+ shortOpt = "tp"
+ longOpt = "trust-store-password-file"
+ desc = "File with trust store password"
+ hasArgument = true
+ }),
+ MAXIMUM_PAYLOAD_SIZE_BYTES(option {
+ shortOpt = "m"
+ longOpt = "max-payload-size"
+ desc = "Maximum supported payload size in bytes"
+ hasArgument = true
+ }),
+ DISABLE_PROCESSING(option {
+ shortOpt = "d"
+ longOpt = "disable-processing"
+ desc = "Message queue consumer option. Indicates whether messages should be fully processed"
+ });
- fun environmentVariableName(prefix: String = DEFAULT_ENV_PREFIX): String =
+ fun environmentVariableName(prefix: String = ""): String =
option.longOpt.toUpperCase().replace('-', '_').let { mainPart ->
- "${prefix}_${mainPart}"
+ if (prefix.isNotBlank()) {
+ "${prefix}_${mainPart}"
+ } else {
+ mainPart
+ }
}
+}
+
- companion object {
- private const val DEFAULT_ENV_PREFIX = "VESHV"
- }
+private class OptionDSL {
+ lateinit var shortOpt: String
+ lateinit var longOpt: String
+ lateinit var desc: String
+ var hasArgument: Boolean = false
}
+
+private fun option(conf: OptionDSL.() -> Unit): Option {
+ val dsl = OptionDSL().apply(conf)
+ return Option.builder(dsl.shortOpt)
+ .longOpt(dsl.longOpt)
+ .hasArg(dsl.hasArgument)
+ .desc(dsl.desc)
+ .build()
+} \ No newline at end of file
diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt
index 6d8ba3ff..20ca97ed 100644
--- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt
+++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt
@@ -40,28 +40,43 @@ fun handleWrongArgumentError(programName: String, err: WrongArgumentError): Exit
return ExitFailure(2)
}
-fun CommandLine.longValue(cmdLineOpt: CommandLineOption, default: Long): Long =
- longValue(cmdLineOpt).getOrElse { default }
+inline class EnvPrefix(val it: String)
-fun CommandLine.stringValue(cmdLineOpt: CommandLineOption, default: String): String =
- optionValue(cmdLineOpt).getOrElse { default }
+private val DEFAULT_PREFIX = EnvPrefix("")
-fun CommandLine.intValue(cmdLineOpt: CommandLineOption, default: Int): Int =
- intValue(cmdLineOpt).getOrElse { default }
+fun CommandLine.longValue(cmdLineOpt: CommandLineOption,
+ default: Long,
+ envPrefix: EnvPrefix = DEFAULT_PREFIX): Long =
+ longValue(cmdLineOpt, envPrefix).getOrElse { default }
-fun CommandLine.intValue(cmdLineOpt: CommandLineOption): Option<Int> =
- optionValue(cmdLineOpt).map(String::toInt)
+fun CommandLine.stringValue(cmdLineOpt: CommandLineOption,
+ default: String,
+ envPrefix: EnvPrefix = DEFAULT_PREFIX): String =
+ optionValue(cmdLineOpt, envPrefix).getOrElse { default }
-fun CommandLine.longValue(cmdLineOpt: CommandLineOption): Option<Long> =
- optionValue(cmdLineOpt).map(String::toLong)
+fun CommandLine.intValue(cmdLineOpt: CommandLineOption,
+ default: Int,
+ envPrefix: EnvPrefix = DEFAULT_PREFIX): Int =
+ intValue(cmdLineOpt, envPrefix).getOrElse { default }
-fun CommandLine.stringValue(cmdLineOpt: CommandLineOption): Option<String> =
- optionValue(cmdLineOpt)
+fun CommandLine.intValue(cmdLineOpt: CommandLineOption,
+ envPrefix: EnvPrefix = DEFAULT_PREFIX): Option<Int> =
+ optionValue(cmdLineOpt, envPrefix).map(String::toInt)
-fun CommandLine.hasOption(cmdLineOpt: CommandLineOption): Boolean =
+fun CommandLine.longValue(cmdLineOpt: CommandLineOption,
+ envPrefix: EnvPrefix = DEFAULT_PREFIX): Option<Long> =
+ optionValue(cmdLineOpt, envPrefix).map(String::toLong)
+
+fun CommandLine.stringValue(cmdLineOpt: CommandLineOption,
+ envPrefix: EnvPrefix = DEFAULT_PREFIX): Option<String> =
+ optionValue(cmdLineOpt, envPrefix)
+
+fun CommandLine.hasOption(cmdLineOpt: CommandLineOption,
+ envPrefix: EnvPrefix = DEFAULT_PREFIX): Boolean =
this.hasOption(cmdLineOpt.option.opt) ||
- System.getenv(cmdLineOpt.environmentVariableName()) != null
+ System.getenv(cmdLineOpt.environmentVariableName(envPrefix.it)) != null
-private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption) = Option.fromNullablesChain(
+private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption, envPrefix: EnvPrefix) = Option.fromNullablesChain(
getOptionValue(cmdLineOpt.option.opt),
- { System.getenv(cmdLineOpt.environmentVariableName()) })
+ { System.getenv(cmdLineOpt.environmentVariableName(envPrefix.it)) })
+
diff --git a/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt b/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt
index 6614e77f..e6776974 100644
--- a/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt
+++ b/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt
@@ -53,8 +53,8 @@ class CommandLineOptionTest : Spek({
on("calling environmentVariableName") {
val result = opt.environmentVariableName()
- it("should return prefixed upper snake cased long option name") {
- assertThat(result).isEqualTo("VESHV_SSL_DISABLE")
+ it("should return upper snake cased long option name without prefix") {
+ assertThat(result).isEqualTo("SSL_DISABLE")
}
}
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
index c6730a4c..27560642 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
@@ -29,6 +29,7 @@ import org.apache.commons.cli.DefaultParser
import org.apache.commons.cli.Options
import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FILE
import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.HEALTH_CHECK_API_PORT
+import org.onap.dcae.collectors.veshv.commandline.EnvPrefix
import org.onap.dcae.collectors.veshv.commandline.WrongArgumentError
import org.onap.dcae.collectors.veshv.commandline.intValue
import org.onap.dcae.collectors.veshv.commandline.stringValue
@@ -42,7 +43,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De
fun getConfigurationFile(args: Array<out String>): Either<WrongArgumentError, File> =
parse(args) {
- it.stringValue(CONFIGURATION_FILE).map(::File)
+ it.stringValue(CONFIGURATION_FILE, HV_VES_ENV_PREFIX).map(::File)
}.toEither {
WrongArgumentError(
message = "Base configuration filepath missing on command line",
@@ -51,7 +52,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De
fun getHealthcheckPort(args: Array<out String>): Int =
parse(args) {
- it.intValue(HEALTH_CHECK_API_PORT)
+ it.intValue(HEALTH_CHECK_API_PORT, HV_VES_ENV_PREFIX)
}.getOrElse {
logger.info { "Healthcheck port missing on command line, using default: $DEFAULT_HEALTHCHECK_PORT" }
DEFAULT_HEALTHCHECK_PORT
@@ -76,6 +77,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De
.let { parser.parse(it, args) }
companion object {
+ private val HV_VES_ENV_PREFIX = EnvPrefix("VESHV")
private const val DEFAULT_HEALTHCHECK_PORT: Int = 6060
private val logger = Logger(HvVesCommandLineParser::class)
}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
index 4d65e916..be7b5cca 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
@@ -24,17 +24,35 @@ import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.commandline.CommandLineOption
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.*
+import org.onap.dcae.collectors.veshv.commandline.hasOption
import org.onap.dcae.collectors.veshv.commandline.intValue
+import org.onap.dcae.collectors.veshv.commandline.stringValue
import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
import java.net.InetSocketAddress
internal class ArgKafkaConsumerConfiguration : ArgBasedConfiguration<KafkaConsumerConfiguration>(DefaultParser()) {
- override val cmdLineOptionsList: List<CommandLineOption> = listOf(LISTEN_PORT)
+ override val cmdLineOptionsList: List<CommandLineOption> = listOf(
+ LISTEN_PORT,
+ KAFKA_TOPICS,
+ KAFKA_SERVERS,
+ DISABLE_PROCESSING
+ )
override fun getConfiguration(cmdLine: CommandLine): Option<KafkaConsumerConfiguration> =
binding {
val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
- KafkaConsumerConfiguration(InetSocketAddress(listenPort))
+ val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS)
+ .map { it.split(',').toSet() }
+ .bind()
+ val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
+ val disableProcessing = cmdLine.hasOption(DISABLE_PROCESSING)
+
+ KafkaConsumerConfiguration(
+ InetSocketAddress(listenPort),
+ kafkaTopics,
+ kafkaBootstrapServers,
+ disableProcessing
+ )
}
}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt
index ef06a0d1..cdd4c30a 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt
@@ -21,4 +21,9 @@ package org.onap.dcae.collectors.veshv.kafkaconsumer.config
import java.net.InetSocketAddress
-internal data class KafkaConsumerConfiguration(val apiAddress: InetSocketAddress)
+internal data class KafkaConsumerConfiguration(
+ val apiAddress: InetSocketAddress,
+ val kafkaTopics: Set<String>,
+ val kafkaBootstrapServers: String,
+ val disableProcessing: Boolean
+)