summaryrefslogtreecommitdiffstats
path: root/feature-simulators/src
diff options
context:
space:
mode:
Diffstat (limited to 'feature-simulators/src')
-rw-r--r--feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java167
-rw-r--r--feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRsTest.java219
-rw-r--r--feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorTest.java200
3 files changed, 0 insertions, 586 deletions
diff --git a/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java b/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java
deleted file mode 100644
index c330b80c..00000000
--- a/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * feature-simulators
- * ================================================================================
- * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.simulators;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Path("/events")
-public class DMaaPSimulatorJaxRs {
- public static final String NO_TOPIC_MSG = "No topic";
- public static final String NO_DATA_MSG = "No Data";
-
- private static final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();
- private static final Logger logger = LoggerFactory.getLogger(DMaaPSimulatorJaxRs.class);
- private static int responseCode = 200;
-
- /**
- * Get consumer ID.
- *
- * @param timeout timeout value
- * @param topicName the dmaap topic
- * @param httpResponse http response object
- * @return topic or error message
- */
- @GET
- @Path("/{topicName}/{consumeGroup}/{consumerId}")
- public String subscribe(@DefaultValue("0") @QueryParam("timeout") int timeout, @PathParam("topicName")
- String topicName, @Context final HttpServletResponse httpResponse) {
- int currentRespCode = responseCode;
- httpResponse.setStatus(currentRespCode);
- try {
- httpResponse.flushBuffer();
- } catch (IOException e) {
- logger.error("flushBuffer threw: ", e);
- return "Got an error";
- }
-
- if (currentRespCode < 200 || currentRespCode >= 300) {
- return "You got response code: " + currentRespCode;
- }
-
- if (queues.containsKey(topicName)) {
- return getNextMessageFromQueue(timeout, topicName);
- } else if (timeout > 0) {
- return waitForNextMessageFromQueue(timeout, topicName);
- }
- return NO_TOPIC_MSG;
- }
-
- private String getNextMessageFromQueue(final int timeout, final String topicName) {
- BlockingQueue<String> queue = queues.get(topicName);
- String response = NO_DATA_MSG;
- try {
- response = poll(queue, timeout);
- } catch (InterruptedException e) {
- logger.debug("error in DMaaP simulator", e);
- Thread.currentThread().interrupt();
- }
- if (response == null) {
- response = NO_DATA_MSG;
- }
- return response;
- }
-
- protected String waitForNextMessageFromQueue(int timeout, String topicName) {
- try {
- sleep(timeout);
- if (queues.containsKey(topicName)) {
- BlockingQueue<String> queue = queues.get(topicName);
- String response = queue.poll();
- if (response == null) {
- response = NO_DATA_MSG;
- }
- return response;
- }
- } catch (InterruptedException e) {
- logger.debug("error in DMaaP simulator", e);
- Thread.currentThread().interrupt();
- }
- return NO_TOPIC_MSG;
- }
-
- /**
- * Post to a topic.
- *
- * @param topicName name of the topic
- * @param body message
- * @return empty string
- */
- @POST
- @Path("/{topicName}")
- @Consumes(MediaType.TEXT_PLAIN)
- public String publish(@PathParam("topicName") String topicName, String body) {
- BlockingQueue<String> queue = queues.computeIfAbsent(topicName, entry -> new LinkedBlockingQueue<>());
- queue.add(body);
-
- return "";
- }
-
- @POST
- @Path("/setStatus")
- public String setStatus(@QueryParam("statusCode") int statusCode) {
- setResponseCode(statusCode);
- return "Status code set";
- }
-
- // the following non-static methods may be overridden by junit tests
-
- protected String poll(BlockingQueue<String> queue, final int timeout) throws InterruptedException {
- return queue.poll(timeout, TimeUnit.MILLISECONDS);
- }
-
- protected void sleep(int timeout) throws InterruptedException {
- Thread.sleep(timeout);
- }
-
- /**
- * Static method to set static response code, synchronized for multiple possible uses.
- *
- * @param incomingResponseCode the response code to set
- */
- private static synchronized void setResponseCode(final int incomingResponseCode) {
- responseCode = incomingResponseCode;
- }
-
- /**
- * Used only by junit tests to reset the simulator.
- */
- protected static void reset() {
- responseCode = 200;
- queues.clear();
- }
-}
diff --git a/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRsTest.java b/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRsTest.java
deleted file mode 100644
index 6e1a8d68..00000000
--- a/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRsTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.simulators;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doThrow;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import javax.servlet.http.HttpServletResponse;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class DMaaPSimulatorJaxRsTest {
- private static final String MESSAGE = "hello";
- private static final String MESSAGE2 = "world";
- private static final String EXPECTED_EXCEPTION = "expected exception";
- private static final String TOPIC = "my-topic";
- private static final int TIMEOUT_MS = 10;
- private static final int LONG_TIMEOUT_MS = 250;
-
- @Mock
- private HttpServletResponse resp;
-
- private DMaaPSimulatorJaxRs sim;
-
- /**
- * Initializes objects and creates the simulator.
- */
- @Before
- public void setUp() {
- sim = new DMaaPSimulatorJaxRs();
- }
-
- @After
- public void tearDown() {
- DMaaPSimulatorJaxRs.reset();
- }
-
- @Test
- public void testSubscribe() {
- sim.publish(TOPIC, MESSAGE);
-
- assertEquals(MESSAGE, sim.subscribe(0, TOPIC, resp));
- }
-
- @Test
- public void testSubscribe_FlushEx() throws IOException {
- doThrow(new IOException(EXPECTED_EXCEPTION)).when(resp).flushBuffer();
-
- assertEquals("Got an error", sim.subscribe(TIMEOUT_MS, TOPIC, resp));
- }
-
- @Test
- public void testSubscribe_BadStatus_testSetResponseCode() {
- sim.setStatus(199);
- assertEquals("You got response code: 199", sim.subscribe(TIMEOUT_MS, TOPIC, resp));
-
- sim.setStatus(300);
- assertEquals("You got response code: 300", sim.subscribe(TIMEOUT_MS, TOPIC, resp));
- }
-
- @Test
- public void testSubscribe_UnknownTopic_ZeroTimeout() {
- assertEquals(DMaaPSimulatorJaxRs.NO_TOPIC_MSG, sim.subscribe(0, TOPIC, resp));
- }
-
- @Test
- public void testSubscribe_UnknownTopic_NonZeroTimeout() {
- assertEquals(DMaaPSimulatorJaxRs.NO_TOPIC_MSG, sim.subscribe(TIMEOUT_MS, TOPIC, resp));
- }
-
- @Test
- public void testGetNextMessageFromQueue() {
- sim.publish(TOPIC, MESSAGE);
- sim.publish(TOPIC, MESSAGE2);
-
- assertEquals(MESSAGE, sim.subscribe(0, TOPIC, resp));
- assertEquals(MESSAGE2, sim.subscribe(0, TOPIC, resp));
-
- // repeat - no message
- assertEquals(DMaaPSimulatorJaxRs.NO_DATA_MSG, sim.subscribe(0, TOPIC, resp));
- }
-
- @Test
- public void testGetNextMessageFromQueue_Interrupted() throws InterruptedException {
- sim = new DMaaPSimulatorJaxRs() {
- @Override
- protected String poll(BlockingQueue<String> queue, int timeout) throws InterruptedException {
- throw new InterruptedException(EXPECTED_EXCEPTION);
- }
- };
-
- sim.publish(TOPIC, MESSAGE);
-
- // put it in the background so we don't interrupt the test thread
- BlockingQueue<String> queue = new LinkedBlockingQueue<>();
- backgroundSubscribe(queue);
-
- assertEquals(DMaaPSimulatorJaxRs.NO_DATA_MSG, queue.take());
- }
-
- @Test
- public void testWaitForNextMessageFromQueue() throws InterruptedException {
- CountDownLatch waitCalled = new CountDownLatch(1);
-
- sim = new DMaaPSimulatorJaxRs() {
- @Override
- protected String waitForNextMessageFromQueue(int timeout, String topicName) {
- waitCalled.countDown();
- return super.waitForNextMessageFromQueue(timeout, topicName);
- }
- };
-
- BlockingQueue<String> queue = new LinkedBlockingQueue<>();
-
- CountDownLatch latch1 = backgroundSubscribe(queue);
- CountDownLatch latch2 = backgroundSubscribe(queue);
-
- // wait for both threads to start
- latch1.await();
- latch2.await();
-
- /*
- * Must pause to prevent the topic from being created before subscribe() is
- * invoked.
- */
- assertTrue(waitCalled.await(1, TimeUnit.SECONDS));
-
- // only publish one message
- sim.publish(TOPIC, MESSAGE);
-
- // wait for both subscribers to add their messages to the queue
- List<String> messages = new ArrayList<>();
- messages.add(queue.take());
- messages.add(queue.take());
-
- // sort them so the order is consistent
- Collections.sort(messages);
-
- assertEquals("[No Data, hello]", messages.toString());
- }
-
- @Test
- public void testWaitForNextMessageFromQueue_Interrupted() throws InterruptedException {
- sim = new DMaaPSimulatorJaxRs() {
- @Override
- protected void sleep(int timeout) throws InterruptedException {
- throw new InterruptedException(EXPECTED_EXCEPTION);
- }
- };
-
- // put it in the background so we don't interrupt the test thread
- BlockingQueue<String> queue = new LinkedBlockingQueue<>();
- backgroundSubscribe(queue);
-
- assertEquals(DMaaPSimulatorJaxRs.NO_TOPIC_MSG, queue.take());
- }
-
- @Test
- public void testPublish() {
- assertEquals("", sim.publish(TOPIC, MESSAGE));
- assertEquals(MESSAGE, sim.subscribe(0, TOPIC, resp));
- }
-
- @Test
- public void testSetStatus() {
- assertEquals("Status code set", sim.setStatus(500));
- assertEquals("You got response code: 500", sim.subscribe(TIMEOUT_MS, TOPIC, resp));
- }
-
- /**
- * Invokes subscribe() in a background thread.
- *
- * @param queue where to place the returned result
- * @return a latch that will be counted down just before the background thread invokes
- * subscribe()
- */
- private CountDownLatch backgroundSubscribe(BlockingQueue<String> queue) {
- CountDownLatch latch = new CountDownLatch(1);
-
- new Thread(() -> {
- latch.countDown();
- queue.add(sim.subscribe(LONG_TIMEOUT_MS, TOPIC, resp));
- }).start();
-
- return latch;
- }
-}
diff --git a/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorTest.java b/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorTest.java
deleted file mode 100644
index ad01870b..00000000
--- a/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * feature-simulators
- * ================================================================================
- * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.simulators;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.onap.policy.common.endpoints.http.server.HttpServletServer;
-import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
-import org.onap.policy.common.utils.network.NetworkUtil;
-import org.onap.policy.drools.utils.logging.LoggerUtil;
-
-public class DMaaPSimulatorTest {
-
- private static int DMAAPSIM_SERVER_PORT;
-
- /**
- * Setup the simulator.
- * @throws IOException if a server port cannot be allocated
- */
- @BeforeClass
- public static void setUpSimulator() throws IOException {
- LoggerUtil.setLevel("ROOT", "INFO");
- LoggerUtil.setLevel("org.eclipse.jetty", "WARN");
- DMAAPSIM_SERVER_PORT = NetworkUtil.allocPort();
- try {
- final HttpServletServer testServer = HttpServletServerFactoryInstance.getServerFactory().build("dmaapSim",
- "localhost", DMAAPSIM_SERVER_PORT, "/", false, true);
- testServer.addServletClass("/*", DMaaPSimulatorJaxRs.class.getName());
- testServer.waitedStart(5000);
- if (!NetworkUtil.isTcpPortOpen("localhost", testServer.getPort(), 5, 10000L)) {
- throw new IllegalStateException("cannot connect to port " + testServer.getPort());
- }
- } catch (final Exception e) {
- fail(e.getMessage());
- }
- }
-
- @AfterClass
- public static void tearDownSimulator() {
- HttpServletServerFactoryInstance.getServerFactory().destroy();
- }
-
- @Test
- public void testGetNoData() throws IOException {
- validateResponse(DMaaPSimulatorJaxRs.NO_TOPIC_MSG, dmaapGet("myTopicNoData", 1000));
- }
-
- @Test
- public void testSinglePost() throws IOException {
- String myTopic = "myTopicSinglePost";
- String testData = "This is some test data";
-
- validateResponse(dmaapPost(myTopic, testData));
-
- validateResponse(testData, dmaapGet(myTopic, 1000));
- }
-
- @Test
- public void testOneTopicMultiPost() throws IOException {
- String[] data = {"data point 1", "data point 2", "something random"};
- String myTopic = "myTopicMultiPost";
-
- validateResponse(dmaapPost(myTopic, data[0]));
- validateResponse(dmaapPost(myTopic, data[1]));
- validateResponse(dmaapPost(myTopic, data[2]));
-
- validateResponse(data[0], dmaapGet(myTopic, 1000));
- validateResponse(data[1], dmaapGet(myTopic, 1000));
- validateResponse(data[2], dmaapGet(myTopic, 1000));
- }
-
- @Test
- public void testMultiTopic() throws IOException {
- String[][] data = {{"Topic one message one", "Topic one message two"},
- {"Topic two message one", "Topic two message two"}};
- String[] topics = {"topic1", "topic2"};
-
- validateResponse(dmaapPost(topics[0], data[0][0]));
-
- validateResponse(data[0][0], dmaapGet(topics[0], 1000));
- validateResponse(DMaaPSimulatorJaxRs.NO_TOPIC_MSG, dmaapGet(topics[1], 1000));
-
- validateResponse(dmaapPost(topics[1], data[1][0]));
- validateResponse(dmaapPost(topics[1], data[1][1]));
- validateResponse(dmaapPost(topics[0], data[0][1]));
-
- validateResponse(data[1][0], dmaapGet(topics[1], 1000));
- validateResponse(data[0][1], dmaapGet(topics[0], 1000));
- validateResponse(data[1][1], dmaapGet(topics[1], 1000));
- validateResponse(DMaaPSimulatorJaxRs.NO_DATA_MSG, dmaapGet(topics[0], 1000));
- }
-
- @Test
- public void testResponseCode() throws IOException {
- validateResponse(dmaapPost("myTopic", "myTopicData"));
-
- validateResponse(setStatus(503));
- validateResponse(503, "You got response code: 503", dmaapGet("myTopic", 500));
-
- validateResponse(setStatus(202));
- validateResponse(202, "myTopicData", dmaapGet("myTopic", 500));
- }
-
- private void validateResponse(Pair<Integer, String> response) {
- assertNotNull(response);
- assertNotNull(response.getLeft());
- assertNotNull(response.getRight());
- }
-
- private void validateResponse(int expectedCode, String expectedResponse, Pair<Integer, String> response) {
- assertNotNull(response);
- assertEquals(expectedCode, response.getLeft().intValue());
- assertEquals(expectedResponse, response.getRight());
- }
-
- private void validateResponse(String expectedResponse, Pair<Integer, String> response) {
- assertNotNull(response);
- assertNotNull(response.getLeft());
- assertEquals(expectedResponse, response.getRight());
- }
-
- private static Pair<Integer, String> dmaapGet(String topic, int timeout) throws IOException {
- return dmaapGet(topic, "1", "1", timeout);
- }
-
- private static Pair<Integer, String> dmaapGet(String topic, String consumerGroup, String consumerId, int timeout)
- throws IOException {
- String url = "http://localhost:" + DMAAPSIM_SERVER_PORT + "/events/" + topic + "/" + consumerGroup + "/"
- + consumerId + "?timeout=" + timeout;
- HttpURLConnection httpConn = (HttpURLConnection) new URL(url).openConnection();
- httpConn.setRequestMethod("GET");
- httpConn.connect();
- return getResponse(httpConn);
- }
-
- private static Pair<Integer, String> dmaapPost(String topic, String data) throws IOException {
- String url = "http://localhost:" + DMAAPSIM_SERVER_PORT + "/events/" + topic;
- byte[] postData = data.getBytes(StandardCharsets.UTF_8);
- HttpURLConnection httpConn = (HttpURLConnection) new URL(url).openConnection();
- httpConn.setRequestMethod("POST");
- httpConn.setDoOutput(true);
- httpConn.setRequestProperty("Content-Type", "text/plain");
- httpConn.setRequestProperty("Content-Length", "" + postData.length);
- httpConn.connect();
- IOUtils.write(postData, httpConn.getOutputStream());
- return getResponse(httpConn);
- }
-
- private static Pair<Integer, String> setStatus(int status) throws IOException {
- String url = "http://localhost:" + DMAAPSIM_SERVER_PORT + "/events/setStatus?statusCode=" + status;
- HttpURLConnection httpConn = (HttpURLConnection) new URL(url).openConnection();
- httpConn.setRequestMethod("POST");
- httpConn.connect();
- return getResponse(httpConn);
- }
-
- private static Pair<Integer, String> getResponse(HttpURLConnection httpConn) throws IOException {
- try {
- String response = IOUtils.toString(httpConn.getInputStream(), StandardCharsets.UTF_8);
- return Pair.of(httpConn.getResponseCode(), response);
-
- } catch (IOException e) {
- if (e.getMessage().startsWith("Server returned HTTP response code")) {
- String response = IOUtils.toString(httpConn.getErrorStream(), StandardCharsets.UTF_8);
- return Pair.of(httpConn.getResponseCode(), response);
- }
-
- throw e;
- }
- }
-}