summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin
diff options
context:
space:
mode:
authorBrinda Santh <bs2796@att.com>2019-10-29 10:47:00 -0400
committerBrinda Santh <bs2796@att.com>2019-10-29 10:47:00 -0400
commit8a26b031f8980b2a93b51e5327df959c6926c034 (patch)
treec8e24e1614710403926c22a5605aead51e65b765 /ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin
parent0d3a0223fd11d431497519f3f9da640aafe00460 (diff)
Add message consumer dynamic functions.
Issue-ID: CCSDK-1668 Signed-off-by: Brinda Santh <bs2796@att.com> Change-Id: I51be88598557a05ef5db7cd595689a8e4a653fdc
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt3
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt34
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt56
3 files changed, 82 insertions, 11 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index 0b899f2a3..184e85b70 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,7 +34,7 @@ open class MessageConsumerProperties
open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
lateinit var bootstrapServers: String
lateinit var groupId: String
- var clientId: String? = null
+ lateinit var clientId: String
var topic: String? = null
var autoCommit: Boolean = true
var autoOffsetReset: String = "latest"
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
index 25f0bf44d..8bcc7580a 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,16 +18,47 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.channels.Channel
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecords
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+
+/** Consumer Function Interfaces */
+interface ConsumerFunction
interface BlueprintMessageConsumerService {
+ suspend fun subscribe(): Channel<String> {
+ return subscribe(null)
+ }
+
/** Subscribe to the Kafka channel with [additionalConfig] */
suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String>
/** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/
suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<String>
+ /** Consume and execute dynamic function [consumerFunction] */
+ suspend fun consume(consumerFunction: ConsumerFunction) {
+ consume(null, consumerFunction)
+ }
+
+ /** Consume with [additionalConfig], so that we can execute dynamic function [consumerFunction] */
+ suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) {
+ throw BluePrintProcessorException("Not Implemented")
+ }
+
+ /** Consume the [topics] with [additionalConfig], so that we can execute dynamic function [consumerFunction] */
+ suspend fun consume(topics: List<String>, additionalConfig: Map<String, Any>?,
+ consumerFunction: ConsumerFunction) {
+ throw BluePrintProcessorException("Not Implemented")
+ }
+
/** close the channel, consumer and other resources */
suspend fun shutDown()
-
+}
+/** Consumer dynamic implementation interface */
+interface KafkaConsumerRecordsFunction : ConsumerFunction {
+ suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, consumer: Consumer<*, *>,
+ consumerRecords: ConsumerRecords<*, *>)
} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
index a4ccfa901..757846c81 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
@@ -38,7 +38,6 @@ open class KafkaBasicAuthMessageConsumerService(
: BlueprintMessageConsumerService {
val log = logger(KafkaBasicAuthMessageConsumerService::class)
-
val channel = Channel<String>()
var kafkaConsumer: Consumer<String, ByteArray>? = null
@@ -57,9 +56,8 @@ open class KafkaBasicAuthMessageConsumerService(
configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset
configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
- if (messageConsumerProperties.clientId != null) {
- configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!!
- }
+ configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId
+
/** To handle Back pressure, Get only configured record for processing */
if (messageConsumerProperties.pollRecords > 0) {
configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords
@@ -79,7 +77,7 @@ open class KafkaBasicAuthMessageConsumerService(
}
- override suspend fun subscribe(consumerTopic: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
+ override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
/** Create Kafka consumer */
kafkaConsumer = kafkaConsumer(additionalConfig)
@@ -89,15 +87,15 @@ open class KafkaBasicAuthMessageConsumerService(
"topics(${messageConsumerProperties.bootstrapServers})"
}
- kafkaConsumer!!.subscribe(consumerTopic)
- log.info("Successfully consumed topic($consumerTopic)")
+ kafkaConsumer!!.subscribe(topics)
+ log.info("Successfully consumed topic($topics)")
- thread(start = true, name = "KafkaConsumer") {
+ thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
keepGoing = true
kafkaConsumer!!.use { kc ->
while (keepGoing) {
val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
- log.info("Consumed Records : ${consumerRecords.count()}")
+ log.trace("Consumed Records : ${consumerRecords.count()}")
runBlocking {
consumerRecords?.forEach { consumerRecord ->
/** execute the command block */
@@ -119,6 +117,46 @@ open class KafkaBasicAuthMessageConsumerService(
return channel
}
+ override suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) {
+ /** get to topic names */
+ val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() }
+ check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
+ return consume(topics = consumerTopic, additionalConfig = additionalConfig, consumerFunction = consumerFunction)
+ }
+
+ override suspend fun consume(topics: List<String>, additionalConfig: Map<String, Any>?,
+ consumerFunction: ConsumerFunction) {
+
+ val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction
+
+ /** Create Kafka consumer */
+ kafkaConsumer = kafkaConsumer(additionalConfig)
+
+ checkNotNull(kafkaConsumer) {
+ "failed to create kafka consumer for " +
+ "server(${messageConsumerProperties.bootstrapServers})'s " +
+ "topics(${messageConsumerProperties.bootstrapServers})"
+ }
+
+ kafkaConsumer!!.subscribe(topics)
+ log.info("Successfully consumed topic($topics)")
+
+ thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
+ keepGoing = true
+ kafkaConsumer!!.use { kc ->
+ while (keepGoing) {
+ val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
+ log.trace("Consumed Records : ${consumerRecords.count()}")
+ runBlocking {
+ /** Execute dynamic consumer Block substitution */
+ kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords)
+ }
+ }
+ log.info("message listener shutting down.....")
+ }
+ }
+ }
+
override suspend fun shutDown() {
/** stop the polling loop */
keepGoing = false