diff options
author | Brinda Santh <bs2796@att.com> | 2019-12-18 15:19:58 -0500 |
---|---|---|
committer | KAPIL SINGAL <ks220y@att.com> | 2019-12-19 20:48:38 +0000 |
commit | 10c2988b51c764e62d8eeed52b254d363512eb24 (patch) | |
tree | 2c93f00798168375d083ee35fed74cfe49669d02 /ms/blueprintsprocessor/modules/commons/nats-lib/src/test | |
parent | 20b07e37990f1926d7b3cb45542b76c0336f9f19 (diff) |
Cluster communication channels
Add NATS property and library services both .
NATS Messaging Services with Token Auth and TLS Auth implementation
Docker Compose for NATS Streaming instance.
Documentation : https://docs.nats.io/
Issue-ID: CCSDK-2007
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: Ieebaa8f2b18ae89d02a4f38a8027eda495a9db43
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/nats-lib/src/test')
3 files changed, 315 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsExtensionsTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsExtensionsTest.kt new file mode 100644 index 000000000..ec120dc18 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsExtensionsTest.kt @@ -0,0 +1,51 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.nats.service + +import io.mockk.every +import io.mockk.mockk +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType +import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData +import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType +import java.nio.charset.Charset +import kotlin.test.assertEquals + +class BluePrintNatsExtensionsTest { + + @Test + fun testMessageStrConversion() { + val mockMessage = mockk<io.nats.client.Message>() + every { mockMessage.data } returns "I am message".toByteArray(Charset.defaultCharset()) + + val messageData = mockMessage.strData() + assertEquals("I am message", messageData) + } + + @Test + fun testMessageJsonConversion() { + val json = """{"name":"value"}""" + + val mockMessage = mockk<io.nats.client.Message>() + every { mockMessage.data } returns json.jsonAsJsonType().asByteArray() + + val messageData = mockMessage.asJsonType().asJsonString() + assertEquals(json, messageData) + } +} diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt new file mode 100644 index 000000000..976f9f5e8 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt @@ -0,0 +1,228 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.nats.service + +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import io.nats.streaming.MessageHandler +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties +import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants +import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties +import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData +import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType +import kotlin.test.assertNotNull + +class BluePrintNatsServiceTest { + + @Test + fun testTokenAuthNatService() { + val configuration = """{ + "type" : "${NatsLibConstants.TYPE_TOKEN_AUTH}", + "host" : "nats://localhost:4222", + "token" : "tokenAuth" + } + """.trimIndent() + + val bluePrintNatsLibPropertyService = BluePrintNatsLibPropertyService(mockk()) + + val spkBluePrintNatsLibPropertyService = spyk(bluePrintNatsLibPropertyService) + every { + spkBluePrintNatsLibPropertyService + .bluePrintNatsService(any<NatsConnectionProperties>()) + } returns TokenAuthNatsService( + mockk() + ) + + val bluePrintNatsService = + spkBluePrintNatsLibPropertyService.bluePrintNatsService(configuration.jsonAsJsonType()) + assertNotNull(bluePrintNatsService, "failed to get NATS Service") + } + + @Test + fun testTLSAuthNatService() { + val configuration = """{ + "type" : "${NatsLibConstants.TYPE_TLS_AUTH}", + "host" : "nats://localhost:4222" + } + """.trimIndent() + + val bluePrintNatsLibPropertyService = BluePrintNatsLibPropertyService(mockk()) + + val spkBluePrintNatsLibPropertyService = spyk(bluePrintNatsLibPropertyService) + every { + spkBluePrintNatsLibPropertyService + .bluePrintNatsService(any<NatsConnectionProperties>()) + } returns TLSAuthNatsService( + mockk() + ) + + val bluePrintNatsService = + spkBluePrintNatsLibPropertyService.bluePrintNatsService(configuration.jsonAsJsonType()) + assertNotNull(bluePrintNatsService, "failed to get NATS Service") + } + + /** Enable to test only on local desktop. Don't enable in Build server + * Start the Server with : nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V + */ + // @Test + fun localTntegrationTest() { + runBlocking { + + val connectionProperties = TokenAuthNatsConnectionProperties().apply { + host = "nats://localhost:4222,nats://localhost:4223" + clientId = "client-1" + token = "tokenAuth" + } + val natsService = TokenAuthNatsService(connectionProperties) + val streamingConnection = natsService.connection() + assertNotNull(streamingConnection, "failed to create nats connection") + + val connectionProperties2 = TokenAuthNatsConnectionProperties().apply { + host = "nats://localhost:4222,nats://localhost:4223" + clientId = "client-2" + token = "tokenAuth" + } + val tlsAuthNatsService2 = TokenAuthNatsService(connectionProperties2) + val streamingConnection2 = tlsAuthNatsService2.connection() + assertNotNull(streamingConnection2, "failed to create nats connection 2") + + testMultiPublish(natsService) + testLoadBalance(natsService) + testRequestReply(natsService) + testMultiRequestReply(natsService) + delay(1000) + } + } + + private fun testMultiPublish(natsService: BluePrintNatsService) { + runBlocking { + /** Multiple Publish Message Test **/ + val messageHandler1 = + MessageHandler { message -> println("Multi Publish Message Handler 1: ${message.strData()}") } + val messageHandler2 = + MessageHandler { message -> println("Multi Publish Message Handler 2: ${message.strData()}") } + + natsService.subscribe("multi-publish", messageHandler1) + natsService.subscribe("multi-publish", messageHandler2) + + repeat(5) { + natsService.publish("multi-publish", "multi publish message-$it".toByteArray()) + } + } + } + + private fun testLoadBalance(natsService: BluePrintNatsService) { + runBlocking { + /** Load balance Publish Message Test **/ + val lbMessageHandler1 = + MessageHandler { message -> println("LB Publish Message Handler 1: ${message.strData()}") } + val lbMessageHandler2 = + MessageHandler { message -> println("LB Publish Message Handler 2: ${message.strData()}") } + + natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1) + natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2) + + repeat(5) { + natsService.publish("lb-publish", "lb publish message-$it".toByteArray()) + } + } + } + + private fun testRequestReply(natsService: BluePrintNatsService) { + runBlocking { + val lbMessageHandler1 = io.nats.client.MessageHandler { message -> + println("LB RR Request Handler 1: ${String(message.data)} will reply to(${message.replyTo})") + message.connection.publish( + message.replyTo, + "Notification ${String(message.data)} reply from 1".toByteArray() + ) + } + + val lbMessageHandler2 = io.nats.client.MessageHandler { message -> + println("LB RR Request Handler 2: ${String(message.data)} will reply to(${message.replyTo})") + message.connection.publish( + message.replyTo, + "Notification ${String(message.data)} reply from 2".toByteArray() + ) + } + + natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler1) + natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler2) + + repeat(5) { + val message = natsService.requestAndGetOneReply( + "rr-request", + "rr message-$it".toByteArray(), + 1000 + ) + println("Received : ${message.strData()}") + } + } + } + + private fun testMultiRequestReply(natsService: BluePrintNatsService) { + runBlocking { + /** Request Reply **/ + val lbMessageHandler1 = io.nats.client.MessageHandler { message -> + println("LB RR Request Handler 1: ${String(message.data)} will reply to(${message.replyTo})") + message.connection.publish( + message.replyTo, + "Notification ${message.strData()} reply from 1".toByteArray() + ) + message.connection.publish( + message.replyTo, + "Completion ${message.strData()} reply from 1".toByteArray() + ) + } + val lbMessageHandler2 = io.nats.client.MessageHandler { message -> + println("LB RR Request Handler 2: ${message.strData()} will reply to(${message.replyTo})") + message.connection.publish( + message.replyTo, + "Notification ${message.strData()} reply from 2".toByteArray() + ) + message.connection.publish( + message.replyTo, + "Completion ${message.strData()} reply from 2".toByteArray() + ) + } + + natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler1) + natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler2) + + /** Should unsubscribe on completion message */ + val rrReplyMessageHandler = io.nats.client.MessageHandler { message -> + val messageContent = message.strData() + println("RR Reply Handler : $messageContent") + if (messageContent.startsWith("Completion")) { + message.subscription.unsubscribe() + } + } + repeat(5) { + natsService.requestAndGetMultipleReplies( + "rr-request", + "rr-reply-$it", + "rr message-$it".toByteArray(), + rrReplyMessageHandler + ) + } + } + } +} diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/resources/logback-test.xml new file mode 100644 index 000000000..9bb940006 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/resources/logback-test.xml @@ -0,0 +1,36 @@ +<!-- + ~ Copyright © 2019 IBM. + ~ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <!-- encoders are assigned the type + ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> + <encoder> + <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern> + </encoder> + </appender> + + <logger name="io.mockk" level="warn"/> + <logger name="org.springframework.test" level="warn"/> + <logger name="org.springframework" level="warn"/> + <logger name="org.hibernate" level="info"/> + <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/> + + <root level="warn"> + <appender-ref ref="STDOUT"/> + </root> + +</configuration> |