aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
blob: 4f601fa8b303bb9c7a3b1068974ef8f8e2ab51aa (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
/*-
 * ============LICENSE_START=======================================================
 * ONAP
 * ================================================================================
 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
 * Modifications Copyright (C) 2023 Nordix Foundation.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.common.endpoints.event.comm.client;

import jakarta.validation.constraints.NotNull;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.utils.coder.Coder;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A "bidirectional" topic, which is a pair of topics, one of which is used to publish
 * requests and the other to receive responses.
 */
@Getter
public class BidirectionalTopicClient {
    private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicClient.class);
    private static final Coder coder = new StandardCoder();

    private final String sinkTopic;
    private final String sourceTopic;
    private final TopicSink sink;
    private final TopicSource source;
    private final CommInfrastructure sinkTopicCommInfrastructure;
    private final CommInfrastructure sourceTopicCommInfrastructure;

    /**
     * Used when checking whether a message sent on the sink topic can be received
     * on the source topic. When a matching message is received on the incoming topic,
     * {@code true} is placed on the queue. If {@link #stopWaiting()} is called or the waiting
     * thread is interrupted, then {@code false} is placed on the queue. Whenever a value
     * is pulled from the queue, it is immediately placed back on the queue.
     */
    private final BlockingDeque<Boolean> checkerQueue = new LinkedBlockingDeque<>();


    /**
     * Constructs the object.
     *
     * @param sinkTopic sink topic name
     * @param sourceTopic source topic name
     * @throws BidirectionalTopicClientException if either topic does not exist
     */
    public BidirectionalTopicClient(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException {
        this.sinkTopic = sinkTopic.toLowerCase();
        this.sourceTopic = sourceTopic.toLowerCase();

        // init sinkClient
        List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic);
        if (sinks.isEmpty()) {
            throw new BidirectionalTopicClientException("no sinks for topic: " + sinkTopic);
        } else if (sinks.size() > 1) {
            throw new BidirectionalTopicClientException("too many sinks for topic: " + sinkTopic);
        }

        this.sink = sinks.get(0);

        // init source
        List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Collections.singletonList(sourceTopic));
        if (sources.isEmpty()) {
            throw new BidirectionalTopicClientException("no sources for topic: " + sourceTopic);
        } else if (sources.size() > 1) {
            throw new BidirectionalTopicClientException("too many sources for topic: " + sourceTopic);
        }

        this.source = sources.get(0);

        this.sinkTopicCommInfrastructure = sink.getTopicCommInfrastructure();
        this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure();
    }

    public boolean send(String message) {
        return sink.send(message);
    }

    public void register(TopicListener topicListener) {
        source.register(topicListener);
    }

    public boolean offer(String event) {
        return source.offer(event);
    }

    public void unregister(TopicListener topicListener) {
        source.unregister(topicListener);
    }

    /**
     * Determines whether the topic is ready (i.e., {@link #awaitReady(Object, long)} has
     * previously returned {@code true}).
     *
     * @return {@code true}, if the topic is ready to send and receive
     */
    public boolean isReady() {
        return Boolean.TRUE.equals(checkerQueue.peek());
    }

    /**
     * Waits for the bidirectional topic to become "ready" by publishing a message on the
     * sink topic and awaiting receipt of the message on the source topic. If the message
     * is not received within a few seconds, then it tries again. This process is
     * continued until the message is received, {@link #stopWaiting()} is called, or this thread
     * is interrupted. Once this returns, subsequent calls will return immediately, always
     * with the same value.
     *
     * @param message message to be sent to the sink topic. Note: the equals() method must
     *        return {@code true} if and only if two messages are the same
     * @param waitMs time to wait, in milliseconds, before re-sending the message
     * @return {@code true} if the message was received from the source topic,
     *         {@code false} if this method was stopped or interrupted before receipt of
     *         the message
     * @throws CoderException if the message cannot be encoded
     */
    public synchronized <T> boolean awaitReady(T message, long waitMs) throws CoderException {
        // see if we already know the answer
        if (!checkerQueue.isEmpty()) {
            return checkerQueue.peek();
        }

        final String messageText = coder.encode(message);

        // class of message to be decoded
        final TopicListener listener = getTopicListener(message);

        source.register(listener);

        // loop until the message is received
        try {
            Boolean result;
            do {
                send(messageText);
            } while ((result = checkerQueue.poll(waitMs, TimeUnit.MILLISECONDS)) == null);

            // put it back on the queue
            checkerQueue.add(result);

        } catch (InterruptedException e) {
            logger.error("interrupted waiting for topic sink {} source {}", sink.getTopic(), source.getTopic(), e);
            Thread.currentThread().interrupt();
            checkerQueue.add(Boolean.FALSE);

        } finally {
            source.unregister(listener);
        }

        return checkerQueue.peek();
    }

    @NotNull
    private <T> TopicListener getTopicListener(T message) {
        @SuppressWarnings("unchecked")
        final Class<? extends T> clazz = (Class<? extends T>) message.getClass();

        // create a listener to detect when a matching message is received
        return (infra, topic, msg) -> {
            try {
                T incoming = decode(msg, clazz);

                if (message.equals(incoming)) {
                    logger.info("topic {} is ready; found matching message {}", topic, incoming);
                    checkerQueue.add(Boolean.TRUE);
                }

            } catch (CoderException e) {
                logger.warn("cannot decode message from topic {}", topic, e);
                decodeFailed();
            }
        };
    }

    /**
     * Stops any listeners that are currently stuck in {@link #awaitReady(Object)} by
     * adding {@code false} to the queue.
     */
    public void stopWaiting() {
        checkerQueue.add(Boolean.FALSE);
    }

    // these may be overridden by junit tests

    protected TopicEndpoint getTopicEndpointManager() {
        return TopicEndpointManager.getManager();
    }

    protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException {
        return coder.decode(msg, clazz);
    }

    protected void decodeFailed() {
        // already logged - nothing else to do
    }
}