aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java31
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientTest.java46
2 files changed, 43 insertions, 34 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java
index e44d1f76..f08a1381 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java
@@ -23,6 +23,7 @@ package org.onap.policy.common.endpoints.event.comm.client;
import java.util.List;
import lombok.Getter;
+import lombok.NonNull;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.utils.coder.Coder;
@@ -43,14 +44,9 @@ public class TopicSinkClient {
private static final Coder CODER = new StandardCoder();
/**
- * Topic to which messages are published.
- */
- @Getter
- private final String topic;
-
- /**
* Where messages are published.
*/
+ @Getter
private final TopicSink sink;
/**
@@ -60,8 +56,6 @@ public class TopicSinkClient {
* @throws TopicSinkClientException if the topic does not exist
*/
public TopicSinkClient(final String topic) throws TopicSinkClientException {
- this.topic = topic;
-
final List<TopicSink> lst = getTopicSinks(topic);
if (lst.isEmpty()) {
throw new TopicSinkClientException("no sinks for topic: " + topic);
@@ -71,6 +65,25 @@ public class TopicSinkClient {
}
/**
+ * Constructs the client from a sink object.
+ *
+ * @param sink topic sink publisher
+ */
+ public TopicSinkClient(@NonNull TopicSink sink) {
+ this.sink = sink;
+ }
+
+
+ /**
+ * Gets the canonical topic name.
+ *
+ * @return topic name
+ */
+ public String getTopic() {
+ return this.sink.getTopic();
+ }
+
+ /**
* Sends a message to the topic, after encoding the message as json.
*
* @param message message to be encoded and sent
@@ -82,7 +95,7 @@ public class TopicSinkClient {
return sink.send(json);
} catch (RuntimeException | CoderException e) {
- logger.warn("send to {} failed because of {}", topic, e.getMessage(), e);
+ logger.warn("send to {} failed because of {}", sink.getTopic(), e.getMessage(), e);
return false;
}
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientTest.java
index 725c0418..07630cd4 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientTest.java
@@ -25,7 +25,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertSame;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -36,18 +35,13 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
-
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.internal.util.reflection.Whitebox;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
-import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
public class TopicSinkClientTest {
- private static final String SINK_FIELD_NAME = "sink";
private static final String TOPIC = "my-topic";
private TopicSinkClient client;
@@ -67,10 +61,17 @@ public class TopicSinkClientTest {
sinks = Arrays.asList(sink, null);
client = new TopicSinkClient2(TOPIC);
+
+ Properties props = new Properties();
+ props.setProperty("noop.sink.topics", TOPIC);
+
+ // clear all topics and then configure one topic
+ TopicEndpoint.manager.shutdown();
+ TopicEndpoint.manager.addTopicSinks(props);
}
@AfterClass
- public static void tearDown() throws Exception {
+ public static void tearDown() {
// clear all topics after the tests
TopicEndpoint.manager.shutdown();
}
@@ -80,25 +81,13 @@ public class TopicSinkClientTest {
*/
@Test
public void testGetTopicSinks() throws Exception {
- // clear all topics and then configure one topic
- TopicEndpoint.manager.shutdown();
-
- final Properties props = new Properties();
- props.setProperty("noop.sink.topics", TOPIC);
- TopicEndpoint.manager.addTopicSinks(props);
sink = TopicEndpoint.manager.getNoopTopicSink(TOPIC);
assertNotNull(sink);
final AtomicReference<String> evref = new AtomicReference<>(null);
- sink.register(new TopicListener() {
- @Override
- public void onTopicEvent(final CommInfrastructure infra, final String topic, final String event) {
- evref.set(event);
- }
- });
-
+ sink.register((infra, topic, event) -> evref.set(event));
sink.start();
client = new TopicSinkClient(TOPIC);
@@ -108,10 +97,7 @@ public class TopicSinkClientTest {
}
@Test
- public void testTopicSinkClient_testGetTopic() {
- assertEquals(TOPIC, client.getTopic());
- assertSame(sink, Whitebox.getInternalState(client, SINK_FIELD_NAME));
-
+ public void testTopicSinkClient() {
// unknown topic -> should throw exception
sinks = new LinkedList<>();
assertThatThrownBy(() -> new TopicSinkClient2(TOPIC)).isInstanceOf(TopicSinkClientException.class)
@@ -119,7 +105,17 @@ public class TopicSinkClientTest {
}
@Test
- public void testSend() throws Exception {
+ public void testTopicSinkClient_GetTopic() throws TopicSinkClientException {
+ assertEquals(TOPIC, new TopicSinkClient(TopicEndpoint.manager.getNoopTopicSink(TOPIC)).getTopic());
+ assertEquals(TOPIC, new TopicSinkClient(TOPIC).getTopic());
+
+ assertThatThrownBy(() -> new TopicSinkClient((TopicSink) null)).isInstanceOf(IllegalArgumentException.class);
+ assertThatThrownBy(() -> new TopicSinkClient("blah")).isInstanceOf(TopicSinkClientException.class)
+ .hasMessage("no sinks for topic: blah");
+ }
+
+ @Test
+ public void testSend() {
client.send(Arrays.asList("abc", "def"));
verify(sink).send("['abc','def']".replace('\'', '"'));