summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--build/hv-collector-analysis/pom.xml2
-rw-r--r--build/hv-collector-coverage/pom.xml7
-rw-r--r--build/pom.xml2
-rw-r--r--pom.xml15
-rw-r--r--sources/hv-collector-commandline/pom.xml2
-rw-r--r--sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt211
-rw-r--r--sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt47
-rw-r--r--sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt4
-rw-r--r--sources/hv-collector-configuration/pom.xml2
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt6
-rw-r--r--sources/hv-collector-core/pom.xml2
-rw-r--r--sources/hv-collector-ct/pom.xml2
-rw-r--r--sources/hv-collector-dcae-app-simulator/pom.xml12
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt14
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt4
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt29
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt15
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt4
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt5
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppConsumerFactoryTest.kt54
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt8
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt64
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/kafka.kt26
-rw-r--r--sources/hv-collector-domain/pom.xml2
-rw-r--r--sources/hv-collector-health-check/pom.xml2
-rw-r--r--sources/hv-collector-kafka-consumer/pom.xml35
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt58
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt29
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt48
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt26
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt (renamed from sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt)17
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt59
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/http/PrometheusApiServer.kt54
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt42
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/resources/logback.xml94
-rw-r--r--sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt154
-rw-r--r--sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt85
-rw-r--r--sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt52
-rw-r--r--sources/hv-collector-kafka/pom.xml79
-rw-r--r--sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactory.kt53
-rw-r--r--sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactoryTest.kt63
-rw-r--r--sources/hv-collector-main/pom.xml2
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt68
-rw-r--r--sources/hv-collector-server/pom.xml2
-rw-r--r--sources/hv-collector-ssl/pom.xml2
-rw-r--r--sources/hv-collector-test-utils/pom.xml7
-rw-r--r--sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/metrics.kt55
-rw-r--r--sources/hv-collector-utils/pom.xml2
-rw-r--r--sources/hv-collector-ves-message-generator/pom.xml2
-rw-r--r--sources/hv-collector-xnf-simulator/pom.xml2
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt2
-rw-r--r--sources/pom.xml5
-rw-r--r--version.properties2
53 files changed, 1353 insertions, 287 deletions
diff --git a/build/hv-collector-analysis/pom.xml b/build/hv-collector-analysis/pom.xml
index 32b66e0d..118509b5 100644
--- a/build/hv-collector-analysis/pom.xml
+++ b/build/hv-collector-analysis/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-build</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
diff --git a/build/hv-collector-coverage/pom.xml b/build/hv-collector-coverage/pom.xml
index bfde3ae6..1c368a4f 100644
--- a/build/hv-collector-coverage/pom.xml
+++ b/build/hv-collector-coverage/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-build</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -135,6 +135,11 @@
</dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-kafka</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
<artifactId>hv-collector-kafka-consumer</artifactId>
<version>${project.parent.version}</version>
</dependency>
diff --git a/build/pom.xml b/build/pom.xml
index c244bc5f..15702390 100644
--- a/build/pom.xml
+++ b/build/pom.xml
@@ -32,7 +32,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>ves-hv-collector</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
diff --git a/pom.xml b/pom.xml
index 662e17e8..2098e6fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,13 +33,13 @@
<parent>
<groupId>org.onap.oparent</groupId>
<artifactId>oparent</artifactId>
- <version>1.2.1</version>
+ <version>2.0.0</version>
<relativePath/>
</parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>ves-hv-collector</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
<name>dcaegen2-collectors-veshv</name>
<description>VES HighVolume Collector</description>
<packaging>pom</packaging>
@@ -50,6 +50,7 @@
</modules>
<properties>
+ <coroutines.version>1.3.0-M2</coroutines.version>
<kotlin.version>1.3.31</kotlin.version>
<arrow.version>0.9.0</arrow.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
@@ -432,7 +433,7 @@
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
- <version>1.1.1</version>
+ <version>${coroutines.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
@@ -520,7 +521,7 @@
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
- <version>1.0.8</version>
+ <version>1.1.5</version>
</dependency>
<dependency>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
@@ -587,6 +588,12 @@
<version>3.1.7.RELEASE</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>kotlinx-coroutines-test</artifactId>
+ <version>${coroutines.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
</project>
diff --git a/sources/hv-collector-commandline/pom.xml b/sources/hv-collector-commandline/pom.xml
index f37f275e..90ba8085 100644
--- a/sources/hv-collector-commandline/pom.xml
+++ b/sources/hv-collector-commandline/pom.xml
@@ -7,7 +7,7 @@
<parent>
<artifactId>hv-collector-sources</artifactId>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
</parent>
<artifactId>hv-collector-commandline</artifactId>
diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
index d08f6c09..9d875571 100644
--- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
+++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
@@ -23,109 +23,118 @@ import org.apache.commons.cli.Option
enum class CommandLineOption(val option: Option, val required: Boolean = false) {
- HEALTH_CHECK_API_PORT(
- Option.builder("H")
- .longOpt("health-check-api-port")
- .hasArg()
- .desc("Health check rest api listen port")
- .build()
- ),
- CONFIGURATION_FILE(
- Option.builder("c")
- .longOpt("configuration-file")
- .hasArg()
- .desc("Json file containing HV-VES configuration")
- .build(),
- required = true
- ),
- LISTEN_PORT(
- Option.builder("p")
- .longOpt("listen-port")
- .hasArg()
- .desc("Listen port")
- .build(),
- required = true
- ),
- VES_HV_PORT(
- Option.builder("v")
- .longOpt("ves-port")
- .hasArg()
- .desc("VesHvCollector port")
- .build(),
- required = true
- ),
- VES_HV_HOST(
- Option.builder("h")
- .longOpt("ves-host")
- .hasArg()
- .desc("VesHvCollector host")
- .build(),
- required = true
- ),
- KAFKA_SERVERS(
- Option.builder("s")
- .longOpt("kafka-bootstrap-servers")
- .hasArg()
- .desc("Comma-separated Kafka bootstrap servers in <host>:<port> format")
- .build(),
- required = true
- ),
- KAFKA_TOPICS(
- Option.builder("f")
- .longOpt("kafka-topics")
- .hasArg()
- .desc("Comma-separated Kafka topics")
- .build(),
- required = true
- ),
- SSL_DISABLE(
- Option.builder("l")
- .longOpt("ssl-disable")
- .desc("Disable SSL encryption")
- .build()
- ),
- KEY_STORE_FILE(
- Option.builder("k")
- .longOpt("key-store")
- .hasArg()
- .desc("Key store in PKCS12 format")
- .build()
- ),
- KEY_STORE_PASSWORD_FILE(
- Option.builder("kp")
- .longOpt("key-store-password-file")
- .hasArg()
- .desc("File with key store password")
- .build()
- ),
- TRUST_STORE_FILE(
- Option.builder("t")
- .longOpt("trust-store")
- .hasArg()
- .desc("File with trusted certificate bundle in PKCS12 format")
- .build()
- ),
- TRUST_STORE_PASSWORD_FILE(
- Option.builder("tp")
- .longOpt("trust-store-password-file")
- .hasArg()
- .desc("File with trust store password")
- .build()
- ),
- MAXIMUM_PAYLOAD_SIZE_BYTES(
- Option.builder("m")
- .longOpt("max-payload-size")
- .hasArg()
- .desc("Maximum supported payload size in bytes")
- .build()
- );
+ CONFIGURATION_FILE(required = true,
+ option = option {
+ shortOpt = "c"
+ longOpt = "configuration-file"
+ desc = "Json file containing HV-VES configuration"
+ hasArgument = true
+ }),
+ LISTEN_PORT(required = true,
+ option = option {
+ shortOpt = "p"
+ longOpt = "listen-port"
+ desc = "Listen port"
+ hasArgument = true
+ }),
+ VES_HV_PORT(required = true,
+ option = option {
+ shortOpt = "v"
+ longOpt = "ves-port"
+ desc = "VesHvCollector port"
+ hasArgument = true
+ }),
+ VES_HV_HOST(required = true,
+ option = option {
+ shortOpt = "h"
+ longOpt = "ves-host"
+ desc = "VesHvCollector host"
+ hasArgument = true
+ }),
+ KAFKA_SERVERS(required = true,
+ option = option {
+ shortOpt = "s"
+ longOpt = "kafka-bootstrap-servers"
+ desc = "Comma-separated Kafka bootstrap servers in <host>:<port> format"
+ hasArgument = true
+ }),
+ KAFKA_TOPICS(required = true,
+ option = option {
+ shortOpt = "f"
+ longOpt = "kafka-topics"
+ desc = "Comma-separated Kafka topics"
+ hasArgument = true
+ }),
+ HEALTH_CHECK_API_PORT(option {
+ shortOpt = "H"
+ longOpt = "health-check-api-port"
+ desc = "Health check rest api listen port"
+ hasArgument = true
+ }),
+ SSL_DISABLE(option {
+ shortOpt = "l"
+ longOpt = "ssl-disable"
+ desc = "Disable SSL encryption"
+ }),
+ KEY_STORE_FILE(option {
+ shortOpt = "k"
+ longOpt = "key-store"
+ desc = "Key store in PKCS12 format"
+ hasArgument = true
+ }),
+ KEY_STORE_PASSWORD_FILE(option {
+ shortOpt = "kp"
+ longOpt = "key-store-password-file"
+ desc = "File with key store password"
+ hasArgument = true
+ }),
+ TRUST_STORE_FILE(option {
+ shortOpt = "t"
+ longOpt = "trust-store"
+ desc = "File with trusted certificate bundle in PKCS12 format"
+ hasArgument = true
+ }),
+ TRUST_STORE_PASSWORD_FILE(option {
+ shortOpt = "tp"
+ longOpt = "trust-store-password-file"
+ desc = "File with trust store password"
+ hasArgument = true
+ }),
+ MAXIMUM_PAYLOAD_SIZE_BYTES(option {
+ shortOpt = "m"
+ longOpt = "max-payload-size"
+ desc = "Maximum supported payload size in bytes"
+ hasArgument = true
+ }),
+ DISABLE_PROCESSING(option {
+ shortOpt = "d"
+ longOpt = "disable-processing"
+ desc = "Message queue consumer option. Indicates whether messages should be fully processed"
+ });
- fun environmentVariableName(prefix: String = DEFAULT_ENV_PREFIX): String =
+ fun environmentVariableName(prefix: String = ""): String =
option.longOpt.toUpperCase().replace('-', '_').let { mainPart ->
- "${prefix}_${mainPart}"
+ if (prefix.isNotBlank()) {
+ "${prefix}_${mainPart}"
+ } else {
+ mainPart
+ }
}
+}
+
- companion object {
- private const val DEFAULT_ENV_PREFIX = "VESHV"
- }
+private class OptionDSL {
+ lateinit var shortOpt: String
+ lateinit var longOpt: String
+ lateinit var desc: String
+ var hasArgument: Boolean = false
}
+
+private fun option(conf: OptionDSL.() -> Unit): Option {
+ val dsl = OptionDSL().apply(conf)
+ return Option.builder(dsl.shortOpt)
+ .longOpt(dsl.longOpt)
+ .hasArg(dsl.hasArgument)
+ .desc(dsl.desc)
+ .build()
+} \ No newline at end of file
diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt
index 6d8ba3ff..20ca97ed 100644
--- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt
+++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt
@@ -40,28 +40,43 @@ fun handleWrongArgumentError(programName: String, err: WrongArgumentError): Exit
return ExitFailure(2)
}
-fun CommandLine.longValue(cmdLineOpt: CommandLineOption, default: Long): Long =
- longValue(cmdLineOpt).getOrElse { default }
+inline class EnvPrefix(val it: String)
-fun CommandLine.stringValue(cmdLineOpt: CommandLineOption, default: String): String =
- optionValue(cmdLineOpt).getOrElse { default }
+private val DEFAULT_PREFIX = EnvPrefix("")
-fun CommandLine.intValue(cmdLineOpt: CommandLineOption, default: Int): Int =
- intValue(cmdLineOpt).getOrElse { default }
+fun CommandLine.longValue(cmdLineOpt: CommandLineOption,
+ default: Long,
+ envPrefix: EnvPrefix = DEFAULT_PREFIX): Long =
+ longValue(cmdLineOpt, envPrefix).getOrElse { default }
-fun CommandLine.intValue(cmdLineOpt: CommandLineOption): Option<Int> =
- optionValue(cmdLineOpt).map(String::toInt)
+fun CommandLine.stringValue(cmdLineOpt: CommandLineOption,
+ default: String,
+ envPrefix: EnvPrefix = DEFAULT_PREFIX): String =
+ optionValue(cmdLineOpt, envPrefix).getOrElse { default }
-fun CommandLine.longValue(cmdLineOpt: CommandLineOption): Option<Long> =
- optionValue(cmdLineOpt).map(String::toLong)
+fun CommandLine.intValue(cmdLineOpt: CommandLineOption,
+ default: Int,
+ envPrefix: EnvPrefix = DEFAULT_PREFIX): Int =
+ intValue(cmdLineOpt, envPrefix).getOrElse { default }
-fun CommandLine.stringValue(cmdLineOpt: CommandLineOption): Option<String> =
- optionValue(cmdLineOpt)
+fun CommandLine.intValue(cmdLineOpt: CommandLineOption,
+ envPrefix: EnvPrefix = DEFAULT_PREFIX): Option<Int> =
+ optionValue(cmdLineOpt, envPrefix).map(String::toInt)
-fun CommandLine.hasOption(cmdLineOpt: CommandLineOption): Boolean =
+fun CommandLine.longValue(cmdLineOpt: CommandLineOption,
+ envPrefix: EnvPrefix = DEFAULT_PREFIX): Option<Long> =
+ optionValue(cmdLineOpt, envPrefix).map(String::toLong)
+
+fun CommandLine.stringValue(cmdLineOpt: CommandLineOption,
+ envPrefix: EnvPrefix = DEFAULT_PREFIX): Option<String> =
+ optionValue(cmdLineOpt, envPrefix)
+
+fun CommandLine.hasOption(cmdLineOpt: CommandLineOption,
+ envPrefix: EnvPrefix = DEFAULT_PREFIX): Boolean =
this.hasOption(cmdLineOpt.option.opt) ||
- System.getenv(cmdLineOpt.environmentVariableName()) != null
+ System.getenv(cmdLineOpt.environmentVariableName(envPrefix.it)) != null
-private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption) = Option.fromNullablesChain(
+private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption, envPrefix: EnvPrefix) = Option.fromNullablesChain(
getOptionValue(cmdLineOpt.option.opt),
- { System.getenv(cmdLineOpt.environmentVariableName()) })
+ { System.getenv(cmdLineOpt.environmentVariableName(envPrefix.it)) })
+
diff --git a/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt b/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt
index 6614e77f..e6776974 100644
--- a/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt
+++ b/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt
@@ -53,8 +53,8 @@ class CommandLineOptionTest : Spek({
on("calling environmentVariableName") {
val result = opt.environmentVariableName()
- it("should return prefixed upper snake cased long option name") {
- assertThat(result).isEqualTo("VESHV_SSL_DISABLE")
+ it("should return upper snake cased long option name without prefix") {
+ assertThat(result).isEqualTo("SSL_DISABLE")
}
}
}
diff --git a/sources/hv-collector-configuration/pom.xml b/sources/hv-collector-configuration/pom.xml
index ecda41c1..71131bbb 100644
--- a/sources/hv-collector-configuration/pom.xml
+++ b/sources/hv-collector-configuration/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-sources</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
</parent>
<artifactId>hv-collector-configuration</artifactId>
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
index c6730a4c..27560642 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
@@ -29,6 +29,7 @@ import org.apache.commons.cli.DefaultParser
import org.apache.commons.cli.Options
import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FILE
import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.HEALTH_CHECK_API_PORT
+import org.onap.dcae.collectors.veshv.commandline.EnvPrefix
import org.onap.dcae.collectors.veshv.commandline.WrongArgumentError
import org.onap.dcae.collectors.veshv.commandline.intValue
import org.onap.dcae.collectors.veshv.commandline.stringValue
@@ -42,7 +43,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De
fun getConfigurationFile(args: Array<out String>): Either<WrongArgumentError, File> =
parse(args) {
- it.stringValue(CONFIGURATION_FILE).map(::File)
+ it.stringValue(CONFIGURATION_FILE, HV_VES_ENV_PREFIX).map(::File)
}.toEither {
WrongArgumentError(
message = "Base configuration filepath missing on command line",
@@ -51,7 +52,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De
fun getHealthcheckPort(args: Array<out String>): Int =
parse(args) {
- it.intValue(HEALTH_CHECK_API_PORT)
+ it.intValue(HEALTH_CHECK_API_PORT, HV_VES_ENV_PREFIX)
}.getOrElse {
logger.info { "Healthcheck port missing on command line, using default: $DEFAULT_HEALTHCHECK_PORT" }
DEFAULT_HEALTHCHECK_PORT
@@ -76,6 +77,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De
.let { parser.parse(it, args) }
companion object {
+ private val HV_VES_ENV_PREFIX = EnvPrefix("VESHV")
private const val DEFAULT_HEALTHCHECK_PORT: Int = 6060
private val logger = Logger(HvVesCommandLineParser::class)
}
diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml
index 263fb33a..9fc8b7e0 100644
--- a/sources/hv-collector-core/pom.xml
+++ b/sources/hv-collector-core/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-sources</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
diff --git a/sources/hv-collector-ct/pom.xml b/sources/hv-collector-ct/pom.xml
index 84116844..6a2cf1f9 100644
--- a/sources/hv-collector-ct/pom.xml
+++ b/sources/hv-collector-ct/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-sources</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
diff --git a/sources/hv-collector-dcae-app-simulator/pom.xml b/sources/hv-collector-dcae-app-simulator/pom.xml
index 5c32623b..019e4929 100644
--- a/sources/hv-collector-dcae-app-simulator/pom.xml
+++ b/sources/hv-collector-dcae-app-simulator/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-sources</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -87,13 +87,15 @@
</dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-test-utils</artifactId>
+ <artifactId>hv-collector-kafka</artifactId>
<version>${project.parent.version}</version>
- <scope>test</scope>
+ <scope>compile</scope>
</dependency>
<dependency>
- <groupId>io.projectreactor.kafka</groupId>
- <artifactId>reactor-kafka</artifactId>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-test-utils</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
index 122d9bf0..beacfd79 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
@@ -28,9 +28,9 @@ import java.util.Collections.synchronizedMap
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-internal class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
- private val messageStreamValidation: MessageStreamValidation) {
- private val consumerState: MutableMap<String, ConsumerStateProvider> = synchronizedMap(mutableMapOf())
+internal class DcaeAppSimulator(private val consumerFactory: DcaeAppConsumerFactory,
+ private val messageStreamValidation: MessageStreamValidation) {
+ private val consumers: MutableMap<String, Consumer> = synchronizedMap(mutableMapOf())
fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString))
@@ -42,9 +42,9 @@ internal class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
}
logger.info { "Received new configuration. Removing old consumers and creating consumers for topics: $topics" }
- synchronized(consumerState) {
- consumerState.clear()
- consumerState.putAll(consumerFactory.createConsumersForTopics(topics))
+ synchronized(consumers) {
+ consumers.clear()
+ consumers.putAll(consumerFactory.createConsumersFor(topics))
}
}
@@ -69,7 +69,7 @@ internal class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
fun validate(jsonDescription: InputStream, topic: String) =
messageStreamValidation.validate(jsonDescription, currentMessages(topic))
- private fun consumerState(topic: String) = Option.fromNullable(consumerState[topic])
+ private fun consumerState(topic: String) = Option.fromNullable(consumers[topic])
private fun currentMessages(topic: String): List<ByteArray> =
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
index 2458b203..992be6e3 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
@@ -143,14 +143,14 @@ internal class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
private val responseValid by lazy {
Responses.statusResponse(
name = "valid",
- message = DcaeAppApiServer.VALID_RESPONSE_MESSAGE
+ message = VALID_RESPONSE_MESSAGE
)
}
private val responseInvalid by lazy {
Responses.statusResponse(
name = "invalid",
- message = DcaeAppApiServer.INVALID_RESPONSE_MESSAGE,
+ message = INVALID_RESPONSE_MESSAGE,
httpStatus = HttpStatus.BAD_REQUEST
)
}
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
index a108eba7..1b664edc 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
@@ -19,12 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
-import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.config.SaslConfigs
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.onap.dcae.collectors.veshv.kafka.api.KafkaPropertiesFactory
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.kafka.receiver.KafkaReceiver
@@ -36,7 +31,6 @@ import reactor.kafka.receiver.ReceiverRecord
* @since May 2018
*/
internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
-
fun start(): Flux<ReceiverRecord<ByteArray, ByteArray>> =
receiver.receive()
.doOnNext { it.receiverOffset().acknowledge() }
@@ -45,31 +39,12 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr
companion object {
private val logger = Logger(KafkaSource::class)
- private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule"
- private const val USERNAME = "admin"
- private const val PASSWORD = "admin_secret"
- private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;"
- private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name
-
fun create(bootstrapServers: String, topics: Set<String>) =
KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics)))
fun createReceiverOptions(bootstrapServers: String,
topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? {
- val props = mapOf<String, Any>(
- ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
- ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-dcae-app-simulator",
- ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators",
- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
- ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to "3000",
-
-
- CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SASL_PLAINTEXT,
- SaslConfigs.SASL_MECHANISM to PLAIN_MECHANISM,
- SaslConfigs.SASL_JAAS_CONFIG to JAAS_CONFIG
- )
+ val props = KafkaPropertiesFactory.create(bootstrapServers)
return ReceiverOptions.create<ByteArray, ByteArray>(props)
.addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } }
.addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } }
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
index 2de89aae..6ee640a4 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
@@ -19,9 +19,9 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+import org.apache.kafka.clients.consumer.ConsumerRecord
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.kafka.receiver.ReceiverRecord
import java.util.concurrent.ConcurrentLinkedQueue
/**
@@ -51,7 +51,7 @@ internal class Consumer : ConsumerStateProvider {
override fun reset() = consumedMessages.clear()
- fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
+ fun update(record: ConsumerRecord<ByteArray, ByteArray>) {
logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
consumedMessages.add(record.value())
}
@@ -61,10 +61,11 @@ internal class Consumer : ConsumerStateProvider {
}
}
-internal class ConsumerFactory(private val kafkaBootstrapServers: String) {
- fun createConsumersForTopics(kafkaTopics: Set<String>): Map<String, Consumer> =
- KafkaSource.create(kafkaBootstrapServers, kafkaTopics).let { kafkaSource ->
- val topicToConsumer = kafkaTopics.associate { it to Consumer() }
+internal class DcaeAppConsumerFactory(private val kafkaBootstrapServers: String) {
+
+ fun createConsumersFor(topics: Set<String>) =
+ KafkaSource.create(kafkaBootstrapServers, topics).let { kafkaSource ->
+ val topicToConsumer = topics.associateWith { Consumer() }
kafkaSource.start()
.map {
val topic = it.topic()
@@ -75,6 +76,6 @@ internal class ConsumerFactory(private val kafkaBootstrapServers: String) {
}
companion object {
- private val logger = Logger(ConsumerFactory::class)
+ private val logger = Logger(DcaeAppConsumerFactory::class)
}
}
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
index 7f4e62bb..25178594 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -20,7 +20,7 @@
package org.onap.dcae.collectors.veshv.simulators.dcaeapp
import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppConsumerFactory
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer
@@ -43,7 +43,7 @@ fun main(args: Array<String>): Unit =
private fun startApp(config: DcaeAppSimConfiguration): ExitSuccess {
logger.info { "Starting DCAE-APP Simulator API server with configuration: $config" }
- val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers)
+ val consumerFactory = DcaeAppConsumerFactory(config.kafkaBootstrapServers)
val generatorFactory = MessageGeneratorFactory(config.maxPayloadSizeBytes)
val messageStreamValidation = MessageStreamValidation(generatorFactory.createVesEventGenerator())
DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt
index a594215b..728eb2fd 100644
--- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt
@@ -19,12 +19,10 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-import org.apache.kafka.clients.consumer.ConsumerRecord
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
-import reactor.kafka.receiver.ReceiverRecord
/**
@@ -77,6 +75,3 @@ private fun assertState(cut: Consumer, vararg values: ByteArray) {
assertThat(cut.currentState().messagesCount)
.isEqualTo(values.size)
}
-
-private fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) =
- ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null)
diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppConsumerFactoryTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppConsumerFactoryTest.kt
new file mode 100644
index 00000000..71f31aba
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppConsumerFactoryTest.kt
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+
+object DcaeAppConsumerFactoryTest : Spek({
+ describe("DcaeAppConsumerFactory") {
+ val kafkaBootstrapServers = "0.0.0.0:40,0.0.0.1:41"
+ val dcaeAppConsumerFactory = DcaeAppConsumerFactory(kafkaBootstrapServers)
+
+ on("creation of consumer") {
+ val kafkaTopics = setOf("topic1", "topic2")
+ val consumer = dcaeAppConsumerFactory.createConsumersFor(kafkaTopics)
+
+ it("should create consumer") {
+ assertThat(consumer).isNotEmpty.hasSize(2)
+ assertThat(consumer).containsOnlyKeys("topic1", "topic2")
+ }
+ }
+
+ on("empty kafkaTopics set") {
+ val emptyKafkaTopics = emptySet<String>()
+ val consumer = dcaeAppConsumerFactory.createConsumersFor(emptyKafkaTopics)
+
+ it("should not create consumer") {
+ assertThat(consumer).isEmpty()
+ }
+ }
+
+
+ }
+}) \ No newline at end of file
diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
index e3e61c81..4ebfb469 100644
--- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
@@ -46,7 +46,7 @@ import kotlin.test.assertFailsWith
* @since August 2018
*/
internal class DcaeAppSimulatorTest : Spek({
- lateinit var consumerFactory: ConsumerFactory
+ lateinit var consumerFactory: DcaeAppConsumerFactory
lateinit var messageStreamValidation: MessageStreamValidation
lateinit var perf3gpp_consumer: Consumer
lateinit var faults_consumer: Consumer
@@ -59,7 +59,7 @@ internal class DcaeAppSimulatorTest : Spek({
faults_consumer = mock()
cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)
- whenever(consumerFactory.createConsumersForTopics(anySet())).thenReturn(mapOf(
+ whenever(consumerFactory.createConsumersFor(anySet())).thenReturn(mapOf(
PERF3GPP_TOPIC to perf3gpp_consumer,
FAULTS_TOPICS to faults_consumer))
}
@@ -81,12 +81,12 @@ internal class DcaeAppSimulatorTest : Spek({
it("should subscribe to given topics") {
cut.listenToTopics(TWO_TOPICS)
- verify(consumerFactory).createConsumersForTopics(TWO_TOPICS)
+ verify(consumerFactory).createConsumersFor(TWO_TOPICS)
}
it("should subscribe to given topics when called with comma separated list") {
cut.listenToTopics("$PERF3GPP_TOPIC,$FAULTS_TOPICS")
- verify(consumerFactory).createConsumersForTopics(TWO_TOPICS)
+ verify(consumerFactory).createConsumersFor(TWO_TOPICS)
}
}
diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt
index de74f628..5bfbc91c 100644
--- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt
@@ -19,36 +19,66 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import com.nhaarman.mockitokotlin2.doNothing
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import reactor.core.publisher.Flux
+import reactor.kafka.receiver.KafkaReceiver
+import reactor.kafka.receiver.ReceiverOffset
+import reactor.kafka.receiver.ReceiverRecord
+import reactor.test.StepVerifier
/**
* @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
* @since August 2018
*/
internal class KafkaSourceTest : Spek({
- val servers = "kafka1:9080,kafka2:9080"
- val topics = setOf("topic1", "topic2")
- describe("receiver options") {
- val options = KafkaSource.createReceiverOptions(servers, topics)!!.toImmutable()
+ describe("KafkaSource"){
+ given("mocked Kafka Receiver"){
+ val mockedKafkaReceiver = mock<KafkaReceiver<ByteArray, ByteArray>>()
+ val mockedReceiverRecord = mock<ReceiverRecord<ByteArray, ByteArray>>()
+ whenever(mockedKafkaReceiver.receive()).thenReturn(Flux.just(mockedReceiverRecord))
+ on("function that starts KafkaSource") {
+ val mockedReceiverOffset = mock<ReceiverOffset>()
+ whenever(mockedReceiverRecord.receiverOffset()).thenReturn(mockedReceiverOffset)
+ doNothing().`when`(mockedReceiverOffset).acknowledge()
- fun verifyProperty(key: String, expectedValue: Any) {
- it("should have $key option set") {
- assertThat(options.consumerProperty(key))
- .isEqualTo(expectedValue)
+ val testedFunction = { KafkaSource(mockedKafkaReceiver).start() }
+ it("should emmit receiver record") {
+ StepVerifier.create(testedFunction())
+ .expectSubscription()
+ .expectNext(mockedReceiverRecord)
+ .expectComplete()
+ .verify()
+ }
+ }
+ }
+ }
+
+ given("parameters for factory methods") {
+ val servers = "kafka1:9080,kafka2:9080"
+ val topics = setOf("topic1", "topic2")
+
+ on("createReceiverOptions call with topics set") {
+ val options = KafkaSource.createReceiverOptions(servers, topics)
+ it("should generate options with provided topics") {
+ assertThat(options!!.subscriptionTopics()).contains("topic1", "topic2")
}
}
- verifyProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers)
- verifyProperty(ConsumerConfig.CLIENT_ID_CONFIG, "hv-collector-dcae-app-simulator")
- verifyProperty(ConsumerConfig.GROUP_ID_CONFIG, "hv-collector-simulators")
- verifyProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java)
- verifyProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java)
- verifyProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ on("create call"){
+ val kafkaSource = KafkaSource.create(servers, topics)
+ it("should generate KafkaSource object") {
+ assertThat(kafkaSource).isInstanceOf(KafkaSource::class.java)
+ }
+ }
}
-}) \ No newline at end of file
+
+})
diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/kafka.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/kafka.kt
new file mode 100644
index 00000000..ac26cf17
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/kafka.kt
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import reactor.kafka.receiver.ReceiverRecord
+
+internal fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) =
+ ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null)
diff --git a/sources/hv-collector-domain/pom.xml b/sources/hv-collector-domain/pom.xml
index 61110dfb..14e43d87 100644
--- a/sources/hv-collector-domain/pom.xml
+++ b/sources/hv-collector-domain/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-sources</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
</parent>
<artifactId>hv-collector-domain</artifactId>
diff --git a/sources/hv-collector-health-check/pom.xml b/sources/hv-collector-health-check/pom.xml
index 2dbe264d..9514d009 100644
--- a/sources/hv-collector-health-check/pom.xml
+++ b/sources/hv-collector-health-check/pom.xml
@@ -15,7 +15,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-sources</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
</parent>
<artifactId>hv-collector-health-check</artifactId>
diff --git a/sources/hv-collector-kafka-consumer/pom.xml b/sources/hv-collector-kafka-consumer/pom.xml
index 45a32729..7ffb5ebf 100644
--- a/sources/hv-collector-kafka-consumer/pom.xml
+++ b/sources/hv-collector-kafka-consumer/pom.xml
@@ -15,7 +15,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-sources</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -65,6 +65,13 @@
<groupId>${project.parent.groupId}</groupId>
<artifactId>hv-collector-commandline</artifactId>
<version>${project.parent.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-kafka</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
@@ -73,23 +80,35 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.micrometer</groupId>
+ <artifactId>micrometer-registry-prometheus</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.netty</groupId>
+ <artifactId>reactor-netty</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <optional>true</optional>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>kotlinx-coroutines-core</artifactId>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <scope>runtime</scope>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>kotlinx-coroutines-test</artifactId>
+ <scope>test</scope>
</dependency>
-
</dependencies>
</project>
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
new file mode 100644
index 00000000..be7b5cca
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
@@ -0,0 +1,58 @@
+/*
+* ============LICENSE_START=======================================================
+* dcaegen2-collectors-veshv
+* ================================================================================
+* Copyright (C) 2019 NOKIA
+* ================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.dcae.collectors.veshv.kafkaconsumer.config
+
+import arrow.core.Option
+import org.apache.commons.cli.CommandLine
+import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.*
+import org.onap.dcae.collectors.veshv.commandline.hasOption
+import org.onap.dcae.collectors.veshv.commandline.intValue
+import org.onap.dcae.collectors.veshv.commandline.stringValue
+import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
+import java.net.InetSocketAddress
+
+internal class ArgKafkaConsumerConfiguration : ArgBasedConfiguration<KafkaConsumerConfiguration>(DefaultParser()) {
+ override val cmdLineOptionsList: List<CommandLineOption> = listOf(
+ LISTEN_PORT,
+ KAFKA_TOPICS,
+ KAFKA_SERVERS,
+ DISABLE_PROCESSING
+ )
+
+ override fun getConfiguration(cmdLine: CommandLine): Option<KafkaConsumerConfiguration> =
+ binding {
+ val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
+ val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS)
+ .map { it.split(',').toSet() }
+ .bind()
+ val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
+ val disableProcessing = cmdLine.hasOption(DISABLE_PROCESSING)
+
+ KafkaConsumerConfiguration(
+ InetSocketAddress(listenPort),
+ kafkaTopics,
+ kafkaBootstrapServers,
+ disableProcessing
+ )
+ }
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt
new file mode 100644
index 00000000..cdd4c30a
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt
@@ -0,0 +1,29 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.config
+
+import java.net.InetSocketAddress
+
+internal data class KafkaConsumerConfiguration(
+ val apiAddress: InetSocketAddress,
+ val kafkaTopics: Set<String>,
+ val kafkaBootstrapServers: String,
+ val disableProcessing: Boolean
+)
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt
new file mode 100644
index 00000000..dd24345d
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.impl
+
+import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.isActive
+import kotlinx.coroutines.launch
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer
+
+internal class KafkaSource(private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>,
+ private val topics: Set<String>,
+ private val dispatcher: CoroutineDispatcher = Dispatchers.IO) {
+ suspend fun start(offsetConsumer: OffsetConsumer, updateInterval: Long = 500L): Job =
+ GlobalScope.launch(dispatcher) {
+ kafkaConsumer.subscribe(topics)
+ val topicPartitions = kafkaConsumer.assignment()
+ while (isActive) {
+ kafkaConsumer.endOffsets(topicPartitions)
+ .forEach { (topicPartition, offset) ->
+ offsetConsumer.update(topicPartition, offset)
+ }
+ kafkaConsumer.commitSync()
+ delay(updateInterval)
+ }
+ }
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
index fa15587c..7e77bae9 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,4 +19,26 @@
*/
package org.onap.dcae.collectors.veshv.kafkaconsumer
-fun main(args: Array<String>) = println("Guten tag")
+import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
+import org.onap.dcae.collectors.veshv.kafkaconsumer.config.ArgKafkaConsumerConfiguration
+import org.onap.dcae.collectors.veshv.kafkaconsumer.config.KafkaConsumerConfiguration
+import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.http.PrometheusApiServer
+import org.onap.dcae.collectors.veshv.utils.process.ExitCode
+import org.onap.dcae.collectors.veshv.utils.process.ExitSuccess
+
+private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.kafkaconsumer"
+const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
+
+fun main(args: Array<String>): Unit =
+ ArgKafkaConsumerConfiguration().parse(args)
+ .fold(handleWrongArgumentErrorCurried(PROGRAM_NAME), ::startApp)
+ .let(ExitCode::doExit)
+
+
+private fun startApp(config: KafkaConsumerConfiguration): ExitSuccess {
+ PrometheusApiServer(config.apiAddress, MicrometerMetrics.INSTANCE)
+ .start().block()!!.await().block() // TODO refactor netty server logic
+
+ return ExitSuccess
+}
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
index b7ea126f..e576a88f 100644
--- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,14 +17,9 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.kafkaconsumer
+package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import kotlin.test.assertTrue
-
-object SampleTest : Spek({
- describe("sample test") {
- assertTrue(true)
- }
-})
+interface Metrics {
+ fun notifyOffsetChanged(offset: Long, topic: String, partition: Int = 0)
+ fun notifyMessageTravelTime(messageSentTimeMicros: Long)
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
new file mode 100644
index 00000000..da1225e9
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
+
+import io.micrometer.core.instrument.Timer
+import io.micrometer.prometheus.PrometheusConfig
+import io.micrometer.prometheus.PrometheusMeterRegistry
+import org.onap.dcae.collectors.veshv.utils.TimeUtils
+import reactor.core.publisher.Mono
+import java.time.Duration
+import java.time.Instant
+import java.util.concurrent.atomic.AtomicLong
+
+internal class MicrometerMetrics constructor(
+ private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
+) : Metrics {
+
+ private val currentOffset = registry.gauge(name("consumer.offset"), AtomicLong(0))
+ private val travelTime = Timer.builder(name("travel.time"))
+ .publishPercentileHistogram(true)
+ .register(registry)
+
+ fun lastStatus(): Mono<String> = Mono.fromCallable {
+ registry.scrape()
+ }
+
+ override fun notifyOffsetChanged(offset: Long, topic: String, partition: Int) {
+ // TODO use topic and partition
+ currentOffset.lazySet(offset)
+ }
+
+ override fun notifyMessageTravelTime(messageSentTimeMicros: Long) {
+ travelTime.record(Duration.between(TimeUtils.epochMicroToInstant(messageSentTimeMicros), Instant.now()))
+ }
+
+ companion object {
+ val INSTANCE by lazy { MicrometerMetrics() }
+
+ private const val PREFIX = "hv-kafka-consumer"
+ private fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
+ }
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/http/PrometheusApiServer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/http/PrometheusApiServer.kt
new file mode 100644
index 00000000..29a17fc1
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/http/PrometheusApiServer.kt
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.http
+
+import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
+import reactor.netty.http.server.HttpServer
+import reactor.netty.http.server.HttpServerRequest
+import reactor.netty.http.server.HttpServerResponse
+import java.net.InetSocketAddress
+
+internal class PrometheusApiServer(private val listenAddress: InetSocketAddress,
+ private val metrics: MicrometerMetrics) {
+
+ private val logger = Logger(PrometheusApiServer::class)
+
+ fun start(): Mono<NettyServerHandle> =
+ HttpServer.create()
+ .tcpConfiguration { it.addressSupplier { listenAddress } }
+ .route { it.get("/monitoring/prometheus", ::metricsHandler) }
+ .bind()
+ .map { NettyServerHandle(it) }
+ .doOnSuccess(::logServerStarted)
+
+
+ private fun metricsHandler(_req: HttpServerRequest, resp: HttpServerResponse) =
+ resp.sendString(metrics.lastStatus())
+
+
+ private fun logServerStarted(handle: ServerHandle) =
+ logger.info {
+ "Kafka Consumer API server is up and listening on ${handle.host}:${handle.port}"
+ }
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt
new file mode 100644
index 00000000..1481a224
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt
@@ -0,0 +1,42 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.state
+
+import org.apache.kafka.common.TopicPartition
+import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+
+
+internal class OffsetConsumer(private val metrics: Metrics) {
+
+ fun update(topicPartition: TopicPartition, offset: Long) {
+ logger.trace {
+ "Current consumer offset $offset for topic ${topicPartition.topic()} " +
+ "on partition ${topicPartition.partition()}"
+ }
+ metrics.notifyOffsetChanged(offset, topicPartition.topic(), topicPartition.partition())
+ }
+
+ fun reset() = Unit
+
+ companion object {
+ val logger = Logger(OffsetConsumer::class)
+ }
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml b/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml
new file mode 100644
index 00000000..da0f7f4b
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ ============LICENSE_START=======================================================
+ ~ dcaegen2-collectors-veshv
+ ~ ================================================================================
+ ~ Copyright (C) 2019 NOKIA
+ ~ ================================================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============LICENSE_END=========================================================
+-->
+<configuration>
+ <property name="COMPONENT_NAME"
+ value="hv-ves-kafka-consumer-app"/>
+ <property name="COMPONENT_SHORT_NAME"
+ value="kafka-consumer-app"/>
+
+ <property name="LOG_FILENAME" value="${COMPONENT_SHORT_NAME}"/>
+ <property name="LOG_PATH" value="/var/log/ONAP/${COMPONENT_NAME}"/>
+ <property name="ARCHIVE" value="${LOG_PATH}/archive"/>
+
+ <property name="p_tim" value="%date{&quot;yyyy-MM-dd'T'HH:mm:ss.SSSXXX&quot;, UTC}"/>
+ <property name="p_thr" value="%thread"/>
+ <property name="p_lvl" value="%highlight(%-5level)"/>
+ <property name="p_log" value="%50.50logger"/>
+ <property name="p_mdc" value="%replace(%replace(%mdc){'\t', '\\\\t'}){'\n', '\\\\n'}"/>
+ <property name="p_msg" value="%replace(%replace(%msg){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+ <property name="p_exc" value="%replace(%replace(%rootException){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+ <property name="p_mak" value="%replace(%replace(%marker){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+ <property name="SIMPLE_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_log}\t
+| ${p_lvl}\t
+| %msg\t
+| %rootException%n"/>
+ <property name="READABLE_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_log}\t
+| ${p_lvl}\t
+| %msg\t
+| ${p_mak}\t
+| %rootException\t
+| ${p_mdc}\t
+| ${p_thr}%n"/>
+ <property name="ONAP_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_thr}\t
+| ${p_lvl}\t
+| ${p_log}\t
+| ${p_mdc}\t
+| ${p_msg}\t
+| ${p_exc}\t
+| ${p_mak}%n"/>
+ <property name="LOG_PATTERN_IN_USE" value="${SIMPLE_LOG_PATTERN}"/>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>${LOG_PATTERN_IN_USE}</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="ROLLING-FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder>
+ <pattern>${LOG_PATTERN_IN_USE}</pattern>
+ </encoder>
+ <file>${LOG_PATH}/${LOG_FILENAME}.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+ <FileNamePattern>${ARCHIVE}/${LOG_FILENAME}.%d{yyyy-MM-dd}.%i.log.gz</FileNamePattern>
+ <maxFileSize>50MB</maxFileSize>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>10GB</totalSizeCap>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ROLLING-FILE"/>
+ </root>
+</configuration> \ No newline at end of file
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt
new file mode 100644
index 00000000..b0eb7a52
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt
@@ -0,0 +1,154 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.impl
+
+import com.nhaarman.mockitokotlin2.argumentCaptor
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.times
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.verifyZeroInteractions
+import com.nhaarman.mockitokotlin2.whenever
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancelAndJoin
+import kotlinx.coroutines.test.TestCoroutineDispatcher
+import kotlinx.coroutines.test.runBlockingTest
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.TopicPartition
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer
+
+@ExperimentalCoroutinesApi
+object KafkaSourceTest : Spek({
+ given("KafkaSource") {
+ val testDispatcher = TestCoroutineDispatcher()
+ val mockedKafkaConsumer = mock<KafkaConsumer<ByteArray, ByteArray>>()
+ afterEachTest {
+ testDispatcher.cleanupTestCoroutines()
+ }
+ given("single topicName and partition") {
+ val topics = setOf("topicName")
+ val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher)
+ val mockedOffsetConsumer = mock<OffsetConsumer>()
+ on("started KafkaSource") {
+ val topicPartition = createTopicPartition("topicName")
+ val topicPartitions = setOf(topicPartition)
+ whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
+ whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
+ .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset))
+
+ runBlockingTest {
+ val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs)
+ job.cancelAndJoin()
+ }
+
+ it("should call update function on topicName") {
+ verify(mockedOffsetConsumer).update(topicPartition, newOffset)
+ }
+ }
+ }
+
+ given("two topics with partition") {
+ val topics = setOf(topicName1, topicName2)
+ val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher)
+ val mockedOffsetConsumer = mock<OffsetConsumer>()
+
+ on("started KafkaSource for two iteration of while loop") {
+ val topicPartitionArgumentCaptor = argumentCaptor<TopicPartition>()
+ val offsetArgumentCaptor = argumentCaptor<Long>()
+ val topicPartitionArgumentCaptorAfterInterval = argumentCaptor<TopicPartition>()
+ val offsetArgumentCaptorAfterInterval = argumentCaptor<Long>()
+ val topicPartition1 = createTopicPartition(topicName1)
+ val topicPartition2 = createTopicPartition(topicName2)
+ val topicPartitions = setOf(topicPartition1, topicPartition2)
+ whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
+ val partitionToOffset1 =
+ mapOf(topicPartition1 to newOffset,
+ topicPartition2 to anotherNewOffset)
+ val partitionToOffset2 =
+ mapOf(topicPartition1 to anotherNewOffset,
+ topicPartition2 to newOffset)
+ whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
+ .thenReturn(partitionToOffset1, partitionToOffset2)
+
+ runBlockingTest {
+ val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs)
+ verify(mockedOffsetConsumer, times(topicsAmount)).update(topicPartitionArgumentCaptor.capture(),
+ offsetArgumentCaptor.capture())
+
+ testDispatcher.advanceTimeBy(updateIntervalInMs)
+
+ verify(mockedOffsetConsumer, times(topicsAmountAfterInterval))
+ .update(topicPartitionArgumentCaptorAfterInterval.capture(), offsetArgumentCaptorAfterInterval.capture())
+
+ it("should calls update function with proper arguments - before interval") {
+ assertThat(topicPartitionArgumentCaptor.firstValue).isEqualTo(topicPartition1)
+ assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset)
+ assertThat(topicPartitionArgumentCaptor.secondValue).isEqualTo(topicPartition2)
+ assertThat(offsetArgumentCaptor.secondValue).isEqualTo(anotherNewOffset)
+ }
+ it("should calls update function with proper arguments - after interval") {
+ assertThat(topicPartitionArgumentCaptorAfterInterval.thirdValue).isEqualTo(topicPartition1)
+ assertThat(offsetArgumentCaptorAfterInterval.thirdValue).isEqualTo(anotherNewOffset)
+ assertThat(topicPartitionArgumentCaptorAfterInterval.lastValue).isEqualTo(topicPartition2)
+ assertThat(offsetArgumentCaptorAfterInterval.lastValue).isEqualTo(newOffset)
+ }
+ job.cancelAndJoin()
+ }
+ }
+ }
+
+ given("empty topicName list") {
+ val emptyTopics = emptySet<String>()
+ val kafkaSource = KafkaSource(mockedKafkaConsumer, emptyTopics, testDispatcher)
+ val mockedOffsetConsumer = mock<OffsetConsumer>()
+
+ on("call of function start") {
+ val emptyTopicPartitions = setOf(null)
+ whenever(mockedKafkaConsumer.assignment()).thenReturn(emptyTopicPartitions)
+ whenever(mockedKafkaConsumer.endOffsets(emptyTopicPartitions))
+ .thenReturn(emptyMap())
+
+ runBlockingTest {
+ val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs)
+ job.cancelAndJoin()
+ }
+
+ it("should not interact with OffsetConsumer") {
+ verifyZeroInteractions(mockedOffsetConsumer)
+ }
+ }
+ }
+
+ }
+})
+
+private const val updateIntervalInMs = 10L
+private const val partitionNumber = 0
+private const val newOffset = 2L
+private const val anotherNewOffset = 10L
+private const val topicName1 = "topicName1"
+private const val topicName2 = "topicName2"
+private const val topicsAmount = 2
+private const val topicsAmountAfterInterval = 4
+fun createTopicPartition(topic: String) = TopicPartition(topic, partitionNumber) \ No newline at end of file
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
new file mode 100644
index 00000000..96ba588f
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
@@ -0,0 +1,85 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
+
+import io.micrometer.prometheus.PrometheusConfig
+import io.micrometer.prometheus.PrometheusMeterRegistry
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.data.Percentage
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge
+import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+object MicrometerMetricsTest : Spek({
+ val PREFIX = "hv-kafka-consumer"
+ val doublePrecision = Percentage.withPercentage(0.5)
+ lateinit var registry: PrometheusMeterRegistry
+ lateinit var cut: MicrometerMetrics
+
+ beforeEachTest {
+ registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
+ cut = MicrometerMetrics(registry)
+ }
+
+ describe("Timers") {
+ val arbitraryMessageTravelTime = 100L
+ val messageSentTimeMicros = Instant.now().minusMillis(arbitraryMessageTravelTime).toEpochMilli() * 1000
+ val timerName = "$PREFIX.travel.time"
+
+ on("notifyMessageTravelTime") {
+ it("should update timer $timerName") {
+
+ val timeBeforeNotifyMicros = Instant.now().toEpochMilli() * 1000
+ cut.notifyMessageTravelTime(messageSentTimeMicros)
+ val timeAfterNotifyMicros = Instant.now().toEpochMilli() * 1000
+
+ registry.verifyTimer(timerName) { timer ->
+ val travelTimeBeforeNotify = (timeBeforeNotifyMicros - messageSentTimeMicros).toDouble()
+ val travelTimeAfterNotify = (timeAfterNotifyMicros - messageSentTimeMicros).toDouble()
+ assertThat(timer.totalTime(TimeUnit.MICROSECONDS))
+ .isLessThanOrEqualTo(travelTimeAfterNotify)
+ .isGreaterThanOrEqualTo(travelTimeBeforeNotify)
+
+ }
+ }
+ }
+ }
+
+ describe("Gauges") {
+ val gaugeName = "$PREFIX.consumer.offset"
+
+ on("notifyOffsetChanged") {
+ val offset = 966L
+
+ it("should update $gaugeName") {
+ cut.notifyOffsetChanged(offset, "sample_topic", 1)
+
+ registry.verifyGauge(gaugeName) {
+ assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision)
+ }
+ }
+ }
+ }
+})
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt
new file mode 100644
index 00000000..242f27be
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt
@@ -0,0 +1,52 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.state
+
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
+import org.apache.kafka.common.TopicPartition
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics
+
+object OffsetConsumerTest : Spek({
+ describe("OffsetConsumer with metrics") {
+ val mockedMetrics = mock<Metrics>()
+ val offsetConsumer = OffsetConsumer(mockedMetrics)
+ given("topicName with partition"){
+ val topicPartition = TopicPartition(topicName, partitionNumber)
+
+ on("new update method call") {
+ offsetConsumer.update(topicPartition, newOffset)
+
+ it("should notify message newOffset metric") {
+ verify(mockedMetrics).notifyOffsetChanged(newOffset, topicName, partitionNumber)
+ }
+ }
+ }
+ }
+})
+
+private const val partitionNumber = 1
+private const val newOffset: Long = 99
+private const val topicName = "sample-topicName"
diff --git a/sources/hv-collector-kafka/pom.xml b/sources/hv-collector-kafka/pom.xml
new file mode 100644
index 00000000..73265971
--- /dev/null
+++ b/sources/hv-collector-kafka/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <parent>
+ <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
+ <artifactId>hv-collector-sources</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+
+ <artifactId>hv-collector-kafka</artifactId>
+
+ <description>VES HighVolume Collector :: Kafka</description>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>kotlin-maven-plugin</artifactId>
+ <groupId>org.jetbrains.kotlin</groupId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <groupId>org.apache.maven.plugins</groupId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-utils</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.kafka</groupId>
+ <artifactId>reactor-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-stdlib-jdk8</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.nhaarman.mockitokotlin2</groupId>
+ <artifactId>mockito-kotlin</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.spek</groupId>
+ <artifactId>spek-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.spek</groupId>
+ <artifactId>spek-junit-platform-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactory.kt b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactory.kt
new file mode 100644
index 00000000..b654274e
--- /dev/null
+++ b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactory.kt
@@ -0,0 +1,53 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafka.api
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+
+object KafkaPropertiesFactory {
+ private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule"
+ private const val USERNAME = "admin"
+ private const val PASSWORD = "admin_secret"
+ private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;"
+ private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name
+
+ fun create(bootstrapServers: String): Map<String, Any> {
+ val props = mapOf<String, Any>(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
+ ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-consumer",
+ ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-consumers",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
+ ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to "3000",
+
+
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SASL_PLAINTEXT,
+ SaslConfigs.SASL_MECHANISM to PLAIN_MECHANISM,
+ SaslConfigs.SASL_JAAS_CONFIG to JAAS_CONFIG
+ )
+ return props
+ }
+} \ No newline at end of file
diff --git a/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactoryTest.kt b/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactoryTest.kt
new file mode 100644
index 00000000..9760fb98
--- /dev/null
+++ b/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactoryTest.kt
@@ -0,0 +1,63 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafka.api
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.plain.internals.PlainSaslServer
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+
+internal class KafkaPropertiesFactoryTest : Spek({
+ val servers = "kafka1:9080,kafka2:9080"
+
+ describe("KafkaPropertiesFactory") {
+ val options = KafkaPropertiesFactory.create(servers)
+
+ fun verifyProperty(key: String, expectedValue: Any) {
+ it("should have $key option set") {
+ assertThat(options.getValue(key))
+ .isEqualTo(expectedValue)
+ }
+ }
+
+ verifyProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers)
+ verifyProperty(ConsumerConfig.CLIENT_ID_CONFIG, "hv-collector-consumer")
+ verifyProperty(ConsumerConfig.GROUP_ID_CONFIG, "hv-collector-consumers")
+ verifyProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java)
+ verifyProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java)
+ verifyProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ verifyProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "3000")
+ verifyProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT)
+ verifyProperty(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM)
+ verifyProperty(SaslConfigs.SASL_JAAS_CONFIG, JAAS_CONFIG)
+ }
+})
+
+private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule"
+private const val USERNAME = "admin"
+private const val PASSWORD = "admin_secret"
+internal val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name
+internal const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;"
diff --git a/sources/hv-collector-main/pom.xml b/sources/hv-collector-main/pom.xml
index 1b4de0ce..f7da1229 100644
--- a/sources/hv-collector-main/pom.xml
+++ b/sources/hv-collector-main/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-sources</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
</parent>
<artifactId>hv-collector-main</artifactId>
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
index f260f158..66f3a5fc 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -20,12 +20,10 @@
package org.onap.dcae.collectors.veshv.main
import arrow.core.Option
-import arrow.core.Try
import io.micrometer.core.instrument.Counter
-import io.micrometer.core.instrument.Gauge
import io.micrometer.core.instrument.Meter
+import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.Timer
-import io.micrometer.core.instrument.search.RequiredSearch
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
import org.assertj.core.api.Assertions.assertThat
@@ -43,6 +41,9 @@ import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
+import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter
+import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge
+import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer
import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
import org.onap.ves.VesEventOuterClass
@@ -65,29 +66,6 @@ object MicrometerMetricsTest : Spek({
cut = MicrometerMetrics(registry)
}
- fun registrySearch(counterName: String) = RequiredSearch.`in`(registry).name(counterName)
-
- fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
- Try {
- map(search)
- }.fold(
- { ex -> assertThat(ex).doesNotThrowAnyException() },
- verifier
- )
-
- fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
- verifyMeter(registrySearch(name), RequiredSearch::gauge, verifier)
-
- fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
- verifyMeter(registrySearch(name), RequiredSearch::timer, verifier)
-
- fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
- verifyMeter(search, RequiredSearch::counter, verifier)
-
- fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
- verifyCounter(registrySearch(name), verifier)
-
-
fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
fun <T : Meter> verifyAllMetersAreUnchangedBut(
clazz: KClass<T>,
@@ -120,7 +98,7 @@ object MicrometerMetricsTest : Spek({
val bytes = 128
cut.notifyBytesReceived(bytes)
- verifyCounter(counterName) {
+ registry.verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
}
}
@@ -139,7 +117,7 @@ object MicrometerMetricsTest : Spek({
it("should increment counter") {
cut.notifyMessageReceived(emptyWireProtocolFrame())
- verifyCounter(counterName) {
+ registry.verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
}
@@ -152,7 +130,7 @@ object MicrometerMetricsTest : Spek({
val bytes = 888
cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))
- verifyCounter(counterName) {
+ registry.verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
}
}
@@ -177,7 +155,7 @@ object MicrometerMetricsTest : Spek({
it("should increment counter") {
cut.notifyMessageSent(routedMessage(topicName1))
- verifyCounter(counterName) {
+ registry.verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
verifyCountersAndTimersAreUnchangedBut(
@@ -196,11 +174,11 @@ object MicrometerMetricsTest : Spek({
cut.notifyMessageSent(routedMessage(topicName2))
cut.notifyMessageSent(routedMessage(topicName2))
- verifyCounter(registrySearch(counterName).tag("topic", topicName1)) {
+ registry.verifyCounter(counterName, Tags.of("topic", topicName1)) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
- verifyCounter(registrySearch(counterName).tag("topic", topicName2)) {
+ registry.verifyCounter(counterName, Tags.of("topic", topicName2)) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
}
@@ -214,7 +192,7 @@ object MicrometerMetricsTest : Spek({
cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))
- verifyTimer(counterName) { timer ->
+ registry.verifyTimer(counterName) { timer ->
assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
}
verifyCountersAndTimersAreUnchangedBut(
@@ -233,7 +211,7 @@ object MicrometerMetricsTest : Spek({
cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))
- verifyTimer(counterName) { timer ->
+ registry.verifyTimer(counterName) { timer ->
assertThat(timer.mean(TimeUnit.MILLISECONDS))
.isGreaterThanOrEqualTo(latencyMs.toDouble())
.isLessThanOrEqualTo(latencyMs + 10000.0)
@@ -256,7 +234,7 @@ object MicrometerMetricsTest : Spek({
cut.notifyMessageDropped(ROUTE_NOT_FOUND)
cut.notifyMessageDropped(INVALID_MESSAGE)
- verifyCounter(counterName) {
+ registry.verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause")
@@ -271,11 +249,11 @@ object MicrometerMetricsTest : Spek({
cut.notifyMessageDropped(INVALID_MESSAGE)
cut.notifyMessageDropped(INVALID_MESSAGE)
- verifyCounter(registrySearch(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
+ registry.verifyCounter(counterName, Tags.of("cause", ROUTE_NOT_FOUND.tag)) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
- verifyCounter(registrySearch(counterName).tag("cause", INVALID_MESSAGE.tag)) {
+ registry.verifyCounter(counterName, Tags.of("cause", INVALID_MESSAGE.tag)) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
}
@@ -290,7 +268,7 @@ object MicrometerMetricsTest : Spek({
cut.notifyClientConnected()
cut.notifyClientConnected()
- verifyCounter(counterName) {
+ registry.verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
verifyCountersAndTimersAreUnchangedBut(counterName)
@@ -307,7 +285,7 @@ object MicrometerMetricsTest : Spek({
cut.notifyClientDisconnected()
cut.notifyClientDisconnected()
- verifyCounter(counterName) {
+ registry.verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
verifyCountersAndTimersAreUnchangedBut(counterName)
@@ -324,7 +302,7 @@ object MicrometerMetricsTest : Spek({
cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
- verifyCounter(counterName) {
+ registry.verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause")
@@ -338,11 +316,11 @@ object MicrometerMetricsTest : Spek({
cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
- verifyCounter(registrySearch(counterName).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
+ registry.verifyCounter(counterName, Tags.of("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
- verifyCounter(registrySearch(counterName).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
+ registry.verifyCounter(counterName, Tags.of("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
}
@@ -359,7 +337,7 @@ object MicrometerMetricsTest : Spek({
cut.notifyClientConnected()
cut.notifyClientDisconnected()
- verifyGauge(gaugeName) {
+ registry.verifyGauge(gaugeName) {
assertThat(it.value()).isCloseTo(2.0, doublePrecision)
}
}
@@ -368,7 +346,7 @@ object MicrometerMetricsTest : Spek({
cut.notifyClientDisconnected()
cut.notifyClientDisconnected()
- verifyGauge(gaugeName) {
+ registry.verifyGauge(gaugeName) {
assertThat(it.value()).isCloseTo(0.0, doublePrecision)
}
}
@@ -376,7 +354,7 @@ object MicrometerMetricsTest : Spek({
it("should calculate negative difference between connected and disconnected clients") {
cut.notifyClientDisconnected()
- verifyGauge(gaugeName) {
+ registry.verifyGauge(gaugeName) {
assertThat(it.value()).isCloseTo(0.0, doublePrecision)
}
}
diff --git a/sources/hv-collector-server/pom.xml b/sources/hv-collector-server/pom.xml
index 5c323393..6bf081cf 100644
--- a/sources/hv-collector-server/pom.xml
+++ b/sources/hv-collector-server/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-sources</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
diff --git a/sources/hv-collector-ssl/pom.xml b/sources/hv-collector-ssl/pom.xml
index 043b8c1e..c5f18c8c 100644
--- a/sources/hv-collector-ssl/pom.xml
+++ b/sources/hv-collector-ssl/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-sources</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
diff --git a/sources/hv-collector-test-utils/pom.xml b/sources/hv-collector-test-utils/pom.xml
index bf70e180..f187cc6f 100644
--- a/sources/hv-collector-test-utils/pom.xml
+++ b/sources/hv-collector-test-utils/pom.xml
@@ -14,7 +14,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-sources</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -86,5 +86,10 @@
<artifactId>logback-classic</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>io.micrometer</groupId>
+ <artifactId>micrometer-registry-prometheus</artifactId>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project> \ No newline at end of file
diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/metrics.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/metrics.kt
new file mode 100644
index 00000000..1aefdb34
--- /dev/null
+++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/metrics.kt
@@ -0,0 +1,55 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.utils
+
+import arrow.core.Try
+import io.micrometer.core.instrument.Counter
+import io.micrometer.core.instrument.Gauge
+import io.micrometer.core.instrument.Tags
+import io.micrometer.core.instrument.Timer
+import io.micrometer.core.instrument.search.RequiredSearch
+import io.micrometer.prometheus.PrometheusMeterRegistry
+import org.assertj.core.api.Assertions
+
+
+fun <T> PrometheusMeterRegistry.verifyGauge(name: String, verifier: (Gauge) -> T) =
+ verifyMeter(findMeter(name), RequiredSearch::gauge, verifier)
+
+fun <T> PrometheusMeterRegistry.verifyTimer(name: String, verifier: (Timer) -> T) =
+ verifyMeter(findMeter(name), RequiredSearch::timer, verifier)
+
+fun <T> PrometheusMeterRegistry.verifyCounter(name: String, verifier: (Counter) -> T) =
+ verifyCounter(findMeter(name), verifier)
+
+fun <T> PrometheusMeterRegistry.verifyCounter(name: String, tags: Tags, verifier: (Counter) -> T) =
+ verifyCounter(findMeter(name).tags(tags), verifier)
+
+private fun PrometheusMeterRegistry.findMeter(meterName: String) = RequiredSearch.`in`(this).name(meterName)
+
+private fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
+ verifyMeter(search, RequiredSearch::counter, verifier)
+
+private inline fun <M, T> verifyMeter(search: RequiredSearch,
+ map: (RequiredSearch) -> M,
+ verifier: (M) -> T) =
+ Try { map(search) }.fold(
+ { ex -> Assertions.assertThat(ex).doesNotThrowAnyException() },
+ verifier
+ ) \ No newline at end of file
diff --git a/sources/hv-collector-utils/pom.xml b/sources/hv-collector-utils/pom.xml
index 76609ae6..5c117125 100644
--- a/sources/hv-collector-utils/pom.xml
+++ b/sources/hv-collector-utils/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-sources</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
diff --git a/sources/hv-collector-ves-message-generator/pom.xml b/sources/hv-collector-ves-message-generator/pom.xml
index 2c50d73c..1dc1b534 100644
--- a/sources/hv-collector-ves-message-generator/pom.xml
+++ b/sources/hv-collector-ves-message-generator/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-sources</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
diff --git a/sources/hv-collector-xnf-simulator/pom.xml b/sources/hv-collector-xnf-simulator/pom.xml
index 65b7d6f7..53912ee8 100644
--- a/sources/hv-collector-xnf-simulator/pom.xml
+++ b/sources/hv-collector-xnf-simulator/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-sources</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index 04a0c14a..7a28818c 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -61,7 +61,7 @@ private fun startServers(config: SimulatorConfiguration): ExitCode {
)
val xnfApiServerHandler = XnfApiServer(xnfSimulator, OngoingSimulations())
.start(config.listenAddress)
- .block()
+ .block()!!
logger.info { "Started xNF Simulator API server" }
HealthState.INSTANCE.changeState(HealthDescription.IDLE)
diff --git a/sources/pom.xml b/sources/pom.xml
index c7ba4886..1a6e1188 100644
--- a/sources/pom.xml
+++ b/sources/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>ves-hv-collector</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -125,7 +125,7 @@
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>hv-collector-analysis</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
</dependency>
</dependencies>
</plugin>
@@ -142,6 +142,7 @@
<module>hv-collector-dcae-app-simulator</module>
<module>hv-collector-domain</module>
<module>hv-collector-health-check</module>
+ <module>hv-collector-kafka</module>
<module>hv-collector-kafka-consumer</module>
<module>hv-collector-main</module>
<module>hv-collector-server</module>
diff --git a/version.properties b/version.properties
index 00ef5645..7d6815b1 100644
--- a/version.properties
+++ b/version.properties
@@ -1,5 +1,5 @@
major=1
-minor=2
+minor=3
patch=0
base_version=${major}.${minor}.${patch}
release_version=${base_version}