diff options
47 files changed, 848 insertions, 198 deletions
@@ -50,6 +50,7 @@ </modules> <properties> + <coroutines.version>1.3.0-M2</coroutines.version> <kotlin.version>1.3.31</kotlin.version> <arrow.version>0.9.0</arrow.version> <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version> @@ -432,7 +433,7 @@ <dependency> <groupId>org.jetbrains.kotlinx</groupId> <artifactId>kotlinx-coroutines-core</artifactId> - <version>1.1.1</version> + <version>${coroutines.version}</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> @@ -587,6 +588,12 @@ <version>3.1.7.RELEASE</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.jetbrains.kotlinx</groupId> + <artifactId>kotlinx-coroutines-test</artifactId> + <version>${coroutines.version}</version> + <scope>test</scope> + </dependency> </dependencies> </dependencyManagement> </project> diff --git a/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/impl/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt index 98934b0d..1b664edc 100644 --- a/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/impl/KafkaSource.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt @@ -17,14 +17,9 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.kafka.impl +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.config.SaslConfigs -import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM -import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.onap.dcae.collectors.veshv.kafka.api.KafkaPropertiesFactory import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.kafka.receiver.KafkaReceiver @@ -44,31 +39,12 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr companion object { private val logger = Logger(KafkaSource::class) - private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule" - private const val USERNAME = "admin" - private const val PASSWORD = "admin_secret" - private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;" - private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name - fun create(bootstrapServers: String, topics: Set<String>) = KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics))) fun createReceiverOptions(bootstrapServers: String, topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? { - val props = mapOf<String, Any>( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, - ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-consumer", - ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-consumers", - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", - ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to "3000", - - - CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SASL_PLAINTEXT, - SaslConfigs.SASL_MECHANISM to PLAIN_MECHANISM, - SaslConfigs.SASL_JAAS_CONFIG to JAAS_CONFIG - ) + val props = KafkaPropertiesFactory.create(bootstrapServers) return ReceiverOptions.create<ByteArray, ByteArray>(props) .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } } .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt index 8a7aafbe..6ee640a4 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt @@ -20,8 +20,7 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl import org.apache.kafka.clients.consumer.ConsumerRecord -import org.onap.dcae.collectors.veshv.kafka.api.ConsumerFactory -import org.onap.dcae.collectors.veshv.kafka.api.KafkaConsumer +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource import org.onap.dcae.collectors.veshv.utils.logging.Logger import java.util.concurrent.ConcurrentLinkedQueue @@ -41,9 +40,10 @@ internal class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArr internal interface ConsumerStateProvider { fun currentState(): ConsumerState + fun reset() } -internal class Consumer : KafkaConsumer, ConsumerStateProvider { +internal class Consumer : ConsumerStateProvider { private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue() @@ -51,7 +51,7 @@ internal class Consumer : KafkaConsumer, ConsumerStateProvider { override fun reset() = consumedMessages.clear() - override fun update(record: ConsumerRecord<ByteArray, ByteArray>) { + fun update(record: ConsumerRecord<ByteArray, ByteArray>) { logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } consumedMessages.add(record.value()) } @@ -63,9 +63,19 @@ internal class Consumer : KafkaConsumer, ConsumerStateProvider { internal class DcaeAppConsumerFactory(private val kafkaBootstrapServers: String) { - private val consumerProvider = { Consumer() } - fun createConsumersFor(topics: Set<String>) = - ConsumerFactory.createConsumersForTopics(kafkaBootstrapServers, topics, consumerProvider) - .mapValues { it.value as Consumer } + KafkaSource.create(kafkaBootstrapServers, topics).let { kafkaSource -> + val topicToConsumer = topics.associateWith { Consumer() } + kafkaSource.start() + .map { + val topic = it.topic() + topicToConsumer.get(topic)?.update(it) + ?: logger.warn { "No consumer configured for topic $topic" } + }.subscribe() + topicToConsumer + } + + companion object { + private val logger = Logger(DcaeAppConsumerFactory::class) + } } diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt index a594215b..728eb2fd 100644 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt @@ -19,12 +19,10 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl -import org.apache.kafka.clients.consumer.ConsumerRecord import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import reactor.kafka.receiver.ReceiverRecord /** @@ -77,6 +75,3 @@ private fun assertState(cut: Consumer, vararg values: ByteArray) { assertThat(cut.currentState().messagesCount) .isEqualTo(values.size) } - -private fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) = - ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null) diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppConsumerFactoryTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppConsumerFactoryTest.kt new file mode 100644 index 00000000..71f31aba --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppConsumerFactoryTest.kt @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on + +object DcaeAppConsumerFactoryTest : Spek({ + describe("DcaeAppConsumerFactory") { + val kafkaBootstrapServers = "0.0.0.0:40,0.0.0.1:41" + val dcaeAppConsumerFactory = DcaeAppConsumerFactory(kafkaBootstrapServers) + + on("creation of consumer") { + val kafkaTopics = setOf("topic1", "topic2") + val consumer = dcaeAppConsumerFactory.createConsumersFor(kafkaTopics) + + it("should create consumer") { + assertThat(consumer).isNotEmpty.hasSize(2) + assertThat(consumer).containsOnlyKeys("topic1", "topic2") + } + } + + on("empty kafkaTopics set") { + val emptyKafkaTopics = emptySet<String>() + val consumer = dcaeAppConsumerFactory.createConsumersFor(emptyKafkaTopics) + + it("should not create consumer") { + assertThat(consumer).isEmpty() + } + } + + + } +})
\ No newline at end of file diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt new file mode 100644 index 00000000..5bfbc91c --- /dev/null +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt @@ -0,0 +1,84 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters + +import com.nhaarman.mockitokotlin2.doNothing +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import reactor.core.publisher.Flux +import reactor.kafka.receiver.KafkaReceiver +import reactor.kafka.receiver.ReceiverOffset +import reactor.kafka.receiver.ReceiverRecord +import reactor.test.StepVerifier + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> + * @since August 2018 + */ +internal class KafkaSourceTest : Spek({ + + describe("KafkaSource"){ + given("mocked Kafka Receiver"){ + val mockedKafkaReceiver = mock<KafkaReceiver<ByteArray, ByteArray>>() + val mockedReceiverRecord = mock<ReceiverRecord<ByteArray, ByteArray>>() + whenever(mockedKafkaReceiver.receive()).thenReturn(Flux.just(mockedReceiverRecord)) + on("function that starts KafkaSource") { + val mockedReceiverOffset = mock<ReceiverOffset>() + whenever(mockedReceiverRecord.receiverOffset()).thenReturn(mockedReceiverOffset) + doNothing().`when`(mockedReceiverOffset).acknowledge() + + val testedFunction = { KafkaSource(mockedKafkaReceiver).start() } + it("should emmit receiver record") { + StepVerifier.create(testedFunction()) + .expectSubscription() + .expectNext(mockedReceiverRecord) + .expectComplete() + .verify() + } + } + } + } + + given("parameters for factory methods") { + val servers = "kafka1:9080,kafka2:9080" + val topics = setOf("topic1", "topic2") + + on("createReceiverOptions call with topics set") { + val options = KafkaSource.createReceiverOptions(servers, topics) + it("should generate options with provided topics") { + assertThat(options!!.subscriptionTopics()).contains("topic1", "topic2") + } + } + + on("create call"){ + val kafkaSource = KafkaSource.create(servers, topics) + it("should generate KafkaSource object") { + assertThat(kafkaSource).isInstanceOf(KafkaSource::class.java) + } + } + } + +}) diff --git a/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaConsumer.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/kafka.kt index ae797b6e..ac26cf17 100644 --- a/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaConsumer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/kafka.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018-2019 NOKIA + * Copyright (C) 2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,11 +17,10 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.kafka.api +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl import org.apache.kafka.clients.consumer.ConsumerRecord +import reactor.kafka.receiver.ReceiverRecord -interface KafkaConsumer { - fun reset() - fun update(record: ConsumerRecord<ByteArray, ByteArray>) -} +internal fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) = + ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null) diff --git a/sources/hv-collector-kafka-consumer/pom.xml b/sources/hv-collector-kafka-consumer/pom.xml index f09a3a21..7ffb5ebf 100644 --- a/sources/hv-collector-kafka-consumer/pom.xml +++ b/sources/hv-collector-kafka-consumer/pom.xml @@ -79,27 +79,36 @@ <version>${project.parent.version}</version> <scope>test</scope> </dependency> - + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <scope>runtime</scope> + </dependency> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-registry-prometheus</artifactId> </dependency> <dependency> + <groupId>io.projectreactor.netty</groupId> + <artifactId>reactor-netty</artifactId> + </dependency> + <dependency> <groupId>org.jetbrains.kotlin</groupId> <artifactId>kotlin-stdlib-jdk8</artifactId> </dependency> <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> + <groupId>org.jetbrains.kotlinx</groupId> + <artifactId>kotlinx-coroutines-core</artifactId> + <scope>compile</scope> </dependency> <dependency> - <groupId>ch.qos.logback</groupId> - <artifactId>logback-classic</artifactId> - <scope>runtime</scope> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> </dependency> <dependency> - <groupId>io.projectreactor.netty</groupId> - <artifactId>reactor-netty</artifactId> + <groupId>org.jetbrains.kotlinx</groupId> + <artifactId>kotlinx-coroutines-test</artifactId> + <scope>test</scope> </dependency> </dependencies> </project> diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt new file mode 100644 index 00000000..dd24345d --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.impl + +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer + +internal class KafkaSource(private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>, + private val topics: Set<String>, + private val dispatcher: CoroutineDispatcher = Dispatchers.IO) { + suspend fun start(offsetConsumer: OffsetConsumer, updateInterval: Long = 500L): Job = + GlobalScope.launch(dispatcher) { + kafkaConsumer.subscribe(topics) + val topicPartitions = kafkaConsumer.assignment() + while (isActive) { + kafkaConsumer.endOffsets(topicPartitions) + .forEach { (topicPartition, offset) -> + offsetConsumer.update(topicPartition, offset) + } + kafkaConsumer.commitSync() + delay(updateInterval) + } + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt index 2fabf30e..e576a88f 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics -internal interface Metrics { - fun notifyOffsetChanged(offset: Long) +interface Metrics { + fun notifyOffsetChanged(offset: Long, topic: String, partition: Int = 0) fun notifyMessageTravelTime(messageSentTimeMicros: Long) -}
\ No newline at end of file +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt index 748e43fc..da1225e9 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt @@ -41,7 +41,8 @@ internal class MicrometerMetrics constructor( registry.scrape() } - override fun notifyOffsetChanged(offset: Long) { + override fun notifyOffsetChanged(offset: Long, topic: String, partition: Int) { + // TODO use topic and partition currentOffset.lazySet(offset) } diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt index 2c6707f9..1481a224 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt @@ -19,23 +19,24 @@ */ package org.onap.dcae.collectors.veshv.kafkaconsumer.state -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.onap.dcae.collectors.veshv.kafka.api.KafkaConsumer +import org.apache.kafka.common.TopicPartition import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics import org.onap.dcae.collectors.veshv.utils.logging.Logger -internal class OffsetConsumer(private val metrics: Metrics): KafkaConsumer { +internal class OffsetConsumer(private val metrics: Metrics) { - override fun update(record: ConsumerRecord<ByteArray, ByteArray>) { - val offset = record.offset() - logger.trace { "Current consumer offset $offset" } - metrics.notifyOffsetChanged(offset) + fun update(topicPartition: TopicPartition, offset: Long) { + logger.trace { + "Current consumer offset $offset for topic ${topicPartition.topic()} " + + "on partition ${topicPartition.partition()}" + } + metrics.notifyOffsetChanged(offset, topicPartition.topic(), topicPartition.partition()) } - override fun reset() = Unit + fun reset() = Unit companion object { - private val logger = Logger(OffsetConsumer::class) + val logger = Logger(OffsetConsumer::class) } } diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt new file mode 100644 index 00000000..b0eb7a52 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt @@ -0,0 +1,154 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.impl + +import com.nhaarman.mockitokotlin2.argumentCaptor +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.times +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.verifyZeroInteractions +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.test.TestCoroutineDispatcher +import kotlinx.coroutines.test.runBlockingTest +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.TopicPartition +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer + +@ExperimentalCoroutinesApi +object KafkaSourceTest : Spek({ + given("KafkaSource") { + val testDispatcher = TestCoroutineDispatcher() + val mockedKafkaConsumer = mock<KafkaConsumer<ByteArray, ByteArray>>() + afterEachTest { + testDispatcher.cleanupTestCoroutines() + } + given("single topicName and partition") { + val topics = setOf("topicName") + val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher) + val mockedOffsetConsumer = mock<OffsetConsumer>() + on("started KafkaSource") { + val topicPartition = createTopicPartition("topicName") + val topicPartitions = setOf(topicPartition) + whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions) + whenever(mockedKafkaConsumer.endOffsets(topicPartitions)) + .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset)) + + runBlockingTest { + val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs) + job.cancelAndJoin() + } + + it("should call update function on topicName") { + verify(mockedOffsetConsumer).update(topicPartition, newOffset) + } + } + } + + given("two topics with partition") { + val topics = setOf(topicName1, topicName2) + val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher) + val mockedOffsetConsumer = mock<OffsetConsumer>() + + on("started KafkaSource for two iteration of while loop") { + val topicPartitionArgumentCaptor = argumentCaptor<TopicPartition>() + val offsetArgumentCaptor = argumentCaptor<Long>() + val topicPartitionArgumentCaptorAfterInterval = argumentCaptor<TopicPartition>() + val offsetArgumentCaptorAfterInterval = argumentCaptor<Long>() + val topicPartition1 = createTopicPartition(topicName1) + val topicPartition2 = createTopicPartition(topicName2) + val topicPartitions = setOf(topicPartition1, topicPartition2) + whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions) + val partitionToOffset1 = + mapOf(topicPartition1 to newOffset, + topicPartition2 to anotherNewOffset) + val partitionToOffset2 = + mapOf(topicPartition1 to anotherNewOffset, + topicPartition2 to newOffset) + whenever(mockedKafkaConsumer.endOffsets(topicPartitions)) + .thenReturn(partitionToOffset1, partitionToOffset2) + + runBlockingTest { + val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs) + verify(mockedOffsetConsumer, times(topicsAmount)).update(topicPartitionArgumentCaptor.capture(), + offsetArgumentCaptor.capture()) + + testDispatcher.advanceTimeBy(updateIntervalInMs) + + verify(mockedOffsetConsumer, times(topicsAmountAfterInterval)) + .update(topicPartitionArgumentCaptorAfterInterval.capture(), offsetArgumentCaptorAfterInterval.capture()) + + it("should calls update function with proper arguments - before interval") { + assertThat(topicPartitionArgumentCaptor.firstValue).isEqualTo(topicPartition1) + assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset) + assertThat(topicPartitionArgumentCaptor.secondValue).isEqualTo(topicPartition2) + assertThat(offsetArgumentCaptor.secondValue).isEqualTo(anotherNewOffset) + } + it("should calls update function with proper arguments - after interval") { + assertThat(topicPartitionArgumentCaptorAfterInterval.thirdValue).isEqualTo(topicPartition1) + assertThat(offsetArgumentCaptorAfterInterval.thirdValue).isEqualTo(anotherNewOffset) + assertThat(topicPartitionArgumentCaptorAfterInterval.lastValue).isEqualTo(topicPartition2) + assertThat(offsetArgumentCaptorAfterInterval.lastValue).isEqualTo(newOffset) + } + job.cancelAndJoin() + } + } + } + + given("empty topicName list") { + val emptyTopics = emptySet<String>() + val kafkaSource = KafkaSource(mockedKafkaConsumer, emptyTopics, testDispatcher) + val mockedOffsetConsumer = mock<OffsetConsumer>() + + on("call of function start") { + val emptyTopicPartitions = setOf(null) + whenever(mockedKafkaConsumer.assignment()).thenReturn(emptyTopicPartitions) + whenever(mockedKafkaConsumer.endOffsets(emptyTopicPartitions)) + .thenReturn(emptyMap()) + + runBlockingTest { + val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs) + job.cancelAndJoin() + } + + it("should not interact with OffsetConsumer") { + verifyZeroInteractions(mockedOffsetConsumer) + } + } + } + + } +}) + +private const val updateIntervalInMs = 10L +private const val partitionNumber = 0 +private const val newOffset = 2L +private const val anotherNewOffset = 10L +private const val topicName1 = "topicName1" +private const val topicName2 = "topicName2" +private const val topicsAmount = 2 +private const val topicsAmountAfterInterval = 4 +fun createTopicPartition(topic: String) = TopicPartition(topic, partitionNumber)
\ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt index 41587867..96ba588f 100644 --- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt @@ -74,7 +74,7 @@ object MicrometerMetricsTest : Spek({ val offset = 966L it("should update $gaugeName") { - cut.notifyOffsetChanged(offset) + cut.notifyOffsetChanged(offset, "sample_topic", 1) registry.verifyGauge(gaugeName) { assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision) diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt index 6fb42d81..242f27be 100644 --- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt @@ -21,10 +21,10 @@ package org.onap.dcae.collectors.veshv.kafkaconsumer.state import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.verify -import com.nhaarman.mockitokotlin2.whenever -import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics @@ -33,16 +33,20 @@ object OffsetConsumerTest : Spek({ describe("OffsetConsumer with metrics") { val mockedMetrics = mock<Metrics>() val offsetConsumer = OffsetConsumer(mockedMetrics) + given("topicName with partition"){ + val topicPartition = TopicPartition(topicName, partitionNumber) - on("new update method call") { - val consumerRecord = mock<ConsumerRecord<ByteArray, ByteArray>>() - whenever(consumerRecord.offset()).thenReturn(1) + on("new update method call") { + offsetConsumer.update(topicPartition, newOffset) - offsetConsumer.update(consumerRecord) - - it("should notify message offset metric") { - verify(mockedMetrics).notifyOffsetChanged(1) + it("should notify message newOffset metric") { + verify(mockedMetrics).notifyOffsetChanged(newOffset, topicName, partitionNumber) + } } } } }) + +private const val partitionNumber = 1 +private const val newOffset: Long = 99 +private const val topicName = "sample-topicName" diff --git a/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactory.kt b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactory.kt deleted file mode 100644 index 88eb8cec..00000000 --- a/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactory.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.kafka.api - -import org.onap.dcae.collectors.veshv.kafka.impl.KafkaSource -import org.onap.dcae.collectors.veshv.utils.logging.Logger - -typealias ConsumerProvider = () -> KafkaConsumer - -object ConsumerFactory { - private val logger = Logger(ConsumerFactory::class) - - fun createConsumersForTopics(kafkaBootstrapServers: String, - kafkaTopics: Set<String>, - consumerProvider: ConsumerProvider): Map<String, KafkaConsumer> = - KafkaSource.create(kafkaBootstrapServers, kafkaTopics).let { kafkaSource -> - val topicToConsumer = kafkaTopics.associate { it to consumerProvider() } - kafkaSource.start() - .map { - val topic = it.topic() - topicToConsumer.get(topic)?.update(it) - ?: logger.warn { "No consumer configured for topic $topic" } - }.subscribe() - topicToConsumer - } -} diff --git a/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactory.kt b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactory.kt new file mode 100644 index 00000000..b654274e --- /dev/null +++ b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactory.kt @@ -0,0 +1,53 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafka.api + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM +import org.apache.kafka.common.serialization.ByteArrayDeserializer + +object KafkaPropertiesFactory { + private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule" + private const val USERNAME = "admin" + private const val PASSWORD = "admin_secret" + private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;" + private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name + + fun create(bootstrapServers: String): Map<String, Any> { + val props = mapOf<String, Any>( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-consumer", + ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-consumers", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to "3000", + + + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SASL_PLAINTEXT, + SaslConfigs.SASL_MECHANISM to PLAIN_MECHANISM, + SaslConfigs.SASL_JAAS_CONFIG to JAAS_CONFIG + ) + return props + } +}
\ No newline at end of file diff --git a/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactoryTest.kt b/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactoryTest.kt deleted file mode 100644 index a8ba4217..00000000 --- a/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactoryTest.kt +++ /dev/null @@ -1,64 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.kafka.api - -import com.nhaarman.mockitokotlin2.mock -import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.entry -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on - -object ConsumerFactoryTest : Spek({ - describe("ConsumerFactory") { - val kafkaBootstrapServers = "0.0.0.0:40,0.0.0.1:41" - given("consumer provider"){ - val mockedKafkaConsumer = mock<KafkaConsumer>() - val consumerProvider = { mockedKafkaConsumer } - on("creation of consumer") { - val kafkaTopics = setOf("topic1", "topic2") - val consumer = ConsumerFactory.createConsumersForTopics( - kafkaBootstrapServers, - kafkaTopics, - consumerProvider) - it("should create consumer"){ - assertThat(consumer).isNotEmpty.hasSize(2) - assertThat(consumer).contains(entry("topic1", mockedKafkaConsumer), - entry("topic2", mockedKafkaConsumer)) - } - } - on("empty kafkaTopics set"){ - val emptyKafkaTopics = emptySet<String>() - val consumer = ConsumerFactory.createConsumersForTopics( - kafkaBootstrapServers, - emptyKafkaTopics, - consumerProvider) - - it("should not create consumer"){ - assertThat(consumer).isEmpty() - } - } - } - - - } -})
\ No newline at end of file diff --git a/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/impl/KafkaSourceTest.kt b/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactoryTest.kt index 43650f34..9760fb98 100644 --- a/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/impl/KafkaSourceTest.kt +++ b/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactoryTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,29 +17,28 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.kafka.impl +package org.onap.dcae.collectors.veshv.kafka.api +import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.plain.internals.PlainSaslServer import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -/** - * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> - * @since August 2018 - */ -internal class KafkaSourceTest : Spek({ +internal class KafkaPropertiesFactoryTest : Spek({ val servers = "kafka1:9080,kafka2:9080" - val topics = setOf("topic1", "topic2") - describe("receiver options") { - val options = KafkaSource.createReceiverOptions(servers, topics)!!.toImmutable() + describe("KafkaPropertiesFactory") { + val options = KafkaPropertiesFactory.create(servers) fun verifyProperty(key: String, expectedValue: Any) { it("should have $key option set") { - assertThat(options.consumerProperty(key)) + assertThat(options.getValue(key)) .isEqualTo(expectedValue) } } @@ -50,5 +49,15 @@ internal class KafkaSourceTest : Spek({ verifyProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) verifyProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) verifyProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + verifyProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "3000") + verifyProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT) + verifyProperty(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM) + verifyProperty(SaslConfigs.SASL_JAAS_CONFIG, JAAS_CONFIG) } -})
\ No newline at end of file +}) + +private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule" +private const val USERNAME = "admin" +private const val PASSWORD = "admin_secret" +internal val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name +internal const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;" diff --git a/development/bin/constants.sh b/tools/development/bin/constants.sh index f0df9b00..f0df9b00 100755 --- a/development/bin/constants.sh +++ b/tools/development/bin/constants.sh diff --git a/development/bin/consul.sh b/tools/development/bin/consul.sh index 5f9271f2..5f9271f2 100755 --- a/development/bin/consul.sh +++ b/tools/development/bin/consul.sh diff --git a/development/bin/dcae-msgs.sh b/tools/development/bin/dcae-msgs.sh index 84cef972..84cef972 100755 --- a/development/bin/dcae-msgs.sh +++ b/tools/development/bin/dcae-msgs.sh diff --git a/development/bin/dcae-reset.sh b/tools/development/bin/dcae-reset.sh index d2d8ebd0..d2d8ebd0 100755 --- a/development/bin/dcae-reset.sh +++ b/tools/development/bin/dcae-reset.sh diff --git a/development/bin/dcae-topic.sh b/tools/development/bin/dcae-topic.sh index b4c2638d..b4c2638d 100755 --- a/development/bin/dcae-topic.sh +++ b/tools/development/bin/dcae-topic.sh diff --git a/development/bin/run-xnf-simulator.sh b/tools/development/bin/run-xnf-simulator.sh index e4d8d94a..e4d8d94a 100755 --- a/development/bin/run-xnf-simulator.sh +++ b/tools/development/bin/run-xnf-simulator.sh diff --git a/development/bin/start-simulation.sh b/tools/development/bin/start-simulation.sh index 8c63ddbb..8c63ddbb 100755 --- a/development/bin/start-simulation.sh +++ b/tools/development/bin/start-simulation.sh diff --git a/development/bin/xnf-simulation.sh b/tools/development/bin/xnf-simulation.sh index ade0e426..ade0e426 100755 --- a/development/bin/xnf-simulation.sh +++ b/tools/development/bin/xnf-simulation.sh diff --git a/development/configuration/base.json b/tools/development/configuration/base.json index 2a806adb..2a806adb 100644 --- a/development/configuration/base.json +++ b/tools/development/configuration/base.json diff --git a/development/configuration/local.json b/tools/development/configuration/local.json index cfaaaa40..cfaaaa40 100644 --- a/development/configuration/local.json +++ b/tools/development/configuration/local.json diff --git a/development/consul/configuration.hcl b/tools/development/consul/configuration.hcl index f975955e..f975955e 100644 --- a/development/consul/configuration.hcl +++ b/tools/development/consul/configuration.hcl diff --git a/development/docker-compose.yml b/tools/development/docker-compose.yml index 2704722c..fbc38920 100644 --- a/development/docker-compose.yml +++ b/tools/development/docker-compose.yml @@ -110,7 +110,7 @@ services: - config-binding-service volumes: - ./configuration/:/etc/ves-hv/configuration/ - - ./ssl/:/etc/ves-hv/ssl/ + - ../ssl/:/etc/ves-hv/ssl/ - ./logs:/var/log/ONAP/dcae-hv-ves-collector diff --git a/development/grafana/dashboards-providers/dashboard-providers.yaml b/tools/development/grafana/dashboards-providers/dashboard-providers.yaml index 78da55c9..78da55c9 100644 --- a/development/grafana/dashboards-providers/dashboard-providers.yaml +++ b/tools/development/grafana/dashboards-providers/dashboard-providers.yaml diff --git a/development/grafana/dashboards/connections.json b/tools/development/grafana/dashboards/connections.json index 2d0182c9..2d0182c9 100644 --- a/development/grafana/dashboards/connections.json +++ b/tools/development/grafana/dashboards/connections.json diff --git a/development/grafana/dashboards/processing.json b/tools/development/grafana/dashboards/processing.json index d74968cb..d74968cb 100644 --- a/development/grafana/dashboards/processing.json +++ b/tools/development/grafana/dashboards/processing.json diff --git a/development/grafana/datasources/prometheus.yaml b/tools/development/grafana/datasources/prometheus.yaml index 80717b08..80717b08 100644 --- a/development/grafana/datasources/prometheus.yaml +++ b/tools/development/grafana/datasources/prometheus.yaml diff --git a/development/logs/.gitignore b/tools/development/logs/.gitignore index 1287e9bd..1287e9bd 100644 --- a/development/logs/.gitignore +++ b/tools/development/logs/.gitignore diff --git a/development/prometheus.yml b/tools/development/prometheus.yml index 201c8f96..201c8f96 100644 --- a/development/prometheus.yml +++ b/tools/development/prometheus.yml diff --git a/tools/performance/configuration/base.json b/tools/performance/configuration/base.json new file mode 100644 index 00000000..2a806adb --- /dev/null +++ b/tools/performance/configuration/base.json @@ -0,0 +1,11 @@ +{ + "logLevel": "DEBUG", + "server.listenPort": 6061, + "server.idleTimeoutSec": 60, + "cbs.firstRequestDelaySec": 5, + "cbs.requestIntervalSec": 5, + "security.keys.keyStoreFile": "/etc/ves-hv/ssl/server.p12", + "security.keys.keyStorePasswordFile": "/etc/ves-hv/ssl/server.pass", + "security.keys.trustStoreFile": "/etc/ves-hv/ssl/trust.p12", + "security.keys.trustStorePasswordFile": "/etc/ves-hv/ssl/trust.pass" +}
\ No newline at end of file diff --git a/tools/performance/consul/configuration.hcl b/tools/performance/consul/configuration.hcl new file mode 100644 index 00000000..f975955e --- /dev/null +++ b/tools/performance/consul/configuration.hcl @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * csit-dcaegen2-collectors-hv-ves + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +server = true +bootstrap = true +ui = true +client_addr = "0.0.0.0" + +service { + # name under which hv-ves collector should seek cbs + # usually set as CONFIG_BINDING_SERVICE environment variable + Name = "CBS" + # address of CBS as seen by hv-ves collector + Address = "config-binding-service" + Port = 10000 +} + diff --git a/tools/performance/docker-compose.yml b/tools/performance/docker-compose.yml new file mode 100644 index 00000000..82143235 --- /dev/null +++ b/tools/performance/docker-compose.yml @@ -0,0 +1,116 @@ +version: "3.5" +services: + + message-router-zookeeper: + image: nexus3.onap.org:10001/onap/dmaap/zookeeper:4.0.0 + ports: + - "2181:2181" + + message-router-kafka-0: + image: nexus3.onap.org:10001/onap/dmaap/kafka111:0.0.6 + ports: + - "9092:9092" + - "9093:9093" + environment: + HOST_IP: 127.0.0.1 + KAFKA_BROKER_ID: 0 + ENDPOINT_PORT: 30490 + KAFKA_ZOOKEEPER_CONNECT: "message-router-zookeeper:2181" + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + KAFKA_DELETE_TOPIC_ENABLE: "true" + + KAFKA_LISTENERS: "INTERNAL_SASL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_SASL_PLAINTEXT://0.0.0.0:9093" + KAFKA_ADVERTISED_LISTENERS: "INTERNAL_SASL_PLAINTEXT://message-router-kafka-0:9092,EXTERNAL_SASL_PLAINTEXT://message-router-kafka-0:9093" + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT,EXTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT" + KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL_SASL_PLAINTEXT" + KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN" + KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "PLAIN" + KAFKA_AUTHORIZER_CLASS_NAME: "org.onap.dmaap.kafkaAuthorize.KafkaCustomAuthorizer" + + aaf_locate_url: https://aaf-locate:8095 + KAFKA_LOG_DIRS: /opt/kafka/data + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_DEFAULT_REPLICATION_FACTOR: 1 + KAFKA_NUM_PARTITIONS: 1 + volumes: + - /var/run/docker.sock:/var/run/docker.sock + depends_on: + - message-router-zookeeper + + consul-server: + image: docker.io/consul:1.0.6 + ports: + - "8500:8500" + volumes: + - ./consul/:/consul/config + + consul-bootstrap: + image: docker.io/consul:1.0.6 + restart: on-failure + command: ["kv", "put", "-http-addr=http://consul-server:8500", "dcae-hv-ves-collector", '{ + "streams_publishes": { + "perf3gpp": { + "type": "kafka", + "aaf_credentials": { + "username": "admin", + "password": "admin_secret" + }, + "kafka_info": { + "bootstrap_servers": "message-router-kafka-0:9093", + "topic_name": "HV_VES_PERF3GPP" + } + } + } + }' + ] + depends_on: + - consul-server + + config-binding-service: + image: nexus3.onap.org:10001/onap/org.onap.dcaegen2.platform.configbinding.app-app:2.2.4 + ports: + - "10000:10000" + environment: + CONSUL_HOST: "consul-server" + depends_on: + - consul-bootstrap + + ves-hv-collector: + image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest + ports: + - "6060:6060" + - "6061:6061/tcp" + environment: + JAVA_OPTS: "-Dio.netty.leakDetection.level=paranoid -Dlogback.configurationFile=/etc/ONAP/dcae-hv-ves-collector/logback.xml" + VESHV_CONFIGURATION_FILE: "/etc/ves-hv/configuration/base.json" + CONSUL_HOST: "consul-server" + CONFIG_BINDING_SERVICE: "CBS" + HOSTNAME: "dcae-hv-ves-collector" + healthcheck: + test: ./healthcheck.sh || exit 1 + interval: 10s + timeout: 3s + retries: 3 + start_period: 15s + depends_on: + - message-router-kafka-0 + - config-binding-service + volumes: + - ./configuration/:/etc/ves-hv/configuration/ + - ./logs:/var/log/ONAP/dcae-hv-ves-collector + - ../ssl/:/etc/ves-hv/ssl/ + + kafka-consumer: + image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-kafka-consumer + ports: + - "6064:6064/tcp" + command: ["--listen-port", "6062"] + depends_on: + - message-router-kafka-0 + + prometheus: + image: prom/prometheus + ports: + - "9090:9090" + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml diff --git a/tools/performance/local-performance-test.sh b/tools/performance/local-performance-test.sh new file mode 100755 index 00000000..cad21ef8 --- /dev/null +++ b/tools/performance/local-performance-test.sh @@ -0,0 +1,176 @@ +#!/usr/bin/env bash + +cd "$(dirname "$0")" + +CERT_FILE=${CERT_FILE:-/ssl/client.p12} +CERT_PASS_FILE=${CERT_PASS_FILE:-/ssl/client.pass} +HV_VES_NETWORK=${HV_VES_NETWORK:-performance_default} +VOLUME_MAPPING=${VOLUME_MAPPING:-$PWD/../ssl/:/ssl} +PRODUCER_IMAGE_NAME=${PRODUCER_IMAGE_NAME:-the-a-team-registry-local.esisoj70.emea.nsn-net.net/onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-rust-client:latest} + +PRODUCER_APP_NAME=hv-ves-producer +HV_VES_ADDRESS=ves-hv-collector:6061 +CONTAINERS_COUNT=1 +CLIENTS_PER_CONTAINER=1 +MSG_SIZE=16384 +MSG_COUNT=1000 +INTERVAL_MS=0 + + +function usage() { + echo "" + echo "Run HV-VES performance test locally" + echo "Usage $0 setup|start|clean|help" + echo " setup : generate certs and set up docker components" + echo " start : run the performance test" + echo " Optional parameters:" + echo " --address : HV-VES address in host:port format (ves-hv-collector:6061)" + echo " --containers : number of docker containers to create (1)" + echo " --clients : number of clients in single container (1)" + echo " --msg-size : size in bytes of a single message (16384)" + echo " --msg-count : amount of messages to sent by one client in single container (1000)" + echo " --interval : interval between messages (0)" + echo " clean : remove generated certs, HV-VES components and producers" + echo " help : print usage" + echo "Example invocations:" + echo "./local-performance-test.sh setup" + echo "./local-performance-test.sh start --containers 10 --clients 100 --msg-count 10000" + echo "./local-performance-test.sh clean" + exit 1 +} + +function setup_environment(){ + echo "Setting up" + cd ../ssl + ./gen-certs.sh + cd ../performance + docker-compose up -d + + echo "Waiting for components to be healthy.." + while [[ $(docker-compose ps | grep -c "unhealthy\|starting") -ne 0 ]] ; do + sleep 1 + done + + echo "All components ready" + exit 0 +} + +function start_performance_test(){ + + TEST_ID=$(date +%s) + create_containers ${CONTAINERS_COUNT} ${TEST_ID} & + + while :; do + ACTIVE_PRODUCERS=$(docker ps --format "table {{.ID}}\t{{.Status}}" -f "label=id=$TEST_ID") + ACTIVE_PRODUCERS_COUNT=$(echo "$ACTIVE_PRODUCERS" | grep -c "Up") + + clear + print_test_configuration + echo "Active producers ($ACTIVE_PRODUCERS_COUNT/$CONTAINERS_COUNT):" + echo "$ACTIVE_PRODUCERS" + + EXITED_CONTAINERS=$(docker ps -aq -f "label=id=$TEST_ID" -f status=exited | wc -l) + [[ ${EXITED_CONTAINERS} -eq ${CONTAINERS_COUNT} ]] && break + + sleep 1 + done + + clear + print_test_configuration + echo "Test finished" + # TODO put test result here + exit 0 +} + +function print_test_configuration(){ + echo "PERFORMANCE TEST IN PROGRESS" + echo "" + echo "Test configuration:" + echo "Containers count: $CONTAINERS_COUNT" + echo "Clients per container: $CLIENTS_PER_CONTAINER" + echo "Message size: $MSG_SIZE" + echo "Messages per client: $MSG_COUNT" + echo "Interval: $INTERVAL_MS" + echo "" +} + +function create_containers(){ + + for i in $(seq 1 ${1}); do + docker run -d -l id="$2" -l app="$PRODUCER_APP_NAME" -v "$VOLUME_MAPPING" --network="$HV_VES_NETWORK" "$PRODUCER_IMAGE_NAME" \ + --address "$HV_VES_ADDRESS" \ + --certfile "$CERT_FILE" \ + --certpass "$CERT_PASS_FILE" \ + --containers "$CONTAINERS_COUNT" \ + --clients "$CLIENTS_PER_CONTAINER" \ + --msgsize "$MSG_SIZE" \ + --msgcount "$MSG_COUNT" \ + --intervalms "$INTERVAL_MS" > /dev/null + done +} + +function clean(){ + echo "Cleaning up" + + echo "Removing active producers" + docker rm --force $(docker ps -aqf "label=app=$PRODUCER_APP_NAME") + + echo "Clearing generated certs" + cd ../ssl + ./gen-certs.sh clean + cd ../performance + + echo "Removing HV-VES components" + docker-compose down + exit 0 +} + +if [[ $# -eq 0 ]]; then + usage +else + for arg in ${@} + do + case ${arg} in + setup) + setup_environment + ;; + start) + shift 1 + while [[ $(($#)) -gt 0 ]]; do + case "${1}" in + --address) + HV_VES_ADDRESS=${2} + ;; + --containers) + CONTAINERS_COUNT=${2} + ;; + --clients) + CLIENTS_PER_CONTAINER=${2} + ;; + --msg-size) + MSG_SIZE=${2} + ;; + --msg-count) + MSG_COUNT=${2} + ;; + --interval) + INTERVAL_MS=${2} + ;; + esac + shift 2 + done + start_performance_test + ;; + clean) + clean + ;; + help) + usage + ;; + *) + echo "Unknown action: ${arg}" >&2 + usage + ;; + esac + done +fi
\ No newline at end of file diff --git a/tools/performance/logs/.gitignore b/tools/performance/logs/.gitignore new file mode 100644 index 00000000..1287e9bd --- /dev/null +++ b/tools/performance/logs/.gitignore @@ -0,0 +1,2 @@ +** +!.gitignore diff --git a/tools/performance/prometheus.yml b/tools/performance/prometheus.yml new file mode 100644 index 00000000..b9a937c2 --- /dev/null +++ b/tools/performance/prometheus.yml @@ -0,0 +1,14 @@ +global: + scrape_interval: 5s + external_labels: + monitor: 'my-monitor' + +scrape_configs: + - job_name: 'prometheus' + static_configs: + - targets: ['localhost:9090'] + + - job_name: 'kafka-consumer' + metrics_path: '/monitoring/prometheus' + static_configs: + - targets: ['kafka-consumer:6062'] diff --git a/development/ssl/.gitignore b/tools/ssl/.gitignore index 955c17d1..955c17d1 100644 --- a/development/ssl/.gitignore +++ b/tools/ssl/.gitignore diff --git a/development/ssl/Makefile-openssl b/tools/ssl/Makefile-openssl index 09802ce4..09802ce4 100644 --- a/development/ssl/Makefile-openssl +++ b/tools/ssl/Makefile-openssl diff --git a/development/ssl/README.md b/tools/ssl/README.md index c2819d24..c2819d24 100644 --- a/development/ssl/README.md +++ b/tools/ssl/README.md diff --git a/development/ssl/gen-certs.sh b/tools/ssl/gen-certs.sh index bf28ca02..bf28ca02 100755 --- a/development/ssl/gen-certs.sh +++ b/tools/ssl/gen-certs.sh |