diff options
Diffstat (limited to 'models-sim/models-sim-dmaap/src')
-rw-r--r-- | models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java | 91 |
1 files changed, 42 insertions, 49 deletions
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java index 8c35de64e..50c319837 100644 --- a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java @@ -32,7 +32,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; -import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -50,27 +50,24 @@ import org.onap.policy.models.sim.dmaap.rest.CommonRestServer; */ public class EndToEndTest extends CommonRestServer { private static final int MAX_WAIT_SEC = 5; - private static final String ORIG_TOPIC = "MY-TOPIC"; - private static final String ORIG_TOPIC2 = "MY-TOPIC-B"; + private static final String TOPIC = "MY-TOPIC"; + private static final String TOPIC2 = "MY-TOPIC-B"; private static final int MAX_MSG = 200; - private static int ntests = 0; - private static String topicJson; - - private TopicParameterGroup topicConfig; - - private String topic = "MY-TOPIC"; - private String topic2 = "MY-TOPIC-B"; - /** * Messages from the topic are placed here by the endpoint. */ - private BlockingQueue<String> queue; + private static BlockingQueue<String> queue; /** * Messages from topic 2 are placed here by the endpoint. */ - private BlockingQueue<String> queue2; + private static BlockingQueue<String> queue2; + + /** + * Topic configuration parameters. + */ + private static TopicParameterGroup topicConfig; /** * Starts the rest server. @@ -83,10 +80,34 @@ public class EndToEndTest extends CommonRestServer { CommonRestServer.setUpBeforeClass(); - topicJson = new String( + queue = new LinkedBlockingQueue<>(); + queue2 = new LinkedBlockingQueue<>(); + + String json = new String( Files.readAllBytes(new File("src/test/resources/parameters/TopicParameters.json").toPath()), StandardCharsets.UTF_8); - topicJson = topicJson.replace("${port}", String.valueOf(getPort())); + json = json.replace("${port}", String.valueOf(getPort())); + + topicConfig = new StandardCoder().decode(json, TopicParameterGroup.class); + + TopicEndpointManager.getManager().addTopics(topicConfig); + TopicEndpointManager.getManager().start(); + + TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC) + .register((infra, topic, event) -> queue.add(event)); + TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC2) + .register((infra, topic, event) -> queue2.add(event)); + } + + /** + * Stops the topics and clears the queues. + */ + @AfterClass + public static void tearDownAfterClass() { + TopicEndpointManager.getManager().shutdown(); + + queue = null; + queue2 = null; } /** @@ -97,42 +118,15 @@ public class EndToEndTest extends CommonRestServer { @Before @Override public void setUp() throws CoderException { - queue = new LinkedBlockingQueue<>(); - queue2 = new LinkedBlockingQueue<>(); - - /* - * change topic names for each test so any listeners that may still exist will not - * grab new messages - */ - - ++ntests; - topic = "my-topic-" + ntests; - topic2 = "my-topic-b" + ntests; - - String json = topicJson.replace(ORIG_TOPIC2, topic2).replace(ORIG_TOPIC, topic); - - topicConfig = new StandardCoder().decode(json, TopicParameterGroup.class); - - TopicEndpointManager.getManager().addTopics(topicConfig); - TopicEndpointManager.getManager().start(); - } - - @After - public void tearDown() { - TopicEndpointManager.getManager().shutdown(); + queue.clear(); + queue2.clear(); } @Test public void testWithTopicEndpointAtEachEnd() throws InterruptedException { - // register listeners to add events to appropriate queue - TopicEndpointManager.getManager().getDmaapTopicSource(topic) - .register((infra, topic, event) -> queue.add(event)); - TopicEndpointManager.getManager().getDmaapTopicSource(topic2) - .register((infra, topic, event) -> queue2.add(event)); - // publish events - TopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(topic); - TopicSink sink2 = TopicEndpointManager.getManager().getDmaapTopicSink(topic2); + TopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(TOPIC); + TopicSink sink2 = TopicEndpointManager.getManager().getDmaapTopicSink(TOPIC2); for (int x = 0; x < MAX_MSG; ++x) { sink.send("hello-" + x); sink2.send("world-" + x); @@ -147,8 +141,10 @@ public class EndToEndTest extends CommonRestServer { @Test public void testCambriaFormat() throws Exception { + // @formatter:off test("testCambriaFormat", "application/cambria", (wtr, messages) -> messages.forEach(msg -> wtr.write("0." + msg.length() + "." + msg + "\n"))); + // @formatter:on } @Test @@ -175,9 +171,6 @@ public class EndToEndTest extends CommonRestServer { String msg1 = "{'abc':10.0}".replace('\'', '"'); String msg2 = "{'def':20.0}".replace('\'', '"'); - TopicEndpointManager.getManager().getDmaapTopicSource(topic) - .register((infra, topic, event) -> queue.add(event)); - TopicParameters sinkcfg = topicConfig.getTopicSinks().get(0); URL url = new URL(httpPrefix + "events/" + sinkcfg.getTopic()); |