diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src')
-rw-r--r-- | ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt | 21 |
1 files changed, 12 insertions, 9 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt index a0932e916..83cc0e022 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt @@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerRecord @@ -61,8 +62,8 @@ open class KafkaMessageConsumerService( checkNotNull(kafkaConsumer) { "failed to create kafka consumer for " + - "server(${messageConsumerProperties.bootstrapServers})'s " + - "topics(${messageConsumerProperties.bootstrapServers})" + "server(${messageConsumerProperties.bootstrapServers})'s " + + "topics(${messageConsumerProperties.bootstrapServers})" } kafkaConsumer!!.subscribe(topics) @@ -76,11 +77,13 @@ open class KafkaMessageConsumerService( log.trace("Consumed Records : ${consumerRecords.count()}") runBlocking { consumerRecords?.forEach { consumerRecord -> - /** execute the command block */ - if (!channel.isClosedForSend) { - channel.send(consumerRecord) - } else { - log.error("Channel is closed to receive message") + launch { + /** execute the command block */ + if (!channel.isClosedForSend) { + channel.send(consumerRecord) + } else { + log.error("Channel is closed to receive message") + } } } } @@ -111,8 +114,8 @@ open class KafkaMessageConsumerService( checkNotNull(kafkaConsumer) { "failed to create kafka consumer for " + - "server(${messageConsumerProperties.bootstrapServers})'s " + - "topics(${messageConsumerProperties.bootstrapServers})" + "server(${messageConsumerProperties.bootstrapServers})'s " + + "topics(${messageConsumerProperties.bootstrapServers})" } kafkaConsumer!!.subscribe(topics) |