diff options
author | Brinda Santh <brindasanth@in.ibm.com> | 2019-09-05 11:06:48 -0400 |
---|---|---|
committer | Brinda Santh <brindasanth@in.ibm.com> | 2019-09-05 11:06:48 -0400 |
commit | a2d60124bd44c767c42f4aa4ac8a7edafbfa3e39 (patch) | |
tree | 45cd9390e65bf1b3851f1e00f7a4cdf21054b93e /ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin | |
parent | 8d94f0bd0a9e4d1c57e98df726841dd0e2978569 (diff) |
Add Config based blueprint process consumer
Change-Id: I9e37ecb5032047f835f3b2ea20b2689c76353497
Issue-ID: CCSDK-1668
Signed-off-by: Brinda Santh <brindasanth@in.ibm.com>
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin')
2 files changed, 47 insertions, 7 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index 18b86b8d8..2b84eaa78 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import io.mockk.every import io.mockk.spyk import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.apache.kafka.clients.consumer.ConsumerRecord @@ -30,12 +31,14 @@ import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner import kotlin.test.assertNotNull +import kotlin.test.assertTrue @RunWith(SpringRunner::class) @@ -43,12 +46,20 @@ import kotlin.test.assertNotNull @ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, BlueprintPropertyConfiguration::class, BluePrintProperties::class]) @TestPropertySource(properties = -["blueprintsprocessor.messageclient.sample.type=kafka-basic-auth", - "blueprintsprocessor.messageclient.sample.bootstrapServers=127:0.0.1:9092", - "blueprintsprocessor.messageclient.sample.groupId=sample-group", - "blueprintsprocessor.messageclient.sample.consumerTopic=default-topic" +["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.sample.groupId=sample-group", + "blueprintsprocessor.messageconsumer.sample.topic=default-topic", + "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id", + "blueprintsprocessor.messageconsumer.sample.pollMillSec=10", + + "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.topic=default-topic", + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" ]) open class BlueprintMessageConsumerServiceTest { + val log = logger(BlueprintMessageConsumerServiceTest::class) @Autowired lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @@ -90,11 +101,40 @@ open class BlueprintMessageConsumerServiceTest { val channel = spyBlueprintMessageConsumerService.subscribe(null) launch { channel.consumeEach { - println("Received message : $it") + assertTrue(it.startsWith("I am message"), "failed to get the actual message") } } - //delay(100) + delay(10) spyBlueprintMessageConsumerService.shutDown() } } + + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ + //@Test + fun testKafkaIntegration() { + runBlocking { + val blueprintMessageConsumerService = bluePrintMessageLibPropertyService + .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") + + val channel = blueprintMessageConsumerService.subscribe(null) + launch { + channel.consumeEach { + log.info("Consumed Message : $it") + } + } + + /** Send message with every 1 sec */ + val blueprintMessageProducerService = bluePrintMessageLibPropertyService + .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService + launch { + repeat(5) { + delay(1000) + blueprintMessageProducerService.sendMessage("this is my message($it)") + } + } + delay(10000) + blueprintMessageConsumerService.shutDown() + } + } }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 0db62c1df..31bcc1517 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -42,7 +42,7 @@ import kotlin.test.assertTrue BlueprintPropertyConfiguration::class, BluePrintProperties::class]) @TestPropertySource(properties = ["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", - "blueprintsprocessor.messageproducer.sample.bootstrapServers=127:0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", "blueprintsprocessor.messageproducer.sample.topic=default-topic", "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" ]) |