From f99889a17e8296d8b69d05f986342eb93491e332 Mon Sep 17 00:00:00 2001 From: Oleg Mitsura Date: Thu, 27 Aug 2020 10:21:29 -0400 Subject: KafkaMessageConsumerService: 'launch' was missing Issue-ID: CCSDK-2704 This was accidentally removed few commits back. Signed-off-by: Oleg Mitsura Change-Id: I8f08c72e8d5695c1262aad2d10d1081bbabbdfcb --- .../message/service/KafkaMessageConsumerService.kt | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt index a0932e916..83cc0e022 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt @@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerRecord @@ -61,8 +62,8 @@ open class KafkaMessageConsumerService( checkNotNull(kafkaConsumer) { "failed to create kafka consumer for " + - "server(${messageConsumerProperties.bootstrapServers})'s " + - "topics(${messageConsumerProperties.bootstrapServers})" + "server(${messageConsumerProperties.bootstrapServers})'s " + + "topics(${messageConsumerProperties.bootstrapServers})" } kafkaConsumer!!.subscribe(topics) @@ -76,11 +77,13 @@ open class KafkaMessageConsumerService( log.trace("Consumed Records : ${consumerRecords.count()}") runBlocking { consumerRecords?.forEach { consumerRecord -> - /** execute the command block */ - if (!channel.isClosedForSend) { - channel.send(consumerRecord) - } else { - log.error("Channel is closed to receive message") + launch { + /** execute the command block */ + if (!channel.isClosedForSend) { + channel.send(consumerRecord) + } else { + log.error("Channel is closed to receive message") + } } } } @@ -111,8 +114,8 @@ open class KafkaMessageConsumerService( checkNotNull(kafkaConsumer) { "failed to create kafka consumer for " + - "server(${messageConsumerProperties.bootstrapServers})'s " + - "topics(${messageConsumerProperties.bootstrapServers})" + "server(${messageConsumerProperties.bootstrapServers})'s " + + "topics(${messageConsumerProperties.bootstrapServers})" } kafkaConsumer!!.subscribe(topics) -- cgit 1.2.3-korg