diff options
author | Jozsef Csongvai <jozsef.csongvai@bell.ca> | 2021-07-26 12:00:59 -0400 |
---|---|---|
committer | Jozsef Csongvai <jozsef.csongvai@bell.ca> | 2021-07-28 09:06:26 -0400 |
commit | 45263f50896a7021cd17d78ce83b29365cb19c29 (patch) | |
tree | 0245881b0c3badd2e72144dc29311f4e7df58e38 /ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt | |
parent | 76cb04c2302c9f8f0395f86d8e8d246fdae0fd28 (diff) |
Revert "Renaming Files having BluePrint to have Blueprint"1.1.5
The renaming in CCSDK-3098 caused breaking changes to the grpc api and
compile issues for kotlin scripts.
Issue-ID: CCSDK-3385
Change-Id: I0d745cb858371678eabcb2284671c1fd76a1ab6d
Signed-off-by: Jozsef Csongvai <jozsef.csongvai@bell.ca>
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt')
-rw-r--r-- | ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt | 143 |
1 files changed, 143 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt new file mode 100644 index 000000000..9548fe78d --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt @@ -0,0 +1,143 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@file:Suppress("BlockingMethodInNonBlockingContext") + +package org.onap.ccsdk.cds.blueprintsprocessor.nats.service + +import io.nats.client.Dispatcher +import io.nats.streaming.MessageHandler +import io.nats.streaming.StreamingConnection +import io.nats.streaming.Subscription +import io.nats.streaming.SubscriptionOptions +import java.time.Duration + +interface BluePrintNatsService { + + /** Create and Return the NATS streaming connection */ + suspend fun connection(): StreamingConnection + + /** Send one request [message] to the [subject] and get only one reply + * The request message subscriber may be multi instances consumer or load balance consumer. + * If it is multi instances consumer, then we will get only first responses from subscribers. + * + */ + suspend fun requestAndGetOneReply(subject: String, message: ByteArray, timeout: Long): io.nats.client.Message { + return connection().natsConnection.request(subject, message, Duration.ofMillis(timeout)) + } + + /** Send one request [message] to the [subject] and get multiple replies in [replySubject] with [messageHandler] + * The request message subscriber may be multi instances consumer or load balance consumer. + * If it is multi instances consumer, then we will get multiple responses from subscribers. + * Include the unSubscribe logic's in [messageHandler] implementation. + */ + suspend fun requestAndGetMultipleReplies( + subject: String, + replySubject: String, + message: ByteArray, + messageHandler: io.nats.client.MessageHandler + ) { + val natsConnection = connection().natsConnection + val dispatcher = natsConnection.createDispatcher(messageHandler) + /** Reply subject consumer */ + dispatcher.subscribe(replySubject) + + /** Publish the request message and expect the reply messages in reply subject consumer */ + natsConnection.publish(subject, replySubject, message) + } + + /** Synchronous reply Subscribe the [subject] with the [messageHandler]. + * This is used only the message has to be consumed by all instances in the cluster and message handler must reply. + */ + suspend fun replySubscribe( + subject: String, + messageHandler: io.nats.client.MessageHandler + ): Dispatcher { + val natsConnection = connection().natsConnection + val dispatcher = natsConnection.createDispatcher(messageHandler) + return dispatcher.subscribe(subject) + } + + /** + * Synchronous reply Subscriber will listen for [subject] with [loadBalanceGroup]. + * This is used only the message has to be consumed by only one instance in the cluster. + * server will now load balance messages between the members of the queue group and message handler must reply. + */ + suspend fun loadBalanceReplySubscribe( + subject: String, + loadBalanceGroup: String, + messageHandler: io.nats.client.MessageHandler + ): Dispatcher { + val natsConnection = connection().natsConnection + val dispatcher = natsConnection.createDispatcher(messageHandler) + return dispatcher.subscribe(subject, loadBalanceGroup) + } + + /** Publish the [message] to all subscribers on the [subject] */ + suspend fun publish(subject: String, message: ByteArray) { + connection().publish(subject, message) + } + + /** Subscribe the [subject] with the [messageHandler]. + * This is used only the message has to be consumed by all instances in the cluster. + */ + suspend fun subscribe( + subject: String, + messageHandler: MessageHandler + ): Subscription { + return connection().subscribe(subject, messageHandler) + } + + /** Subscribe the [subject] with the [messageHandler] and [subscriptionOptions]. + * This is used only the message has to be consumed by all instances in the cluster. + */ + suspend fun subscribe( + subject: String, + messageHandler: MessageHandler, + subscriptionOptions: SubscriptionOptions + ): Subscription { + return connection().subscribe(subject, messageHandler, subscriptionOptions) + } + + /** + * https://docs.nats.io/developing-with-nats/receiving/queues + * subscribers will listen for [subject] with [loadBalanceGroup]. + * This is used only the message has to be consumed by only one instance in the cluster. + * server will now load balance messages between the members of the queue group. + */ + suspend fun loadBalanceSubscribe( + subject: String, + loadBalanceGroup: String, + messageHandler: MessageHandler + ): Subscription { + return connection().subscribe(subject, loadBalanceGroup, messageHandler) + } + + /** + * https://docs.nats.io/developing-with-nats/receiving/queues + * subscribers will listen for [subject] with [loadBalanceGroup] and [subscriptionOptions]. + * This is used only the message has to be consumed by only one instance in the cluster. + * server will now load balance messages between the members of the queue group. + */ + suspend fun loadBalanceSubscribe( + subject: String, + loadBalanceGroup: String, + messageHandler: MessageHandler, + subscriptionOptions: SubscriptionOptions + ): Subscription { + return connection().subscribe(subject, loadBalanceGroup, messageHandler, subscriptionOptions) + } +} |