From b124ec93d5b5ad22867a84624a0a07ab625d4356 Mon Sep 17 00:00:00 2001 From: Jorge Hernandez Date: Mon, 18 Mar 2019 15:59:04 -0500 Subject: TopicSinkClient support for unmanaged topics Change-Id: I6b92dcc0f225aa712b34adb9a1f9ab47df412c81 Issue-ID: POLICY-1608 Signed-off-by: Jorge Hernandez --- .../event/comm/client/TopicSinkClient.java | 31 ++++++++++----- .../event/comm/client/TopicSinkClientTest.java | 46 ++++++++++------------ 2 files changed, 43 insertions(+), 34 deletions(-) (limited to 'policy-endpoints/src') diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java index e44d1f76..f08a1381 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java @@ -23,6 +23,7 @@ package org.onap.policy.common.endpoints.event.comm.client; import java.util.List; import lombok.Getter; +import lombok.NonNull; import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.utils.coder.Coder; @@ -42,15 +43,10 @@ public class TopicSinkClient { */ private static final Coder CODER = new StandardCoder(); - /** - * Topic to which messages are published. - */ - @Getter - private final String topic; - /** * Where messages are published. */ + @Getter private final TopicSink sink; /** @@ -60,8 +56,6 @@ public class TopicSinkClient { * @throws TopicSinkClientException if the topic does not exist */ public TopicSinkClient(final String topic) throws TopicSinkClientException { - this.topic = topic; - final List lst = getTopicSinks(topic); if (lst.isEmpty()) { throw new TopicSinkClientException("no sinks for topic: " + topic); @@ -70,6 +64,25 @@ public class TopicSinkClient { this.sink = lst.get(0); } + /** + * Constructs the client from a sink object. + * + * @param sink topic sink publisher + */ + public TopicSinkClient(@NonNull TopicSink sink) { + this.sink = sink; + } + + + /** + * Gets the canonical topic name. + * + * @return topic name + */ + public String getTopic() { + return this.sink.getTopic(); + } + /** * Sends a message to the topic, after encoding the message as json. * @@ -82,7 +95,7 @@ public class TopicSinkClient { return sink.send(json); } catch (RuntimeException | CoderException e) { - logger.warn("send to {} failed because of {}", topic, e.getMessage(), e); + logger.warn("send to {} failed because of {}", sink.getTopic(), e.getMessage(), e); return false; } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientTest.java index 725c0418..07630cd4 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientTest.java @@ -25,7 +25,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertSame; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -36,18 +35,13 @@ import java.util.LinkedList; import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicReference; - import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; -import org.mockito.internal.util.reflection.Whitebox; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; -import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.TopicSink; public class TopicSinkClientTest { - private static final String SINK_FIELD_NAME = "sink"; private static final String TOPIC = "my-topic"; private TopicSinkClient client; @@ -67,10 +61,17 @@ public class TopicSinkClientTest { sinks = Arrays.asList(sink, null); client = new TopicSinkClient2(TOPIC); + + Properties props = new Properties(); + props.setProperty("noop.sink.topics", TOPIC); + + // clear all topics and then configure one topic + TopicEndpoint.manager.shutdown(); + TopicEndpoint.manager.addTopicSinks(props); } @AfterClass - public static void tearDown() throws Exception { + public static void tearDown() { // clear all topics after the tests TopicEndpoint.manager.shutdown(); } @@ -80,25 +81,13 @@ public class TopicSinkClientTest { */ @Test public void testGetTopicSinks() throws Exception { - // clear all topics and then configure one topic - TopicEndpoint.manager.shutdown(); - - final Properties props = new Properties(); - props.setProperty("noop.sink.topics", TOPIC); - TopicEndpoint.manager.addTopicSinks(props); sink = TopicEndpoint.manager.getNoopTopicSink(TOPIC); assertNotNull(sink); final AtomicReference evref = new AtomicReference<>(null); - sink.register(new TopicListener() { - @Override - public void onTopicEvent(final CommInfrastructure infra, final String topic, final String event) { - evref.set(event); - } - }); - + sink.register((infra, topic, event) -> evref.set(event)); sink.start(); client = new TopicSinkClient(TOPIC); @@ -108,10 +97,7 @@ public class TopicSinkClientTest { } @Test - public void testTopicSinkClient_testGetTopic() { - assertEquals(TOPIC, client.getTopic()); - assertSame(sink, Whitebox.getInternalState(client, SINK_FIELD_NAME)); - + public void testTopicSinkClient() { // unknown topic -> should throw exception sinks = new LinkedList<>(); assertThatThrownBy(() -> new TopicSinkClient2(TOPIC)).isInstanceOf(TopicSinkClientException.class) @@ -119,7 +105,17 @@ public class TopicSinkClientTest { } @Test - public void testSend() throws Exception { + public void testTopicSinkClient_GetTopic() throws TopicSinkClientException { + assertEquals(TOPIC, new TopicSinkClient(TopicEndpoint.manager.getNoopTopicSink(TOPIC)).getTopic()); + assertEquals(TOPIC, new TopicSinkClient(TOPIC).getTopic()); + + assertThatThrownBy(() -> new TopicSinkClient((TopicSink) null)).isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> new TopicSinkClient("blah")).isInstanceOf(TopicSinkClientException.class) + .hasMessage("no sinks for topic: blah"); + } + + @Test + public void testSend() { client.send(Arrays.asList("abc", "def")); verify(sink).send("['abc','def']".replace('\'', '"')); -- cgit 1.2.3-korg