summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-kafka-consumer
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-kafka-consumer')
-rw-r--r--sources/hv-collector-kafka-consumer/pom.xml25
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt48
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt6
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt3
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt19
-rw-r--r--sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt154
-rw-r--r--sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt2
-rw-r--r--sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt22
8 files changed, 248 insertions, 31 deletions
diff --git a/sources/hv-collector-kafka-consumer/pom.xml b/sources/hv-collector-kafka-consumer/pom.xml
index f09a3a21..7ffb5ebf 100644
--- a/sources/hv-collector-kafka-consumer/pom.xml
+++ b/sources/hv-collector-kafka-consumer/pom.xml
@@ -79,27 +79,36 @@
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>runtime</scope>
+ </dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
+ <groupId>io.projectreactor.netty</groupId>
+ <artifactId>reactor-netty</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>kotlinx-coroutines-core</artifactId>
+ <scope>compile</scope>
</dependency>
<dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <scope>runtime</scope>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
- <groupId>io.projectreactor.netty</groupId>
- <artifactId>reactor-netty</artifactId>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>kotlinx-coroutines-test</artifactId>
+ <scope>test</scope>
</dependency>
</dependencies>
</project>
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt
new file mode 100644
index 00000000..dd24345d
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.impl
+
+import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.isActive
+import kotlinx.coroutines.launch
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer
+
+internal class KafkaSource(private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>,
+ private val topics: Set<String>,
+ private val dispatcher: CoroutineDispatcher = Dispatchers.IO) {
+ suspend fun start(offsetConsumer: OffsetConsumer, updateInterval: Long = 500L): Job =
+ GlobalScope.launch(dispatcher) {
+ kafkaConsumer.subscribe(topics)
+ val topicPartitions = kafkaConsumer.assignment()
+ while (isActive) {
+ kafkaConsumer.endOffsets(topicPartitions)
+ .forEach { (topicPartition, offset) ->
+ offsetConsumer.update(topicPartition, offset)
+ }
+ kafkaConsumer.commitSync()
+ delay(updateInterval)
+ }
+ }
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
index 2fabf30e..e576a88f 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
@@ -19,7 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
-internal interface Metrics {
- fun notifyOffsetChanged(offset: Long)
+interface Metrics {
+ fun notifyOffsetChanged(offset: Long, topic: String, partition: Int = 0)
fun notifyMessageTravelTime(messageSentTimeMicros: Long)
-} \ No newline at end of file
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
index 748e43fc..da1225e9 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
@@ -41,7 +41,8 @@ internal class MicrometerMetrics constructor(
registry.scrape()
}
- override fun notifyOffsetChanged(offset: Long) {
+ override fun notifyOffsetChanged(offset: Long, topic: String, partition: Int) {
+ // TODO use topic and partition
currentOffset.lazySet(offset)
}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt
index 2c6707f9..1481a224 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt
@@ -19,23 +19,24 @@
*/
package org.onap.dcae.collectors.veshv.kafkaconsumer.state
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.onap.dcae.collectors.veshv.kafka.api.KafkaConsumer
+import org.apache.kafka.common.TopicPartition
import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-internal class OffsetConsumer(private val metrics: Metrics): KafkaConsumer {
+internal class OffsetConsumer(private val metrics: Metrics) {
- override fun update(record: ConsumerRecord<ByteArray, ByteArray>) {
- val offset = record.offset()
- logger.trace { "Current consumer offset $offset" }
- metrics.notifyOffsetChanged(offset)
+ fun update(topicPartition: TopicPartition, offset: Long) {
+ logger.trace {
+ "Current consumer offset $offset for topic ${topicPartition.topic()} " +
+ "on partition ${topicPartition.partition()}"
+ }
+ metrics.notifyOffsetChanged(offset, topicPartition.topic(), topicPartition.partition())
}
- override fun reset() = Unit
+ fun reset() = Unit
companion object {
- private val logger = Logger(OffsetConsumer::class)
+ val logger = Logger(OffsetConsumer::class)
}
}
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt
new file mode 100644
index 00000000..b0eb7a52
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt
@@ -0,0 +1,154 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.impl
+
+import com.nhaarman.mockitokotlin2.argumentCaptor
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.times
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.verifyZeroInteractions
+import com.nhaarman.mockitokotlin2.whenever
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancelAndJoin
+import kotlinx.coroutines.test.TestCoroutineDispatcher
+import kotlinx.coroutines.test.runBlockingTest
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.TopicPartition
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer
+
+@ExperimentalCoroutinesApi
+object KafkaSourceTest : Spek({
+ given("KafkaSource") {
+ val testDispatcher = TestCoroutineDispatcher()
+ val mockedKafkaConsumer = mock<KafkaConsumer<ByteArray, ByteArray>>()
+ afterEachTest {
+ testDispatcher.cleanupTestCoroutines()
+ }
+ given("single topicName and partition") {
+ val topics = setOf("topicName")
+ val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher)
+ val mockedOffsetConsumer = mock<OffsetConsumer>()
+ on("started KafkaSource") {
+ val topicPartition = createTopicPartition("topicName")
+ val topicPartitions = setOf(topicPartition)
+ whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
+ whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
+ .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset))
+
+ runBlockingTest {
+ val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs)
+ job.cancelAndJoin()
+ }
+
+ it("should call update function on topicName") {
+ verify(mockedOffsetConsumer).update(topicPartition, newOffset)
+ }
+ }
+ }
+
+ given("two topics with partition") {
+ val topics = setOf(topicName1, topicName2)
+ val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher)
+ val mockedOffsetConsumer = mock<OffsetConsumer>()
+
+ on("started KafkaSource for two iteration of while loop") {
+ val topicPartitionArgumentCaptor = argumentCaptor<TopicPartition>()
+ val offsetArgumentCaptor = argumentCaptor<Long>()
+ val topicPartitionArgumentCaptorAfterInterval = argumentCaptor<TopicPartition>()
+ val offsetArgumentCaptorAfterInterval = argumentCaptor<Long>()
+ val topicPartition1 = createTopicPartition(topicName1)
+ val topicPartition2 = createTopicPartition(topicName2)
+ val topicPartitions = setOf(topicPartition1, topicPartition2)
+ whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
+ val partitionToOffset1 =
+ mapOf(topicPartition1 to newOffset,
+ topicPartition2 to anotherNewOffset)
+ val partitionToOffset2 =
+ mapOf(topicPartition1 to anotherNewOffset,
+ topicPartition2 to newOffset)
+ whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
+ .thenReturn(partitionToOffset1, partitionToOffset2)
+
+ runBlockingTest {
+ val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs)
+ verify(mockedOffsetConsumer, times(topicsAmount)).update(topicPartitionArgumentCaptor.capture(),
+ offsetArgumentCaptor.capture())
+
+ testDispatcher.advanceTimeBy(updateIntervalInMs)
+
+ verify(mockedOffsetConsumer, times(topicsAmountAfterInterval))
+ .update(topicPartitionArgumentCaptorAfterInterval.capture(), offsetArgumentCaptorAfterInterval.capture())
+
+ it("should calls update function with proper arguments - before interval") {
+ assertThat(topicPartitionArgumentCaptor.firstValue).isEqualTo(topicPartition1)
+ assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset)
+ assertThat(topicPartitionArgumentCaptor.secondValue).isEqualTo(topicPartition2)
+ assertThat(offsetArgumentCaptor.secondValue).isEqualTo(anotherNewOffset)
+ }
+ it("should calls update function with proper arguments - after interval") {
+ assertThat(topicPartitionArgumentCaptorAfterInterval.thirdValue).isEqualTo(topicPartition1)
+ assertThat(offsetArgumentCaptorAfterInterval.thirdValue).isEqualTo(anotherNewOffset)
+ assertThat(topicPartitionArgumentCaptorAfterInterval.lastValue).isEqualTo(topicPartition2)
+ assertThat(offsetArgumentCaptorAfterInterval.lastValue).isEqualTo(newOffset)
+ }
+ job.cancelAndJoin()
+ }
+ }
+ }
+
+ given("empty topicName list") {
+ val emptyTopics = emptySet<String>()
+ val kafkaSource = KafkaSource(mockedKafkaConsumer, emptyTopics, testDispatcher)
+ val mockedOffsetConsumer = mock<OffsetConsumer>()
+
+ on("call of function start") {
+ val emptyTopicPartitions = setOf(null)
+ whenever(mockedKafkaConsumer.assignment()).thenReturn(emptyTopicPartitions)
+ whenever(mockedKafkaConsumer.endOffsets(emptyTopicPartitions))
+ .thenReturn(emptyMap())
+
+ runBlockingTest {
+ val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs)
+ job.cancelAndJoin()
+ }
+
+ it("should not interact with OffsetConsumer") {
+ verifyZeroInteractions(mockedOffsetConsumer)
+ }
+ }
+ }
+
+ }
+})
+
+private const val updateIntervalInMs = 10L
+private const val partitionNumber = 0
+private const val newOffset = 2L
+private const val anotherNewOffset = 10L
+private const val topicName1 = "topicName1"
+private const val topicName2 = "topicName2"
+private const val topicsAmount = 2
+private const val topicsAmountAfterInterval = 4
+fun createTopicPartition(topic: String) = TopicPartition(topic, partitionNumber) \ No newline at end of file
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
index 41587867..96ba588f 100644
--- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
@@ -74,7 +74,7 @@ object MicrometerMetricsTest : Spek({
val offset = 966L
it("should update $gaugeName") {
- cut.notifyOffsetChanged(offset)
+ cut.notifyOffsetChanged(offset, "sample_topic", 1)
registry.verifyGauge(gaugeName) {
assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision)
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt
index 6fb42d81..242f27be 100644
--- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt
@@ -21,10 +21,10 @@ package org.onap.dcae.collectors.veshv.kafkaconsumer.state
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.verify
-import com.nhaarman.mockitokotlin2.whenever
-import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics
@@ -33,16 +33,20 @@ object OffsetConsumerTest : Spek({
describe("OffsetConsumer with metrics") {
val mockedMetrics = mock<Metrics>()
val offsetConsumer = OffsetConsumer(mockedMetrics)
+ given("topicName with partition"){
+ val topicPartition = TopicPartition(topicName, partitionNumber)
- on("new update method call") {
- val consumerRecord = mock<ConsumerRecord<ByteArray, ByteArray>>()
- whenever(consumerRecord.offset()).thenReturn(1)
+ on("new update method call") {
+ offsetConsumer.update(topicPartition, newOffset)
- offsetConsumer.update(consumerRecord)
-
- it("should notify message offset metric") {
- verify(mockedMetrics).notifyOffsetChanged(1)
+ it("should notify message newOffset metric") {
+ verify(mockedMetrics).notifyOffsetChanged(newOffset, topicName, partitionNumber)
+ }
}
}
}
})
+
+private const val partitionNumber = 1
+private const val newOffset: Long = 99
+private const val topicName = "sample-topicName"