diff options
Diffstat (limited to 'models-sim/models-sim-dmaap/src')
-rw-r--r-- | models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java | 35 |
1 files changed, 21 insertions, 14 deletions
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java index 42a653d6f..9de29cdac 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java @@ -24,6 +24,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import javax.ws.rs.core.Response; @@ -42,6 +43,9 @@ import org.slf4j.LoggerFactory; public class DmaapSimProvider { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimProvider.class); + // Recurring string constants + private static final String TOPIC_TAG = "Topic:"; + // Time for a get to wait before checking of a message has come private static final long DMAAP_SIM_WAIT_TIME = 50; @@ -63,20 +67,20 @@ public class DmaapSimProvider { * @return a response to the message */ public Response processDmaapMessagePut(final String topicName, final Object dmaapMessage) { - LOGGER.debug("Topic:" + topicName + ", Received DMaaP message: " + dmaapMessage); + LOGGER.debug(TOPIC_TAG + topicName + ", Received DMaaP message: " + dmaapMessage); synchronized (topicMessageMap) { SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName); if (messageMap == null) { messageMap = new TreeMap<>(); topicMessageMap.put(topicName, messageMap); - LOGGER.debug("Topic:" + topicName + ", created topic message map"); + LOGGER.debug(TOPIC_TAG + topicName + ", created topic message map"); } int nextKey = (messageMap.isEmpty() ? 0 : messageMap.lastKey() + 1); messageMap.put(nextKey, dmaapMessage); - LOGGER.debug("Topic:" + topicName + ", cached DMaaP message " + nextKey + ": " + dmaapMessage); + LOGGER.debug(TOPIC_TAG + topicName + ", cached DMaaP message " + nextKey + ": " + dmaapMessage); } return Response.status(Response.Status.OK).entity("{\n \"serverTimeMs\": 0,\n \"count\": 1\n}").build(); @@ -94,7 +98,7 @@ public class DmaapSimProvider { public Response processDmaapMessageGet(final String topicName, final String consumerGroup, final String consumerId, final int timeout) { - LOGGER.debug("Topic:" + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId + LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId + WITH_TIMEOUT + timeout); MutablePair<Integer, String> consumerGroupPair = null; @@ -104,41 +108,42 @@ public class DmaapSimProvider { if (consumerGroupMap == null) { consumerGroupMap = new LinkedHashMap<>(); consumerGroupsMap.put(topicName, consumerGroupMap); - LOGGER.trace("Topic:" + topicName + ", Created consumer map entry for consumer group " + consumerGroup); + LOGGER.trace( + TOPIC_TAG + topicName + ", Created consumer map entry for consumer group " + consumerGroup); } consumerGroupPair = consumerGroupMap.get(consumerGroup); if (consumerGroupPair == null) { consumerGroupPair = new MutablePair<>(-1, consumerId); consumerGroupMap.put(consumerGroup, consumerGroupPair); - LOGGER.trace("Topic:" + topicName + ", Created consumer group entry for consumer group " + consumerGroup - + ":" + consumerId); + LOGGER.trace(TOPIC_TAG + topicName + ", Created consumer group entry for consumer group " + + consumerGroup + ":" + consumerId); } } long timeOfTimeout = System.currentTimeMillis() + timeout; do { - Object waitingMessages = getWaitingMessages(topicName, consumerGroupPair); if (waitingMessages != null) { - LOGGER.debug("Topic:" + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId + LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId + WITH_TIMEOUT + timeout + ", returning messages " + waitingMessages); return Response.status(Response.Status.OK).entity(waitingMessages).build(); } try { - Thread.sleep(DMAAP_SIM_WAIT_TIME); + TimeUnit.MILLISECONDS.sleep(DMAAP_SIM_WAIT_TIME); } catch (InterruptedException ie) { String errorMessage = "Interrupt on wait on simulation of DMaaP topic " + topicName + " for request ID " + consumerGroup + ":" + consumerId + WITH_TIMEOUT + timeout; LOGGER.warn(errorMessage, ie); + Thread.currentThread().interrupt(); throw new DmaapSimRuntimeException(errorMessage, ie); } } while (timeOfTimeout > System.currentTimeMillis()); - LOGGER.trace("Topic:" + topicName + ", timed out waiting for messages : " + consumerGroup + ":" + consumerId + LOGGER.trace(TOPIC_TAG + topicName + ", timed out waiting for messages : " + consumerGroup + ":" + consumerId + WITH_TIMEOUT + timeout); return Response.status(Response.Status.REQUEST_TIMEOUT).build(); } @@ -168,13 +173,15 @@ public class DmaapSimProvider { } try { foundMessageList += new StandardCoder().encode(dmaapMessage); - } catch (CoderException e) { - e.printStackTrace(); + } catch (CoderException ce) { + String errorMessage = "Encoding error on message on DMaaP topic " + topicName; + LOGGER.warn(errorMessage, ce); + return null; } } foundMessageList += ']'; - LOGGER.debug("Topic:" + topicName + ", returning DMaaP messages from " + consumerGroupPair.getLeft() + LOGGER.debug(TOPIC_TAG + topicName + ", returning DMaaP messages from " + consumerGroupPair.getLeft() + " to " + messageMap.lastKey()); synchronized (consumerGroupsMap) { consumerGroupPair.setLeft(messageMap.lastKey()); |