diff options
author | econwar <conor.ward@est.tech> | 2019-02-14 09:37:44 +0000 |
---|---|---|
committer | econwar <conor.ward@est.tech> | 2019-02-14 09:37:44 +0000 |
commit | c50374709585766e887f349a139de0a6595c1ca1 (patch) | |
tree | fe428b922da00abfabaafffdc72596c582625906 /datarouter-node/src/main | |
parent | 953dbd55595d28ecc7b65413b0edf910da6f45c0 (diff) |
Add optional API for PM Mapper
Added new field to Subscriber class to keep files after published
Added new Delete endpoint so that file can then be deleted
Change-Id: Id72da67689a7ceda8ddd4997cd6349b981cb1cdb
Issue-ID: DMAAP-981
Signed-off-by: econwar <conor.ward@est.tech>
Diffstat (limited to 'datarouter-node/src/main')
12 files changed, 356 insertions, 163 deletions
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java index ae4f13bf..d2600d23 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java @@ -267,4 +267,18 @@ public class Delivery { } } } + + /** + * Mark the task in spool a success + */ + public synchronized boolean markTaskSuccess(String spool, String pubId) { + boolean succeeded = false; + if (spool != null) { + DeliveryQueue dq = dqs.get(spool); + if (dq != null) { + succeeded = dq.markTaskSuccess(pubId); + } + } + return succeeded; + } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java index ad746255..abdfa718 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java @@ -64,16 +64,16 @@ import java.util.*; * failure timer is active or if no files are found in a directory scan. */ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { - private DeliveryQueueHelper dqh; - private DestInfo di; - private Hashtable<String, DeliveryTask> working = new Hashtable<String, DeliveryTask>(); - private Hashtable<String, DeliveryTask> retry = new Hashtable<String, DeliveryTask>(); + private DeliveryQueueHelper deliveryQueueHelper; + private DestInfo destinationInfo; + private Hashtable<String, DeliveryTask> working = new Hashtable<>(); + private Hashtable<String, DeliveryTask> retry = new Hashtable<>(); private int todoindex; private boolean failed; private long failduration; private long resumetime; File dir; - private Vector<DeliveryTask> todo = new Vector<DeliveryTask>(); + private Vector<DeliveryTask> todo = new Vector<>(); /** * Try to cancel a delivery task. @@ -139,11 +139,15 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { if (!failed) { failed = true; if (failduration == 0) { - failduration = dqh.getInitFailureTimer(); + if (destinationInfo.isPrivilegedSubscriber()) { + failduration = deliveryQueueHelper.getWaitForFileProcessFailureTimer(); + } else{ + failduration = deliveryQueueHelper.getInitFailureTimer(); + } } resumetime = System.currentTimeMillis() + failduration; - long maxdur = dqh.getMaxFailureTimer(); - failduration = (long) (failduration * dqh.getFailureBackoff()); + long maxdur = deliveryQueueHelper.getMaxFailureTimer(); + failduration = (long) (failduration * deliveryQueueHelper.getFailureBackoff()); if (failduration > maxdur) { failduration = maxdur; } @@ -184,7 +188,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { */ public synchronized DeliveryTask peekNext() { long now = System.currentTimeMillis(); - long mindate = now - dqh.getExpirationTimer(); + long mindate = now - deliveryQueueHelper.getExpirationTimer(); if (failed) { if (now > resumetime) { failed = false; @@ -246,32 +250,32 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Create a delivery queue for a given destination info */ - public DeliveryQueue(DeliveryQueueHelper dqh, DestInfo di) { - this.dqh = dqh; - this.di = di; - dir = new File(di.getSpool()); + public DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) { + this.deliveryQueueHelper = deliveryQueueHelper; + this.destinationInfo = destinationInfo; + dir = new File(destinationInfo.getSpool()); dir.mkdirs(); } /** * Update the destination info for this delivery queue */ - public void config(DestInfo di) { - this.di = di; + public void config(DestInfo destinationInfo) { + this.destinationInfo = destinationInfo; } /** * Get the dest info */ - public DestInfo getDestInfo() { - return (di); + public DestInfo getDestinationInfo() { + return (destinationInfo); } /** * Get the config manager */ public DeliveryQueueHelper getConfig() { - return (dqh); + return (deliveryQueueHelper); } /** @@ -294,22 +298,26 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { */ public void reportStatus(DeliveryTask task, int status, String xpubid, String location) { if (status < 300) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, xpubid); - markSuccess(task); - } else if (status < 400 && dqh.isFollowRedirects()) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location); - if (dqh.handleRedirection(di, location, task.getFileId())) { + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid); + if (destinationInfo.isPrivilegedSubscriber()) { + markFailWithRetry(task); + } else { + markSuccess(task); + } + } else if (status < 400 && deliveryQueueHelper.isFollowRedirects()) { + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location); + if (deliveryQueueHelper.handleRedirection(destinationInfo, location, task.getFileId())) { markRedirect(task); } else { StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts()); markFailNoRetry(task); } - } else if (status < 500) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location); + } else if (status < 500 && status != 429) { // Status 429 is the standard response for Too Many Requests and indicates that a file needs to be delivered again at a later time. + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location); StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts()); markFailNoRetry(task); } else { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location); + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location); markFailWithRetry(task); } } @@ -318,8 +326,8 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { * Delivery failed by reason of an exception */ public void reportException(DeliveryTask task, Exception exception) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), -1, exception.toString()); - dqh.handleUnreachable(di); + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString()); + deliveryQueueHelper.handleUnreachable(destinationInfo); markFailWithRetry(task); } @@ -330,14 +338,14 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { * @return The feed ID */ public String getFeedId(String subid) { - return (dqh.getFeedId(subid)); + return (deliveryQueueHelper.getFeedId(subid)); } /** * Get the URL to deliver a message to given the file ID */ public String getDestURL(String fileid) { - return (dqh.getDestURL(di, fileid)); + return (deliveryQueueHelper.getDestURL(destinationInfo, fileid)); } /** @@ -346,8 +354,8 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { */ public void run() { DeliveryTask t; - long endtime = System.currentTimeMillis() + dqh.getFairTimeLimit(); - int filestogo = dqh.getFairFileLimit(); + long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit(); + int filestogo = deliveryQueueHelper.getFairFileLimit(); while ((t = getNext()) != null) { t.run(); if (--filestogo <= 0 || System.currentTimeMillis() > endtime) { @@ -369,4 +377,24 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { public void resetQueue() { resumetime = System.currentTimeMillis(); } + + /** + * Get task if in queue and mark as success + */ + public boolean markTaskSuccess(String pubId) { + DeliveryTask task = working.get(pubId); + if (task != null) { + markSuccess(task); + return true; + } + task = retry.get(pubId); + if (task != null) { + retry.remove(pubId); + task.clean(); + resumetime = 0; + failduration = 0; + return true; + } + return false; + } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java index b1734cd4..5cf5fa4c 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java @@ -34,64 +34,69 @@ public interface DeliveryQueueHelper { /** * Get the timeout (milliseconds) before retrying after an initial delivery failure */ - public long getInitFailureTimer(); + long getInitFailureTimer(); + + /** + * Get the timeout before retrying after delivery and wait for file processing + */ + long getWaitForFileProcessFailureTimer(); /** * Get the ratio between timeouts on consecutive delivery attempts */ - public double getFailureBackoff(); + double getFailureBackoff(); /** * Get the maximum timeout (milliseconds) between delivery attempts */ - public long getMaxFailureTimer(); + long getMaxFailureTimer(); /** * Get the expiration timer (milliseconds) for deliveries */ - public long getExpirationTimer(); + long getExpirationTimer(); /** * Get the maximum number of file delivery attempts before checking * if another queue has work to be performed. */ - public int getFairFileLimit(); + int getFairFileLimit(); /** * Get the maximum amount of time spent delivering files before checking if another queue has work to be performed. */ - public long getFairTimeLimit(); + long getFairTimeLimit(); /** * Get the URL for delivering a file * - * @param dest The destination information for the file to be delivered. + * @param destinationInfo The destination information for the file to be delivered. * @param fileid The file id for the file to be delivered. - * @return The URL for delivering the file (typically, dest.getURL() + "/" + fileid). + * @return The URL for delivering the file (typically, destinationInfo.getURL() + "/" + fileid). */ - public String getDestURL(DestInfo dest, String fileid); + String getDestURL(DestInfo destinationInfo, String fileid); /** * Forget redirections associated with a subscriber * - * @param dest Destination information to forget + * @param destinationInfo Destination information to forget */ - public void handleUnreachable(DestInfo dest); + void handleUnreachable(DestInfo destinationInfo); /** * Post redirection for a subscriber * - * @param dest Destination information to update + * @param destinationInfo Destination information to update * @param location Location given by subscriber * @param fileid File ID of request * @return true if this 3xx response is retryable, otherwise, false. */ - public boolean handleRedirection(DestInfo dest, String location, String fileid); + boolean handleRedirection(DestInfo destinationInfo, String location, String fileid); /** * Should I handle 3xx responses differently than 4xx responses? */ - public boolean isFollowRedirects(); + boolean isFollowRedirects(); /** * Get the feed ID for a subscription @@ -99,5 +104,5 @@ public interface DeliveryQueueHelper { * @param subid The subscription ID * @return The feed ID */ - public String getFeedId(String subid); + String getFeedId(String subid); } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java index 80729905..b2c31691 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java @@ -47,9 +47,9 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask"); private static EELFLogger eelflogger = EELFManager.getInstance() .getLogger(DeliveryTask.class); - private DeliveryTaskHelper dth; + private DeliveryTaskHelper deliveryTaskHelper; private String pubid; - private DestInfo di; + private DestInfo destInfo; private String spool; private File datafile; private File metafile; @@ -69,25 +69,25 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { /** * Create a delivery task for a given delivery queue and pub ID * - * @param dth The delivery task helper for the queue this task is in. + * @param deliveryTaskHelper The delivery task helper for the queue this task is in. * @param pubid The publish ID for this file. This is used as * the base for the file name in the spool directory and is of * the form <milliseconds since 1970>.<fqdn of initial data router node> */ - public DeliveryTask(DeliveryTaskHelper dth, String pubid) { - this.dth = dth; + public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) { + this.deliveryTaskHelper = deliveryTaskHelper; this.pubid = pubid; - di = dth.getDestInfo(); - subid = di.getSubId(); - feedid = di.getLogData(); - spool = di.getSpool(); + destInfo = deliveryTaskHelper.getDestinationInfo(); + subid = destInfo.getSubId(); + feedid = destInfo.getLogData(); + spool = destInfo.getSpool(); String dfn = spool + "/" + pubid; String mfn = dfn + ".M"; datafile = new File(spool + "/" + pubid); metafile = new File(mfn); - boolean monly = di.isMetaDataOnly(); + boolean monly = destInfo.isMetaDataOnly(); date = Long.parseLong(pubid.substring(0, pubid.indexOf('.'))); - Vector<String[]> hdrv = new Vector<String[]>(); + Vector<String[]> hdrv = new Vector<>(); try (BufferedReader br = new BufferedReader(new FileReader(metafile))) { String s = br.readLine(); @@ -104,7 +104,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { String v = s.substring(i + 1); if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) { subid = v.replaceAll("[^ ]*/", ""); - feedid = dth.getFeedId(subid.replaceAll(" .*", "")); + feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", "")); } if (length == 0 && h.toLowerCase().startsWith("content-")) { continue; @@ -126,7 +126,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { loggerDeliveryTask.error("Exception "+e.getStackTrace(),e); } hdrs = hdrv.toArray(new String[hdrv.size()][]); - url = dth.getDestURL(fileid); + url = deliveryTaskHelper.getDestURL(fileid); } /** * Is the object a DeliveryTask with the same publication ID? @@ -171,14 +171,14 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { public void run() { attempts++; try { - di = dth.getDestInfo(); - boolean expect100 = di.isUsing100(); - boolean monly = di.isMetaDataOnly(); + destInfo = deliveryTaskHelper.getDestinationInfo(); + boolean expect100 = destInfo.isUsing100(); + boolean monly = destInfo.isMetaDataOnly(); length = 0; if (!"DELETE".equals(method) && !monly) { length = datafile.length(); } - url = dth.getDestURL(fileid); + url = deliveryTaskHelper.getDestURL(fileid); URL u = new URL(url); HttpURLConnection uc = (HttpURLConnection) u.openConnection(); uc.setConnectTimeout(60000); @@ -186,7 +186,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { uc.setInstanceFollowRedirects(false); uc.setRequestMethod(method); uc.setRequestProperty("Content-Length", Long.toString(length)); - uc.setRequestProperty("Authorization", di.getAuth()); + uc.setRequestProperty("Authorization", destInfo.getAuth()); uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid); for (String[] nv : hdrs) { uc.addRequestProperty(nv[0], nv[1]); @@ -201,7 +201,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { try { os = uc.getOutputStream(); } catch (ProtocolException pe) { - dth.reportDeliveryExtra(this, -1L); + deliveryTaskHelper.reportDeliveryExtra(this, -1L); // Rcvd error instead of 100-continue loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe); } @@ -223,7 +223,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { } os.close(); } catch (IOException ioe) { - dth.reportDeliveryExtra(this, sofar); + deliveryTaskHelper.reportDeliveryExtra(this, sofar); throw ioe; } } @@ -257,10 +257,10 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { } is.close(); } - dth.reportStatus(this, rc, xpubid, rmsg); + deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg); } catch (Exception e) { loggerDeliveryTask.error("Exception "+e.getStackTrace(),e); - dth.reportException(this, e); + deliveryTaskHelper.reportException(this, e); } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java index 932b792a..d4ac8bd6 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java @@ -38,7 +38,7 @@ public interface DeliveryTaskHelper { * @param task The task that failed * @param exception The exception that occurred */ - public void reportException(DeliveryTask task, Exception exception); + void reportException(DeliveryTask task, Exception exception); /** * Report that a delivery attempt completed (successfully or unsuccessfully) @@ -48,7 +48,7 @@ public interface DeliveryTaskHelper { * @param xpubid The publish ID from the far end (if any) * @param location The redirection location for a 3XX response */ - public void reportStatus(DeliveryTask task, int status, String xpubid, String location); + void reportStatus(DeliveryTask task, int status, String xpubid, String location); /** * Report that a delivery attempt either failed while sending data or that an error was returned instead of a 100 Continue. @@ -56,14 +56,14 @@ public interface DeliveryTaskHelper { * @param task The task that failed * @param sent The number of bytes sent or -1 if an error was returned instead of 100 Continue. */ - public void reportDeliveryExtra(DeliveryTask task, long sent); + void reportDeliveryExtra(DeliveryTask task, long sent); /** * Get the destination information for the delivery queue * * @return The destination information */ - public DestInfo getDestInfo(); + DestInfo getDestinationInfo(); /** * Given a file ID, get the URL to deliver to @@ -71,7 +71,7 @@ public interface DeliveryTaskHelper { * @param fileid The file id * @return The URL to deliver to */ - public String getDestURL(String fileid); + String getDestURL(String fileid); /** * Get the feed ID for a subscription @@ -79,5 +79,5 @@ public interface DeliveryTaskHelper { * @param subid The subscription ID * @return The feed iD */ - public String getFeedId(String subid); + String getFeedId(String subid); } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java index 12253314..c3e0057c 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java @@ -37,6 +37,7 @@ public class DestInfo { private String authentication; private boolean metaonly; private boolean use100; + private boolean privilegedSubscriber; /** * Create a destination information object. @@ -50,8 +51,9 @@ public class DestInfo { * @param authentication The credentials. * @param metaonly Is this a metadata only delivery? * @param use100 Should I use expect 100-continue? + * @param privilegedSubscriber Can we wait to receive a file processed acknowledgement before deleting file */ - public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100) { + public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100, boolean privilegedSubscriber) { this.name = name; this.spool = spool; this.subid = subid; @@ -61,6 +63,27 @@ public class DestInfo { this.authentication = authentication; this.metaonly = metaonly; this.use100 = use100; + this.privilegedSubscriber = privilegedSubscriber; + } + + /** + * Create a destination information object. + * + * @param name n:fqdn or s:subid + * @param spool The directory where files are spooled. + * @param subscription The subscription. + */ + public DestInfo(String name, String spool, NodeConfig.ProvSubscription subscription) { + this.name = name; + this.spool = spool; + this.subid = subscription.getSubId(); + this.logdata = subscription.getFeedId(); + this.url = subscription.getURL(); + this.authuser = subscription.getAuthUser(); + this.authentication = subscription.getCredentials(); + this.metaonly = subscription.isMetaDataOnly(); + this.use100 = subscription.isUsing100(); + this.privilegedSubscriber = subscription.isPrivilegedSubscriber(); } public boolean equals(Object o) { @@ -150,4 +173,11 @@ public class DestInfo { public boolean isUsing100() { return (use100); } + + /** + * Should we wait to receive a file processed acknowledgement before deleting file + */ + public boolean isPrivilegedSubscriber() { + return (privilegedSubscriber); + } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java index 1d5f76f6..ff803afc 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java @@ -57,6 +57,10 @@ public class LogManager extends TimerTask { return (10000L); } + public long getWaitForFileProcessFailureTimer() { + return (600000L); + } + public double getFailureBackoff() { return (2.0); } @@ -77,14 +81,14 @@ public class LogManager extends TimerTask { return (86400000); } - public String getDestURL(DestInfo dest, String fileid) { + public String getDestURL(DestInfo destinationInfo, String fileid) { return (config.getEventLogUrl()); } - public void handleUnreachable(DestInfo dest) { + public void handleUnreachable(DestInfo destinationInfo) { } - public boolean handleRedirection(DestInfo dest, String location, String fileid) { + public boolean handleRedirection(DestInfo destinationInfo, String location, String fileid) { return (false); } @@ -101,7 +105,7 @@ public class LogManager extends TimerTask { public Uploader() { dq = new DeliveryQueue(this, new DestInfo("LogUpload", uploaddir, null, null, null, config.getMyName(), config.getMyAuth(), false, - false)); + false, false)); setDaemon(true); setName("Log Uploader"); start(); diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java index c40d29c3..d3d3d01b 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java @@ -231,6 +231,7 @@ public class NodeConfig { private String credentials; private boolean metaonly; private boolean use100; + private boolean privilegedSubscriber; /** * Construct a subscription configuration entry @@ -243,9 +244,10 @@ public class NodeConfig { * Authorization header. * @param metaonly Is this a meta data only subscription? * @param use100 Should we send Expect: 100-continue? + * @param privilegedSubscriber Can we wait to receive a delete file call before deleting file */ public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials, - boolean metaonly, boolean use100) { + boolean metaonly, boolean use100, boolean privilegedSubscriber) { this.subid = subid; this.feedid = feedid; this.url = url; @@ -253,6 +255,7 @@ public class NodeConfig { this.credentials = credentials; this.metaonly = metaonly; this.use100 = use100; + this.privilegedSubscriber = privilegedSubscriber; } /** @@ -303,6 +306,13 @@ public class NodeConfig { public boolean isUsing100() { return (use100); } + + /** + * Can we wait to receive a delete file call before deleting file + */ + public boolean isPrivilegedSubscriber() { + return (privilegedSubscriber); + } } /** @@ -462,11 +472,12 @@ public class NodeConfig { Target[] targets; } - private Hashtable<String, String> params = new Hashtable<String, String>(); - private Hashtable<String, Feed> feeds = new Hashtable<String, Feed>(); - private Hashtable<String, DestInfo> nodeinfo = new Hashtable<String, DestInfo>(); - private Hashtable<String, DestInfo> subinfo = new Hashtable<String, DestInfo>(); - private Hashtable<String, IsFrom> nodes = new Hashtable<String, IsFrom>(); + private Hashtable<String, String> params = new Hashtable<>(); + private Hashtable<String, Feed> feeds = new Hashtable<>(); + private Hashtable<String, DestInfo> nodeinfo = new Hashtable<>(); + private Hashtable<String, DestInfo> subinfo = new Hashtable<>(); + private Hashtable<String, IsFrom> nodes = new Hashtable<>(); + private Hashtable<String, ProvSubscription> provSubscriptions = new Hashtable<>(); private String myname; private String myauth; private DestInfo[] alldests; @@ -486,7 +497,7 @@ public class NodeConfig { for (ProvParam p : pd.getParams()) { params.put(p.getName(), p.getValue()); } - Vector<DestInfo> div = new Vector<DestInfo>(); + Vector<DestInfo> destInfos = new Vector<>(); myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey); for (ProvNode pn : pd.getNodes()) { String cn = pn.getCName(); @@ -495,9 +506,9 @@ public class NodeConfig { } String auth = NodeUtils.getNodeAuthHdr(cn, nodeauthkey); DestInfo di = new DestInfo("n:" + cn, spooldir + "/n/" + cn, null, "n2n-" + cn, - "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true); + "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true, false); (new File(di.getSpool())).mkdirs(); - div.add(di); + destInfos.add(di); nodeinfo.put(cn, di); nodes.put(auth, new IsFrom(cn)); } @@ -533,7 +544,7 @@ public class NodeConfig { } egrtab.put(pfe.getSubId(), pfe.getNode()); } - Hashtable<String, Vector<SubnetMatcher>> pfstab = new Hashtable<String, Vector<SubnetMatcher>>(); + Hashtable<String, Vector<SubnetMatcher>> pfstab = new Hashtable<>(); for (ProvFeedSubnet pfs : pd.getFeedSubnets()) { Vector<SubnetMatcher> v = pfstab.get(pfs.getFeedId()); if (v == null) { @@ -542,46 +553,47 @@ public class NodeConfig { } v.add(new SubnetMatcher(pfs.getCidr())); } - Hashtable<String, StringBuffer> ttab = new Hashtable<String, StringBuffer>(); - HashSet<String> allfeeds = new HashSet<String>(); + Hashtable<String, StringBuffer> feedTargets = new Hashtable<>(); + HashSet<String> allfeeds = new HashSet<>(); for (ProvFeed pfx : pd.getFeeds()) { if (pfx.getStatus() == null) { allfeeds.add(pfx.getId()); } } - for (ProvSubscription ps : pd.getSubscriptions()) { - String sid = ps.getSubId(); - String fid = ps.getFeedId(); - if (!allfeeds.contains(fid)) { + for (ProvSubscription provSubscription : pd.getSubscriptions()) { + String subId = provSubscription.getSubId(); + String feedId = provSubscription.getFeedId(); + if (!allfeeds.contains(feedId)) { continue; } - if (subinfo.get(sid) != null) { + if (subinfo.get(subId) != null) { continue; } int sididx = 999; try { - sididx = Integer.parseInt(sid); + sididx = Integer.parseInt(subId); sididx -= sididx % 100; } catch (Exception e) { } - String siddir = sididx + "/" + sid; - DestInfo di = new DestInfo("s:" + sid, spooldir + "/s/" + siddir, sid, fid, ps.getURL(), ps.getAuthUser(), - ps.getCredentials(), ps.isMetaDataOnly(), ps.isUsing100()); - (new File(di.getSpool())).mkdirs(); - div.add(di); - subinfo.put(sid, di); - String egr = egrtab.get(sid); + String subscriptionDirectory = sididx + "/" + subId; + DestInfo destinationInfo = new DestInfo("s:" + subId, + spooldir + "/s/" + subscriptionDirectory, provSubscription); + (new File(destinationInfo.getSpool())).mkdirs(); + destInfos.add(destinationInfo); + provSubscriptions.put(subId, provSubscription); + subinfo.put(subId, destinationInfo); + String egr = egrtab.get(subId); if (egr != null) { - sid = pf.getPath(egr) + sid; + subId = pf.getPath(egr) + subId; } - StringBuffer sb = ttab.get(fid); + StringBuffer sb = feedTargets.get(feedId); if (sb == null) { sb = new StringBuffer(); - ttab.put(fid, sb); + feedTargets.put(feedId, sb); } - sb.append(' ').append(sid); + sb.append(' ').append(subId); } - alldests = div.toArray(new DestInfo[div.size()]); + alldests = destInfos.toArray(new DestInfo[destInfos.size()]); for (ProvFeed pfx : pd.getFeeds()) { String fid = pfx.getId(); Feed f = feeds.get(fid); @@ -609,7 +621,7 @@ public class NodeConfig { } else { f.redirections = v2.toArray(new Redirection[v2.size()]); } - StringBuffer sb = ttab.get(fid); + StringBuffer sb = feedTargets.get(fid); if (sb == null) { f.targets = new Target[0]; } else { @@ -712,6 +724,16 @@ public class NodeConfig { } /** + * Check whether delete file is allowed. + * + * @param subId The ID of the subscription being requested. + */ + public boolean isDeletePermitted(String subId) { + ProvSubscription provSubscription = provSubscriptions.get(subId); + return provSubscription.isPrivilegedSubscriber(); + } + + /** * Get authenticated user */ public String getAuthUser(String feedid, String credentials) { diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java index 474f5dde..d98c47ae 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java @@ -57,6 +57,7 @@ public class NodeConfigManager implements DeliveryQueueHelper { private Timer timer = new Timer("Node Configuration Timer", true); private long maxfailuretimer; private long initfailuretimer; + private long waitForFileProcessFailureTimer; private long expirationtimer; private double failurebackoff; private long fairtimelimit; @@ -187,6 +188,7 @@ public class NodeConfigManager implements DeliveryQueueHelper { followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false")); eventloginterval = getProvParam("LOGROLL_INTERVAL", "30s"); initfailuretimer = 10000; + waitForFileProcessFailureTimer = 600000; maxfailuretimer = 3600000; expirationtimer = 86400000; failurebackoff = 2.0; @@ -200,6 +202,10 @@ public class NodeConfigManager implements DeliveryQueueHelper { } catch (Exception e) { } try { + waitForFileProcessFailureTimer = (long) (Double.parseDouble(getProvParam("DELIVERY_FILE_PROCESS_INTERVAL")) * 1000); + } catch (Exception e) { + } + try { maxfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000); } catch (Exception e) { } @@ -329,6 +335,16 @@ public class NodeConfigManager implements DeliveryQueueHelper { } /** + * Check whether delete file is allowed. + * + * @param subId The ID of the subscription being requested + * @return True if the delete file is permitted for the subscriber. + */ + public boolean isDeletePermitted(String subId) { + return (config.isDeletePermitted(subId)); + } + + /** * Check who the user is given the feed ID and the offered credentials. * * @param feedid The ID of the feed specified @@ -407,13 +423,13 @@ public class NodeConfigManager implements DeliveryQueueHelper { /** * Get the URL to deliver a message to. * - * @param destinfo The destination information + * @param destinationInfo The destination information * @param fileid The file ID * @return The URL to deliver to */ - public String getDestURL(DestInfo destinfo, String fileid) { - String subid = destinfo.getSubId(); - String purl = destinfo.getURL(); + public String getDestURL(DestInfo destinationInfo, String fileid) { + String subid = destinationInfo.getSubId(); + String purl = destinationInfo.getURL(); if (followredirects && subid != null) { purl = rdmgr.lookup(subid, purl); } @@ -430,10 +446,10 @@ public class NodeConfigManager implements DeliveryQueueHelper { /** * Set up redirection on receipt of a 3XX from a target URL */ - public boolean handleRedirection(DestInfo destinfo, String redirto, String fileid) { + public boolean handleRedirection(DestInfo destinationInfo, String redirto, String fileid) { fileid = "/" + fileid; - String subid = destinfo.getSubId(); - String purl = destinfo.getURL(); + String subid = destinationInfo.getSubId(); + String purl = destinationInfo.getURL(); if (followredirects && subid != null && redirto.endsWith(fileid)) { redirto = redirto.substring(0, redirto.length() - fileid.length()); if (!redirto.equals(purl)) { @@ -447,8 +463,8 @@ public class NodeConfigManager implements DeliveryQueueHelper { /** * Handle unreachable target URL */ - public void handleUnreachable(DestInfo destinfo) { - String subid = destinfo.getSubId(); + public void handleUnreachable(DestInfo destinationInfo) { + String subid = destinationInfo.getSubId(); if (followredirects && subid != null) { rdmgr.forget(subid); } @@ -462,6 +478,13 @@ public class NodeConfigManager implements DeliveryQueueHelper { } /** + * Get the timeout before retrying after delivery and wait for file processing + */ + public long getWaitForFileProcessFailureTimer() { + return (waitForFileProcessFailureTimer); + } + + /** * Get the maximum timeout between delivery attempts */ public long getMaxFailureTimer() { diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java index e07642c4..d25531a7 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java @@ -145,7 +145,7 @@ public class NodeMain { ctxt = new ServletContextHandler(0); ctxt.setContextPath("/"); server.setHandler(ctxt); - ctxt.addServlet(new ServletHolder(new NodeServlet()), "/*"); + ctxt.addServlet(new ServletHolder(new NodeServlet(delivery)), "/*"); nodeMainLogger.info("NODE0005 Data Router Node Activating Service"); server.start(); server.join(); diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java index 7e1d46d3..fae2c1f6 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java @@ -40,11 +40,11 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.Enumeration; import java.util.regex.Pattern; -import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.log4j.Logger; +import org.jetbrains.annotations.Nullable; import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; import org.slf4j.MDC; @@ -67,6 +67,7 @@ public class NodeServlet extends HttpServlet { //Adding EELF Logger Rally:US664892 private static EELFLogger eelflogger = EELFManager.getInstance() .getLogger(NodeServlet.class); + private Delivery delivery; static { final String ws = "\\s*"; @@ -80,6 +81,10 @@ public class NodeServlet extends HttpServlet { MetaDataPattern = Pattern.compile(object, Pattern.DOTALL); } + NodeServlet(Delivery delivery) { + this.delivery = delivery; + } + /** * Get the NodeConfigurationManager */ @@ -155,16 +160,13 @@ public class NodeServlet extends HttpServlet { } catch (IOException ioe) { logger.error("IOException" + ioe.getMessage()); eelflogger.info(EelfMsgs.EXIT); - } catch (ServletException se) { - logger.error("ServletException" + se.getMessage()); - eelflogger.info(EelfMsgs.EXIT); } } /** * Handle all DELETE requests */ - protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + protected void doDelete(HttpServletRequest req, HttpServletResponse resp) { NodeUtils.setIpAndFqdnForEelf("doDelete"); NodeUtils.setRequestIdAndInvocationId(req); eelflogger.info(EelfMsgs.ENTRY); @@ -173,40 +175,26 @@ public class NodeServlet extends HttpServlet { try { common(req, resp, false); } catch (IOException ioe) { - logger.error("IOException" + ioe.getMessage()); - eelflogger.info(EelfMsgs.EXIT); - } catch (ServletException se) { - logger.error("ServletException" + se.getMessage()); + logger.error("IOException " + ioe.getMessage()); eelflogger.info(EelfMsgs.EXIT); } - } - private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) - throws ServletException, IOException { - if (down(resp)) { - eelflogger.info(EelfMsgs.EXIT); - return; - } - if (!req.isSecure()) { - logger.info( - "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req - .getRemoteAddr()); - resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests"); - eelflogger.info(EelfMsgs.EXIT); - return; - } - String fileid = req.getPathInfo(); - if (fileid == null) { - logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req - .getRemoteAddr()); - resp.sendError(HttpServletResponse.SC_NOT_FOUND, - "Invalid request URI. Expecting <feed-publishing-url>/<fileid>."); - eelflogger.info(EelfMsgs.EXIT); - return; - } + private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) throws IOException { + String fileid = getFileId(req, resp); + if (fileid == null) return; String feedid = null; String user = null; + String ip = req.getRemoteAddr(); + String lip = req.getLocalAddr(); + String pubid = null; + String xpubid = null; + String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip; + Target[] targets = null; + if (fileid.startsWith("/delete/")) { + deleteFile(req, resp, fileid, pubid); + return; + } String credentials = req.getHeader("Authorization"); if (credentials == null) { logger.info("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req @@ -215,12 +203,6 @@ public class NodeServlet extends HttpServlet { eelflogger.info(EelfMsgs.EXIT); return; } - String ip = req.getRemoteAddr(); - String lip = req.getLocalAddr(); - String pubid = null; - String xpubid = null; - String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip; - Target[] targets = null; if (fileid.startsWith("/publish/")) { fileid = fileid.substring(9); int i = fileid.indexOf('/'); @@ -315,8 +297,8 @@ public class NodeServlet extends HttpServlet { mx.append(req.getMethod()).append('\t').append(fileid).append('\n'); Enumeration hnames = req.getHeaderNames(); String ctype = null; - Boolean hasRequestIdHeader = false; - Boolean hasInvocationIdHeader = false; + boolean hasRequestIdHeader = false; + boolean hasInvocationIdHeader = false; while (hnames.hasMoreElements()) { String hn = (String) hnames.nextElement(); String hnlc = hn.toLowerCase(); @@ -449,6 +431,90 @@ public class NodeServlet extends HttpServlet { } } + private void deleteFile(HttpServletRequest req, HttpServletResponse resp, String fileid, String pubid) { + try { + fileid = fileid.substring(8); + int i = fileid.indexOf('/'); + if (i == -1 || i == fileid.length() - 1) { + logger.info("NODE0112 Rejecting bad URI for DELETE of " + req.getPathInfo() + " from " + req + .getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_NOT_FOUND, + "Invalid request URI. Expecting <subId>/<pubId>."); + eelflogger.info(EelfMsgs.EXIT); + return; + } + String subscriptionId = fileid.substring(0, i); + int subId = Integer.parseInt(subscriptionId); + pubid = fileid.substring(i + 1); + String errorMessage = "Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: " + + config.getMyName() + "."; + int subIdDir = subId - (subId % 100); + if (!isAuthorizedToDelete(resp, subscriptionId, errorMessage)) { + return; + } + boolean result = delivery.markTaskSuccess(config.getSpoolBase() + "/s/" + subIdDir + "/" + subId, pubid); + if (result) { + logger.info("NODE0115 Successfully deleted files (" + pubid + ", " + pubid + ".M) from DR Node: " + + config.getMyName()); + resp.setStatus(HttpServletResponse.SC_OK); + eelflogger.info(EelfMsgs.EXIT); + } else { + logger.error("NODE0116 " + errorMessage); + resp.sendError(HttpServletResponse.SC_NOT_FOUND, "File not found on server."); + eelflogger.info(EelfMsgs.EXIT); + } + } catch (IOException ioe) { + logger.error("NODE0117 Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: " + + config.getMyName() + ". Error: " + ioe.getMessage()); + eelflogger.info(EelfMsgs.EXIT); + } + } + + @Nullable + private String getFileId(HttpServletRequest req, HttpServletResponse resp) throws IOException { + if (down(resp)) { + eelflogger.info(EelfMsgs.EXIT); + return null; + } + if (!req.isSecure()) { + logger.info( + "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req + .getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests"); + eelflogger.info(EelfMsgs.EXIT); + return null; + } + String fileid = req.getPathInfo(); + if (fileid == null) { + logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req + .getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_NOT_FOUND, + "Invalid request URI. Expecting <feed-publishing-url>/<fileid>."); + eelflogger.info(EelfMsgs.EXIT); + return null; + } + return fileid; + } + + private boolean isAuthorizedToDelete(HttpServletResponse resp, String subscriptionId, String errorMessage) throws IOException { + try { + boolean deletePermitted = config.isDeletePermitted(subscriptionId); + if (!deletePermitted) { + logger.error("NODE0113 " + errorMessage + " Error: Subscription " + + subscriptionId + " is not a privileged subscription"); + resp.sendError(HttpServletResponse.SC_UNAUTHORIZED); + eelflogger.info(EelfMsgs.EXIT); + return false; + } + } catch (NullPointerException npe) { + logger.error("NODE0114 " + errorMessage + " Error: Subscription " + subscriptionId + " does not exist"); + resp.sendError(HttpServletResponse.SC_NOT_FOUND); + eelflogger.info(EelfMsgs.EXIT); + return false; + } + return true; + } + private int getIdFromPath(HttpServletRequest req) { String path = req.getPathInfo(); if (path == null || path.length() < 2) { diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java index f0b81747..765a4075 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java @@ -173,7 +173,8 @@ public class ProvData { String password = gvas(jdel, "password"); boolean monly = jsub.getBoolean("metadataOnly"); boolean use100 = jdel.getBoolean("use100"); - psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100)); + boolean privilegedSubscriber = jsub.getBoolean("privilegedSubscriber"); + psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100, privilegedSubscriber)); } } JSONObject jparams = jcfg.optJSONObject("parameters"); |