summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules
diff options
context:
space:
mode:
authorOleg Mitsura <oleg.mitsura@amdocs.com>2020-08-27 10:21:29 -0400
committerOleg Mitsura <oleg.mitsura@amdocs.com>2020-08-27 10:23:31 -0400
commitf99889a17e8296d8b69d05f986342eb93491e332 (patch)
treeb3645fbca147e76a43dd05089b76bd3e15ae617d /ms/blueprintsprocessor/modules
parente0da7c94aedc038c76d17c1d4a904c7b12ba3c5b (diff)
KafkaMessageConsumerService: 'launch' was missing
Issue-ID: CCSDK-2704 This was accidentally removed few commits back. Signed-off-by: Oleg Mitsura <oleg.mitsura@amdocs.com> Change-Id: I8f08c72e8d5695c1262aad2d10d1081bbabbdfcb
Diffstat (limited to 'ms/blueprintsprocessor/modules')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt21
1 files changed, 12 insertions, 9 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
index a0932e916..83cc0e022 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
@@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -61,8 +62,8 @@ open class KafkaMessageConsumerService(
checkNotNull(kafkaConsumer) {
"failed to create kafka consumer for " +
- "server(${messageConsumerProperties.bootstrapServers})'s " +
- "topics(${messageConsumerProperties.bootstrapServers})"
+ "server(${messageConsumerProperties.bootstrapServers})'s " +
+ "topics(${messageConsumerProperties.bootstrapServers})"
}
kafkaConsumer!!.subscribe(topics)
@@ -76,11 +77,13 @@ open class KafkaMessageConsumerService(
log.trace("Consumed Records : ${consumerRecords.count()}")
runBlocking {
consumerRecords?.forEach { consumerRecord ->
- /** execute the command block */
- if (!channel.isClosedForSend) {
- channel.send(consumerRecord)
- } else {
- log.error("Channel is closed to receive message")
+ launch {
+ /** execute the command block */
+ if (!channel.isClosedForSend) {
+ channel.send(consumerRecord)
+ } else {
+ log.error("Channel is closed to receive message")
+ }
}
}
}
@@ -111,8 +114,8 @@ open class KafkaMessageConsumerService(
checkNotNull(kafkaConsumer) {
"failed to create kafka consumer for " +
- "server(${messageConsumerProperties.bootstrapServers})'s " +
- "topics(${messageConsumerProperties.bootstrapServers})"
+ "server(${messageConsumerProperties.bootstrapServers})'s " +
+ "topics(${messageConsumerProperties.bootstrapServers})"
}
kafkaConsumer!!.subscribe(topics)