From 006be7f70368ce91986037ae7a032ba00836c6c2 Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Tue, 25 Jun 2019 11:32:36 +0200 Subject: Add environment configuration to kafka consumer - HV-VES-specific environment prefix moved inside HvVes modules to allow simpler no-prefix API for other modules - created OptionDSL for brevity Change-Id: I2fabbda1280cc0f913f8a0a04b4a055f39ed1fae Issue-ID: DCAEGEN2-1626 Signed-off-by: Filip Krzywka --- .../veshv/commandline/CommandLineOption.kt | 211 +++++++++++---------- .../collectors/veshv/commandline/extensions.kt | 47 +++-- .../veshv/commandline/CommandLineOptionTest.kt | 4 +- .../veshv/config/impl/HvVesCommandLineParser.kt | 6 +- .../config/ArgKafkaConsumerConfiguration.kt | 24 ++- .../config/KafkaConsumerConfiguration.kt | 7 +- 6 files changed, 174 insertions(+), 125 deletions(-) diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt index d08f6c09..9d875571 100644 --- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt +++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt @@ -23,109 +23,118 @@ import org.apache.commons.cli.Option enum class CommandLineOption(val option: Option, val required: Boolean = false) { - HEALTH_CHECK_API_PORT( - Option.builder("H") - .longOpt("health-check-api-port") - .hasArg() - .desc("Health check rest api listen port") - .build() - ), - CONFIGURATION_FILE( - Option.builder("c") - .longOpt("configuration-file") - .hasArg() - .desc("Json file containing HV-VES configuration") - .build(), - required = true - ), - LISTEN_PORT( - Option.builder("p") - .longOpt("listen-port") - .hasArg() - .desc("Listen port") - .build(), - required = true - ), - VES_HV_PORT( - Option.builder("v") - .longOpt("ves-port") - .hasArg() - .desc("VesHvCollector port") - .build(), - required = true - ), - VES_HV_HOST( - Option.builder("h") - .longOpt("ves-host") - .hasArg() - .desc("VesHvCollector host") - .build(), - required = true - ), - KAFKA_SERVERS( - Option.builder("s") - .longOpt("kafka-bootstrap-servers") - .hasArg() - .desc("Comma-separated Kafka bootstrap servers in : format") - .build(), - required = true - ), - KAFKA_TOPICS( - Option.builder("f") - .longOpt("kafka-topics") - .hasArg() - .desc("Comma-separated Kafka topics") - .build(), - required = true - ), - SSL_DISABLE( - Option.builder("l") - .longOpt("ssl-disable") - .desc("Disable SSL encryption") - .build() - ), - KEY_STORE_FILE( - Option.builder("k") - .longOpt("key-store") - .hasArg() - .desc("Key store in PKCS12 format") - .build() - ), - KEY_STORE_PASSWORD_FILE( - Option.builder("kp") - .longOpt("key-store-password-file") - .hasArg() - .desc("File with key store password") - .build() - ), - TRUST_STORE_FILE( - Option.builder("t") - .longOpt("trust-store") - .hasArg() - .desc("File with trusted certificate bundle in PKCS12 format") - .build() - ), - TRUST_STORE_PASSWORD_FILE( - Option.builder("tp") - .longOpt("trust-store-password-file") - .hasArg() - .desc("File with trust store password") - .build() - ), - MAXIMUM_PAYLOAD_SIZE_BYTES( - Option.builder("m") - .longOpt("max-payload-size") - .hasArg() - .desc("Maximum supported payload size in bytes") - .build() - ); + CONFIGURATION_FILE(required = true, + option = option { + shortOpt = "c" + longOpt = "configuration-file" + desc = "Json file containing HV-VES configuration" + hasArgument = true + }), + LISTEN_PORT(required = true, + option = option { + shortOpt = "p" + longOpt = "listen-port" + desc = "Listen port" + hasArgument = true + }), + VES_HV_PORT(required = true, + option = option { + shortOpt = "v" + longOpt = "ves-port" + desc = "VesHvCollector port" + hasArgument = true + }), + VES_HV_HOST(required = true, + option = option { + shortOpt = "h" + longOpt = "ves-host" + desc = "VesHvCollector host" + hasArgument = true + }), + KAFKA_SERVERS(required = true, + option = option { + shortOpt = "s" + longOpt = "kafka-bootstrap-servers" + desc = "Comma-separated Kafka bootstrap servers in : format" + hasArgument = true + }), + KAFKA_TOPICS(required = true, + option = option { + shortOpt = "f" + longOpt = "kafka-topics" + desc = "Comma-separated Kafka topics" + hasArgument = true + }), + HEALTH_CHECK_API_PORT(option { + shortOpt = "H" + longOpt = "health-check-api-port" + desc = "Health check rest api listen port" + hasArgument = true + }), + SSL_DISABLE(option { + shortOpt = "l" + longOpt = "ssl-disable" + desc = "Disable SSL encryption" + }), + KEY_STORE_FILE(option { + shortOpt = "k" + longOpt = "key-store" + desc = "Key store in PKCS12 format" + hasArgument = true + }), + KEY_STORE_PASSWORD_FILE(option { + shortOpt = "kp" + longOpt = "key-store-password-file" + desc = "File with key store password" + hasArgument = true + }), + TRUST_STORE_FILE(option { + shortOpt = "t" + longOpt = "trust-store" + desc = "File with trusted certificate bundle in PKCS12 format" + hasArgument = true + }), + TRUST_STORE_PASSWORD_FILE(option { + shortOpt = "tp" + longOpt = "trust-store-password-file" + desc = "File with trust store password" + hasArgument = true + }), + MAXIMUM_PAYLOAD_SIZE_BYTES(option { + shortOpt = "m" + longOpt = "max-payload-size" + desc = "Maximum supported payload size in bytes" + hasArgument = true + }), + DISABLE_PROCESSING(option { + shortOpt = "d" + longOpt = "disable-processing" + desc = "Message queue consumer option. Indicates whether messages should be fully processed" + }); - fun environmentVariableName(prefix: String = DEFAULT_ENV_PREFIX): String = + fun environmentVariableName(prefix: String = ""): String = option.longOpt.toUpperCase().replace('-', '_').let { mainPart -> - "${prefix}_${mainPart}" + if (prefix.isNotBlank()) { + "${prefix}_${mainPart}" + } else { + mainPart + } } +} + - companion object { - private const val DEFAULT_ENV_PREFIX = "VESHV" - } +private class OptionDSL { + lateinit var shortOpt: String + lateinit var longOpt: String + lateinit var desc: String + var hasArgument: Boolean = false } + +private fun option(conf: OptionDSL.() -> Unit): Option { + val dsl = OptionDSL().apply(conf) + return Option.builder(dsl.shortOpt) + .longOpt(dsl.longOpt) + .hasArg(dsl.hasArgument) + .desc(dsl.desc) + .build() +} \ No newline at end of file diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt index 6d8ba3ff..20ca97ed 100644 --- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt +++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt @@ -40,28 +40,43 @@ fun handleWrongArgumentError(programName: String, err: WrongArgumentError): Exit return ExitFailure(2) } -fun CommandLine.longValue(cmdLineOpt: CommandLineOption, default: Long): Long = - longValue(cmdLineOpt).getOrElse { default } +inline class EnvPrefix(val it: String) -fun CommandLine.stringValue(cmdLineOpt: CommandLineOption, default: String): String = - optionValue(cmdLineOpt).getOrElse { default } +private val DEFAULT_PREFIX = EnvPrefix("") -fun CommandLine.intValue(cmdLineOpt: CommandLineOption, default: Int): Int = - intValue(cmdLineOpt).getOrElse { default } +fun CommandLine.longValue(cmdLineOpt: CommandLineOption, + default: Long, + envPrefix: EnvPrefix = DEFAULT_PREFIX): Long = + longValue(cmdLineOpt, envPrefix).getOrElse { default } -fun CommandLine.intValue(cmdLineOpt: CommandLineOption): Option = - optionValue(cmdLineOpt).map(String::toInt) +fun CommandLine.stringValue(cmdLineOpt: CommandLineOption, + default: String, + envPrefix: EnvPrefix = DEFAULT_PREFIX): String = + optionValue(cmdLineOpt, envPrefix).getOrElse { default } -fun CommandLine.longValue(cmdLineOpt: CommandLineOption): Option = - optionValue(cmdLineOpt).map(String::toLong) +fun CommandLine.intValue(cmdLineOpt: CommandLineOption, + default: Int, + envPrefix: EnvPrefix = DEFAULT_PREFIX): Int = + intValue(cmdLineOpt, envPrefix).getOrElse { default } -fun CommandLine.stringValue(cmdLineOpt: CommandLineOption): Option = - optionValue(cmdLineOpt) +fun CommandLine.intValue(cmdLineOpt: CommandLineOption, + envPrefix: EnvPrefix = DEFAULT_PREFIX): Option = + optionValue(cmdLineOpt, envPrefix).map(String::toInt) -fun CommandLine.hasOption(cmdLineOpt: CommandLineOption): Boolean = +fun CommandLine.longValue(cmdLineOpt: CommandLineOption, + envPrefix: EnvPrefix = DEFAULT_PREFIX): Option = + optionValue(cmdLineOpt, envPrefix).map(String::toLong) + +fun CommandLine.stringValue(cmdLineOpt: CommandLineOption, + envPrefix: EnvPrefix = DEFAULT_PREFIX): Option = + optionValue(cmdLineOpt, envPrefix) + +fun CommandLine.hasOption(cmdLineOpt: CommandLineOption, + envPrefix: EnvPrefix = DEFAULT_PREFIX): Boolean = this.hasOption(cmdLineOpt.option.opt) || - System.getenv(cmdLineOpt.environmentVariableName()) != null + System.getenv(cmdLineOpt.environmentVariableName(envPrefix.it)) != null -private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption) = Option.fromNullablesChain( +private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption, envPrefix: EnvPrefix) = Option.fromNullablesChain( getOptionValue(cmdLineOpt.option.opt), - { System.getenv(cmdLineOpt.environmentVariableName()) }) + { System.getenv(cmdLineOpt.environmentVariableName(envPrefix.it)) }) + diff --git a/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt b/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt index 6614e77f..e6776974 100644 --- a/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt +++ b/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt @@ -53,8 +53,8 @@ class CommandLineOptionTest : Spek({ on("calling environmentVariableName") { val result = opt.environmentVariableName() - it("should return prefixed upper snake cased long option name") { - assertThat(result).isEqualTo("VESHV_SSL_DISABLE") + it("should return upper snake cased long option name without prefix") { + assertThat(result).isEqualTo("SSL_DISABLE") } } } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt index c6730a4c..27560642 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt @@ -29,6 +29,7 @@ import org.apache.commons.cli.DefaultParser import org.apache.commons.cli.Options import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FILE import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.HEALTH_CHECK_API_PORT +import org.onap.dcae.collectors.veshv.commandline.EnvPrefix import org.onap.dcae.collectors.veshv.commandline.WrongArgumentError import org.onap.dcae.collectors.veshv.commandline.intValue import org.onap.dcae.collectors.veshv.commandline.stringValue @@ -42,7 +43,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De fun getConfigurationFile(args: Array): Either = parse(args) { - it.stringValue(CONFIGURATION_FILE).map(::File) + it.stringValue(CONFIGURATION_FILE, HV_VES_ENV_PREFIX).map(::File) }.toEither { WrongArgumentError( message = "Base configuration filepath missing on command line", @@ -51,7 +52,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De fun getHealthcheckPort(args: Array): Int = parse(args) { - it.intValue(HEALTH_CHECK_API_PORT) + it.intValue(HEALTH_CHECK_API_PORT, HV_VES_ENV_PREFIX) }.getOrElse { logger.info { "Healthcheck port missing on command line, using default: $DEFAULT_HEALTHCHECK_PORT" } DEFAULT_HEALTHCHECK_PORT @@ -76,6 +77,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De .let { parser.parse(it, args) } companion object { + private val HV_VES_ENV_PREFIX = EnvPrefix("VESHV") private const val DEFAULT_HEALTHCHECK_PORT: Int = 6060 private val logger = Logger(HvVesCommandLineParser::class) } diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt index 4d65e916..be7b5cca 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt @@ -24,17 +24,35 @@ import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration import org.onap.dcae.collectors.veshv.commandline.CommandLineOption -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.LISTEN_PORT +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.* +import org.onap.dcae.collectors.veshv.commandline.hasOption import org.onap.dcae.collectors.veshv.commandline.intValue +import org.onap.dcae.collectors.veshv.commandline.stringValue import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding import java.net.InetSocketAddress internal class ArgKafkaConsumerConfiguration : ArgBasedConfiguration(DefaultParser()) { - override val cmdLineOptionsList: List = listOf(LISTEN_PORT) + override val cmdLineOptionsList: List = listOf( + LISTEN_PORT, + KAFKA_TOPICS, + KAFKA_SERVERS, + DISABLE_PROCESSING + ) override fun getConfiguration(cmdLine: CommandLine): Option = binding { val listenPort = cmdLine.intValue(LISTEN_PORT).bind() - KafkaConsumerConfiguration(InetSocketAddress(listenPort)) + val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS) + .map { it.split(',').toSet() } + .bind() + val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS).bind() + val disableProcessing = cmdLine.hasOption(DISABLE_PROCESSING) + + KafkaConsumerConfiguration( + InetSocketAddress(listenPort), + kafkaTopics, + kafkaBootstrapServers, + disableProcessing + ) } } diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt index ef06a0d1..cdd4c30a 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt @@ -21,4 +21,9 @@ package org.onap.dcae.collectors.veshv.kafkaconsumer.config import java.net.InetSocketAddress -internal data class KafkaConsumerConfiguration(val apiAddress: InetSocketAddress) +internal data class KafkaConsumerConfiguration( + val apiAddress: InetSocketAddress, + val kafkaTopics: Set, + val kafkaBootstrapServers: String, + val disableProcessing: Boolean +) -- cgit 1.2.3-korg