aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java58
1 files changed, 33 insertions, 25 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
index 2a9f1446..b5d53909 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
@@ -3,6 +3,7 @@
* ONAP
* ================================================================================
* Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,11 +22,13 @@
package org.onap.policy.common.endpoints.event.comm.client;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
+import org.jetbrains.annotations.NotNull;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
@@ -55,9 +58,9 @@ public class BidirectionalTopicClient {
private final CommInfrastructure sourceTopicCommInfrastructure;
/**
- * Used when checking whether or not a message sent on the sink topic can be received
+ * Used when checking whether a message sent on the sink topic can be received
* on the source topic. When a matching message is received on the incoming topic,
- * {@code true} is placed on the queue. If {@link #stop()} is called or the waiting
+ * {@code true} is placed on the queue. If {@link #stopWaiting()} is called or the waiting
* thread is interrupted, then {@code false} is placed on the queue. Whenever a value
* is pulled from the queue, it is immediately placed back on the queue.
*/
@@ -72,8 +75,8 @@ public class BidirectionalTopicClient {
* @throws BidirectionalTopicClientException if either topic does not exist
*/
public BidirectionalTopicClient(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException {
- this.sinkTopic = sinkTopic;
- this.sourceTopic = sourceTopic;
+ this.sinkTopic = sinkTopic.toLowerCase();
+ this.sourceTopic = sourceTopic.toLowerCase();
// init sinkClient
List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic);
@@ -86,7 +89,7 @@ public class BidirectionalTopicClient {
this.sink = sinks.get(0);
// init source
- List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic));
+ List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Collections.singletonList(sourceTopic));
if (sources.isEmpty()) {
throw new BidirectionalTopicClientException("no sources for topic: " + sourceTopic);
} else if (sources.size() > 1) {
@@ -116,7 +119,7 @@ public class BidirectionalTopicClient {
}
/**
- * Determines whether or not the topic is ready (i.e., {@link #awaitReady(Object)} has
+ * Determines whether the topic is ready (i.e., {@link #awaitReady(Object, long)} has
* previously returned {@code true}).
*
* @return {@code true}, if the topic is ready to send and receive
@@ -129,7 +132,7 @@ public class BidirectionalTopicClient {
* Waits for the bidirectional topic to become "ready" by publishing a message on the
* sink topic and awaiting receipt of the message on the source topic. If the message
* is not received within a few seconds, then it tries again. This process is
- * continued until the message is received, {@link #stop()} is called, or this thread
+ * continued until the message is received, {@link #stopWaiting()} is called, or this thread
* is interrupted. Once this returns, subsequent calls will return immediately, always
* with the same value.
*
@@ -150,24 +153,7 @@ public class BidirectionalTopicClient {
final String messageText = coder.encode(message);
// class of message to be decoded
- @SuppressWarnings("unchecked")
- final Class<? extends T> clazz = (Class<? extends T>) message.getClass();
-
- // create a listener to detect when a matching message is received
- final TopicListener listener = (infra, topic, msg) -> {
- try {
- T incoming = decode(msg, clazz);
-
- if (message.equals(incoming)) {
- logger.info("topic {} is ready; found matching message {}", topic, incoming);
- checkerQueue.add(Boolean.TRUE);
- }
-
- } catch (CoderException e) {
- logger.warn("cannot decode message from topic {}", topic, e);
- decodeFailed();
- }
- };
+ final TopicListener listener = getTopicListener(message);
source.register(listener);
@@ -193,6 +179,28 @@ public class BidirectionalTopicClient {
return checkerQueue.peek();
}
+ @NotNull
+ private <T> TopicListener getTopicListener(T message) {
+ @SuppressWarnings("unchecked")
+ final Class<? extends T> clazz = (Class<? extends T>) message.getClass();
+
+ // create a listener to detect when a matching message is received
+ return (infra, topic, msg) -> {
+ try {
+ T incoming = decode(msg, clazz);
+
+ if (message.equals(incoming)) {
+ logger.info("topic {} is ready; found matching message {}", topic, incoming);
+ checkerQueue.add(Boolean.TRUE);
+ }
+
+ } catch (CoderException e) {
+ logger.warn("cannot decode message from topic {}", topic, e);
+ decodeFailed();
+ }
+ };
+ }
+
/**
* Stops any listeners that are currently stuck in {@link #awaitReady(Object)} by
* adding {@code false} to the queue.