diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib')
6 files changed, 86 insertions, 59 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index 1cd8a2af7..0b899f2a3 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -35,6 +35,8 @@ open class KafkaMessageConsumerProperties : MessageConsumerProperties() { lateinit var groupId: String var clientId: String? = null var topic: String? = null + var autoCommit: Boolean = true + var autoOffsetReset: String = "latest" var pollMillSec: Long = 1000 var pollRecords: Int = -1 } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt index e33d41c09..7d8138639 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt @@ -20,15 +20,31 @@ import kotlinx.coroutines.runBlocking interface BlueprintMessageProducerService { - fun sendMessage(message: Any): Boolean = runBlocking { - sendMessageNB(message) + fun sendMessage(message: Any): Boolean { + return sendMessage(message = message, headers = null) } - fun sendMessage(topic: String, message: Any): Boolean = runBlocking { - sendMessageNB(topic, message) + fun sendMessage(topic: String, message: Any): Boolean { + return sendMessage(topic, message, null) } - suspend fun sendMessageNB(message: Any): Boolean + fun sendMessage(message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking { + sendMessageNB(message = message, headers = headers) + } + + fun sendMessage(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking { + sendMessageNB(topic, message, headers) + } + + suspend fun sendMessageNB(message: Any): Boolean { + return sendMessageNB(message = message, headers = null) + } + + suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean + + suspend fun sendMessageNB(topic: String, message: Any): Boolean { + return sendMessageNB(topic, message, null) + } - suspend fun sendMessageNB(topic: String, message: Any): Boolean + suspend fun sendMessageNB(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt index b5d444a49..b99be0ae5 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt @@ -24,30 +24,37 @@ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.serialization.StringDeserializer import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.logger +import java.nio.charset.Charset import java.time.Duration import kotlin.concurrent.thread -class KafkaBasicAuthMessageConsumerService( +open class KafkaBasicAuthMessageConsumerService( private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties) : BlueprintMessageConsumerService { - private val channel = Channel<String>() - private var kafkaConsumer: Consumer<String, String>? = null + val channel = Channel<String>() + var kafkaConsumer: Consumer<String, ByteArray>? = null val log = logger(KafkaBasicAuthMessageConsumerService::class) @Volatile var keepGoing = true - fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, String> { + fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> { val configProperties = hashMapOf<String, Any>() configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId - configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest" + configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = messageConsumerProperties.autoCommit + /** + * earliest: automatically reset the offset to the earliest offset + * latest: automatically reset the offset to the latest offset + */ + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java - configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java if (messageConsumerProperties.clientId != null) { configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!! } @@ -95,7 +102,7 @@ class KafkaBasicAuthMessageConsumerService( consumerRecord.value()?.let { launch { if (!channel.isClosedForSend) { - channel.send(it) + channel.send(String(it, Charset.defaultCharset())) } else { log.error("Channel is closed to receive message") } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt index 1c93bb0fc..86c04f6be 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt @@ -17,16 +17,18 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import org.apache.commons.lang.builder.ToStringBuilder +import org.apache.kafka.clients.producer.Callback +import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerConfig.* +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.kafka.common.serialization.StringSerializer import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID import org.slf4j.LoggerFactory -import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.core.ProducerFactory -import org.springframework.kafka.support.SendResult -import org.springframework.util.concurrent.ListenableFutureCallback +import java.nio.charset.Charset class KafkaBasicAuthMessageProducerService( private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties) @@ -34,42 +36,42 @@ class KafkaBasicAuthMessageProducerService( private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!! - private var kafkaTemplate: KafkaTemplate<String, Any>? = null + private var kafkaProducer: KafkaProducer<String, ByteArray>? = null override suspend fun sendMessageNB(message: Any): Boolean { checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } - return sendMessage(messageProducerProperties.topic!!, message) + return sendMessageNB(messageProducerProperties.topic!!, message) } - override suspend fun sendMessageNB(topic: String, message: Any): Boolean { - val serializedMessage = when (message) { - is String -> { - message - } - else -> { - message.asJsonType().toString() - } - } - val future = messageTemplate().send(topic, serializedMessage) + override suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean { + checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } + return sendMessageNB(messageProducerProperties.topic!!, message, headers) + } - future.addCallback(object : ListenableFutureCallback<SendResult<String, Any>> { - override fun onSuccess(result: SendResult<String, Any>) { - log.info("message sent successfully with offset=[${result.recordMetadata.offset()}]") - } + override suspend fun sendMessageNB(topic: String, message: Any, + headers: MutableMap<String, String>?): Boolean { + val byteArrayMessage = when (message) { + is String -> message.toByteArray(Charset.defaultCharset()) + else -> message.asJsonString().toByteArray(Charset.defaultCharset()) + } - override fun onFailure(ex: Throwable) { - log.error("Unable to send message", ex) - } - }) + val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage) + headers?.let { + headers.forEach { (key, value) -> record.headers().add(RecordHeader(key, value.toByteArray())) } + } + val callback = Callback { metadata, exception -> + log.info("message published offset(${metadata.offset()}, headers :$headers )") + } + messageTemplate().send(record, callback).get() return true } - private fun producerFactory(additionalConfig: Map<String, Any>? = null): ProducerFactory<String, Any> { - log.info("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}") + fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> { + log.trace("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}") val configProps = hashMapOf<String, Any>() configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java - configProps[VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java + configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java if (messageProducerProperties.clientId != null) { configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!! } @@ -79,14 +81,11 @@ class KafkaBasicAuthMessageProducerService( if (additionalConfig != null) { configProps.putAll(additionalConfig) } - return DefaultKafkaProducerFactory(configProps) - } - fun messageTemplate(additionalConfig: Map<String, Any>? = null): KafkaTemplate<String, Any> { - if (kafkaTemplate == null) { - kafkaTemplate = KafkaTemplate(producerFactory(additionalConfig)) + if (kafkaProducer == null) { + kafkaProducer = KafkaProducer(configProps) } - return kafkaTemplate!! + return kafkaProducer!! } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index f4e85a94b..86c2ec5ef 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -87,14 +87,14 @@ open class BlueprintMessageConsumerServiceTest { partitionsEndMap[partition] = records topicsCollection.add(partition.topic()) } - val mockKafkaConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST) + val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST) mockKafkaConsumer.subscribe(topicsCollection) mockKafkaConsumer.rebalance(partitions) mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap) mockKafkaConsumer.updateEndOffsets(partitionsEndMap) for (i in 1..10) { - val record = ConsumerRecord<String, String>(topic, 1, i.toLong(), "key_$i", - "I am message $i") + val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i", + "I am message $i".toByteArray()) mockKafkaConsumer.addRecord(record) } @@ -131,7 +131,10 @@ open class BlueprintMessageConsumerServiceTest { launch { repeat(5) { delay(100) - blueprintMessageProducerService.sendMessage("this is my message($it)") + val headers: MutableMap<String, String> = hashMapOf() + headers["id"] = it.toString() + blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)", + headers = headers) } } delay(5000) diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 31bcc1517..f23624f7a 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -20,18 +20,18 @@ import io.mockk.every import io.mockk.mockk import io.mockk.spyk import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.RecordMetadata import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration import org.springframework.beans.factory.annotation.Autowired -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.support.SendResult import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner -import org.springframework.util.concurrent.SettableListenableFuture +import java.util.concurrent.Future import kotlin.test.Test import kotlin.test.assertTrue @@ -57,12 +57,12 @@ open class BlueprintMessageProducerServiceTest { val blueprintMessageProducerService = bluePrintMessageLibPropertyService .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService - val mockKafkaTemplate = mockk<KafkaTemplate<String, Any>>() + val mockKafkaTemplate = mockk<KafkaProducer<String, ByteArray>>() - val future = SettableListenableFuture<SendResult<String, Any>>() - //future.setException(BluePrintException("failed sending")) + val responseMock = mockk<Future<RecordMetadata>>() + every { responseMock.get() } returns mockk() - every { mockKafkaTemplate.send(any(), any()) } returns future + every { mockKafkaTemplate.send(any(), any()) } returns responseMock val spyBluePrintMessageProducerService = spyk(blueprintMessageProducerService, recordPrivateCalls = true) |