diff options
author | Julien Fontaine <julien.fontaine@bell.ca> | 2020-08-04 11:57:56 -0400 |
---|---|---|
committer | Julien Fontaine <julien.fontaine@bell.ca> | 2020-08-04 18:24:12 -0400 |
commit | f72ff0cff34d17147a5142eb57987b1515b80580 (patch) | |
tree | 556ae71f28b4192703bcb1878375aaeea2408122 /ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin | |
parent | 260b95f2a1fe773bb1e89150164ae2a7c880be04 (diff) |
Make use of Kafka Key for Audit service and Kafka listener
* When message is sent by audit service, key will be the CBA name
* When sent by kafka listener (self-service api), key is the same as the request message key consumed. If not specified, a random UUID
* MessageProducer interface refactoring :
* add 'key' parameter to specify a key
* add default value null to paramater 'headers' to remove some unnecessary method
Issue-ID: CCSDK-2628
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Change-Id: I68580151184c87104c07037f379276dd8c8c71c7
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin')
5 files changed, 26 insertions, 49 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt index f74abcdb7..311d35c38 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt @@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecords import org.apache.kafka.streams.Topology import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties @@ -29,15 +30,15 @@ interface ConsumerFunction interface BlueprintMessageConsumerService { - suspend fun subscribe(): Channel<String> { + suspend fun subscribe(): Channel<ConsumerRecord<String, ByteArray>> { return subscribe(null) } /** Subscribe to the Kafka channel with [additionalConfig] */ - suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> + suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> /** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/ - suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<String> + suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<ConsumerRecord<String, ByteArray>> /** Consume and execute dynamic function [consumerFunction] */ suspend fun consume(consumerFunction: ConsumerFunction) { diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt index cdc65a1c6..66d3a5b73 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt @@ -17,34 +17,19 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.runBlocking +import java.util.UUID interface BlueprintMessageProducerService { - fun sendMessage(message: Any): Boolean { - return sendMessage(message = message, headers = null) + fun sendMessage(key: String = UUID.randomUUID().toString(), message: Any, headers: MutableMap<String, String>? = null): Boolean = runBlocking { + sendMessageNB(key, message, headers) } - fun sendMessage(topic: String, message: Any): Boolean { - return sendMessage(topic, message, null) + fun sendMessage(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean = runBlocking { + sendMessageNB(key, topic, message, headers) } - fun sendMessage(message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking { - sendMessageNB(message = message, headers = headers) - } - - fun sendMessage(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking { - sendMessageNB(topic, message, headers) - } - - suspend fun sendMessageNB(message: Any): Boolean { - return sendMessageNB(message = message, headers = null) - } - - suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean - - suspend fun sendMessageNB(topic: String, message: Any): Boolean { - return sendMessageNB(topic, message, null) - } + suspend fun sendMessageNB(key: String = UUID.randomUUID().toString(), message: Any, headers: MutableMap<String, String>? = null): Boolean - suspend fun sendMessageNB(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean + suspend fun sendMessageNB(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt index cdcd4197c..a0932e916 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt @@ -19,13 +19,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay -import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.logger -import java.nio.charset.Charset import java.time.Duration import kotlin.concurrent.thread @@ -35,7 +34,7 @@ open class KafkaMessageConsumerService( BlueprintMessageConsumerService { val log = logger(KafkaMessageConsumerService::class) - val channel = Channel<String>() + val channel = Channel<ConsumerRecord<String, ByteArray>>() var kafkaConsumer: Consumer<String, ByteArray>? = null @Volatile @@ -49,14 +48,14 @@ open class KafkaMessageConsumerService( return KafkaConsumer(configProperties) } - override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> { + override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> { /** get to topic names */ val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() } check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } return subscribe(consumerTopic, additionalConfig) } - override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> { + override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> { /** Create Kafka consumer */ kafkaConsumer = kafkaConsumer(additionalConfig) @@ -78,14 +77,10 @@ open class KafkaMessageConsumerService( runBlocking { consumerRecords?.forEach { consumerRecord -> /** execute the command block */ - consumerRecord.value()?.let { - launch { - if (!channel.isClosedForSend) { - channel.send(String(it, Charset.defaultCharset())) - } else { - log.error("Channel is closed to receive message") - } - } + if (!channel.isClosedForSend) { + channel.send(consumerRecord) + } else { + log.error("Channel is closed to receive message") } } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt index 8958d4f0c..59e9192bb 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt @@ -29,7 +29,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString -import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID import org.slf4j.LoggerFactory import java.nio.charset.Charset @@ -48,17 +47,13 @@ class KafkaMessageProducerService( const val MAX_ERR_MSG_LEN = 128 } - override suspend fun sendMessageNB(message: Any): Boolean { + override suspend fun sendMessageNB(key: String, message: Any, headers: MutableMap<String, String>?): Boolean { checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } - return sendMessageNB(messageProducerProperties.topic!!, message) - } - - override suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean { - checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } - return sendMessageNB(messageProducerProperties.topic!!, message, headers) + return sendMessageNB(key, messageProducerProperties.topic!!, message, headers) } override suspend fun sendMessageNB( + key: String, topic: String, message: Any, headers: MutableMap<String, String>? @@ -73,7 +68,7 @@ class KafkaMessageProducerService( else -> clonedMessage.asJsonString().toByteArray(Charset.defaultCharset()) } - val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage) + val record = ProducerRecord<String, ByteArray>(topic, key, byteArrayMessage) val recordHeaders = record.headers() messageLoggerService.messageProducing(recordHeaders) headers?.let { diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt index 60f2dfa05..4340e4815 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt @@ -17,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel +import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.streams.KafkaStreams import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException @@ -39,11 +40,11 @@ open class KafkaStreamsConsumerService(private val messageConsumerProperties: Me return configProperties } - override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> { + override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> { throw BluePrintProcessorException("not implemented") } - override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> { + override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> { throw BluePrintProcessorException("not implemented") } |