summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin
diff options
context:
space:
mode:
authorBrinda Santh <brindasanth@in.ibm.com>2019-09-05 11:06:48 -0400
committerBrinda Santh <brindasanth@in.ibm.com>2019-09-05 11:06:48 -0400
commita2d60124bd44c767c42f4aa4ac8a7edafbfa3e39 (patch)
tree45cd9390e65bf1b3851f1e00f7a4cdf21054b93e /ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin
parent8d94f0bd0a9e4d1c57e98df726841dd0e2978569 (diff)
Add Config based blueprint process consumer
Change-Id: I9e37ecb5032047f835f3b2ea20b2689c76353497 Issue-ID: CCSDK-1668 Signed-off-by: Brinda Santh <brindasanth@in.ibm.com>
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt3
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt5
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt23
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt1
4 files changed, 17 insertions, 15 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
index 281a970b8..27a444bdc 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
@@ -59,8 +59,7 @@ fun BluePrintDependencyService.messageConsumerService(jsonNode: JsonNode): Bluep
class MessageLibConstants {
companion object {
const val SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY = "blueprint-message-lib-property-service"
- //TODO("Change to .messageconsumer in application.properties")
- const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageclient."
+ const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer."
const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer."
const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth"
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index c77cdfdb5..ab04054fe 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -33,8 +33,9 @@ open class MessageConsumerProperties
open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
lateinit var bootstrapServers: String
lateinit var groupId: String
- var consumerTopic: String? = null
- var pollMillSec: Long = 100
+ var clientId: String? = null
+ var topic: String? = null
+ var pollMillSec: Long = 1000
}
open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties()
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
index 076501eab..5a9e61bfd 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
@@ -17,6 +17,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.CommonClientConfigs
@@ -47,6 +48,10 @@ class KafkaBasicAuthMessageConsumerService(
configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+ if (messageConsumerProperties.clientId != null) {
+ configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!!
+ }
+ // TODO("Security Implementation based on type")
/** add or override already set properties */
additionalConfig?.let { configProperties.putAll(it) }
/** Create Kafka consumer */
@@ -55,7 +60,7 @@ class KafkaBasicAuthMessageConsumerService(
override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> {
/** get to topic names */
- val consumerTopic = messageConsumerProperties.consumerTopic?.split(",")?.map { it.trim() }
+ val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() }
check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
return subscribe(consumerTopic, additionalConfig)
}
@@ -64,6 +69,7 @@ class KafkaBasicAuthMessageConsumerService(
override suspend fun subscribe(consumerTopic: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
/** Create Kafka consumer */
kafkaConsumer = kafkaConsumer(additionalConfig)
+
checkNotNull(kafkaConsumer) {
"failed to create kafka consumer for " +
"server(${messageConsumerProperties.bootstrapServers})'s " +
@@ -73,7 +79,7 @@ class KafkaBasicAuthMessageConsumerService(
kafkaConsumer!!.subscribe(consumerTopic)
log.info("Successfully consumed topic($consumerTopic)")
- val listenerThread = thread(start = true, name = "KafkaConsumer") {
+ thread(start = true, name = "KafkaConsumer") {
keepGoing = true
kafkaConsumer!!.use { kc ->
while (keepGoing) {
@@ -93,21 +99,18 @@ class KafkaBasicAuthMessageConsumerService(
}
}
}
+ log.info("message listener shutting down.....")
}
-
}
- log.info("Successfully consumed in thread(${listenerThread})")
return channel
}
override suspend fun shutDown() {
- /** Close the Channel */
- channel.close()
/** stop the polling loop */
keepGoing = false
- if (kafkaConsumer != null) {
- /** sunsubscribe the consumer */
- kafkaConsumer!!.unsubscribe()
- }
+ /** Close the Channel */
+ channel.cancel()
+ /** TO shutdown gracefully, need to wait for the maximum poll time */
+ delay(messageConsumerProperties.pollMillSec)
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
index 008e92437..1c93bb0fc 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
@@ -83,7 +83,6 @@ class KafkaBasicAuthMessageProducerService(
}
fun messageTemplate(additionalConfig: Map<String, Any>? = null): KafkaTemplate<String, Any> {
- log.info("Prepering templates")
if (kafkaTemplate == null) {
kafkaTemplate = KafkaTemplate(producerFactory(additionalConfig))
}