summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor
diff options
context:
space:
mode:
authorBrinda Santh <bs2796@att.com>2019-12-18 15:19:58 -0500
committerKAPIL SINGAL <ks220y@att.com>2019-12-19 20:48:38 +0000
commit10c2988b51c764e62d8eeed52b254d363512eb24 (patch)
tree2c93f00798168375d083ee35fed74cfe49669d02 /ms/blueprintsprocessor
parent20b07e37990f1926d7b3cb45542b76c0336f9f19 (diff)
Cluster communication channels
Add NATS property and library services both . NATS Messaging Services with Token Auth and TLS Auth implementation Docker Compose for NATS Streaming instance. Documentation : https://docs.nats.io/ Issue-ID: CCSDK-2007 Signed-off-by: Brinda Santh <bs2796@att.com> Change-Id: Ieebaa8f2b18ae89d02a4f38a8027eda495a9db43
Diffstat (limited to 'ms/blueprintsprocessor')
-rw-r--r--ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml15
-rw-r--r--ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt29
-rw-r--r--ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt9
-rw-r--r--ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctionsTest.kt11
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/README5
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/pom.xml52
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsExtensions.kt36
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt41
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt37
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt97
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt143
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt53
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt48
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/SubscriptionOptionsUtils.kt32
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsExtensionsTest.kt51
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt228
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/test/resources/logback-test.xml36
-rwxr-xr-xms/blueprintsprocessor/modules/commons/pom.xml1
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/pom.xml4
-rwxr-xr-xms/blueprintsprocessor/parent/pom.xml43
20 files changed, 942 insertions, 29 deletions
diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
index f4b4b7995..f43f19c52 100644
--- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
+++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
@@ -16,9 +16,21 @@ services:
MYSQL_DATABASE: sdnctl
MYSQL_USER: sdnctl
MYSQL_PASSWORD: sdnctl
+ nats:
+ image: nats-streaming:latest
+ container_name: nats
+ hostname: nats
+ command: "-cid cds-cluster --auth tokenAuth -store file -dir store-nats-1 --cluster_node_id nats-1"
+ networks:
+ - cds-network
+ ports:
+ - "8222:8222"
+ - "4222:4222"
+ restart: always
cds-controller-1:
depends_on:
- db
+ - nats
image: onap/ccsdk-blueprintsprocessor:latest
container_name: cds-controller-1
hostname: cds-controller-1
@@ -42,6 +54,7 @@ services:
CLUSTER_MEMBERS: cds-controller-1,resource-resolution-1
CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
#CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
+ NATS_HOSTS: nats://nats:4222
APPLICATIONNAME: cds-controller
BUNDLEVERSION: 1.0.0
APP_CONFIG_HOME: /opt/app/onap/config
@@ -50,6 +63,7 @@ services:
resource-resolution-1:
depends_on:
- db
+ - nats
image: onap/ccsdk-blueprintsprocessor:latest
container_name: resource-resolution-1
hostname: resource-resolution-1
@@ -72,6 +86,7 @@ services:
CLUSTER_MEMBERS: cds-controller-1,resource-resolution-1
CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
#CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
+ NATS_HOSTS: nats://nats:4222
APPLICATIONNAME: resource-resolution
BUNDLEVERSION: 1.0.0
APP_CONFIG_HOME: /opt/app/onap/config
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt
index 5189d0758..4ab3d6f86 100644
--- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt
@@ -56,9 +56,13 @@ fun <T : Any> T.bpClone(): T {
return ObjectUtils.clone(this)
}
+fun String.splitCommaAsList(): List<String> {
+ return this.split(",").map { it.trim() }.toList()
+}
+
fun String.isJson(): Boolean {
return ((this.trim().startsWith("{") && this.trim().endsWith("}")) ||
- (this.trim().startsWith("[") && this.trim().endsWith("]")))
+ (this.trim().startsWith("[") && this.trim().endsWith("]")))
}
fun Any.asJsonString(intend: Boolean? = false): String {
@@ -126,29 +130,25 @@ fun String.asJsonType(bpDataType: String): JsonNode {
}
/**
- * Utility to convert Complex or Primitive object to Json Type.
+ * Utility to convert Complex or Primitive object or ByteArray to Json Type.
*/
fun <T : Any?> T.asJsonType(): JsonNode {
return if (this == null || this is MissingNode || this is NullNode) {
NullNode.instance
} else {
when (this) {
- is JsonNode ->
- this
+ is JsonNode -> this
+ is ByteArray -> JacksonUtils.objectMapper.reader().readTree(this.inputStream())
is String -> {
if (this.isJson())
this.jsonAsJsonType()
else
TextNode(this)
}
- is Boolean ->
- BooleanNode.valueOf(this)
- is Int ->
- IntNode.valueOf(this.toInt())
- is Double ->
- DoubleNode.valueOf(this.toDouble())
- else ->
- JacksonUtils.jsonNodeFromObject(this)
+ is Boolean -> BooleanNode.valueOf(this)
+ is Int -> IntNode.valueOf(this.toInt())
+ is Double -> DoubleNode.valueOf(this.toDouble())
+ else -> JacksonUtils.jsonNodeFromObject(this)
}
}
}
@@ -188,6 +188,11 @@ fun ArrayNode.asListOfString(): List<String> {
return JacksonUtils.getListFromJsonNode(this, String::class.java)
}
+fun JsonNode.asByteArray(): ByteArray {
+ val writer = JacksonUtils.objectMapper.writer()
+ return writer.writeValueAsBytes(this)
+}
+
fun <T> JsonNode.asType(clazzType: Class<T>): T {
return JacksonUtils.readValue(this, clazzType)
?: throw BluePrintException("couldn't convert JsonNode of type $clazzType")
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt
index d3d621093..d5ffe6b7f 100644
--- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt
@@ -16,6 +16,7 @@
package org.onap.ccsdk.cds.controllerblueprints.core.utils
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import java.net.InetAddress
object ClusterUtils {
@@ -25,4 +26,12 @@ object ClusterUtils {
val ip = InetAddress.getLocalHost()
return ip.hostName
}
+
+ fun clusterId(): String {
+ return System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_ID) ?: "cds-cluster"
+ }
+
+ fun clusterNodeId(): String {
+ return System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) ?: "cds-controller"
+ }
}
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctionsTest.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctionsTest.kt
index 3ae87b32d..7ac9b8649 100644
--- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctionsTest.kt
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctionsTest.kt
@@ -68,6 +68,17 @@ class CustomFunctionsTest {
}
@Test
+ fun testByteArrayJsonType() {
+ val jsonNode = """{"Name" :"Value"}""".jsonAsJsonType()
+
+ val byteArray = jsonNode.asByteArray()
+ assertNotNull(byteArray, "failed to get ByteArray form Json")
+
+ val reverseJsonNode = byteArray.asJsonType()
+ assertNotNull(reverseJsonNode, "failed to get Json type from ByteArray")
+ }
+
+ @Test
fun testAsJsonType() {
val nullReturnValue: JsonNode = null.asJsonType()
assertEquals(NullNode.instance, nullReturnValue)
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/README b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/README
deleted file mode 100644
index c4e91a2fc..000000000
--- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/README
+++ /dev/null
@@ -1,5 +0,0 @@
-
-Generate Certifactes & Keys
-----------------------------
-
-openssl req -x509 -newkey rsa:4096 -keyout my-private-key.pem -out my-public-key-cert.pem -days 3650 -nodes -subj '/CN=localhost'
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/pom.xml b/ms/blueprintsprocessor/modules/commons/nats-lib/pom.xml
new file mode 100644
index 000000000..f4e7f4d98
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright © 2018-2019 AT&T Intellectual Property.
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>commons</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nats-lib</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Blueprints Processor NATS Lib</name>
+ <description>Blueprints Processor NATS Lib</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.nats</groupId>
+ <artifactId>jnats</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.nats</groupId>
+ <artifactId>java-nats-streaming</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>blueprint-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>processor-core</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsExtensions.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsExtensions.kt
new file mode 100644
index 000000000..a585c972b
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsExtensions.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats
+
+import com.fasterxml.jackson.databind.JsonNode
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
+
+fun io.nats.client.Message.strData(): String {
+ return String(this.data)
+}
+
+fun io.nats.streaming.Message.strData(): String {
+ return String(this.data)
+}
+
+fun io.nats.client.Message.asJsonType(): JsonNode {
+ return this.data.asJsonType()
+}
+
+fun io.nats.streaming.Message.asJsonType(): JsonNode {
+ return this.data.asJsonType()
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt
new file mode 100644
index 000000000..709ee7d6e
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats
+
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.Configuration
+
+@Configuration
+@ComponentScan
+open class BluePrintNatsLibConfiguration
+
+/**
+ * Exposed Dependency Service by this NATS Lib Module
+ */
+fun BluePrintDependencyService.natsLibPropertyService(): BluePrintNatsLibPropertyService =
+ instance(NatsLibConstants.SERVICE_BLUEPRINT_NATS_LIB_PROPERTY)
+
+class NatsLibConstants {
+ companion object {
+ const val SERVICE_BLUEPRINT_NATS_LIB_PROPERTY = "blueprint-nats-lib-property-service"
+ const val PROPERTY_NATS_PREFIX = "blueprintsprocessor.nats."
+ const val TYPE_TOKEN_AUTH = "token-auth"
+ const val TYPE_TLS_AUTH = "tls-auth"
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt
new file mode 100644
index 000000000..3329ec200
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats
+
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
+
+open class NatsConnectionProperties {
+ lateinit var type: String
+ var clusterId: String = ClusterUtils.clusterId()
+ var clientId: String = ClusterUtils.clusterNodeId()
+ lateinit var host: String
+}
+
+open class TokenAuthNatsConnectionProperties : NatsConnectionProperties() {
+ lateinit var token: String
+}
+
+open class TLSAuthNatsConnectionProperties : NatsConnectionProperties() {
+ var trustCertCollection: String? = null
+ /** Below Used only for Mutual TLS */
+ var clientCertChain: String? = null
+ var clientPrivateKey: String? = null
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt
new file mode 100644
index 000000000..faf171528
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt
@@ -0,0 +1,97 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
+
+import com.fasterxml.jackson.databind.JsonNode
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.TLSAuthNatsConnectionProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import org.springframework.stereotype.Service
+
+@Service(NatsLibConstants.SERVICE_BLUEPRINT_NATS_LIB_PROPERTY)
+open class BluePrintNatsLibPropertyService(private var bluePrintPropertiesService: BluePrintPropertiesService) {
+
+ fun bluePrintNatsService(jsonNode: JsonNode): BluePrintNatsService {
+ val natsConnectionProperties = natsConnectionProperties(jsonNode)
+ return bluePrintNatsService(natsConnectionProperties)
+ }
+
+ fun bluePrintNatsService(selector: String): BluePrintNatsService {
+ val prefix = "${NatsLibConstants.PROPERTY_NATS_PREFIX}$selector"
+ val natsConnectionProperties = natsConnectionProperties(prefix)
+ return bluePrintNatsService(natsConnectionProperties)
+ }
+
+ /** NATS Lib Property Service */
+ fun natsConnectionProperties(jsonNode: JsonNode): NatsConnectionProperties {
+ return when (val type = jsonNode.get("type").textValue()) {
+ NatsLibConstants.TYPE_TOKEN_AUTH -> {
+ JacksonUtils.readValue(jsonNode, TokenAuthNatsConnectionProperties::class.java)!!
+ }
+ NatsLibConstants.TYPE_TLS_AUTH -> {
+ JacksonUtils.readValue(jsonNode, TLSAuthNatsConnectionProperties::class.java)!!
+ }
+ else -> {
+ throw BluePrintProcessorException("Nats type($type) not supported")
+ }
+ }
+ }
+
+ fun natsConnectionProperties(prefix: String): NatsConnectionProperties {
+ val type = bluePrintPropertiesService.propertyBeanType(
+ "$prefix.type", String::class.java
+ )
+ return when (type) {
+ NatsLibConstants.TYPE_TOKEN_AUTH -> {
+ tokenAuthNatsConnectionProperties(prefix)
+ }
+ NatsLibConstants.TYPE_TLS_AUTH -> {
+ tlsAuthNatsConnectionProperties(prefix)
+ }
+ else -> {
+ throw BluePrintProcessorException("Grpc type($type) not supported")
+ }
+ }
+ }
+
+ private fun tokenAuthNatsConnectionProperties(prefix: String): TokenAuthNatsConnectionProperties {
+ return bluePrintPropertiesService.propertyBeanType(prefix, TokenAuthNatsConnectionProperties::class.java)
+ }
+
+ private fun tlsAuthNatsConnectionProperties(prefix: String): TLSAuthNatsConnectionProperties {
+ return bluePrintPropertiesService.propertyBeanType(prefix, TLSAuthNatsConnectionProperties::class.java)
+ }
+
+ fun bluePrintNatsService(natsConnectionProperties: NatsConnectionProperties):
+ BluePrintNatsService {
+ return when (natsConnectionProperties) {
+ is TokenAuthNatsConnectionProperties -> {
+ TokenAuthNatsService(natsConnectionProperties)
+ }
+ is TLSAuthNatsConnectionProperties -> {
+ TLSAuthNatsService(natsConnectionProperties)
+ }
+ else -> {
+ throw BluePrintProcessorException("couldn't get nats service for properties $natsConnectionProperties")
+ }
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt
new file mode 100644
index 000000000..9548fe78d
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt
@@ -0,0 +1,143 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@file:Suppress("BlockingMethodInNonBlockingContext")
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
+
+import io.nats.client.Dispatcher
+import io.nats.streaming.MessageHandler
+import io.nats.streaming.StreamingConnection
+import io.nats.streaming.Subscription
+import io.nats.streaming.SubscriptionOptions
+import java.time.Duration
+
+interface BluePrintNatsService {
+
+ /** Create and Return the NATS streaming connection */
+ suspend fun connection(): StreamingConnection
+
+ /** Send one request [message] to the [subject] and get only one reply
+ * The request message subscriber may be multi instances consumer or load balance consumer.
+ * If it is multi instances consumer, then we will get only first responses from subscribers.
+ *
+ */
+ suspend fun requestAndGetOneReply(subject: String, message: ByteArray, timeout: Long): io.nats.client.Message {
+ return connection().natsConnection.request(subject, message, Duration.ofMillis(timeout))
+ }
+
+ /** Send one request [message] to the [subject] and get multiple replies in [replySubject] with [messageHandler]
+ * The request message subscriber may be multi instances consumer or load balance consumer.
+ * If it is multi instances consumer, then we will get multiple responses from subscribers.
+ * Include the unSubscribe logic's in [messageHandler] implementation.
+ */
+ suspend fun requestAndGetMultipleReplies(
+ subject: String,
+ replySubject: String,
+ message: ByteArray,
+ messageHandler: io.nats.client.MessageHandler
+ ) {
+ val natsConnection = connection().natsConnection
+ val dispatcher = natsConnection.createDispatcher(messageHandler)
+ /** Reply subject consumer */
+ dispatcher.subscribe(replySubject)
+
+ /** Publish the request message and expect the reply messages in reply subject consumer */
+ natsConnection.publish(subject, replySubject, message)
+ }
+
+ /** Synchronous reply Subscribe the [subject] with the [messageHandler].
+ * This is used only the message has to be consumed by all instances in the cluster and message handler must reply.
+ */
+ suspend fun replySubscribe(
+ subject: String,
+ messageHandler: io.nats.client.MessageHandler
+ ): Dispatcher {
+ val natsConnection = connection().natsConnection
+ val dispatcher = natsConnection.createDispatcher(messageHandler)
+ return dispatcher.subscribe(subject)
+ }
+
+ /**
+ * Synchronous reply Subscriber will listen for [subject] with [loadBalanceGroup].
+ * This is used only the message has to be consumed by only one instance in the cluster.
+ * server will now load balance messages between the members of the queue group and message handler must reply.
+ */
+ suspend fun loadBalanceReplySubscribe(
+ subject: String,
+ loadBalanceGroup: String,
+ messageHandler: io.nats.client.MessageHandler
+ ): Dispatcher {
+ val natsConnection = connection().natsConnection
+ val dispatcher = natsConnection.createDispatcher(messageHandler)
+ return dispatcher.subscribe(subject, loadBalanceGroup)
+ }
+
+ /** Publish the [message] to all subscribers on the [subject] */
+ suspend fun publish(subject: String, message: ByteArray) {
+ connection().publish(subject, message)
+ }
+
+ /** Subscribe the [subject] with the [messageHandler].
+ * This is used only the message has to be consumed by all instances in the cluster.
+ */
+ suspend fun subscribe(
+ subject: String,
+ messageHandler: MessageHandler
+ ): Subscription {
+ return connection().subscribe(subject, messageHandler)
+ }
+
+ /** Subscribe the [subject] with the [messageHandler] and [subscriptionOptions].
+ * This is used only the message has to be consumed by all instances in the cluster.
+ */
+ suspend fun subscribe(
+ subject: String,
+ messageHandler: MessageHandler,
+ subscriptionOptions: SubscriptionOptions
+ ): Subscription {
+ return connection().subscribe(subject, messageHandler, subscriptionOptions)
+ }
+
+ /**
+ * https://docs.nats.io/developing-with-nats/receiving/queues
+ * subscribers will listen for [subject] with [loadBalanceGroup].
+ * This is used only the message has to be consumed by only one instance in the cluster.
+ * server will now load balance messages between the members of the queue group.
+ */
+ suspend fun loadBalanceSubscribe(
+ subject: String,
+ loadBalanceGroup: String,
+ messageHandler: MessageHandler
+ ): Subscription {
+ return connection().subscribe(subject, loadBalanceGroup, messageHandler)
+ }
+
+ /**
+ * https://docs.nats.io/developing-with-nats/receiving/queues
+ * subscribers will listen for [subject] with [loadBalanceGroup] and [subscriptionOptions].
+ * This is used only the message has to be consumed by only one instance in the cluster.
+ * server will now load balance messages between the members of the queue group.
+ */
+ suspend fun loadBalanceSubscribe(
+ subject: String,
+ loadBalanceGroup: String,
+ messageHandler: MessageHandler,
+ subscriptionOptions: SubscriptionOptions
+ ): Subscription {
+ return connection().subscribe(subject, loadBalanceGroup, messageHandler, subscriptionOptions)
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt
new file mode 100644
index 000000000..3781fae59
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt
@@ -0,0 +1,53 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
+
+import io.nats.client.Nats
+import io.nats.client.Options
+import io.nats.streaming.NatsStreaming
+import io.nats.streaming.StreamingConnection
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.TLSAuthNatsConnectionProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
+import javax.net.ssl.SSLContext
+
+open class TLSAuthNatsService(private val natsConnectionProperties: TLSAuthNatsConnectionProperties) :
+ BluePrintNatsService {
+
+ lateinit var streamingConnection: StreamingConnection
+
+ override suspend fun connection(): StreamingConnection {
+ if (!::streamingConnection.isInitialized) {
+ val serverList = natsConnectionProperties.host.splitCommaAsList()
+
+ val options = Options.Builder()
+ .servers(serverList.toTypedArray())
+ // .sslContext(sslContext())
+ .build()
+ val natsConnection = Nats.connect(options)
+ val streamingOptions = io.nats.streaming.Options.Builder().natsConn(natsConnection).build()
+ streamingConnection = NatsStreaming.connect(
+ natsConnectionProperties.clusterId,
+ natsConnectionProperties.clientId, streamingOptions
+ )
+ }
+ return streamingConnection
+ }
+
+ fun sslContext(): SSLContext {
+ TODO("Implement NATS SSL Context")
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt
new file mode 100644
index 000000000..0da3022ff
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
+
+import io.nats.client.Nats
+import io.nats.client.Options
+import io.nats.streaming.NatsStreaming
+import io.nats.streaming.StreamingConnection
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
+
+open class TokenAuthNatsService(private val natsConnectionProperties: TokenAuthNatsConnectionProperties) :
+ BluePrintNatsService {
+
+ lateinit var streamingConnection: StreamingConnection
+
+ override suspend fun connection(): StreamingConnection {
+ if (!::streamingConnection.isInitialized) {
+ val serverList = natsConnectionProperties.host.splitCommaAsList()
+
+ val options = Options.Builder()
+ .servers(serverList.toTypedArray())
+ .token(natsConnectionProperties.token.toCharArray())
+ .build()
+ val natsConnection = Nats.connect(options)
+ val streamingOptions = io.nats.streaming.Options.Builder().natsConn(natsConnection).build()
+ streamingConnection = NatsStreaming.connect(
+ natsConnectionProperties.clusterId,
+ natsConnectionProperties.clientId, streamingOptions
+ )
+ }
+ return streamingConnection
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/SubscriptionOptionsUtils.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/SubscriptionOptionsUtils.kt
new file mode 100644
index 000000000..48a759cc5
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/SubscriptionOptionsUtils.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats.utils
+
+import io.nats.streaming.SubscriptionOptions
+
+object SubscriptionOptionsUtils {
+
+ /** Subscribe with a durable [name] and client can re subscribe with durable [name] */
+ fun durable(name: String): SubscriptionOptions {
+ return SubscriptionOptions.Builder().durableName(name).build()
+ }
+
+ /** Subscribe with manual ack mode and a max in-flight [limit] */
+ fun manualAckWithRateLimit(limit: Int): SubscriptionOptions {
+ return SubscriptionOptions.Builder().manualAcks().maxInFlight(limit).build()
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsExtensionsTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsExtensionsTest.kt
new file mode 100644
index 000000000..ec120dc18
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsExtensionsTest.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
+
+import io.mockk.every
+import io.mockk.mockk
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData
+import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType
+import java.nio.charset.Charset
+import kotlin.test.assertEquals
+
+class BluePrintNatsExtensionsTest {
+
+ @Test
+ fun testMessageStrConversion() {
+ val mockMessage = mockk<io.nats.client.Message>()
+ every { mockMessage.data } returns "I am message".toByteArray(Charset.defaultCharset())
+
+ val messageData = mockMessage.strData()
+ assertEquals("I am message", messageData)
+ }
+
+ @Test
+ fun testMessageJsonConversion() {
+ val json = """{"name":"value"}"""
+
+ val mockMessage = mockk<io.nats.client.Message>()
+ every { mockMessage.data } returns json.jsonAsJsonType().asByteArray()
+
+ val messageData = mockMessage.asJsonType().asJsonString()
+ assertEquals(json, messageData)
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt
new file mode 100644
index 000000000..976f9f5e8
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt
@@ -0,0 +1,228 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
+
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.spyk
+import io.nats.streaming.MessageHandler
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData
+import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType
+import kotlin.test.assertNotNull
+
+class BluePrintNatsServiceTest {
+
+ @Test
+ fun testTokenAuthNatService() {
+ val configuration = """{
+ "type" : "${NatsLibConstants.TYPE_TOKEN_AUTH}",
+ "host" : "nats://localhost:4222",
+ "token" : "tokenAuth"
+ }
+ """.trimIndent()
+
+ val bluePrintNatsLibPropertyService = BluePrintNatsLibPropertyService(mockk())
+
+ val spkBluePrintNatsLibPropertyService = spyk(bluePrintNatsLibPropertyService)
+ every {
+ spkBluePrintNatsLibPropertyService
+ .bluePrintNatsService(any<NatsConnectionProperties>())
+ } returns TokenAuthNatsService(
+ mockk()
+ )
+
+ val bluePrintNatsService =
+ spkBluePrintNatsLibPropertyService.bluePrintNatsService(configuration.jsonAsJsonType())
+ assertNotNull(bluePrintNatsService, "failed to get NATS Service")
+ }
+
+ @Test
+ fun testTLSAuthNatService() {
+ val configuration = """{
+ "type" : "${NatsLibConstants.TYPE_TLS_AUTH}",
+ "host" : "nats://localhost:4222"
+ }
+ """.trimIndent()
+
+ val bluePrintNatsLibPropertyService = BluePrintNatsLibPropertyService(mockk())
+
+ val spkBluePrintNatsLibPropertyService = spyk(bluePrintNatsLibPropertyService)
+ every {
+ spkBluePrintNatsLibPropertyService
+ .bluePrintNatsService(any<NatsConnectionProperties>())
+ } returns TLSAuthNatsService(
+ mockk()
+ )
+
+ val bluePrintNatsService =
+ spkBluePrintNatsLibPropertyService.bluePrintNatsService(configuration.jsonAsJsonType())
+ assertNotNull(bluePrintNatsService, "failed to get NATS Service")
+ }
+
+ /** Enable to test only on local desktop. Don't enable in Build server
+ * Start the Server with : nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
+ */
+ // @Test
+ fun localTntegrationTest() {
+ runBlocking {
+
+ val connectionProperties = TokenAuthNatsConnectionProperties().apply {
+ host = "nats://localhost:4222,nats://localhost:4223"
+ clientId = "client-1"
+ token = "tokenAuth"
+ }
+ val natsService = TokenAuthNatsService(connectionProperties)
+ val streamingConnection = natsService.connection()
+ assertNotNull(streamingConnection, "failed to create nats connection")
+
+ val connectionProperties2 = TokenAuthNatsConnectionProperties().apply {
+ host = "nats://localhost:4222,nats://localhost:4223"
+ clientId = "client-2"
+ token = "tokenAuth"
+ }
+ val tlsAuthNatsService2 = TokenAuthNatsService(connectionProperties2)
+ val streamingConnection2 = tlsAuthNatsService2.connection()
+ assertNotNull(streamingConnection2, "failed to create nats connection 2")
+
+ testMultiPublish(natsService)
+ testLoadBalance(natsService)
+ testRequestReply(natsService)
+ testMultiRequestReply(natsService)
+ delay(1000)
+ }
+ }
+
+ private fun testMultiPublish(natsService: BluePrintNatsService) {
+ runBlocking {
+ /** Multiple Publish Message Test **/
+ val messageHandler1 =
+ MessageHandler { message -> println("Multi Publish Message Handler 1: ${message.strData()}") }
+ val messageHandler2 =
+ MessageHandler { message -> println("Multi Publish Message Handler 2: ${message.strData()}") }
+
+ natsService.subscribe("multi-publish", messageHandler1)
+ natsService.subscribe("multi-publish", messageHandler2)
+
+ repeat(5) {
+ natsService.publish("multi-publish", "multi publish message-$it".toByteArray())
+ }
+ }
+ }
+
+ private fun testLoadBalance(natsService: BluePrintNatsService) {
+ runBlocking {
+ /** Load balance Publish Message Test **/
+ val lbMessageHandler1 =
+ MessageHandler { message -> println("LB Publish Message Handler 1: ${message.strData()}") }
+ val lbMessageHandler2 =
+ MessageHandler { message -> println("LB Publish Message Handler 2: ${message.strData()}") }
+
+ natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1)
+ natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2)
+
+ repeat(5) {
+ natsService.publish("lb-publish", "lb publish message-$it".toByteArray())
+ }
+ }
+ }
+
+ private fun testRequestReply(natsService: BluePrintNatsService) {
+ runBlocking {
+ val lbMessageHandler1 = io.nats.client.MessageHandler { message ->
+ println("LB RR Request Handler 1: ${String(message.data)} will reply to(${message.replyTo})")
+ message.connection.publish(
+ message.replyTo,
+ "Notification ${String(message.data)} reply from 1".toByteArray()
+ )
+ }
+
+ val lbMessageHandler2 = io.nats.client.MessageHandler { message ->
+ println("LB RR Request Handler 2: ${String(message.data)} will reply to(${message.replyTo})")
+ message.connection.publish(
+ message.replyTo,
+ "Notification ${String(message.data)} reply from 2".toByteArray()
+ )
+ }
+
+ natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler1)
+ natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler2)
+
+ repeat(5) {
+ val message = natsService.requestAndGetOneReply(
+ "rr-request",
+ "rr message-$it".toByteArray(),
+ 1000
+ )
+ println("Received : ${message.strData()}")
+ }
+ }
+ }
+
+ private fun testMultiRequestReply(natsService: BluePrintNatsService) {
+ runBlocking {
+ /** Request Reply **/
+ val lbMessageHandler1 = io.nats.client.MessageHandler { message ->
+ println("LB RR Request Handler 1: ${String(message.data)} will reply to(${message.replyTo})")
+ message.connection.publish(
+ message.replyTo,
+ "Notification ${message.strData()} reply from 1".toByteArray()
+ )
+ message.connection.publish(
+ message.replyTo,
+ "Completion ${message.strData()} reply from 1".toByteArray()
+ )
+ }
+ val lbMessageHandler2 = io.nats.client.MessageHandler { message ->
+ println("LB RR Request Handler 2: ${message.strData()} will reply to(${message.replyTo})")
+ message.connection.publish(
+ message.replyTo,
+ "Notification ${message.strData()} reply from 2".toByteArray()
+ )
+ message.connection.publish(
+ message.replyTo,
+ "Completion ${message.strData()} reply from 2".toByteArray()
+ )
+ }
+
+ natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler1)
+ natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler2)
+
+ /** Should unsubscribe on completion message */
+ val rrReplyMessageHandler = io.nats.client.MessageHandler { message ->
+ val messageContent = message.strData()
+ println("RR Reply Handler : $messageContent")
+ if (messageContent.startsWith("Completion")) {
+ message.subscription.unsubscribe()
+ }
+ }
+ repeat(5) {
+ natsService.requestAndGetMultipleReplies(
+ "rr-request",
+ "rr-reply-$it",
+ "rr message-$it".toByteArray(),
+ rrReplyMessageHandler
+ )
+ }
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..9bb940006
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/resources/logback-test.xml
@@ -0,0 +1,36 @@
+<!--
+ ~ Copyright © 2019 IBM.
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="io.mockk" level="warn"/>
+ <logger name="org.springframework.test" level="warn"/>
+ <logger name="org.springframework" level="warn"/>
+ <logger name="org.hibernate" level="info"/>
+ <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/>
+
+ <root level="warn">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>
diff --git a/ms/blueprintsprocessor/modules/commons/pom.xml b/ms/blueprintsprocessor/modules/commons/pom.xml
index 78c569125..18ef63469 100755
--- a/ms/blueprintsprocessor/modules/commons/pom.xml
+++ b/ms/blueprintsprocessor/modules/commons/pom.xml
@@ -41,6 +41,7 @@
<module>grpc-lib</module>
<module>message-lib</module>
<module>ssh-lib</module>
+ <module>nats-lib</module>
</modules>
<dependencies>
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/pom.xml b/ms/blueprintsprocessor/modules/services/execution-service/pom.xml
index 4517a3f05..a17417b78 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/pom.xml
+++ b/ms/blueprintsprocessor/modules/services/execution-service/pom.xml
@@ -52,6 +52,10 @@
<artifactId>grpc-lib</artifactId>
</dependency>
<dependency>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>nats-lib</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
<artifactId>resource-dict</artifactId>
</dependency>
diff --git a/ms/blueprintsprocessor/parent/pom.xml b/ms/blueprintsprocessor/parent/pom.xml
index 091d9dc68..4f8ff35b4 100755
--- a/ms/blueprintsprocessor/parent/pom.xml
+++ b/ms/blueprintsprocessor/parent/pom.xml
@@ -34,6 +34,8 @@
<properties>
<sli.version>${ccsdk.sli.core.version}</sli.version>
<dmaap.client.version>1.1.5</dmaap.client.version>
+ <nats.version>2.6.6</nats.version>
+ <nats.streaming.version>2.2.3</nats.streaming.version>
<!-- Should be using released artifact as soon as available: -->
<!-- https://github.com/springfox/springfox/milestone/44 -->
@@ -297,6 +299,18 @@
<version>${netty-ssl}</version>
</dependency>
+ <!-- NATS -->
+ <dependency>
+ <groupId>io.nats</groupId>
+ <artifactId>jnats</artifactId>
+ <version>${nats.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.nats</groupId>
+ <artifactId>java-nats-streaming</artifactId>
+ <version>${nats.streaming.version}</version>
+ </dependency>
+
<!-- Atomix -->
<dependency>
<groupId>io.atomix</groupId>
@@ -416,6 +430,11 @@
</dependency>
<dependency>
<groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>nats-lib</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
<artifactId>ssh-lib</artifactId>
<version>${project.version}</version>
</dependency>
@@ -537,24 +556,24 @@
<!-- Controller Blueprints Application Dependency -->
<dependency>
- <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
- <artifactId>resource-dict</artifactId>
- <version>${project.version}</version>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>resource-dict</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
- <artifactId>blueprint-core</artifactId>
- <version>${project.version}</version>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>blueprint-core</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
- <artifactId>blueprint-proto</artifactId>
- <version>${project.version}</version>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>blueprint-proto</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
- <artifactId>blueprint-validation</artifactId>
- <version>${project.version}</version>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>blueprint-validation</artifactId>
+ <version>${project.version}</version>
</dependency>
<!-- Database -->