diff options
Diffstat (limited to 'feature-simulators/src/main/java/org')
-rw-r--r-- | feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java | 44 |
1 files changed, 32 insertions, 12 deletions
diff --git a/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java b/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java index 5e7861d2..8b653aae 100644 --- a/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java +++ b/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java @@ -1,8 +1,8 @@ -/* +/*- * ============LICENSE_START======================================================= * feature-simulators * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,14 +43,16 @@ import org.slf4j.LoggerFactory; @Path("/events") public class DMaaPSimulatorJaxRs { - private static final String NO_DATA_MSG = "No Data"; + public static final String NO_TOPIC_MSG = "No topic"; + public static final String NO_DATA_MSG = "No Data"; + private static final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>(); private static final Logger logger = LoggerFactory.getLogger(DMaaPSimulatorJaxRs.class); private static int responseCode = 200; /** * Get consumer ID. - * + * * @param timeout timeout value * @param topicName the dmaap topic * @param httpResponse http response object @@ -58,7 +60,7 @@ public class DMaaPSimulatorJaxRs { */ @GET @Path("/{topicName}/{consumeGroup}/{consumerId}") - public String subscribe(@DefaultValue("0") @QueryParam("timeout") int timeout, @PathParam("topicName") + public String subscribe(@DefaultValue("0") @QueryParam("timeout") int timeout, @PathParam("topicName") String topicName, @Context final HttpServletResponse httpResponse) { int currentRespCode = responseCode; httpResponse.setStatus(currentRespCode); @@ -79,14 +81,14 @@ public class DMaaPSimulatorJaxRs { else if (timeout > 0) { return waitForNextMessageFromQueue(timeout, topicName); } - return "No topic"; + return NO_TOPIC_MSG; } private String getNextMessageFromQueue(final int timeout, final String topicName) { BlockingQueue<String> queue = queues.get(topicName); String response = NO_DATA_MSG; try { - response = queue.poll(timeout, TimeUnit.MILLISECONDS); + response = poll(queue, timeout); } catch (InterruptedException e) { logger.debug("error in DMaaP simulator", e); Thread.currentThread().interrupt(); @@ -99,7 +101,7 @@ public class DMaaPSimulatorJaxRs { private String waitForNextMessageFromQueue(int timeout, String topicName) { try { - Thread.sleep(timeout); + sleep(timeout); if (queues.containsKey(topicName)) { BlockingQueue<String> queue = queues.get(topicName); String response = queue.poll(); @@ -112,12 +114,12 @@ public class DMaaPSimulatorJaxRs { logger.debug("error in DMaaP simulator", e); Thread.currentThread().interrupt(); } - return "No topic"; + return NO_TOPIC_MSG; } /** * Post to a topic. - * + * * @param topicName name of the topic * @param body message * @return empty string @@ -142,12 +144,30 @@ public class DMaaPSimulatorJaxRs { return "Status code set"; } + // the following non-static methods may be overridden by junit tests + + protected String poll(BlockingQueue<String> queue, final int timeout) throws InterruptedException { + return queue.poll(timeout, TimeUnit.MILLISECONDS); + } + + protected void sleep(int timeout) throws InterruptedException { + Thread.sleep(timeout); + } + /** * Static method to set static response code, synchronized for multiple possible uses. - * + * * @param incomingResponseCode the response code to set */ private static synchronized void setResponseCode(final int incomingResponseCode) { - responseCode = incomingResponseCode; + responseCode = incomingResponseCode; + } + + /** + * Used only by junit tests to reset the simulator. + */ + protected static void reset() { + responseCode = 200; + queues.clear(); } } |