diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-07-08 09:10:38 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-07-08 09:10:38 +0000 |
commit | dbf9712c95867185931b6f15d12ded7b45437a24 (patch) | |
tree | 9c54dc498dd7a1527cbdfc1fa08304371236ddb9 | |
parent | b5c4874e865f08bd7d880a515528312d7c30ba44 (diff) | |
parent | 583bb983ae956f366c034fdad20df79ccef196fd (diff) |
Merge "Measure message travel time"
8 files changed, 293 insertions, 156 deletions
diff --git a/sources/hv-collector-kafka-consumer/pom.xml b/sources/hv-collector-kafka-consumer/pom.xml index 7ffb5ebf..cdfb6365 100644 --- a/sources/hv-collector-kafka-consumer/pom.xml +++ b/sources/hv-collector-kafka-consumer/pom.xml @@ -97,6 +97,10 @@ <artifactId>kotlin-stdlib-jdk8</artifactId> </dependency> <dependency> + <groupId>org.onap.dcaegen2.services.sdk</groupId> + <artifactId>hvvesclient-protobuf</artifactId> + </dependency> + <dependency> <groupId>org.jetbrains.kotlinx</groupId> <artifactId>kotlinx-coroutines-core</artifactId> <scope>compile</scope> diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt new file mode 100644 index 00000000..d105c4a7 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt @@ -0,0 +1,63 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.impl + +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.TopicPartition +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics +import org.onap.dcae.collectors.veshv.utils.logging.Logger + +internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>, + private val topics: Set<String>, + private val metrics: Metrics, + private val dispatcher: CoroutineDispatcher = Dispatchers.IO) { + + suspend fun start(updateInterval: Long = 500L): Job = + GlobalScope.launch(dispatcher) { + kafkaConsumer.subscribe(topics) + val topicPartitions = kafkaConsumer.assignment() + while (isActive) { + kafkaConsumer.endOffsets(topicPartitions) + .forEach { (topicPartition, offset) -> + update(topicPartition, offset) + } + kafkaConsumer.commitSync() + delay(updateInterval) + } + } + + private fun update(topicPartition: TopicPartition, offset: Long) { + logger.trace { + "Current consumer offset $offset for topic partition $topicPartition" + } + metrics.notifyOffsetChanged(offset, topicPartition) + } + + companion object { + val logger = Logger(OffsetKafkaConsumer::class) + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt index dd24345d..f47a66d0 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.kafkaconsumer.impl + import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope @@ -26,23 +27,30 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer -import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.ves.VesEventOuterClass +import java.time.Duration + +internal class ProcessingKafkaConsumer (private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>, + private val topics: Set<String>, + private val metrics: Metrics, + private val dispatcher: CoroutineDispatcher = Dispatchers.IO){ -internal class KafkaSource(private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>, - private val topics: Set<String>, - private val dispatcher: CoroutineDispatcher = Dispatchers.IO) { - suspend fun start(offsetConsumer: OffsetConsumer, updateInterval: Long = 500L): Job = - GlobalScope.launch(dispatcher) { + suspend fun start(updateInterval: Long = 500L, timeout: Duration): Job = + GlobalScope.launch(dispatcher){ kafkaConsumer.subscribe(topics) - val topicPartitions = kafkaConsumer.assignment() - while (isActive) { - kafkaConsumer.endOffsets(topicPartitions) - .forEach { (topicPartition, offset) -> - offsetConsumer.update(topicPartition, offset) - } + while (isActive){ + kafkaConsumer.poll(timeout).forEach(::update) kafkaConsumer.commitSync() delay(updateInterval) } } + + private fun update(record: ConsumerRecord<ByteArray, ByteArray>) { + val vesEvent = VesEventOuterClass.VesEvent.parseFrom(record.value()) + metrics.notifyMessageTravelTime(vesEvent.commonEventHeader.lastEpochMicrosec) + } } diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt deleted file mode 100644 index 57a5f33f..00000000 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.kafkaconsumer.state - -import org.apache.kafka.common.TopicPartition -import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics -import org.onap.dcae.collectors.veshv.utils.logging.Logger - - -internal class OffsetConsumer(private val metrics: Metrics) { - - fun update(topicPartition: TopicPartition, offset: Long) { - logger.trace { - "Current consumer offset $offset for topic partition $topicPartition" - } - metrics.notifyOffsetChanged(offset, topicPartition) - } - - companion object { - val logger = Logger(OffsetConsumer::class) - } -} diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt index b0eb7a52..3bb9ebae 100644 --- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt @@ -19,16 +19,15 @@ */ package org.onap.dcae.collectors.veshv.kafkaconsumer.impl -import com.nhaarman.mockitokotlin2.argumentCaptor -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.times -import com.nhaarman.mockitokotlin2.verify -import com.nhaarman.mockitokotlin2.verifyZeroInteractions -import com.nhaarman.mockitokotlin2.whenever +import com.nhaarman.mockitokotlin2.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.test.TestCoroutineDispatcher +import kotlinx.coroutines.test.resetMain import kotlinx.coroutines.test.runBlockingTest +import kotlinx.coroutines.test.setMain import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.TopicPartition import org.assertj.core.api.Assertions.assertThat @@ -36,52 +35,57 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics @ExperimentalCoroutinesApi -object KafkaSourceTest : Spek({ - given("KafkaSource") { +object OffsetKafkaConsumerTest : Spek({ + given("OffsetKafkaConsumer") { val testDispatcher = TestCoroutineDispatcher() val mockedKafkaConsumer = mock<KafkaConsumer<ByteArray, ByteArray>>() + val mockedMetrics = mock<Metrics>() + afterEachTest { testDispatcher.cleanupTestCoroutines() + reset(mockedMetrics) } + given("single topicName and partition") { - val topics = setOf("topicName") - val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher) - val mockedOffsetConsumer = mock<OffsetConsumer>() - on("started KafkaSource") { - val topicPartition = createTopicPartition("topicName") + val topicName = "topicName" + val topics = setOf(topicName) + val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher) + + on("started OffsetKafkaConsumer") { + val topicPartition = createTopicPartition(topicName) val topicPartitions = setOf(topicPartition) whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions) whenever(mockedKafkaConsumer.endOffsets(topicPartitions)) .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset)) - runBlockingTest { - val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs) + runBlockingTest(testDispatcher) { + val job = offsetKafkaConsumer.start(updateIntervalInMs) job.cancelAndJoin() } - it("should call update function on topicName") { - verify(mockedOffsetConsumer).update(topicPartition, newOffset) + it("should notify offset changed with $topicName") { + verify(mockedMetrics).notifyOffsetChanged(newOffset, topicPartition) } } } given("two topics with partition") { val topics = setOf(topicName1, topicName2) - val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher) - val mockedOffsetConsumer = mock<OffsetConsumer>() + val kafkaSource = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher) - on("started KafkaSource for two iteration of while loop") { - val topicPartitionArgumentCaptor = argumentCaptor<TopicPartition>() + on("started OffsetKafkaConsumer for two iteration of while loop") { val offsetArgumentCaptor = argumentCaptor<Long>() - val topicPartitionArgumentCaptorAfterInterval = argumentCaptor<TopicPartition>() - val offsetArgumentCaptorAfterInterval = argumentCaptor<Long>() + val topicPartitionArgumentCaptor = argumentCaptor<TopicPartition>() + val topicPartition1 = createTopicPartition(topicName1) val topicPartition2 = createTopicPartition(topicName2) val topicPartitions = setOf(topicPartition1, topicPartition2) + whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions) + val partitionToOffset1 = mapOf(topicPartition1 to newOffset, topicPartition2 to anotherNewOffset) @@ -91,37 +95,56 @@ object KafkaSourceTest : Spek({ whenever(mockedKafkaConsumer.endOffsets(topicPartitions)) .thenReturn(partitionToOffset1, partitionToOffset2) - runBlockingTest { - val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs) - verify(mockedOffsetConsumer, times(topicsAmount)).update(topicPartitionArgumentCaptor.capture(), - offsetArgumentCaptor.capture()) + runBlockingTest(testDispatcher) { + val job = kafkaSource.start(updateIntervalInMs) + verify(mockedMetrics, times(topicsAmount)).notifyOffsetChanged( + offsetArgumentCaptor.capture(), + topicPartitionArgumentCaptor.capture() + ) - testDispatcher.advanceTimeBy(updateIntervalInMs) - - verify(mockedOffsetConsumer, times(topicsAmountAfterInterval)) - .update(topicPartitionArgumentCaptorAfterInterval.capture(), offsetArgumentCaptorAfterInterval.capture()) - - it("should calls update function with proper arguments - before interval") { - assertThat(topicPartitionArgumentCaptor.firstValue).isEqualTo(topicPartition1) + it("should notify offset changed with proper arguments - before interval"){ assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset) - assertThat(topicPartitionArgumentCaptor.secondValue).isEqualTo(topicPartition2) + assertThat(topicPartitionArgumentCaptor.firstValue.topic()) + .isEqualToIgnoringCase(topicPartition1.topic()) + assertThat(topicPartitionArgumentCaptor.firstValue.partition()) + .isEqualTo(topicPartition1.partition()) + assertThat(offsetArgumentCaptor.secondValue).isEqualTo(anotherNewOffset) + assertThat(topicPartitionArgumentCaptor.secondValue.topic()) + .isEqualToIgnoringCase(topicPartition2.topic()) + assertThat(topicPartitionArgumentCaptor.secondValue.partition()) + .isEqualTo(topicPartition2.partition()) } - it("should calls update function with proper arguments - after interval") { - assertThat(topicPartitionArgumentCaptorAfterInterval.thirdValue).isEqualTo(topicPartition1) - assertThat(offsetArgumentCaptorAfterInterval.thirdValue).isEqualTo(anotherNewOffset) - assertThat(topicPartitionArgumentCaptorAfterInterval.lastValue).isEqualTo(topicPartition2) - assertThat(offsetArgumentCaptorAfterInterval.lastValue).isEqualTo(newOffset) - } + reset(mockedMetrics) + advanceTimeBy(updateIntervalInMs) + job.cancelAndJoin() + + verify(mockedMetrics, times(topicsAmount)).notifyOffsetChanged( + offsetArgumentCaptor.capture(), + topicPartitionArgumentCaptor.capture() + ) + + it("should notify offset changed with proper arguments - after interval") { + assertThat(offsetArgumentCaptor.thirdValue).isEqualTo(anotherNewOffset) + assertThat(topicPartitionArgumentCaptor.thirdValue.topic()) + .isEqualToIgnoringCase(topicPartition1.topic()) + assertThat(topicPartitionArgumentCaptor.thirdValue.partition()) + .isEqualTo(topicPartition1.partition()) + + assertThat(offsetArgumentCaptor.lastValue).isEqualTo(newOffset) + assertThat(topicPartitionArgumentCaptor.lastValue.topic()) + .isEqualToIgnoringCase(topicPartition2.topic()) + assertThat(topicPartitionArgumentCaptor.lastValue.partition()) + .isEqualTo(topicPartition2.partition()) + } } } } given("empty topicName list") { val emptyTopics = emptySet<String>() - val kafkaSource = KafkaSource(mockedKafkaConsumer, emptyTopics, testDispatcher) - val mockedOffsetConsumer = mock<OffsetConsumer>() + val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, emptyTopics, mockedMetrics, testDispatcher) on("call of function start") { val emptyTopicPartitions = setOf(null) @@ -129,13 +152,13 @@ object KafkaSourceTest : Spek({ whenever(mockedKafkaConsumer.endOffsets(emptyTopicPartitions)) .thenReturn(emptyMap()) - runBlockingTest { - val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs) + runBlockingTest(testDispatcher) { + val job = offsetKafkaConsumer.start(updateIntervalInMs) job.cancelAndJoin() } - it("should not interact with OffsetConsumer") { - verifyZeroInteractions(mockedOffsetConsumer) + it("should not interact with metrics") { + verifyZeroInteractions(mockedMetrics) } } } @@ -150,5 +173,4 @@ private const val anotherNewOffset = 10L private const val topicName1 = "topicName1" private const val topicName2 = "topicName2" private const val topicsAmount = 2 -private const val topicsAmountAfterInterval = 4 fun createTopicPartition(topic: String) = TopicPartition(topic, partitionNumber)
\ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumerTest.kt new file mode 100644 index 00000000..5e445516 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumerTest.kt @@ -0,0 +1,130 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.veshv.kafkaconsumer.impl + +import com.nhaarman.mockitokotlin2.* +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.test.TestCoroutineDispatcher +import kotlinx.coroutines.test.runBlockingTest +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.ConsumerRecords +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.TopicPartition +import org.assertj.core.api.Assertions +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics +import org.onap.dcae.collectors.veshv.tests.utils.commonHeader +import org.onap.dcae.collectors.veshv.tests.utils.vesEvent +import java.time.Duration + +@ExperimentalCoroutinesApi +object ProcessingKafkaConsumerTest: Spek({ + given("ProcessingKafkaConsumer") { + val testDispatcher = TestCoroutineDispatcher() + val mockedKafkaConsumer = mock<KafkaConsumer<ByteArray, ByteArray>>() + val mockedMetrics = mock<Metrics>() + val processingKafkaConsumer = ProcessingKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher) + + afterEachTest { + testDispatcher.cleanupTestCoroutines() + reset(mockedMetrics) + } + + given("empty consumer records"){ + on("started ProcessingKafkaConsumer") { + whenever(mockedKafkaConsumer.poll(pollTimeoutInMs)).thenReturn(ConsumerRecords.empty()) + runBlockingTest(testDispatcher) { + val job = processingKafkaConsumer.start(updateIntervalInMs, pollTimeoutInMs) + job.cancelAndJoin() + } + + it("should not interact with metrics") { + verifyZeroInteractions(mockedMetrics) + } + } + } + + given("single consumer record") { + on("started ProcessingKafkaConsumer") { + val record = mock<ConsumerRecord<ByteArray, ByteArray>>() + val records = ConsumerRecords(mapOf( + topicPartition to listOf(record) + )) + + whenever(mockedKafkaConsumer.poll(pollTimeoutInMs)).thenReturn(records) + whenever(record.value()) + .thenReturn(vesEvent( commonHeader(lastEpochMicrosec = messageSentTime)).toByteArray()) + + runBlockingTest(testDispatcher) { + val job = processingKafkaConsumer.start(updateIntervalInMs,pollTimeoutInMs) + job.cancelAndJoin() + } + + + it("should notify message travel time changed with correct sent time"){ + verify(mockedMetrics).notifyMessageTravelTime(messageSentTime) + } + } + } + + given("multiple consumer records with partition"){ + val sentTimeArgumentCaptor = argumentCaptor<Long>() + + on("started ProcessingKafkaConsumer") { + val record1 = mock<ConsumerRecord<ByteArray, ByteArray>>() + val record2 = mock<ConsumerRecord<ByteArray, ByteArray>>() + val records = ConsumerRecords(mapOf( + topicPartition to listOf(record1, record2) + )) + + whenever(mockedKafkaConsumer.poll(pollTimeoutInMs)).thenReturn(records) + whenever(record1.value()) + .thenReturn(vesEvent( commonHeader(lastEpochMicrosec = messageSentTime)).toByteArray()) + whenever(record2.value()) + .thenReturn(vesEvent( commonHeader(lastEpochMicrosec = anotherMessageSentTime)).toByteArray()) + + runBlockingTest(testDispatcher) { + val job = processingKafkaConsumer.start(updateIntervalInMs,pollTimeoutInMs) + + verify(mockedMetrics, times(records.count())).notifyMessageTravelTime(sentTimeArgumentCaptor.capture()) + + it("should notify message travel time changed twice with correct arguments"){ + Assertions.assertThat(sentTimeArgumentCaptor.firstValue).isEqualTo(messageSentTime) + Assertions.assertThat(sentTimeArgumentCaptor.secondValue).isEqualTo(anotherMessageSentTime) + } + job.cancelAndJoin() + } + } + } + } +}) + +private const val updateIntervalInMs = 10L +private const val messageSentTime = 1L +private const val anotherMessageSentTime = 1L +private const val topicName = "topicName" +private val pollTimeoutInMs = Duration.ofMillis(5L) +private val topics = setOf(topicName) +private val topicPartition = TopicPartition(topicName, 0)
\ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt deleted file mode 100644 index 5ccb483a..00000000 --- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.kafkaconsumer.state - -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.verify -import org.apache.kafka.common.TopicPartition -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics - -object OffsetConsumerTest : Spek({ - describe("OffsetConsumer with metrics") { - val mockedMetrics = mock<Metrics>() - val offsetConsumer = OffsetConsumer(mockedMetrics) - given("topicName with partition"){ - val topicPartition = TopicPartition(topicName, partitionNumber) - - on("new update method call") { - offsetConsumer.update(topicPartition, newOffset) - - it("should notify message newOffset metric") { - verify(mockedMetrics).notifyOffsetChanged(newOffset, topicPartition) - } - } - } - } -}) - -private const val partitionNumber = 1 -private const val newOffset: Long = 99 -private const val topicName = "sample-topicName" diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt index ed0cab63..ba60d1b0 100644 --- a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt +++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt @@ -37,7 +37,7 @@ fun vesEvent(domain: VesEventDomain = PERF3GPP, eventFields: ByteString = ByteString.EMPTY, vesEventListenerVersion: String = "7.0.2" ): VesEventOuterClass.VesEvent = vesEvent( - commonHeader(domain, id, vesEventListenerVersion), + commonHeader(domain, id, vesEventListenerVersion, lastEpochMicrosec = 100000005), eventFields ) @@ -51,8 +51,9 @@ fun vesEvent(commonEventHeader: CommonEventHeader, fun commonHeader(domain: VesEventDomain = PERF3GPP, id: String = randomUUID().toString(), vesEventListenerVersion: String = "7.0.2", - priority: Priority = Priority.NORMAL -): CommonEventHeader = + priority: Priority = Priority.NORMAL, + lastEpochMicrosec: Long = 100000005 + ): CommonEventHeader = CommonEventHeader.newBuilder() .setVersion("sample-version") .setDomain(domain.domainName) @@ -62,7 +63,7 @@ fun commonHeader(domain: VesEventDomain = PERF3GPP, .setEventName("sample-event-name") .setEventType("sample-event-type") .setStartEpochMicrosec(100000000) - .setLastEpochMicrosec(100000005) + .setLastEpochMicrosec(lastEpochMicrosec) .setNfNamingCode("sample-nf-naming-code") .setNfcNamingCode("sample-nfc-naming-code") .setNfVendorName("vendor-name") |