From 583bb983ae956f366c034fdad20df79ccef196fd Mon Sep 17 00:00:00 2001 From: Izabela Zawadzka Date: Thu, 4 Jul 2019 08:59:04 +0200 Subject: Measure message travel time Issue-ID: DCAEGEN2-1654 Signed-off-by: Izabela Zawadzka Change-Id: Ifd6597209c5be51d5b4ff5faf7b3b1b1a2871403 --- sources/hv-collector-kafka-consumer/pom.xml | 4 + .../veshv/kafkaconsumer/impl/KafkaSource.kt | 48 ------ .../kafkaconsumer/impl/OffsetKafkaConsumer.kt | 63 ++++++++ .../kafkaconsumer/impl/ProcessingKafkaConsumer.kt | 56 +++++++ .../veshv/kafkaconsumer/state/OffsetConsumer.kt | 39 ----- .../veshv/kafkaconsumer/impl/KafkaSourceTest.kt | 154 ------------------ .../kafkaconsumer/impl/OffsetKafkaConsumerTest.kt | 176 +++++++++++++++++++++ .../impl/ProcessingKafkaConsumerTest.kt | 130 +++++++++++++++ .../kafkaconsumer/state/OffsetConsumerTest.kt | 52 ------ .../dcae/collectors/veshv/tests/utils/vesEvents.kt | 9 +- 10 files changed, 434 insertions(+), 297 deletions(-) delete mode 100644 sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt create mode 100644 sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt create mode 100644 sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt delete mode 100644 sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt delete mode 100644 sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt create mode 100644 sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt create mode 100644 sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumerTest.kt delete mode 100644 sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt diff --git a/sources/hv-collector-kafka-consumer/pom.xml b/sources/hv-collector-kafka-consumer/pom.xml index 7ffb5ebf..cdfb6365 100644 --- a/sources/hv-collector-kafka-consumer/pom.xml +++ b/sources/hv-collector-kafka-consumer/pom.xml @@ -96,6 +96,10 @@ org.jetbrains.kotlin kotlin-stdlib-jdk8 + + org.onap.dcaegen2.services.sdk + hvvesclient-protobuf + org.jetbrains.kotlinx kotlinx-coroutines-core diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt deleted file mode 100644 index dd24345d..00000000 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.kafkaconsumer.impl - -import kotlinx.coroutines.CoroutineDispatcher -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.delay -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer - -internal class KafkaSource(private val kafkaConsumer: KafkaConsumer, - private val topics: Set, - private val dispatcher: CoroutineDispatcher = Dispatchers.IO) { - suspend fun start(offsetConsumer: OffsetConsumer, updateInterval: Long = 500L): Job = - GlobalScope.launch(dispatcher) { - kafkaConsumer.subscribe(topics) - val topicPartitions = kafkaConsumer.assignment() - while (isActive) { - kafkaConsumer.endOffsets(topicPartitions) - .forEach { (topicPartition, offset) -> - offsetConsumer.update(topicPartition, offset) - } - kafkaConsumer.commitSync() - delay(updateInterval) - } - } -} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt new file mode 100644 index 00000000..d105c4a7 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt @@ -0,0 +1,63 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.impl + +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.TopicPartition +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics +import org.onap.dcae.collectors.veshv.utils.logging.Logger + +internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer, + private val topics: Set, + private val metrics: Metrics, + private val dispatcher: CoroutineDispatcher = Dispatchers.IO) { + + suspend fun start(updateInterval: Long = 500L): Job = + GlobalScope.launch(dispatcher) { + kafkaConsumer.subscribe(topics) + val topicPartitions = kafkaConsumer.assignment() + while (isActive) { + kafkaConsumer.endOffsets(topicPartitions) + .forEach { (topicPartition, offset) -> + update(topicPartition, offset) + } + kafkaConsumer.commitSync() + delay(updateInterval) + } + } + + private fun update(topicPartition: TopicPartition, offset: Long) { + logger.trace { + "Current consumer offset $offset for topic partition $topicPartition" + } + metrics.notifyOffsetChanged(offset, topicPartition) + } + + companion object { + val logger = Logger(OffsetKafkaConsumer::class) + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt new file mode 100644 index 00000000..f47a66d0 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.impl + + +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.ves.VesEventOuterClass +import java.time.Duration + +internal class ProcessingKafkaConsumer (private val kafkaConsumer: KafkaConsumer, + private val topics: Set, + private val metrics: Metrics, + private val dispatcher: CoroutineDispatcher = Dispatchers.IO){ + + suspend fun start(updateInterval: Long = 500L, timeout: Duration): Job = + GlobalScope.launch(dispatcher){ + kafkaConsumer.subscribe(topics) + while (isActive){ + kafkaConsumer.poll(timeout).forEach(::update) + kafkaConsumer.commitSync() + delay(updateInterval) + } + } + + private fun update(record: ConsumerRecord) { + val vesEvent = VesEventOuterClass.VesEvent.parseFrom(record.value()) + metrics.notifyMessageTravelTime(vesEvent.commonEventHeader.lastEpochMicrosec) + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt deleted file mode 100644 index 57a5f33f..00000000 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.kafkaconsumer.state - -import org.apache.kafka.common.TopicPartition -import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics -import org.onap.dcae.collectors.veshv.utils.logging.Logger - - -internal class OffsetConsumer(private val metrics: Metrics) { - - fun update(topicPartition: TopicPartition, offset: Long) { - logger.trace { - "Current consumer offset $offset for topic partition $topicPartition" - } - metrics.notifyOffsetChanged(offset, topicPartition) - } - - companion object { - val logger = Logger(OffsetConsumer::class) - } -} diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt deleted file mode 100644 index b0eb7a52..00000000 --- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt +++ /dev/null @@ -1,154 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.kafkaconsumer.impl - -import com.nhaarman.mockitokotlin2.argumentCaptor -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.times -import com.nhaarman.mockitokotlin2.verify -import com.nhaarman.mockitokotlin2.verifyZeroInteractions -import com.nhaarman.mockitokotlin2.whenever -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancelAndJoin -import kotlinx.coroutines.test.TestCoroutineDispatcher -import kotlinx.coroutines.test.runBlockingTest -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.TopicPartition -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer - -@ExperimentalCoroutinesApi -object KafkaSourceTest : Spek({ - given("KafkaSource") { - val testDispatcher = TestCoroutineDispatcher() - val mockedKafkaConsumer = mock>() - afterEachTest { - testDispatcher.cleanupTestCoroutines() - } - given("single topicName and partition") { - val topics = setOf("topicName") - val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher) - val mockedOffsetConsumer = mock() - on("started KafkaSource") { - val topicPartition = createTopicPartition("topicName") - val topicPartitions = setOf(topicPartition) - whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions) - whenever(mockedKafkaConsumer.endOffsets(topicPartitions)) - .thenReturn(mapOf(topicPartition to newOffset)) - - runBlockingTest { - val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs) - job.cancelAndJoin() - } - - it("should call update function on topicName") { - verify(mockedOffsetConsumer).update(topicPartition, newOffset) - } - } - } - - given("two topics with partition") { - val topics = setOf(topicName1, topicName2) - val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher) - val mockedOffsetConsumer = mock() - - on("started KafkaSource for two iteration of while loop") { - val topicPartitionArgumentCaptor = argumentCaptor() - val offsetArgumentCaptor = argumentCaptor() - val topicPartitionArgumentCaptorAfterInterval = argumentCaptor() - val offsetArgumentCaptorAfterInterval = argumentCaptor() - val topicPartition1 = createTopicPartition(topicName1) - val topicPartition2 = createTopicPartition(topicName2) - val topicPartitions = setOf(topicPartition1, topicPartition2) - whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions) - val partitionToOffset1 = - mapOf(topicPartition1 to newOffset, - topicPartition2 to anotherNewOffset) - val partitionToOffset2 = - mapOf(topicPartition1 to anotherNewOffset, - topicPartition2 to newOffset) - whenever(mockedKafkaConsumer.endOffsets(topicPartitions)) - .thenReturn(partitionToOffset1, partitionToOffset2) - - runBlockingTest { - val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs) - verify(mockedOffsetConsumer, times(topicsAmount)).update(topicPartitionArgumentCaptor.capture(), - offsetArgumentCaptor.capture()) - - testDispatcher.advanceTimeBy(updateIntervalInMs) - - verify(mockedOffsetConsumer, times(topicsAmountAfterInterval)) - .update(topicPartitionArgumentCaptorAfterInterval.capture(), offsetArgumentCaptorAfterInterval.capture()) - - it("should calls update function with proper arguments - before interval") { - assertThat(topicPartitionArgumentCaptor.firstValue).isEqualTo(topicPartition1) - assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset) - assertThat(topicPartitionArgumentCaptor.secondValue).isEqualTo(topicPartition2) - assertThat(offsetArgumentCaptor.secondValue).isEqualTo(anotherNewOffset) - } - it("should calls update function with proper arguments - after interval") { - assertThat(topicPartitionArgumentCaptorAfterInterval.thirdValue).isEqualTo(topicPartition1) - assertThat(offsetArgumentCaptorAfterInterval.thirdValue).isEqualTo(anotherNewOffset) - assertThat(topicPartitionArgumentCaptorAfterInterval.lastValue).isEqualTo(topicPartition2) - assertThat(offsetArgumentCaptorAfterInterval.lastValue).isEqualTo(newOffset) - } - job.cancelAndJoin() - } - } - } - - given("empty topicName list") { - val emptyTopics = emptySet() - val kafkaSource = KafkaSource(mockedKafkaConsumer, emptyTopics, testDispatcher) - val mockedOffsetConsumer = mock() - - on("call of function start") { - val emptyTopicPartitions = setOf(null) - whenever(mockedKafkaConsumer.assignment()).thenReturn(emptyTopicPartitions) - whenever(mockedKafkaConsumer.endOffsets(emptyTopicPartitions)) - .thenReturn(emptyMap()) - - runBlockingTest { - val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs) - job.cancelAndJoin() - } - - it("should not interact with OffsetConsumer") { - verifyZeroInteractions(mockedOffsetConsumer) - } - } - } - - } -}) - -private const val updateIntervalInMs = 10L -private const val partitionNumber = 0 -private const val newOffset = 2L -private const val anotherNewOffset = 10L -private const val topicName1 = "topicName1" -private const val topicName2 = "topicName2" -private const val topicsAmount = 2 -private const val topicsAmountAfterInterval = 4 -fun createTopicPartition(topic: String) = TopicPartition(topic, partitionNumber) \ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt new file mode 100644 index 00000000..3bb9ebae --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt @@ -0,0 +1,176 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.impl + +import com.nhaarman.mockitokotlin2.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.test.TestCoroutineDispatcher +import kotlinx.coroutines.test.resetMain +import kotlinx.coroutines.test.runBlockingTest +import kotlinx.coroutines.test.setMain +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.TopicPartition +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics + +@ExperimentalCoroutinesApi +object OffsetKafkaConsumerTest : Spek({ + given("OffsetKafkaConsumer") { + val testDispatcher = TestCoroutineDispatcher() + val mockedKafkaConsumer = mock>() + val mockedMetrics = mock() + + afterEachTest { + testDispatcher.cleanupTestCoroutines() + reset(mockedMetrics) + } + + given("single topicName and partition") { + val topicName = "topicName" + val topics = setOf(topicName) + val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher) + + on("started OffsetKafkaConsumer") { + val topicPartition = createTopicPartition(topicName) + val topicPartitions = setOf(topicPartition) + whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions) + whenever(mockedKafkaConsumer.endOffsets(topicPartitions)) + .thenReturn(mapOf(topicPartition to newOffset)) + + runBlockingTest(testDispatcher) { + val job = offsetKafkaConsumer.start(updateIntervalInMs) + job.cancelAndJoin() + } + + it("should notify offset changed with $topicName") { + verify(mockedMetrics).notifyOffsetChanged(newOffset, topicPartition) + } + } + } + + given("two topics with partition") { + val topics = setOf(topicName1, topicName2) + val kafkaSource = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher) + + on("started OffsetKafkaConsumer for two iteration of while loop") { + val offsetArgumentCaptor = argumentCaptor() + val topicPartitionArgumentCaptor = argumentCaptor() + + val topicPartition1 = createTopicPartition(topicName1) + val topicPartition2 = createTopicPartition(topicName2) + val topicPartitions = setOf(topicPartition1, topicPartition2) + + whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions) + + val partitionToOffset1 = + mapOf(topicPartition1 to newOffset, + topicPartition2 to anotherNewOffset) + val partitionToOffset2 = + mapOf(topicPartition1 to anotherNewOffset, + topicPartition2 to newOffset) + whenever(mockedKafkaConsumer.endOffsets(topicPartitions)) + .thenReturn(partitionToOffset1, partitionToOffset2) + + runBlockingTest(testDispatcher) { + val job = kafkaSource.start(updateIntervalInMs) + verify(mockedMetrics, times(topicsAmount)).notifyOffsetChanged( + offsetArgumentCaptor.capture(), + topicPartitionArgumentCaptor.capture() + ) + + it("should notify offset changed with proper arguments - before interval"){ + assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset) + assertThat(topicPartitionArgumentCaptor.firstValue.topic()) + .isEqualToIgnoringCase(topicPartition1.topic()) + assertThat(topicPartitionArgumentCaptor.firstValue.partition()) + .isEqualTo(topicPartition1.partition()) + + assertThat(offsetArgumentCaptor.secondValue).isEqualTo(anotherNewOffset) + assertThat(topicPartitionArgumentCaptor.secondValue.topic()) + .isEqualToIgnoringCase(topicPartition2.topic()) + assertThat(topicPartitionArgumentCaptor.secondValue.partition()) + .isEqualTo(topicPartition2.partition()) + } + reset(mockedMetrics) + advanceTimeBy(updateIntervalInMs) + + job.cancelAndJoin() + + verify(mockedMetrics, times(topicsAmount)).notifyOffsetChanged( + offsetArgumentCaptor.capture(), + topicPartitionArgumentCaptor.capture() + ) + + it("should notify offset changed with proper arguments - after interval") { + assertThat(offsetArgumentCaptor.thirdValue).isEqualTo(anotherNewOffset) + assertThat(topicPartitionArgumentCaptor.thirdValue.topic()) + .isEqualToIgnoringCase(topicPartition1.topic()) + assertThat(topicPartitionArgumentCaptor.thirdValue.partition()) + .isEqualTo(topicPartition1.partition()) + + assertThat(offsetArgumentCaptor.lastValue).isEqualTo(newOffset) + assertThat(topicPartitionArgumentCaptor.lastValue.topic()) + .isEqualToIgnoringCase(topicPartition2.topic()) + assertThat(topicPartitionArgumentCaptor.lastValue.partition()) + .isEqualTo(topicPartition2.partition()) + } + } + } + } + + given("empty topicName list") { + val emptyTopics = emptySet() + val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, emptyTopics, mockedMetrics, testDispatcher) + + on("call of function start") { + val emptyTopicPartitions = setOf(null) + whenever(mockedKafkaConsumer.assignment()).thenReturn(emptyTopicPartitions) + whenever(mockedKafkaConsumer.endOffsets(emptyTopicPartitions)) + .thenReturn(emptyMap()) + + runBlockingTest(testDispatcher) { + val job = offsetKafkaConsumer.start(updateIntervalInMs) + job.cancelAndJoin() + } + + it("should not interact with metrics") { + verifyZeroInteractions(mockedMetrics) + } + } + } + + } +}) + +private const val updateIntervalInMs = 10L +private const val partitionNumber = 0 +private const val newOffset = 2L +private const val anotherNewOffset = 10L +private const val topicName1 = "topicName1" +private const val topicName2 = "topicName2" +private const val topicsAmount = 2 +fun createTopicPartition(topic: String) = TopicPartition(topic, partitionNumber) \ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumerTest.kt new file mode 100644 index 00000000..5e445516 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumerTest.kt @@ -0,0 +1,130 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.veshv.kafkaconsumer.impl + +import com.nhaarman.mockitokotlin2.* +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.test.TestCoroutineDispatcher +import kotlinx.coroutines.test.runBlockingTest +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.ConsumerRecords +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.TopicPartition +import org.assertj.core.api.Assertions +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics +import org.onap.dcae.collectors.veshv.tests.utils.commonHeader +import org.onap.dcae.collectors.veshv.tests.utils.vesEvent +import java.time.Duration + +@ExperimentalCoroutinesApi +object ProcessingKafkaConsumerTest: Spek({ + given("ProcessingKafkaConsumer") { + val testDispatcher = TestCoroutineDispatcher() + val mockedKafkaConsumer = mock>() + val mockedMetrics = mock() + val processingKafkaConsumer = ProcessingKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher) + + afterEachTest { + testDispatcher.cleanupTestCoroutines() + reset(mockedMetrics) + } + + given("empty consumer records"){ + on("started ProcessingKafkaConsumer") { + whenever(mockedKafkaConsumer.poll(pollTimeoutInMs)).thenReturn(ConsumerRecords.empty()) + runBlockingTest(testDispatcher) { + val job = processingKafkaConsumer.start(updateIntervalInMs, pollTimeoutInMs) + job.cancelAndJoin() + } + + it("should not interact with metrics") { + verifyZeroInteractions(mockedMetrics) + } + } + } + + given("single consumer record") { + on("started ProcessingKafkaConsumer") { + val record = mock>() + val records = ConsumerRecords(mapOf( + topicPartition to listOf(record) + )) + + whenever(mockedKafkaConsumer.poll(pollTimeoutInMs)).thenReturn(records) + whenever(record.value()) + .thenReturn(vesEvent( commonHeader(lastEpochMicrosec = messageSentTime)).toByteArray()) + + runBlockingTest(testDispatcher) { + val job = processingKafkaConsumer.start(updateIntervalInMs,pollTimeoutInMs) + job.cancelAndJoin() + } + + + it("should notify message travel time changed with correct sent time"){ + verify(mockedMetrics).notifyMessageTravelTime(messageSentTime) + } + } + } + + given("multiple consumer records with partition"){ + val sentTimeArgumentCaptor = argumentCaptor() + + on("started ProcessingKafkaConsumer") { + val record1 = mock>() + val record2 = mock>() + val records = ConsumerRecords(mapOf( + topicPartition to listOf(record1, record2) + )) + + whenever(mockedKafkaConsumer.poll(pollTimeoutInMs)).thenReturn(records) + whenever(record1.value()) + .thenReturn(vesEvent( commonHeader(lastEpochMicrosec = messageSentTime)).toByteArray()) + whenever(record2.value()) + .thenReturn(vesEvent( commonHeader(lastEpochMicrosec = anotherMessageSentTime)).toByteArray()) + + runBlockingTest(testDispatcher) { + val job = processingKafkaConsumer.start(updateIntervalInMs,pollTimeoutInMs) + + verify(mockedMetrics, times(records.count())).notifyMessageTravelTime(sentTimeArgumentCaptor.capture()) + + it("should notify message travel time changed twice with correct arguments"){ + Assertions.assertThat(sentTimeArgumentCaptor.firstValue).isEqualTo(messageSentTime) + Assertions.assertThat(sentTimeArgumentCaptor.secondValue).isEqualTo(anotherMessageSentTime) + } + job.cancelAndJoin() + } + } + } + } +}) + +private const val updateIntervalInMs = 10L +private const val messageSentTime = 1L +private const val anotherMessageSentTime = 1L +private const val topicName = "topicName" +private val pollTimeoutInMs = Duration.ofMillis(5L) +private val topics = setOf(topicName) +private val topicPartition = TopicPartition(topicName, 0) \ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt deleted file mode 100644 index 5ccb483a..00000000 --- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.kafkaconsumer.state - -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.verify -import org.apache.kafka.common.TopicPartition -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics - -object OffsetConsumerTest : Spek({ - describe("OffsetConsumer with metrics") { - val mockedMetrics = mock() - val offsetConsumer = OffsetConsumer(mockedMetrics) - given("topicName with partition"){ - val topicPartition = TopicPartition(topicName, partitionNumber) - - on("new update method call") { - offsetConsumer.update(topicPartition, newOffset) - - it("should notify message newOffset metric") { - verify(mockedMetrics).notifyOffsetChanged(newOffset, topicPartition) - } - } - } - } -}) - -private const val partitionNumber = 1 -private const val newOffset: Long = 99 -private const val topicName = "sample-topicName" diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt index ed0cab63..ba60d1b0 100644 --- a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt +++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt @@ -37,7 +37,7 @@ fun vesEvent(domain: VesEventDomain = PERF3GPP, eventFields: ByteString = ByteString.EMPTY, vesEventListenerVersion: String = "7.0.2" ): VesEventOuterClass.VesEvent = vesEvent( - commonHeader(domain, id, vesEventListenerVersion), + commonHeader(domain, id, vesEventListenerVersion, lastEpochMicrosec = 100000005), eventFields ) @@ -51,8 +51,9 @@ fun vesEvent(commonEventHeader: CommonEventHeader, fun commonHeader(domain: VesEventDomain = PERF3GPP, id: String = randomUUID().toString(), vesEventListenerVersion: String = "7.0.2", - priority: Priority = Priority.NORMAL -): CommonEventHeader = + priority: Priority = Priority.NORMAL, + lastEpochMicrosec: Long = 100000005 + ): CommonEventHeader = CommonEventHeader.newBuilder() .setVersion("sample-version") .setDomain(domain.domainName) @@ -62,7 +63,7 @@ fun commonHeader(domain: VesEventDomain = PERF3GPP, .setEventName("sample-event-name") .setEventType("sample-event-type") .setStartEpochMicrosec(100000000) - .setLastEpochMicrosec(100000005) + .setLastEpochMicrosec(lastEpochMicrosec) .setNfNamingCode("sample-nf-naming-code") .setNfcNamingCode("sample-nfc-naming-code") .setNfVendorName("vendor-name") -- cgit 1.2.3-korg