aboutsummaryrefslogtreecommitdiffstats
path: root/models-sim/models-sim-dmaap
diff options
context:
space:
mode:
Diffstat (limited to 'models-sim/models-sim-dmaap')
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java91
1 files changed, 42 insertions, 49 deletions
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java
index 8c35de64e..50c319837 100644
--- a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java
@@ -32,7 +32,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
-import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -50,27 +50,24 @@ import org.onap.policy.models.sim.dmaap.rest.CommonRestServer;
*/
public class EndToEndTest extends CommonRestServer {
private static final int MAX_WAIT_SEC = 5;
- private static final String ORIG_TOPIC = "MY-TOPIC";
- private static final String ORIG_TOPIC2 = "MY-TOPIC-B";
+ private static final String TOPIC = "MY-TOPIC";
+ private static final String TOPIC2 = "MY-TOPIC-B";
private static final int MAX_MSG = 200;
- private static int ntests = 0;
- private static String topicJson;
-
- private TopicParameterGroup topicConfig;
-
- private String topic = "MY-TOPIC";
- private String topic2 = "MY-TOPIC-B";
-
/**
* Messages from the topic are placed here by the endpoint.
*/
- private BlockingQueue<String> queue;
+ private static BlockingQueue<String> queue;
/**
* Messages from topic 2 are placed here by the endpoint.
*/
- private BlockingQueue<String> queue2;
+ private static BlockingQueue<String> queue2;
+
+ /**
+ * Topic configuration parameters.
+ */
+ private static TopicParameterGroup topicConfig;
/**
* Starts the rest server.
@@ -83,10 +80,34 @@ public class EndToEndTest extends CommonRestServer {
CommonRestServer.setUpBeforeClass();
- topicJson = new String(
+ queue = new LinkedBlockingQueue<>();
+ queue2 = new LinkedBlockingQueue<>();
+
+ String json = new String(
Files.readAllBytes(new File("src/test/resources/parameters/TopicParameters.json").toPath()),
StandardCharsets.UTF_8);
- topicJson = topicJson.replace("${port}", String.valueOf(getPort()));
+ json = json.replace("${port}", String.valueOf(getPort()));
+
+ topicConfig = new StandardCoder().decode(json, TopicParameterGroup.class);
+
+ TopicEndpointManager.getManager().addTopics(topicConfig);
+ TopicEndpointManager.getManager().start();
+
+ TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC)
+ .register((infra, topic, event) -> queue.add(event));
+ TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC2)
+ .register((infra, topic, event) -> queue2.add(event));
+ }
+
+ /**
+ * Stops the topics and clears the queues.
+ */
+ @AfterClass
+ public static void tearDownAfterClass() {
+ TopicEndpointManager.getManager().shutdown();
+
+ queue = null;
+ queue2 = null;
}
/**
@@ -97,42 +118,15 @@ public class EndToEndTest extends CommonRestServer {
@Before
@Override
public void setUp() throws CoderException {
- queue = new LinkedBlockingQueue<>();
- queue2 = new LinkedBlockingQueue<>();
-
- /*
- * change topic names for each test so any listeners that may still exist will not
- * grab new messages
- */
-
- ++ntests;
- topic = "my-topic-" + ntests;
- topic2 = "my-topic-b" + ntests;
-
- String json = topicJson.replace(ORIG_TOPIC2, topic2).replace(ORIG_TOPIC, topic);
-
- topicConfig = new StandardCoder().decode(json, TopicParameterGroup.class);
-
- TopicEndpointManager.getManager().addTopics(topicConfig);
- TopicEndpointManager.getManager().start();
- }
-
- @After
- public void tearDown() {
- TopicEndpointManager.getManager().shutdown();
+ queue.clear();
+ queue2.clear();
}
@Test
public void testWithTopicEndpointAtEachEnd() throws InterruptedException {
- // register listeners to add events to appropriate queue
- TopicEndpointManager.getManager().getDmaapTopicSource(topic)
- .register((infra, topic, event) -> queue.add(event));
- TopicEndpointManager.getManager().getDmaapTopicSource(topic2)
- .register((infra, topic, event) -> queue2.add(event));
-
// publish events
- TopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(topic);
- TopicSink sink2 = TopicEndpointManager.getManager().getDmaapTopicSink(topic2);
+ TopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(TOPIC);
+ TopicSink sink2 = TopicEndpointManager.getManager().getDmaapTopicSink(TOPIC2);
for (int x = 0; x < MAX_MSG; ++x) {
sink.send("hello-" + x);
sink2.send("world-" + x);
@@ -147,8 +141,10 @@ public class EndToEndTest extends CommonRestServer {
@Test
public void testCambriaFormat() throws Exception {
+ // @formatter:off
test("testCambriaFormat", "application/cambria",
(wtr, messages) -> messages.forEach(msg -> wtr.write("0." + msg.length() + "." + msg + "\n")));
+ // @formatter:on
}
@Test
@@ -175,9 +171,6 @@ public class EndToEndTest extends CommonRestServer {
String msg1 = "{'abc':10.0}".replace('\'', '"');
String msg2 = "{'def':20.0}".replace('\'', '"');
- TopicEndpointManager.getManager().getDmaapTopicSource(topic)
- .register((infra, topic, event) -> queue.add(event));
-
TopicParameters sinkcfg = topicConfig.getTopicSinks().get(0);
URL url = new URL(httpPrefix + "events/" + sinkcfg.getTopic());