diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main')
4 files changed, 72 insertions, 48 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index 1cd8a2af7..0b899f2a3 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -35,6 +35,8 @@ open class KafkaMessageConsumerProperties : MessageConsumerProperties() { lateinit var groupId: String var clientId: String? = null var topic: String? = null + var autoCommit: Boolean = true + var autoOffsetReset: String = "latest" var pollMillSec: Long = 1000 var pollRecords: Int = -1 } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt index e33d41c09..7d8138639 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt @@ -20,15 +20,31 @@ import kotlinx.coroutines.runBlocking interface BlueprintMessageProducerService { - fun sendMessage(message: Any): Boolean = runBlocking { - sendMessageNB(message) + fun sendMessage(message: Any): Boolean { + return sendMessage(message = message, headers = null) } - fun sendMessage(topic: String, message: Any): Boolean = runBlocking { - sendMessageNB(topic, message) + fun sendMessage(topic: String, message: Any): Boolean { + return sendMessage(topic, message, null) } - suspend fun sendMessageNB(message: Any): Boolean + fun sendMessage(message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking { + sendMessageNB(message = message, headers = headers) + } + + fun sendMessage(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking { + sendMessageNB(topic, message, headers) + } + + suspend fun sendMessageNB(message: Any): Boolean { + return sendMessageNB(message = message, headers = null) + } + + suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean + + suspend fun sendMessageNB(topic: String, message: Any): Boolean { + return sendMessageNB(topic, message, null) + } - suspend fun sendMessageNB(topic: String, message: Any): Boolean + suspend fun sendMessageNB(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt index b5d444a49..b99be0ae5 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt @@ -24,30 +24,37 @@ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.serialization.StringDeserializer import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.logger +import java.nio.charset.Charset import java.time.Duration import kotlin.concurrent.thread -class KafkaBasicAuthMessageConsumerService( +open class KafkaBasicAuthMessageConsumerService( private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties) : BlueprintMessageConsumerService { - private val channel = Channel<String>() - private var kafkaConsumer: Consumer<String, String>? = null + val channel = Channel<String>() + var kafkaConsumer: Consumer<String, ByteArray>? = null val log = logger(KafkaBasicAuthMessageConsumerService::class) @Volatile var keepGoing = true - fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, String> { + fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> { val configProperties = hashMapOf<String, Any>() configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId - configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest" + configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = messageConsumerProperties.autoCommit + /** + * earliest: automatically reset the offset to the earliest offset + * latest: automatically reset the offset to the latest offset + */ + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java - configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java if (messageConsumerProperties.clientId != null) { configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!! } @@ -95,7 +102,7 @@ class KafkaBasicAuthMessageConsumerService( consumerRecord.value()?.let { launch { if (!channel.isClosedForSend) { - channel.send(it) + channel.send(String(it, Charset.defaultCharset())) } else { log.error("Channel is closed to receive message") } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt index 1c93bb0fc..86c04f6be 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt @@ -17,16 +17,18 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import org.apache.commons.lang.builder.ToStringBuilder +import org.apache.kafka.clients.producer.Callback +import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerConfig.* +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.kafka.common.serialization.StringSerializer import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID import org.slf4j.LoggerFactory -import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.core.ProducerFactory -import org.springframework.kafka.support.SendResult -import org.springframework.util.concurrent.ListenableFutureCallback +import java.nio.charset.Charset class KafkaBasicAuthMessageProducerService( private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties) @@ -34,42 +36,42 @@ class KafkaBasicAuthMessageProducerService( private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!! - private var kafkaTemplate: KafkaTemplate<String, Any>? = null + private var kafkaProducer: KafkaProducer<String, ByteArray>? = null override suspend fun sendMessageNB(message: Any): Boolean { checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } - return sendMessage(messageProducerProperties.topic!!, message) + return sendMessageNB(messageProducerProperties.topic!!, message) } - override suspend fun sendMessageNB(topic: String, message: Any): Boolean { - val serializedMessage = when (message) { - is String -> { - message - } - else -> { - message.asJsonType().toString() - } - } - val future = messageTemplate().send(topic, serializedMessage) + override suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean { + checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } + return sendMessageNB(messageProducerProperties.topic!!, message, headers) + } - future.addCallback(object : ListenableFutureCallback<SendResult<String, Any>> { - override fun onSuccess(result: SendResult<String, Any>) { - log.info("message sent successfully with offset=[${result.recordMetadata.offset()}]") - } + override suspend fun sendMessageNB(topic: String, message: Any, + headers: MutableMap<String, String>?): Boolean { + val byteArrayMessage = when (message) { + is String -> message.toByteArray(Charset.defaultCharset()) + else -> message.asJsonString().toByteArray(Charset.defaultCharset()) + } - override fun onFailure(ex: Throwable) { - log.error("Unable to send message", ex) - } - }) + val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage) + headers?.let { + headers.forEach { (key, value) -> record.headers().add(RecordHeader(key, value.toByteArray())) } + } + val callback = Callback { metadata, exception -> + log.info("message published offset(${metadata.offset()}, headers :$headers )") + } + messageTemplate().send(record, callback).get() return true } - private fun producerFactory(additionalConfig: Map<String, Any>? = null): ProducerFactory<String, Any> { - log.info("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}") + fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> { + log.trace("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}") val configProps = hashMapOf<String, Any>() configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java - configProps[VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java + configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java if (messageProducerProperties.clientId != null) { configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!! } @@ -79,14 +81,11 @@ class KafkaBasicAuthMessageProducerService( if (additionalConfig != null) { configProps.putAll(additionalConfig) } - return DefaultKafkaProducerFactory(configProps) - } - fun messageTemplate(additionalConfig: Map<String, Any>? = null): KafkaTemplate<String, Any> { - if (kafkaTemplate == null) { - kafkaTemplate = KafkaTemplate(producerFactory(additionalConfig)) + if (kafkaProducer == null) { + kafkaProducer = KafkaProducer(configProps) } - return kafkaTemplate!! + return kafkaProducer!! } } |