diff options
3 files changed, 9 insertions, 2 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index ab04054fe..1cd8a2af7 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -36,6 +36,7 @@ open class KafkaMessageConsumerProperties : MessageConsumerProperties() { var clientId: String? = null var topic: String? = null var pollMillSec: Long = 1000 + var pollRecords: Int = -1 } open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties() diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt index 5a9e61bfd..b5d444a49 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt @@ -51,6 +51,10 @@ class KafkaBasicAuthMessageConsumerService( if (messageConsumerProperties.clientId != null) { configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!! } + /** To handle Back pressure, Get only configured record for processing */ + if (messageConsumerProperties.pollRecords > 0) { + configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords + } // TODO("Security Implementation based on type") /** add or override already set properties */ additionalConfig?.let { configProperties.putAll(it) } @@ -84,6 +88,7 @@ class KafkaBasicAuthMessageConsumerService( kafkaConsumer!!.use { kc -> while (keepGoing) { val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) + log.info("Consumed Records : ${consumerRecords.count()}") runBlocking { consumerRecords?.forEach { consumerRecord -> /** execute the command block */ diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index 2b84eaa78..f4e85a94b 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -52,6 +52,7 @@ import kotlin.test.assertTrue "blueprintsprocessor.messageconsumer.sample.topic=default-topic", "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id", "blueprintsprocessor.messageconsumer.sample.pollMillSec=10", + "blueprintsprocessor.messageconsumer.sample.pollRecords=1", "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", @@ -129,11 +130,11 @@ open class BlueprintMessageConsumerServiceTest { .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService launch { repeat(5) { - delay(1000) + delay(100) blueprintMessageProducerService.sendMessage("this is my message($it)") } } - delay(10000) + delay(5000) blueprintMessageConsumerService.shutDown() } } |