summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src
diff options
context:
space:
mode:
authorBrinda Santh <brindasanth@in.ibm.com>2019-09-05 11:06:48 -0400
committerBrinda Santh <brindasanth@in.ibm.com>2019-09-05 11:06:48 -0400
commita2d60124bd44c767c42f4aa4ac8a7edafbfa3e39 (patch)
tree45cd9390e65bf1b3851f1e00f7a4cdf21054b93e /ms/blueprintsprocessor/modules/commons/message-lib/src
parent8d94f0bd0a9e4d1c57e98df726841dd0e2978569 (diff)
Add Config based blueprint process consumer
Change-Id: I9e37ecb5032047f835f3b2ea20b2689c76353497 Issue-ID: CCSDK-1668 Signed-off-by: Brinda Santh <brindasanth@in.ibm.com>
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt3
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt5
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt23
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt1
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt52
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt2
6 files changed, 64 insertions, 22 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
index 281a970b8..27a444bdc 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
@@ -59,8 +59,7 @@ fun BluePrintDependencyService.messageConsumerService(jsonNode: JsonNode): Bluep
class MessageLibConstants {
companion object {
const val SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY = "blueprint-message-lib-property-service"
- //TODO("Change to .messageconsumer in application.properties")
- const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageclient."
+ const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer."
const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer."
const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth"
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index c77cdfdb5..ab04054fe 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -33,8 +33,9 @@ open class MessageConsumerProperties
open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
lateinit var bootstrapServers: String
lateinit var groupId: String
- var consumerTopic: String? = null
- var pollMillSec: Long = 100
+ var clientId: String? = null
+ var topic: String? = null
+ var pollMillSec: Long = 1000
}
open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties()
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
index 076501eab..5a9e61bfd 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
@@ -17,6 +17,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.CommonClientConfigs
@@ -47,6 +48,10 @@ class KafkaBasicAuthMessageConsumerService(
configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+ if (messageConsumerProperties.clientId != null) {
+ configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!!
+ }
+ // TODO("Security Implementation based on type")
/** add or override already set properties */
additionalConfig?.let { configProperties.putAll(it) }
/** Create Kafka consumer */
@@ -55,7 +60,7 @@ class KafkaBasicAuthMessageConsumerService(
override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> {
/** get to topic names */
- val consumerTopic = messageConsumerProperties.consumerTopic?.split(",")?.map { it.trim() }
+ val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() }
check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
return subscribe(consumerTopic, additionalConfig)
}
@@ -64,6 +69,7 @@ class KafkaBasicAuthMessageConsumerService(
override suspend fun subscribe(consumerTopic: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
/** Create Kafka consumer */
kafkaConsumer = kafkaConsumer(additionalConfig)
+
checkNotNull(kafkaConsumer) {
"failed to create kafka consumer for " +
"server(${messageConsumerProperties.bootstrapServers})'s " +
@@ -73,7 +79,7 @@ class KafkaBasicAuthMessageConsumerService(
kafkaConsumer!!.subscribe(consumerTopic)
log.info("Successfully consumed topic($consumerTopic)")
- val listenerThread = thread(start = true, name = "KafkaConsumer") {
+ thread(start = true, name = "KafkaConsumer") {
keepGoing = true
kafkaConsumer!!.use { kc ->
while (keepGoing) {
@@ -93,21 +99,18 @@ class KafkaBasicAuthMessageConsumerService(
}
}
}
+ log.info("message listener shutting down.....")
}
-
}
- log.info("Successfully consumed in thread(${listenerThread})")
return channel
}
override suspend fun shutDown() {
- /** Close the Channel */
- channel.close()
/** stop the polling loop */
keepGoing = false
- if (kafkaConsumer != null) {
- /** sunsubscribe the consumer */
- kafkaConsumer!!.unsubscribe()
- }
+ /** Close the Channel */
+ channel.cancel()
+ /** TO shutdown gracefully, need to wait for the maximum poll time */
+ delay(messageConsumerProperties.pollMillSec)
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
index 008e92437..1c93bb0fc 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
@@ -83,7 +83,6 @@ class KafkaBasicAuthMessageProducerService(
}
fun messageTemplate(additionalConfig: Map<String, Any>? = null): KafkaTemplate<String, Any> {
- log.info("Prepering templates")
if (kafkaTemplate == null) {
kafkaTemplate = KafkaTemplate(producerFactory(additionalConfig))
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index 18b86b8d8..2b84eaa78 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import io.mockk.every
import io.mockk.spyk
import kotlinx.coroutines.channels.consumeEach
+import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -30,12 +31,14 @@ import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@@ -43,12 +46,20 @@ import kotlin.test.assertNotNull
@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
BlueprintPropertyConfiguration::class, BluePrintProperties::class])
@TestPropertySource(properties =
-["blueprintsprocessor.messageclient.sample.type=kafka-basic-auth",
- "blueprintsprocessor.messageclient.sample.bootstrapServers=127:0.0.1:9092",
- "blueprintsprocessor.messageclient.sample.groupId=sample-group",
- "blueprintsprocessor.messageclient.sample.consumerTopic=default-topic"
+["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
+ "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
+ "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
+ "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
+
+ "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.sample.topic=default-topic",
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
])
open class BlueprintMessageConsumerServiceTest {
+ val log = logger(BlueprintMessageConsumerServiceTest::class)
@Autowired
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@@ -90,11 +101,40 @@ open class BlueprintMessageConsumerServiceTest {
val channel = spyBlueprintMessageConsumerService.subscribe(null)
launch {
channel.consumeEach {
- println("Received message : $it")
+ assertTrue(it.startsWith("I am message"), "failed to get the actual message")
}
}
- //delay(100)
+ delay(10)
spyBlueprintMessageConsumerService.shutDown()
}
}
+
+ /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
+ //@Test
+ fun testKafkaIntegration() {
+ runBlocking {
+ val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
+ .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
+
+ val channel = blueprintMessageConsumerService.subscribe(null)
+ launch {
+ channel.consumeEach {
+ log.info("Consumed Message : $it")
+ }
+ }
+
+ /** Send message with every 1 sec */
+ val blueprintMessageProducerService = bluePrintMessageLibPropertyService
+ .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ launch {
+ repeat(5) {
+ delay(1000)
+ blueprintMessageProducerService.sendMessage("this is my message($it)")
+ }
+ }
+ delay(10000)
+ blueprintMessageConsumerService.shutDown()
+ }
+ }
} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index 0db62c1df..31bcc1517 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -42,7 +42,7 @@ import kotlin.test.assertTrue
BlueprintPropertyConfiguration::class, BluePrintProperties::class])
@TestPropertySource(properties =
["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
- "blueprintsprocessor.messageproducer.sample.bootstrapServers=127:0.0.1:9092",
+ "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageproducer.sample.topic=default-topic",
"blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
])