summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java29
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java8
2 files changed, 14 insertions, 23 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
index a1e03155..57a331c1 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
@@ -38,7 +38,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicSource;
public class BidirectionalTopicClient {
private final String sinkTopic;
private final String sourceTopic;
- private final TopicSinkClient sinkClient;
+ private final TopicSink sink;
private final TopicSource source;
private final CommInfrastructure sinkTopicCommInfrastructure;
private final CommInfrastructure sourceTopicCommInfrastructure;
@@ -55,18 +55,15 @@ public class BidirectionalTopicClient {
this.sourceTopic = sourceTopic;
// init sinkClient
- try {
- // if the manager is overridden here, then override it in the sink client, too
- this.sinkClient = new TopicSinkClient(sinkTopic) {
- @Override
- protected List<TopicSink> getTopicSinks(String topic) {
- return BidirectionalTopicClient.this.getTopicEndpointManager().getTopicSinks(topic);
- }
- };
- } catch (TopicSinkClientException e) {
- throw new BidirectionalTopicClientException(e);
+ List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic);
+ if (sinks.isEmpty()) {
+ throw new BidirectionalTopicClientException("no sinks for topic: " + sinkTopic);
+ } else if (sinks.size() > 1) {
+ throw new BidirectionalTopicClientException("too many sinks for topic: " + sinkTopic);
}
+ this.sink = sinks.get(0);
+
// init source
List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic));
if (sources.isEmpty()) {
@@ -77,16 +74,12 @@ public class BidirectionalTopicClient {
this.source = sources.get(0);
- this.sinkTopicCommInfrastructure = sinkClient.getSink().getTopicCommInfrastructure();
+ this.sinkTopicCommInfrastructure = sink.getTopicCommInfrastructure();
this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure();
}
- public TopicSink getSink() {
- return sinkClient.getSink();
- }
-
- public boolean send(Object message) {
- return sinkClient.send(message);
+ public boolean send(String message) {
+ return sink.send(message);
}
public void register(TopicListener topicListener) {
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
index 9b1018d0..3f0a002c 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
@@ -33,7 +33,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
-import java.util.Map;
import java.util.Properties;
import org.junit.AfterClass;
import org.junit.Before;
@@ -111,7 +110,6 @@ public class BidirectionalTopicClientTest {
@Test
public void testBidirectionalTopicClient_testGetters() {
- assertNotNull(client.getSinkClient());
assertSame(sink, client.getSink());
assertSame(source, client.getSource());
assertEquals(SINK_TOPIC, client.getSinkTopic());
@@ -127,7 +125,7 @@ public class BidirectionalTopicClientTest {
public void testBidirectionalTopicClientExceptions() {
assertThatThrownBy(() -> new BidirectionalTopicClient2("unknown-sink", SOURCE_TOPIC))
.isInstanceOf(BidirectionalTopicClientException.class)
- .hasCauseInstanceOf(TopicSinkClientException.class);
+ .hasMessage("no sinks for topic: unknown-sink");
assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, "unknown-source"))
.isInstanceOf(BidirectionalTopicClientException.class)
@@ -146,8 +144,8 @@ public class BidirectionalTopicClientTest {
*/
@Test
public void testDelegates() {
- assertTrue(client.send(Map.of("outgoing", "outgoing-text")));
- verify(sink).send("{\"outgoing\":\"outgoing-text\"}");
+ assertTrue(client.send("hello"));
+ verify(sink).send("hello");
assertTrue(client.offer("incoming"));
verify(source).offer("incoming");