aboutsummaryrefslogtreecommitdiffstats
path: root/feature-simulators/src
diff options
context:
space:
mode:
Diffstat (limited to 'feature-simulators/src')
-rw-r--r--feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java123
-rw-r--r--feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorTest.java364
2 files changed, 487 insertions, 0 deletions
diff --git a/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java b/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java
new file mode 100644
index 00000000..bdabc6ee
--- /dev/null
+++ b/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java
@@ -0,0 +1,123 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * feature-simulators
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.simulators;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.BlockingQueue;
+
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Path("/events")
+public class DMaaPSimulatorJaxRs {
+
+ private static final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();
+ private static final Logger logger = LoggerFactory.getLogger(DMaaPSimulatorJaxRs.class);
+ private static int responseCode = 200;
+
+ @GET
+ @Path("/{topicName}/{consumeGroup}/{consumerId}")
+ public String subscribe(@DefaultValue("0") @QueryParam("timeout") int timeout, @PathParam("topicName") String topicName,
+ @Context final HttpServletResponse httpResponse) {
+ int currentRespCode = responseCode;
+ httpResponse.setStatus(currentRespCode);
+ try {
+ httpResponse.flushBuffer();
+ } catch (IOException e) {
+ final Logger logger = LoggerFactory.getLogger(DMaaPSimulatorJaxRs.class);
+ logger.error("flushBuffer threw: ", e);
+ return "Got an error";
+ }
+
+ if (currentRespCode < 200 || currentRespCode >= 300)
+ {
+ return "You got response code: " + currentRespCode;
+ }
+ if (queues.containsKey(topicName)) {
+ BlockingQueue<String> queue = queues.get(topicName);
+ String response = "No Data";
+ try {
+ response = queue.poll(timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ logger.debug("error in DMaaP simulator", e);
+ }
+ if (response == null) {
+ response = "No Data";
+ }
+ return response;
+ }
+ else if (timeout > 0) {
+ try {
+ Thread.sleep(timeout);
+ if (queues.containsKey(topicName)) {
+ BlockingQueue<String> queue = queues.get(topicName);
+ String response = queue.poll();
+ if (response == null) {
+ response = "No Data";
+ }
+ return response;
+ }
+ } catch (InterruptedException e) {
+ logger.debug("error in DMaaP simulator", e);
+ }
+ }
+ return "No topic";
+ }
+
+ @POST
+ @Path("/{topicName}")
+ @Consumes(MediaType.TEXT_PLAIN)
+ public String publish(@PathParam("topicName") String topicName, String body) {
+ if (queues.containsKey(topicName)) {
+ BlockingQueue<String> queue = queues.get(topicName);
+ queue.offer(body);
+ }
+ else {
+ BlockingQueue<String> queue = new LinkedBlockingQueue<>();
+ queue.offer(body);
+ queues.put(topicName, queue);
+ }
+
+ return "";
+ }
+
+ @POST
+ @Path("/setStatus")
+ public String setStatus(@QueryParam("statusCode") int statusCode) {
+ responseCode = statusCode;
+ return "Status code set";
+ }
+}
diff --git a/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorTest.java b/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorTest.java
new file mode 100644
index 00000000..415c5206
--- /dev/null
+++ b/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorTest.java
@@ -0,0 +1,364 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * feature-simulators
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.simulators;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.policy.drools.http.server.HttpServletServer;
+import org.onap.policy.drools.utils.LoggerUtil;
+import org.onap.policy.drools.utils.NetworkUtil;
+
+public class DMaaPSimulatorTest {
+
+ private static final int DMAAPSIM_SERVER_PORT = 6670;
+ @BeforeClass
+ public static void setUpSimulator() {
+ LoggerUtil.setLevel("ROOT", "INFO");
+ LoggerUtil.setLevel("org.eclipse.jetty", "WARN");
+ try {
+ final HttpServletServer testServer = HttpServletServer.factory.build("dmaapSim",
+ "localhost", DMAAPSIM_SERVER_PORT, "/", false, true);
+ testServer.addServletClass("/*", DMaaPSimulatorJaxRs.class.getName());
+ testServer.waitedStart(5000);
+ if (!NetworkUtil.isTcpPortOpen("localhost", testServer.getPort(), 5, 10000L))
+ throw new IllegalStateException("cannot connect to port " + testServer.getPort());
+ } catch (final Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @AfterClass
+ public static void tearDownSimulator() {
+ HttpServletServer.factory.destroy();
+ }
+
+ @Test
+ public void testGetNoData() {
+ int timeout = 1000;
+ Pair <Integer, String> response = dmaapGet("myTopicNoData", timeout);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertEquals("No topic", response.b);
+ }
+
+ @Test
+ public void testSinglePost() {
+ String myTopic = "myTopicSinglePost";
+ String testData = "This is some test data";
+ Pair<Integer, String> response = dmaapPost(myTopic, testData);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertNotNull(response.b);
+
+ response = dmaapGet(myTopic, 1000);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertEquals(testData, response.b);
+ }
+
+ @Test
+ public void testOneTopicMultiPost() {
+ String[] data = {"data point 1", "data point 2", "something random"};
+ String myTopic = "myTopicMultiPost";
+ Pair<Integer, String> response = dmaapPost(myTopic, data[0]);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertNotNull(response.b);
+
+ response = dmaapPost(myTopic, data[1]);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertNotNull(response.b);
+
+ response = dmaapPost(myTopic, data[2]);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertNotNull(response.b);
+
+ response = dmaapGet(myTopic, 1000);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertEquals(data[0], response.b);
+
+ response = dmaapGet(myTopic, 1000);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertEquals(data[1], response.b);
+
+ response = dmaapGet(myTopic, 1000);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertEquals(data[2], response.b);
+ }
+
+ @Test
+ public void testMultiTopic() {
+ String[][] data = {{"Topic one message one", "Topic one message two"}, {"Topic two message one", "Topic two message two"}};
+ String[] topics = {"topic1", "topic2"};
+
+ Pair<Integer, String> response = dmaapPost(topics[0], data[0][0]);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertNotNull(response.b);
+
+ response = dmaapGet(topics[0], 1000);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertEquals(data[0][0], response.b);
+
+ response = dmaapGet(topics[1], 1000);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertEquals("No topic", response.b);
+
+ response = dmaapPost(topics[1], data[1][0]);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertNotNull(response.b);
+
+ response = dmaapPost(topics[1], data[1][1]);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertNotNull(response.b);
+
+ response = dmaapPost(topics[0], data[0][1]);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertNotNull(response.b);
+
+ response = dmaapGet(topics[1], 1000);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertEquals(data[1][0], response.b);
+
+ response = dmaapGet(topics[0], 1000);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertEquals(data[0][1], response.b);
+
+ response = dmaapGet(topics[1], 1000);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertEquals(data[1][1], response.b);
+
+ response = dmaapGet(topics[0], 1000);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertEquals("No Data", response.b);
+ }
+
+ @Test
+ public void testResponseCode() {
+ Pair<Integer, String> response = dmaapPost("myTopic", "myTopicData");
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertNotNull(response.b);
+
+ response = setStatus(503);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertNotNull(response.b);
+
+ response = dmaapGet("myTopic", 500);
+ assertNotNull(response);
+ assertEquals(503, response.a.intValue());
+ assertEquals("You got response code: 503", response.b);
+
+ response = setStatus(202);
+ assertNotNull(response);
+ assertNotNull(response.a);
+ assertNotNull(response.b);
+
+ response = dmaapGet("myTopic", 500);
+ assertNotNull(response);
+ assertEquals(202, response.a.intValue());
+ assertEquals("myTopicData", response.b);
+ }
+
+ private static Pair<Integer, String> dmaapGet (String topic, int timeout) {
+ return dmaapGet(topic, "1", "1", timeout);
+ }
+
+ private static Pair<Integer, String> dmaapGet (String topic, String consumerGroup, String consumerId, int timeout) {
+ String url = "http://localhost:" + DMAAPSIM_SERVER_PORT + "/events/" + topic + "/" + consumerGroup + "/" + consumerId + "?timeout=" + timeout;
+ try {
+ URLConnection conn = new URL(url).openConnection();
+ HttpURLConnection httpConn = null;
+ if (conn instanceof HttpURLConnection) {
+ httpConn = (HttpURLConnection) conn;
+ }
+ else {
+ fail("connection not set up right");
+ }
+ httpConn.setRequestMethod("GET");
+ httpConn.connect();
+ String response = "";
+ try (BufferedReader connReader = new BufferedReader(new InputStreamReader(httpConn.getInputStream()))) {
+ String line;
+ while((line = connReader.readLine()) != null) {
+ response += line;
+ }
+ httpConn.disconnect();
+ return new Pair<Integer, String>(httpConn.getResponseCode(), response);
+ }
+ catch (IOException e) {
+ if (e.getMessage().startsWith("Server returned HTTP response code")) {
+ System.out.println("hi");
+ BufferedReader connReader = new BufferedReader(new InputStreamReader(httpConn.getErrorStream()));
+ String line;
+ while((line = connReader.readLine()) != null) {
+ response += line;
+ }
+ httpConn.disconnect();
+ return new Pair<Integer, String>(httpConn.getResponseCode(), response);
+ }
+ else {
+ fail("we got an exception: " + e);
+ }
+ }
+ }
+ catch (Exception e) {
+ fail("we got an exception" + e);
+ }
+
+ return null;
+ }
+
+ private static Pair<Integer, String> dmaapPost (String topic, String data) {
+ String url = "http://localhost:" + DMAAPSIM_SERVER_PORT + "/events/" + topic;
+ byte[] postData = data.getBytes(StandardCharsets.UTF_8);
+ try {
+ URLConnection conn = new URL(url).openConnection();
+ HttpURLConnection httpConn = null;
+ if (conn instanceof HttpURLConnection) {
+ httpConn = (HttpURLConnection) conn;
+ }
+ else {
+ fail("connection not set up right");
+ }
+ httpConn.setRequestMethod("POST");
+ httpConn.setDoOutput(true);
+ httpConn.setRequestProperty( "Content-Type", "text/plain");
+ httpConn.setRequestProperty("Content-Length", ""+postData.length);
+ httpConn.connect();
+ String response = "";
+ try (DataOutputStream connWriter = new DataOutputStream(httpConn.getOutputStream())) {
+ connWriter.write(postData);
+ connWriter.flush();
+ }
+ try (BufferedReader connReader = new BufferedReader(new InputStreamReader(httpConn.getInputStream()))) {
+ String line;
+ while((line = connReader.readLine()) != null) {
+ response += line;
+ }
+ httpConn.disconnect();
+ return new Pair<Integer, String>(httpConn.getResponseCode(), response);
+ }
+ catch (IOException e) {
+ if (e.getMessage().startsWith("Server returned HTTP response code")) {
+ System.out.println("hi");
+ BufferedReader connReader = new BufferedReader(new InputStreamReader(httpConn.getErrorStream()));
+ String line;
+ while((line = connReader.readLine()) != null) {
+ response += line;
+ }
+ httpConn.disconnect();
+ return new Pair<Integer, String>(httpConn.getResponseCode(), response);
+ }
+ else {
+ fail("we got an exception: " + e);
+ }
+ }
+ }
+ catch (Exception e) {
+ fail("we got an exception: " + e);
+ }
+ return null;
+ }
+
+ private static Pair<Integer, String> setStatus (int status) {
+ String url = "http://localhost:" + DMAAPSIM_SERVER_PORT + "/events/setStatus?statusCode=" + status;
+ try {
+ URLConnection conn = new URL(url).openConnection();
+ HttpURLConnection httpConn = null;
+ if (conn instanceof HttpURLConnection) {
+ httpConn = (HttpURLConnection) conn;
+ }
+ else {
+ fail("connection not set up right");
+ }
+ httpConn.setRequestMethod("POST");
+ httpConn.connect();
+ String response = "";
+ try (BufferedReader connReader = new BufferedReader(new InputStreamReader(httpConn.getInputStream()))) {
+ String line;
+ while((line = connReader.readLine()) != null) {
+ response += line;
+ }
+ httpConn.disconnect();
+ return new Pair<Integer, String>(httpConn.getResponseCode(), response);
+ }
+ catch (IOException e) {
+ if (e.getMessage().startsWith("Server returned HTTP response code")) {
+ System.out.println("hi");
+ BufferedReader connReader = new BufferedReader(new InputStreamReader(httpConn.getErrorStream()));
+ String line;
+ while((line = connReader.readLine()) != null) {
+ response += line;
+ }
+ httpConn.disconnect();
+ return new Pair<Integer, String>(httpConn.getResponseCode(), response);
+ }
+ else {
+ fail("we got an exception: " + e);
+ }
+ }
+ }
+ catch (Exception e) {
+ fail("we got an exception" + e);
+ }
+ return null;
+ }
+
+ private static class Pair<A, B> {
+ public final A a;
+ public final B b;
+
+ public Pair(A a, B b) {
+ this.a = a;
+ this.b = b;
+ }
+ }
+} \ No newline at end of file