diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java | 224 |
1 files changed, 224 insertions, 0 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java new file mode 100644 index 00000000..4f601fa8 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java @@ -0,0 +1,224 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.client; + +import jakarta.validation.constraints.NotNull; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; +import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicListener; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A "bidirectional" topic, which is a pair of topics, one of which is used to publish + * requests and the other to receive responses. + */ +@Getter +public class BidirectionalTopicClient { + private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicClient.class); + private static final Coder coder = new StandardCoder(); + + private final String sinkTopic; + private final String sourceTopic; + private final TopicSink sink; + private final TopicSource source; + private final CommInfrastructure sinkTopicCommInfrastructure; + private final CommInfrastructure sourceTopicCommInfrastructure; + + /** + * Used when checking whether a message sent on the sink topic can be received + * on the source topic. When a matching message is received on the incoming topic, + * {@code true} is placed on the queue. If {@link #stopWaiting()} is called or the waiting + * thread is interrupted, then {@code false} is placed on the queue. Whenever a value + * is pulled from the queue, it is immediately placed back on the queue. + */ + private final BlockingDeque<Boolean> checkerQueue = new LinkedBlockingDeque<>(); + + + /** + * Constructs the object. + * + * @param sinkTopic sink topic name + * @param sourceTopic source topic name + * @throws BidirectionalTopicClientException if either topic does not exist + */ + public BidirectionalTopicClient(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException { + this.sinkTopic = sinkTopic.toLowerCase(); + this.sourceTopic = sourceTopic.toLowerCase(); + + // init sinkClient + List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic); + if (sinks.isEmpty()) { + throw new BidirectionalTopicClientException("no sinks for topic: " + sinkTopic); + } else if (sinks.size() > 1) { + throw new BidirectionalTopicClientException("too many sinks for topic: " + sinkTopic); + } + + this.sink = sinks.get(0); + + // init source + List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Collections.singletonList(sourceTopic)); + if (sources.isEmpty()) { + throw new BidirectionalTopicClientException("no sources for topic: " + sourceTopic); + } else if (sources.size() > 1) { + throw new BidirectionalTopicClientException("too many sources for topic: " + sourceTopic); + } + + this.source = sources.get(0); + + this.sinkTopicCommInfrastructure = sink.getTopicCommInfrastructure(); + this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure(); + } + + public boolean send(String message) { + return sink.send(message); + } + + public void register(TopicListener topicListener) { + source.register(topicListener); + } + + public boolean offer(String event) { + return source.offer(event); + } + + public void unregister(TopicListener topicListener) { + source.unregister(topicListener); + } + + /** + * Determines whether the topic is ready (i.e., {@link #awaitReady(Object, long)} has + * previously returned {@code true}). + * + * @return {@code true}, if the topic is ready to send and receive + */ + public boolean isReady() { + return Boolean.TRUE.equals(checkerQueue.peek()); + } + + /** + * Waits for the bidirectional topic to become "ready" by publishing a message on the + * sink topic and awaiting receipt of the message on the source topic. If the message + * is not received within a few seconds, then it tries again. This process is + * continued until the message is received, {@link #stopWaiting()} is called, or this thread + * is interrupted. Once this returns, subsequent calls will return immediately, always + * with the same value. + * + * @param message message to be sent to the sink topic. Note: the equals() method must + * return {@code true} if and only if two messages are the same + * @param waitMs time to wait, in milliseconds, before re-sending the message + * @return {@code true} if the message was received from the source topic, + * {@code false} if this method was stopped or interrupted before receipt of + * the message + * @throws CoderException if the message cannot be encoded + */ + public synchronized <T> boolean awaitReady(T message, long waitMs) throws CoderException { + // see if we already know the answer + if (!checkerQueue.isEmpty()) { + return checkerQueue.peek(); + } + + final String messageText = coder.encode(message); + + // class of message to be decoded + final TopicListener listener = getTopicListener(message); + + source.register(listener); + + // loop until the message is received + try { + Boolean result; + do { + send(messageText); + } while ((result = checkerQueue.poll(waitMs, TimeUnit.MILLISECONDS)) == null); + + // put it back on the queue + checkerQueue.add(result); + + } catch (InterruptedException e) { + logger.error("interrupted waiting for topic sink {} source {}", sink.getTopic(), source.getTopic(), e); + Thread.currentThread().interrupt(); + checkerQueue.add(Boolean.FALSE); + + } finally { + source.unregister(listener); + } + + return checkerQueue.peek(); + } + + @NotNull + private <T> TopicListener getTopicListener(T message) { + @SuppressWarnings("unchecked") + final Class<? extends T> clazz = (Class<? extends T>) message.getClass(); + + // create a listener to detect when a matching message is received + return (infra, topic, msg) -> { + try { + T incoming = decode(msg, clazz); + + if (message.equals(incoming)) { + logger.info("topic {} is ready; found matching message {}", topic, incoming); + checkerQueue.add(Boolean.TRUE); + } + + } catch (CoderException e) { + logger.warn("cannot decode message from topic {}", topic, e); + decodeFailed(); + } + }; + } + + /** + * Stops any listeners that are currently stuck in {@link #awaitReady(Object)} by + * adding {@code false} to the queue. + */ + public void stopWaiting() { + checkerQueue.add(Boolean.FALSE); + } + + // these may be overridden by junit tests + + protected TopicEndpoint getTopicEndpointManager() { + return TopicEndpointManager.getManager(); + } + + protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException { + return coder.decode(msg, clazz); + } + + protected void decodeFailed() { + // already logged - nothing else to do + } +} |