summaryrefslogtreecommitdiffstats
path: root/models-sim/models-sim-dmaap/src
diff options
context:
space:
mode:
Diffstat (limited to 'models-sim/models-sim-dmaap/src')
-rw-r--r--models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java35
1 files changed, 21 insertions, 14 deletions
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java
index 42a653d6f..9de29cdac 100644
--- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java
+++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java
@@ -24,6 +24,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.Response;
@@ -42,6 +43,9 @@ import org.slf4j.LoggerFactory;
public class DmaapSimProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimProvider.class);
+ // Recurring string constants
+ private static final String TOPIC_TAG = "Topic:";
+
// Time for a get to wait before checking of a message has come
private static final long DMAAP_SIM_WAIT_TIME = 50;
@@ -63,20 +67,20 @@ public class DmaapSimProvider {
* @return a response to the message
*/
public Response processDmaapMessagePut(final String topicName, final Object dmaapMessage) {
- LOGGER.debug("Topic:" + topicName + ", Received DMaaP message: " + dmaapMessage);
+ LOGGER.debug(TOPIC_TAG + topicName + ", Received DMaaP message: " + dmaapMessage);
synchronized (topicMessageMap) {
SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
if (messageMap == null) {
messageMap = new TreeMap<>();
topicMessageMap.put(topicName, messageMap);
- LOGGER.debug("Topic:" + topicName + ", created topic message map");
+ LOGGER.debug(TOPIC_TAG + topicName + ", created topic message map");
}
int nextKey = (messageMap.isEmpty() ? 0 : messageMap.lastKey() + 1);
messageMap.put(nextKey, dmaapMessage);
- LOGGER.debug("Topic:" + topicName + ", cached DMaaP message " + nextKey + ": " + dmaapMessage);
+ LOGGER.debug(TOPIC_TAG + topicName + ", cached DMaaP message " + nextKey + ": " + dmaapMessage);
}
return Response.status(Response.Status.OK).entity("{\n \"serverTimeMs\": 0,\n \"count\": 1\n}").build();
@@ -94,7 +98,7 @@ public class DmaapSimProvider {
public Response processDmaapMessageGet(final String topicName, final String consumerGroup, final String consumerId,
final int timeout) {
- LOGGER.debug("Topic:" + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
+ LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
+ WITH_TIMEOUT + timeout);
MutablePair<Integer, String> consumerGroupPair = null;
@@ -104,41 +108,42 @@ public class DmaapSimProvider {
if (consumerGroupMap == null) {
consumerGroupMap = new LinkedHashMap<>();
consumerGroupsMap.put(topicName, consumerGroupMap);
- LOGGER.trace("Topic:" + topicName + ", Created consumer map entry for consumer group " + consumerGroup);
+ LOGGER.trace(
+ TOPIC_TAG + topicName + ", Created consumer map entry for consumer group " + consumerGroup);
}
consumerGroupPair = consumerGroupMap.get(consumerGroup);
if (consumerGroupPair == null) {
consumerGroupPair = new MutablePair<>(-1, consumerId);
consumerGroupMap.put(consumerGroup, consumerGroupPair);
- LOGGER.trace("Topic:" + topicName + ", Created consumer group entry for consumer group " + consumerGroup
- + ":" + consumerId);
+ LOGGER.trace(TOPIC_TAG + topicName + ", Created consumer group entry for consumer group "
+ + consumerGroup + ":" + consumerId);
}
}
long timeOfTimeout = System.currentTimeMillis() + timeout;
do {
-
Object waitingMessages = getWaitingMessages(topicName, consumerGroupPair);
if (waitingMessages != null) {
- LOGGER.debug("Topic:" + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
+ LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
+ WITH_TIMEOUT + timeout + ", returning messages " + waitingMessages);
return Response.status(Response.Status.OK).entity(waitingMessages).build();
}
try {
- Thread.sleep(DMAAP_SIM_WAIT_TIME);
+ TimeUnit.MILLISECONDS.sleep(DMAAP_SIM_WAIT_TIME);
} catch (InterruptedException ie) {
String errorMessage = "Interrupt on wait on simulation of DMaaP topic " + topicName + " for request ID "
+ consumerGroup + ":" + consumerId + WITH_TIMEOUT + timeout;
LOGGER.warn(errorMessage, ie);
+ Thread.currentThread().interrupt();
throw new DmaapSimRuntimeException(errorMessage, ie);
}
}
while (timeOfTimeout > System.currentTimeMillis());
- LOGGER.trace("Topic:" + topicName + ", timed out waiting for messages : " + consumerGroup + ":" + consumerId
+ LOGGER.trace(TOPIC_TAG + topicName + ", timed out waiting for messages : " + consumerGroup + ":" + consumerId
+ WITH_TIMEOUT + timeout);
return Response.status(Response.Status.REQUEST_TIMEOUT).build();
}
@@ -168,13 +173,15 @@ public class DmaapSimProvider {
}
try {
foundMessageList += new StandardCoder().encode(dmaapMessage);
- } catch (CoderException e) {
- e.printStackTrace();
+ } catch (CoderException ce) {
+ String errorMessage = "Encoding error on message on DMaaP topic " + topicName;
+ LOGGER.warn(errorMessage, ce);
+ return null;
}
}
foundMessageList += ']';
- LOGGER.debug("Topic:" + topicName + ", returning DMaaP messages from " + consumerGroupPair.getLeft()
+ LOGGER.debug(TOPIC_TAG + topicName + ", returning DMaaP messages from " + consumerGroupPair.getLeft()
+ " to " + messageMap.lastKey());
synchronized (consumerGroupsMap) {
consumerGroupPair.setLeft(messageMap.lastKey());