aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-kafka-consumer
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-kafka-consumer')
-rw-r--r--sources/hv-collector-kafka-consumer/pom.xml35
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt58
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt29
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt48
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt26
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt (renamed from sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt)17
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt59
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/http/PrometheusApiServer.kt54
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt42
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/resources/logback.xml94
-rw-r--r--sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt154
-rw-r--r--sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt85
-rw-r--r--sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt52
13 files changed, 732 insertions, 21 deletions
diff --git a/sources/hv-collector-kafka-consumer/pom.xml b/sources/hv-collector-kafka-consumer/pom.xml
index 45a32729..7ffb5ebf 100644
--- a/sources/hv-collector-kafka-consumer/pom.xml
+++ b/sources/hv-collector-kafka-consumer/pom.xml
@@ -15,7 +15,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-sources</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -65,6 +65,13 @@
<groupId>${project.parent.groupId}</groupId>
<artifactId>hv-collector-commandline</artifactId>
<version>${project.parent.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-kafka</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
@@ -73,23 +80,35 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.micrometer</groupId>
+ <artifactId>micrometer-registry-prometheus</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.netty</groupId>
+ <artifactId>reactor-netty</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <optional>true</optional>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>kotlinx-coroutines-core</artifactId>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <scope>runtime</scope>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>kotlinx-coroutines-test</artifactId>
+ <scope>test</scope>
</dependency>
-
</dependencies>
</project>
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
new file mode 100644
index 00000000..be7b5cca
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
@@ -0,0 +1,58 @@
+/*
+* ============LICENSE_START=======================================================
+* dcaegen2-collectors-veshv
+* ================================================================================
+* Copyright (C) 2019 NOKIA
+* ================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.dcae.collectors.veshv.kafkaconsumer.config
+
+import arrow.core.Option
+import org.apache.commons.cli.CommandLine
+import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.*
+import org.onap.dcae.collectors.veshv.commandline.hasOption
+import org.onap.dcae.collectors.veshv.commandline.intValue
+import org.onap.dcae.collectors.veshv.commandline.stringValue
+import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
+import java.net.InetSocketAddress
+
+internal class ArgKafkaConsumerConfiguration : ArgBasedConfiguration<KafkaConsumerConfiguration>(DefaultParser()) {
+ override val cmdLineOptionsList: List<CommandLineOption> = listOf(
+ LISTEN_PORT,
+ KAFKA_TOPICS,
+ KAFKA_SERVERS,
+ DISABLE_PROCESSING
+ )
+
+ override fun getConfiguration(cmdLine: CommandLine): Option<KafkaConsumerConfiguration> =
+ binding {
+ val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
+ val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS)
+ .map { it.split(',').toSet() }
+ .bind()
+ val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
+ val disableProcessing = cmdLine.hasOption(DISABLE_PROCESSING)
+
+ KafkaConsumerConfiguration(
+ InetSocketAddress(listenPort),
+ kafkaTopics,
+ kafkaBootstrapServers,
+ disableProcessing
+ )
+ }
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt
new file mode 100644
index 00000000..cdd4c30a
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt
@@ -0,0 +1,29 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.config
+
+import java.net.InetSocketAddress
+
+internal data class KafkaConsumerConfiguration(
+ val apiAddress: InetSocketAddress,
+ val kafkaTopics: Set<String>,
+ val kafkaBootstrapServers: String,
+ val disableProcessing: Boolean
+)
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt
new file mode 100644
index 00000000..dd24345d
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.impl
+
+import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.isActive
+import kotlinx.coroutines.launch
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer
+
+internal class KafkaSource(private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>,
+ private val topics: Set<String>,
+ private val dispatcher: CoroutineDispatcher = Dispatchers.IO) {
+ suspend fun start(offsetConsumer: OffsetConsumer, updateInterval: Long = 500L): Job =
+ GlobalScope.launch(dispatcher) {
+ kafkaConsumer.subscribe(topics)
+ val topicPartitions = kafkaConsumer.assignment()
+ while (isActive) {
+ kafkaConsumer.endOffsets(topicPartitions)
+ .forEach { (topicPartition, offset) ->
+ offsetConsumer.update(topicPartition, offset)
+ }
+ kafkaConsumer.commitSync()
+ delay(updateInterval)
+ }
+ }
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
index fa15587c..7e77bae9 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,4 +19,26 @@
*/
package org.onap.dcae.collectors.veshv.kafkaconsumer
-fun main(args: Array<String>) = println("Guten tag")
+import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
+import org.onap.dcae.collectors.veshv.kafkaconsumer.config.ArgKafkaConsumerConfiguration
+import org.onap.dcae.collectors.veshv.kafkaconsumer.config.KafkaConsumerConfiguration
+import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.http.PrometheusApiServer
+import org.onap.dcae.collectors.veshv.utils.process.ExitCode
+import org.onap.dcae.collectors.veshv.utils.process.ExitSuccess
+
+private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.kafkaconsumer"
+const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
+
+fun main(args: Array<String>): Unit =
+ ArgKafkaConsumerConfiguration().parse(args)
+ .fold(handleWrongArgumentErrorCurried(PROGRAM_NAME), ::startApp)
+ .let(ExitCode::doExit)
+
+
+private fun startApp(config: KafkaConsumerConfiguration): ExitSuccess {
+ PrometheusApiServer(config.apiAddress, MicrometerMetrics.INSTANCE)
+ .start().block()!!.await().block() // TODO refactor netty server logic
+
+ return ExitSuccess
+}
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
index b7ea126f..e576a88f 100644
--- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,14 +17,9 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.kafkaconsumer
+package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import kotlin.test.assertTrue
-
-object SampleTest : Spek({
- describe("sample test") {
- assertTrue(true)
- }
-})
+interface Metrics {
+ fun notifyOffsetChanged(offset: Long, topic: String, partition: Int = 0)
+ fun notifyMessageTravelTime(messageSentTimeMicros: Long)
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
new file mode 100644
index 00000000..da1225e9
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
+
+import io.micrometer.core.instrument.Timer
+import io.micrometer.prometheus.PrometheusConfig
+import io.micrometer.prometheus.PrometheusMeterRegistry
+import org.onap.dcae.collectors.veshv.utils.TimeUtils
+import reactor.core.publisher.Mono
+import java.time.Duration
+import java.time.Instant
+import java.util.concurrent.atomic.AtomicLong
+
+internal class MicrometerMetrics constructor(
+ private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
+) : Metrics {
+
+ private val currentOffset = registry.gauge(name("consumer.offset"), AtomicLong(0))
+ private val travelTime = Timer.builder(name("travel.time"))
+ .publishPercentileHistogram(true)
+ .register(registry)
+
+ fun lastStatus(): Mono<String> = Mono.fromCallable {
+ registry.scrape()
+ }
+
+ override fun notifyOffsetChanged(offset: Long, topic: String, partition: Int) {
+ // TODO use topic and partition
+ currentOffset.lazySet(offset)
+ }
+
+ override fun notifyMessageTravelTime(messageSentTimeMicros: Long) {
+ travelTime.record(Duration.between(TimeUtils.epochMicroToInstant(messageSentTimeMicros), Instant.now()))
+ }
+
+ companion object {
+ val INSTANCE by lazy { MicrometerMetrics() }
+
+ private const val PREFIX = "hv-kafka-consumer"
+ private fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
+ }
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/http/PrometheusApiServer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/http/PrometheusApiServer.kt
new file mode 100644
index 00000000..29a17fc1
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/http/PrometheusApiServer.kt
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.http
+
+import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
+import reactor.netty.http.server.HttpServer
+import reactor.netty.http.server.HttpServerRequest
+import reactor.netty.http.server.HttpServerResponse
+import java.net.InetSocketAddress
+
+internal class PrometheusApiServer(private val listenAddress: InetSocketAddress,
+ private val metrics: MicrometerMetrics) {
+
+ private val logger = Logger(PrometheusApiServer::class)
+
+ fun start(): Mono<NettyServerHandle> =
+ HttpServer.create()
+ .tcpConfiguration { it.addressSupplier { listenAddress } }
+ .route { it.get("/monitoring/prometheus", ::metricsHandler) }
+ .bind()
+ .map { NettyServerHandle(it) }
+ .doOnSuccess(::logServerStarted)
+
+
+ private fun metricsHandler(_req: HttpServerRequest, resp: HttpServerResponse) =
+ resp.sendString(metrics.lastStatus())
+
+
+ private fun logServerStarted(handle: ServerHandle) =
+ logger.info {
+ "Kafka Consumer API server is up and listening on ${handle.host}:${handle.port}"
+ }
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt
new file mode 100644
index 00000000..1481a224
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt
@@ -0,0 +1,42 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.state
+
+import org.apache.kafka.common.TopicPartition
+import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+
+
+internal class OffsetConsumer(private val metrics: Metrics) {
+
+ fun update(topicPartition: TopicPartition, offset: Long) {
+ logger.trace {
+ "Current consumer offset $offset for topic ${topicPartition.topic()} " +
+ "on partition ${topicPartition.partition()}"
+ }
+ metrics.notifyOffsetChanged(offset, topicPartition.topic(), topicPartition.partition())
+ }
+
+ fun reset() = Unit
+
+ companion object {
+ val logger = Logger(OffsetConsumer::class)
+ }
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml b/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml
new file mode 100644
index 00000000..da0f7f4b
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ ============LICENSE_START=======================================================
+ ~ dcaegen2-collectors-veshv
+ ~ ================================================================================
+ ~ Copyright (C) 2019 NOKIA
+ ~ ================================================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============LICENSE_END=========================================================
+-->
+<configuration>
+ <property name="COMPONENT_NAME"
+ value="hv-ves-kafka-consumer-app"/>
+ <property name="COMPONENT_SHORT_NAME"
+ value="kafka-consumer-app"/>
+
+ <property name="LOG_FILENAME" value="${COMPONENT_SHORT_NAME}"/>
+ <property name="LOG_PATH" value="/var/log/ONAP/${COMPONENT_NAME}"/>
+ <property name="ARCHIVE" value="${LOG_PATH}/archive"/>
+
+ <property name="p_tim" value="%date{&quot;yyyy-MM-dd'T'HH:mm:ss.SSSXXX&quot;, UTC}"/>
+ <property name="p_thr" value="%thread"/>
+ <property name="p_lvl" value="%highlight(%-5level)"/>
+ <property name="p_log" value="%50.50logger"/>
+ <property name="p_mdc" value="%replace(%replace(%mdc){'\t', '\\\\t'}){'\n', '\\\\n'}"/>
+ <property name="p_msg" value="%replace(%replace(%msg){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+ <property name="p_exc" value="%replace(%replace(%rootException){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+ <property name="p_mak" value="%replace(%replace(%marker){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+ <property name="SIMPLE_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_log}\t
+| ${p_lvl}\t
+| %msg\t
+| %rootException%n"/>
+ <property name="READABLE_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_log}\t
+| ${p_lvl}\t
+| %msg\t
+| ${p_mak}\t
+| %rootException\t
+| ${p_mdc}\t
+| ${p_thr}%n"/>
+ <property name="ONAP_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_thr}\t
+| ${p_lvl}\t
+| ${p_log}\t
+| ${p_mdc}\t
+| ${p_msg}\t
+| ${p_exc}\t
+| ${p_mak}%n"/>
+ <property name="LOG_PATTERN_IN_USE" value="${SIMPLE_LOG_PATTERN}"/>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>${LOG_PATTERN_IN_USE}</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="ROLLING-FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder>
+ <pattern>${LOG_PATTERN_IN_USE}</pattern>
+ </encoder>
+ <file>${LOG_PATH}/${LOG_FILENAME}.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+ <FileNamePattern>${ARCHIVE}/${LOG_FILENAME}.%d{yyyy-MM-dd}.%i.log.gz</FileNamePattern>
+ <maxFileSize>50MB</maxFileSize>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>10GB</totalSizeCap>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ROLLING-FILE"/>
+ </root>
+</configuration> \ No newline at end of file
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt
new file mode 100644
index 00000000..b0eb7a52
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt
@@ -0,0 +1,154 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.impl
+
+import com.nhaarman.mockitokotlin2.argumentCaptor
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.times
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.verifyZeroInteractions
+import com.nhaarman.mockitokotlin2.whenever
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancelAndJoin
+import kotlinx.coroutines.test.TestCoroutineDispatcher
+import kotlinx.coroutines.test.runBlockingTest
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.TopicPartition
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer
+
+@ExperimentalCoroutinesApi
+object KafkaSourceTest : Spek({
+ given("KafkaSource") {
+ val testDispatcher = TestCoroutineDispatcher()
+ val mockedKafkaConsumer = mock<KafkaConsumer<ByteArray, ByteArray>>()
+ afterEachTest {
+ testDispatcher.cleanupTestCoroutines()
+ }
+ given("single topicName and partition") {
+ val topics = setOf("topicName")
+ val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher)
+ val mockedOffsetConsumer = mock<OffsetConsumer>()
+ on("started KafkaSource") {
+ val topicPartition = createTopicPartition("topicName")
+ val topicPartitions = setOf(topicPartition)
+ whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
+ whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
+ .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset))
+
+ runBlockingTest {
+ val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs)
+ job.cancelAndJoin()
+ }
+
+ it("should call update function on topicName") {
+ verify(mockedOffsetConsumer).update(topicPartition, newOffset)
+ }
+ }
+ }
+
+ given("two topics with partition") {
+ val topics = setOf(topicName1, topicName2)
+ val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher)
+ val mockedOffsetConsumer = mock<OffsetConsumer>()
+
+ on("started KafkaSource for two iteration of while loop") {
+ val topicPartitionArgumentCaptor = argumentCaptor<TopicPartition>()
+ val offsetArgumentCaptor = argumentCaptor<Long>()
+ val topicPartitionArgumentCaptorAfterInterval = argumentCaptor<TopicPartition>()
+ val offsetArgumentCaptorAfterInterval = argumentCaptor<Long>()
+ val topicPartition1 = createTopicPartition(topicName1)
+ val topicPartition2 = createTopicPartition(topicName2)
+ val topicPartitions = setOf(topicPartition1, topicPartition2)
+ whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
+ val partitionToOffset1 =
+ mapOf(topicPartition1 to newOffset,
+ topicPartition2 to anotherNewOffset)
+ val partitionToOffset2 =
+ mapOf(topicPartition1 to anotherNewOffset,
+ topicPartition2 to newOffset)
+ whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
+ .thenReturn(partitionToOffset1, partitionToOffset2)
+
+ runBlockingTest {
+ val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs)
+ verify(mockedOffsetConsumer, times(topicsAmount)).update(topicPartitionArgumentCaptor.capture(),
+ offsetArgumentCaptor.capture())
+
+ testDispatcher.advanceTimeBy(updateIntervalInMs)
+
+ verify(mockedOffsetConsumer, times(topicsAmountAfterInterval))
+ .update(topicPartitionArgumentCaptorAfterInterval.capture(), offsetArgumentCaptorAfterInterval.capture())
+
+ it("should calls update function with proper arguments - before interval") {
+ assertThat(topicPartitionArgumentCaptor.firstValue).isEqualTo(topicPartition1)
+ assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset)
+ assertThat(topicPartitionArgumentCaptor.secondValue).isEqualTo(topicPartition2)
+ assertThat(offsetArgumentCaptor.secondValue).isEqualTo(anotherNewOffset)
+ }
+ it("should calls update function with proper arguments - after interval") {
+ assertThat(topicPartitionArgumentCaptorAfterInterval.thirdValue).isEqualTo(topicPartition1)
+ assertThat(offsetArgumentCaptorAfterInterval.thirdValue).isEqualTo(anotherNewOffset)
+ assertThat(topicPartitionArgumentCaptorAfterInterval.lastValue).isEqualTo(topicPartition2)
+ assertThat(offsetArgumentCaptorAfterInterval.lastValue).isEqualTo(newOffset)
+ }
+ job.cancelAndJoin()
+ }
+ }
+ }
+
+ given("empty topicName list") {
+ val emptyTopics = emptySet<String>()
+ val kafkaSource = KafkaSource(mockedKafkaConsumer, emptyTopics, testDispatcher)
+ val mockedOffsetConsumer = mock<OffsetConsumer>()
+
+ on("call of function start") {
+ val emptyTopicPartitions = setOf(null)
+ whenever(mockedKafkaConsumer.assignment()).thenReturn(emptyTopicPartitions)
+ whenever(mockedKafkaConsumer.endOffsets(emptyTopicPartitions))
+ .thenReturn(emptyMap())
+
+ runBlockingTest {
+ val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs)
+ job.cancelAndJoin()
+ }
+
+ it("should not interact with OffsetConsumer") {
+ verifyZeroInteractions(mockedOffsetConsumer)
+ }
+ }
+ }
+
+ }
+})
+
+private const val updateIntervalInMs = 10L
+private const val partitionNumber = 0
+private const val newOffset = 2L
+private const val anotherNewOffset = 10L
+private const val topicName1 = "topicName1"
+private const val topicName2 = "topicName2"
+private const val topicsAmount = 2
+private const val topicsAmountAfterInterval = 4
+fun createTopicPartition(topic: String) = TopicPartition(topic, partitionNumber) \ No newline at end of file
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
new file mode 100644
index 00000000..96ba588f
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
@@ -0,0 +1,85 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
+
+import io.micrometer.prometheus.PrometheusConfig
+import io.micrometer.prometheus.PrometheusMeterRegistry
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.data.Percentage
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge
+import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+object MicrometerMetricsTest : Spek({
+ val PREFIX = "hv-kafka-consumer"
+ val doublePrecision = Percentage.withPercentage(0.5)
+ lateinit var registry: PrometheusMeterRegistry
+ lateinit var cut: MicrometerMetrics
+
+ beforeEachTest {
+ registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
+ cut = MicrometerMetrics(registry)
+ }
+
+ describe("Timers") {
+ val arbitraryMessageTravelTime = 100L
+ val messageSentTimeMicros = Instant.now().minusMillis(arbitraryMessageTravelTime).toEpochMilli() * 1000
+ val timerName = "$PREFIX.travel.time"
+
+ on("notifyMessageTravelTime") {
+ it("should update timer $timerName") {
+
+ val timeBeforeNotifyMicros = Instant.now().toEpochMilli() * 1000
+ cut.notifyMessageTravelTime(messageSentTimeMicros)
+ val timeAfterNotifyMicros = Instant.now().toEpochMilli() * 1000
+
+ registry.verifyTimer(timerName) { timer ->
+ val travelTimeBeforeNotify = (timeBeforeNotifyMicros - messageSentTimeMicros).toDouble()
+ val travelTimeAfterNotify = (timeAfterNotifyMicros - messageSentTimeMicros).toDouble()
+ assertThat(timer.totalTime(TimeUnit.MICROSECONDS))
+ .isLessThanOrEqualTo(travelTimeAfterNotify)
+ .isGreaterThanOrEqualTo(travelTimeBeforeNotify)
+
+ }
+ }
+ }
+ }
+
+ describe("Gauges") {
+ val gaugeName = "$PREFIX.consumer.offset"
+
+ on("notifyOffsetChanged") {
+ val offset = 966L
+
+ it("should update $gaugeName") {
+ cut.notifyOffsetChanged(offset, "sample_topic", 1)
+
+ registry.verifyGauge(gaugeName) {
+ assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision)
+ }
+ }
+ }
+ }
+})
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt
new file mode 100644
index 00000000..242f27be
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt
@@ -0,0 +1,52 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.state
+
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
+import org.apache.kafka.common.TopicPartition
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics
+
+object OffsetConsumerTest : Spek({
+ describe("OffsetConsumer with metrics") {
+ val mockedMetrics = mock<Metrics>()
+ val offsetConsumer = OffsetConsumer(mockedMetrics)
+ given("topicName with partition"){
+ val topicPartition = TopicPartition(topicName, partitionNumber)
+
+ on("new update method call") {
+ offsetConsumer.update(topicPartition, newOffset)
+
+ it("should notify message newOffset metric") {
+ verify(mockedMetrics).notifyOffsetChanged(newOffset, topicName, partitionNumber)
+ }
+ }
+ }
+ }
+})
+
+private const val partitionNumber = 1
+private const val newOffset: Long = 99
+private const val topicName = "sample-topicName"