diff options
author | Oleg Mitsura <oleg.mitsura@amdocs.com> | 2020-08-27 10:21:29 -0400 |
---|---|---|
committer | Oleg Mitsura <oleg.mitsura@amdocs.com> | 2020-08-27 10:23:31 -0400 |
commit | f99889a17e8296d8b69d05f986342eb93491e332 (patch) | |
tree | b3645fbca147e76a43dd05089b76bd3e15ae617d /ms/blueprintsprocessor | |
parent | e0da7c94aedc038c76d17c1d4a904c7b12ba3c5b (diff) |
KafkaMessageConsumerService: 'launch' was missing
Issue-ID: CCSDK-2704
This was accidentally removed few commits back.
Signed-off-by: Oleg Mitsura <oleg.mitsura@amdocs.com>
Change-Id: I8f08c72e8d5695c1262aad2d10d1081bbabbdfcb
Diffstat (limited to 'ms/blueprintsprocessor')
-rw-r--r-- | ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt | 21 |
1 files changed, 12 insertions, 9 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt index a0932e916..83cc0e022 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt @@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerRecord @@ -61,8 +62,8 @@ open class KafkaMessageConsumerService( checkNotNull(kafkaConsumer) { "failed to create kafka consumer for " + - "server(${messageConsumerProperties.bootstrapServers})'s " + - "topics(${messageConsumerProperties.bootstrapServers})" + "server(${messageConsumerProperties.bootstrapServers})'s " + + "topics(${messageConsumerProperties.bootstrapServers})" } kafkaConsumer!!.subscribe(topics) @@ -76,11 +77,13 @@ open class KafkaMessageConsumerService( log.trace("Consumed Records : ${consumerRecords.count()}") runBlocking { consumerRecords?.forEach { consumerRecord -> - /** execute the command block */ - if (!channel.isClosedForSend) { - channel.send(consumerRecord) - } else { - log.error("Channel is closed to receive message") + launch { + /** execute the command block */ + if (!channel.isClosedForSend) { + channel.send(consumerRecord) + } else { + log.error("Channel is closed to receive message") + } } } } @@ -111,8 +114,8 @@ open class KafkaMessageConsumerService( checkNotNull(kafkaConsumer) { "failed to create kafka consumer for " + - "server(${messageConsumerProperties.bootstrapServers})'s " + - "topics(${messageConsumerProperties.bootstrapServers})" + "server(${messageConsumerProperties.bootstrapServers})'s " + + "topics(${messageConsumerProperties.bootstrapServers})" } kafkaConsumer!!.subscribe(topics) |