From 10c2988b51c764e62d8eeed52b254d363512eb24 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Wed, 18 Dec 2019 15:19:58 -0500 Subject: Cluster communication channels Add NATS property and library services both . NATS Messaging Services with Token Auth and TLS Auth implementation Docker Compose for NATS Streaming instance. Documentation : https://docs.nats.io/ Issue-ID: CCSDK-2007 Signed-off-by: Brinda Santh Change-Id: Ieebaa8f2b18ae89d02a4f38a8027eda495a9db43 --- .../nats/BluePrintNatsExtensions.kt | 36 ++++ .../nats/BluePrintNatsLibConfiguration.kt | 41 ++++ .../nats/BluePrintNatsLibData.kt | 37 ++++ .../service/BluePrintNatsLibPropertyService.kt | 97 +++++++++ .../nats/service/BluePrintNatsService.kt | 143 +++++++++++++ .../nats/service/TLSAuthNatsService.kt | 53 +++++ .../nats/service/TokenAuthNatsService.kt | 48 +++++ .../nats/utils/SubscriptionOptionsUtils.kt | 32 +++ .../nats/service/BluePrintNatsExtensionsTest.kt | 51 +++++ .../nats/service/BluePrintNatsServiceTest.kt | 228 +++++++++++++++++++++ .../nats-lib/src/test/resources/logback-test.xml | 36 ++++ 11 files changed, 802 insertions(+) create mode 100644 ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsExtensions.kt create mode 100644 ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt create mode 100644 ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt create mode 100644 ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/SubscriptionOptionsUtils.kt create mode 100644 ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsExtensionsTest.kt create mode 100644 ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt create mode 100644 ms/blueprintsprocessor/modules/commons/nats-lib/src/test/resources/logback-test.xml (limited to 'ms/blueprintsprocessor/modules/commons/nats-lib/src') diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsExtensions.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsExtensions.kt new file mode 100644 index 000000000..a585c972b --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsExtensions.kt @@ -0,0 +1,36 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.nats + +import com.fasterxml.jackson.databind.JsonNode +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType + +fun io.nats.client.Message.strData(): String { + return String(this.data) +} + +fun io.nats.streaming.Message.strData(): String { + return String(this.data) +} + +fun io.nats.client.Message.asJsonType(): JsonNode { + return this.data.asJsonType() +} + +fun io.nats.streaming.Message.asJsonType(): JsonNode { + return this.data.asJsonType() +} diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt new file mode 100644 index 000000000..709ee7d6e --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt @@ -0,0 +1,41 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.nats + +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService +import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.Configuration + +@Configuration +@ComponentScan +open class BluePrintNatsLibConfiguration + +/** + * Exposed Dependency Service by this NATS Lib Module + */ +fun BluePrintDependencyService.natsLibPropertyService(): BluePrintNatsLibPropertyService = + instance(NatsLibConstants.SERVICE_BLUEPRINT_NATS_LIB_PROPERTY) + +class NatsLibConstants { + companion object { + const val SERVICE_BLUEPRINT_NATS_LIB_PROPERTY = "blueprint-nats-lib-property-service" + const val PROPERTY_NATS_PREFIX = "blueprintsprocessor.nats." + const val TYPE_TOKEN_AUTH = "token-auth" + const val TYPE_TLS_AUTH = "tls-auth" + } +} diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt new file mode 100644 index 000000000..3329ec200 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt @@ -0,0 +1,37 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.nats + +import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils + +open class NatsConnectionProperties { + lateinit var type: String + var clusterId: String = ClusterUtils.clusterId() + var clientId: String = ClusterUtils.clusterNodeId() + lateinit var host: String +} + +open class TokenAuthNatsConnectionProperties : NatsConnectionProperties() { + lateinit var token: String +} + +open class TLSAuthNatsConnectionProperties : NatsConnectionProperties() { + var trustCertCollection: String? = null + /** Below Used only for Mutual TLS */ + var clientCertChain: String? = null + var clientPrivateKey: String? = null +} diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt new file mode 100644 index 000000000..faf171528 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt @@ -0,0 +1,97 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.nats.service + +import com.fasterxml.jackson.databind.JsonNode +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties +import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants +import org.onap.ccsdk.cds.blueprintsprocessor.nats.TLSAuthNatsConnectionProperties +import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import org.springframework.stereotype.Service + +@Service(NatsLibConstants.SERVICE_BLUEPRINT_NATS_LIB_PROPERTY) +open class BluePrintNatsLibPropertyService(private var bluePrintPropertiesService: BluePrintPropertiesService) { + + fun bluePrintNatsService(jsonNode: JsonNode): BluePrintNatsService { + val natsConnectionProperties = natsConnectionProperties(jsonNode) + return bluePrintNatsService(natsConnectionProperties) + } + + fun bluePrintNatsService(selector: String): BluePrintNatsService { + val prefix = "${NatsLibConstants.PROPERTY_NATS_PREFIX}$selector" + val natsConnectionProperties = natsConnectionProperties(prefix) + return bluePrintNatsService(natsConnectionProperties) + } + + /** NATS Lib Property Service */ + fun natsConnectionProperties(jsonNode: JsonNode): NatsConnectionProperties { + return when (val type = jsonNode.get("type").textValue()) { + NatsLibConstants.TYPE_TOKEN_AUTH -> { + JacksonUtils.readValue(jsonNode, TokenAuthNatsConnectionProperties::class.java)!! + } + NatsLibConstants.TYPE_TLS_AUTH -> { + JacksonUtils.readValue(jsonNode, TLSAuthNatsConnectionProperties::class.java)!! + } + else -> { + throw BluePrintProcessorException("Nats type($type) not supported") + } + } + } + + fun natsConnectionProperties(prefix: String): NatsConnectionProperties { + val type = bluePrintPropertiesService.propertyBeanType( + "$prefix.type", String::class.java + ) + return when (type) { + NatsLibConstants.TYPE_TOKEN_AUTH -> { + tokenAuthNatsConnectionProperties(prefix) + } + NatsLibConstants.TYPE_TLS_AUTH -> { + tlsAuthNatsConnectionProperties(prefix) + } + else -> { + throw BluePrintProcessorException("Grpc type($type) not supported") + } + } + } + + private fun tokenAuthNatsConnectionProperties(prefix: String): TokenAuthNatsConnectionProperties { + return bluePrintPropertiesService.propertyBeanType(prefix, TokenAuthNatsConnectionProperties::class.java) + } + + private fun tlsAuthNatsConnectionProperties(prefix: String): TLSAuthNatsConnectionProperties { + return bluePrintPropertiesService.propertyBeanType(prefix, TLSAuthNatsConnectionProperties::class.java) + } + + fun bluePrintNatsService(natsConnectionProperties: NatsConnectionProperties): + BluePrintNatsService { + return when (natsConnectionProperties) { + is TokenAuthNatsConnectionProperties -> { + TokenAuthNatsService(natsConnectionProperties) + } + is TLSAuthNatsConnectionProperties -> { + TLSAuthNatsService(natsConnectionProperties) + } + else -> { + throw BluePrintProcessorException("couldn't get nats service for properties $natsConnectionProperties") + } + } + } +} diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt new file mode 100644 index 000000000..9548fe78d --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt @@ -0,0 +1,143 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@file:Suppress("BlockingMethodInNonBlockingContext") + +package org.onap.ccsdk.cds.blueprintsprocessor.nats.service + +import io.nats.client.Dispatcher +import io.nats.streaming.MessageHandler +import io.nats.streaming.StreamingConnection +import io.nats.streaming.Subscription +import io.nats.streaming.SubscriptionOptions +import java.time.Duration + +interface BluePrintNatsService { + + /** Create and Return the NATS streaming connection */ + suspend fun connection(): StreamingConnection + + /** Send one request [message] to the [subject] and get only one reply + * The request message subscriber may be multi instances consumer or load balance consumer. + * If it is multi instances consumer, then we will get only first responses from subscribers. + * + */ + suspend fun requestAndGetOneReply(subject: String, message: ByteArray, timeout: Long): io.nats.client.Message { + return connection().natsConnection.request(subject, message, Duration.ofMillis(timeout)) + } + + /** Send one request [message] to the [subject] and get multiple replies in [replySubject] with [messageHandler] + * The request message subscriber may be multi instances consumer or load balance consumer. + * If it is multi instances consumer, then we will get multiple responses from subscribers. + * Include the unSubscribe logic's in [messageHandler] implementation. + */ + suspend fun requestAndGetMultipleReplies( + subject: String, + replySubject: String, + message: ByteArray, + messageHandler: io.nats.client.MessageHandler + ) { + val natsConnection = connection().natsConnection + val dispatcher = natsConnection.createDispatcher(messageHandler) + /** Reply subject consumer */ + dispatcher.subscribe(replySubject) + + /** Publish the request message and expect the reply messages in reply subject consumer */ + natsConnection.publish(subject, replySubject, message) + } + + /** Synchronous reply Subscribe the [subject] with the [messageHandler]. + * This is used only the message has to be consumed by all instances in the cluster and message handler must reply. + */ + suspend fun replySubscribe( + subject: String, + messageHandler: io.nats.client.MessageHandler + ): Dispatcher { + val natsConnection = connection().natsConnection + val dispatcher = natsConnection.createDispatcher(messageHandler) + return dispatcher.subscribe(subject) + } + + /** + * Synchronous reply Subscriber will listen for [subject] with [loadBalanceGroup]. + * This is used only the message has to be consumed by only one instance in the cluster. + * server will now load balance messages between the members of the queue group and message handler must reply. + */ + suspend fun loadBalanceReplySubscribe( + subject: String, + loadBalanceGroup: String, + messageHandler: io.nats.client.MessageHandler + ): Dispatcher { + val natsConnection = connection().natsConnection + val dispatcher = natsConnection.createDispatcher(messageHandler) + return dispatcher.subscribe(subject, loadBalanceGroup) + } + + /** Publish the [message] to all subscribers on the [subject] */ + suspend fun publish(subject: String, message: ByteArray) { + connection().publish(subject, message) + } + + /** Subscribe the [subject] with the [messageHandler]. + * This is used only the message has to be consumed by all instances in the cluster. + */ + suspend fun subscribe( + subject: String, + messageHandler: MessageHandler + ): Subscription { + return connection().subscribe(subject, messageHandler) + } + + /** Subscribe the [subject] with the [messageHandler] and [subscriptionOptions]. + * This is used only the message has to be consumed by all instances in the cluster. + */ + suspend fun subscribe( + subject: String, + messageHandler: MessageHandler, + subscriptionOptions: SubscriptionOptions + ): Subscription { + return connection().subscribe(subject, messageHandler, subscriptionOptions) + } + + /** + * https://docs.nats.io/developing-with-nats/receiving/queues + * subscribers will listen for [subject] with [loadBalanceGroup]. + * This is used only the message has to be consumed by only one instance in the cluster. + * server will now load balance messages between the members of the queue group. + */ + suspend fun loadBalanceSubscribe( + subject: String, + loadBalanceGroup: String, + messageHandler: MessageHandler + ): Subscription { + return connection().subscribe(subject, loadBalanceGroup, messageHandler) + } + + /** + * https://docs.nats.io/developing-with-nats/receiving/queues + * subscribers will listen for [subject] with [loadBalanceGroup] and [subscriptionOptions]. + * This is used only the message has to be consumed by only one instance in the cluster. + * server will now load balance messages between the members of the queue group. + */ + suspend fun loadBalanceSubscribe( + subject: String, + loadBalanceGroup: String, + messageHandler: MessageHandler, + subscriptionOptions: SubscriptionOptions + ): Subscription { + return connection().subscribe(subject, loadBalanceGroup, messageHandler, subscriptionOptions) + } +} diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt new file mode 100644 index 000000000..3781fae59 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt @@ -0,0 +1,53 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.nats.service + +import io.nats.client.Nats +import io.nats.client.Options +import io.nats.streaming.NatsStreaming +import io.nats.streaming.StreamingConnection +import org.onap.ccsdk.cds.blueprintsprocessor.nats.TLSAuthNatsConnectionProperties +import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList +import javax.net.ssl.SSLContext + +open class TLSAuthNatsService(private val natsConnectionProperties: TLSAuthNatsConnectionProperties) : + BluePrintNatsService { + + lateinit var streamingConnection: StreamingConnection + + override suspend fun connection(): StreamingConnection { + if (!::streamingConnection.isInitialized) { + val serverList = natsConnectionProperties.host.splitCommaAsList() + + val options = Options.Builder() + .servers(serverList.toTypedArray()) + // .sslContext(sslContext()) + .build() + val natsConnection = Nats.connect(options) + val streamingOptions = io.nats.streaming.Options.Builder().natsConn(natsConnection).build() + streamingConnection = NatsStreaming.connect( + natsConnectionProperties.clusterId, + natsConnectionProperties.clientId, streamingOptions + ) + } + return streamingConnection + } + + fun sslContext(): SSLContext { + TODO("Implement NATS SSL Context") + } +} diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt new file mode 100644 index 000000000..0da3022ff --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt @@ -0,0 +1,48 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.nats.service + +import io.nats.client.Nats +import io.nats.client.Options +import io.nats.streaming.NatsStreaming +import io.nats.streaming.StreamingConnection +import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties +import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList + +open class TokenAuthNatsService(private val natsConnectionProperties: TokenAuthNatsConnectionProperties) : + BluePrintNatsService { + + lateinit var streamingConnection: StreamingConnection + + override suspend fun connection(): StreamingConnection { + if (!::streamingConnection.isInitialized) { + val serverList = natsConnectionProperties.host.splitCommaAsList() + + val options = Options.Builder() + .servers(serverList.toTypedArray()) + .token(natsConnectionProperties.token.toCharArray()) + .build() + val natsConnection = Nats.connect(options) + val streamingOptions = io.nats.streaming.Options.Builder().natsConn(natsConnection).build() + streamingConnection = NatsStreaming.connect( + natsConnectionProperties.clusterId, + natsConnectionProperties.clientId, streamingOptions + ) + } + return streamingConnection + } +} diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/SubscriptionOptionsUtils.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/SubscriptionOptionsUtils.kt new file mode 100644 index 000000000..48a759cc5 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/SubscriptionOptionsUtils.kt @@ -0,0 +1,32 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.nats.utils + +import io.nats.streaming.SubscriptionOptions + +object SubscriptionOptionsUtils { + + /** Subscribe with a durable [name] and client can re subscribe with durable [name] */ + fun durable(name: String): SubscriptionOptions { + return SubscriptionOptions.Builder().durableName(name).build() + } + + /** Subscribe with manual ack mode and a max in-flight [limit] */ + fun manualAckWithRateLimit(limit: Int): SubscriptionOptions { + return SubscriptionOptions.Builder().manualAcks().maxInFlight(limit).build() + } +} diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsExtensionsTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsExtensionsTest.kt new file mode 100644 index 000000000..ec120dc18 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsExtensionsTest.kt @@ -0,0 +1,51 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.nats.service + +import io.mockk.every +import io.mockk.mockk +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType +import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData +import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType +import java.nio.charset.Charset +import kotlin.test.assertEquals + +class BluePrintNatsExtensionsTest { + + @Test + fun testMessageStrConversion() { + val mockMessage = mockk() + every { mockMessage.data } returns "I am message".toByteArray(Charset.defaultCharset()) + + val messageData = mockMessage.strData() + assertEquals("I am message", messageData) + } + + @Test + fun testMessageJsonConversion() { + val json = """{"name":"value"}""" + + val mockMessage = mockk() + every { mockMessage.data } returns json.jsonAsJsonType().asByteArray() + + val messageData = mockMessage.asJsonType().asJsonString() + assertEquals(json, messageData) + } +} diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt new file mode 100644 index 000000000..976f9f5e8 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt @@ -0,0 +1,228 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.nats.service + +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import io.nats.streaming.MessageHandler +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties +import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants +import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties +import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData +import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType +import kotlin.test.assertNotNull + +class BluePrintNatsServiceTest { + + @Test + fun testTokenAuthNatService() { + val configuration = """{ + "type" : "${NatsLibConstants.TYPE_TOKEN_AUTH}", + "host" : "nats://localhost:4222", + "token" : "tokenAuth" + } + """.trimIndent() + + val bluePrintNatsLibPropertyService = BluePrintNatsLibPropertyService(mockk()) + + val spkBluePrintNatsLibPropertyService = spyk(bluePrintNatsLibPropertyService) + every { + spkBluePrintNatsLibPropertyService + .bluePrintNatsService(any()) + } returns TokenAuthNatsService( + mockk() + ) + + val bluePrintNatsService = + spkBluePrintNatsLibPropertyService.bluePrintNatsService(configuration.jsonAsJsonType()) + assertNotNull(bluePrintNatsService, "failed to get NATS Service") + } + + @Test + fun testTLSAuthNatService() { + val configuration = """{ + "type" : "${NatsLibConstants.TYPE_TLS_AUTH}", + "host" : "nats://localhost:4222" + } + """.trimIndent() + + val bluePrintNatsLibPropertyService = BluePrintNatsLibPropertyService(mockk()) + + val spkBluePrintNatsLibPropertyService = spyk(bluePrintNatsLibPropertyService) + every { + spkBluePrintNatsLibPropertyService + .bluePrintNatsService(any()) + } returns TLSAuthNatsService( + mockk() + ) + + val bluePrintNatsService = + spkBluePrintNatsLibPropertyService.bluePrintNatsService(configuration.jsonAsJsonType()) + assertNotNull(bluePrintNatsService, "failed to get NATS Service") + } + + /** Enable to test only on local desktop. Don't enable in Build server + * Start the Server with : nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V + */ + // @Test + fun localTntegrationTest() { + runBlocking { + + val connectionProperties = TokenAuthNatsConnectionProperties().apply { + host = "nats://localhost:4222,nats://localhost:4223" + clientId = "client-1" + token = "tokenAuth" + } + val natsService = TokenAuthNatsService(connectionProperties) + val streamingConnection = natsService.connection() + assertNotNull(streamingConnection, "failed to create nats connection") + + val connectionProperties2 = TokenAuthNatsConnectionProperties().apply { + host = "nats://localhost:4222,nats://localhost:4223" + clientId = "client-2" + token = "tokenAuth" + } + val tlsAuthNatsService2 = TokenAuthNatsService(connectionProperties2) + val streamingConnection2 = tlsAuthNatsService2.connection() + assertNotNull(streamingConnection2, "failed to create nats connection 2") + + testMultiPublish(natsService) + testLoadBalance(natsService) + testRequestReply(natsService) + testMultiRequestReply(natsService) + delay(1000) + } + } + + private fun testMultiPublish(natsService: BluePrintNatsService) { + runBlocking { + /** Multiple Publish Message Test **/ + val messageHandler1 = + MessageHandler { message -> println("Multi Publish Message Handler 1: ${message.strData()}") } + val messageHandler2 = + MessageHandler { message -> println("Multi Publish Message Handler 2: ${message.strData()}") } + + natsService.subscribe("multi-publish", messageHandler1) + natsService.subscribe("multi-publish", messageHandler2) + + repeat(5) { + natsService.publish("multi-publish", "multi publish message-$it".toByteArray()) + } + } + } + + private fun testLoadBalance(natsService: BluePrintNatsService) { + runBlocking { + /** Load balance Publish Message Test **/ + val lbMessageHandler1 = + MessageHandler { message -> println("LB Publish Message Handler 1: ${message.strData()}") } + val lbMessageHandler2 = + MessageHandler { message -> println("LB Publish Message Handler 2: ${message.strData()}") } + + natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1) + natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2) + + repeat(5) { + natsService.publish("lb-publish", "lb publish message-$it".toByteArray()) + } + } + } + + private fun testRequestReply(natsService: BluePrintNatsService) { + runBlocking { + val lbMessageHandler1 = io.nats.client.MessageHandler { message -> + println("LB RR Request Handler 1: ${String(message.data)} will reply to(${message.replyTo})") + message.connection.publish( + message.replyTo, + "Notification ${String(message.data)} reply from 1".toByteArray() + ) + } + + val lbMessageHandler2 = io.nats.client.MessageHandler { message -> + println("LB RR Request Handler 2: ${String(message.data)} will reply to(${message.replyTo})") + message.connection.publish( + message.replyTo, + "Notification ${String(message.data)} reply from 2".toByteArray() + ) + } + + natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler1) + natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler2) + + repeat(5) { + val message = natsService.requestAndGetOneReply( + "rr-request", + "rr message-$it".toByteArray(), + 1000 + ) + println("Received : ${message.strData()}") + } + } + } + + private fun testMultiRequestReply(natsService: BluePrintNatsService) { + runBlocking { + /** Request Reply **/ + val lbMessageHandler1 = io.nats.client.MessageHandler { message -> + println("LB RR Request Handler 1: ${String(message.data)} will reply to(${message.replyTo})") + message.connection.publish( + message.replyTo, + "Notification ${message.strData()} reply from 1".toByteArray() + ) + message.connection.publish( + message.replyTo, + "Completion ${message.strData()} reply from 1".toByteArray() + ) + } + val lbMessageHandler2 = io.nats.client.MessageHandler { message -> + println("LB RR Request Handler 2: ${message.strData()} will reply to(${message.replyTo})") + message.connection.publish( + message.replyTo, + "Notification ${message.strData()} reply from 2".toByteArray() + ) + message.connection.publish( + message.replyTo, + "Completion ${message.strData()} reply from 2".toByteArray() + ) + } + + natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler1) + natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler2) + + /** Should unsubscribe on completion message */ + val rrReplyMessageHandler = io.nats.client.MessageHandler { message -> + val messageContent = message.strData() + println("RR Reply Handler : $messageContent") + if (messageContent.startsWith("Completion")) { + message.subscription.unsubscribe() + } + } + repeat(5) { + natsService.requestAndGetMultipleReplies( + "rr-request", + "rr-reply-$it", + "rr message-$it".toByteArray(), + rrReplyMessageHandler + ) + } + } + } +} diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/resources/logback-test.xml new file mode 100644 index 000000000..9bb940006 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/resources/logback-test.xml @@ -0,0 +1,36 @@ + + + + + + + %d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n + + + + + + + + + + + + + + -- cgit 1.2.3-korg