aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java29
1 files changed, 11 insertions, 18 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
index a1e03155..57a331c1 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
@@ -38,7 +38,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicSource;
public class BidirectionalTopicClient {
private final String sinkTopic;
private final String sourceTopic;
- private final TopicSinkClient sinkClient;
+ private final TopicSink sink;
private final TopicSource source;
private final CommInfrastructure sinkTopicCommInfrastructure;
private final CommInfrastructure sourceTopicCommInfrastructure;
@@ -55,18 +55,15 @@ public class BidirectionalTopicClient {
this.sourceTopic = sourceTopic;
// init sinkClient
- try {
- // if the manager is overridden here, then override it in the sink client, too
- this.sinkClient = new TopicSinkClient(sinkTopic) {
- @Override
- protected List<TopicSink> getTopicSinks(String topic) {
- return BidirectionalTopicClient.this.getTopicEndpointManager().getTopicSinks(topic);
- }
- };
- } catch (TopicSinkClientException e) {
- throw new BidirectionalTopicClientException(e);
+ List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic);
+ if (sinks.isEmpty()) {
+ throw new BidirectionalTopicClientException("no sinks for topic: " + sinkTopic);
+ } else if (sinks.size() > 1) {
+ throw new BidirectionalTopicClientException("too many sinks for topic: " + sinkTopic);
}
+ this.sink = sinks.get(0);
+
// init source
List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic));
if (sources.isEmpty()) {
@@ -77,16 +74,12 @@ public class BidirectionalTopicClient {
this.source = sources.get(0);
- this.sinkTopicCommInfrastructure = sinkClient.getSink().getTopicCommInfrastructure();
+ this.sinkTopicCommInfrastructure = sink.getTopicCommInfrastructure();
this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure();
}
- public TopicSink getSink() {
- return sinkClient.getSink();
- }
-
- public boolean send(Object message) {
- return sinkClient.send(message);
+ public boolean send(String message) {
+ return sink.send(message);
}
public void register(TopicListener topicListener) {