summaryrefslogtreecommitdiffstats
path: root/datarouter-node/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'datarouter-node/src/main/java/org')
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java14
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java92
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java35
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java44
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java12
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java32
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java12
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java84
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java41
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java2
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java148
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java3
12 files changed, 356 insertions, 163 deletions
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java
index ae4f13bf..d2600d23 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java
@@ -267,4 +267,18 @@ public class Delivery {
}
}
}
+
+ /**
+ * Mark the task in spool a success
+ */
+ public synchronized boolean markTaskSuccess(String spool, String pubId) {
+ boolean succeeded = false;
+ if (spool != null) {
+ DeliveryQueue dq = dqs.get(spool);
+ if (dq != null) {
+ succeeded = dq.markTaskSuccess(pubId);
+ }
+ }
+ return succeeded;
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java
index ad746255..abdfa718 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java
@@ -64,16 +64,16 @@ import java.util.*;
* failure timer is active or if no files are found in a directory scan.
*/
public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
- private DeliveryQueueHelper dqh;
- private DestInfo di;
- private Hashtable<String, DeliveryTask> working = new Hashtable<String, DeliveryTask>();
- private Hashtable<String, DeliveryTask> retry = new Hashtable<String, DeliveryTask>();
+ private DeliveryQueueHelper deliveryQueueHelper;
+ private DestInfo destinationInfo;
+ private Hashtable<String, DeliveryTask> working = new Hashtable<>();
+ private Hashtable<String, DeliveryTask> retry = new Hashtable<>();
private int todoindex;
private boolean failed;
private long failduration;
private long resumetime;
File dir;
- private Vector<DeliveryTask> todo = new Vector<DeliveryTask>();
+ private Vector<DeliveryTask> todo = new Vector<>();
/**
* Try to cancel a delivery task.
@@ -139,11 +139,15 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
if (!failed) {
failed = true;
if (failduration == 0) {
- failduration = dqh.getInitFailureTimer();
+ if (destinationInfo.isPrivilegedSubscriber()) {
+ failduration = deliveryQueueHelper.getWaitForFileProcessFailureTimer();
+ } else{
+ failduration = deliveryQueueHelper.getInitFailureTimer();
+ }
}
resumetime = System.currentTimeMillis() + failduration;
- long maxdur = dqh.getMaxFailureTimer();
- failduration = (long) (failduration * dqh.getFailureBackoff());
+ long maxdur = deliveryQueueHelper.getMaxFailureTimer();
+ failduration = (long) (failduration * deliveryQueueHelper.getFailureBackoff());
if (failduration > maxdur) {
failduration = maxdur;
}
@@ -184,7 +188,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
*/
public synchronized DeliveryTask peekNext() {
long now = System.currentTimeMillis();
- long mindate = now - dqh.getExpirationTimer();
+ long mindate = now - deliveryQueueHelper.getExpirationTimer();
if (failed) {
if (now > resumetime) {
failed = false;
@@ -246,32 +250,32 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
/**
* Create a delivery queue for a given destination info
*/
- public DeliveryQueue(DeliveryQueueHelper dqh, DestInfo di) {
- this.dqh = dqh;
- this.di = di;
- dir = new File(di.getSpool());
+ public DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
+ this.deliveryQueueHelper = deliveryQueueHelper;
+ this.destinationInfo = destinationInfo;
+ dir = new File(destinationInfo.getSpool());
dir.mkdirs();
}
/**
* Update the destination info for this delivery queue
*/
- public void config(DestInfo di) {
- this.di = di;
+ public void config(DestInfo destinationInfo) {
+ this.destinationInfo = destinationInfo;
}
/**
* Get the dest info
*/
- public DestInfo getDestInfo() {
- return (di);
+ public DestInfo getDestinationInfo() {
+ return (destinationInfo);
}
/**
* Get the config manager
*/
public DeliveryQueueHelper getConfig() {
- return (dqh);
+ return (deliveryQueueHelper);
}
/**
@@ -294,22 +298,26 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
*/
public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {
if (status < 300) {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, xpubid);
- markSuccess(task);
- } else if (status < 400 && dqh.isFollowRedirects()) {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);
- if (dqh.handleRedirection(di, location, task.getFileId())) {
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
+ if (destinationInfo.isPrivilegedSubscriber()) {
+ markFailWithRetry(task);
+ } else {
+ markSuccess(task);
+ }
+ } else if (status < 400 && deliveryQueueHelper.isFollowRedirects()) {
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
+ if (deliveryQueueHelper.handleRedirection(destinationInfo, location, task.getFileId())) {
markRedirect(task);
} else {
StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
markFailNoRetry(task);
}
- } else if (status < 500) {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);
+ } else if (status < 500 && status != 429) { // Status 429 is the standard response for Too Many Requests and indicates that a file needs to be delivered again at a later time.
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
markFailNoRetry(task);
} else {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
markFailWithRetry(task);
}
}
@@ -318,8 +326,8 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
* Delivery failed by reason of an exception
*/
public void reportException(DeliveryTask task, Exception exception) {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), -1, exception.toString());
- dqh.handleUnreachable(di);
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString());
+ deliveryQueueHelper.handleUnreachable(destinationInfo);
markFailWithRetry(task);
}
@@ -330,14 +338,14 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
* @return The feed ID
*/
public String getFeedId(String subid) {
- return (dqh.getFeedId(subid));
+ return (deliveryQueueHelper.getFeedId(subid));
}
/**
* Get the URL to deliver a message to given the file ID
*/
public String getDestURL(String fileid) {
- return (dqh.getDestURL(di, fileid));
+ return (deliveryQueueHelper.getDestURL(destinationInfo, fileid));
}
/**
@@ -346,8 +354,8 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
*/
public void run() {
DeliveryTask t;
- long endtime = System.currentTimeMillis() + dqh.getFairTimeLimit();
- int filestogo = dqh.getFairFileLimit();
+ long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit();
+ int filestogo = deliveryQueueHelper.getFairFileLimit();
while ((t = getNext()) != null) {
t.run();
if (--filestogo <= 0 || System.currentTimeMillis() > endtime) {
@@ -369,4 +377,24 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
public void resetQueue() {
resumetime = System.currentTimeMillis();
}
+
+ /**
+ * Get task if in queue and mark as success
+ */
+ public boolean markTaskSuccess(String pubId) {
+ DeliveryTask task = working.get(pubId);
+ if (task != null) {
+ markSuccess(task);
+ return true;
+ }
+ task = retry.get(pubId);
+ if (task != null) {
+ retry.remove(pubId);
+ task.clean();
+ resumetime = 0;
+ failduration = 0;
+ return true;
+ }
+ return false;
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java
index b1734cd4..5cf5fa4c 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java
@@ -34,64 +34,69 @@ public interface DeliveryQueueHelper {
/**
* Get the timeout (milliseconds) before retrying after an initial delivery failure
*/
- public long getInitFailureTimer();
+ long getInitFailureTimer();
+
+ /**
+ * Get the timeout before retrying after delivery and wait for file processing
+ */
+ long getWaitForFileProcessFailureTimer();
/**
* Get the ratio between timeouts on consecutive delivery attempts
*/
- public double getFailureBackoff();
+ double getFailureBackoff();
/**
* Get the maximum timeout (milliseconds) between delivery attempts
*/
- public long getMaxFailureTimer();
+ long getMaxFailureTimer();
/**
* Get the expiration timer (milliseconds) for deliveries
*/
- public long getExpirationTimer();
+ long getExpirationTimer();
/**
* Get the maximum number of file delivery attempts before checking
* if another queue has work to be performed.
*/
- public int getFairFileLimit();
+ int getFairFileLimit();
/**
* Get the maximum amount of time spent delivering files before checking if another queue has work to be performed.
*/
- public long getFairTimeLimit();
+ long getFairTimeLimit();
/**
* Get the URL for delivering a file
*
- * @param dest The destination information for the file to be delivered.
+ * @param destinationInfo The destination information for the file to be delivered.
* @param fileid The file id for the file to be delivered.
- * @return The URL for delivering the file (typically, dest.getURL() + "/" + fileid).
+ * @return The URL for delivering the file (typically, destinationInfo.getURL() + "/" + fileid).
*/
- public String getDestURL(DestInfo dest, String fileid);
+ String getDestURL(DestInfo destinationInfo, String fileid);
/**
* Forget redirections associated with a subscriber
*
- * @param dest Destination information to forget
+ * @param destinationInfo Destination information to forget
*/
- public void handleUnreachable(DestInfo dest);
+ void handleUnreachable(DestInfo destinationInfo);
/**
* Post redirection for a subscriber
*
- * @param dest Destination information to update
+ * @param destinationInfo Destination information to update
* @param location Location given by subscriber
* @param fileid File ID of request
* @return true if this 3xx response is retryable, otherwise, false.
*/
- public boolean handleRedirection(DestInfo dest, String location, String fileid);
+ boolean handleRedirection(DestInfo destinationInfo, String location, String fileid);
/**
* Should I handle 3xx responses differently than 4xx responses?
*/
- public boolean isFollowRedirects();
+ boolean isFollowRedirects();
/**
* Get the feed ID for a subscription
@@ -99,5 +104,5 @@ public interface DeliveryQueueHelper {
* @param subid The subscription ID
* @return The feed ID
*/
- public String getFeedId(String subid);
+ String getFeedId(String subid);
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
index 80729905..b2c31691 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
@@ -47,9 +47,9 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
private static EELFLogger eelflogger = EELFManager.getInstance()
.getLogger(DeliveryTask.class);
- private DeliveryTaskHelper dth;
+ private DeliveryTaskHelper deliveryTaskHelper;
private String pubid;
- private DestInfo di;
+ private DestInfo destInfo;
private String spool;
private File datafile;
private File metafile;
@@ -69,25 +69,25 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
/**
* Create a delivery task for a given delivery queue and pub ID
*
- * @param dth The delivery task helper for the queue this task is in.
+ * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
* @param pubid The publish ID for this file. This is used as
* the base for the file name in the spool directory and is of
* the form <milliseconds since 1970>.<fqdn of initial data router node>
*/
- public DeliveryTask(DeliveryTaskHelper dth, String pubid) {
- this.dth = dth;
+ public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
+ this.deliveryTaskHelper = deliveryTaskHelper;
this.pubid = pubid;
- di = dth.getDestInfo();
- subid = di.getSubId();
- feedid = di.getLogData();
- spool = di.getSpool();
+ destInfo = deliveryTaskHelper.getDestinationInfo();
+ subid = destInfo.getSubId();
+ feedid = destInfo.getLogData();
+ spool = destInfo.getSpool();
String dfn = spool + "/" + pubid;
String mfn = dfn + ".M";
datafile = new File(spool + "/" + pubid);
metafile = new File(mfn);
- boolean monly = di.isMetaDataOnly();
+ boolean monly = destInfo.isMetaDataOnly();
date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
- Vector<String[]> hdrv = new Vector<String[]>();
+ Vector<String[]> hdrv = new Vector<>();
try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
String s = br.readLine();
@@ -104,7 +104,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
String v = s.substring(i + 1);
if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) {
subid = v.replaceAll("[^ ]*/", "");
- feedid = dth.getFeedId(subid.replaceAll(" .*", ""));
+ feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
}
if (length == 0 && h.toLowerCase().startsWith("content-")) {
continue;
@@ -126,7 +126,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
}
hdrs = hdrv.toArray(new String[hdrv.size()][]);
- url = dth.getDestURL(fileid);
+ url = deliveryTaskHelper.getDestURL(fileid);
}
/**
* Is the object a DeliveryTask with the same publication ID?
@@ -171,14 +171,14 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
public void run() {
attempts++;
try {
- di = dth.getDestInfo();
- boolean expect100 = di.isUsing100();
- boolean monly = di.isMetaDataOnly();
+ destInfo = deliveryTaskHelper.getDestinationInfo();
+ boolean expect100 = destInfo.isUsing100();
+ boolean monly = destInfo.isMetaDataOnly();
length = 0;
if (!"DELETE".equals(method) && !monly) {
length = datafile.length();
}
- url = dth.getDestURL(fileid);
+ url = deliveryTaskHelper.getDestURL(fileid);
URL u = new URL(url);
HttpURLConnection uc = (HttpURLConnection) u.openConnection();
uc.setConnectTimeout(60000);
@@ -186,7 +186,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
uc.setInstanceFollowRedirects(false);
uc.setRequestMethod(method);
uc.setRequestProperty("Content-Length", Long.toString(length));
- uc.setRequestProperty("Authorization", di.getAuth());
+ uc.setRequestProperty("Authorization", destInfo.getAuth());
uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
for (String[] nv : hdrs) {
uc.addRequestProperty(nv[0], nv[1]);
@@ -201,7 +201,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
try {
os = uc.getOutputStream();
} catch (ProtocolException pe) {
- dth.reportDeliveryExtra(this, -1L);
+ deliveryTaskHelper.reportDeliveryExtra(this, -1L);
// Rcvd error instead of 100-continue
loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe);
}
@@ -223,7 +223,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
}
os.close();
} catch (IOException ioe) {
- dth.reportDeliveryExtra(this, sofar);
+ deliveryTaskHelper.reportDeliveryExtra(this, sofar);
throw ioe;
}
}
@@ -257,10 +257,10 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
}
is.close();
}
- dth.reportStatus(this, rc, xpubid, rmsg);
+ deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
} catch (Exception e) {
loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
- dth.reportException(this, e);
+ deliveryTaskHelper.reportException(this, e);
}
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java
index 932b792a..d4ac8bd6 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java
@@ -38,7 +38,7 @@ public interface DeliveryTaskHelper {
* @param task The task that failed
* @param exception The exception that occurred
*/
- public void reportException(DeliveryTask task, Exception exception);
+ void reportException(DeliveryTask task, Exception exception);
/**
* Report that a delivery attempt completed (successfully or unsuccessfully)
@@ -48,7 +48,7 @@ public interface DeliveryTaskHelper {
* @param xpubid The publish ID from the far end (if any)
* @param location The redirection location for a 3XX response
*/
- public void reportStatus(DeliveryTask task, int status, String xpubid, String location);
+ void reportStatus(DeliveryTask task, int status, String xpubid, String location);
/**
* Report that a delivery attempt either failed while sending data or that an error was returned instead of a 100 Continue.
@@ -56,14 +56,14 @@ public interface DeliveryTaskHelper {
* @param task The task that failed
* @param sent The number of bytes sent or -1 if an error was returned instead of 100 Continue.
*/
- public void reportDeliveryExtra(DeliveryTask task, long sent);
+ void reportDeliveryExtra(DeliveryTask task, long sent);
/**
* Get the destination information for the delivery queue
*
* @return The destination information
*/
- public DestInfo getDestInfo();
+ DestInfo getDestinationInfo();
/**
* Given a file ID, get the URL to deliver to
@@ -71,7 +71,7 @@ public interface DeliveryTaskHelper {
* @param fileid The file id
* @return The URL to deliver to
*/
- public String getDestURL(String fileid);
+ String getDestURL(String fileid);
/**
* Get the feed ID for a subscription
@@ -79,5 +79,5 @@ public interface DeliveryTaskHelper {
* @param subid The subscription ID
* @return The feed iD
*/
- public String getFeedId(String subid);
+ String getFeedId(String subid);
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java
index 12253314..c3e0057c 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java
@@ -37,6 +37,7 @@ public class DestInfo {
private String authentication;
private boolean metaonly;
private boolean use100;
+ private boolean privilegedSubscriber;
/**
* Create a destination information object.
@@ -50,8 +51,9 @@ public class DestInfo {
* @param authentication The credentials.
* @param metaonly Is this a metadata only delivery?
* @param use100 Should I use expect 100-continue?
+ * @param privilegedSubscriber Can we wait to receive a file processed acknowledgement before deleting file
*/
- public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100) {
+ public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100, boolean privilegedSubscriber) {
this.name = name;
this.spool = spool;
this.subid = subid;
@@ -61,6 +63,27 @@ public class DestInfo {
this.authentication = authentication;
this.metaonly = metaonly;
this.use100 = use100;
+ this.privilegedSubscriber = privilegedSubscriber;
+ }
+
+ /**
+ * Create a destination information object.
+ *
+ * @param name n:fqdn or s:subid
+ * @param spool The directory where files are spooled.
+ * @param subscription The subscription.
+ */
+ public DestInfo(String name, String spool, NodeConfig.ProvSubscription subscription) {
+ this.name = name;
+ this.spool = spool;
+ this.subid = subscription.getSubId();
+ this.logdata = subscription.getFeedId();
+ this.url = subscription.getURL();
+ this.authuser = subscription.getAuthUser();
+ this.authentication = subscription.getCredentials();
+ this.metaonly = subscription.isMetaDataOnly();
+ this.use100 = subscription.isUsing100();
+ this.privilegedSubscriber = subscription.isPrivilegedSubscriber();
}
public boolean equals(Object o) {
@@ -150,4 +173,11 @@ public class DestInfo {
public boolean isUsing100() {
return (use100);
}
+
+ /**
+ * Should we wait to receive a file processed acknowledgement before deleting file
+ */
+ public boolean isPrivilegedSubscriber() {
+ return (privilegedSubscriber);
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java
index 1d5f76f6..ff803afc 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java
@@ -57,6 +57,10 @@ public class LogManager extends TimerTask {
return (10000L);
}
+ public long getWaitForFileProcessFailureTimer() {
+ return (600000L);
+ }
+
public double getFailureBackoff() {
return (2.0);
}
@@ -77,14 +81,14 @@ public class LogManager extends TimerTask {
return (86400000);
}
- public String getDestURL(DestInfo dest, String fileid) {
+ public String getDestURL(DestInfo destinationInfo, String fileid) {
return (config.getEventLogUrl());
}
- public void handleUnreachable(DestInfo dest) {
+ public void handleUnreachable(DestInfo destinationInfo) {
}
- public boolean handleRedirection(DestInfo dest, String location, String fileid) {
+ public boolean handleRedirection(DestInfo destinationInfo, String location, String fileid) {
return (false);
}
@@ -101,7 +105,7 @@ public class LogManager extends TimerTask {
public Uploader() {
dq = new DeliveryQueue(this,
new DestInfo("LogUpload", uploaddir, null, null, null, config.getMyName(), config.getMyAuth(), false,
- false));
+ false, false));
setDaemon(true);
setName("Log Uploader");
start();
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java
index c40d29c3..d3d3d01b 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java
@@ -231,6 +231,7 @@ public class NodeConfig {
private String credentials;
private boolean metaonly;
private boolean use100;
+ private boolean privilegedSubscriber;
/**
* Construct a subscription configuration entry
@@ -243,9 +244,10 @@ public class NodeConfig {
* Authorization header.
* @param metaonly Is this a meta data only subscription?
* @param use100 Should we send Expect: 100-continue?
+ * @param privilegedSubscriber Can we wait to receive a delete file call before deleting file
*/
public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials,
- boolean metaonly, boolean use100) {
+ boolean metaonly, boolean use100, boolean privilegedSubscriber) {
this.subid = subid;
this.feedid = feedid;
this.url = url;
@@ -253,6 +255,7 @@ public class NodeConfig {
this.credentials = credentials;
this.metaonly = metaonly;
this.use100 = use100;
+ this.privilegedSubscriber = privilegedSubscriber;
}
/**
@@ -303,6 +306,13 @@ public class NodeConfig {
public boolean isUsing100() {
return (use100);
}
+
+ /**
+ * Can we wait to receive a delete file call before deleting file
+ */
+ public boolean isPrivilegedSubscriber() {
+ return (privilegedSubscriber);
+ }
}
/**
@@ -462,11 +472,12 @@ public class NodeConfig {
Target[] targets;
}
- private Hashtable<String, String> params = new Hashtable<String, String>();
- private Hashtable<String, Feed> feeds = new Hashtable<String, Feed>();
- private Hashtable<String, DestInfo> nodeinfo = new Hashtable<String, DestInfo>();
- private Hashtable<String, DestInfo> subinfo = new Hashtable<String, DestInfo>();
- private Hashtable<String, IsFrom> nodes = new Hashtable<String, IsFrom>();
+ private Hashtable<String, String> params = new Hashtable<>();
+ private Hashtable<String, Feed> feeds = new Hashtable<>();
+ private Hashtable<String, DestInfo> nodeinfo = new Hashtable<>();
+ private Hashtable<String, DestInfo> subinfo = new Hashtable<>();
+ private Hashtable<String, IsFrom> nodes = new Hashtable<>();
+ private Hashtable<String, ProvSubscription> provSubscriptions = new Hashtable<>();
private String myname;
private String myauth;
private DestInfo[] alldests;
@@ -486,7 +497,7 @@ public class NodeConfig {
for (ProvParam p : pd.getParams()) {
params.put(p.getName(), p.getValue());
}
- Vector<DestInfo> div = new Vector<DestInfo>();
+ Vector<DestInfo> destInfos = new Vector<>();
myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey);
for (ProvNode pn : pd.getNodes()) {
String cn = pn.getCName();
@@ -495,9 +506,9 @@ public class NodeConfig {
}
String auth = NodeUtils.getNodeAuthHdr(cn, nodeauthkey);
DestInfo di = new DestInfo("n:" + cn, spooldir + "/n/" + cn, null, "n2n-" + cn,
- "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true);
+ "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true, false);
(new File(di.getSpool())).mkdirs();
- div.add(di);
+ destInfos.add(di);
nodeinfo.put(cn, di);
nodes.put(auth, new IsFrom(cn));
}
@@ -533,7 +544,7 @@ public class NodeConfig {
}
egrtab.put(pfe.getSubId(), pfe.getNode());
}
- Hashtable<String, Vector<SubnetMatcher>> pfstab = new Hashtable<String, Vector<SubnetMatcher>>();
+ Hashtable<String, Vector<SubnetMatcher>> pfstab = new Hashtable<>();
for (ProvFeedSubnet pfs : pd.getFeedSubnets()) {
Vector<SubnetMatcher> v = pfstab.get(pfs.getFeedId());
if (v == null) {
@@ -542,46 +553,47 @@ public class NodeConfig {
}
v.add(new SubnetMatcher(pfs.getCidr()));
}
- Hashtable<String, StringBuffer> ttab = new Hashtable<String, StringBuffer>();
- HashSet<String> allfeeds = new HashSet<String>();
+ Hashtable<String, StringBuffer> feedTargets = new Hashtable<>();
+ HashSet<String> allfeeds = new HashSet<>();
for (ProvFeed pfx : pd.getFeeds()) {
if (pfx.getStatus() == null) {
allfeeds.add(pfx.getId());
}
}
- for (ProvSubscription ps : pd.getSubscriptions()) {
- String sid = ps.getSubId();
- String fid = ps.getFeedId();
- if (!allfeeds.contains(fid)) {
+ for (ProvSubscription provSubscription : pd.getSubscriptions()) {
+ String subId = provSubscription.getSubId();
+ String feedId = provSubscription.getFeedId();
+ if (!allfeeds.contains(feedId)) {
continue;
}
- if (subinfo.get(sid) != null) {
+ if (subinfo.get(subId) != null) {
continue;
}
int sididx = 999;
try {
- sididx = Integer.parseInt(sid);
+ sididx = Integer.parseInt(subId);
sididx -= sididx % 100;
} catch (Exception e) {
}
- String siddir = sididx + "/" + sid;
- DestInfo di = new DestInfo("s:" + sid, spooldir + "/s/" + siddir, sid, fid, ps.getURL(), ps.getAuthUser(),
- ps.getCredentials(), ps.isMetaDataOnly(), ps.isUsing100());
- (new File(di.getSpool())).mkdirs();
- div.add(di);
- subinfo.put(sid, di);
- String egr = egrtab.get(sid);
+ String subscriptionDirectory = sididx + "/" + subId;
+ DestInfo destinationInfo = new DestInfo("s:" + subId,
+ spooldir + "/s/" + subscriptionDirectory, provSubscription);
+ (new File(destinationInfo.getSpool())).mkdirs();
+ destInfos.add(destinationInfo);
+ provSubscriptions.put(subId, provSubscription);
+ subinfo.put(subId, destinationInfo);
+ String egr = egrtab.get(subId);
if (egr != null) {
- sid = pf.getPath(egr) + sid;
+ subId = pf.getPath(egr) + subId;
}
- StringBuffer sb = ttab.get(fid);
+ StringBuffer sb = feedTargets.get(feedId);
if (sb == null) {
sb = new StringBuffer();
- ttab.put(fid, sb);
+ feedTargets.put(feedId, sb);
}
- sb.append(' ').append(sid);
+ sb.append(' ').append(subId);
}
- alldests = div.toArray(new DestInfo[div.size()]);
+ alldests = destInfos.toArray(new DestInfo[destInfos.size()]);
for (ProvFeed pfx : pd.getFeeds()) {
String fid = pfx.getId();
Feed f = feeds.get(fid);
@@ -609,7 +621,7 @@ public class NodeConfig {
} else {
f.redirections = v2.toArray(new Redirection[v2.size()]);
}
- StringBuffer sb = ttab.get(fid);
+ StringBuffer sb = feedTargets.get(fid);
if (sb == null) {
f.targets = new Target[0];
} else {
@@ -712,6 +724,16 @@ public class NodeConfig {
}
/**
+ * Check whether delete file is allowed.
+ *
+ * @param subId The ID of the subscription being requested.
+ */
+ public boolean isDeletePermitted(String subId) {
+ ProvSubscription provSubscription = provSubscriptions.get(subId);
+ return provSubscription.isPrivilegedSubscriber();
+ }
+
+ /**
* Get authenticated user
*/
public String getAuthUser(String feedid, String credentials) {
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java
index 474f5dde..d98c47ae 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java
@@ -57,6 +57,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
private Timer timer = new Timer("Node Configuration Timer", true);
private long maxfailuretimer;
private long initfailuretimer;
+ private long waitForFileProcessFailureTimer;
private long expirationtimer;
private double failurebackoff;
private long fairtimelimit;
@@ -187,6 +188,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false"));
eventloginterval = getProvParam("LOGROLL_INTERVAL", "30s");
initfailuretimer = 10000;
+ waitForFileProcessFailureTimer = 600000;
maxfailuretimer = 3600000;
expirationtimer = 86400000;
failurebackoff = 2.0;
@@ -200,6 +202,10 @@ public class NodeConfigManager implements DeliveryQueueHelper {
} catch (Exception e) {
}
try {
+ waitForFileProcessFailureTimer = (long) (Double.parseDouble(getProvParam("DELIVERY_FILE_PROCESS_INTERVAL")) * 1000);
+ } catch (Exception e) {
+ }
+ try {
maxfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000);
} catch (Exception e) {
}
@@ -329,6 +335,16 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
+ * Check whether delete file is allowed.
+ *
+ * @param subId The ID of the subscription being requested
+ * @return True if the delete file is permitted for the subscriber.
+ */
+ public boolean isDeletePermitted(String subId) {
+ return (config.isDeletePermitted(subId));
+ }
+
+ /**
* Check who the user is given the feed ID and the offered credentials.
*
* @param feedid The ID of the feed specified
@@ -407,13 +423,13 @@ public class NodeConfigManager implements DeliveryQueueHelper {
/**
* Get the URL to deliver a message to.
*
- * @param destinfo The destination information
+ * @param destinationInfo The destination information
* @param fileid The file ID
* @return The URL to deliver to
*/
- public String getDestURL(DestInfo destinfo, String fileid) {
- String subid = destinfo.getSubId();
- String purl = destinfo.getURL();
+ public String getDestURL(DestInfo destinationInfo, String fileid) {
+ String subid = destinationInfo.getSubId();
+ String purl = destinationInfo.getURL();
if (followredirects && subid != null) {
purl = rdmgr.lookup(subid, purl);
}
@@ -430,10 +446,10 @@ public class NodeConfigManager implements DeliveryQueueHelper {
/**
* Set up redirection on receipt of a 3XX from a target URL
*/
- public boolean handleRedirection(DestInfo destinfo, String redirto, String fileid) {
+ public boolean handleRedirection(DestInfo destinationInfo, String redirto, String fileid) {
fileid = "/" + fileid;
- String subid = destinfo.getSubId();
- String purl = destinfo.getURL();
+ String subid = destinationInfo.getSubId();
+ String purl = destinationInfo.getURL();
if (followredirects && subid != null && redirto.endsWith(fileid)) {
redirto = redirto.substring(0, redirto.length() - fileid.length());
if (!redirto.equals(purl)) {
@@ -447,8 +463,8 @@ public class NodeConfigManager implements DeliveryQueueHelper {
/**
* Handle unreachable target URL
*/
- public void handleUnreachable(DestInfo destinfo) {
- String subid = destinfo.getSubId();
+ public void handleUnreachable(DestInfo destinationInfo) {
+ String subid = destinationInfo.getSubId();
if (followredirects && subid != null) {
rdmgr.forget(subid);
}
@@ -462,6 +478,13 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
+ * Get the timeout before retrying after delivery and wait for file processing
+ */
+ public long getWaitForFileProcessFailureTimer() {
+ return (waitForFileProcessFailureTimer);
+ }
+
+ /**
* Get the maximum timeout between delivery attempts
*/
public long getMaxFailureTimer() {
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java
index e07642c4..d25531a7 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java
@@ -145,7 +145,7 @@ public class NodeMain {
ctxt = new ServletContextHandler(0);
ctxt.setContextPath("/");
server.setHandler(ctxt);
- ctxt.addServlet(new ServletHolder(new NodeServlet()), "/*");
+ ctxt.addServlet(new ServletHolder(new NodeServlet(delivery)), "/*");
nodeMainLogger.info("NODE0005 Data Router Node Activating Service");
server.start();
server.join();
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java
index 7e1d46d3..fae2c1f6 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java
@@ -40,11 +40,11 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Enumeration;
import java.util.regex.Pattern;
-import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.log4j.Logger;
+import org.jetbrains.annotations.Nullable;
import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
import org.slf4j.MDC;
@@ -67,6 +67,7 @@ public class NodeServlet extends HttpServlet {
//Adding EELF Logger Rally:US664892
private static EELFLogger eelflogger = EELFManager.getInstance()
.getLogger(NodeServlet.class);
+ private Delivery delivery;
static {
final String ws = "\\s*";
@@ -80,6 +81,10 @@ public class NodeServlet extends HttpServlet {
MetaDataPattern = Pattern.compile(object, Pattern.DOTALL);
}
+ NodeServlet(Delivery delivery) {
+ this.delivery = delivery;
+ }
+
/**
* Get the NodeConfigurationManager
*/
@@ -155,16 +160,13 @@ public class NodeServlet extends HttpServlet {
} catch (IOException ioe) {
logger.error("IOException" + ioe.getMessage());
eelflogger.info(EelfMsgs.EXIT);
- } catch (ServletException se) {
- logger.error("ServletException" + se.getMessage());
- eelflogger.info(EelfMsgs.EXIT);
}
}
/**
* Handle all DELETE requests
*/
- protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ protected void doDelete(HttpServletRequest req, HttpServletResponse resp) {
NodeUtils.setIpAndFqdnForEelf("doDelete");
NodeUtils.setRequestIdAndInvocationId(req);
eelflogger.info(EelfMsgs.ENTRY);
@@ -173,40 +175,26 @@ public class NodeServlet extends HttpServlet {
try {
common(req, resp, false);
} catch (IOException ioe) {
- logger.error("IOException" + ioe.getMessage());
- eelflogger.info(EelfMsgs.EXIT);
- } catch (ServletException se) {
- logger.error("ServletException" + se.getMessage());
+ logger.error("IOException " + ioe.getMessage());
eelflogger.info(EelfMsgs.EXIT);
}
-
}
- private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput)
- throws ServletException, IOException {
- if (down(resp)) {
- eelflogger.info(EelfMsgs.EXIT);
- return;
- }
- if (!req.isSecure()) {
- logger.info(
- "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req
- .getRemoteAddr());
- resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests");
- eelflogger.info(EelfMsgs.EXIT);
- return;
- }
- String fileid = req.getPathInfo();
- if (fileid == null) {
- logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
- .getRemoteAddr());
- resp.sendError(HttpServletResponse.SC_NOT_FOUND,
- "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");
- eelflogger.info(EelfMsgs.EXIT);
- return;
- }
+ private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) throws IOException {
+ String fileid = getFileId(req, resp);
+ if (fileid == null) return;
String feedid = null;
String user = null;
+ String ip = req.getRemoteAddr();
+ String lip = req.getLocalAddr();
+ String pubid = null;
+ String xpubid = null;
+ String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip;
+ Target[] targets = null;
+ if (fileid.startsWith("/delete/")) {
+ deleteFile(req, resp, fileid, pubid);
+ return;
+ }
String credentials = req.getHeader("Authorization");
if (credentials == null) {
logger.info("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req
@@ -215,12 +203,6 @@ public class NodeServlet extends HttpServlet {
eelflogger.info(EelfMsgs.EXIT);
return;
}
- String ip = req.getRemoteAddr();
- String lip = req.getLocalAddr();
- String pubid = null;
- String xpubid = null;
- String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip;
- Target[] targets = null;
if (fileid.startsWith("/publish/")) {
fileid = fileid.substring(9);
int i = fileid.indexOf('/');
@@ -315,8 +297,8 @@ public class NodeServlet extends HttpServlet {
mx.append(req.getMethod()).append('\t').append(fileid).append('\n');
Enumeration hnames = req.getHeaderNames();
String ctype = null;
- Boolean hasRequestIdHeader = false;
- Boolean hasInvocationIdHeader = false;
+ boolean hasRequestIdHeader = false;
+ boolean hasInvocationIdHeader = false;
while (hnames.hasMoreElements()) {
String hn = (String) hnames.nextElement();
String hnlc = hn.toLowerCase();
@@ -449,6 +431,90 @@ public class NodeServlet extends HttpServlet {
}
}
+ private void deleteFile(HttpServletRequest req, HttpServletResponse resp, String fileid, String pubid) {
+ try {
+ fileid = fileid.substring(8);
+ int i = fileid.indexOf('/');
+ if (i == -1 || i == fileid.length() - 1) {
+ logger.info("NODE0112 Rejecting bad URI for DELETE of " + req.getPathInfo() + " from " + req
+ .getRemoteAddr());
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND,
+ "Invalid request URI. Expecting <subId>/<pubId>.");
+ eelflogger.info(EelfMsgs.EXIT);
+ return;
+ }
+ String subscriptionId = fileid.substring(0, i);
+ int subId = Integer.parseInt(subscriptionId);
+ pubid = fileid.substring(i + 1);
+ String errorMessage = "Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: "
+ + config.getMyName() + ".";
+ int subIdDir = subId - (subId % 100);
+ if (!isAuthorizedToDelete(resp, subscriptionId, errorMessage)) {
+ return;
+ }
+ boolean result = delivery.markTaskSuccess(config.getSpoolBase() + "/s/" + subIdDir + "/" + subId, pubid);
+ if (result) {
+ logger.info("NODE0115 Successfully deleted files (" + pubid + ", " + pubid + ".M) from DR Node: "
+ + config.getMyName());
+ resp.setStatus(HttpServletResponse.SC_OK);
+ eelflogger.info(EelfMsgs.EXIT);
+ } else {
+ logger.error("NODE0116 " + errorMessage);
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND, "File not found on server.");
+ eelflogger.info(EelfMsgs.EXIT);
+ }
+ } catch (IOException ioe) {
+ logger.error("NODE0117 Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: "
+ + config.getMyName() + ". Error: " + ioe.getMessage());
+ eelflogger.info(EelfMsgs.EXIT);
+ }
+ }
+
+ @Nullable
+ private String getFileId(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+ if (down(resp)) {
+ eelflogger.info(EelfMsgs.EXIT);
+ return null;
+ }
+ if (!req.isSecure()) {
+ logger.info(
+ "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req
+ .getRemoteAddr());
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests");
+ eelflogger.info(EelfMsgs.EXIT);
+ return null;
+ }
+ String fileid = req.getPathInfo();
+ if (fileid == null) {
+ logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
+ .getRemoteAddr());
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND,
+ "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");
+ eelflogger.info(EelfMsgs.EXIT);
+ return null;
+ }
+ return fileid;
+ }
+
+ private boolean isAuthorizedToDelete(HttpServletResponse resp, String subscriptionId, String errorMessage) throws IOException {
+ try {
+ boolean deletePermitted = config.isDeletePermitted(subscriptionId);
+ if (!deletePermitted) {
+ logger.error("NODE0113 " + errorMessage + " Error: Subscription "
+ + subscriptionId + " is not a privileged subscription");
+ resp.sendError(HttpServletResponse.SC_UNAUTHORIZED);
+ eelflogger.info(EelfMsgs.EXIT);
+ return false;
+ }
+ } catch (NullPointerException npe) {
+ logger.error("NODE0114 " + errorMessage + " Error: Subscription " + subscriptionId + " does not exist");
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND);
+ eelflogger.info(EelfMsgs.EXIT);
+ return false;
+ }
+ return true;
+ }
+
private int getIdFromPath(HttpServletRequest req) {
String path = req.getPathInfo();
if (path == null || path.length() < 2) {
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java
index f0b81747..765a4075 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java
@@ -173,7 +173,8 @@ public class ProvData {
String password = gvas(jdel, "password");
boolean monly = jsub.getBoolean("metadataOnly");
boolean use100 = jdel.getBoolean("use100");
- psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100));
+ boolean privilegedSubscriber = jsub.getBoolean("privilegedSubscriber");
+ psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100, privilegedSubscriber));
}
}
JSONObject jparams = jcfg.optJSONObject("parameters");