diff options
-rw-r--r-- | datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java | 44 | ||||
-rw-r--r-- | datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java | 38 |
2 files changed, 52 insertions, 30 deletions
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java index abdfa718..bef8dab2 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java @@ -72,7 +72,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { private boolean failed; private long failduration; private long resumetime; - File dir; + private File dir; private Vector<DeliveryTask> todo = new Vector<>(); /** @@ -80,7 +80,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { * * @return The length of the task in bytes or 0 if the task cannot be cancelled. */ - public synchronized long cancelTask(String pubid) { + synchronized long cancelTask(String pubid) { if (working.get(pubid) != null) { return (0); } @@ -111,7 +111,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Mark that a delivery task has succeeded. */ - public synchronized void markSuccess(DeliveryTask task) { + private synchronized void markSuccess(DeliveryTask task) { working.remove(task.getPublishId()); task.clean(); failed = false; @@ -121,14 +121,14 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Mark that a delivery task has expired. */ - public synchronized void markExpired(DeliveryTask task) { + private synchronized void markExpired(DeliveryTask task) { task.clean(); } /** * Mark that a delivery task has failed permanently. */ - public synchronized void markFailNoRetry(DeliveryTask task) { + private synchronized void markFailNoRetry(DeliveryTask task) { working.remove(task.getPublishId()); task.clean(); failed = false; @@ -157,7 +157,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Mark that a delivery task has been redirected. */ - public synchronized void markRedirect(DeliveryTask task) { + private synchronized void markRedirect(DeliveryTask task) { working.remove(task.getPublishId()); retry.put(task.getPublishId(), task); } @@ -165,7 +165,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Mark that a delivery task has temporarily failed. */ - public synchronized void markFailWithRetry(DeliveryTask task) { + private synchronized void markFailWithRetry(DeliveryTask task) { working.remove(task.getPublishId()); retry.put(task.getPublishId(), task); fdupdate(); @@ -174,7 +174,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Get the next task. */ - public synchronized DeliveryTask getNext() { + synchronized DeliveryTask getNext() { DeliveryTask ret = peekNext(); if (ret != null) { todoindex++; @@ -186,7 +186,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Peek at the next task. */ - public synchronized DeliveryTask peekNext() { + synchronized DeliveryTask peekNext() { long now = System.currentTimeMillis(); long mindate = now - deliveryQueueHelper.getExpirationTimer(); if (failed) { @@ -199,7 +199,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { while (true) { if (todoindex >= todo.size()) { todoindex = 0; - todo = new Vector<DeliveryTask>(); + todo = new Vector<>(); String[] files = dir.list(); Arrays.sort(files); for (String fname : files) { @@ -228,7 +228,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { } todo.add(dt); } - retry = new Hashtable<String, DeliveryTask>(); + retry = new Hashtable<>(); } if (todoindex < todo.size()) { DeliveryTask dt = todo.get(todoindex); @@ -236,6 +236,11 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { todoindex++; continue; } + if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) { + retry.put(dt.getPublishId(), dt); + todoindex++; + continue; + } if (dt.getDate() >= mindate) { return (dt); } @@ -250,7 +255,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Create a delivery queue for a given destination info */ - public DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) { + DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) { this.deliveryQueueHelper = deliveryQueueHelper; this.destinationInfo = destinationInfo; dir = new File(destinationInfo.getSpool()); @@ -288,7 +293,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Message too old to deliver */ - public void reportExpiry(DeliveryTask task) { + void reportExpiry(DeliveryTask task) { StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts()); markExpired(task); } @@ -298,8 +303,9 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { */ public void reportStatus(DeliveryTask task, int status, String xpubid, String location) { if (status < 300) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid); - if (destinationInfo.isPrivilegedSubscriber()) { + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid); + if (destinationInfo.isPrivilegedSubscriber()) { + task.setResumeTime(System.currentTimeMillis() + deliveryQueueHelper.getWaitForFileProcessFailureTimer()); markFailWithRetry(task); } else { markSuccess(task); @@ -367,21 +373,21 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Is there no work to do for this queue right now? */ - public synchronized boolean isSkipSet() { + synchronized boolean isSkipSet() { return (peekNext() == null); } /** * Reset the retry timer */ - public void resetQueue() { + void resetQueue() { resumetime = System.currentTimeMillis(); } /** * Get task if in queue and mark as success */ - public boolean markTaskSuccess(String pubId) { + boolean markTaskSuccess(String pubId) { DeliveryTask task = working.get(pubId); if (task != null) { markSuccess(task); @@ -391,7 +397,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { if (task != null) { retry.remove(pubId); task.clean(); - resumetime = 0; + resetQueue(); failduration = 0; return true; } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java index a3af88fc..c085ebe7 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java @@ -66,6 +66,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { private int attempts; private String[][] hdrs; private String newInvocationId; + private long resumeTime; /** @@ -76,7 +77,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { * the base for the file name in the spool directory and is of * the form <milliseconds since 1970>.<fqdn of initial data router node> */ - public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) { + DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) { this.deliveryTaskHelper = deliveryTaskHelper; this.pubid = pubid; destInfo = deliveryTaskHelper.getDestinationInfo(); @@ -89,6 +90,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { metafile = new File(mfn); boolean monly = destInfo.isMetaDataOnly(); date = Long.parseLong(pubid.substring(0, pubid.indexOf('.'))); + resumeTime = System.currentTimeMillis(); Vector<String[]> hdrv = new Vector<>(); try (BufferedReader br = new BufferedReader(new FileReader(metafile))) { @@ -165,7 +167,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { /** * Get the publish ID */ - public String getPublishId() { + String getPublishId() { return (pubid); } @@ -332,7 +334,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { /** * Remove meta and data files */ - public void clean() { + void clean() { datafile.delete(); metafile.delete(); eelflogger.info(EelfMsgs.INVOKE, newInvocationId); @@ -341,9 +343,23 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { } /** + * Set the resume time for a delivery task. + */ + void setResumeTime(long resumeTime) { + this.resumeTime = resumeTime; + } + + /** + * Get the resume time for a delivery task. + */ + long getResumeTime() { + return resumeTime; + } + + /** * Has this delivery task been cleaned? */ - public boolean isCleaned() { + boolean isCleaned() { return (hdrs == null); } @@ -357,7 +373,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { /** * Get creation date as encoded in the publish ID. */ - public long getDate() { + long getDate() { return (date); } @@ -371,42 +387,42 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { /** * Get the content type */ - public String getCType() { + String getCType() { return (ctype); } /** * Get the method */ - public String getMethod() { + String getMethod() { return (method); } /** * Get the file ID */ - public String getFileId() { + String getFileId() { return (fileid); } /** * Get the number of delivery attempts */ - public int getAttempts() { + int getAttempts() { return (attempts); } /** * Get the (space delimited list of) subscription ID for this delivery task */ - public String getSubId() { + String getSubId() { return (subid); } /** * Get the feed ID for this delivery task */ - public String getFeedId() { + String getFeedId() { return (feedid); } } |