aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src
diff options
context:
space:
mode:
authorJulien Fontaine <julien.fontaine@bell.ca>2020-08-04 11:57:56 -0400
committerJulien Fontaine <julien.fontaine@bell.ca>2020-08-04 18:24:12 -0400
commitf72ff0cff34d17147a5142eb57987b1515b80580 (patch)
tree556ae71f28b4192703bcb1878375aaeea2408122 /ms/blueprintsprocessor/modules/commons/message-lib/src
parent260b95f2a1fe773bb1e89150164ae2a7c880be04 (diff)
Make use of Kafka Key for Audit service and Kafka listener
* When message is sent by audit service, key will be the CBA name * When sent by kafka listener (self-service api), key is the same as the request message key consumed. If not specified, a random UUID * MessageProducer interface refactoring : * add 'key' parameter to specify a key * add default value null to paramater 'headers' to remove some unnecessary method Issue-ID: CCSDK-2628 Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca> Change-Id: I68580151184c87104c07037f379276dd8c8c71c7
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt7
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt29
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt21
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt13
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt5
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt9
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt2
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt1
8 files changed, 36 insertions, 51 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
index f74abcdb7..311d35c38 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
@@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.channels.Channel
import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.streams.Topology
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
@@ -29,15 +30,15 @@ interface ConsumerFunction
interface BlueprintMessageConsumerService {
- suspend fun subscribe(): Channel<String> {
+ suspend fun subscribe(): Channel<ConsumerRecord<String, ByteArray>> {
return subscribe(null)
}
/** Subscribe to the Kafka channel with [additionalConfig] */
- suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String>
+ suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>>
/** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/
- suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<String>
+ suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<ConsumerRecord<String, ByteArray>>
/** Consume and execute dynamic function [consumerFunction] */
suspend fun consume(consumerFunction: ConsumerFunction) {
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
index cdc65a1c6..66d3a5b73 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
@@ -17,34 +17,19 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.runBlocking
+import java.util.UUID
interface BlueprintMessageProducerService {
- fun sendMessage(message: Any): Boolean {
- return sendMessage(message = message, headers = null)
+ fun sendMessage(key: String = UUID.randomUUID().toString(), message: Any, headers: MutableMap<String, String>? = null): Boolean = runBlocking {
+ sendMessageNB(key, message, headers)
}
- fun sendMessage(topic: String, message: Any): Boolean {
- return sendMessage(topic, message, null)
+ fun sendMessage(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean = runBlocking {
+ sendMessageNB(key, topic, message, headers)
}
- fun sendMessage(message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking {
- sendMessageNB(message = message, headers = headers)
- }
-
- fun sendMessage(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking {
- sendMessageNB(topic, message, headers)
- }
-
- suspend fun sendMessageNB(message: Any): Boolean {
- return sendMessageNB(message = message, headers = null)
- }
-
- suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean
-
- suspend fun sendMessageNB(topic: String, message: Any): Boolean {
- return sendMessageNB(topic, message, null)
- }
+ suspend fun sendMessageNB(key: String = UUID.randomUUID().toString(), message: Any, headers: MutableMap<String, String>? = null): Boolean
- suspend fun sendMessageNB(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean
+ suspend fun sendMessageNB(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
index cdcd4197c..a0932e916 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
@@ -19,13 +19,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import java.nio.charset.Charset
import java.time.Duration
import kotlin.concurrent.thread
@@ -35,7 +34,7 @@ open class KafkaMessageConsumerService(
BlueprintMessageConsumerService {
val log = logger(KafkaMessageConsumerService::class)
- val channel = Channel<String>()
+ val channel = Channel<ConsumerRecord<String, ByteArray>>()
var kafkaConsumer: Consumer<String, ByteArray>? = null
@Volatile
@@ -49,14 +48,14 @@ open class KafkaMessageConsumerService(
return KafkaConsumer(configProperties)
}
- override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> {
+ override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
/** get to topic names */
val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() }
check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
return subscribe(consumerTopic, additionalConfig)
}
- override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
+ override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
/** Create Kafka consumer */
kafkaConsumer = kafkaConsumer(additionalConfig)
@@ -78,14 +77,10 @@ open class KafkaMessageConsumerService(
runBlocking {
consumerRecords?.forEach { consumerRecord ->
/** execute the command block */
- consumerRecord.value()?.let {
- launch {
- if (!channel.isClosedForSend) {
- channel.send(String(it, Charset.defaultCharset()))
- } else {
- log.error("Channel is closed to receive message")
- }
- }
+ if (!channel.isClosedForSend) {
+ channel.send(consumerRecord)
+ } else {
+ log.error("Channel is closed to receive message")
}
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
index 8958d4f0c..59e9192bb 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
@@ -29,7 +29,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
-import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
import org.slf4j.LoggerFactory
import java.nio.charset.Charset
@@ -48,17 +47,13 @@ class KafkaMessageProducerService(
const val MAX_ERR_MSG_LEN = 128
}
- override suspend fun sendMessageNB(message: Any): Boolean {
+ override suspend fun sendMessageNB(key: String, message: Any, headers: MutableMap<String, String>?): Boolean {
checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
- return sendMessageNB(messageProducerProperties.topic!!, message)
- }
-
- override suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean {
- checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
- return sendMessageNB(messageProducerProperties.topic!!, message, headers)
+ return sendMessageNB(key, messageProducerProperties.topic!!, message, headers)
}
override suspend fun sendMessageNB(
+ key: String,
topic: String,
message: Any,
headers: MutableMap<String, String>?
@@ -73,7 +68,7 @@ class KafkaMessageProducerService(
else -> clonedMessage.asJsonString().toByteArray(Charset.defaultCharset())
}
- val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
+ val record = ProducerRecord<String, ByteArray>(topic, key, byteArrayMessage)
val recordHeaders = record.headers()
messageLoggerService.messageProducing(recordHeaders)
headers?.let {
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt
index 60f2dfa05..4340e4815 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt
@@ -17,6 +17,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.channels.Channel
+import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.streams.KafkaStreams
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
@@ -39,11 +40,11 @@ open class KafkaStreamsConsumerService(private val messageConsumerProperties: Me
return configProperties
}
- override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> {
+ override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
throw BluePrintProcessorException("not implemented")
}
- override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
+ override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
throw BluePrintProcessorException("not implemented")
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index fdf6e48e7..77bdbe408 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -51,6 +51,7 @@ import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
+import java.nio.charset.Charset
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
@@ -133,9 +134,14 @@ open class BlueprintMessageConsumerServiceTest {
every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
val channel = spyBlueprintMessageConsumerService.subscribe(null)
+ var i = 0
launch {
channel.consumeEach {
- assertTrue(it.startsWith("I am message"), "failed to get the actual message")
+ ++i
+ val key = it.key()
+ val value = String(it.value(), Charset.defaultCharset())
+ assertTrue(value.startsWith("I am message"), "failed to get the actual message")
+ assertEquals("key_$i", key)
}
}
delay(10)
@@ -268,6 +274,7 @@ open class BlueprintMessageConsumerServiceTest {
val headers: MutableMap<String, String> = hashMapOf()
headers["id"] = it.toString()
blueprintMessageProducerService.sendMessageNB(
+ key = "mykey",
message = "this is my message($it)",
headers = headers
)
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index 537dab1ba..881f0b422 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -88,7 +88,7 @@ open class BlueprintMessageProducerServiceTest {
every { spyBluePrintMessageProducerService.messageTemplate(any()) } returns mockKafkaTemplate
- val response = spyBluePrintMessageProducerService.sendMessage("Testing message")
+ val response = spyBluePrintMessageProducerService.sendMessage("mykey", "Testing message")
assertTrue(response, "failed to get command response")
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
index c30ab9b02..44990ae7f 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
@@ -132,6 +132,7 @@ class KafkaStreamsConsumerServiceTest {
val headers: MutableMap<String, String> = hashMapOf()
headers["id"] = it.toString()
blueprintMessageProducerService.sendMessageNB(
+ key = "mykey",
message = "this is my message($it)",
headers = headers
)