summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/nats-lib/src
diff options
context:
space:
mode:
authorBrinda Santh <bs2796@att.com>2019-12-18 15:19:58 -0500
committerKAPIL SINGAL <ks220y@att.com>2019-12-19 20:48:38 +0000
commit10c2988b51c764e62d8eeed52b254d363512eb24 (patch)
tree2c93f00798168375d083ee35fed74cfe49669d02 /ms/blueprintsprocessor/modules/commons/nats-lib/src
parent20b07e37990f1926d7b3cb45542b76c0336f9f19 (diff)
Cluster communication channels
Add NATS property and library services both . NATS Messaging Services with Token Auth and TLS Auth implementation Docker Compose for NATS Streaming instance. Documentation : https://docs.nats.io/ Issue-ID: CCSDK-2007 Signed-off-by: Brinda Santh <bs2796@att.com> Change-Id: Ieebaa8f2b18ae89d02a4f38a8027eda495a9db43
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/nats-lib/src')
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsExtensions.kt36
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt41
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt37
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt97
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt143
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt53
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt48
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/SubscriptionOptionsUtils.kt32
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsExtensionsTest.kt51
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt228
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/test/resources/logback-test.xml36
11 files changed, 802 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsExtensions.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsExtensions.kt
new file mode 100644
index 000000000..a585c972b
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsExtensions.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats
+
+import com.fasterxml.jackson.databind.JsonNode
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
+
+fun io.nats.client.Message.strData(): String {
+ return String(this.data)
+}
+
+fun io.nats.streaming.Message.strData(): String {
+ return String(this.data)
+}
+
+fun io.nats.client.Message.asJsonType(): JsonNode {
+ return this.data.asJsonType()
+}
+
+fun io.nats.streaming.Message.asJsonType(): JsonNode {
+ return this.data.asJsonType()
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt
new file mode 100644
index 000000000..709ee7d6e
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats
+
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.Configuration
+
+@Configuration
+@ComponentScan
+open class BluePrintNatsLibConfiguration
+
+/**
+ * Exposed Dependency Service by this NATS Lib Module
+ */
+fun BluePrintDependencyService.natsLibPropertyService(): BluePrintNatsLibPropertyService =
+ instance(NatsLibConstants.SERVICE_BLUEPRINT_NATS_LIB_PROPERTY)
+
+class NatsLibConstants {
+ companion object {
+ const val SERVICE_BLUEPRINT_NATS_LIB_PROPERTY = "blueprint-nats-lib-property-service"
+ const val PROPERTY_NATS_PREFIX = "blueprintsprocessor.nats."
+ const val TYPE_TOKEN_AUTH = "token-auth"
+ const val TYPE_TLS_AUTH = "tls-auth"
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt
new file mode 100644
index 000000000..3329ec200
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats
+
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
+
+open class NatsConnectionProperties {
+ lateinit var type: String
+ var clusterId: String = ClusterUtils.clusterId()
+ var clientId: String = ClusterUtils.clusterNodeId()
+ lateinit var host: String
+}
+
+open class TokenAuthNatsConnectionProperties : NatsConnectionProperties() {
+ lateinit var token: String
+}
+
+open class TLSAuthNatsConnectionProperties : NatsConnectionProperties() {
+ var trustCertCollection: String? = null
+ /** Below Used only for Mutual TLS */
+ var clientCertChain: String? = null
+ var clientPrivateKey: String? = null
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt
new file mode 100644
index 000000000..faf171528
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt
@@ -0,0 +1,97 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
+
+import com.fasterxml.jackson.databind.JsonNode
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.TLSAuthNatsConnectionProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import org.springframework.stereotype.Service
+
+@Service(NatsLibConstants.SERVICE_BLUEPRINT_NATS_LIB_PROPERTY)
+open class BluePrintNatsLibPropertyService(private var bluePrintPropertiesService: BluePrintPropertiesService) {
+
+ fun bluePrintNatsService(jsonNode: JsonNode): BluePrintNatsService {
+ val natsConnectionProperties = natsConnectionProperties(jsonNode)
+ return bluePrintNatsService(natsConnectionProperties)
+ }
+
+ fun bluePrintNatsService(selector: String): BluePrintNatsService {
+ val prefix = "${NatsLibConstants.PROPERTY_NATS_PREFIX}$selector"
+ val natsConnectionProperties = natsConnectionProperties(prefix)
+ return bluePrintNatsService(natsConnectionProperties)
+ }
+
+ /** NATS Lib Property Service */
+ fun natsConnectionProperties(jsonNode: JsonNode): NatsConnectionProperties {
+ return when (val type = jsonNode.get("type").textValue()) {
+ NatsLibConstants.TYPE_TOKEN_AUTH -> {
+ JacksonUtils.readValue(jsonNode, TokenAuthNatsConnectionProperties::class.java)!!
+ }
+ NatsLibConstants.TYPE_TLS_AUTH -> {
+ JacksonUtils.readValue(jsonNode, TLSAuthNatsConnectionProperties::class.java)!!
+ }
+ else -> {
+ throw BluePrintProcessorException("Nats type($type) not supported")
+ }
+ }
+ }
+
+ fun natsConnectionProperties(prefix: String): NatsConnectionProperties {
+ val type = bluePrintPropertiesService.propertyBeanType(
+ "$prefix.type", String::class.java
+ )
+ return when (type) {
+ NatsLibConstants.TYPE_TOKEN_AUTH -> {
+ tokenAuthNatsConnectionProperties(prefix)
+ }
+ NatsLibConstants.TYPE_TLS_AUTH -> {
+ tlsAuthNatsConnectionProperties(prefix)
+ }
+ else -> {
+ throw BluePrintProcessorException("Grpc type($type) not supported")
+ }
+ }
+ }
+
+ private fun tokenAuthNatsConnectionProperties(prefix: String): TokenAuthNatsConnectionProperties {
+ return bluePrintPropertiesService.propertyBeanType(prefix, TokenAuthNatsConnectionProperties::class.java)
+ }
+
+ private fun tlsAuthNatsConnectionProperties(prefix: String): TLSAuthNatsConnectionProperties {
+ return bluePrintPropertiesService.propertyBeanType(prefix, TLSAuthNatsConnectionProperties::class.java)
+ }
+
+ fun bluePrintNatsService(natsConnectionProperties: NatsConnectionProperties):
+ BluePrintNatsService {
+ return when (natsConnectionProperties) {
+ is TokenAuthNatsConnectionProperties -> {
+ TokenAuthNatsService(natsConnectionProperties)
+ }
+ is TLSAuthNatsConnectionProperties -> {
+ TLSAuthNatsService(natsConnectionProperties)
+ }
+ else -> {
+ throw BluePrintProcessorException("couldn't get nats service for properties $natsConnectionProperties")
+ }
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt
new file mode 100644
index 000000000..9548fe78d
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt
@@ -0,0 +1,143 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@file:Suppress("BlockingMethodInNonBlockingContext")
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
+
+import io.nats.client.Dispatcher
+import io.nats.streaming.MessageHandler
+import io.nats.streaming.StreamingConnection
+import io.nats.streaming.Subscription
+import io.nats.streaming.SubscriptionOptions
+import java.time.Duration
+
+interface BluePrintNatsService {
+
+ /** Create and Return the NATS streaming connection */
+ suspend fun connection(): StreamingConnection
+
+ /** Send one request [message] to the [subject] and get only one reply
+ * The request message subscriber may be multi instances consumer or load balance consumer.
+ * If it is multi instances consumer, then we will get only first responses from subscribers.
+ *
+ */
+ suspend fun requestAndGetOneReply(subject: String, message: ByteArray, timeout: Long): io.nats.client.Message {
+ return connection().natsConnection.request(subject, message, Duration.ofMillis(timeout))
+ }
+
+ /** Send one request [message] to the [subject] and get multiple replies in [replySubject] with [messageHandler]
+ * The request message subscriber may be multi instances consumer or load balance consumer.
+ * If it is multi instances consumer, then we will get multiple responses from subscribers.
+ * Include the unSubscribe logic's in [messageHandler] implementation.
+ */
+ suspend fun requestAndGetMultipleReplies(
+ subject: String,
+ replySubject: String,
+ message: ByteArray,
+ messageHandler: io.nats.client.MessageHandler
+ ) {
+ val natsConnection = connection().natsConnection
+ val dispatcher = natsConnection.createDispatcher(messageHandler)
+ /** Reply subject consumer */
+ dispatcher.subscribe(replySubject)
+
+ /** Publish the request message and expect the reply messages in reply subject consumer */
+ natsConnection.publish(subject, replySubject, message)
+ }
+
+ /** Synchronous reply Subscribe the [subject] with the [messageHandler].
+ * This is used only the message has to be consumed by all instances in the cluster and message handler must reply.
+ */
+ suspend fun replySubscribe(
+ subject: String,
+ messageHandler: io.nats.client.MessageHandler
+ ): Dispatcher {
+ val natsConnection = connection().natsConnection
+ val dispatcher = natsConnection.createDispatcher(messageHandler)
+ return dispatcher.subscribe(subject)
+ }
+
+ /**
+ * Synchronous reply Subscriber will listen for [subject] with [loadBalanceGroup].
+ * This is used only the message has to be consumed by only one instance in the cluster.
+ * server will now load balance messages between the members of the queue group and message handler must reply.
+ */
+ suspend fun loadBalanceReplySubscribe(
+ subject: String,
+ loadBalanceGroup: String,
+ messageHandler: io.nats.client.MessageHandler
+ ): Dispatcher {
+ val natsConnection = connection().natsConnection
+ val dispatcher = natsConnection.createDispatcher(messageHandler)
+ return dispatcher.subscribe(subject, loadBalanceGroup)
+ }
+
+ /** Publish the [message] to all subscribers on the [subject] */
+ suspend fun publish(subject: String, message: ByteArray) {
+ connection().publish(subject, message)
+ }
+
+ /** Subscribe the [subject] with the [messageHandler].
+ * This is used only the message has to be consumed by all instances in the cluster.
+ */
+ suspend fun subscribe(
+ subject: String,
+ messageHandler: MessageHandler
+ ): Subscription {
+ return connection().subscribe(subject, messageHandler)
+ }
+
+ /** Subscribe the [subject] with the [messageHandler] and [subscriptionOptions].
+ * This is used only the message has to be consumed by all instances in the cluster.
+ */
+ suspend fun subscribe(
+ subject: String,
+ messageHandler: MessageHandler,
+ subscriptionOptions: SubscriptionOptions
+ ): Subscription {
+ return connection().subscribe(subject, messageHandler, subscriptionOptions)
+ }
+
+ /**
+ * https://docs.nats.io/developing-with-nats/receiving/queues
+ * subscribers will listen for [subject] with [loadBalanceGroup].
+ * This is used only the message has to be consumed by only one instance in the cluster.
+ * server will now load balance messages between the members of the queue group.
+ */
+ suspend fun loadBalanceSubscribe(
+ subject: String,
+ loadBalanceGroup: String,
+ messageHandler: MessageHandler
+ ): Subscription {
+ return connection().subscribe(subject, loadBalanceGroup, messageHandler)
+ }
+
+ /**
+ * https://docs.nats.io/developing-with-nats/receiving/queues
+ * subscribers will listen for [subject] with [loadBalanceGroup] and [subscriptionOptions].
+ * This is used only the message has to be consumed by only one instance in the cluster.
+ * server will now load balance messages between the members of the queue group.
+ */
+ suspend fun loadBalanceSubscribe(
+ subject: String,
+ loadBalanceGroup: String,
+ messageHandler: MessageHandler,
+ subscriptionOptions: SubscriptionOptions
+ ): Subscription {
+ return connection().subscribe(subject, loadBalanceGroup, messageHandler, subscriptionOptions)
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt
new file mode 100644
index 000000000..3781fae59
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt
@@ -0,0 +1,53 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
+
+import io.nats.client.Nats
+import io.nats.client.Options
+import io.nats.streaming.NatsStreaming
+import io.nats.streaming.StreamingConnection
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.TLSAuthNatsConnectionProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
+import javax.net.ssl.SSLContext
+
+open class TLSAuthNatsService(private val natsConnectionProperties: TLSAuthNatsConnectionProperties) :
+ BluePrintNatsService {
+
+ lateinit var streamingConnection: StreamingConnection
+
+ override suspend fun connection(): StreamingConnection {
+ if (!::streamingConnection.isInitialized) {
+ val serverList = natsConnectionProperties.host.splitCommaAsList()
+
+ val options = Options.Builder()
+ .servers(serverList.toTypedArray())
+ // .sslContext(sslContext())
+ .build()
+ val natsConnection = Nats.connect(options)
+ val streamingOptions = io.nats.streaming.Options.Builder().natsConn(natsConnection).build()
+ streamingConnection = NatsStreaming.connect(
+ natsConnectionProperties.clusterId,
+ natsConnectionProperties.clientId, streamingOptions
+ )
+ }
+ return streamingConnection
+ }
+
+ fun sslContext(): SSLContext {
+ TODO("Implement NATS SSL Context")
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt
new file mode 100644
index 000000000..0da3022ff
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
+
+import io.nats.client.Nats
+import io.nats.client.Options
+import io.nats.streaming.NatsStreaming
+import io.nats.streaming.StreamingConnection
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
+
+open class TokenAuthNatsService(private val natsConnectionProperties: TokenAuthNatsConnectionProperties) :
+ BluePrintNatsService {
+
+ lateinit var streamingConnection: StreamingConnection
+
+ override suspend fun connection(): StreamingConnection {
+ if (!::streamingConnection.isInitialized) {
+ val serverList = natsConnectionProperties.host.splitCommaAsList()
+
+ val options = Options.Builder()
+ .servers(serverList.toTypedArray())
+ .token(natsConnectionProperties.token.toCharArray())
+ .build()
+ val natsConnection = Nats.connect(options)
+ val streamingOptions = io.nats.streaming.Options.Builder().natsConn(natsConnection).build()
+ streamingConnection = NatsStreaming.connect(
+ natsConnectionProperties.clusterId,
+ natsConnectionProperties.clientId, streamingOptions
+ )
+ }
+ return streamingConnection
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/SubscriptionOptionsUtils.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/SubscriptionOptionsUtils.kt
new file mode 100644
index 000000000..48a759cc5
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/SubscriptionOptionsUtils.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats.utils
+
+import io.nats.streaming.SubscriptionOptions
+
+object SubscriptionOptionsUtils {
+
+ /** Subscribe with a durable [name] and client can re subscribe with durable [name] */
+ fun durable(name: String): SubscriptionOptions {
+ return SubscriptionOptions.Builder().durableName(name).build()
+ }
+
+ /** Subscribe with manual ack mode and a max in-flight [limit] */
+ fun manualAckWithRateLimit(limit: Int): SubscriptionOptions {
+ return SubscriptionOptions.Builder().manualAcks().maxInFlight(limit).build()
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsExtensionsTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsExtensionsTest.kt
new file mode 100644
index 000000000..ec120dc18
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsExtensionsTest.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
+
+import io.mockk.every
+import io.mockk.mockk
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData
+import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType
+import java.nio.charset.Charset
+import kotlin.test.assertEquals
+
+class BluePrintNatsExtensionsTest {
+
+ @Test
+ fun testMessageStrConversion() {
+ val mockMessage = mockk<io.nats.client.Message>()
+ every { mockMessage.data } returns "I am message".toByteArray(Charset.defaultCharset())
+
+ val messageData = mockMessage.strData()
+ assertEquals("I am message", messageData)
+ }
+
+ @Test
+ fun testMessageJsonConversion() {
+ val json = """{"name":"value"}"""
+
+ val mockMessage = mockk<io.nats.client.Message>()
+ every { mockMessage.data } returns json.jsonAsJsonType().asByteArray()
+
+ val messageData = mockMessage.asJsonType().asJsonString()
+ assertEquals(json, messageData)
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt
new file mode 100644
index 000000000..976f9f5e8
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt
@@ -0,0 +1,228 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
+
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.spyk
+import io.nats.streaming.MessageHandler
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData
+import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType
+import kotlin.test.assertNotNull
+
+class BluePrintNatsServiceTest {
+
+ @Test
+ fun testTokenAuthNatService() {
+ val configuration = """{
+ "type" : "${NatsLibConstants.TYPE_TOKEN_AUTH}",
+ "host" : "nats://localhost:4222",
+ "token" : "tokenAuth"
+ }
+ """.trimIndent()
+
+ val bluePrintNatsLibPropertyService = BluePrintNatsLibPropertyService(mockk())
+
+ val spkBluePrintNatsLibPropertyService = spyk(bluePrintNatsLibPropertyService)
+ every {
+ spkBluePrintNatsLibPropertyService
+ .bluePrintNatsService(any<NatsConnectionProperties>())
+ } returns TokenAuthNatsService(
+ mockk()
+ )
+
+ val bluePrintNatsService =
+ spkBluePrintNatsLibPropertyService.bluePrintNatsService(configuration.jsonAsJsonType())
+ assertNotNull(bluePrintNatsService, "failed to get NATS Service")
+ }
+
+ @Test
+ fun testTLSAuthNatService() {
+ val configuration = """{
+ "type" : "${NatsLibConstants.TYPE_TLS_AUTH}",
+ "host" : "nats://localhost:4222"
+ }
+ """.trimIndent()
+
+ val bluePrintNatsLibPropertyService = BluePrintNatsLibPropertyService(mockk())
+
+ val spkBluePrintNatsLibPropertyService = spyk(bluePrintNatsLibPropertyService)
+ every {
+ spkBluePrintNatsLibPropertyService
+ .bluePrintNatsService(any<NatsConnectionProperties>())
+ } returns TLSAuthNatsService(
+ mockk()
+ )
+
+ val bluePrintNatsService =
+ spkBluePrintNatsLibPropertyService.bluePrintNatsService(configuration.jsonAsJsonType())
+ assertNotNull(bluePrintNatsService, "failed to get NATS Service")
+ }
+
+ /** Enable to test only on local desktop. Don't enable in Build server
+ * Start the Server with : nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
+ */
+ // @Test
+ fun localTntegrationTest() {
+ runBlocking {
+
+ val connectionProperties = TokenAuthNatsConnectionProperties().apply {
+ host = "nats://localhost:4222,nats://localhost:4223"
+ clientId = "client-1"
+ token = "tokenAuth"
+ }
+ val natsService = TokenAuthNatsService(connectionProperties)
+ val streamingConnection = natsService.connection()
+ assertNotNull(streamingConnection, "failed to create nats connection")
+
+ val connectionProperties2 = TokenAuthNatsConnectionProperties().apply {
+ host = "nats://localhost:4222,nats://localhost:4223"
+ clientId = "client-2"
+ token = "tokenAuth"
+ }
+ val tlsAuthNatsService2 = TokenAuthNatsService(connectionProperties2)
+ val streamingConnection2 = tlsAuthNatsService2.connection()
+ assertNotNull(streamingConnection2, "failed to create nats connection 2")
+
+ testMultiPublish(natsService)
+ testLoadBalance(natsService)
+ testRequestReply(natsService)
+ testMultiRequestReply(natsService)
+ delay(1000)
+ }
+ }
+
+ private fun testMultiPublish(natsService: BluePrintNatsService) {
+ runBlocking {
+ /** Multiple Publish Message Test **/
+ val messageHandler1 =
+ MessageHandler { message -> println("Multi Publish Message Handler 1: ${message.strData()}") }
+ val messageHandler2 =
+ MessageHandler { message -> println("Multi Publish Message Handler 2: ${message.strData()}") }
+
+ natsService.subscribe("multi-publish", messageHandler1)
+ natsService.subscribe("multi-publish", messageHandler2)
+
+ repeat(5) {
+ natsService.publish("multi-publish", "multi publish message-$it".toByteArray())
+ }
+ }
+ }
+
+ private fun testLoadBalance(natsService: BluePrintNatsService) {
+ runBlocking {
+ /** Load balance Publish Message Test **/
+ val lbMessageHandler1 =
+ MessageHandler { message -> println("LB Publish Message Handler 1: ${message.strData()}") }
+ val lbMessageHandler2 =
+ MessageHandler { message -> println("LB Publish Message Handler 2: ${message.strData()}") }
+
+ natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1)
+ natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2)
+
+ repeat(5) {
+ natsService.publish("lb-publish", "lb publish message-$it".toByteArray())
+ }
+ }
+ }
+
+ private fun testRequestReply(natsService: BluePrintNatsService) {
+ runBlocking {
+ val lbMessageHandler1 = io.nats.client.MessageHandler { message ->
+ println("LB RR Request Handler 1: ${String(message.data)} will reply to(${message.replyTo})")
+ message.connection.publish(
+ message.replyTo,
+ "Notification ${String(message.data)} reply from 1".toByteArray()
+ )
+ }
+
+ val lbMessageHandler2 = io.nats.client.MessageHandler { message ->
+ println("LB RR Request Handler 2: ${String(message.data)} will reply to(${message.replyTo})")
+ message.connection.publish(
+ message.replyTo,
+ "Notification ${String(message.data)} reply from 2".toByteArray()
+ )
+ }
+
+ natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler1)
+ natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler2)
+
+ repeat(5) {
+ val message = natsService.requestAndGetOneReply(
+ "rr-request",
+ "rr message-$it".toByteArray(),
+ 1000
+ )
+ println("Received : ${message.strData()}")
+ }
+ }
+ }
+
+ private fun testMultiRequestReply(natsService: BluePrintNatsService) {
+ runBlocking {
+ /** Request Reply **/
+ val lbMessageHandler1 = io.nats.client.MessageHandler { message ->
+ println("LB RR Request Handler 1: ${String(message.data)} will reply to(${message.replyTo})")
+ message.connection.publish(
+ message.replyTo,
+ "Notification ${message.strData()} reply from 1".toByteArray()
+ )
+ message.connection.publish(
+ message.replyTo,
+ "Completion ${message.strData()} reply from 1".toByteArray()
+ )
+ }
+ val lbMessageHandler2 = io.nats.client.MessageHandler { message ->
+ println("LB RR Request Handler 2: ${message.strData()} will reply to(${message.replyTo})")
+ message.connection.publish(
+ message.replyTo,
+ "Notification ${message.strData()} reply from 2".toByteArray()
+ )
+ message.connection.publish(
+ message.replyTo,
+ "Completion ${message.strData()} reply from 2".toByteArray()
+ )
+ }
+
+ natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler1)
+ natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler2)
+
+ /** Should unsubscribe on completion message */
+ val rrReplyMessageHandler = io.nats.client.MessageHandler { message ->
+ val messageContent = message.strData()
+ println("RR Reply Handler : $messageContent")
+ if (messageContent.startsWith("Completion")) {
+ message.subscription.unsubscribe()
+ }
+ }
+ repeat(5) {
+ natsService.requestAndGetMultipleReplies(
+ "rr-request",
+ "rr-reply-$it",
+ "rr message-$it".toByteArray(),
+ rrReplyMessageHandler
+ )
+ }
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..9bb940006
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/resources/logback-test.xml
@@ -0,0 +1,36 @@
+<!--
+ ~ Copyright © 2019 IBM.
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="io.mockk" level="warn"/>
+ <logger name="org.springframework.test" level="warn"/>
+ <logger name="org.springframework" level="warn"/>
+ <logger name="org.hibernate" level="info"/>
+ <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/>
+
+ <root level="warn">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>