diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java | 29 |
1 files changed, 11 insertions, 18 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java index a1e03155..57a331c1 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java @@ -38,7 +38,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicSource; public class BidirectionalTopicClient { private final String sinkTopic; private final String sourceTopic; - private final TopicSinkClient sinkClient; + private final TopicSink sink; private final TopicSource source; private final CommInfrastructure sinkTopicCommInfrastructure; private final CommInfrastructure sourceTopicCommInfrastructure; @@ -55,18 +55,15 @@ public class BidirectionalTopicClient { this.sourceTopic = sourceTopic; // init sinkClient - try { - // if the manager is overridden here, then override it in the sink client, too - this.sinkClient = new TopicSinkClient(sinkTopic) { - @Override - protected List<TopicSink> getTopicSinks(String topic) { - return BidirectionalTopicClient.this.getTopicEndpointManager().getTopicSinks(topic); - } - }; - } catch (TopicSinkClientException e) { - throw new BidirectionalTopicClientException(e); + List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic); + if (sinks.isEmpty()) { + throw new BidirectionalTopicClientException("no sinks for topic: " + sinkTopic); + } else if (sinks.size() > 1) { + throw new BidirectionalTopicClientException("too many sinks for topic: " + sinkTopic); } + this.sink = sinks.get(0); + // init source List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic)); if (sources.isEmpty()) { @@ -77,16 +74,12 @@ public class BidirectionalTopicClient { this.source = sources.get(0); - this.sinkTopicCommInfrastructure = sinkClient.getSink().getTopicCommInfrastructure(); + this.sinkTopicCommInfrastructure = sink.getTopicCommInfrastructure(); this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure(); } - public TopicSink getSink() { - return sinkClient.getSink(); - } - - public boolean send(Object message) { - return sinkClient.send(message); + public boolean send(String message) { + return sink.send(message); } public void register(TopicListener topicListener) { |