From a2d60124bd44c767c42f4aa4ac8a7edafbfa3e39 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Thu, 5 Sep 2019 11:06:48 -0400 Subject: Add Config based blueprint process consumer Change-Id: I9e37ecb5032047f835f3b2ea20b2689c76353497 Issue-ID: CCSDK-1668 Signed-off-by: Brinda Santh --- .../message/BluePrintMessageLibConfiguration.kt | 3 +-- .../message/BluePrintMessageLibData.kt | 5 +++-- .../KafkaBasicAuthMessageConsumerService.kt | 23 ++++++++++++---------- .../KafkaBasicAuthMessageProducerService.kt | 1 - 4 files changed, 17 insertions(+), 15 deletions(-) (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin') diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt index 281a970b8..27a444bdc 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt @@ -59,8 +59,7 @@ fun BluePrintDependencyService.messageConsumerService(jsonNode: JsonNode): Bluep class MessageLibConstants { companion object { const val SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY = "blueprint-message-lib-property-service" - //TODO("Change to .messageconsumer in application.properties") - const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageclient." + const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer." const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer." const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth" } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index c77cdfdb5..ab04054fe 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -33,8 +33,9 @@ open class MessageConsumerProperties open class KafkaMessageConsumerProperties : MessageConsumerProperties() { lateinit var bootstrapServers: String lateinit var groupId: String - var consumerTopic: String? = null - var pollMillSec: Long = 100 + var clientId: String? = null + var topic: String? = null + var pollMillSec: Long = 1000 } open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties() diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt index 076501eab..5a9e61bfd 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt @@ -17,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.apache.kafka.clients.CommonClientConfigs @@ -47,6 +48,10 @@ class KafkaBasicAuthMessageConsumerService( configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest" configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + if (messageConsumerProperties.clientId != null) { + configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!! + } + // TODO("Security Implementation based on type") /** add or override already set properties */ additionalConfig?.let { configProperties.putAll(it) } /** Create Kafka consumer */ @@ -55,7 +60,7 @@ class KafkaBasicAuthMessageConsumerService( override suspend fun subscribe(additionalConfig: Map?): Channel { /** get to topic names */ - val consumerTopic = messageConsumerProperties.consumerTopic?.split(",")?.map { it.trim() } + val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() } check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } return subscribe(consumerTopic, additionalConfig) } @@ -64,6 +69,7 @@ class KafkaBasicAuthMessageConsumerService( override suspend fun subscribe(consumerTopic: List, additionalConfig: Map?): Channel { /** Create Kafka consumer */ kafkaConsumer = kafkaConsumer(additionalConfig) + checkNotNull(kafkaConsumer) { "failed to create kafka consumer for " + "server(${messageConsumerProperties.bootstrapServers})'s " + @@ -73,7 +79,7 @@ class KafkaBasicAuthMessageConsumerService( kafkaConsumer!!.subscribe(consumerTopic) log.info("Successfully consumed topic($consumerTopic)") - val listenerThread = thread(start = true, name = "KafkaConsumer") { + thread(start = true, name = "KafkaConsumer") { keepGoing = true kafkaConsumer!!.use { kc -> while (keepGoing) { @@ -93,21 +99,18 @@ class KafkaBasicAuthMessageConsumerService( } } } + log.info("message listener shutting down.....") } - } - log.info("Successfully consumed in thread(${listenerThread})") return channel } override suspend fun shutDown() { - /** Close the Channel */ - channel.close() /** stop the polling loop */ keepGoing = false - if (kafkaConsumer != null) { - /** sunsubscribe the consumer */ - kafkaConsumer!!.unsubscribe() - } + /** Close the Channel */ + channel.cancel() + /** TO shutdown gracefully, need to wait for the maximum poll time */ + delay(messageConsumerProperties.pollMillSec) } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt index 008e92437..1c93bb0fc 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt @@ -83,7 +83,6 @@ class KafkaBasicAuthMessageProducerService( } fun messageTemplate(additionalConfig: Map? = null): KafkaTemplate { - log.info("Prepering templates") if (kafkaTemplate == null) { kafkaTemplate = KafkaTemplate(producerFactory(additionalConfig)) } -- cgit 1.2.3-korg