diff options
2 files changed, 14 insertions, 23 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java index a1e03155..57a331c1 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java @@ -38,7 +38,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicSource; public class BidirectionalTopicClient { private final String sinkTopic; private final String sourceTopic; - private final TopicSinkClient sinkClient; + private final TopicSink sink; private final TopicSource source; private final CommInfrastructure sinkTopicCommInfrastructure; private final CommInfrastructure sourceTopicCommInfrastructure; @@ -55,18 +55,15 @@ public class BidirectionalTopicClient { this.sourceTopic = sourceTopic; // init sinkClient - try { - // if the manager is overridden here, then override it in the sink client, too - this.sinkClient = new TopicSinkClient(sinkTopic) { - @Override - protected List<TopicSink> getTopicSinks(String topic) { - return BidirectionalTopicClient.this.getTopicEndpointManager().getTopicSinks(topic); - } - }; - } catch (TopicSinkClientException e) { - throw new BidirectionalTopicClientException(e); + List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic); + if (sinks.isEmpty()) { + throw new BidirectionalTopicClientException("no sinks for topic: " + sinkTopic); + } else if (sinks.size() > 1) { + throw new BidirectionalTopicClientException("too many sinks for topic: " + sinkTopic); } + this.sink = sinks.get(0); + // init source List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic)); if (sources.isEmpty()) { @@ -77,16 +74,12 @@ public class BidirectionalTopicClient { this.source = sources.get(0); - this.sinkTopicCommInfrastructure = sinkClient.getSink().getTopicCommInfrastructure(); + this.sinkTopicCommInfrastructure = sink.getTopicCommInfrastructure(); this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure(); } - public TopicSink getSink() { - return sinkClient.getSink(); - } - - public boolean send(Object message) { - return sinkClient.send(message); + public boolean send(String message) { + return sink.send(message); } public void register(TopicListener topicListener) { diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java index 9b1018d0..3f0a002c 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java @@ -33,7 +33,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Arrays; -import java.util.Map; import java.util.Properties; import org.junit.AfterClass; import org.junit.Before; @@ -111,7 +110,6 @@ public class BidirectionalTopicClientTest { @Test public void testBidirectionalTopicClient_testGetters() { - assertNotNull(client.getSinkClient()); assertSame(sink, client.getSink()); assertSame(source, client.getSource()); assertEquals(SINK_TOPIC, client.getSinkTopic()); @@ -127,7 +125,7 @@ public class BidirectionalTopicClientTest { public void testBidirectionalTopicClientExceptions() { assertThatThrownBy(() -> new BidirectionalTopicClient2("unknown-sink", SOURCE_TOPIC)) .isInstanceOf(BidirectionalTopicClientException.class) - .hasCauseInstanceOf(TopicSinkClientException.class); + .hasMessage("no sinks for topic: unknown-sink"); assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, "unknown-source")) .isInstanceOf(BidirectionalTopicClientException.class) @@ -146,8 +144,8 @@ public class BidirectionalTopicClientTest { */ @Test public void testDelegates() { - assertTrue(client.send(Map.of("outgoing", "outgoing-text"))); - verify(sink).send("{\"outgoing\":\"outgoing-text\"}"); + assertTrue(client.send("hello")); + verify(sink).send("hello"); assertTrue(client.offer("incoming")); verify(source).offer("incoming"); |