diff options
4 files changed, 32 insertions, 28 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java index a1e03155..57a331c1 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java @@ -38,7 +38,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicSource; public class BidirectionalTopicClient { private final String sinkTopic; private final String sourceTopic; - private final TopicSinkClient sinkClient; + private final TopicSink sink; private final TopicSource source; private final CommInfrastructure sinkTopicCommInfrastructure; private final CommInfrastructure sourceTopicCommInfrastructure; @@ -55,18 +55,15 @@ public class BidirectionalTopicClient { this.sourceTopic = sourceTopic; // init sinkClient - try { - // if the manager is overridden here, then override it in the sink client, too - this.sinkClient = new TopicSinkClient(sinkTopic) { - @Override - protected List<TopicSink> getTopicSinks(String topic) { - return BidirectionalTopicClient.this.getTopicEndpointManager().getTopicSinks(topic); - } - }; - } catch (TopicSinkClientException e) { - throw new BidirectionalTopicClientException(e); + List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic); + if (sinks.isEmpty()) { + throw new BidirectionalTopicClientException("no sinks for topic: " + sinkTopic); + } else if (sinks.size() > 1) { + throw new BidirectionalTopicClientException("too many sinks for topic: " + sinkTopic); } + this.sink = sinks.get(0); + // init source List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic)); if (sources.isEmpty()) { @@ -77,16 +74,12 @@ public class BidirectionalTopicClient { this.source = sources.get(0); - this.sinkTopicCommInfrastructure = sinkClient.getSink().getTopicCommInfrastructure(); + this.sinkTopicCommInfrastructure = sink.getTopicCommInfrastructure(); this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure(); } - public TopicSink getSink() { - return sinkClient.getSink(); - } - - public boolean send(Object message) { - return sinkClient.send(message); + public boolean send(String message) { + return sink.send(message); } public void register(TopicListener topicListener) { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClient.java index 9e4e412f..49874843 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClient.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClient.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.Future; import javax.ws.rs.client.Entity; import javax.ws.rs.client.InvocationCallback; +import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response; import org.onap.policy.common.capabilities.Startable; @@ -213,4 +214,10 @@ public interface HttpClient extends Startable { */ String getBaseUrl(); + /** + * Gets a web target associated with the base URL. + * + * @return a webtarget + */ + WebTarget getWebTarget(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java index 38ec6829..c6a4fa41 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java @@ -36,6 +36,7 @@ import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Entity; import javax.ws.rs.client.Invocation.Builder; import javax.ws.rs.client.InvocationCallback; +import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; import org.glassfish.jersey.client.ClientProperties; @@ -158,17 +159,22 @@ public class JerseyClient implements HttpClient { } @Override + public WebTarget getWebTarget() { + return this.client.target(this.baseUrl); + } + + @Override public Response get(String path) { if (!StringUtils.isBlank(path)) { - return this.client.target(this.baseUrl).path(path).request().get(); + return getWebTarget().path(path).request().get(); } else { - return this.client.target(this.baseUrl).request().get(); + return getWebTarget().request().get(); } } @Override public Response get() { - return this.client.target(this.baseUrl).request().get(); + return getWebTarget().request().get(); } @Override @@ -184,7 +190,7 @@ public class JerseyClient implements HttpClient { @Override public Future<Response> get(InvocationCallback<Response> callback, Map<String, Object> headers) { - Builder builder = this.client.target(this.baseUrl).request(); + Builder builder = getWebTarget().request(); if (headers != null) { headers.forEach(builder::header); } @@ -328,7 +334,7 @@ public class JerseyClient implements HttpClient { } private Builder getBuilder(String path, Map<String, Object> headers) { - Builder builder = this.client.target(this.baseUrl).path(path).request(); + Builder builder = getWebTarget().path(path).request(); for (Entry<String, Object> header : headers.entrySet()) { builder.header(header.getKey(), header.getValue()); } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java index 9b1018d0..3f0a002c 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java @@ -33,7 +33,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Arrays; -import java.util.Map; import java.util.Properties; import org.junit.AfterClass; import org.junit.Before; @@ -111,7 +110,6 @@ public class BidirectionalTopicClientTest { @Test public void testBidirectionalTopicClient_testGetters() { - assertNotNull(client.getSinkClient()); assertSame(sink, client.getSink()); assertSame(source, client.getSource()); assertEquals(SINK_TOPIC, client.getSinkTopic()); @@ -127,7 +125,7 @@ public class BidirectionalTopicClientTest { public void testBidirectionalTopicClientExceptions() { assertThatThrownBy(() -> new BidirectionalTopicClient2("unknown-sink", SOURCE_TOPIC)) .isInstanceOf(BidirectionalTopicClientException.class) - .hasCauseInstanceOf(TopicSinkClientException.class); + .hasMessage("no sinks for topic: unknown-sink"); assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, "unknown-source")) .isInstanceOf(BidirectionalTopicClientException.class) @@ -146,8 +144,8 @@ public class BidirectionalTopicClientTest { */ @Test public void testDelegates() { - assertTrue(client.send(Map.of("outgoing", "outgoing-text"))); - verify(sink).send("{\"outgoing\":\"outgoing-text\"}"); + assertTrue(client.send("hello")); + verify(sink).send("hello"); assertTrue(client.offer("incoming")); verify(source).offer("incoming"); |