summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt21
1 files changed, 12 insertions, 9 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
index a0932e916..83cc0e022 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
@@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -61,8 +62,8 @@ open class KafkaMessageConsumerService(
checkNotNull(kafkaConsumer) {
"failed to create kafka consumer for " +
- "server(${messageConsumerProperties.bootstrapServers})'s " +
- "topics(${messageConsumerProperties.bootstrapServers})"
+ "server(${messageConsumerProperties.bootstrapServers})'s " +
+ "topics(${messageConsumerProperties.bootstrapServers})"
}
kafkaConsumer!!.subscribe(topics)
@@ -76,11 +77,13 @@ open class KafkaMessageConsumerService(
log.trace("Consumed Records : ${consumerRecords.count()}")
runBlocking {
consumerRecords?.forEach { consumerRecord ->
- /** execute the command block */
- if (!channel.isClosedForSend) {
- channel.send(consumerRecord)
- } else {
- log.error("Channel is closed to receive message")
+ launch {
+ /** execute the command block */
+ if (!channel.isClosedForSend) {
+ channel.send(consumerRecord)
+ } else {
+ log.error("Channel is closed to receive message")
+ }
}
}
}
@@ -111,8 +114,8 @@ open class KafkaMessageConsumerService(
checkNotNull(kafkaConsumer) {
"failed to create kafka consumer for " +
- "server(${messageConsumerProperties.bootstrapServers})'s " +
- "topics(${messageConsumerProperties.bootstrapServers})"
+ "server(${messageConsumerProperties.bootstrapServers})'s " +
+ "topics(${messageConsumerProperties.bootstrapServers})"
}
kafkaConsumer!!.subscribe(topics)