summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/nats-lib
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/nats-lib')
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt43
1 files changed, 41 insertions, 2 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt
index 976f9f5e8..549be6481 100644
--- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt
@@ -27,6 +27,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties
import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants
import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties
import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils
import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType
import kotlin.test.assertNotNull
@@ -106,6 +107,7 @@ class BluePrintNatsServiceTest {
testMultiPublish(natsService)
testLoadBalance(natsService)
+ testLimitSubscription(natsService)
testRequestReply(natsService)
testMultiRequestReply(natsService)
delay(1000)
@@ -137,12 +139,49 @@ class BluePrintNatsServiceTest {
val lbMessageHandler2 =
MessageHandler { message -> println("LB Publish Message Handler 2: ${message.strData()}") }
- natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1)
- natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2)
+ val sub1 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1)
+ val sub2 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2)
repeat(5) {
natsService.publish("lb-publish", "lb publish message-$it".toByteArray())
}
+ sub1.unsubscribe()
+ sub2.unsubscribe()
+ }
+ }
+
+ private fun testLimitSubscription(natsService: BluePrintNatsService) {
+ runBlocking {
+ /** Load balance Publish Message Test **/
+ val lbMessageHandler1 =
+ MessageHandler { message ->
+ runBlocking {
+ println("LB Publish Message Handler 1: ${message.strData()}")
+ message.ack()
+ }
+ }
+ val lbMessageHandler2 =
+ MessageHandler { message ->
+ runBlocking {
+ println("LB Publish Message Handler 2: ${message.strData()}")
+ message.ack()
+ }
+ }
+
+ val sub1 = natsService.loadBalanceSubscribe(
+ "lb-publish", "lb-group", lbMessageHandler1,
+ SubscriptionOptionsUtils.manualAckWithRateLimit(1)
+ )
+ val sub2 = natsService.loadBalanceSubscribe(
+ "lb-publish", "lb-group", lbMessageHandler2,
+ SubscriptionOptionsUtils.manualAckWithRateLimit(1)
+ )
+
+ repeat(10) {
+ natsService.publish("lb-publish", "lb limit message-$it".toByteArray())
+ }
+ sub1.unsubscribe()
+ sub2.unsubscribe()
}
}