diff options
Diffstat (limited to 'sources')
48 files changed, 1333 insertions, 279 deletions
diff --git a/sources/hv-collector-commandline/pom.xml b/sources/hv-collector-commandline/pom.xml index f37f275e..90ba8085 100644 --- a/sources/hv-collector-commandline/pom.xml +++ b/sources/hv-collector-commandline/pom.xml @@ -7,7 +7,7 @@ <parent> <artifactId>hv-collector-sources</artifactId> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> </parent> <artifactId>hv-collector-commandline</artifactId> diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt index d08f6c09..9d875571 100644 --- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt +++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt @@ -23,109 +23,118 @@ import org.apache.commons.cli.Option enum class CommandLineOption(val option: Option, val required: Boolean = false) { - HEALTH_CHECK_API_PORT( - Option.builder("H") - .longOpt("health-check-api-port") - .hasArg() - .desc("Health check rest api listen port") - .build() - ), - CONFIGURATION_FILE( - Option.builder("c") - .longOpt("configuration-file") - .hasArg() - .desc("Json file containing HV-VES configuration") - .build(), - required = true - ), - LISTEN_PORT( - Option.builder("p") - .longOpt("listen-port") - .hasArg() - .desc("Listen port") - .build(), - required = true - ), - VES_HV_PORT( - Option.builder("v") - .longOpt("ves-port") - .hasArg() - .desc("VesHvCollector port") - .build(), - required = true - ), - VES_HV_HOST( - Option.builder("h") - .longOpt("ves-host") - .hasArg() - .desc("VesHvCollector host") - .build(), - required = true - ), - KAFKA_SERVERS( - Option.builder("s") - .longOpt("kafka-bootstrap-servers") - .hasArg() - .desc("Comma-separated Kafka bootstrap servers in <host>:<port> format") - .build(), - required = true - ), - KAFKA_TOPICS( - Option.builder("f") - .longOpt("kafka-topics") - .hasArg() - .desc("Comma-separated Kafka topics") - .build(), - required = true - ), - SSL_DISABLE( - Option.builder("l") - .longOpt("ssl-disable") - .desc("Disable SSL encryption") - .build() - ), - KEY_STORE_FILE( - Option.builder("k") - .longOpt("key-store") - .hasArg() - .desc("Key store in PKCS12 format") - .build() - ), - KEY_STORE_PASSWORD_FILE( - Option.builder("kp") - .longOpt("key-store-password-file") - .hasArg() - .desc("File with key store password") - .build() - ), - TRUST_STORE_FILE( - Option.builder("t") - .longOpt("trust-store") - .hasArg() - .desc("File with trusted certificate bundle in PKCS12 format") - .build() - ), - TRUST_STORE_PASSWORD_FILE( - Option.builder("tp") - .longOpt("trust-store-password-file") - .hasArg() - .desc("File with trust store password") - .build() - ), - MAXIMUM_PAYLOAD_SIZE_BYTES( - Option.builder("m") - .longOpt("max-payload-size") - .hasArg() - .desc("Maximum supported payload size in bytes") - .build() - ); + CONFIGURATION_FILE(required = true, + option = option { + shortOpt = "c" + longOpt = "configuration-file" + desc = "Json file containing HV-VES configuration" + hasArgument = true + }), + LISTEN_PORT(required = true, + option = option { + shortOpt = "p" + longOpt = "listen-port" + desc = "Listen port" + hasArgument = true + }), + VES_HV_PORT(required = true, + option = option { + shortOpt = "v" + longOpt = "ves-port" + desc = "VesHvCollector port" + hasArgument = true + }), + VES_HV_HOST(required = true, + option = option { + shortOpt = "h" + longOpt = "ves-host" + desc = "VesHvCollector host" + hasArgument = true + }), + KAFKA_SERVERS(required = true, + option = option { + shortOpt = "s" + longOpt = "kafka-bootstrap-servers" + desc = "Comma-separated Kafka bootstrap servers in <host>:<port> format" + hasArgument = true + }), + KAFKA_TOPICS(required = true, + option = option { + shortOpt = "f" + longOpt = "kafka-topics" + desc = "Comma-separated Kafka topics" + hasArgument = true + }), + HEALTH_CHECK_API_PORT(option { + shortOpt = "H" + longOpt = "health-check-api-port" + desc = "Health check rest api listen port" + hasArgument = true + }), + SSL_DISABLE(option { + shortOpt = "l" + longOpt = "ssl-disable" + desc = "Disable SSL encryption" + }), + KEY_STORE_FILE(option { + shortOpt = "k" + longOpt = "key-store" + desc = "Key store in PKCS12 format" + hasArgument = true + }), + KEY_STORE_PASSWORD_FILE(option { + shortOpt = "kp" + longOpt = "key-store-password-file" + desc = "File with key store password" + hasArgument = true + }), + TRUST_STORE_FILE(option { + shortOpt = "t" + longOpt = "trust-store" + desc = "File with trusted certificate bundle in PKCS12 format" + hasArgument = true + }), + TRUST_STORE_PASSWORD_FILE(option { + shortOpt = "tp" + longOpt = "trust-store-password-file" + desc = "File with trust store password" + hasArgument = true + }), + MAXIMUM_PAYLOAD_SIZE_BYTES(option { + shortOpt = "m" + longOpt = "max-payload-size" + desc = "Maximum supported payload size in bytes" + hasArgument = true + }), + DISABLE_PROCESSING(option { + shortOpt = "d" + longOpt = "disable-processing" + desc = "Message queue consumer option. Indicates whether messages should be fully processed" + }); - fun environmentVariableName(prefix: String = DEFAULT_ENV_PREFIX): String = + fun environmentVariableName(prefix: String = ""): String = option.longOpt.toUpperCase().replace('-', '_').let { mainPart -> - "${prefix}_${mainPart}" + if (prefix.isNotBlank()) { + "${prefix}_${mainPart}" + } else { + mainPart + } } +} + - companion object { - private const val DEFAULT_ENV_PREFIX = "VESHV" - } +private class OptionDSL { + lateinit var shortOpt: String + lateinit var longOpt: String + lateinit var desc: String + var hasArgument: Boolean = false } + +private fun option(conf: OptionDSL.() -> Unit): Option { + val dsl = OptionDSL().apply(conf) + return Option.builder(dsl.shortOpt) + .longOpt(dsl.longOpt) + .hasArg(dsl.hasArgument) + .desc(dsl.desc) + .build() +}
\ No newline at end of file diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt index 6d8ba3ff..20ca97ed 100644 --- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt +++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt @@ -40,28 +40,43 @@ fun handleWrongArgumentError(programName: String, err: WrongArgumentError): Exit return ExitFailure(2) } -fun CommandLine.longValue(cmdLineOpt: CommandLineOption, default: Long): Long = - longValue(cmdLineOpt).getOrElse { default } +inline class EnvPrefix(val it: String) -fun CommandLine.stringValue(cmdLineOpt: CommandLineOption, default: String): String = - optionValue(cmdLineOpt).getOrElse { default } +private val DEFAULT_PREFIX = EnvPrefix("") -fun CommandLine.intValue(cmdLineOpt: CommandLineOption, default: Int): Int = - intValue(cmdLineOpt).getOrElse { default } +fun CommandLine.longValue(cmdLineOpt: CommandLineOption, + default: Long, + envPrefix: EnvPrefix = DEFAULT_PREFIX): Long = + longValue(cmdLineOpt, envPrefix).getOrElse { default } -fun CommandLine.intValue(cmdLineOpt: CommandLineOption): Option<Int> = - optionValue(cmdLineOpt).map(String::toInt) +fun CommandLine.stringValue(cmdLineOpt: CommandLineOption, + default: String, + envPrefix: EnvPrefix = DEFAULT_PREFIX): String = + optionValue(cmdLineOpt, envPrefix).getOrElse { default } -fun CommandLine.longValue(cmdLineOpt: CommandLineOption): Option<Long> = - optionValue(cmdLineOpt).map(String::toLong) +fun CommandLine.intValue(cmdLineOpt: CommandLineOption, + default: Int, + envPrefix: EnvPrefix = DEFAULT_PREFIX): Int = + intValue(cmdLineOpt, envPrefix).getOrElse { default } -fun CommandLine.stringValue(cmdLineOpt: CommandLineOption): Option<String> = - optionValue(cmdLineOpt) +fun CommandLine.intValue(cmdLineOpt: CommandLineOption, + envPrefix: EnvPrefix = DEFAULT_PREFIX): Option<Int> = + optionValue(cmdLineOpt, envPrefix).map(String::toInt) -fun CommandLine.hasOption(cmdLineOpt: CommandLineOption): Boolean = +fun CommandLine.longValue(cmdLineOpt: CommandLineOption, + envPrefix: EnvPrefix = DEFAULT_PREFIX): Option<Long> = + optionValue(cmdLineOpt, envPrefix).map(String::toLong) + +fun CommandLine.stringValue(cmdLineOpt: CommandLineOption, + envPrefix: EnvPrefix = DEFAULT_PREFIX): Option<String> = + optionValue(cmdLineOpt, envPrefix) + +fun CommandLine.hasOption(cmdLineOpt: CommandLineOption, + envPrefix: EnvPrefix = DEFAULT_PREFIX): Boolean = this.hasOption(cmdLineOpt.option.opt) || - System.getenv(cmdLineOpt.environmentVariableName()) != null + System.getenv(cmdLineOpt.environmentVariableName(envPrefix.it)) != null -private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption) = Option.fromNullablesChain( +private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption, envPrefix: EnvPrefix) = Option.fromNullablesChain( getOptionValue(cmdLineOpt.option.opt), - { System.getenv(cmdLineOpt.environmentVariableName()) }) + { System.getenv(cmdLineOpt.environmentVariableName(envPrefix.it)) }) + diff --git a/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt b/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt index 6614e77f..e6776974 100644 --- a/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt +++ b/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt @@ -53,8 +53,8 @@ class CommandLineOptionTest : Spek({ on("calling environmentVariableName") { val result = opt.environmentVariableName() - it("should return prefixed upper snake cased long option name") { - assertThat(result).isEqualTo("VESHV_SSL_DISABLE") + it("should return upper snake cased long option name without prefix") { + assertThat(result).isEqualTo("SSL_DISABLE") } } } diff --git a/sources/hv-collector-configuration/pom.xml b/sources/hv-collector-configuration/pom.xml index ecda41c1..71131bbb 100644 --- a/sources/hv-collector-configuration/pom.xml +++ b/sources/hv-collector-configuration/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>hv-collector-sources</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> </parent> <artifactId>hv-collector-configuration</artifactId> diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt index c6730a4c..27560642 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt @@ -29,6 +29,7 @@ import org.apache.commons.cli.DefaultParser import org.apache.commons.cli.Options import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FILE import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.HEALTH_CHECK_API_PORT +import org.onap.dcae.collectors.veshv.commandline.EnvPrefix import org.onap.dcae.collectors.veshv.commandline.WrongArgumentError import org.onap.dcae.collectors.veshv.commandline.intValue import org.onap.dcae.collectors.veshv.commandline.stringValue @@ -42,7 +43,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De fun getConfigurationFile(args: Array<out String>): Either<WrongArgumentError, File> = parse(args) { - it.stringValue(CONFIGURATION_FILE).map(::File) + it.stringValue(CONFIGURATION_FILE, HV_VES_ENV_PREFIX).map(::File) }.toEither { WrongArgumentError( message = "Base configuration filepath missing on command line", @@ -51,7 +52,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De fun getHealthcheckPort(args: Array<out String>): Int = parse(args) { - it.intValue(HEALTH_CHECK_API_PORT) + it.intValue(HEALTH_CHECK_API_PORT, HV_VES_ENV_PREFIX) }.getOrElse { logger.info { "Healthcheck port missing on command line, using default: $DEFAULT_HEALTHCHECK_PORT" } DEFAULT_HEALTHCHECK_PORT @@ -76,6 +77,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De .let { parser.parse(it, args) } companion object { + private val HV_VES_ENV_PREFIX = EnvPrefix("VESHV") private const val DEFAULT_HEALTHCHECK_PORT: Int = 6060 private val logger = Logger(HvVesCommandLineParser::class) } diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml index 263fb33a..9fc8b7e0 100644 --- a/sources/hv-collector-core/pom.xml +++ b/sources/hv-collector-core/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>hv-collector-sources</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> diff --git a/sources/hv-collector-ct/pom.xml b/sources/hv-collector-ct/pom.xml index 84116844..6a2cf1f9 100644 --- a/sources/hv-collector-ct/pom.xml +++ b/sources/hv-collector-ct/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>hv-collector-sources</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> diff --git a/sources/hv-collector-dcae-app-simulator/pom.xml b/sources/hv-collector-dcae-app-simulator/pom.xml index 5c32623b..019e4929 100644 --- a/sources/hv-collector-dcae-app-simulator/pom.xml +++ b/sources/hv-collector-dcae-app-simulator/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>hv-collector-sources</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> @@ -87,13 +87,15 @@ </dependency> <dependency> <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-test-utils</artifactId> + <artifactId>hv-collector-kafka</artifactId> <version>${project.parent.version}</version> - <scope>test</scope> + <scope>compile</scope> </dependency> <dependency> - <groupId>io.projectreactor.kafka</groupId> - <artifactId>reactor-kafka</artifactId> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-test-utils</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> </dependency> <dependency> <groupId>com.google.guava</groupId> diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt index 122d9bf0..beacfd79 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -28,9 +28,9 @@ import java.util.Collections.synchronizedMap * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -internal class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, - private val messageStreamValidation: MessageStreamValidation) { - private val consumerState: MutableMap<String, ConsumerStateProvider> = synchronizedMap(mutableMapOf()) +internal class DcaeAppSimulator(private val consumerFactory: DcaeAppConsumerFactory, + private val messageStreamValidation: MessageStreamValidation) { + private val consumers: MutableMap<String, Consumer> = synchronizedMap(mutableMapOf()) fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString)) @@ -42,9 +42,9 @@ internal class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, } logger.info { "Received new configuration. Removing old consumers and creating consumers for topics: $topics" } - synchronized(consumerState) { - consumerState.clear() - consumerState.putAll(consumerFactory.createConsumersForTopics(topics)) + synchronized(consumers) { + consumers.clear() + consumers.putAll(consumerFactory.createConsumersFor(topics)) } } @@ -69,7 +69,7 @@ internal class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, fun validate(jsonDescription: InputStream, topic: String) = messageStreamValidation.validate(jsonDescription, currentMessages(topic)) - private fun consumerState(topic: String) = Option.fromNullable(consumerState[topic]) + private fun consumerState(topic: String) = Option.fromNullable(consumers[topic]) private fun currentMessages(topic: String): List<ByteArray> = diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt index 2458b203..992be6e3 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -143,14 +143,14 @@ internal class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { private val responseValid by lazy { Responses.statusResponse( name = "valid", - message = DcaeAppApiServer.VALID_RESPONSE_MESSAGE + message = VALID_RESPONSE_MESSAGE ) } private val responseInvalid by lazy { Responses.statusResponse( name = "invalid", - message = DcaeAppApiServer.INVALID_RESPONSE_MESSAGE, + message = INVALID_RESPONSE_MESSAGE, httpStatus = HttpStatus.BAD_REQUEST ) } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt index a108eba7..1b664edc 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt @@ -19,12 +19,7 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.config.SaslConfigs -import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM -import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.onap.dcae.collectors.veshv.kafka.api.KafkaPropertiesFactory import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.kafka.receiver.KafkaReceiver @@ -36,7 +31,6 @@ import reactor.kafka.receiver.ReceiverRecord * @since May 2018 */ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { - fun start(): Flux<ReceiverRecord<ByteArray, ByteArray>> = receiver.receive() .doOnNext { it.receiverOffset().acknowledge() } @@ -45,31 +39,12 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr companion object { private val logger = Logger(KafkaSource::class) - private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule" - private const val USERNAME = "admin" - private const val PASSWORD = "admin_secret" - private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;" - private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name - fun create(bootstrapServers: String, topics: Set<String>) = KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics))) fun createReceiverOptions(bootstrapServers: String, topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? { - val props = mapOf<String, Any>( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, - ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-dcae-app-simulator", - ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators", - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", - ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to "3000", - - - CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SASL_PLAINTEXT, - SaslConfigs.SASL_MECHANISM to PLAIN_MECHANISM, - SaslConfigs.SASL_JAAS_CONFIG to JAAS_CONFIG - ) + val props = KafkaPropertiesFactory.create(bootstrapServers) return ReceiverOptions.create<ByteArray, ByteArray>(props) .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } } .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt index 2de89aae..6ee640a4 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt @@ -19,9 +19,9 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl +import org.apache.kafka.clients.consumer.ConsumerRecord import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.kafka.receiver.ReceiverRecord import java.util.concurrent.ConcurrentLinkedQueue /** @@ -51,7 +51,7 @@ internal class Consumer : ConsumerStateProvider { override fun reset() = consumedMessages.clear() - fun update(record: ReceiverRecord<ByteArray, ByteArray>) { + fun update(record: ConsumerRecord<ByteArray, ByteArray>) { logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } consumedMessages.add(record.value()) } @@ -61,10 +61,11 @@ internal class Consumer : ConsumerStateProvider { } } -internal class ConsumerFactory(private val kafkaBootstrapServers: String) { - fun createConsumersForTopics(kafkaTopics: Set<String>): Map<String, Consumer> = - KafkaSource.create(kafkaBootstrapServers, kafkaTopics).let { kafkaSource -> - val topicToConsumer = kafkaTopics.associate { it to Consumer() } +internal class DcaeAppConsumerFactory(private val kafkaBootstrapServers: String) { + + fun createConsumersFor(topics: Set<String>) = + KafkaSource.create(kafkaBootstrapServers, topics).let { kafkaSource -> + val topicToConsumer = topics.associateWith { Consumer() } kafkaSource.start() .map { val topic = it.topic() @@ -75,6 +76,6 @@ internal class ConsumerFactory(private val kafkaBootstrapServers: String) { } companion object { - private val logger = Logger(ConsumerFactory::class) + private val logger = Logger(DcaeAppConsumerFactory::class) } } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index 7f4e62bb..25178594 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -20,7 +20,7 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppConsumerFactory import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer @@ -43,7 +43,7 @@ fun main(args: Array<String>): Unit = private fun startApp(config: DcaeAppSimConfiguration): ExitSuccess { logger.info { "Starting DCAE-APP Simulator API server with configuration: $config" } - val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers) + val consumerFactory = DcaeAppConsumerFactory(config.kafkaBootstrapServers) val generatorFactory = MessageGeneratorFactory(config.maxPayloadSizeBytes) val messageStreamValidation = MessageStreamValidation(generatorFactory.createVesEventGenerator()) DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt index a594215b..728eb2fd 100644 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt @@ -19,12 +19,10 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl -import org.apache.kafka.clients.consumer.ConsumerRecord import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import reactor.kafka.receiver.ReceiverRecord /** @@ -77,6 +75,3 @@ private fun assertState(cut: Consumer, vararg values: ByteArray) { assertThat(cut.currentState().messagesCount) .isEqualTo(values.size) } - -private fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) = - ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null) diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppConsumerFactoryTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppConsumerFactoryTest.kt new file mode 100644 index 00000000..71f31aba --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppConsumerFactoryTest.kt @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on + +object DcaeAppConsumerFactoryTest : Spek({ + describe("DcaeAppConsumerFactory") { + val kafkaBootstrapServers = "0.0.0.0:40,0.0.0.1:41" + val dcaeAppConsumerFactory = DcaeAppConsumerFactory(kafkaBootstrapServers) + + on("creation of consumer") { + val kafkaTopics = setOf("topic1", "topic2") + val consumer = dcaeAppConsumerFactory.createConsumersFor(kafkaTopics) + + it("should create consumer") { + assertThat(consumer).isNotEmpty.hasSize(2) + assertThat(consumer).containsOnlyKeys("topic1", "topic2") + } + } + + on("empty kafkaTopics set") { + val emptyKafkaTopics = emptySet<String>() + val consumer = dcaeAppConsumerFactory.createConsumersFor(emptyKafkaTopics) + + it("should not create consumer") { + assertThat(consumer).isEmpty() + } + } + + + } +})
\ No newline at end of file diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt index e3e61c81..4ebfb469 100644 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt @@ -46,7 +46,7 @@ import kotlin.test.assertFailsWith * @since August 2018 */ internal class DcaeAppSimulatorTest : Spek({ - lateinit var consumerFactory: ConsumerFactory + lateinit var consumerFactory: DcaeAppConsumerFactory lateinit var messageStreamValidation: MessageStreamValidation lateinit var perf3gpp_consumer: Consumer lateinit var faults_consumer: Consumer @@ -59,7 +59,7 @@ internal class DcaeAppSimulatorTest : Spek({ faults_consumer = mock() cut = DcaeAppSimulator(consumerFactory, messageStreamValidation) - whenever(consumerFactory.createConsumersForTopics(anySet())).thenReturn(mapOf( + whenever(consumerFactory.createConsumersFor(anySet())).thenReturn(mapOf( PERF3GPP_TOPIC to perf3gpp_consumer, FAULTS_TOPICS to faults_consumer)) } @@ -81,12 +81,12 @@ internal class DcaeAppSimulatorTest : Spek({ it("should subscribe to given topics") { cut.listenToTopics(TWO_TOPICS) - verify(consumerFactory).createConsumersForTopics(TWO_TOPICS) + verify(consumerFactory).createConsumersFor(TWO_TOPICS) } it("should subscribe to given topics when called with comma separated list") { cut.listenToTopics("$PERF3GPP_TOPIC,$FAULTS_TOPICS") - verify(consumerFactory).createConsumersForTopics(TWO_TOPICS) + verify(consumerFactory).createConsumersFor(TWO_TOPICS) } } diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt index de74f628..5bfbc91c 100644 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt @@ -19,36 +19,66 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.serialization.ByteArrayDeserializer +import com.nhaarman.mockitokotlin2.doNothing +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import reactor.core.publisher.Flux +import reactor.kafka.receiver.KafkaReceiver +import reactor.kafka.receiver.ReceiverOffset +import reactor.kafka.receiver.ReceiverRecord +import reactor.test.StepVerifier /** * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> * @since August 2018 */ internal class KafkaSourceTest : Spek({ - val servers = "kafka1:9080,kafka2:9080" - val topics = setOf("topic1", "topic2") - describe("receiver options") { - val options = KafkaSource.createReceiverOptions(servers, topics)!!.toImmutable() + describe("KafkaSource"){ + given("mocked Kafka Receiver"){ + val mockedKafkaReceiver = mock<KafkaReceiver<ByteArray, ByteArray>>() + val mockedReceiverRecord = mock<ReceiverRecord<ByteArray, ByteArray>>() + whenever(mockedKafkaReceiver.receive()).thenReturn(Flux.just(mockedReceiverRecord)) + on("function that starts KafkaSource") { + val mockedReceiverOffset = mock<ReceiverOffset>() + whenever(mockedReceiverRecord.receiverOffset()).thenReturn(mockedReceiverOffset) + doNothing().`when`(mockedReceiverOffset).acknowledge() - fun verifyProperty(key: String, expectedValue: Any) { - it("should have $key option set") { - assertThat(options.consumerProperty(key)) - .isEqualTo(expectedValue) + val testedFunction = { KafkaSource(mockedKafkaReceiver).start() } + it("should emmit receiver record") { + StepVerifier.create(testedFunction()) + .expectSubscription() + .expectNext(mockedReceiverRecord) + .expectComplete() + .verify() + } + } + } + } + + given("parameters for factory methods") { + val servers = "kafka1:9080,kafka2:9080" + val topics = setOf("topic1", "topic2") + + on("createReceiverOptions call with topics set") { + val options = KafkaSource.createReceiverOptions(servers, topics) + it("should generate options with provided topics") { + assertThat(options!!.subscriptionTopics()).contains("topic1", "topic2") } } - verifyProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers) - verifyProperty(ConsumerConfig.CLIENT_ID_CONFIG, "hv-collector-dcae-app-simulator") - verifyProperty(ConsumerConfig.GROUP_ID_CONFIG, "hv-collector-simulators") - verifyProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) - verifyProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) - verifyProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + on("create call"){ + val kafkaSource = KafkaSource.create(servers, topics) + it("should generate KafkaSource object") { + assertThat(kafkaSource).isInstanceOf(KafkaSource::class.java) + } + } } -})
\ No newline at end of file + +}) diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/kafka.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/kafka.kt new file mode 100644 index 00000000..ac26cf17 --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/kafka.kt @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import org.apache.kafka.clients.consumer.ConsumerRecord +import reactor.kafka.receiver.ReceiverRecord + +internal fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) = + ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null) diff --git a/sources/hv-collector-domain/pom.xml b/sources/hv-collector-domain/pom.xml index 61110dfb..14e43d87 100644 --- a/sources/hv-collector-domain/pom.xml +++ b/sources/hv-collector-domain/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>hv-collector-sources</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> </parent> <artifactId>hv-collector-domain</artifactId> diff --git a/sources/hv-collector-health-check/pom.xml b/sources/hv-collector-health-check/pom.xml index 2dbe264d..9514d009 100644 --- a/sources/hv-collector-health-check/pom.xml +++ b/sources/hv-collector-health-check/pom.xml @@ -15,7 +15,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>hv-collector-sources</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> </parent> <artifactId>hv-collector-health-check</artifactId> diff --git a/sources/hv-collector-kafka-consumer/pom.xml b/sources/hv-collector-kafka-consumer/pom.xml index 45a32729..7ffb5ebf 100644 --- a/sources/hv-collector-kafka-consumer/pom.xml +++ b/sources/hv-collector-kafka-consumer/pom.xml @@ -15,7 +15,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>hv-collector-sources</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> @@ -65,6 +65,13 @@ <groupId>${project.parent.groupId}</groupId> <artifactId>hv-collector-commandline</artifactId> <version>${project.parent.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-kafka</artifactId> + <version>${project.parent.version}</version> + <scope>compile</scope> </dependency> <dependency> <groupId>${project.parent.groupId}</groupId> @@ -73,23 +80,35 @@ <scope>test</scope> </dependency> <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-registry-prometheus</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor.netty</groupId> + <artifactId>reactor-netty</artifactId> + </dependency> + <dependency> <groupId>org.jetbrains.kotlin</groupId> <artifactId>kotlin-stdlib-jdk8</artifactId> </dependency> <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <optional>true</optional> + <groupId>org.jetbrains.kotlinx</groupId> + <artifactId>kotlinx-coroutines-core</artifactId> + <scope>compile</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> <dependency> - <groupId>ch.qos.logback</groupId> - <artifactId>logback-classic</artifactId> - <scope>runtime</scope> + <groupId>org.jetbrains.kotlinx</groupId> + <artifactId>kotlinx-coroutines-test</artifactId> + <scope>test</scope> </dependency> - </dependencies> </project> diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt new file mode 100644 index 00000000..be7b5cca --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt @@ -0,0 +1,58 @@ +/* +* ============LICENSE_START======================================================= +* dcaegen2-collectors-veshv +* ================================================================================ +* Copyright (C) 2019 NOKIA +* ================================================================================ +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.dcae.collectors.veshv.kafkaconsumer.config + +import arrow.core.Option +import org.apache.commons.cli.CommandLine +import org.apache.commons.cli.DefaultParser +import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.* +import org.onap.dcae.collectors.veshv.commandline.hasOption +import org.onap.dcae.collectors.veshv.commandline.intValue +import org.onap.dcae.collectors.veshv.commandline.stringValue +import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding +import java.net.InetSocketAddress + +internal class ArgKafkaConsumerConfiguration : ArgBasedConfiguration<KafkaConsumerConfiguration>(DefaultParser()) { + override val cmdLineOptionsList: List<CommandLineOption> = listOf( + LISTEN_PORT, + KAFKA_TOPICS, + KAFKA_SERVERS, + DISABLE_PROCESSING + ) + + override fun getConfiguration(cmdLine: CommandLine): Option<KafkaConsumerConfiguration> = + binding { + val listenPort = cmdLine.intValue(LISTEN_PORT).bind() + val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS) + .map { it.split(',').toSet() } + .bind() + val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS).bind() + val disableProcessing = cmdLine.hasOption(DISABLE_PROCESSING) + + KafkaConsumerConfiguration( + InetSocketAddress(listenPort), + kafkaTopics, + kafkaBootstrapServers, + disableProcessing + ) + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt new file mode 100644 index 00000000..cdd4c30a --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt @@ -0,0 +1,29 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.config + +import java.net.InetSocketAddress + +internal data class KafkaConsumerConfiguration( + val apiAddress: InetSocketAddress, + val kafkaTopics: Set<String>, + val kafkaBootstrapServers: String, + val disableProcessing: Boolean +) diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt new file mode 100644 index 00000000..dd24345d --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.impl + +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer + +internal class KafkaSource(private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>, + private val topics: Set<String>, + private val dispatcher: CoroutineDispatcher = Dispatchers.IO) { + suspend fun start(offsetConsumer: OffsetConsumer, updateInterval: Long = 500L): Job = + GlobalScope.launch(dispatcher) { + kafkaConsumer.subscribe(topics) + val topicPartitions = kafkaConsumer.assignment() + while (isActive) { + kafkaConsumer.endOffsets(topicPartitions) + .forEach { (topicPartition, offset) -> + offsetConsumer.update(topicPartition, offset) + } + kafkaConsumer.commitSync() + delay(updateInterval) + } + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt index fa15587c..7e77bae9 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018-2019 NOKIA + * Copyright (C) 2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,4 +19,26 @@ */ package org.onap.dcae.collectors.veshv.kafkaconsumer -fun main(args: Array<String>) = println("Guten tag") +import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried +import org.onap.dcae.collectors.veshv.kafkaconsumer.config.ArgKafkaConsumerConfiguration +import org.onap.dcae.collectors.veshv.kafkaconsumer.config.KafkaConsumerConfiguration +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.MicrometerMetrics +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.http.PrometheusApiServer +import org.onap.dcae.collectors.veshv.utils.process.ExitCode +import org.onap.dcae.collectors.veshv.utils.process.ExitSuccess + +private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.kafkaconsumer" +const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" + +fun main(args: Array<String>): Unit = + ArgKafkaConsumerConfiguration().parse(args) + .fold(handleWrongArgumentErrorCurried(PROGRAM_NAME), ::startApp) + .let(ExitCode::doExit) + + +private fun startApp(config: KafkaConsumerConfiguration): ExitSuccess { + PrometheusApiServer(config.apiAddress, MicrometerMetrics.INSTANCE) + .start().block()!!.await().block() // TODO refactor netty server logic + + return ExitSuccess +} diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt index b7ea126f..e576a88f 100644 --- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018-2019 NOKIA + * Copyright (C) 2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,14 +17,9 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.kafkaconsumer +package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import kotlin.test.assertTrue - -object SampleTest : Spek({ - describe("sample test") { - assertTrue(true) - } -}) +interface Metrics { + fun notifyOffsetChanged(offset: Long, topic: String, partition: Int = 0) + fun notifyMessageTravelTime(messageSentTimeMicros: Long) +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt new file mode 100644 index 00000000..da1225e9 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt @@ -0,0 +1,59 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics + +import io.micrometer.core.instrument.Timer +import io.micrometer.prometheus.PrometheusConfig +import io.micrometer.prometheus.PrometheusMeterRegistry +import org.onap.dcae.collectors.veshv.utils.TimeUtils +import reactor.core.publisher.Mono +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicLong + +internal class MicrometerMetrics constructor( + private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) +) : Metrics { + + private val currentOffset = registry.gauge(name("consumer.offset"), AtomicLong(0)) + private val travelTime = Timer.builder(name("travel.time")) + .publishPercentileHistogram(true) + .register(registry) + + fun lastStatus(): Mono<String> = Mono.fromCallable { + registry.scrape() + } + + override fun notifyOffsetChanged(offset: Long, topic: String, partition: Int) { + // TODO use topic and partition + currentOffset.lazySet(offset) + } + + override fun notifyMessageTravelTime(messageSentTimeMicros: Long) { + travelTime.record(Duration.between(TimeUtils.epochMicroToInstant(messageSentTimeMicros), Instant.now())) + } + + companion object { + val INSTANCE by lazy { MicrometerMetrics() } + + private const val PREFIX = "hv-kafka-consumer" + private fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/http/PrometheusApiServer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/http/PrometheusApiServer.kt new file mode 100644 index 00000000..29a17fc1 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/http/PrometheusApiServer.kt @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.http + +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.MicrometerMetrics +import org.onap.dcae.collectors.veshv.utils.NettyServerHandle +import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono +import reactor.netty.http.server.HttpServer +import reactor.netty.http.server.HttpServerRequest +import reactor.netty.http.server.HttpServerResponse +import java.net.InetSocketAddress + +internal class PrometheusApiServer(private val listenAddress: InetSocketAddress, + private val metrics: MicrometerMetrics) { + + private val logger = Logger(PrometheusApiServer::class) + + fun start(): Mono<NettyServerHandle> = + HttpServer.create() + .tcpConfiguration { it.addressSupplier { listenAddress } } + .route { it.get("/monitoring/prometheus", ::metricsHandler) } + .bind() + .map { NettyServerHandle(it) } + .doOnSuccess(::logServerStarted) + + + private fun metricsHandler(_req: HttpServerRequest, resp: HttpServerResponse) = + resp.sendString(metrics.lastStatus()) + + + private fun logServerStarted(handle: ServerHandle) = + logger.info { + "Kafka Consumer API server is up and listening on ${handle.host}:${handle.port}" + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt new file mode 100644 index 00000000..1481a224 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt @@ -0,0 +1,42 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.state + +import org.apache.kafka.common.TopicPartition +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics +import org.onap.dcae.collectors.veshv.utils.logging.Logger + + +internal class OffsetConsumer(private val metrics: Metrics) { + + fun update(topicPartition: TopicPartition, offset: Long) { + logger.trace { + "Current consumer offset $offset for topic ${topicPartition.topic()} " + + "on partition ${topicPartition.partition()}" + } + metrics.notifyOffsetChanged(offset, topicPartition.topic(), topicPartition.partition()) + } + + fun reset() = Unit + + companion object { + val logger = Logger(OffsetConsumer::class) + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml b/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml new file mode 100644 index 00000000..da0f7f4b --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml @@ -0,0 +1,94 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2019 NOKIA + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= +--> +<configuration> + <property name="COMPONENT_NAME" + value="hv-ves-kafka-consumer-app"/> + <property name="COMPONENT_SHORT_NAME" + value="kafka-consumer-app"/> + + <property name="LOG_FILENAME" value="${COMPONENT_SHORT_NAME}"/> + <property name="LOG_PATH" value="/var/log/ONAP/${COMPONENT_NAME}"/> + <property name="ARCHIVE" value="${LOG_PATH}/archive"/> + + <property name="p_tim" value="%date{"yyyy-MM-dd'T'HH:mm:ss.SSSXXX", UTC}"/> + <property name="p_thr" value="%thread"/> + <property name="p_lvl" value="%highlight(%-5level)"/> + <property name="p_log" value="%50.50logger"/> + <property name="p_mdc" value="%replace(%replace(%mdc){'\t', '\\\\t'}){'\n', '\\\\n'}"/> + <property name="p_msg" value="%replace(%replace(%msg){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="p_exc" value="%replace(%replace(%rootException){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="p_mak" value="%replace(%replace(%marker){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="SIMPLE_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_log}\t +| ${p_lvl}\t +| %msg\t +| %rootException%n"/> + <property name="READABLE_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_log}\t +| ${p_lvl}\t +| %msg\t +| ${p_mak}\t +| %rootException\t +| ${p_mdc}\t +| ${p_thr}%n"/> + <property name="ONAP_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_thr}\t +| ${p_lvl}\t +| ${p_log}\t +| ${p_mdc}\t +| ${p_msg}\t +| ${p_exc}\t +| ${p_mak}%n"/> + <property name="LOG_PATTERN_IN_USE" value="${SIMPLE_LOG_PATTERN}"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>${LOG_PATTERN_IN_USE}</pattern> + </encoder> + </appender> + + <appender name="ROLLING-FILE" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${LOG_PATTERN_IN_USE}</pattern> + </encoder> + <file>${LOG_PATH}/${LOG_FILENAME}.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> + <FileNamePattern>${ARCHIVE}/${LOG_FILENAME}.%d{yyyy-MM-dd}.%i.log.gz</FileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> + </appender> + + <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> + </root> +</configuration>
\ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt new file mode 100644 index 00000000..b0eb7a52 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt @@ -0,0 +1,154 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.impl + +import com.nhaarman.mockitokotlin2.argumentCaptor +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.times +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.verifyZeroInteractions +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.test.TestCoroutineDispatcher +import kotlinx.coroutines.test.runBlockingTest +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.TopicPartition +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer + +@ExperimentalCoroutinesApi +object KafkaSourceTest : Spek({ + given("KafkaSource") { + val testDispatcher = TestCoroutineDispatcher() + val mockedKafkaConsumer = mock<KafkaConsumer<ByteArray, ByteArray>>() + afterEachTest { + testDispatcher.cleanupTestCoroutines() + } + given("single topicName and partition") { + val topics = setOf("topicName") + val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher) + val mockedOffsetConsumer = mock<OffsetConsumer>() + on("started KafkaSource") { + val topicPartition = createTopicPartition("topicName") + val topicPartitions = setOf(topicPartition) + whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions) + whenever(mockedKafkaConsumer.endOffsets(topicPartitions)) + .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset)) + + runBlockingTest { + val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs) + job.cancelAndJoin() + } + + it("should call update function on topicName") { + verify(mockedOffsetConsumer).update(topicPartition, newOffset) + } + } + } + + given("two topics with partition") { + val topics = setOf(topicName1, topicName2) + val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher) + val mockedOffsetConsumer = mock<OffsetConsumer>() + + on("started KafkaSource for two iteration of while loop") { + val topicPartitionArgumentCaptor = argumentCaptor<TopicPartition>() + val offsetArgumentCaptor = argumentCaptor<Long>() + val topicPartitionArgumentCaptorAfterInterval = argumentCaptor<TopicPartition>() + val offsetArgumentCaptorAfterInterval = argumentCaptor<Long>() + val topicPartition1 = createTopicPartition(topicName1) + val topicPartition2 = createTopicPartition(topicName2) + val topicPartitions = setOf(topicPartition1, topicPartition2) + whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions) + val partitionToOffset1 = + mapOf(topicPartition1 to newOffset, + topicPartition2 to anotherNewOffset) + val partitionToOffset2 = + mapOf(topicPartition1 to anotherNewOffset, + topicPartition2 to newOffset) + whenever(mockedKafkaConsumer.endOffsets(topicPartitions)) + .thenReturn(partitionToOffset1, partitionToOffset2) + + runBlockingTest { + val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs) + verify(mockedOffsetConsumer, times(topicsAmount)).update(topicPartitionArgumentCaptor.capture(), + offsetArgumentCaptor.capture()) + + testDispatcher.advanceTimeBy(updateIntervalInMs) + + verify(mockedOffsetConsumer, times(topicsAmountAfterInterval)) + .update(topicPartitionArgumentCaptorAfterInterval.capture(), offsetArgumentCaptorAfterInterval.capture()) + + it("should calls update function with proper arguments - before interval") { + assertThat(topicPartitionArgumentCaptor.firstValue).isEqualTo(topicPartition1) + assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset) + assertThat(topicPartitionArgumentCaptor.secondValue).isEqualTo(topicPartition2) + assertThat(offsetArgumentCaptor.secondValue).isEqualTo(anotherNewOffset) + } + it("should calls update function with proper arguments - after interval") { + assertThat(topicPartitionArgumentCaptorAfterInterval.thirdValue).isEqualTo(topicPartition1) + assertThat(offsetArgumentCaptorAfterInterval.thirdValue).isEqualTo(anotherNewOffset) + assertThat(topicPartitionArgumentCaptorAfterInterval.lastValue).isEqualTo(topicPartition2) + assertThat(offsetArgumentCaptorAfterInterval.lastValue).isEqualTo(newOffset) + } + job.cancelAndJoin() + } + } + } + + given("empty topicName list") { + val emptyTopics = emptySet<String>() + val kafkaSource = KafkaSource(mockedKafkaConsumer, emptyTopics, testDispatcher) + val mockedOffsetConsumer = mock<OffsetConsumer>() + + on("call of function start") { + val emptyTopicPartitions = setOf(null) + whenever(mockedKafkaConsumer.assignment()).thenReturn(emptyTopicPartitions) + whenever(mockedKafkaConsumer.endOffsets(emptyTopicPartitions)) + .thenReturn(emptyMap()) + + runBlockingTest { + val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs) + job.cancelAndJoin() + } + + it("should not interact with OffsetConsumer") { + verifyZeroInteractions(mockedOffsetConsumer) + } + } + } + + } +}) + +private const val updateIntervalInMs = 10L +private const val partitionNumber = 0 +private const val newOffset = 2L +private const val anotherNewOffset = 10L +private const val topicName1 = "topicName1" +private const val topicName2 = "topicName2" +private const val topicsAmount = 2 +private const val topicsAmountAfterInterval = 4 +fun createTopicPartition(topic: String) = TopicPartition(topic, partitionNumber)
\ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt new file mode 100644 index 00000000..96ba588f --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt @@ -0,0 +1,85 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics + +import io.micrometer.prometheus.PrometheusConfig +import io.micrometer.prometheus.PrometheusMeterRegistry +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.data.Percentage +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge +import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer +import java.time.Instant +import java.util.concurrent.TimeUnit + +object MicrometerMetricsTest : Spek({ + val PREFIX = "hv-kafka-consumer" + val doublePrecision = Percentage.withPercentage(0.5) + lateinit var registry: PrometheusMeterRegistry + lateinit var cut: MicrometerMetrics + + beforeEachTest { + registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) + cut = MicrometerMetrics(registry) + } + + describe("Timers") { + val arbitraryMessageTravelTime = 100L + val messageSentTimeMicros = Instant.now().minusMillis(arbitraryMessageTravelTime).toEpochMilli() * 1000 + val timerName = "$PREFIX.travel.time" + + on("notifyMessageTravelTime") { + it("should update timer $timerName") { + + val timeBeforeNotifyMicros = Instant.now().toEpochMilli() * 1000 + cut.notifyMessageTravelTime(messageSentTimeMicros) + val timeAfterNotifyMicros = Instant.now().toEpochMilli() * 1000 + + registry.verifyTimer(timerName) { timer -> + val travelTimeBeforeNotify = (timeBeforeNotifyMicros - messageSentTimeMicros).toDouble() + val travelTimeAfterNotify = (timeAfterNotifyMicros - messageSentTimeMicros).toDouble() + assertThat(timer.totalTime(TimeUnit.MICROSECONDS)) + .isLessThanOrEqualTo(travelTimeAfterNotify) + .isGreaterThanOrEqualTo(travelTimeBeforeNotify) + + } + } + } + } + + describe("Gauges") { + val gaugeName = "$PREFIX.consumer.offset" + + on("notifyOffsetChanged") { + val offset = 966L + + it("should update $gaugeName") { + cut.notifyOffsetChanged(offset, "sample_topic", 1) + + registry.verifyGauge(gaugeName) { + assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision) + } + } + } + } +}) diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt new file mode 100644 index 00000000..242f27be --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.state + +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify +import org.apache.kafka.common.TopicPartition +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics + +object OffsetConsumerTest : Spek({ + describe("OffsetConsumer with metrics") { + val mockedMetrics = mock<Metrics>() + val offsetConsumer = OffsetConsumer(mockedMetrics) + given("topicName with partition"){ + val topicPartition = TopicPartition(topicName, partitionNumber) + + on("new update method call") { + offsetConsumer.update(topicPartition, newOffset) + + it("should notify message newOffset metric") { + verify(mockedMetrics).notifyOffsetChanged(newOffset, topicName, partitionNumber) + } + } + } + } +}) + +private const val partitionNumber = 1 +private const val newOffset: Long = 99 +private const val topicName = "sample-topicName" diff --git a/sources/hv-collector-kafka/pom.xml b/sources/hv-collector-kafka/pom.xml new file mode 100644 index 00000000..73265971 --- /dev/null +++ b/sources/hv-collector-kafka/pom.xml @@ -0,0 +1,79 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> + + <parent> + <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> + <artifactId>hv-collector-sources</artifactId> + <version>1.3.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + + <artifactId>hv-collector-kafka</artifactId> + + <description>VES HighVolume Collector :: Kafka</description> + + <build> + <plugins> + <plugin> + <artifactId>kotlin-maven-plugin</artifactId> + <groupId>org.jetbrains.kotlin</groupId> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <groupId>org.apache.maven.plugins</groupId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-utils</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>io.projectreactor.kafka</groupId> + <artifactId>reactor-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-stdlib-jdk8</artifactId> + </dependency> + <dependency> + <groupId>com.nhaarman.mockitokotlin2</groupId> + <artifactId>mockito-kotlin</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-junit-platform-engine</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + </dependencies> +</project> diff --git a/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactory.kt b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactory.kt new file mode 100644 index 00000000..b654274e --- /dev/null +++ b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactory.kt @@ -0,0 +1,53 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafka.api + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM +import org.apache.kafka.common.serialization.ByteArrayDeserializer + +object KafkaPropertiesFactory { + private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule" + private const val USERNAME = "admin" + private const val PASSWORD = "admin_secret" + private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;" + private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name + + fun create(bootstrapServers: String): Map<String, Any> { + val props = mapOf<String, Any>( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-consumer", + ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-consumers", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to "3000", + + + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SASL_PLAINTEXT, + SaslConfigs.SASL_MECHANISM to PLAIN_MECHANISM, + SaslConfigs.SASL_JAAS_CONFIG to JAAS_CONFIG + ) + return props + } +}
\ No newline at end of file diff --git a/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactoryTest.kt b/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactoryTest.kt new file mode 100644 index 00000000..9760fb98 --- /dev/null +++ b/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactoryTest.kt @@ -0,0 +1,63 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafka.api + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.plain.internals.PlainSaslServer +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it + +internal class KafkaPropertiesFactoryTest : Spek({ + val servers = "kafka1:9080,kafka2:9080" + + describe("KafkaPropertiesFactory") { + val options = KafkaPropertiesFactory.create(servers) + + fun verifyProperty(key: String, expectedValue: Any) { + it("should have $key option set") { + assertThat(options.getValue(key)) + .isEqualTo(expectedValue) + } + } + + verifyProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers) + verifyProperty(ConsumerConfig.CLIENT_ID_CONFIG, "hv-collector-consumer") + verifyProperty(ConsumerConfig.GROUP_ID_CONFIG, "hv-collector-consumers") + verifyProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) + verifyProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) + verifyProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + verifyProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "3000") + verifyProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT) + verifyProperty(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM) + verifyProperty(SaslConfigs.SASL_JAAS_CONFIG, JAAS_CONFIG) + } +}) + +private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule" +private const val USERNAME = "admin" +private const val PASSWORD = "admin_secret" +internal val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name +internal const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;" diff --git a/sources/hv-collector-main/pom.xml b/sources/hv-collector-main/pom.xml index 1b4de0ce..f7da1229 100644 --- a/sources/hv-collector-main/pom.xml +++ b/sources/hv-collector-main/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>hv-collector-sources</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> </parent> <artifactId>hv-collector-main</artifactId> diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index f260f158..66f3a5fc 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -20,12 +20,10 @@ package org.onap.dcae.collectors.veshv.main import arrow.core.Option -import arrow.core.Try import io.micrometer.core.instrument.Counter -import io.micrometer.core.instrument.Gauge import io.micrometer.core.instrument.Meter +import io.micrometer.core.instrument.Tags import io.micrometer.core.instrument.Timer -import io.micrometer.core.instrument.search.RequiredSearch import io.micrometer.prometheus.PrometheusConfig import io.micrometer.prometheus.PrometheusMeterRegistry import org.assertj.core.api.Assertions.assertThat @@ -43,6 +41,9 @@ import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame +import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter +import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge +import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer import org.onap.dcae.collectors.veshv.tests.utils.vesEvent import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame import org.onap.ves.VesEventOuterClass @@ -65,29 +66,6 @@ object MicrometerMetricsTest : Spek({ cut = MicrometerMetrics(registry) } - fun registrySearch(counterName: String) = RequiredSearch.`in`(registry).name(counterName) - - fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) = - Try { - map(search) - }.fold( - { ex -> assertThat(ex).doesNotThrowAnyException() }, - verifier - ) - - fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) = - verifyMeter(registrySearch(name), RequiredSearch::gauge, verifier) - - fun <T> verifyTimer(name: String, verifier: (Timer) -> T) = - verifyMeter(registrySearch(name), RequiredSearch::timer, verifier) - - fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) = - verifyMeter(search, RequiredSearch::counter, verifier) - - fun <T> verifyCounter(name: String, verifier: (Counter) -> T) = - verifyCounter(registrySearch(name), verifier) - - fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) { fun <T : Meter> verifyAllMetersAreUnchangedBut( clazz: KClass<T>, @@ -120,7 +98,7 @@ object MicrometerMetricsTest : Spek({ val bytes = 128 cut.notifyBytesReceived(bytes) - verifyCounter(counterName) { + registry.verifyCounter(counterName) { assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision) } } @@ -139,7 +117,7 @@ object MicrometerMetricsTest : Spek({ it("should increment counter") { cut.notifyMessageReceived(emptyWireProtocolFrame()) - verifyCounter(counterName) { + registry.verifyCounter(counterName) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) } } @@ -152,7 +130,7 @@ object MicrometerMetricsTest : Spek({ val bytes = 888 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes)) - verifyCounter(counterName) { + registry.verifyCounter(counterName) { assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision) } } @@ -177,7 +155,7 @@ object MicrometerMetricsTest : Spek({ it("should increment counter") { cut.notifyMessageSent(routedMessage(topicName1)) - verifyCounter(counterName) { + registry.verifyCounter(counterName) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) } verifyCountersAndTimersAreUnchangedBut( @@ -196,11 +174,11 @@ object MicrometerMetricsTest : Spek({ cut.notifyMessageSent(routedMessage(topicName2)) cut.notifyMessageSent(routedMessage(topicName2)) - verifyCounter(registrySearch(counterName).tag("topic", topicName1)) { + registry.verifyCounter(counterName, Tags.of("topic", topicName1)) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) } - verifyCounter(registrySearch(counterName).tag("topic", topicName2)) { + registry.verifyCounter(counterName, Tags.of("topic", topicName2)) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } } @@ -214,7 +192,7 @@ object MicrometerMetricsTest : Spek({ cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs))) - verifyTimer(counterName) { timer -> + registry.verifyTimer(counterName) { timer -> assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble()) } verifyCountersAndTimersAreUnchangedBut( @@ -233,7 +211,7 @@ object MicrometerMetricsTest : Spek({ cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs))) - verifyTimer(counterName) { timer -> + registry.verifyTimer(counterName) { timer -> assertThat(timer.mean(TimeUnit.MILLISECONDS)) .isGreaterThanOrEqualTo(latencyMs.toDouble()) .isLessThanOrEqualTo(latencyMs + 10000.0) @@ -256,7 +234,7 @@ object MicrometerMetricsTest : Spek({ cut.notifyMessageDropped(ROUTE_NOT_FOUND) cut.notifyMessageDropped(INVALID_MESSAGE) - verifyCounter(counterName) { + registry.verifyCounter(counterName) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause") @@ -271,11 +249,11 @@ object MicrometerMetricsTest : Spek({ cut.notifyMessageDropped(INVALID_MESSAGE) cut.notifyMessageDropped(INVALID_MESSAGE) - verifyCounter(registrySearch(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) { + registry.verifyCounter(counterName, Tags.of("cause", ROUTE_NOT_FOUND.tag)) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) } - verifyCounter(registrySearch(counterName).tag("cause", INVALID_MESSAGE.tag)) { + registry.verifyCounter(counterName, Tags.of("cause", INVALID_MESSAGE.tag)) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } } @@ -290,7 +268,7 @@ object MicrometerMetricsTest : Spek({ cut.notifyClientConnected() cut.notifyClientConnected() - verifyCounter(counterName) { + registry.verifyCounter(counterName) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } verifyCountersAndTimersAreUnchangedBut(counterName) @@ -307,7 +285,7 @@ object MicrometerMetricsTest : Spek({ cut.notifyClientDisconnected() cut.notifyClientDisconnected() - verifyCounter(counterName) { + registry.verifyCounter(counterName) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } verifyCountersAndTimersAreUnchangedBut(counterName) @@ -324,7 +302,7 @@ object MicrometerMetricsTest : Spek({ cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER) cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE) - verifyCounter(counterName) { + registry.verifyCounter(counterName) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause") @@ -338,11 +316,11 @@ object MicrometerMetricsTest : Spek({ cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE) cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE) - verifyCounter(registrySearch(counterName).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)) { + registry.verifyCounter(counterName, Tags.of("cause", INVALID_WIRE_FRAME_MARKER.tag)) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) } - verifyCounter(registrySearch(counterName).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) { + registry.verifyCounter(counterName, Tags.of("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } } @@ -359,7 +337,7 @@ object MicrometerMetricsTest : Spek({ cut.notifyClientConnected() cut.notifyClientDisconnected() - verifyGauge(gaugeName) { + registry.verifyGauge(gaugeName) { assertThat(it.value()).isCloseTo(2.0, doublePrecision) } } @@ -368,7 +346,7 @@ object MicrometerMetricsTest : Spek({ cut.notifyClientDisconnected() cut.notifyClientDisconnected() - verifyGauge(gaugeName) { + registry.verifyGauge(gaugeName) { assertThat(it.value()).isCloseTo(0.0, doublePrecision) } } @@ -376,7 +354,7 @@ object MicrometerMetricsTest : Spek({ it("should calculate negative difference between connected and disconnected clients") { cut.notifyClientDisconnected() - verifyGauge(gaugeName) { + registry.verifyGauge(gaugeName) { assertThat(it.value()).isCloseTo(0.0, doublePrecision) } } diff --git a/sources/hv-collector-server/pom.xml b/sources/hv-collector-server/pom.xml index 5c323393..6bf081cf 100644 --- a/sources/hv-collector-server/pom.xml +++ b/sources/hv-collector-server/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>hv-collector-sources</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> diff --git a/sources/hv-collector-ssl/pom.xml b/sources/hv-collector-ssl/pom.xml index 043b8c1e..c5f18c8c 100644 --- a/sources/hv-collector-ssl/pom.xml +++ b/sources/hv-collector-ssl/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>hv-collector-sources</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> diff --git a/sources/hv-collector-test-utils/pom.xml b/sources/hv-collector-test-utils/pom.xml index bf70e180..f187cc6f 100644 --- a/sources/hv-collector-test-utils/pom.xml +++ b/sources/hv-collector-test-utils/pom.xml @@ -14,7 +14,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>hv-collector-sources</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> @@ -86,5 +86,10 @@ <artifactId>logback-classic</artifactId> <scope>compile</scope> </dependency> + <dependency> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-registry-prometheus</artifactId> + <scope>compile</scope> + </dependency> </dependencies> </project>
\ No newline at end of file diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/metrics.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/metrics.kt new file mode 100644 index 00000000..1aefdb34 --- /dev/null +++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/metrics.kt @@ -0,0 +1,55 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.utils + +import arrow.core.Try +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.Gauge +import io.micrometer.core.instrument.Tags +import io.micrometer.core.instrument.Timer +import io.micrometer.core.instrument.search.RequiredSearch +import io.micrometer.prometheus.PrometheusMeterRegistry +import org.assertj.core.api.Assertions + + +fun <T> PrometheusMeterRegistry.verifyGauge(name: String, verifier: (Gauge) -> T) = + verifyMeter(findMeter(name), RequiredSearch::gauge, verifier) + +fun <T> PrometheusMeterRegistry.verifyTimer(name: String, verifier: (Timer) -> T) = + verifyMeter(findMeter(name), RequiredSearch::timer, verifier) + +fun <T> PrometheusMeterRegistry.verifyCounter(name: String, verifier: (Counter) -> T) = + verifyCounter(findMeter(name), verifier) + +fun <T> PrometheusMeterRegistry.verifyCounter(name: String, tags: Tags, verifier: (Counter) -> T) = + verifyCounter(findMeter(name).tags(tags), verifier) + +private fun PrometheusMeterRegistry.findMeter(meterName: String) = RequiredSearch.`in`(this).name(meterName) + +private fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) = + verifyMeter(search, RequiredSearch::counter, verifier) + +private inline fun <M, T> verifyMeter(search: RequiredSearch, + map: (RequiredSearch) -> M, + verifier: (M) -> T) = + Try { map(search) }.fold( + { ex -> Assertions.assertThat(ex).doesNotThrowAnyException() }, + verifier + )
\ No newline at end of file diff --git a/sources/hv-collector-utils/pom.xml b/sources/hv-collector-utils/pom.xml index 76609ae6..5c117125 100644 --- a/sources/hv-collector-utils/pom.xml +++ b/sources/hv-collector-utils/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>hv-collector-sources</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> diff --git a/sources/hv-collector-ves-message-generator/pom.xml b/sources/hv-collector-ves-message-generator/pom.xml index 2c50d73c..1dc1b534 100644 --- a/sources/hv-collector-ves-message-generator/pom.xml +++ b/sources/hv-collector-ves-message-generator/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>hv-collector-sources</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> diff --git a/sources/hv-collector-xnf-simulator/pom.xml b/sources/hv-collector-xnf-simulator/pom.xml index 65b7d6f7..53912ee8 100644 --- a/sources/hv-collector-xnf-simulator/pom.xml +++ b/sources/hv-collector-xnf-simulator/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>hv-collector-sources</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index 04a0c14a..7a28818c 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -61,7 +61,7 @@ private fun startServers(config: SimulatorConfiguration): ExitCode { ) val xnfApiServerHandler = XnfApiServer(xnfSimulator, OngoingSimulations()) .start(config.listenAddress) - .block() + .block()!! logger.info { "Started xNF Simulator API server" } HealthState.INSTANCE.changeState(HealthDescription.IDLE) diff --git a/sources/pom.xml b/sources/pom.xml index c7ba4886..1a6e1188 100644 --- a/sources/pom.xml +++ b/sources/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>ves-hv-collector</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> @@ -125,7 +125,7 @@ <dependency> <groupId>${project.groupId}</groupId> <artifactId>hv-collector-analysis</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> </dependency> </dependencies> </plugin> @@ -142,6 +142,7 @@ <module>hv-collector-dcae-app-simulator</module> <module>hv-collector-domain</module> <module>hv-collector-health-check</module> + <module>hv-collector-kafka</module> <module>hv-collector-kafka-consumer</module> <module>hv-collector-main</module> <module>hv-collector-server</module> |