diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt')
-rw-r--r-- | ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt | 267 |
1 files changed, 267 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt new file mode 100644 index 000000000..721828ac9 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt @@ -0,0 +1,267 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.nats.service + +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import io.nats.streaming.MessageHandler +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties +import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants +import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties +import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils +import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType +import kotlin.test.assertNotNull + +class BluePrintNatsServiceTest { + + @Test + fun testTokenAuthNatService() { + val configuration = """{ + "type" : "${NatsLibConstants.TYPE_TOKEN_AUTH}", + "host" : "nats://localhost:4222", + "token" : "tokenAuth" + } + """.trimIndent() + + val bluePrintNatsLibPropertyService = BluePrintNatsLibPropertyService(mockk()) + + val spkBluePrintNatsLibPropertyService = spyk(bluePrintNatsLibPropertyService) + every { + spkBluePrintNatsLibPropertyService + .bluePrintNatsService(any<NatsConnectionProperties>()) + } returns TokenAuthNatsService( + mockk() + ) + + val bluePrintNatsService = + spkBluePrintNatsLibPropertyService.bluePrintNatsService(configuration.jsonAsJsonType()) + assertNotNull(bluePrintNatsService, "failed to get NATS Service") + } + + @Test + fun testTLSAuthNatService() { + val configuration = """{ + "type" : "${NatsLibConstants.TYPE_TLS_AUTH}", + "host" : "nats://localhost:4222" + } + """.trimIndent() + + val bluePrintNatsLibPropertyService = BluePrintNatsLibPropertyService(mockk()) + + val spkBluePrintNatsLibPropertyService = spyk(bluePrintNatsLibPropertyService) + every { + spkBluePrintNatsLibPropertyService + .bluePrintNatsService(any<NatsConnectionProperties>()) + } returns TLSAuthNatsService( + mockk() + ) + + val bluePrintNatsService = + spkBluePrintNatsLibPropertyService.bluePrintNatsService(configuration.jsonAsJsonType()) + assertNotNull(bluePrintNatsService, "failed to get NATS Service") + } + + /** Enable to test only on local desktop. Don't enable in Build server + * Start the Server with : nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V + */ + // @Test + fun localIntegrationTest() { + runBlocking { + + val connectionProperties = TokenAuthNatsConnectionProperties().apply { + host = "nats://localhost:4222,nats://localhost:4223" + clientId = "client-1" + token = "tokenAuth" + } + val natsService = TokenAuthNatsService(connectionProperties) + val streamingConnection = natsService.connection() + assertNotNull(streamingConnection, "failed to create nats connection") + + val connectionProperties2 = TokenAuthNatsConnectionProperties().apply { + host = "nats://localhost:4222,nats://localhost:4223" + clientId = "client-2" + token = "tokenAuth" + } + val tlsAuthNatsService2 = TokenAuthNatsService(connectionProperties2) + val streamingConnection2 = tlsAuthNatsService2.connection() + assertNotNull(streamingConnection2, "failed to create nats connection 2") + + testMultiPublish(natsService) + testLoadBalance(natsService) + testLimitSubscription(natsService) + testRequestReply(natsService) + testMultiRequestReply(natsService) + delay(1000) + } + } + + private fun testMultiPublish(natsService: BluePrintNatsService) { + runBlocking { + /** Multiple Publish Message Test **/ + val messageHandler1 = + MessageHandler { message -> println("Multi Publish Message Handler 1: ${message.strData()}") } + val messageHandler2 = + MessageHandler { message -> println("Multi Publish Message Handler 2: ${message.strData()}") } + + natsService.subscribe("multi-publish", messageHandler1) + natsService.subscribe("multi-publish", messageHandler2) + + repeat(5) { + natsService.publish("multi-publish", "multi publish message-$it".toByteArray()) + } + } + } + + private fun testLoadBalance(natsService: BluePrintNatsService) { + runBlocking { + /** Load balance Publish Message Test **/ + val lbMessageHandler1 = + MessageHandler { message -> println("LB Publish Message Handler 1: ${message.strData()}") } + val lbMessageHandler2 = + MessageHandler { message -> println("LB Publish Message Handler 2: ${message.strData()}") } + + val sub1 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1) + val sub2 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2) + + repeat(5) { + natsService.publish("lb-publish", "lb publish message-$it".toByteArray()) + } + sub1.unsubscribe() + sub2.unsubscribe() + } + } + + private fun testLimitSubscription(natsService: BluePrintNatsService) { + runBlocking { + /** Load balance Publish Message Test **/ + val lbMessageHandler1 = + MessageHandler { message -> + runBlocking { + println("LB Publish Message Handler 1: ${message.strData()}") + message.ack() + } + } + val lbMessageHandler2 = + MessageHandler { message -> + runBlocking { + println("LB Publish Message Handler 2: ${message.strData()}") + message.ack() + } + } + + val sub1 = natsService.loadBalanceSubscribe( + "lb-publish", "lb-group", lbMessageHandler1, + SubscriptionOptionsUtils.manualAckWithRateLimit(1) + ) + val sub2 = natsService.loadBalanceSubscribe( + "lb-publish", "lb-group", lbMessageHandler2, + SubscriptionOptionsUtils.manualAckWithRateLimit(1) + ) + + repeat(10) { + natsService.publish("lb-publish", "lb limit message-$it".toByteArray()) + } + sub1.unsubscribe() + sub2.unsubscribe() + } + } + + private fun testRequestReply(natsService: BluePrintNatsService) { + runBlocking { + val lbMessageHandler1 = io.nats.client.MessageHandler { message -> + println("LB RR Request Handler 1: ${String(message.data)} will reply to(${message.replyTo})") + message.connection.publish( + message.replyTo, + "Notification ${String(message.data)} reply from 1".toByteArray() + ) + } + + val lbMessageHandler2 = io.nats.client.MessageHandler { message -> + println("LB RR Request Handler 2: ${String(message.data)} will reply to(${message.replyTo})") + message.connection.publish( + message.replyTo, + "Notification ${String(message.data)} reply from 2".toByteArray() + ) + } + + natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler1) + natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler2) + + repeat(5) { + val message = natsService.requestAndGetOneReply( + "rr-request", + "rr message-$it".toByteArray(), + 1000 + ) + println("Received : ${message.strData()}") + } + } + } + + private fun testMultiRequestReply(natsService: BluePrintNatsService) { + runBlocking { + /** Request Reply **/ + val lbMessageHandler1 = io.nats.client.MessageHandler { message -> + println("LB RR Request Handler 1: ${String(message.data)} will reply to(${message.replyTo})") + message.connection.publish( + message.replyTo, + "Notification ${message.strData()} reply from 1".toByteArray() + ) + message.connection.publish( + message.replyTo, + "Completion ${message.strData()} reply from 1".toByteArray() + ) + } + val lbMessageHandler2 = io.nats.client.MessageHandler { message -> + println("LB RR Request Handler 2: ${message.strData()} will reply to(${message.replyTo})") + message.connection.publish( + message.replyTo, + "Notification ${message.strData()} reply from 2".toByteArray() + ) + message.connection.publish( + message.replyTo, + "Completion ${message.strData()} reply from 2".toByteArray() + ) + } + + natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler1) + natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler2) + + /** Should unsubscribe on completion message */ + val rrReplyMessageHandler = io.nats.client.MessageHandler { message -> + val messageContent = message.strData() + println("RR Reply Handler : $messageContent") + if (messageContent.startsWith("Completion")) { + message.subscription.unsubscribe() + } + } + repeat(5) { + natsService.requestAndGetMultipleReplies( + "rr-request", + "rr-reply-$it", + "rr message-$it".toByteArray(), + rrReplyMessageHandler + ) + } + } + } +} |