aboutsummaryrefslogtreecommitdiffstats
path: root/models-sim/models-sim-dmaap/src
diff options
context:
space:
mode:
Diffstat (limited to 'models-sim/models-sim-dmaap/src')
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java58
1 files changed, 53 insertions, 5 deletions
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java
index 066c38bb6..5a83b4693 100644
--- a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java
@@ -19,14 +19,22 @@
package org.onap.policy.sim.dmaap.e2e;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
+import com.att.nsa.cambria.client.CambriaConsumer;
import java.io.File;
import java.io.PrintWriter;
import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.security.GeneralSecurityException;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -39,7 +47,6 @@ import org.junit.Test;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
-import org.onap.policy.common.endpoints.parameters.TopicParameters;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.common.utils.network.NetworkUtil;
@@ -56,6 +63,7 @@ public class EndToEndTest extends CommonRestServer {
private static final int MAX_WAIT_SEC = 5;
private static final String TOPIC = "MY-TOPIC";
private static final String TOPIC2 = "MY-TOPIC-B";
+ private static final String TOPIC3 = "MY-TOPIC-C";
private static final int MAX_MSG = 200;
private static Main main;
@@ -76,6 +84,16 @@ public class EndToEndTest extends CommonRestServer {
private static TopicParameterGroup topicConfig;
/**
+ * The "host:port", extracted from <i>httpPrefix</i>.
+ */
+ private static String hostPort;
+
+ /**
+ * Unique consumer name used by a single test case.
+ */
+ private int consumerName;
+
+ /**
* Starts the rest server.
*
* @throws Exception if an error occurs
@@ -105,6 +123,8 @@ public class EndToEndTest extends CommonRestServer {
.register((infra, topic, event) -> queue.add(event));
TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC2)
.register((infra, topic, event) -> queue2.add(event));
+
+ hostPort = httpPrefix.substring(httpPrefix.indexOf("http://"), httpPrefix.length() - 1);
}
/**
@@ -127,6 +147,8 @@ public class EndToEndTest extends CommonRestServer {
*/
@Before
public void setUp() {
+ ++consumerName;
+
queue.clear();
queue2.clear();
}
@@ -177,11 +199,16 @@ public class EndToEndTest extends CommonRestServer {
*/
private void test(String testName, String mediaType, BiConsumer<PrintWriter, List<String>> writeMessages)
throws Exception {
+
+ /*
+ * Force consumer name to be registered with the server by attempting to fetch a message.
+ */
+ buildConsumer(0).fetch();
+
String msg1 = "{'abc':10.0}".replace('\'', '"');
String msg2 = "{'def':20.0}".replace('\'', '"');
- TopicParameters sinkcfg = topicConfig.getTopicSinks().get(0);
- URL url = new URL(httpPrefix + "events/" + sinkcfg.getTopic());
+ URL url = new URL(httpPrefix + "events/" + TOPIC3);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
@@ -195,8 +222,29 @@ public class EndToEndTest extends CommonRestServer {
assertEquals(testName + " response code", HttpURLConnection.HTTP_OK, conn.getResponseCode());
- assertEquals(testName + " message 1", msg1, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
- assertEquals(testName + " message 2", msg2, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+ // fetch the messages
+ Iterator<String> iter = buildConsumer(1000).fetch().iterator();
+
+ assertTrue(testName + " have message 1", iter.hasNext());
+ assertEquals(testName + " message 1", msg1, iter.next());
+
+ assertTrue(testName + " have message 2", iter.hasNext());
+ assertEquals(testName + " message 2", msg2, iter.next());
+
+ // no more messages
+ assertFalse(testName + " extra message", iter.hasNext());
+ }
+
+ private CambriaConsumer buildConsumer(int timeoutMs) throws MalformedURLException, GeneralSecurityException {
+ ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder();
+
+ builder.knownAs(String.valueOf(consumerName), "my-consumer-id")
+ .usingHosts(hostPort).onTopic(TOPIC3)
+ .waitAtServer(timeoutMs).receivingAtMost(5);
+
+ builder.withSocketTimeout(timeoutMs + 2000);
+
+ return builder.build();
}
/**