aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt2
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt28
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt21
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt69
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt11
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt14
6 files changed, 86 insertions, 59 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index 1cd8a2af7..0b899f2a3 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -35,6 +35,8 @@ open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
lateinit var groupId: String
var clientId: String? = null
var topic: String? = null
+ var autoCommit: Boolean = true
+ var autoOffsetReset: String = "latest"
var pollMillSec: Long = 1000
var pollRecords: Int = -1
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
index e33d41c09..7d8138639 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
@@ -20,15 +20,31 @@ import kotlinx.coroutines.runBlocking
interface BlueprintMessageProducerService {
- fun sendMessage(message: Any): Boolean = runBlocking {
- sendMessageNB(message)
+ fun sendMessage(message: Any): Boolean {
+ return sendMessage(message = message, headers = null)
}
- fun sendMessage(topic: String, message: Any): Boolean = runBlocking {
- sendMessageNB(topic, message)
+ fun sendMessage(topic: String, message: Any): Boolean {
+ return sendMessage(topic, message, null)
}
- suspend fun sendMessageNB(message: Any): Boolean
+ fun sendMessage(message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking {
+ sendMessageNB(message = message, headers = headers)
+ }
+
+ fun sendMessage(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking {
+ sendMessageNB(topic, message, headers)
+ }
+
+ suspend fun sendMessageNB(message: Any): Boolean {
+ return sendMessageNB(message = message, headers = null)
+ }
+
+ suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean
+
+ suspend fun sendMessageNB(topic: String, message: Any): Boolean {
+ return sendMessageNB(topic, message, null)
+ }
- suspend fun sendMessageNB(topic: String, message: Any): Boolean
+ suspend fun sendMessageNB(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean
} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
index b5d444a49..b99be0ae5 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
@@ -24,30 +24,37 @@ import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import java.nio.charset.Charset
import java.time.Duration
import kotlin.concurrent.thread
-class KafkaBasicAuthMessageConsumerService(
+open class KafkaBasicAuthMessageConsumerService(
private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties)
: BlueprintMessageConsumerService {
- private val channel = Channel<String>()
- private var kafkaConsumer: Consumer<String, String>? = null
+ val channel = Channel<String>()
+ var kafkaConsumer: Consumer<String, ByteArray>? = null
val log = logger(KafkaBasicAuthMessageConsumerService::class)
@Volatile
var keepGoing = true
- fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, String> {
+ fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> {
val configProperties = hashMapOf<String, Any>()
configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers
configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId
- configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
+ configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = messageConsumerProperties.autoCommit
+ /**
+ * earliest: automatically reset the offset to the earliest offset
+ * latest: automatically reset the offset to the latest offset
+ */
+ configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset
configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
- configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+ configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
if (messageConsumerProperties.clientId != null) {
configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!!
}
@@ -95,7 +102,7 @@ class KafkaBasicAuthMessageConsumerService(
consumerRecord.value()?.let {
launch {
if (!channel.isClosedForSend) {
- channel.send(it)
+ channel.send(String(it, Charset.defaultCharset()))
} else {
log.error("Channel is closed to receive message")
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
index 1c93bb0fc..86c04f6be 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
@@ -17,16 +17,18 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import org.apache.commons.lang.builder.ToStringBuilder
+import org.apache.kafka.clients.producer.Callback
+import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig.*
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
-import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
import org.slf4j.LoggerFactory
-import org.springframework.kafka.core.DefaultKafkaProducerFactory
-import org.springframework.kafka.core.KafkaTemplate
-import org.springframework.kafka.core.ProducerFactory
-import org.springframework.kafka.support.SendResult
-import org.springframework.util.concurrent.ListenableFutureCallback
+import java.nio.charset.Charset
class KafkaBasicAuthMessageProducerService(
private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties)
@@ -34,42 +36,42 @@ class KafkaBasicAuthMessageProducerService(
private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!!
- private var kafkaTemplate: KafkaTemplate<String, Any>? = null
+ private var kafkaProducer: KafkaProducer<String, ByteArray>? = null
override suspend fun sendMessageNB(message: Any): Boolean {
checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
- return sendMessage(messageProducerProperties.topic!!, message)
+ return sendMessageNB(messageProducerProperties.topic!!, message)
}
- override suspend fun sendMessageNB(topic: String, message: Any): Boolean {
- val serializedMessage = when (message) {
- is String -> {
- message
- }
- else -> {
- message.asJsonType().toString()
- }
- }
- val future = messageTemplate().send(topic, serializedMessage)
+ override suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean {
+ checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
+ return sendMessageNB(messageProducerProperties.topic!!, message, headers)
+ }
- future.addCallback(object : ListenableFutureCallback<SendResult<String, Any>> {
- override fun onSuccess(result: SendResult<String, Any>) {
- log.info("message sent successfully with offset=[${result.recordMetadata.offset()}]")
- }
+ override suspend fun sendMessageNB(topic: String, message: Any,
+ headers: MutableMap<String, String>?): Boolean {
+ val byteArrayMessage = when (message) {
+ is String -> message.toByteArray(Charset.defaultCharset())
+ else -> message.asJsonString().toByteArray(Charset.defaultCharset())
+ }
- override fun onFailure(ex: Throwable) {
- log.error("Unable to send message", ex)
- }
- })
+ val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
+ headers?.let {
+ headers.forEach { (key, value) -> record.headers().add(RecordHeader(key, value.toByteArray())) }
+ }
+ val callback = Callback { metadata, exception ->
+ log.info("message published offset(${metadata.offset()}, headers :$headers )")
+ }
+ messageTemplate().send(record, callback).get()
return true
}
- private fun producerFactory(additionalConfig: Map<String, Any>? = null): ProducerFactory<String, Any> {
- log.info("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
+ fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> {
+ log.trace("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
val configProps = hashMapOf<String, Any>()
configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers
configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
- configProps[VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
+ configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
if (messageProducerProperties.clientId != null) {
configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!!
}
@@ -79,14 +81,11 @@ class KafkaBasicAuthMessageProducerService(
if (additionalConfig != null) {
configProps.putAll(additionalConfig)
}
- return DefaultKafkaProducerFactory(configProps)
- }
- fun messageTemplate(additionalConfig: Map<String, Any>? = null): KafkaTemplate<String, Any> {
- if (kafkaTemplate == null) {
- kafkaTemplate = KafkaTemplate(producerFactory(additionalConfig))
+ if (kafkaProducer == null) {
+ kafkaProducer = KafkaProducer(configProps)
}
- return kafkaTemplate!!
+ return kafkaProducer!!
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index f4e85a94b..86c2ec5ef 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -87,14 +87,14 @@ open class BlueprintMessageConsumerServiceTest {
partitionsEndMap[partition] = records
topicsCollection.add(partition.topic())
}
- val mockKafkaConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST)
+ val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
mockKafkaConsumer.subscribe(topicsCollection)
mockKafkaConsumer.rebalance(partitions)
mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
for (i in 1..10) {
- val record = ConsumerRecord<String, String>(topic, 1, i.toLong(), "key_$i",
- "I am message $i")
+ val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i",
+ "I am message $i".toByteArray())
mockKafkaConsumer.addRecord(record)
}
@@ -131,7 +131,10 @@ open class BlueprintMessageConsumerServiceTest {
launch {
repeat(5) {
delay(100)
- blueprintMessageProducerService.sendMessage("this is my message($it)")
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.toString()
+ blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)",
+ headers = headers)
}
}
delay(5000)
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index 31bcc1517..f23624f7a 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -20,18 +20,18 @@ import io.mockk.every
import io.mockk.mockk
import io.mockk.spyk
import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.RecordMetadata
import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.kafka.core.KafkaTemplate
-import org.springframework.kafka.support.SendResult
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
-import org.springframework.util.concurrent.SettableListenableFuture
+import java.util.concurrent.Future
import kotlin.test.Test
import kotlin.test.assertTrue
@@ -57,12 +57,12 @@ open class BlueprintMessageProducerServiceTest {
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
.blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
- val mockKafkaTemplate = mockk<KafkaTemplate<String, Any>>()
+ val mockKafkaTemplate = mockk<KafkaProducer<String, ByteArray>>()
- val future = SettableListenableFuture<SendResult<String, Any>>()
- //future.setException(BluePrintException("failed sending"))
+ val responseMock = mockk<Future<RecordMetadata>>()
+ every { responseMock.get() } returns mockk()
- every { mockKafkaTemplate.send(any(), any()) } returns future
+ every { mockKafkaTemplate.send(any(), any()) } returns responseMock
val spyBluePrintMessageProducerService = spyk(blueprintMessageProducerService, recordPrivateCalls = true)