diff options
Diffstat (limited to 'models-sim/models-sim-dmaap')
-rw-r--r-- | models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java | 58 |
1 files changed, 53 insertions, 5 deletions
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java index 066c38bb6..5a83b4693 100644 --- a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java @@ -19,14 +19,22 @@ package org.onap.policy.sim.dmaap.e2e; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; +import com.att.nsa.cambria.client.CambriaConsumer; import java.io.File; import java.io.PrintWriter; import java.net.HttpURLConnection; +import java.net.MalformedURLException; import java.net.URL; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.security.GeneralSecurityException; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -39,7 +47,6 @@ import org.junit.Test; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; -import org.onap.policy.common.endpoints.parameters.TopicParameters; import org.onap.policy.common.utils.coder.CoderException; import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.common.utils.network.NetworkUtil; @@ -56,6 +63,7 @@ public class EndToEndTest extends CommonRestServer { private static final int MAX_WAIT_SEC = 5; private static final String TOPIC = "MY-TOPIC"; private static final String TOPIC2 = "MY-TOPIC-B"; + private static final String TOPIC3 = "MY-TOPIC-C"; private static final int MAX_MSG = 200; private static Main main; @@ -76,6 +84,16 @@ public class EndToEndTest extends CommonRestServer { private static TopicParameterGroup topicConfig; /** + * The "host:port", extracted from <i>httpPrefix</i>. + */ + private static String hostPort; + + /** + * Unique consumer name used by a single test case. + */ + private int consumerName; + + /** * Starts the rest server. * * @throws Exception if an error occurs @@ -105,6 +123,8 @@ public class EndToEndTest extends CommonRestServer { .register((infra, topic, event) -> queue.add(event)); TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC2) .register((infra, topic, event) -> queue2.add(event)); + + hostPort = httpPrefix.substring(httpPrefix.indexOf("http://"), httpPrefix.length() - 1); } /** @@ -127,6 +147,8 @@ public class EndToEndTest extends CommonRestServer { */ @Before public void setUp() { + ++consumerName; + queue.clear(); queue2.clear(); } @@ -177,11 +199,16 @@ public class EndToEndTest extends CommonRestServer { */ private void test(String testName, String mediaType, BiConsumer<PrintWriter, List<String>> writeMessages) throws Exception { + + /* + * Force consumer name to be registered with the server by attempting to fetch a message. + */ + buildConsumer(0).fetch(); + String msg1 = "{'abc':10.0}".replace('\'', '"'); String msg2 = "{'def':20.0}".replace('\'', '"'); - TopicParameters sinkcfg = topicConfig.getTopicSinks().get(0); - URL url = new URL(httpPrefix + "events/" + sinkcfg.getTopic()); + URL url = new URL(httpPrefix + "events/" + TOPIC3); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setRequestMethod("POST"); @@ -195,8 +222,29 @@ public class EndToEndTest extends CommonRestServer { assertEquals(testName + " response code", HttpURLConnection.HTTP_OK, conn.getResponseCode()); - assertEquals(testName + " message 1", msg1, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS)); - assertEquals(testName + " message 2", msg2, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS)); + // fetch the messages + Iterator<String> iter = buildConsumer(1000).fetch().iterator(); + + assertTrue(testName + " have message 1", iter.hasNext()); + assertEquals(testName + " message 1", msg1, iter.next()); + + assertTrue(testName + " have message 2", iter.hasNext()); + assertEquals(testName + " message 2", msg2, iter.next()); + + // no more messages + assertFalse(testName + " extra message", iter.hasNext()); + } + + private CambriaConsumer buildConsumer(int timeoutMs) throws MalformedURLException, GeneralSecurityException { + ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder(); + + builder.knownAs(String.valueOf(consumerName), "my-consumer-id") + .usingHosts(hostPort).onTopic(TOPIC3) + .waitAtServer(timeoutMs).receivingAtMost(5); + + builder.withSocketTimeout(timeoutMs + 2000); + + return builder.build(); } /** |