diff options
Diffstat (limited to 'feature-simulators/src/main')
-rw-r--r-- | feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java | 184 |
1 files changed, 99 insertions, 85 deletions
diff --git a/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java b/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java index 44700890..5e7861d2 100644 --- a/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java +++ b/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java @@ -22,11 +22,10 @@ package org.onap.policy.drools.simulators; import java.io.IOException; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.concurrent.BlockingQueue; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.Consumes; @@ -44,96 +43,111 @@ import org.slf4j.LoggerFactory; @Path("/events") public class DMaaPSimulatorJaxRs { - private static final String NO_DATA_MSG = "No Data"; - private static final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>(); - private static final Logger logger = LoggerFactory.getLogger(DMaaPSimulatorJaxRs.class); - private static int responseCode = 200; + private static final String NO_DATA_MSG = "No Data"; + private static final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>(); + private static final Logger logger = LoggerFactory.getLogger(DMaaPSimulatorJaxRs.class); + private static int responseCode = 200; + + /** + * Get consumer ID. + * + * @param timeout timeout value + * @param topicName the dmaap topic + * @param httpResponse http response object + * @return topic or error message + */ + @GET + @Path("/{topicName}/{consumeGroup}/{consumerId}") + public String subscribe(@DefaultValue("0") @QueryParam("timeout") int timeout, @PathParam("topicName") + String topicName, @Context final HttpServletResponse httpResponse) { + int currentRespCode = responseCode; + httpResponse.setStatus(currentRespCode); + try { + httpResponse.flushBuffer(); + } catch (IOException e) { + logger.error("flushBuffer threw: ", e); + return "Got an error"; + } - @GET - @Path("/{topicName}/{consumeGroup}/{consumerId}") - public String subscribe(@DefaultValue("0") @QueryParam("timeout") int timeout, @PathParam("topicName") String topicName, - @Context final HttpServletResponse httpResponse) { - int currentRespCode = responseCode; - httpResponse.setStatus(currentRespCode); - try { - httpResponse.flushBuffer(); - } catch (IOException e) { - logger.error("flushBuffer threw: ", e); - return "Got an error"; - } + if (currentRespCode < 200 || currentRespCode >= 300) { + return "You got response code: " + currentRespCode; + } - if (currentRespCode < 200 || currentRespCode >= 300) - { - return "You got response code: " + currentRespCode; - } + if (queues.containsKey(topicName)) { + return getNextMessageFromQueue(timeout, topicName); + } + else if (timeout > 0) { + return waitForNextMessageFromQueue(timeout, topicName); + } + return "No topic"; + } - if (queues.containsKey(topicName)) { - return getNextMessageFromQueue(timeout, topicName); - } - else if (timeout > 0) { - return waitForNextMessageFromQueue(timeout, topicName); - } - return "No topic"; - } + private String getNextMessageFromQueue(final int timeout, final String topicName) { + BlockingQueue<String> queue = queues.get(topicName); + String response = NO_DATA_MSG; + try { + response = queue.poll(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + logger.debug("error in DMaaP simulator", e); + Thread.currentThread().interrupt(); + } + if (response == null) { + response = NO_DATA_MSG; + } + return response; + } - private String getNextMessageFromQueue(final int timeout, final String topicName) { - BlockingQueue<String> queue = queues.get(topicName); - String response = NO_DATA_MSG; - try { - response = queue.poll(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - logger.debug("error in DMaaP simulator", e); - Thread.currentThread().interrupt(); - } - if (response == null) { - response = NO_DATA_MSG; - } - return response; - } + private String waitForNextMessageFromQueue(int timeout, String topicName) { + try { + Thread.sleep(timeout); + if (queues.containsKey(topicName)) { + BlockingQueue<String> queue = queues.get(topicName); + String response = queue.poll(); + if (response == null) { + response = NO_DATA_MSG; + } + return response; + } + } catch (InterruptedException e) { + logger.debug("error in DMaaP simulator", e); + Thread.currentThread().interrupt(); + } + return "No topic"; + } - private String waitForNextMessageFromQueue(int timeout, String topicName) { - try { - Thread.sleep(timeout); - if (queues.containsKey(topicName)) { - BlockingQueue<String> queue = queues.get(topicName); - String response = queue.poll(); - if (response == null) { - response = NO_DATA_MSG; - } - return response; - } - } catch (InterruptedException e) { - logger.debug("error in DMaaP simulator", e); - Thread.currentThread().interrupt(); - } - return "No topic"; - } + /** + * Post to a topic. + * + * @param topicName name of the topic + * @param body message + * @return empty string + */ + @POST + @Path("/{topicName}") + @Consumes(MediaType.TEXT_PLAIN) + public String publish(@PathParam("topicName") String topicName, String body) { + BlockingQueue<String> queue = queues.computeIfAbsent(topicName, entry -> new LinkedBlockingQueue<>()); - @POST - @Path("/{topicName}") - @Consumes(MediaType.TEXT_PLAIN) - public String publish(@PathParam("topicName") String topicName, String body) { - BlockingQueue<String> queue = queues.computeIfAbsent(topicName, entry -> new LinkedBlockingQueue<>()); - - if (!queue.offer(body)) { - logger.warn("error on topic {}, failed to place body {} on queue", topicName, body); - } + if (!queue.offer(body)) { + logger.warn("error on topic {}, failed to place body {} on queue", topicName, body); + } - return ""; - } + return ""; + } - @POST - @Path("/setStatus") - public String setStatus(@QueryParam("statusCode") int statusCode) { - setResponseCode(statusCode); - return "Status code set"; - } + @POST + @Path("/setStatus") + public String setStatus(@QueryParam("statusCode") int statusCode) { + setResponseCode(statusCode); + return "Status code set"; + } - /** - * Static method to set static response code, synchronized for multiple possible uses - * @param incomingResponseCode - */ - private static synchronized void setResponseCode(final int incomingResponseCode) { - responseCode = incomingResponseCode; - } + /** + * Static method to set static response code, synchronized for multiple possible uses. + * + * @param incomingResponseCode the response code to set + */ + private static synchronized void setResponseCode(final int incomingResponseCode) { + responseCode = incomingResponseCode; + } } |