aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2020-03-02 23:43:06 -0500
committerJim Hahn <jrh3@att.com>2020-03-02 23:43:06 -0500
commita8af78641afbc7af25f78e6c60b1a40475758cec (patch)
tree56b104e82850e03f60c3ab44c4f9e3896ffce764 /policy-endpoints/src/main/java
parentd59c619324f5813127e5150e2b3fd5d446cf3a48 (diff)
BidirectionalTopic should use plain TopicSink
BidirectionalTopicClient use plain TopicSink instead of TopicSinkClient, because the latter encodes its message, while BidirectionalTopicClient should not, because encoding should be left up to the user of the class. Issue-ID: POLICY-1625 Signed-off-by: Jim Hahn <jrh3@att.com> Change-Id: I6c67e1ee0c56e96a0efcc90eaf1c0a940902e8b3
Diffstat (limited to 'policy-endpoints/src/main/java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java29
1 files changed, 11 insertions, 18 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
index a1e03155..57a331c1 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
@@ -38,7 +38,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicSource;
public class BidirectionalTopicClient {
private final String sinkTopic;
private final String sourceTopic;
- private final TopicSinkClient sinkClient;
+ private final TopicSink sink;
private final TopicSource source;
private final CommInfrastructure sinkTopicCommInfrastructure;
private final CommInfrastructure sourceTopicCommInfrastructure;
@@ -55,18 +55,15 @@ public class BidirectionalTopicClient {
this.sourceTopic = sourceTopic;
// init sinkClient
- try {
- // if the manager is overridden here, then override it in the sink client, too
- this.sinkClient = new TopicSinkClient(sinkTopic) {
- @Override
- protected List<TopicSink> getTopicSinks(String topic) {
- return BidirectionalTopicClient.this.getTopicEndpointManager().getTopicSinks(topic);
- }
- };
- } catch (TopicSinkClientException e) {
- throw new BidirectionalTopicClientException(e);
+ List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic);
+ if (sinks.isEmpty()) {
+ throw new BidirectionalTopicClientException("no sinks for topic: " + sinkTopic);
+ } else if (sinks.size() > 1) {
+ throw new BidirectionalTopicClientException("too many sinks for topic: " + sinkTopic);
}
+ this.sink = sinks.get(0);
+
// init source
List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic));
if (sources.isEmpty()) {
@@ -77,16 +74,12 @@ public class BidirectionalTopicClient {
this.source = sources.get(0);
- this.sinkTopicCommInfrastructure = sinkClient.getSink().getTopicCommInfrastructure();
+ this.sinkTopicCommInfrastructure = sink.getTopicCommInfrastructure();
this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure();
}
- public TopicSink getSink() {
- return sinkClient.getSink();
- }
-
- public boolean send(Object message) {
- return sinkClient.send(message);
+ public boolean send(String message) {
+ return sink.send(message);
}
public void register(TopicListener topicListener) {