diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/nats-lib')
-rw-r--r-- | ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt | 43 |
1 files changed, 41 insertions, 2 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt index 976f9f5e8..549be6481 100644 --- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt @@ -27,6 +27,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType import kotlin.test.assertNotNull @@ -106,6 +107,7 @@ class BluePrintNatsServiceTest { testMultiPublish(natsService) testLoadBalance(natsService) + testLimitSubscription(natsService) testRequestReply(natsService) testMultiRequestReply(natsService) delay(1000) @@ -137,12 +139,49 @@ class BluePrintNatsServiceTest { val lbMessageHandler2 = MessageHandler { message -> println("LB Publish Message Handler 2: ${message.strData()}") } - natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1) - natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2) + val sub1 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1) + val sub2 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2) repeat(5) { natsService.publish("lb-publish", "lb publish message-$it".toByteArray()) } + sub1.unsubscribe() + sub2.unsubscribe() + } + } + + private fun testLimitSubscription(natsService: BluePrintNatsService) { + runBlocking { + /** Load balance Publish Message Test **/ + val lbMessageHandler1 = + MessageHandler { message -> + runBlocking { + println("LB Publish Message Handler 1: ${message.strData()}") + message.ack() + } + } + val lbMessageHandler2 = + MessageHandler { message -> + runBlocking { + println("LB Publish Message Handler 2: ${message.strData()}") + message.ack() + } + } + + val sub1 = natsService.loadBalanceSubscribe( + "lb-publish", "lb-group", lbMessageHandler1, + SubscriptionOptionsUtils.manualAckWithRateLimit(1) + ) + val sub2 = natsService.loadBalanceSubscribe( + "lb-publish", "lb-group", lbMessageHandler2, + SubscriptionOptionsUtils.manualAckWithRateLimit(1) + ) + + repeat(10) { + natsService.publish("lb-publish", "lb limit message-$it".toByteArray()) + } + sub1.unsubscribe() + sub2.unsubscribe() } } |