aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsService.kt
blob: 9548fe78d5c7c4ada79e6e34c6db9dc2bc8e4e62 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/*
 * Copyright © 2018-2019 AT&T Intellectual Property.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

@file:Suppress("BlockingMethodInNonBlockingContext")

package org.onap.ccsdk.cds.blueprintsprocessor.nats.service

import io.nats.client.Dispatcher
import io.nats.streaming.MessageHandler
import io.nats.streaming.StreamingConnection
import io.nats.streaming.Subscription
import io.nats.streaming.SubscriptionOptions
import java.time.Duration

interface BluePrintNatsService {

    /** Provides the NATS streaming connection that every other operation of this service goes through. */
    suspend fun connection(): StreamingConnection

    /**
     * Sends [message] as a request on [subject] and returns a single reply.
     *
     * The request is issued on the core NATS connection that backs [connection]. The consumers of
     * [subject] may be plain multi-instance subscribers or a load-balanced group; when several
     * instances answer, only the first response is returned.
     *
     * NOTE(review): the reply is produced by a Java API; confirm whether it can be null once the
     * timeout elapses, since the return type here is declared non-null.
     *
     * @param timeout maximum time to wait for the reply, in milliseconds.
     */
    suspend fun requestAndGetOneReply(subject: String, message: ByteArray, timeout: Long): io.nats.client.Message =
        connection().natsConnection.request(subject, message, Duration.ofMillis(timeout))

    /**
     * Sends [message] as a request on [subject] and delivers every reply arriving on
     * [replySubject] to [messageHandler].
     *
     * The consumers of [subject] may be plain multi-instance subscribers or a load-balanced group;
     * with multi-instance subscribers several replies can be expected. The dispatcher created here
     * is not handed back to the caller, so [messageHandler] is responsible for its own
     * unsubscribe logic.
     */
    suspend fun requestAndGetMultipleReplies(
        subject: String,
        replySubject: String,
        message: ByteArray,
        messageHandler: io.nats.client.MessageHandler
    ) {
        val coreConnection = connection().natsConnection

        // Register the reply consumer first, so it is already listening when the request goes out.
        coreConnection.createDispatcher(messageHandler).subscribe(replySubject)

        // Publish the request, naming replySubject as the place where responders should answer.
        coreConnection.publish(subject, replySubject, message)
    }

    /**
     * Registers [messageHandler] as a request/reply subscriber on [subject].
     *
     * Use this when the message has to be consumed by all instances in the cluster; the handler
     * is expected to send the reply itself.
     *
     * @return the dispatcher on which the subscription was made.
     */
    suspend fun replySubscribe(
        subject: String,
        messageHandler: io.nats.client.MessageHandler
    ): Dispatcher =
        connection().natsConnection
            .createDispatcher(messageHandler)
            .subscribe(subject)

    /**
     * Registers [messageHandler] as a request/reply subscriber on [subject] within the queue
     * group [loadBalanceGroup].
     *
     * Use this when the message has to be consumed by only one instance in the cluster: the
     * server load balances messages between the members of the queue group. The handler is
     * expected to send the reply itself.
     *
     * @return the dispatcher on which the subscription was made.
     */
    suspend fun loadBalanceReplySubscribe(
        subject: String,
        loadBalanceGroup: String,
        messageHandler: io.nats.client.MessageHandler
    ): Dispatcher =
        connection().natsConnection
            .createDispatcher(messageHandler)
            .subscribe(subject, loadBalanceGroup)

    /** Publishes [message] on [subject] through the streaming connection, for all of its subscribers. */
    suspend fun publish(subject: String, message: ByteArray) {
        val streaming = connection()
        streaming.publish(subject, message)
    }

    /**
     * Subscribes [messageHandler] to [subject] on the streaming connection.
     *
     * Use this when the message has to be consumed by all instances in the cluster.
     */
    suspend fun subscribe(
        subject: String,
        messageHandler: MessageHandler
    ): Subscription =
        connection().subscribe(subject, messageHandler)

    /**
     * Subscribes [messageHandler] to [subject] on the streaming connection, applying
     * [subscriptionOptions].
     *
     * Use this when the message has to be consumed by all instances in the cluster.
     */
    suspend fun subscribe(
        subject: String,
        messageHandler: MessageHandler,
        subscriptionOptions: SubscriptionOptions
    ): Subscription =
        connection().subscribe(subject, messageHandler, subscriptionOptions)

    /**
     * Subscribes [messageHandler] to [subject] as a member of the queue group [loadBalanceGroup].
     *
     * Use this when the message has to be consumed by only one instance in the cluster: the
     * server load balances messages between the members of the queue group.
     * See https://docs.nats.io/developing-with-nats/receiving/queues
     */
    suspend fun loadBalanceSubscribe(
        subject: String,
        loadBalanceGroup: String,
        messageHandler: MessageHandler
    ): Subscription =
        connection().subscribe(subject, loadBalanceGroup, messageHandler)

    /**
     * Subscribes [messageHandler] to [subject] as a member of the queue group [loadBalanceGroup],
     * applying [subscriptionOptions].
     *
     * Use this when the message has to be consumed by only one instance in the cluster: the
     * server load balances messages between the members of the queue group.
     * See https://docs.nats.io/developing-with-nats/receiving/queues
     */
    suspend fun loadBalanceSubscribe(
        subject: String,
        loadBalanceGroup: String,
        messageHandler: MessageHandler,
        subscriptionOptions: SubscriptionOptions
    ): Subscription =
        connection().subscribe(subject, loadBalanceGroup, messageHandler, subscriptionOptions)
}