From 8a26b031f8980b2a93b51e5327df959c6926c034 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Tue, 29 Oct 2019 10:47:00 -0400 Subject: Add message consumer dynamic functions. Issue-ID: CCSDK-1668 Signed-off-by: Brinda Santh Change-Id: I51be88598557a05ef5db7cd595689a8e4a653fdc --- .../message/BluePrintMessageLibData.kt | 3 +- .../service/BlueprintMessageConsumerService.kt | 34 ++++++++++++- .../KafkaBasicAuthMessageConsumerService.kt | 56 ++++++++++++++++++---- .../service/BlueprintMessageConsumerServiceTest.kt | 54 +++++++++++++++++++-- .../src/test/resources/logback-test.xml | 2 +- 5 files changed, 134 insertions(+), 15 deletions(-) (limited to 'ms/blueprintsprocessor/modules') diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index 0b899f2a3..184e85b70 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,7 +34,7 @@ open class MessageConsumerProperties open class KafkaMessageConsumerProperties : MessageConsumerProperties() { lateinit var bootstrapServers: String lateinit var groupId: String - var clientId: String? = null + lateinit var clientId: String var topic: String? = null var autoCommit: Boolean = true var autoOffsetReset: String = "latest" diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt index 25f0bf44d..8bcc7580a 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,16 +18,47 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecords +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException + +/** Consumer Function Interfaces */ +interface ConsumerFunction interface BlueprintMessageConsumerService { + suspend fun subscribe(): Channel { + return subscribe(null) + } + /** Subscribe to the Kafka channel with [additionalConfig] */ suspend fun subscribe(additionalConfig: Map?): Channel /** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/ suspend fun subscribe(topics: List, additionalConfig: Map? = null): Channel + /** Consume and execute dynamic function [consumerFunction] */ + suspend fun consume(consumerFunction: ConsumerFunction) { + consume(null, consumerFunction) + } + + /** Consume with [additionalConfig], so that we can execute dynamic function [consumerFunction] */ + suspend fun consume(additionalConfig: Map?, consumerFunction: ConsumerFunction) { + throw BluePrintProcessorException("Not Implemented") + } + + /** Consume the [topics] with [additionalConfig], so that we can execute dynamic function [consumerFunction] */ + suspend fun consume(topics: List, additionalConfig: Map?, + consumerFunction: ConsumerFunction) { + throw BluePrintProcessorException("Not Implemented") + } + /** close the channel, consumer and other resources */ suspend fun shutDown() - +} +/** Consumer dynamic implementation interface */ +interface KafkaConsumerRecordsFunction : ConsumerFunction { + suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, consumer: Consumer<*, *>, + consumerRecords: ConsumerRecords<*, *>) } \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt index a4ccfa901..757846c81 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt @@ -38,7 +38,6 @@ open class KafkaBasicAuthMessageConsumerService( : BlueprintMessageConsumerService { val log = logger(KafkaBasicAuthMessageConsumerService::class) - val channel = Channel() var kafkaConsumer: Consumer? = null @@ -57,9 +56,8 @@ open class KafkaBasicAuthMessageConsumerService( configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java - if (messageConsumerProperties.clientId != null) { - configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!! - } + configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId + /** To handle Back pressure, Get only configured record for processing */ if (messageConsumerProperties.pollRecords > 0) { configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords @@ -79,7 +77,7 @@ open class KafkaBasicAuthMessageConsumerService( } - override suspend fun subscribe(consumerTopic: List, additionalConfig: Map?): Channel { + override suspend fun subscribe(topics: List, additionalConfig: Map?): Channel { /** Create Kafka consumer */ kafkaConsumer = kafkaConsumer(additionalConfig) @@ -89,15 +87,15 @@ open class KafkaBasicAuthMessageConsumerService( "topics(${messageConsumerProperties.bootstrapServers})" } - kafkaConsumer!!.subscribe(consumerTopic) - log.info("Successfully consumed topic($consumerTopic)") + kafkaConsumer!!.subscribe(topics) + log.info("Successfully consumed topic($topics)") - thread(start = true, name = "KafkaConsumer") { + thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") { keepGoing = true kafkaConsumer!!.use { kc -> while (keepGoing) { val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) - log.info("Consumed Records : ${consumerRecords.count()}") + log.trace("Consumed Records : ${consumerRecords.count()}") runBlocking { consumerRecords?.forEach { consumerRecord -> /** execute the command block */ @@ -119,6 +117,46 @@ open class KafkaBasicAuthMessageConsumerService( return channel } + override suspend fun consume(additionalConfig: Map?, consumerFunction: ConsumerFunction) { + /** get to topic names */ + val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() } + check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } + return consume(topics = consumerTopic, additionalConfig = additionalConfig, consumerFunction = consumerFunction) + } + + override suspend fun consume(topics: List, additionalConfig: Map?, + consumerFunction: ConsumerFunction) { + + val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction + + /** Create Kafka consumer */ + kafkaConsumer = kafkaConsumer(additionalConfig) + + checkNotNull(kafkaConsumer) { + "failed to create kafka consumer for " + + "server(${messageConsumerProperties.bootstrapServers})'s " + + "topics(${messageConsumerProperties.bootstrapServers})" + } + + kafkaConsumer!!.subscribe(topics) + log.info("Successfully consumed topic($topics)") + + thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") { + keepGoing = true + kafkaConsumer!!.use { kc -> + while (keepGoing) { + val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) + log.trace("Consumed Records : ${consumerRecords.count()}") + runBlocking { + /** Execute dynamic consumer Block substitution */ + kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords) + } + } + log.info("message listener shutting down.....") + } + } + } + override suspend fun shutDown() { /** stop the polling loop */ keepGoing = false diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index 86c2ec5ef..bdceec7d3 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,15 +23,14 @@ import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.clients.consumer.MockConsumer -import org.apache.kafka.clients.consumer.OffsetResetStrategy +import org.apache.kafka.clients.consumer.* import org.apache.kafka.common.TopicPartition import org.junit.Test import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.annotation.DirtiesContext @@ -110,6 +110,54 @@ open class BlueprintMessageConsumerServiceTest { } } + @Test + fun testKafkaBasicAuthConsumerWithDynamicFunction() { + runBlocking { + val blueprintMessageConsumerService = bluePrintMessageLibPropertyService + .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") + + val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true) + + val topic = "default-topic" + val partitions: MutableList = arrayListOf() + val topicsCollection: MutableList = arrayListOf() + partitions.add(TopicPartition(topic, 1)) + val partitionsBeginningMap: MutableMap = mutableMapOf() + val partitionsEndMap: MutableMap = mutableMapOf() + + val records: Long = 10 + partitions.forEach { partition -> + partitionsBeginningMap[partition] = 0L + partitionsEndMap[partition] = records + topicsCollection.add(partition.topic()) + } + val mockKafkaConsumer = MockConsumer(OffsetResetStrategy.EARLIEST) + mockKafkaConsumer.subscribe(topicsCollection) + mockKafkaConsumer.rebalance(partitions) + mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap) + mockKafkaConsumer.updateEndOffsets(partitionsEndMap) + for (i in 1..10) { + val record = ConsumerRecord(topic, 1, i.toLong(), "key_$i", + "I am message $i".toByteArray()) + mockKafkaConsumer.addRecord(record) + } + + every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer + /** Test Consumer Function implementation */ + val consumerFunction = object : KafkaConsumerRecordsFunction { + override suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, + consumer: Consumer<*, *>, consumerRecords: ConsumerRecords<*, *>) { + val count = consumerRecords.count() + log.trace("Received Message count($count)") + } + } + spyBlueprintMessageConsumerService.consume(consumerFunction) + delay(10) + spyBlueprintMessageConsumerService.shutDown() + } + } + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ //@Test fun testKafkaIntegration() { diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml index 5f4fb4f73..820041f74 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml @@ -27,7 +27,7 @@ - ${testing} + ${localPattern} -- cgit 1.2.3-korg