aboutsummaryrefslogtreecommitdiffstats
path: root/datarouter-node/src/main/java
diff options
context:
space:
mode:
authoreconwar <conor.ward@est.tech>2019-02-14 09:37:44 +0000
committereconwar <conor.ward@est.tech>2019-02-14 09:37:44 +0000
commitc50374709585766e887f349a139de0a6595c1ca1 (patch)
treefe428b922da00abfabaafffdc72596c582625906 /datarouter-node/src/main/java
parent953dbd55595d28ecc7b65413b0edf910da6f45c0 (diff)
Add optional API for PM Mapper
Added new field to Subscriber class to keep files after published Added new Delete endpoint so that file can then be deleted Change-Id: Id72da67689a7ceda8ddd4997cd6349b981cb1cdb Issue-ID: DMAAP-981 Signed-off-by: econwar <conor.ward@est.tech>
Diffstat (limited to 'datarouter-node/src/main/java')
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java14
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java92
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java35
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java44
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java12
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java32
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java12
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java84
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java41
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java2
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java148
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java3
12 files changed, 356 insertions, 163 deletions
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java
index ae4f13bf..d2600d23 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java
@@ -267,4 +267,18 @@ public class Delivery {
}
}
}
+
+ /**
+ * Mark the task in spool a success
+ */
+ public synchronized boolean markTaskSuccess(String spool, String pubId) {
+ boolean succeeded = false;
+ if (spool != null) {
+ DeliveryQueue dq = dqs.get(spool);
+ if (dq != null) {
+ succeeded = dq.markTaskSuccess(pubId);
+ }
+ }
+ return succeeded;
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java
index ad746255..abdfa718 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java
@@ -64,16 +64,16 @@ import java.util.*;
* failure timer is active or if no files are found in a directory scan.
*/
public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
- private DeliveryQueueHelper dqh;
- private DestInfo di;
- private Hashtable<String, DeliveryTask> working = new Hashtable<String, DeliveryTask>();
- private Hashtable<String, DeliveryTask> retry = new Hashtable<String, DeliveryTask>();
+ private DeliveryQueueHelper deliveryQueueHelper;
+ private DestInfo destinationInfo;
+ private Hashtable<String, DeliveryTask> working = new Hashtable<>();
+ private Hashtable<String, DeliveryTask> retry = new Hashtable<>();
private int todoindex;
private boolean failed;
private long failduration;
private long resumetime;
File dir;
- private Vector<DeliveryTask> todo = new Vector<DeliveryTask>();
+ private Vector<DeliveryTask> todo = new Vector<>();
/**
* Try to cancel a delivery task.
@@ -139,11 +139,15 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
if (!failed) {
failed = true;
if (failduration == 0) {
- failduration = dqh.getInitFailureTimer();
+ if (destinationInfo.isPrivilegedSubscriber()) {
+ failduration = deliveryQueueHelper.getWaitForFileProcessFailureTimer();
+ } else{
+ failduration = deliveryQueueHelper.getInitFailureTimer();
+ }
}
resumetime = System.currentTimeMillis() + failduration;
- long maxdur = dqh.getMaxFailureTimer();
- failduration = (long) (failduration * dqh.getFailureBackoff());
+ long maxdur = deliveryQueueHelper.getMaxFailureTimer();
+ failduration = (long) (failduration * deliveryQueueHelper.getFailureBackoff());
if (failduration > maxdur) {
failduration = maxdur;
}
@@ -184,7 +188,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
*/
public synchronized DeliveryTask peekNext() {
long now = System.currentTimeMillis();
- long mindate = now - dqh.getExpirationTimer();
+ long mindate = now - deliveryQueueHelper.getExpirationTimer();
if (failed) {
if (now > resumetime) {
failed = false;
@@ -246,32 +250,32 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
/**
* Create a delivery queue for a given destination info
*/
- public DeliveryQueue(DeliveryQueueHelper dqh, DestInfo di) {
- this.dqh = dqh;
- this.di = di;
- dir = new File(di.getSpool());
+ public DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
+ this.deliveryQueueHelper = deliveryQueueHelper;
+ this.destinationInfo = destinationInfo;
+ dir = new File(destinationInfo.getSpool());
dir.mkdirs();
}
/**
* Update the destination info for this delivery queue
*/
- public void config(DestInfo di) {
- this.di = di;
+ public void config(DestInfo destinationInfo) {
+ this.destinationInfo = destinationInfo;
}
/**
* Get the dest info
*/
- public DestInfo getDestInfo() {
- return (di);
+ public DestInfo getDestinationInfo() {
+ return (destinationInfo);
}
/**
* Get the config manager
*/
public DeliveryQueueHelper getConfig() {
- return (dqh);
+ return (deliveryQueueHelper);
}
/**
@@ -294,22 +298,26 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
*/
public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {
if (status < 300) {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, xpubid);
- markSuccess(task);
- } else if (status < 400 && dqh.isFollowRedirects()) {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);
- if (dqh.handleRedirection(di, location, task.getFileId())) {
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
+ if (destinationInfo.isPrivilegedSubscriber()) {
+ markFailWithRetry(task);
+ } else {
+ markSuccess(task);
+ }
+ } else if (status < 400 && deliveryQueueHelper.isFollowRedirects()) {
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
+ if (deliveryQueueHelper.handleRedirection(destinationInfo, location, task.getFileId())) {
markRedirect(task);
} else {
StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
markFailNoRetry(task);
}
- } else if (status < 500) {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);
+ } else if (status < 500 && status != 429) { // Status 429 is the standard response for Too Many Requests and indicates that a file needs to be delivered again at a later time.
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
markFailNoRetry(task);
} else {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
markFailWithRetry(task);
}
}
@@ -318,8 +326,8 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
* Delivery failed by reason of an exception
*/
public void reportException(DeliveryTask task, Exception exception) {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), -1, exception.toString());
- dqh.handleUnreachable(di);
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString());
+ deliveryQueueHelper.handleUnreachable(destinationInfo);
markFailWithRetry(task);
}
@@ -330,14 +338,14 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
* @return The feed ID
*/
public String getFeedId(String subid) {
- return (dqh.getFeedId(subid));
+ return (deliveryQueueHelper.getFeedId(subid));
}
/**
* Get the URL to deliver a message to given the file ID
*/
public String getDestURL(String fileid) {
- return (dqh.getDestURL(di, fileid));
+ return (deliveryQueueHelper.getDestURL(destinationInfo, fileid));
}
/**
@@ -346,8 +354,8 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
*/
public void run() {
DeliveryTask t;
- long endtime = System.currentTimeMillis() + dqh.getFairTimeLimit();
- int filestogo = dqh.getFairFileLimit();
+ long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit();
+ int filestogo = deliveryQueueHelper.getFairFileLimit();
while ((t = getNext()) != null) {
t.run();
if (--filestogo <= 0 || System.currentTimeMillis() > endtime) {
@@ -369,4 +377,24 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
public void resetQueue() {
resumetime = System.currentTimeMillis();
}
+
+ /**
+ * Get task if in queue and mark as success
+ */
+ public boolean markTaskSuccess(String pubId) {
+ DeliveryTask task = working.get(pubId);
+ if (task != null) {
+ markSuccess(task);
+ return true;
+ }
+ task = retry.get(pubId);
+ if (task != null) {
+ retry.remove(pubId);
+ task.clean();
+ resumetime = 0;
+ failduration = 0;
+ return true;
+ }
+ return false;
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java
index b1734cd4..5cf5fa4c 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java
@@ -34,64 +34,69 @@ public interface DeliveryQueueHelper {
/**
* Get the timeout (milliseconds) before retrying after an initial delivery failure
*/
- public long getInitFailureTimer();
+ long getInitFailureTimer();
+
+ /**
+ * Get the timeout before retrying after delivery and wait for file processing
+ */
+ long getWaitForFileProcessFailureTimer();
/**
* Get the ratio between timeouts on consecutive delivery attempts
*/
- public double getFailureBackoff();
+ double getFailureBackoff();
/**
* Get the maximum timeout (milliseconds) between delivery attempts
*/
- public long getMaxFailureTimer();
+ long getMaxFailureTimer();
/**
* Get the expiration timer (milliseconds) for deliveries
*/
- public long getExpirationTimer();
+ long getExpirationTimer();
/**
* Get the maximum number of file delivery attempts before checking
* if another queue has work to be performed.
*/
- public int getFairFileLimit();
+ int getFairFileLimit();
/**
* Get the maximum amount of time spent delivering files before checking if another queue has work to be performed.
*/
- public long getFairTimeLimit();
+ long getFairTimeLimit();
/**
* Get the URL for delivering a file
*
- * @param dest The destination information for the file to be delivered.
+ * @param destinationInfo The destination information for the file to be delivered.
* @param fileid The file id for the file to be delivered.
- * @return The URL for delivering the file (typically, dest.getURL() + "/" + fileid).
+ * @return The URL for delivering the file (typically, destinationInfo.getURL() + "/" + fileid).
*/
- public String getDestURL(DestInfo dest, String fileid);
+ String getDestURL(DestInfo destinationInfo, String fileid);
/**
* Forget redirections associated with a subscriber
*
- * @param dest Destination information to forget
+ * @param destinationInfo Destination information to forget
*/
- public void handleUnreachable(DestInfo dest);
+ void handleUnreachable(DestInfo destinationInfo);
/**
* Post redirection for a subscriber
*
- * @param dest Destination information to update
+ * @param destinationInfo Destination information to update
* @param location Location given by subscriber
* @param fileid File ID of request
* @return true if this 3xx response is retryable, otherwise, false.
*/
- public boolean handleRedirection(DestInfo dest, String location, String fileid);
+ boolean handleRedirection(DestInfo destinationInfo, String location, String fileid);
/**
* Should I handle 3xx responses differently than 4xx responses?
*/
- public boolean isFollowRedirects();
+ boolean isFollowRedirects();
/**
* Get the feed ID for a subscription
@@ -99,5 +104,5 @@ public interface DeliveryQueueHelper {
* @param subid The subscription ID
* @return The feed ID
*/
- public String getFeedId(String subid);
+ String getFeedId(String subid);
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
index 80729905..b2c31691 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
@@ -47,9 +47,9 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
private static EELFLogger eelflogger = EELFManager.getInstance()
.getLogger(DeliveryTask.class);
- private DeliveryTaskHelper dth;
+ private DeliveryTaskHelper deliveryTaskHelper;
private String pubid;
- private DestInfo di;
+ private DestInfo destInfo;
private String spool;
private File datafile;
private File metafile;
@@ -69,25 +69,25 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
/**
* Create a delivery task for a given delivery queue and pub ID
*
- * @param dth The delivery task helper for the queue this task is in.
+ * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
* @param pubid The publish ID for this file. This is used as
* the base for the file name in the spool directory and is of
* the form <milliseconds since 1970>.<fqdn of initial data router node>
*/
- public DeliveryTask(DeliveryTaskHelper dth, String pubid) {
- this.dth = dth;
+ public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
+ this.deliveryTaskHelper = deliveryTaskHelper;
this.pubid = pubid;
- di = dth.getDestInfo();
- subid = di.getSubId();
- feedid = di.getLogData();
- spool = di.getSpool();
+ destInfo = deliveryTaskHelper.getDestinationInfo();
+ subid = destInfo.getSubId();
+ feedid = destInfo.getLogData();
+ spool = destInfo.getSpool();
String dfn = spool + "/" + pubid;
String mfn = dfn + ".M";
datafile = new File(spool + "/" + pubid);
metafile = new File(mfn);
- boolean monly = di.isMetaDataOnly();
+ boolean monly = destInfo.isMetaDataOnly();
date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
- Vector<String[]> hdrv = new Vector<String[]>();
+ Vector<String[]> hdrv = new Vector<>();
try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
String s = br.readLine();
@@ -104,7 +104,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
String v = s.substring(i + 1);
if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) {
subid = v.replaceAll("[^ ]*/", "");
- feedid = dth.getFeedId(subid.replaceAll(" .*", ""));
+ feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
}
if (length == 0 && h.toLowerCase().startsWith("content-")) {
continue;
@@ -126,7 +126,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
}
hdrs = hdrv.toArray(new String[hdrv.size()][]);
- url = dth.getDestURL(fileid);
+ url = deliveryTaskHelper.getDestURL(fileid);
}
/**
* Is the object a DeliveryTask with the same publication ID?
@@ -171,14 +171,14 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
public void run() {
attempts++;
try {
- di = dth.getDestInfo();
- boolean expect100 = di.isUsing100();
- boolean monly = di.isMetaDataOnly();
+ destInfo = deliveryTaskHelper.getDestinationInfo();
+ boolean expect100 = destInfo.isUsing100();
+ boolean monly = destInfo.isMetaDataOnly();
length = 0;
if (!"DELETE".equals(method) && !monly) {
length = datafile.length();
}
- url = dth.getDestURL(fileid);
+ url = deliveryTaskHelper.getDestURL(fileid);
URL u = new URL(url);
HttpURLConnection uc = (HttpURLConnection) u.openConnection();
uc.setConnectTimeout(60000);
@@ -186,7 +186,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
uc.setInstanceFollowRedirects(false);
uc.setRequestMethod(method);
uc.setRequestProperty("Content-Length", Long.toString(length));
- uc.setRequestProperty("Authorization", di.getAuth());
+ uc.setRequestProperty("Authorization", destInfo.getAuth());
uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
for (String[] nv : hdrs) {
uc.addRequestProperty(nv[0], nv[1]);
@@ -201,7 +201,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
try {
os = uc.getOutputStream();
} catch (ProtocolException pe) {
- dth.reportDeliveryExtra(this, -1L);
+ deliveryTaskHelper.reportDeliveryExtra(this, -1L);
// Rcvd error instead of 100-continue
loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe);
}
@@ -223,7 +223,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
}
os.close();
} catch (IOException ioe) {
- dth.reportDeliveryExtra(this, sofar);
+ deliveryTaskHelper.reportDeliveryExtra(this, sofar);
throw ioe;
}
}
@@ -257,10 +257,10 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
}
is.close();
}
- dth.reportStatus(this, rc, xpubid, rmsg);
+ deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
} catch (Exception e) {
loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
- dth.reportException(this, e);
+ deliveryTaskHelper.reportException(this, e);
}
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java
index 932b792a..d4ac8bd6 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java
@@ -38,7 +38,7 @@ public interface DeliveryTaskHelper {
* @param task The task that failed
* @param exception The exception that occurred
*/
- public void reportException(DeliveryTask task, Exception exception);
+ void reportException(DeliveryTask task, Exception exception);
/**
* Report that a delivery attempt completed (successfully or unsuccessfully)
@@ -48,7 +48,7 @@ public interface DeliveryTaskHelper {
* @param xpubid The publish ID from the far end (if any)
* @param location The redirection location for a 3XX response
*/
- public void reportStatus(DeliveryTask task, int status, String xpubid, String location);
+ void reportStatus(DeliveryTask task, int status, String xpubid, String location);
/**
* Report that a delivery attempt either failed while sending data or that an error was returned instead of a 100 Continue.
@@ -56,14 +56,14 @@ public interface DeliveryTaskHelper {
* @param task The task that failed
* @param sent The number of bytes sent or -1 if an error was returned instead of 100 Continue.
*/
- public void reportDeliveryExtra(DeliveryTask task, long sent);
+ void reportDeliveryExtra(DeliveryTask task, long sent);
/**
* Get the destination information for the delivery queue
*
* @return The destination information
*/
- public DestInfo getDestInfo();
+ DestInfo getDestinationInfo();
/**
* Given a file ID, get the URL to deliver to
@@ -71,7 +71,7 @@ public interface DeliveryTaskHelper {
* @param fileid The file id
* @return The URL to deliver to
*/
- public String getDestURL(String fileid);
+ String getDestURL(String fileid);
/**
* Get the feed ID for a subscription
@@ -79,5 +79,5 @@ public interface DeliveryTaskHelper {
* @param subid The subscription ID
* @return The feed iD
*/
- public String getFeedId(String subid);
+ String getFeedId(String subid);
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java
index 12253314..c3e0057c 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java
@@ -37,6 +37,7 @@ public class DestInfo {
private String authentication;
private boolean metaonly;
private boolean use100;
+ private boolean privilegedSubscriber;
/**
* Create a destination information object.
@@ -50,8 +51,9 @@ public class DestInfo {
* @param authentication The credentials.
* @param metaonly Is this a metadata only delivery?
* @param use100 Should I use expect 100-continue?
+ * @param privilegedSubscriber Can we wait to receive a file processed acknowledgement before deleting file
*/
- public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100) {
+ public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100, boolean privilegedSubscriber) {
this.name = name;
this.spool = spool;
this.subid = subid;
@@ -61,6 +63,27 @@ public class DestInfo {
this.authentication = authentication;
this.metaonly = metaonly;
this.use100 = use100;
+ this.privilegedSubscriber = privilegedSubscriber;
+ }
+
+ /**
+ * Create a destination information object.
+ *
+ * @param name n:fqdn or s:subid
+ * @param spool The directory where files are spooled.
+ * @param subscription The subscription.
+ */
+ public DestInfo(String name, String spool, NodeConfig.ProvSubscription subscription) {
+ this.name = name;
+ this.spool = spool;
+ this.subid = subscription.getSubId();
+ this.logdata = subscription.getFeedId();
+ this.url = subscription.getURL();
+ this.authuser = subscription.getAuthUser();
+ this.authentication = subscription.getCredentials();
+ this.metaonly = subscription.isMetaDataOnly();
+ this.use100 = subscription.isUsing100();
+ this.privilegedSubscriber = subscription.isPrivilegedSubscriber();
}
public boolean equals(Object o) {
@@ -150,4 +173,11 @@ public class DestInfo {
public boolean isUsing100() {
return (use100);
}
+
+ /**
+ * Should we wait to receive a file processed acknowledgement before deleting file
+ */
+ public boolean isPrivilegedSubscriber() {
+ return (privilegedSubscriber);
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java
index 1d5f76f6..ff803afc 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java
@@ -57,6 +57,10 @@ public class LogManager extends TimerTask {
return (10000L);
}
+ public long getWaitForFileProcessFailureTimer() {
+ return (600000L);
+ }
+
public double getFailureBackoff() {
return (2.0);
}
@@ -77,14 +81,14 @@ public class LogManager extends TimerTask {
return (86400000);
}
- public String getDestURL(DestInfo dest, String fileid) {
+ public String getDestURL(DestInfo destinationInfo, String fileid) {
return (config.getEventLogUrl());
}
- public void handleUnreachable(DestInfo dest) {
+ public void handleUnreachable(DestInfo destinationInfo) {
}
- public boolean handleRedirection(DestInfo dest, String location, String fileid) {
+ public boolean handleRedirection(DestInfo destinationInfo, String location, String fileid) {
return (false);
}
@@ -101,7 +105,7 @@ public class LogManager extends TimerTask {
public Uploader() {
dq = new DeliveryQueue(this,
new DestInfo("LogUpload", uploaddir, null, null, null, config.getMyName(), config.getMyAuth(), false,
- false));
+ false, false));
setDaemon(true);
setName("Log Uploader");
start();
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java
index c40d29c3..d3d3d01b 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java
@@ -231,6 +231,7 @@ public class NodeConfig {
private String credentials;
private boolean metaonly;
private boolean use100;
+ private boolean privilegedSubscriber;
/**
* Construct a subscription configuration entry
@@ -243,9 +244,10 @@ public class NodeConfig {
* Authorization header.
* @param metaonly Is this a meta data only subscription?
* @param use100 Should we send Expect: 100-continue?
+ * @param privilegedSubscriber Can we wait to receive a delete file call before deleting file
*/
public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials,
- boolean metaonly, boolean use100) {
+ boolean metaonly, boolean use100, boolean privilegedSubscriber) {
this.subid = subid;
this.feedid = feedid;
this.url = url;
@@ -253,6 +255,7 @@ public class NodeConfig {
this.credentials = credentials;
this.metaonly = metaonly;
this.use100 = use100;
+ this.privilegedSubscriber = privilegedSubscriber;
}
/**
@@ -303,6 +306,13 @@ public class NodeConfig {
public boolean isUsing100() {
return (use100);
}
+
+ /**
+ * Can we wait to receive a delete file call before deleting file
+ */
+ public boolean isPrivilegedSubscriber() {
+ return (privilegedSubscriber);
+ }
}
/**
@@ -462,11 +472,12 @@ public class NodeConfig {
Target[] targets;
}
- private Hashtable<String, String> params = new Hashtable<String, String>();
- private Hashtable<String, Feed> feeds = new Hashtable<String, Feed>();
- private Hashtable<String, DestInfo> nodeinfo = new Hashtable<String, DestInfo>();
- private Hashtable<String, DestInfo> subinfo = new Hashtable<String, DestInfo>();
- private Hashtable<String, IsFrom> nodes = new Hashtable<String, IsFrom>();
+ private Hashtable<String, String> params = new Hashtable<>();
+ private Hashtable<String, Feed> feeds = new Hashtable<>();
+ private Hashtable<String, DestInfo> nodeinfo = new Hashtable<>();
+ private Hashtable<String, DestInfo> subinfo = new Hashtable<>();
+ private Hashtable<String, IsFrom> nodes = new Hashtable<>();
+ private Hashtable<String, ProvSubscription> provSubscriptions = new Hashtable<>();
private String myname;
private String myauth;
private DestInfo[] alldests;
@@ -486,7 +497,7 @@ public class NodeConfig {
for (ProvParam p : pd.getParams()) {
params.put(p.getName(), p.getValue());
}
- Vector<DestInfo> div = new Vector<DestInfo>();
+ Vector<DestInfo> destInfos = new Vector<>();
myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey);
for (ProvNode pn : pd.getNodes()) {
String cn = pn.getCName();
@@ -495,9 +506,9 @@ public class NodeConfig {
}
String auth = NodeUtils.getNodeAuthHdr(cn, nodeauthkey);
DestInfo di = new DestInfo("n:" + cn, spooldir + "/n/" + cn, null, "n2n-" + cn,
- "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true);
+ "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true, false);
(new File(di.getSpool())).mkdirs();
- div.add(di);
+ destInfos.add(di);
nodeinfo.put(cn, di);
nodes.put(auth, new IsFrom(cn));
}
@@ -533,7 +544,7 @@ public class NodeConfig {
}
egrtab.put(pfe.getSubId(), pfe.getNode());
}
- Hashtable<String, Vector<SubnetMatcher>> pfstab = new Hashtable<String, Vector<SubnetMatcher>>();
+ Hashtable<String, Vector<SubnetMatcher>> pfstab = new Hashtable<>();
for (ProvFeedSubnet pfs : pd.getFeedSubnets()) {
Vector<SubnetMatcher> v = pfstab.get(pfs.getFeedId());
if (v == null) {
@@ -542,46 +553,47 @@ public class NodeConfig {
}
v.add(new SubnetMatcher(pfs.getCidr()));
}
- Hashtable<String, StringBuffer> ttab = new Hashtable<String, StringBuffer>();
- HashSet<String> allfeeds = new HashSet<String>();
+ Hashtable<String, StringBuffer> feedTargets = new Hashtable<>();
+ HashSet<String> allfeeds = new HashSet<>();
for (ProvFeed pfx : pd.getFeeds()) {
if (pfx.getStatus() == null) {
allfeeds.add(pfx.getId());
}
}
- for (ProvSubscription ps : pd.getSubscriptions()) {
- String sid = ps.getSubId();
- String fid = ps.getFeedId();
- if (!allfeeds.contains(fid)) {
+ for (ProvSubscription provSubscription : pd.getSubscriptions()) {
+ String subId = provSubscription.getSubId();
+ String feedId = provSubscription.getFeedId();
+ if (!allfeeds.contains(feedId)) {
continue;
}
- if (subinfo.get(sid) != null) {
+ if (subinfo.get(subId) != null) {
continue;
}
int sididx = 999;
try {
- sididx = Integer.parseInt(sid);
+ sididx = Integer.parseInt(subId);
sididx -= sididx % 100;
} catch (Exception e) {
}
- String siddir = sididx + "/" + sid;
- DestInfo di = new DestInfo("s:" + sid, spooldir + "/s/" + siddir, sid, fid, ps.getURL(), ps.getAuthUser(),
- ps.getCredentials(), ps.isMetaDataOnly(), ps.isUsing100());
- (new File(di.getSpool())).mkdirs();
- div.add(di);
- subinfo.put(sid, di);
- String egr = egrtab.get(sid);
+ String subscriptionDirectory = sididx + "/" + subId;
+ DestInfo destinationInfo = new DestInfo("s:" + subId,
+ spooldir + "/s/" + subscriptionDirectory, provSubscription);
+ (new File(destinationInfo.getSpool())).mkdirs();
+ destInfos.add(destinationInfo);
+ provSubscriptions.put(subId, provSubscription);
+ subinfo.put(subId, destinationInfo);
+ String egr = egrtab.get(subId);
if (egr != null) {
- sid = pf.getPath(egr) + sid;
+ subId = pf.getPath(egr) + subId;
}
- StringBuffer sb = ttab.get(fid);
+ StringBuffer sb = feedTargets.get(feedId);
if (sb == null) {
sb = new StringBuffer();
- ttab.put(fid, sb);
+ feedTargets.put(feedId, sb);
}
- sb.append(' ').append(sid);
+ sb.append(' ').append(subId);
}
- alldests = div.toArray(new DestInfo[div.size()]);
+ alldests = destInfos.toArray(new DestInfo[destInfos.size()]);
for (ProvFeed pfx : pd.getFeeds()) {
String fid = pfx.getId();
Feed f = feeds.get(fid);
@@ -609,7 +621,7 @@ public class NodeConfig {
} else {
f.redirections = v2.toArray(new Redirection[v2.size()]);
}
- StringBuffer sb = ttab.get(fid);
+ StringBuffer sb = feedTargets.get(fid);
if (sb == null) {
f.targets = new Target[0];
} else {
@@ -712,6 +724,16 @@ public class NodeConfig {
}
/**
+ * Check whether delete file is allowed.
+ *
+ * @param subId The ID of the subscription being requested.
+ */
+ public boolean isDeletePermitted(String subId) {
+ ProvSubscription provSubscription = provSubscriptions.get(subId);
+ return provSubscription.isPrivilegedSubscriber();
+ }
+
+ /**
* Get authenticated user
*/
public String getAuthUser(String feedid, String credentials) {
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java
index 474f5dde..d98c47ae 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java
@@ -57,6 +57,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
private Timer timer = new Timer("Node Configuration Timer", true);
private long maxfailuretimer;
private long initfailuretimer;
+ private long waitForFileProcessFailureTimer;
private long expirationtimer;
private double failurebackoff;
private long fairtimelimit;
@@ -187,6 +188,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false"));
eventloginterval = getProvParam("LOGROLL_INTERVAL", "30s");
initfailuretimer = 10000;
+ waitForFileProcessFailureTimer = 600000;
maxfailuretimer = 3600000;
expirationtimer = 86400000;
failurebackoff = 2.0;
@@ -200,6 +202,10 @@ public class NodeConfigManager implements DeliveryQueueHelper {
} catch (Exception e) {
}
try {
+ waitForFileProcessFailureTimer = (long) (Double.parseDouble(getProvParam("DELIVERY_FILE_PROCESS_INTERVAL")) * 1000);
+ } catch (Exception e) {
+ }
+ try {
maxfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000);
} catch (Exception e) {
}
@@ -329,6 +335,16 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
+ * Check whether delete file is allowed.
+ *
+ * @param subId The ID of the subscription being requested
+ * @return True if the delete file is permitted for the subscriber.
+ */
+ public boolean isDeletePermitted(String subId) {
+ return (config.isDeletePermitted(subId));
+ }
+
+ /**
* Check who the user is given the feed ID and the offered credentials.
*
* @param feedid The ID of the feed specified
@@ -407,13 +423,13 @@ public class NodeConfigManager implements DeliveryQueueHelper {
/**
* Get the URL to deliver a message to.
*
- * @param destinfo The destination information
+ * @param destinationInfo The destination information
* @param fileid The file ID
* @return The URL to deliver to
*/
- public String getDestURL(DestInfo destinfo, String fileid) {
- String subid = destinfo.getSubId();
- String purl = destinfo.getURL();
+ public String getDestURL(DestInfo destinationInfo, String fileid) {
+ String subid = destinationInfo.getSubId();
+ String purl = destinationInfo.getURL();
if (followredirects && subid != null) {
purl = rdmgr.lookup(subid, purl);
}
@@ -430,10 +446,10 @@ public class NodeConfigManager implements DeliveryQueueHelper {
/**
* Set up redirection on receipt of a 3XX from a target URL
*/
- public boolean handleRedirection(DestInfo destinfo, String redirto, String fileid) {
+ public boolean handleRedirection(DestInfo destinationInfo, String redirto, String fileid) {
fileid = "/" + fileid;
- String subid = destinfo.getSubId();
- String purl = destinfo.getURL();
+ String subid = destinationInfo.getSubId();
+ String purl = destinationInfo.getURL();
if (followredirects && subid != null && redirto.endsWith(fileid)) {
redirto = redirto.substring(0, redirto.length() - fileid.length());
if (!redirto.equals(purl)) {
@@ -447,8 +463,8 @@ public class NodeConfigManager implements DeliveryQueueHelper {
/**
* Handle unreachable target URL
*/
- public void handleUnreachable(DestInfo destinfo) {
- String subid = destinfo.getSubId();
+ public void handleUnreachable(DestInfo destinationInfo) {
+ String subid = destinationInfo.getSubId();
if (followredirects && subid != null) {
rdmgr.forget(subid);
}
@@ -462,6 +478,13 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
+ * Get the timeout before retrying after delivery and wait for file processing
+ */
+ public long getWaitForFileProcessFailureTimer() {
+ return (waitForFileProcessFailureTimer);
+ }
+
+ /**
* Get the maximum timeout between delivery attempts
*/
public long getMaxFailureTimer() {
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java
index e07642c4..d25531a7 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java
@@ -145,7 +145,7 @@ public class NodeMain {
ctxt = new ServletContextHandler(0);
ctxt.setContextPath("/");
server.setHandler(ctxt);
- ctxt.addServlet(new ServletHolder(new NodeServlet()), "/*");
+ ctxt.addServlet(new ServletHolder(new NodeServlet(delivery)), "/*");
nodeMainLogger.info("NODE0005 Data Router Node Activating Service");
server.start();
server.join();
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java
index 7e1d46d3..fae2c1f6 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java
@@ -40,11 +40,11 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Enumeration;
import java.util.regex.Pattern;
-import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.log4j.Logger;
+import org.jetbrains.annotations.Nullable;
import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
import org.slf4j.MDC;
@@ -67,6 +67,7 @@ public class NodeServlet extends HttpServlet {
//Adding EELF Logger Rally:US664892
private static EELFLogger eelflogger = EELFManager.getInstance()
.getLogger(NodeServlet.class);
+ private Delivery delivery;
static {
final String ws = "\\s*";
@@ -80,6 +81,10 @@ public class NodeServlet extends HttpServlet {
MetaDataPattern = Pattern.compile(object, Pattern.DOTALL);
}
+ NodeServlet(Delivery delivery) {
+ this.delivery = delivery;
+ }
+
/**
* Get the NodeConfigurationManager
*/
@@ -155,16 +160,13 @@ public class NodeServlet extends HttpServlet {
} catch (IOException ioe) {
logger.error("IOException" + ioe.getMessage());
eelflogger.info(EelfMsgs.EXIT);
- } catch (ServletException se) {
- logger.error("ServletException" + se.getMessage());
- eelflogger.info(EelfMsgs.EXIT);
}
}
/**
* Handle all DELETE requests
*/
- protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ protected void doDelete(HttpServletRequest req, HttpServletResponse resp) {
NodeUtils.setIpAndFqdnForEelf("doDelete");
NodeUtils.setRequestIdAndInvocationId(req);
eelflogger.info(EelfMsgs.ENTRY);
@@ -173,40 +175,26 @@ public class NodeServlet extends HttpServlet {
try {
common(req, resp, false);
} catch (IOException ioe) {
- logger.error("IOException" + ioe.getMessage());
- eelflogger.info(EelfMsgs.EXIT);
- } catch (ServletException se) {
- logger.error("ServletException" + se.getMessage());
+ logger.error("IOException " + ioe.getMessage());
eelflogger.info(EelfMsgs.EXIT);
}
-
}
- private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput)
- throws ServletException, IOException {
- if (down(resp)) {
- eelflogger.info(EelfMsgs.EXIT);
- return;
- }
- if (!req.isSecure()) {
- logger.info(
- "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req
- .getRemoteAddr());
- resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests");
- eelflogger.info(EelfMsgs.EXIT);
- return;
- }
- String fileid = req.getPathInfo();
- if (fileid == null) {
- logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
- .getRemoteAddr());
- resp.sendError(HttpServletResponse.SC_NOT_FOUND,
- "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");
- eelflogger.info(EelfMsgs.EXIT);
- return;
- }
+ private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) throws IOException {
+ String fileid = getFileId(req, resp);
+ if (fileid == null) return;
String feedid = null;
String user = null;
+ String ip = req.getRemoteAddr();
+ String lip = req.getLocalAddr();
+ String pubid = null;
+ String xpubid = null;
+ String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip;
+ Target[] targets = null;
+ if (fileid.startsWith("/delete/")) {
+ deleteFile(req, resp, fileid, pubid);
+ return;
+ }
String credentials = req.getHeader("Authorization");
if (credentials == null) {
logger.info("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req
@@ -215,12 +203,6 @@ public class NodeServlet extends HttpServlet {
eelflogger.info(EelfMsgs.EXIT);
return;
}
- String ip = req.getRemoteAddr();
- String lip = req.getLocalAddr();
- String pubid = null;
- String xpubid = null;
- String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip;
- Target[] targets = null;
if (fileid.startsWith("/publish/")) {
fileid = fileid.substring(9);
int i = fileid.indexOf('/');
@@ -315,8 +297,8 @@ public class NodeServlet extends HttpServlet {
mx.append(req.getMethod()).append('\t').append(fileid).append('\n');
Enumeration hnames = req.getHeaderNames();
String ctype = null;
- Boolean hasRequestIdHeader = false;
- Boolean hasInvocationIdHeader = false;
+ boolean hasRequestIdHeader = false;
+ boolean hasInvocationIdHeader = false;
while (hnames.hasMoreElements()) {
String hn = (String) hnames.nextElement();
String hnlc = hn.toLowerCase();
@@ -449,6 +431,90 @@ public class NodeServlet extends HttpServlet {
}
}
+ private void deleteFile(HttpServletRequest req, HttpServletResponse resp, String fileid, String pubid) {
+ try {
+ fileid = fileid.substring(8);
+ int i = fileid.indexOf('/');
+ if (i == -1 || i == fileid.length() - 1) {
+ logger.info("NODE0112 Rejecting bad URI for DELETE of " + req.getPathInfo() + " from " + req
+ .getRemoteAddr());
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND,
+ "Invalid request URI. Expecting <subId>/<pubId>.");
+ eelflogger.info(EelfMsgs.EXIT);
+ return;
+ }
+ String subscriptionId = fileid.substring(0, i);
+ int subId = Integer.parseInt(subscriptionId);
+ pubid = fileid.substring(i + 1);
+ String errorMessage = "Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: "
+ + config.getMyName() + ".";
+ int subIdDir = subId - (subId % 100);
+ if (!isAuthorizedToDelete(resp, subscriptionId, errorMessage)) {
+ return;
+ }
+ boolean result = delivery.markTaskSuccess(config.getSpoolBase() + "/s/" + subIdDir + "/" + subId, pubid);
+ if (result) {
+ logger.info("NODE0115 Successfully deleted files (" + pubid + ", " + pubid + ".M) from DR Node: "
+ + config.getMyName());
+ resp.setStatus(HttpServletResponse.SC_OK);
+ eelflogger.info(EelfMsgs.EXIT);
+ } else {
+ logger.error("NODE0116 " + errorMessage);
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND, "File not found on server.");
+ eelflogger.info(EelfMsgs.EXIT);
+ }
+ } catch (IOException ioe) {
+ logger.error("NODE0117 Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: "
+ + config.getMyName() + ". Error: " + ioe.getMessage());
+ eelflogger.info(EelfMsgs.EXIT);
+ }
+ }
+
+ @Nullable
+ private String getFileId(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+ if (down(resp)) {
+ eelflogger.info(EelfMsgs.EXIT);
+ return null;
+ }
+ if (!req.isSecure()) {
+ logger.info(
+ "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req
+ .getRemoteAddr());
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests");
+ eelflogger.info(EelfMsgs.EXIT);
+ return null;
+ }
+ String fileid = req.getPathInfo();
+ if (fileid == null) {
+ logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
+ .getRemoteAddr());
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND,
+ "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");
+ eelflogger.info(EelfMsgs.EXIT);
+ return null;
+ }
+ return fileid;
+ }
+
+ private boolean isAuthorizedToDelete(HttpServletResponse resp, String subscriptionId, String errorMessage) throws IOException {
+ try {
+ boolean deletePermitted = config.isDeletePermitted(subscriptionId);
+ if (!deletePermitted) {
+ logger.error("NODE0113 " + errorMessage + " Error: Subscription "
+ + subscriptionId + " is not a privileged subscription");
+ resp.sendError(HttpServletResponse.SC_UNAUTHORIZED);
+ eelflogger.info(EelfMsgs.EXIT);
+ return false;
+ }
+ } catch (NullPointerException npe) {
+ logger.error("NODE0114 " + errorMessage + " Error: Subscription " + subscriptionId + " does not exist");
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND);
+ eelflogger.info(EelfMsgs.EXIT);
+ return false;
+ }
+ return true;
+ }
+
private int getIdFromPath(HttpServletRequest req) {
String path = req.getPathInfo();
if (path == null || path.length() < 2) {
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java
index f0b81747..765a4075 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java
@@ -173,7 +173,8 @@ public class ProvData {
String password = gvas(jdel, "password");
boolean monly = jsub.getBoolean("metadataOnly");
boolean use100 = jdel.getBoolean("use100");
- psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100));
+ boolean privilegedSubscriber = jsub.getBoolean("privilegedSubscriber");
+ psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100, privilegedSubscriber));
}
}
JSONObject jparams = jcfg.optJSONObject("parameters");