From a2d60124bd44c767c42f4aa4ac8a7edafbfa3e39 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Thu, 5 Sep 2019 11:06:48 -0400 Subject: Add Config based blueprint process consumer Change-Id: I9e37ecb5032047f835f3b2ea20b2689c76353497 Issue-ID: CCSDK-1668 Signed-off-by: Brinda Santh --- .../message/BluePrintMessageLibConfiguration.kt | 3 +- .../message/BluePrintMessageLibData.kt | 5 ++- .../KafkaBasicAuthMessageConsumerService.kt | 23 +++++----- .../KafkaBasicAuthMessageProducerService.kt | 1 - .../service/BlueprintMessageConsumerServiceTest.kt | 52 +++++++++++++++++++--- .../service/BlueprintMessageProducerServiceTest.kt | 2 +- 6 files changed, 64 insertions(+), 22 deletions(-) (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src') diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt index 281a970b8..27a444bdc 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt @@ -59,8 +59,7 @@ fun BluePrintDependencyService.messageConsumerService(jsonNode: JsonNode): Bluep class MessageLibConstants { companion object { const val SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY = "blueprint-message-lib-property-service" - //TODO("Change to .messageconsumer in application.properties") - const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageclient." + const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer." const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer." const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth" } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index c77cdfdb5..ab04054fe 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -33,8 +33,9 @@ open class MessageConsumerProperties open class KafkaMessageConsumerProperties : MessageConsumerProperties() { lateinit var bootstrapServers: String lateinit var groupId: String - var consumerTopic: String? = null - var pollMillSec: Long = 100 + var clientId: String? = null + var topic: String? = null + var pollMillSec: Long = 1000 } open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties() diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt index 076501eab..5a9e61bfd 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt @@ -17,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.apache.kafka.clients.CommonClientConfigs @@ -47,6 +48,10 @@ class KafkaBasicAuthMessageConsumerService( configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest" configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + if (messageConsumerProperties.clientId != null) { + configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!! + } + // TODO("Security Implementation based on type") /** add or override already set properties */ additionalConfig?.let { configProperties.putAll(it) } /** Create Kafka consumer */ @@ -55,7 +60,7 @@ class KafkaBasicAuthMessageConsumerService( override suspend fun subscribe(additionalConfig: Map?): Channel { /** get to topic names */ - val consumerTopic = messageConsumerProperties.consumerTopic?.split(",")?.map { it.trim() } + val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() } check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } return subscribe(consumerTopic, additionalConfig) } @@ -64,6 +69,7 @@ class KafkaBasicAuthMessageConsumerService( override suspend fun subscribe(consumerTopic: List, additionalConfig: Map?): Channel { /** Create Kafka consumer */ kafkaConsumer = kafkaConsumer(additionalConfig) + checkNotNull(kafkaConsumer) { "failed to create kafka consumer for " + "server(${messageConsumerProperties.bootstrapServers})'s " + @@ -73,7 +79,7 @@ class KafkaBasicAuthMessageConsumerService( kafkaConsumer!!.subscribe(consumerTopic) log.info("Successfully consumed topic($consumerTopic)") - val listenerThread = thread(start = true, name = "KafkaConsumer") { + thread(start = true, name = "KafkaConsumer") { keepGoing = true kafkaConsumer!!.use { kc -> while (keepGoing) { @@ -93,21 +99,18 @@ class KafkaBasicAuthMessageConsumerService( } } } + log.info("message listener shutting down.....") } - } - log.info("Successfully consumed in thread(${listenerThread})") return channel } override suspend fun shutDown() { - /** Close the Channel */ - channel.close() /** stop the polling loop */ keepGoing = false - if (kafkaConsumer != null) { - /** sunsubscribe the consumer */ - kafkaConsumer!!.unsubscribe() - } + /** Close the Channel */ + channel.cancel() + /** TO shutdown gracefully, need to wait for the maximum poll time */ + delay(messageConsumerProperties.pollMillSec) } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt index 008e92437..1c93bb0fc 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt @@ -83,7 +83,6 @@ class KafkaBasicAuthMessageProducerService( } fun messageTemplate(additionalConfig: Map? = null): KafkaTemplate { - log.info("Prepering templates") if (kafkaTemplate == null) { kafkaTemplate = KafkaTemplate(producerFactory(additionalConfig)) } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index 18b86b8d8..2b84eaa78 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import io.mockk.every import io.mockk.spyk import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.apache.kafka.clients.consumer.ConsumerRecord @@ -30,12 +31,14 @@ import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner import kotlin.test.assertNotNull +import kotlin.test.assertTrue @RunWith(SpringRunner::class) @@ -43,12 +46,20 @@ import kotlin.test.assertNotNull @ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, BlueprintPropertyConfiguration::class, BluePrintProperties::class]) @TestPropertySource(properties = -["blueprintsprocessor.messageclient.sample.type=kafka-basic-auth", - "blueprintsprocessor.messageclient.sample.bootstrapServers=127:0.0.1:9092", - "blueprintsprocessor.messageclient.sample.groupId=sample-group", - "blueprintsprocessor.messageclient.sample.consumerTopic=default-topic" +["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.sample.groupId=sample-group", + "blueprintsprocessor.messageconsumer.sample.topic=default-topic", + "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id", + "blueprintsprocessor.messageconsumer.sample.pollMillSec=10", + + "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.topic=default-topic", + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" ]) open class BlueprintMessageConsumerServiceTest { + val log = logger(BlueprintMessageConsumerServiceTest::class) @Autowired lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @@ -90,11 +101,40 @@ open class BlueprintMessageConsumerServiceTest { val channel = spyBlueprintMessageConsumerService.subscribe(null) launch { channel.consumeEach { - println("Received message : $it") + assertTrue(it.startsWith("I am message"), "failed to get the actual message") } } - //delay(100) + delay(10) spyBlueprintMessageConsumerService.shutDown() } } + + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ + //@Test + fun testKafkaIntegration() { + runBlocking { + val blueprintMessageConsumerService = bluePrintMessageLibPropertyService + .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") + + val channel = blueprintMessageConsumerService.subscribe(null) + launch { + channel.consumeEach { + log.info("Consumed Message : $it") + } + } + + /** Send message with every 1 sec */ + val blueprintMessageProducerService = bluePrintMessageLibPropertyService + .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService + launch { + repeat(5) { + delay(1000) + blueprintMessageProducerService.sendMessage("this is my message($it)") + } + } + delay(10000) + blueprintMessageConsumerService.shutDown() + } + } } \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 0db62c1df..31bcc1517 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -42,7 +42,7 @@ import kotlin.test.assertTrue BlueprintPropertyConfiguration::class, BluePrintProperties::class]) @TestPropertySource(properties = ["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", - "blueprintsprocessor.messageproducer.sample.bootstrapServers=127:0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", "blueprintsprocessor.messageproducer.sample.topic=default-topic", "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" ]) -- cgit 1.2.3-korg