diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java | 58 |
1 files changed, 33 insertions, 25 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java index 2a9f1446..b5d53909 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java @@ -3,6 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,11 +22,13 @@ package org.onap.policy.common.endpoints.event.comm.client; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import lombok.Getter; +import org.jetbrains.annotations.NotNull; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; @@ -55,9 +58,9 @@ public class BidirectionalTopicClient { private final CommInfrastructure sourceTopicCommInfrastructure; /** - * Used when checking whether or not a message sent on the sink topic can be received + * Used when checking whether a message sent on the sink topic can be received * on the source topic. When a matching message is received on the incoming topic, - * {@code true} is placed on the queue. If {@link #stop()} is called or the waiting + * {@code true} is placed on the queue. If {@link #stopWaiting()} is called or the waiting * thread is interrupted, then {@code false} is placed on the queue. Whenever a value * is pulled from the queue, it is immediately placed back on the queue. */ @@ -72,8 +75,8 @@ public class BidirectionalTopicClient { * @throws BidirectionalTopicClientException if either topic does not exist */ public BidirectionalTopicClient(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException { - this.sinkTopic = sinkTopic; - this.sourceTopic = sourceTopic; + this.sinkTopic = sinkTopic.toLowerCase(); + this.sourceTopic = sourceTopic.toLowerCase(); // init sinkClient List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic); @@ -86,7 +89,7 @@ public class BidirectionalTopicClient { this.sink = sinks.get(0); // init source - List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic)); + List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Collections.singletonList(sourceTopic)); if (sources.isEmpty()) { throw new BidirectionalTopicClientException("no sources for topic: " + sourceTopic); } else if (sources.size() > 1) { @@ -116,7 +119,7 @@ public class BidirectionalTopicClient { } /** - * Determines whether or not the topic is ready (i.e., {@link #awaitReady(Object)} has + * Determines whether the topic is ready (i.e., {@link #awaitReady(Object, long)} has * previously returned {@code true}). * * @return {@code true}, if the topic is ready to send and receive @@ -129,7 +132,7 @@ public class BidirectionalTopicClient { * Waits for the bidirectional topic to become "ready" by publishing a message on the * sink topic and awaiting receipt of the message on the source topic. If the message * is not received within a few seconds, then it tries again. This process is - * continued until the message is received, {@link #stop()} is called, or this thread + * continued until the message is received, {@link #stopWaiting()} is called, or this thread * is interrupted. Once this returns, subsequent calls will return immediately, always * with the same value. * @@ -150,24 +153,7 @@ public class BidirectionalTopicClient { final String messageText = coder.encode(message); // class of message to be decoded - @SuppressWarnings("unchecked") - final Class<? extends T> clazz = (Class<? extends T>) message.getClass(); - - // create a listener to detect when a matching message is received - final TopicListener listener = (infra, topic, msg) -> { - try { - T incoming = decode(msg, clazz); - - if (message.equals(incoming)) { - logger.info("topic {} is ready; found matching message {}", topic, incoming); - checkerQueue.add(Boolean.TRUE); - } - - } catch (CoderException e) { - logger.warn("cannot decode message from topic {}", topic, e); - decodeFailed(); - } - }; + final TopicListener listener = getTopicListener(message); source.register(listener); @@ -193,6 +179,28 @@ public class BidirectionalTopicClient { return checkerQueue.peek(); } + @NotNull + private <T> TopicListener getTopicListener(T message) { + @SuppressWarnings("unchecked") + final Class<? extends T> clazz = (Class<? extends T>) message.getClass(); + + // create a listener to detect when a matching message is received + return (infra, topic, msg) -> { + try { + T incoming = decode(msg, clazz); + + if (message.equals(incoming)) { + logger.info("topic {} is ready; found matching message {}", topic, incoming); + checkerQueue.add(Boolean.TRUE); + } + + } catch (CoderException e) { + logger.warn("cannot decode message from topic {}", topic, e); + decodeFailed(); + } + }; + } + /** * Stops any listeners that are currently stuck in {@link #awaitReady(Object)} by * adding {@code false} to the queue. |