summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java29
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClient.java7
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java16
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java8
4 files changed, 32 insertions, 28 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
index a1e03155..57a331c1 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
@@ -38,7 +38,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicSource;
public class BidirectionalTopicClient {
private final String sinkTopic;
private final String sourceTopic;
- private final TopicSinkClient sinkClient;
+ private final TopicSink sink;
private final TopicSource source;
private final CommInfrastructure sinkTopicCommInfrastructure;
private final CommInfrastructure sourceTopicCommInfrastructure;
@@ -55,18 +55,15 @@ public class BidirectionalTopicClient {
this.sourceTopic = sourceTopic;
// init sinkClient
- try {
- // if the manager is overridden here, then override it in the sink client, too
- this.sinkClient = new TopicSinkClient(sinkTopic) {
- @Override
- protected List<TopicSink> getTopicSinks(String topic) {
- return BidirectionalTopicClient.this.getTopicEndpointManager().getTopicSinks(topic);
- }
- };
- } catch (TopicSinkClientException e) {
- throw new BidirectionalTopicClientException(e);
+ List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic);
+ if (sinks.isEmpty()) {
+ throw new BidirectionalTopicClientException("no sinks for topic: " + sinkTopic);
+ } else if (sinks.size() > 1) {
+ throw new BidirectionalTopicClientException("too many sinks for topic: " + sinkTopic);
}
+ this.sink = sinks.get(0);
+
// init source
List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic));
if (sources.isEmpty()) {
@@ -77,16 +74,12 @@ public class BidirectionalTopicClient {
this.source = sources.get(0);
- this.sinkTopicCommInfrastructure = sinkClient.getSink().getTopicCommInfrastructure();
+ this.sinkTopicCommInfrastructure = sink.getTopicCommInfrastructure();
this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure();
}
- public TopicSink getSink() {
- return sinkClient.getSink();
- }
-
- public boolean send(Object message) {
- return sinkClient.send(message);
+ public boolean send(String message) {
+ return sink.send(message);
}
public void register(TopicListener topicListener) {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClient.java
index 9e4e412f..49874843 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClient.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClient.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.Future;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
+import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import org.onap.policy.common.capabilities.Startable;
@@ -213,4 +214,10 @@ public interface HttpClient extends Startable {
*/
String getBaseUrl();
+ /**
+ * Gets a web target associated with the base URL.
+ *
+ * @return a webtarget
+ */
+ WebTarget getWebTarget();
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java
index 38ec6829..c6a4fa41 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java
@@ -36,6 +36,7 @@ import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation.Builder;
import javax.ws.rs.client.InvocationCallback;
+import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.glassfish.jersey.client.ClientProperties;
@@ -158,17 +159,22 @@ public class JerseyClient implements HttpClient {
}
@Override
+ public WebTarget getWebTarget() {
+ return this.client.target(this.baseUrl);
+ }
+
+ @Override
public Response get(String path) {
if (!StringUtils.isBlank(path)) {
- return this.client.target(this.baseUrl).path(path).request().get();
+ return getWebTarget().path(path).request().get();
} else {
- return this.client.target(this.baseUrl).request().get();
+ return getWebTarget().request().get();
}
}
@Override
public Response get() {
- return this.client.target(this.baseUrl).request().get();
+ return getWebTarget().request().get();
}
@Override
@@ -184,7 +190,7 @@ public class JerseyClient implements HttpClient {
@Override
public Future<Response> get(InvocationCallback<Response> callback, Map<String, Object> headers) {
- Builder builder = this.client.target(this.baseUrl).request();
+ Builder builder = getWebTarget().request();
if (headers != null) {
headers.forEach(builder::header);
}
@@ -328,7 +334,7 @@ public class JerseyClient implements HttpClient {
}
private Builder getBuilder(String path, Map<String, Object> headers) {
- Builder builder = this.client.target(this.baseUrl).path(path).request();
+ Builder builder = getWebTarget().path(path).request();
for (Entry<String, Object> header : headers.entrySet()) {
builder.header(header.getKey(), header.getValue());
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
index 9b1018d0..3f0a002c 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
@@ -33,7 +33,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
-import java.util.Map;
import java.util.Properties;
import org.junit.AfterClass;
import org.junit.Before;
@@ -111,7 +110,6 @@ public class BidirectionalTopicClientTest {
@Test
public void testBidirectionalTopicClient_testGetters() {
- assertNotNull(client.getSinkClient());
assertSame(sink, client.getSink());
assertSame(source, client.getSource());
assertEquals(SINK_TOPIC, client.getSinkTopic());
@@ -127,7 +125,7 @@ public class BidirectionalTopicClientTest {
public void testBidirectionalTopicClientExceptions() {
assertThatThrownBy(() -> new BidirectionalTopicClient2("unknown-sink", SOURCE_TOPIC))
.isInstanceOf(BidirectionalTopicClientException.class)
- .hasCauseInstanceOf(TopicSinkClientException.class);
+ .hasMessage("no sinks for topic: unknown-sink");
assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, "unknown-source"))
.isInstanceOf(BidirectionalTopicClientException.class)
@@ -146,8 +144,8 @@ public class BidirectionalTopicClientTest {
*/
@Test
public void testDelegates() {
- assertTrue(client.send(Map.of("outgoing", "outgoing-text")));
- verify(sink).send("{\"outgoing\":\"outgoing-text\"}");
+ assertTrue(client.send("hello"));
+ verify(sink).send("hello");
assertTrue(client.offer("incoming"));
verify(source).offer("incoming");