diff options
Diffstat (limited to 'feature-simulators/src/main/java')
-rw-r--r-- | feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java | 122 |
1 files changed, 68 insertions, 54 deletions
diff --git a/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java b/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java index 2513a040..44700890 100644 --- a/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java +++ b/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.concurrent.BlockingQueue; import javax.servlet.http.HttpServletResponse; @@ -43,83 +44,96 @@ import org.slf4j.LoggerFactory; @Path("/events") public class DMaaPSimulatorJaxRs { - private static final String NO_DATA_MSG = "No Data"; private static final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>(); private static final Logger logger = LoggerFactory.getLogger(DMaaPSimulatorJaxRs.class); private static int responseCode = 200; - + @GET @Path("/{topicName}/{consumeGroup}/{consumerId}") public String subscribe(@DefaultValue("0") @QueryParam("timeout") int timeout, @PathParam("topicName") String topicName, - @Context final HttpServletResponse httpResponse) { - int currentRespCode = responseCode; - httpResponse.setStatus(currentRespCode); - try { - httpResponse.flushBuffer(); - } catch (IOException e) { - logger.error("flushBuffer threw: ", e); - return "Got an error"; - } - - if (currentRespCode < 200 || currentRespCode >= 300) - { - return "You got response code: " + currentRespCode; - } + @Context final HttpServletResponse httpResponse) { + int currentRespCode = responseCode; + httpResponse.setStatus(currentRespCode); + try { + httpResponse.flushBuffer(); + } catch (IOException e) { + logger.error("flushBuffer threw: ", e); + return "Got an error"; + } + + if (currentRespCode < 200 || currentRespCode >= 300) + { + return "You got response code: " + currentRespCode; + } + if (queues.containsKey(topicName)) { - BlockingQueue<String> queue = queues.get(topicName); - String response = NO_DATA_MSG; - try { - response = queue.poll(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - logger.debug("error in DMaaP simulator", e); - Thread.currentThread().interrupt(); - } - if (response == null) { - response = NO_DATA_MSG; - } - return response; + return getNextMessageFromQueue(timeout, topicName); } else if (timeout > 0) { - try { - Thread.sleep(timeout); - if (queues.containsKey(topicName)) { - BlockingQueue<String> queue = queues.get(topicName); - String response = queue.poll(); - if (response == null) { - response = NO_DATA_MSG; - } - return response; + return waitForNextMessageFromQueue(timeout, topicName); + } + return "No topic"; + } + + private String getNextMessageFromQueue(final int timeout, final String topicName) { + BlockingQueue<String> queue = queues.get(topicName); + String response = NO_DATA_MSG; + try { + response = queue.poll(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + logger.debug("error in DMaaP simulator", e); + Thread.currentThread().interrupt(); + } + if (response == null) { + response = NO_DATA_MSG; + } + return response; + } + + private String waitForNextMessageFromQueue(int timeout, String topicName) { + try { + Thread.sleep(timeout); + if (queues.containsKey(topicName)) { + BlockingQueue<String> queue = queues.get(topicName); + String response = queue.poll(); + if (response == null) { + response = NO_DATA_MSG; } - } catch (InterruptedException e) { - logger.debug("error in DMaaP simulator", e); - Thread.currentThread().interrupt(); + return response; } + } catch (InterruptedException e) { + logger.debug("error in DMaaP simulator", e); + Thread.currentThread().interrupt(); } return "No topic"; } - + @POST @Path("/{topicName}") @Consumes(MediaType.TEXT_PLAIN) - public String publish(@PathParam("topicName") String topicName, String body) { - if (queues.containsKey(topicName)) { - BlockingQueue<String> queue = queues.get(topicName); - queue.offer(body); - } - else { - BlockingQueue<String> queue = new LinkedBlockingQueue<>(); - queue.offer(body); - queues.put(topicName, queue); - } + public String publish(@PathParam("topicName") String topicName, String body) { + BlockingQueue<String> queue = queues.computeIfAbsent(topicName, entry -> new LinkedBlockingQueue<>()); + if (!queue.offer(body)) { + logger.warn("error on topic {}, failed to place body {} on queue", topicName, body); + } + return ""; } - + @POST @Path("/setStatus") public String setStatus(@QueryParam("statusCode") int statusCode) { - responseCode = statusCode; - return "Status code set"; + setResponseCode(statusCode); + return "Status code set"; + } + + /** + * Static method to set static response code, synchronized for multiple possible uses + * @param incomingResponseCode + */ + private static synchronized void setResponseCode(final int incomingResponseCode) { + responseCode = incomingResponseCode; } } |