summaryrefslogtreecommitdiffstats
path: root/datarouter-node/src
diff options
context:
space:
mode:
Diffstat (limited to 'datarouter-node/src')
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DRNodeCadiFilter.java54
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java219
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java264
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java298
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java27
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java150
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfoBuilder.java149
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java33
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java141
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java895
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java111
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java157
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java233
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java82
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java125
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathUtil.java52
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java478
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java4
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java42
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java55
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java260
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java1
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java3
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java50
-rw-r--r--datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/DeliveryTest.java2
-rw-r--r--datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeServletTest.java2
26 files changed, 2123 insertions, 1764 deletions
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DRNodeCadiFilter.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DRNodeCadiFilter.java
index 991d8660..245dbccd 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DRNodeCadiFilter.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DRNodeCadiFilter.java
@@ -17,23 +17,24 @@
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
+
package org.onap.dmaap.datarouter.node;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-import org.onap.aaf.cadi.PropAccess;
-import org.onap.aaf.cadi.filter.CadiFilter;
-
+import java.io.IOException;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
+import org.onap.aaf.cadi.PropAccess;
+import org.onap.aaf.cadi.filter.CadiFilter;
public class DRNodeCadiFilter extends CadiFilter {
+
private static EELFLogger logger = EELFManager.getInstance().getLogger(NodeServlet.class);
DRNodeCadiFilter(boolean init, PropAccess access) throws ServletException {
@@ -41,23 +42,16 @@ public class DRNodeCadiFilter extends CadiFilter {
}
@Override
- public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
+ public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
+ throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) request;
String path = httpRequest.getPathInfo();
if (!(path.startsWith("/internal"))) {
- if (!(httpRequest.getMethod().equalsIgnoreCase("POST"))) {
- if (httpRequest.getMethod().equalsIgnoreCase("DELETE") && path.startsWith("/delete")) {
+ if (!("POST".equalsIgnoreCase(httpRequest.getMethod()))) {
+ if ("DELETE".equalsIgnoreCase(httpRequest.getMethod()) && path.startsWith("/delete")) {
chain.doFilter(request, response);
} else {
- String feedId = getFeedId(request, response);
- String aafDbInstance = NodeConfigManager.getInstance().getAafInstance(feedId);
- if (aafDbInstance != null && !aafDbInstance.equals("") && !aafDbInstance.equalsIgnoreCase("legacy")) {
- logger.info("DRNodeCadiFilter - doFilter: FeedId - " + feedId + ":" + "AAF Instance -" + aafDbInstance);
- super.doFilter(request, response, chain);
- } else {
- logger.info("DRNodeCadiFilter - doFilter: FeedId - " + feedId + ":" + "Legacy Feed");
- chain.doFilter(request, response);
- }
+ doFilterWithFeedId(request, response, chain);
}
}
} else {
@@ -72,7 +66,8 @@ public class DRNodeCadiFilter extends CadiFilter {
if (fileid == null) {
logger.error("NODE0105 Rejecting bad URI for PUT " + req.getPathInfo() + " from " + req.getRemoteAddr());
try {
- resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND,
+ "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");
} catch (IOException e) {
logger.error("NODE0541 DRNodeCadiFilter.getFeedId: ", e);
}
@@ -82,19 +77,34 @@ public class DRNodeCadiFilter extends CadiFilter {
if (fileid.startsWith("/publish/")) {
fileid = fileid.substring(9);
- int i = fileid.indexOf('/');
- if (i == -1 || i == fileid.length() - 1) {
- logger.error("NODE0105 Rejecting bad URI for PUT (publish) of " + req.getPathInfo() + " from " + req.getRemoteAddr());
+ int index = fileid.indexOf('/');
+ if (index == -1 || index == fileid.length() - 1) {
+ logger.error("NODE0105 Rejecting bad URI for PUT (publish) of " + req.getPathInfo() + " from " + req
+ .getRemoteAddr());
try {
- resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting <feed-publishing-url>/<fileid>. Possible missing fileid.");
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND,
+ "Invalid request URI. Expecting <feed-publishing-url>/<fileid>. "
+ + "Possible missing fileid.");
} catch (IOException e) {
logger.error("NODE0542 DRNodeCadiFilter.getFeedId: ", e);
}
return null;
}
- feedid = fileid.substring(0, i);
+ feedid = fileid.substring(0, index);
}
return feedid;
}
+ private void doFilterWithFeedId(ServletRequest request, ServletResponse response, FilterChain chain)
+ throws IOException, ServletException {
+ String feedId = getFeedId(request, response);
+ String aafDbInstance = NodeConfigManager.getInstance().getAafInstance(feedId);
+ if (aafDbInstance != null && !"".equals(aafDbInstance) && !"legacy".equalsIgnoreCase(aafDbInstance)) {
+ logger.info("DRNodeCadiFilter - doFilter: FeedId - " + feedId + ":" + "AAF Instance -" + aafDbInstance);
+ super.doFilter(request, response, chain);
+ } else {
+ logger.info("DRNodeCadiFilter - doFilter: FeedId - " + feedId + ":" + "Legacy Feed");
+ chain.doFilter(request, response);
+ }
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java
index 4c21b342..df73c1e9 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java
@@ -23,85 +23,37 @@
package org.onap.dmaap.datarouter.node;
-import java.util.*;
-import java.io.*;
-
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Hashtable;
+import java.util.Objects;
/**
* Main control point for delivering files to destinations.
- * <p>
- * The Delivery class manages assignment of delivery threads to delivery
- * queues and creation and destruction of delivery queues as
- * configuration changes. DeliveryQueues are assigned threads based on a
- * modified round-robin approach giving priority to queues with more work
- * as measured by both bytes to deliver and files to deliver and lower
- * priority to queues that already have delivery threads working.
- * A delivery thread continues to work for a delivery queue as long as
- * that queue has more files to deliver.
+ *
+ * <p>The Delivery class manages assignment of delivery threads to delivery queues and creation and destruction of
+ * delivery queues as configuration changes. DeliveryQueues are assigned threads based on a modified round-robin
+ * approach giving priority to queues with more work as measured by both bytes to deliver and files to deliver and lower
+ * priority to queues that already have delivery threads working. A delivery thread continues to work for a delivery
+ * queue as long as that queue has more files to deliver.
*/
public class Delivery {
- private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class);
-
- private static class DelItem implements Comparable<DelItem> {
- private String pubid;
- private String spool;
-
- public int compareTo(DelItem x) {
- int i = pubid.compareTo(x.pubid);
- if (i == 0) {
- i = spool.compareTo(x.spool);
- }
- return (i);
- }
-
- public String getPublishId() {
- return (pubid);
- }
-
- public String getSpool() {
- return (spool);
- }
-
- public DelItem(String pubid, String spool) {
- this.pubid = pubid;
- this.spool = spool;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- DelItem delItem = (DelItem) o;
- return Objects.equals(pubid, delItem.pubid) &&
- Objects.equals(getSpool(), delItem.getSpool());
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(pubid, getSpool());
- }
- }
+ private static final String TOTAL = " total=";
+ private static final String YELLOW = " yellow=";
+ private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class);
private double fdstart;
private double fdstop;
private int threads;
private int curthreads;
private NodeConfigManager config;
- private Hashtable<String, DeliveryQueue> dqs = new Hashtable<String, DeliveryQueue>();
+ private Hashtable<String, DeliveryQueue> dqs = new Hashtable<>();
private DeliveryQueue[] queues = new DeliveryQueue[0];
private int qpos = 0;
private long nextcheck;
- private Runnable cmon = new Runnable() {
- public void run() {
- checkconfig();
- }
- };
/**
* Constructs a new Delivery system using the specified configuration manager.
@@ -110,10 +62,37 @@ public class Delivery {
*/
public Delivery(NodeConfigManager config) {
this.config = config;
+ Runnable cmon = this::checkconfig;
config.registerConfigTask(cmon);
checkconfig();
}
+ /**
+ * Reset the retry timer for a delivery queue.
+ */
+ public synchronized void resetQueue(String spool) {
+ if (spool != null) {
+ DeliveryQueue dq = dqs.get(spool);
+ if (dq != null) {
+ dq.resetQueue();
+ }
+ }
+ }
+
+ /**
+ * Mark the task in spool a success.
+ */
+ public synchronized boolean markTaskSuccess(String spool, String pubId) {
+ boolean succeeded = false;
+ if (spool != null) {
+ DeliveryQueue dq = dqs.get(spool);
+ if (dq != null) {
+ succeeded = dq.markTaskSuccess(pubId);
+ }
+ }
+ return succeeded;
+ }
+
private void cleardir(String dir) {
if (dqs.get(dir) != null) {
return;
@@ -131,12 +110,11 @@ public class Delivery {
File spoolfile = new File(config.getSpoolBase());
long tspace = spoolfile.getTotalSpace();
long start = (long) (tspace * fdstart);
- long stop = (long) (tspace * fdstop);
long cur = spoolfile.getUsableSpace();
if (cur >= start) {
return;
}
- Vector<DelItem> cv = new Vector<DelItem>();
+ ArrayList<DelItem> cv = new ArrayList<>();
for (String sdir : dqs.keySet()) {
for (String meta : (new File(sdir)).list()) {
if (!meta.endsWith(".M") || meta.charAt(0) == '.') {
@@ -147,27 +125,21 @@ public class Delivery {
}
DelItem[] items = cv.toArray(new DelItem[cv.size()]);
Arrays.sort(items);
- logger.info("NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + " total=" + tspace);
- for (DelItem item : items) {
- long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
- logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk");
- if (amount > 0) {
- cur += amount;
- if (cur >= stop) {
- cur = spoolfile.getUsableSpace();
- }
- if (cur >= stop) {
- logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace);
- return;
- }
- }
+ long stop = (long) (tspace * fdstop);
+ logger.info(
+ "NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + TOTAL + tspace);
+ if (determineFreeDiskSpace(spoolfile, tspace, stop, cur, items)) {
+ return;
}
cur = spoolfile.getUsableSpace();
if (cur >= stop) {
- logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace);
+ logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + YELLOW + stop
+ + TOTAL + tspace);
return;
}
- logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status. current=" + cur + " yellow=" + stop + " total=" + tspace);
+ logger.warn(
+ "NODE0504 Unable to recover sufficient disk space to reach green status. current=" + cur + YELLOW
+ + stop + TOTAL + tspace);
}
private void cleardirs() {
@@ -206,7 +178,7 @@ public class Delivery {
DestInfo[] alldis = config.getAllDests();
DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];
qpos = 0;
- Hashtable<String, DeliveryQueue> ndqs = new Hashtable<String, DeliveryQueue>();
+ Hashtable<String, DeliveryQueue> ndqs = new Hashtable<>();
for (DestInfo di : alldis) {
String spl = di.getSpool();
DeliveryQueue dq = dqs.get(spl);
@@ -223,11 +195,8 @@ public class Delivery {
cleardirs();
while (curthreads < threads) {
curthreads++;
- (new Thread() {
- {
- setName("Delivery Thread");
- }
-
+ (new Thread("Delivery Thread") {
+ @Override
public void run() {
dodelivery();
}
@@ -276,29 +245,69 @@ public class Delivery {
}
}
- /**
- * Reset the retry timer for a delivery queue
- */
- public synchronized void resetQueue(String spool) {
- if (spool != null) {
- DeliveryQueue dq = dqs.get(spool);
- if (dq != null) {
- dq.resetQueue();
+ private boolean determineFreeDiskSpace(File spoolfile, long tspace, long stop, long cur, DelItem[] items) {
+ for (DelItem item : items) {
+ long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
+ logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId()
+ + " to free up disk");
+ if (amount > 0) {
+ cur += amount;
+ if (cur >= stop) {
+ cur = spoolfile.getUsableSpace();
+ }
+ if (cur >= stop) {
+ logger.info(
+ "NODE0503 Free disk space at or above yellow threshold. current=" + cur + YELLOW + stop
+ + TOTAL + tspace);
+ return true;
+ }
}
}
+ return false;
}
- /**
- * Mark the task in spool a success
- */
- public synchronized boolean markTaskSuccess(String spool, String pubId) {
- boolean succeeded = false;
- if (spool != null) {
- DeliveryQueue dq = dqs.get(spool);
- if (dq != null) {
- succeeded = dq.markTaskSuccess(pubId);
+ private static class DelItem implements Comparable<DelItem> {
+
+ private String pubid;
+ private String spool;
+
+ public DelItem(String pubid, String spool) {
+ this.pubid = pubid;
+ this.spool = spool;
+ }
+
+ public int compareTo(DelItem other) {
+ int diff = pubid.compareTo(other.pubid);
+ if (diff == 0) {
+ diff = spool.compareTo(other.spool);
}
+ return (diff);
+ }
+
+ public String getPublishId() {
+ return (pubid);
+ }
+
+ public String getSpool() {
+ return (spool);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ DelItem delItem = (DelItem) object;
+ return Objects.equals(pubid, delItem.pubid)
+ && Objects.equals(getSpool(), delItem.getSpool());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pubid, getSpool());
}
- return succeeded;
}
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java
index 3d485878..8cdafd6f 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java
@@ -26,46 +26,41 @@ package org.onap.dmaap.datarouter.node;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Hashtable;
+import java.util.Vector;
+import org.jetbrains.annotations.Nullable;
/**
* Mechanism for monitoring and controlling delivery of files to a destination.
- * <p>
- * The DeliveryQueue class maintains lists of DeliveryTasks for a single
- * destination (a subscription or another data router node) and assigns
- * delivery threads to try to deliver them. It also maintains a delivery
- * status that causes it to back off on delivery attempts after a failure.
- * <p>
- * If the most recent delivery result was a failure, then no more attempts
- * will be made for a period of time. Initially, and on the first failure
- * following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer() (milliseconds).
- * If, after this delay, additional failures occur, each failure will
- * multiply the delay by DeliveryQueueHelper.getFailureBackoff() up to a
- * maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer().
- * Note that this behavior applies to the delivery queue as a whole and not
- * to individual files in the queue. If multiple files are being
- * delivered and one fails, the delay will be started. If a second
- * delivery fails while the delay was active, it will not change the delay
- * or change the duration of any subsequent delay.
- * If, however, it succeeds, it will cancel the delay.
- * <p>
- * The queue maintains 3 collections of files to deliver: A todo list of
- * files that will be attempted, a working set of files that are being
- * attempted, and a retry set of files that were attempted and failed.
- * Whenever the todo list is empty and needs to be refilled, a scan of the
- * spool directory is made and the file names sorted. Any files in the working set are ignored.
- * If a DeliveryTask for the file is in the retry set, then that delivery
- * task is placed on the todo list. Otherwise, a new DeliveryTask for the
- * file is created and placed on the todo list.
- * If, when a DeliveryTask is about to be removed from the todo list, its
- * age exceeds DeliveryQueueHelper.getExpirationTimer(), then it is instead
- * marked as expired.
- * <p>
- * A delivery queue also maintains a skip flag. This flag is true if the
- * failure timer is active or if no files are found in a directory scan.
+ *
+ * <p>The DeliveryQueue class maintains lists of DeliveryTasks for a single destination (a subscription or another data
+ * router node) and assigns delivery threads to try to deliver them. It also maintains a delivery status that causes it
+ * to back off on delivery attempts after a failure.
+ *
+ * <p>If the most recent delivery result was a failure, then no more attempts will be made for a period of time.
+ * Initially, and on the first failure following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer()
+ * (milliseconds). If, after this delay, additional failures occur, each failure will multiply the delay by
+ * DeliveryQueueHelper.getFailureBackoff() up to a maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer().
+ * Note that this behavior applies to the delivery queue as a whole and not to individual files in the queue. If
+ * multiple files are being delivered and one fails, the delay will be started. If a second delivery fails while the
+ * delay was active, it will not change the delay or change the duration of any subsequent delay. If, however, it
+ * succeeds, it will cancel the delay.
+ *
+ * <p>The queue maintains 3 collections of files to deliver: A to do list of files that will be attempted, a working
+ * set of files that are being attempted, and a retry set of files that were attempted and failed. Whenever the to do
+ * list is empty and needs to be refilled, a scan of the spool directory is made and the file names sorted. Any files
+ * in the working set are ignored. If a DeliveryTask for the file is in the retry set, then that delivery task is placed
+ * on the to do list. Otherwise, a new DeliveryTask for the file is created and placed on the to do list. If, when a
+ * DeliveryTask is about to be removed from the to do list, its age exceeds DeliveryQueueHelper.getExpirationTimer(),
+ * then it is instead marked as expired.
+ *
+ * <p>A delivery queue also maintains a skip flag. This flag is true if the failure timer is active or if no files are
+ * found in a directory scan.
*/
public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
+
private static EELFLogger logger = EELFManager.getInstance().getLogger(DeliveryQueue.class);
private DeliveryQueueHelper deliveryQueueHelper;
private DestInfo destinationInfo;
@@ -79,6 +74,16 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
private Vector<DeliveryTask> todo = new Vector<>();
/**
+ * Create a delivery queue for a given destination info.
+ */
+ DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
+ this.deliveryQueueHelper = deliveryQueueHelper;
+ this.destinationInfo = destinationInfo;
+ dir = new File(destinationInfo.getSpool());
+ dir.mkdirs();
+ }
+
+ /**
* Try to cancel a delivery task.
*
* @return The length of the task in bytes or 0 if the task cannot be cancelled.
@@ -106,7 +111,8 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
if (dt.isCleaned()) {
return (0);
}
- StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(), dt.getMethod(), dt.getCType(), dt.getLength(), "diskFull", dt.getAttempts());
+ StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(), dt.getMethod(), dt.getCType(),
+ dt.getLength(), "diskFull", dt.getAttempts());
dt.clean();
return (dt.getLength());
}
@@ -144,7 +150,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
if (failduration == 0) {
if (destinationInfo.isPrivilegedSubscriber()) {
failduration = deliveryQueueHelper.getWaitForFileProcessFailureTimer();
- } else{
+ } else {
failduration = deliveryQueueHelper.getInitFailureTimer();
}
}
@@ -205,144 +211,106 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
todo = new Vector<>();
String[] files = dir.list();
Arrays.sort(files);
- for (String fname : files) {
- if (!fname.endsWith(".M")) {
- continue;
- }
- String fname2 = fname.substring(0, fname.length() - 2);
- long pidtime = 0;
- int dot = fname2.indexOf('.');
- if (dot < 1) {
- continue;
- }
- try {
- pidtime = Long.parseLong(fname2.substring(0, dot));
- } catch (Exception e) {
- logger.error("Exception", e);
- }
- if (pidtime < 1000000000000L) {
- continue;
- }
- if (working.get(fname2) != null) {
- continue;
- }
- DeliveryTask dt = retry.get(fname2);
- if (dt == null) {
- dt = new DeliveryTask(this, fname2);
- }
- todo.add(dt);
- }
+ scanForNextTask(files);
retry = new Hashtable<>();
}
- if (todoindex < todo.size()) {
- DeliveryTask dt = todo.get(todoindex);
- if (dt.isCleaned()) {
- todoindex++;
- continue;
- }
- if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) {
- retry.put(dt.getPublishId(), dt);
- todoindex++;
- continue;
- }
- if (dt.getDate() >= mindate) {
- return (dt);
- }
- todoindex++;
- reportExpiry(dt);
- continue;
+ DeliveryTask dt = getDeliveryTask(mindate);
+ if (dt != null) {
+ return dt;
}
- return (null);
+ return null;
}
}
/**
- * Create a delivery queue for a given destination info
- */
- DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
- this.deliveryQueueHelper = deliveryQueueHelper;
- this.destinationInfo = destinationInfo;
- dir = new File(destinationInfo.getSpool());
- dir.mkdirs();
- }
-
- /**
- * Update the destination info for this delivery queue
+ * Update the destination info for this delivery queue.
*/
public void config(DestInfo destinationInfo) {
this.destinationInfo = destinationInfo;
}
/**
- * Get the dest info
+ * Get the dest info.
*/
public DestInfo getDestinationInfo() {
return (destinationInfo);
}
/**
- * Get the config manager
+ * Get the config manager.
*/
public DeliveryQueueHelper getConfig() {
return (deliveryQueueHelper);
}
/**
- * Exceptional condition occurred during delivery
+ * Exceptional condition occurred during delivery.
*/
public void reportDeliveryExtra(DeliveryTask task, long sent) {
StatusLog.logDelExtra(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getLength(), sent);
}
/**
- * Message too old to deliver
+ * Message too old to deliver.
*/
void reportExpiry(DeliveryTask task) {
- StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts());
+ StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
+ task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts());
markExpired(task);
}
/**
- * Completed a delivery attempt
+ * Completed a delivery attempt.
*/
public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {
if (status < 300) {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
+ task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
if (destinationInfo.isPrivilegedSubscriber()) {
- task.setResumeTime(System.currentTimeMillis() + deliveryQueueHelper.getWaitForFileProcessFailureTimer());
+ task.setResumeTime(
+ System.currentTimeMillis() + deliveryQueueHelper.getWaitForFileProcessFailureTimer());
markFailWithRetry(task);
} else {
markSuccess(task);
}
} else if (status < 400 && deliveryQueueHelper.isFollowRedirects()) {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
+ task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
if (deliveryQueueHelper.handleRedirection(destinationInfo, location, task.getFileId())) {
markRedirect(task);
} else {
- StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
+ StatusLog
+ .logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
+ task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
markFailNoRetry(task);
}
- } else if (status < 500 && status != 429) { // Status 429 is the standard response for Too Many Requests and indicates that a file needs to be delivered again at a later time.
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
- StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
+ } else if (status < 500 && status != 429) {
+ // Status 429 is the standard response for Too Many Requests and indicates
+ // that a file needs to be delivered again at a later time.
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
+ task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
+ StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
+ task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
markFailNoRetry(task);
} else {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
+ task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
markFailWithRetry(task);
}
}
/**
- * Delivery failed by reason of an exception
+ * Delivery failed by reason of an exception.
*/
public void reportException(DeliveryTask task, Exception exception) {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString());
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
+ task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString());
deliveryQueueHelper.handleUnreachable(destinationInfo);
markFailWithRetry(task);
}
/**
- * Get the feed ID for a subscription
+ * Get the feed ID for a subscription.
*
* @param subid The subscription ID
* @return The feed ID
@@ -352,22 +320,21 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
}
/**
- * Get the URL to deliver a message to given the file ID
+ * Get the URL to deliver a message to given the file ID.
*/
public String getDestURL(String fileid) {
return (deliveryQueueHelper.getDestURL(destinationInfo, fileid));
}
/**
- * Deliver files until there's a failure or there are no more
- * files to deliver
+ * Deliver files until there's a failure or there are no more files to deliver.
*/
public void run() {
- DeliveryTask t;
+ DeliveryTask task;
long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit();
int filestogo = deliveryQueueHelper.getFairFileLimit();
- while ((t = getNext()) != null) {
- t.run();
+ while ((task = getNext()) != null) {
+ task.run();
if (--filestogo <= 0 || System.currentTimeMillis() > endtime) {
break;
}
@@ -375,21 +342,21 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
}
/**
- * Is there no work to do for this queue right now?
+ * Is there no work to do for this queue right now.
*/
synchronized boolean isSkipSet() {
return (peekNext() == null);
}
/**
- * Reset the retry timer
+ * Reset the retry timer.
*/
void resetQueue() {
resumetime = System.currentTimeMillis();
}
/**
- * Get task if in queue and mark as success
+ * Get task if in queue and mark as success.
*/
boolean markTaskSuccess(String pubId) {
DeliveryTask task = working.get(pubId);
@@ -407,4 +374,63 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
}
return false;
}
+
+ private void scanForNextTask(String[] files) {
+ for (String fname : files) {
+ String pubId = getPubId(fname);
+ if (pubId == null) {
+ continue;
+ }
+ DeliveryTask dt = retry.get(pubId);
+ if (dt == null) {
+ dt = new DeliveryTask(this, pubId);
+ }
+ todo.add(dt);
+ }
+ }
+
+ @Nullable
+ private DeliveryTask getDeliveryTask(long mindate) {
+ if (todoindex < todo.size()) {
+ DeliveryTask dt = todo.get(todoindex);
+ if (dt.isCleaned()) {
+ todoindex++;
+ }
+ if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) {
+ retry.put(dt.getPublishId(), dt);
+ todoindex++;
+ }
+ if (dt.getDate() >= mindate) {
+ return (dt);
+ }
+ todoindex++;
+ reportExpiry(dt);
+ }
+ return null;
+ }
+
+ @Nullable
+ private String getPubId(String fname) {
+ if (!fname.endsWith(".M")) {
+ return null;
+ }
+ String fname2 = fname.substring(0, fname.length() - 2);
+ long pidtime = 0;
+ int dot = fname2.indexOf('.');
+ if (dot < 1) {
+ return null;
+ }
+ try {
+ pidtime = Long.parseLong(fname2.substring(0, dot));
+ } catch (Exception e) {
+ logger.error("Exception", e);
+ }
+ if (pidtime < 1000000000000L) {
+ return null;
+ }
+ if (working.get(fname2) != null) {
+ return null;
+ }
+ return fname2;
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
index 018c3aff..2093d6d4 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
@@ -24,29 +24,39 @@
package org.onap.dmaap.datarouter.node;
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.zip.GZIPInputStream;
+import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID;
+import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.ProtocolException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.zip.GZIPInputStream;
+import org.jetbrains.annotations.Nullable;
import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
import org.slf4j.MDC;
-import static com.att.eelf.configuration.Configuration.*;
-import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
-
/**
* A file to be delivered to a destination.
- * <p>
- * A Delivery task represents a work item for the data router - a file that
- * needs to be delivered and provides mechanisms to get information about
- * the file and its delivery data as well as to attempt delivery.
+ *
+ * <p>A Delivery task represents a work item for the data router - a file that needs to be delivered and provides
+ * mechanisms to get information about the file and its delivery data as well as to attempt delivery.
*/
public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
- private static EELFLogger eelfLogger = EELFManager.getInstance()
- .getLogger(DeliveryTask.class);
+
+ private static final String DECOMPRESSION_STATUS = "Decompression_Status";
+ private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(DeliveryTask.class);
private DeliveryTaskHelper deliveryTaskHelper;
private String pubid;
private DestInfo destInfo;
@@ -72,9 +82,8 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
* Create a delivery task for a given delivery queue and pub ID
*
* @param deliveryTaskHelper The delivery task helper for the queue this task is in.
- * @param pubid The publish ID for this file. This is used as
- * the base for the file name in the spool directory and is of
- * the form <milliseconds since 1970>.<fqdn of initial data router node>
+ * @param pubid The publish ID for this file. This is used as the base for the file name in the spool directory and
+ * is of the form &lt;milliseconds since 1970&gt;.&lt;fqdn of initial data router node&gt;
*/
DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
this.deliveryTaskHelper = deliveryTaskHelper;
@@ -91,40 +100,40 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
boolean monly = destInfo.isMetaDataOnly();
date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
resumeTime = System.currentTimeMillis();
- Vector<String[]> hdrv = new Vector<>();
+ ArrayList<String[]> hdrv = new ArrayList<>();
try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
- String s = br.readLine();
- int i = s.indexOf('\t');
- method = s.substring(0, i);
+ String line = br.readLine();
+ int index = line.indexOf('\t');
+ method = line.substring(0, index);
NodeUtils.setIpAndFqdnForEelf(method);
if (!"DELETE".equals(method) && !monly) {
length = datafile.length();
}
- fileid = s.substring(i + 1);
- while ((s = br.readLine()) != null) {
- i = s.indexOf('\t');
- String h = s.substring(0, i);
- String v = s.substring(i + 1);
- if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) {
- subid = v.replaceAll("[^ ]*/", "");
+ fileid = line.substring(index + 1);
+ while ((line = br.readLine()) != null) {
+ index = line.indexOf('\t');
+ String header = line.substring(0, index);
+ String headerValue = line.substring(index + 1);
+ if ("x-dmaap-dr-routing".equalsIgnoreCase(header)) {
+ subid = headerValue.replaceAll("[^ ]*/", "");
feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
}
- if (length == 0 && h.toLowerCase().startsWith("content-")) {
+ if (length == 0 && header.toLowerCase().startsWith("content-")) {
continue;
}
- if (h.equalsIgnoreCase("content-type")) {
- ctype = v;
+ if ("content-type".equalsIgnoreCase(header)) {
+ ctype = headerValue;
}
- if (h.equalsIgnoreCase("x-onap-requestid")) {
- MDC.put(MDC_KEY_REQUEST_ID, v);
+ if ("x-onap-requestid".equalsIgnoreCase(header)) {
+ MDC.put(MDC_KEY_REQUEST_ID, headerValue);
}
- if (h.equalsIgnoreCase("x-invocationid")) {
- MDC.put("InvocationId", v);
- v = UUID.randomUUID().toString();
- newInvocationId = v;
+ if ("x-invocationid".equalsIgnoreCase(header)) {
+ MDC.put("InvocationId", headerValue);
+ headerValue = UUID.randomUUID().toString();
+ newInvocationId = headerValue;
}
- hdrv.add(new String[]{h, v});
+ hdrv.add(new String[]{header, headerValue});
}
} catch (Exception e) {
eelfLogger.error("Exception", e);
@@ -134,20 +143,20 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
}
/**
- * Is the object a DeliveryTask with the same publication ID?
+ * Is the object a DeliveryTask with the same publication ID.
*/
- public boolean equals(Object o) {
- if (!(o instanceof DeliveryTask)) {
+ public boolean equals(Object object) {
+ if (!(object instanceof DeliveryTask)) {
return (false);
}
- return (pubid.equals(((DeliveryTask) o).pubid));
+ return (pubid.equals(((DeliveryTask) object).pubid));
}
/**
* Compare the publication IDs.
*/
- public int compareTo(DeliveryTask o) {
- return (pubid.compareTo(o.pubid));
+ public int compareTo(DeliveryTask other) {
+ return (pubid.compareTo(other.pubid));
}
/**
@@ -165,79 +174,49 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
}
/**
- * Get the publish ID
+ * Get the publish ID.
*/
String getPublishId() {
return (pubid);
}
/**
- * Attempt delivery
+ * Attempt delivery.
*/
public void run() {
attempts++;
try {
destInfo = deliveryTaskHelper.getDestinationInfo();
- boolean expect100 = destInfo.isUsing100();
boolean monly = destInfo.isMetaDataOnly();
length = 0;
if (!"DELETE".equals(method) && !monly) {
length = datafile.length();
}
- if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){
- fileid = fileid.replace(".gz", "");
- }
+ stripSuffixIfIsDecompress();
url = deliveryTaskHelper.getDestURL(fileid);
- URL u = new URL(url);
- HttpURLConnection uc = (HttpURLConnection) u.openConnection();
- uc.setConnectTimeout(60000);
- uc.setReadTimeout(60000);
- uc.setInstanceFollowRedirects(false);
- uc.setRequestMethod(method);
- uc.setRequestProperty("Content-Length", Long.toString(length));
- uc.setRequestProperty("Authorization", destInfo.getAuth());
- uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
- for (String[] nv : hdrs) {
- uc.addRequestProperty(nv[0], nv[1]);
- }
- if (length > 0) {
- if (expect100) {
- uc.setRequestProperty("Expect", "100-continue");
- }
- uc.setDoOutput(true);
- if (destInfo.isDecompress()) {
- if (isFiletypeGzip(datafile)) {
- sendDecompressedFile(uc);
- } else {
- uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT");
- sendFile(uc);
- }
- } else {
- sendFile(uc);
- }
- }
- int rc = uc.getResponseCode();
- String rmsg = uc.getResponseMessage();
- if (rmsg == null) {
- String h0 = uc.getHeaderField(0);
- if (h0 != null) {
- int i = h0.indexOf(' ');
- int j = h0.indexOf(' ', i + 1);
- if (i != -1 && j != -1) {
- rmsg = h0.substring(j + 1);
- }
- }
- }
+ URL urlObj = new URL(url);
+ HttpURLConnection urlConnection = (HttpURLConnection) urlObj.openConnection();
+ urlConnection.setConnectTimeout(60000);
+ urlConnection.setReadTimeout(60000);
+ urlConnection.setInstanceFollowRedirects(false);
+ urlConnection.setRequestMethod(method);
+ urlConnection.setRequestProperty("Content-Length", Long.toString(length));
+ urlConnection.setRequestProperty("Authorization", destInfo.getAuth());
+ urlConnection.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
+ boolean expect100 = destInfo.isUsing100();
+ int rc = deliverFileToSubscriber(expect100, urlConnection);
+ String rmsg = urlConnection.getResponseMessage();
+ rmsg = getResponseMessage(urlConnection, rmsg);
String xpubid = null;
InputStream is;
if (rc >= 200 && rc <= 299) {
- is = uc.getInputStream();
- xpubid = uc.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
+ is = urlConnection.getInputStream();
+ xpubid = urlConnection.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
} else {
if (rc >= 300 && rc <= 399) {
- rmsg = uc.getHeaderField("Location");
+ rmsg = urlConnection.getHeaderField("Location");
}
- is = uc.getErrorStream();
+ is = urlConnection.getErrorStream();
}
byte[] buf = new byte[4096];
if (is != null) {
@@ -247,20 +226,19 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
}
deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
} catch (Exception e) {
- eelfLogger.error("Exception "+ Arrays.toString(e.getStackTrace()),e);
+ eelfLogger.error("Exception " + Arrays.toString(e.getStackTrace()), e);
deliveryTaskHelper.reportException(this, e);
}
}
/**
- * To send decompressed gzip to the subscribers
+ * To send decompressed gzip to the subscribers.
*
* @param httpURLConnection connection used to make request
- * @throws IOException
*/
private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
byte[] buffer = new byte[8164];
- httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS");
+ httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "SUCCESS");
OutputStream outputStream = getOutputStream(httpURLConnection);
if (outputStream != null) {
int bytesRead = 0;
@@ -271,7 +249,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
}
outputStream.close();
} catch (IOException e) {
- httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE");
+ httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "FAILURE");
eelfLogger.info("Could not decompress file", e);
sendFile(httpURLConnection);
}
@@ -283,44 +261,42 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
* To send any file to the subscriber.
*
* @param httpURLConnection connection used to make request
- * @throws IOException
*/
private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
OutputStream os = getOutputStream(httpURLConnection);
- if (os != null) {
- long sofar = 0;
- try (InputStream is = new FileInputStream(datafile)) {
- byte[] buf = new byte[1024 * 1024];
- while (sofar < length) {
- int i = buf.length;
- if (sofar + i > length) {
- i = (int) (length - sofar);
- }
- i = is.read(buf, 0, i);
- if (i <= 0) {
- throw new IOException("Unexpected problem reading data file " + datafile);
- }
- sofar += i;
- os.write(buf, 0, i);
+ if (os == null) {
+ return;
+ }
+ long sofar = 0;
+ try (InputStream is = new FileInputStream(datafile)) {
+ byte[] buf = new byte[1024 * 1024];
+ while (sofar < length) {
+ int len = buf.length;
+ if (sofar + len > length) {
+ len = (int) (length - sofar);
+ }
+ len = is.read(buf, 0, len);
+ if (len <= 0) {
+ throw new IOException("Unexpected problem reading data file " + datafile);
}
- os.close();
- } catch (IOException ioe) {
- deliveryTaskHelper.reportDeliveryExtra(this, sofar);
- throw ioe;
+ sofar += len;
+ os.write(buf, 0, len);
}
+ os.close();
+ } catch (IOException ioe) {
+ deliveryTaskHelper.reportDeliveryExtra(this, sofar);
+ throw ioe;
}
}
/**
- * Get the outputstream that will be used to send data
+ * Get the outputstream that will be used to send data.
*
* @param httpURLConnection connection used to make request
* @return AN Outpustream that can be used to send your data.
- * @throws IOException
*/
private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
OutputStream outputStream = null;
-
try {
outputStream = httpURLConnection.getOutputStream();
} catch (ProtocolException pe) {
@@ -331,8 +307,52 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
return outputStream;
}
+ private void stripSuffixIfIsDecompress() {
+ if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")) {
+ fileid = fileid.replace(".gz", "");
+ }
+ }
+
+ private int deliverFileToSubscriber(boolean expect100, HttpURLConnection uc) throws IOException {
+ for (String[] nv : hdrs) {
+ uc.addRequestProperty(nv[0], nv[1]);
+ }
+ if (length > 0) {
+ if (expect100) {
+ uc.setRequestProperty("Expect", "100-continue");
+ }
+ uc.setDoOutput(true);
+ if (destInfo.isDecompress()) {
+ if (isFiletypeGzip(datafile)) {
+ sendDecompressedFile(uc);
+ } else {
+ uc.setRequestProperty(DECOMPRESSION_STATUS, "UNSUPPORTED_FORMAT");
+ sendFile(uc);
+ }
+ } else {
+ sendFile(uc);
+ }
+ }
+ return uc.getResponseCode();
+ }
+
+ @Nullable
+ private String getResponseMessage(HttpURLConnection uc, String rmsg) {
+ if (rmsg == null) {
+ String h0 = uc.getHeaderField(0);
+ if (h0 != null) {
+ int indexOfSpace1 = h0.indexOf(' ');
+ int indexOfSpace2 = h0.indexOf(' ', indexOfSpace1 + 1);
+ if (indexOfSpace1 != -1 && indexOfSpace2 != -1) {
+ rmsg = h0.substring(indexOfSpace2 + 1);
+ }
+ }
+ }
+ return rmsg;
+ }
+
/**
- * Remove meta and data files
+ * Remove meta and data files.
*/
void clean() {
datafile.delete();
@@ -343,28 +363,28 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
}
/**
- * Set the resume time for a delivery task.
+ * Get the resume time for a delivery task.
*/
- void setResumeTime(long resumeTime) {
- this.resumeTime = resumeTime;
+ long getResumeTime() {
+ return resumeTime;
}
/**
- * Get the resume time for a delivery task.
+ * Set the resume time for a delivery task.
*/
- long getResumeTime() {
- return resumeTime;
+ void setResumeTime(long resumeTime) {
+ this.resumeTime = resumeTime;
}
/**
- * Has this delivery task been cleaned?
+ * Has this delivery task been cleaned.
*/
boolean isCleaned() {
return (hdrs == null);
}
/**
- * Get length of body
+ * Get length of body.
*/
public long getLength() {
return (length);
@@ -378,58 +398,58 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
}
/**
- * Get the most recent delivery attempt URL
+ * Get the most recent delivery attempt URL.
*/
public String getURL() {
return (url);
}
/**
- * Get the content type
+ * Get the content type.
*/
String getCType() {
return (ctype);
}
/**
- * Get the method
+ * Get the method.
*/
String getMethod() {
return (method);
}
/**
- * Get the file ID
+ * Get the file ID.
*/
String getFileId() {
return (fileid);
}
/**
- * Get the number of delivery attempts
+ * Get the number of delivery attempts.
*/
int getAttempts() {
return (attempts);
}
/**
- * Get the (space delimited list of) subscription ID for this delivery task
+ * Get the (space delimited list of) subscription ID for this delivery task.
*/
String getSubId() {
return (subid);
}
/**
- * Get the feed ID for this delivery task
+ * Get the feed ID for this delivery task.
*/
String getFeedId() {
return (feedid);
}
/**
- * Get the followRedirects for this delivery task
+ * Get the followRedirects for this delivery task.
*/
public boolean getFollowRedirects() {
- return(followRedirects);
+ return (followRedirects);
}
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java
index d4ac8bd6..b9068f2f 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java
@@ -26,32 +26,33 @@ package org.onap.dmaap.datarouter.node;
/**
* Interface to allow independent testing of the DeliveryTask code.
- * <p>
- * This interface represents all the configuraiton information and
- * feedback mechanisms that a delivery task needs.
+ *
+ * <p>This interface represents all the configuration information and feedback mechanisms that a delivery task needs.
*/
public interface DeliveryTaskHelper {
+
/**
- * Report that a delivery attempt failed due to an exception (like can't connect to remote host)
+ * Report that a delivery attempt failed due to an exception (like can't connect to remote host).
*
- * @param task The task that failed
+ * @param task The task that failed
* @param exception The exception that occurred
*/
void reportException(DeliveryTask task, Exception exception);
/**
- * Report that a delivery attempt completed (successfully or unsuccessfully)
+ * Report that a delivery attempt completed (successfully or unsuccessfully).
*
- * @param task The task that failed
- * @param status The HTTP status
- * @param xpubid The publish ID from the far end (if any)
+ * @param task The task that failed
+ * @param status The HTTP status
+ * @param xpubid The publish ID from the far end (if any)
* @param location The redirection location for a 3XX response
*/
void reportStatus(DeliveryTask task, int status, String xpubid, String location);
/**
- * Report that a delivery attempt either failed while sending data or that an error was returned instead of a 100 Continue.
+ * Report that a delivery attempt either failed while sending data or that an error was returned instead of a 100
+ * Continue.
*
* @param task The task that failed
* @param sent The number of bytes sent or -1 if an error was returned instead of 100 Continue.
@@ -59,14 +60,14 @@ public interface DeliveryTaskHelper {
void reportDeliveryExtra(DeliveryTask task, long sent);
/**
- * Get the destination information for the delivery queue
+ * Get the destination information for the delivery queue.
*
* @return The destination information
*/
DestInfo getDestinationInfo();
/**
- * Given a file ID, get the URL to deliver to
+ * Given a file ID, get the URL to deliver to.
*
* @param fileid The file id
* @return The URL to deliver to
@@ -74,7 +75,7 @@ public interface DeliveryTaskHelper {
String getDestURL(String fileid);
/**
- * Get the feed ID for a subscription
+ * Get the feed ID for a subscription.
*
* @param subid The subscription ID
* @return The feed iD
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java
index 8890fe96..f5fa6e98 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java
@@ -25,9 +25,10 @@
package org.onap.dmaap.datarouter.node;
/**
- * Information for a delivery destination that doesn't change from message to message
+ * Information for a delivery destination that doesn't change from message to message.
*/
public class DestInfo {
+
private String name;
private String spool;
private String subid;
@@ -40,114 +41,33 @@ public class DestInfo {
private boolean privilegedSubscriber;
private boolean decompress;
private boolean followRedirects;
- private String aafInstance;
-
- public static class DestInfoBuilder {
- private String name;
- private String spool;
- private String subid;
- private String logdata;
- private String url;
- private String authuser;
- private String authentication;
- private boolean metaonly;
- private boolean use100;
- private boolean privilegedSubscriber;
- private boolean followRedirects;
- private boolean decompress;
- private NodeConfig.ProvSubscription subscription;
-
- public DestInfoBuilder setName(String name) {
- this.name = name;
- return this;
- }
-
- public DestInfoBuilder setSpool(String spool) {
- this.spool = spool;
- return this;
- }
-
- public DestInfoBuilder setSubid(String subid) {
- this.subid = subid;
- return this;
- }
-
- public DestInfoBuilder setLogdata(String logdata) {
- this.logdata = logdata;
- return this;
- }
-
- public DestInfoBuilder setUrl(String url) {
- this.url = url;
- return this;
- }
-
- public DestInfoBuilder setAuthuser(String authuser) {
- this.authuser = authuser;
- return this;
- }
-
- public DestInfoBuilder setAuthentication(String authentication) {
- this.authentication = authentication;
- return this;
- }
-
- public DestInfoBuilder setMetaonly(boolean metaonly) {
- this.metaonly = metaonly;
- return this;
- }
-
- public DestInfoBuilder setUse100(boolean use100) {
- this.use100 = use100;
- return this;
- }
-
- public DestInfoBuilder setPrivilegedSubscriber(boolean privilegedSubscriber) {
- this.privilegedSubscriber = privilegedSubscriber;
- return this;
- }
-
- public DestInfoBuilder setFollowRedirects(boolean followRedirects) {
- this.followRedirects = followRedirects;
- return this;
- }
-
- public DestInfoBuilder setDecompress(boolean decompress) {
- this.decompress = decompress;
- return this;
- }
-
- public DestInfoBuilder setSubscription(NodeConfig.ProvSubscription subscription) {
- this.subscription = subscription;
- return this;
- }
-
- public DestInfo createDestInfo() {
- return new DestInfo(this);
- }
- }
+ /**
+ * Create a destination information object.
+ *
+ * @param destInfoBuilder DestInfo Object Builder
+ */
public DestInfo(DestInfoBuilder destInfoBuilder) {
- this.name = destInfoBuilder.name;
- this.spool = destInfoBuilder.spool;
- this.subid = destInfoBuilder.subid;
- this.logdata = destInfoBuilder.logdata;
- this.url = destInfoBuilder.url;
- this.authuser = destInfoBuilder.authuser;
- this.authentication = destInfoBuilder.authentication;
- this.metaonly = destInfoBuilder.metaonly;
- this.use100 = destInfoBuilder.use100;
- this.privilegedSubscriber = destInfoBuilder.privilegedSubscriber;
- this.followRedirects = destInfoBuilder.followRedirects;
- this.decompress = destInfoBuilder.decompress;
+ this.name = destInfoBuilder.getName();
+ this.spool = destInfoBuilder.getSpool();
+ this.subid = destInfoBuilder.getSubid();
+ this.logdata = destInfoBuilder.getLogdata();
+ this.url = destInfoBuilder.getUrl();
+ this.authuser = destInfoBuilder.getAuthuser();
+ this.authentication = destInfoBuilder.getAuthentication();
+ this.metaonly = destInfoBuilder.isMetaonly();
+ this.use100 = destInfoBuilder.isUse100();
+ this.privilegedSubscriber = destInfoBuilder.isPrivilegedSubscriber();
+ this.followRedirects = destInfoBuilder.isFollowRedirects();
+ this.decompress = destInfoBuilder.isDecompress();
}
/**
* Create a destination information object.
*
- * @param name n:fqdn or s:subid
- * @param spool The directory where files are spooled.
- * @param subscription The subscription.
+ * @param name n:fqdn or s:subid
+ * @param spool The directory where files are spooled.
+ * @param subscription The subscription.
*/
public DestInfo(String name, String spool, NodeConfig.ProvSubscription subscription) {
this.name = name;
@@ -164,8 +84,8 @@ public class DestInfo {
this.decompress = subscription.isDecompress();
}
- public boolean equals(Object o) {
- return ((o instanceof DestInfo) && ((DestInfo) o).spool.equals(spool));
+ public boolean equals(Object object) {
+ return ((object instanceof DestInfo) && ((DestInfo) object).spool.equals(spool));
}
public int hashCode() {
@@ -173,7 +93,7 @@ public class DestInfo {
}
/**
- * Get the name of this destination
+ * Get the name of this destination.
*/
public String getName() {
return (name);
@@ -217,7 +137,7 @@ public class DestInfo {
}
/**
- * Get the user for authentication
+ * Get the user for authentication.
*
* @return The name of the user for logging
*/
@@ -226,7 +146,7 @@ public class DestInfo {
}
/**
- * Get the authentication header
+ * Get the authentication header.
*
* @return The string to use to authenticate to the recipient.
*/
@@ -235,7 +155,7 @@ public class DestInfo {
}
/**
- * Is this a metadata only delivery?
+ * Is this a metadata only delivery.
*
* @return True if this is a metadata only delivery
*/
@@ -244,7 +164,7 @@ public class DestInfo {
}
/**
- * Should I send expect 100-continue header?
+ * Should I send expect 100-continue header.
*
* @return True if I should.
*/
@@ -253,23 +173,23 @@ public class DestInfo {
}
/**
- * Should we wait to receive a file processed acknowledgement before deleting file
+ * Should we wait to receive a file processed acknowledgement before deleting file.
*/
public boolean isPrivilegedSubscriber() {
return (privilegedSubscriber);
}
/**
- * Should I follow redirects?
- *
- * @return True if I should.
- */
+ * Should I follow redirects.
+ *
+ * @return True if I should.
+ */
public boolean isFollowRedirects() {
return (followRedirects);
}
/**
- * Should i decompress the file before sending it on
+ * Should i decompress the file before sending it on.
*
* @return True if I should.
*/
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfoBuilder.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfoBuilder.java
new file mode 100644
index 00000000..00c5cd8b
--- /dev/null
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfoBuilder.java
@@ -0,0 +1,149 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dmaap.datarouter.node;
+
+public class DestInfoBuilder {
+
+ private String destInfoName;
+ private String destInfoSpool;
+ private String destInfoSubId;
+ private String destInfoLogData;
+ private String destInfoUrl;
+ private String destInfoAuthUser;
+ private String destInfoAuthentication;
+ private boolean destInfoMetaOnly;
+ private boolean destInfoUse100;
+ private boolean destInfoPrivilegedSubscriber;
+ private boolean destInfoFollowRedirects;
+ private boolean destInfoDecompress;
+
+ public String getName() {
+ return destInfoName;
+ }
+
+ public DestInfoBuilder setName(String name) {
+ this.destInfoName = name;
+ return this;
+ }
+
+ public String getSpool() {
+ return destInfoSpool;
+ }
+
+ public DestInfoBuilder setSpool(String spool) {
+ this.destInfoSpool = spool;
+ return this;
+ }
+
+ public String getSubid() {
+ return destInfoSubId;
+ }
+
+ public DestInfoBuilder setSubid(String subid) {
+ this.destInfoSubId = subid;
+ return this;
+ }
+
+ String getLogdata() {
+ return destInfoLogData;
+ }
+
+ DestInfoBuilder setLogdata(String logdata) {
+ this.destInfoLogData = logdata;
+ return this;
+ }
+
+ public String getUrl() {
+ return destInfoUrl;
+ }
+
+ public DestInfoBuilder setUrl(String url) {
+ this.destInfoUrl = url;
+ return this;
+ }
+
+ String getAuthuser() {
+ return destInfoAuthUser;
+ }
+
+ DestInfoBuilder setAuthuser(String authuser) {
+ this.destInfoAuthUser = authuser;
+ return this;
+ }
+
+ String getAuthentication() {
+ return destInfoAuthentication;
+ }
+
+ DestInfoBuilder setAuthentication(String authentication) {
+ this.destInfoAuthentication = authentication;
+ return this;
+ }
+
+ boolean isMetaonly() {
+ return destInfoMetaOnly;
+ }
+
+ DestInfoBuilder setMetaonly(boolean metaonly) {
+ this.destInfoMetaOnly = metaonly;
+ return this;
+ }
+
+ boolean isUse100() {
+ return destInfoUse100;
+ }
+
+ DestInfoBuilder setUse100(boolean use100) {
+ this.destInfoUse100 = use100;
+ return this;
+ }
+
+ boolean isPrivilegedSubscriber() {
+ return destInfoPrivilegedSubscriber;
+ }
+
+ DestInfoBuilder setPrivilegedSubscriber(boolean privilegedSubscriber) {
+ this.destInfoPrivilegedSubscriber = privilegedSubscriber;
+ return this;
+ }
+
+ boolean isFollowRedirects() {
+ return destInfoFollowRedirects;
+ }
+
+ DestInfoBuilder setFollowRedirects(boolean followRedirects) {
+ this.destInfoFollowRedirects = followRedirects;
+ return this;
+ }
+
+ boolean isDecompress() {
+ return destInfoDecompress;
+ }
+
+ DestInfoBuilder setDecompress(boolean decompress) {
+ this.destInfoDecompress = decompress;
+ return this;
+ }
+
+ DestInfo createDestInfo() {
+ return new DestInfo(this);
+ }
+} \ No newline at end of file
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java
index 534b2b35..49852680 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java
@@ -26,39 +26,40 @@ package org.onap.dmaap.datarouter.node;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-
import java.io.IOException;
-import java.util.*;
-import java.net.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
/**
- * Determine if an IP address is from a machine
+ * Determine if an IP address is from a machine.
*/
public class IsFrom {
+
+ private static EELFLogger logger = EELFManager.getInstance().getLogger(IsFrom.class);
private long nextcheck;
private String[] ips;
private String fqdn;
- private static EELFLogger logger = EELFManager.getInstance().getLogger(IsFrom.class);
/**
- * Configure the JVM DNS cache to have a 10 second TTL. This needs to be called very very early or it won't have any effect.
+ * Create an IsFrom for the specified fully qualified domain name.
*/
- public static void setDNSCache() {
- java.security.Security.setProperty("networkaddress.cache.ttl", "10");
+ public IsFrom(String fqdn) {
+ this.fqdn = fqdn;
}
/**
- * Create an IsFrom for the specified fully qualified domain name.
+ * Configure the JVM DNS cache to have a 10 second TTL. This needs to be called very very early or it won't have
+ * any effect.
*/
- public IsFrom(String fqdn) {
- this.fqdn = fqdn;
+ public static void setDNSCache() {
+ java.security.Security.setProperty("networkaddress.cache.ttl", "10");
}
/**
- * Check if an IP address matches. If it has been more than
- * 10 seconds since DNS was last checked for changes to the
- * IP address(es) of this FQDN, check again. Then check
- * if the specified IP address belongs to the FQDN.
+ * Check if an IP address matches. If it has been more than 10 seconds since DNS was last checked for changes to
+ * the IP address(es) of this FQDN, check again. Then check if the specified IP address belongs to the FQDN.
*/
public synchronized boolean isFrom(String ip) {
long now = System.currentTimeMillis();
@@ -98,7 +99,7 @@ public class IsFrom {
}
/**
- * Return the fully qualified domain name
+ * Return the fully qualified domain name.
*/
public String toString() {
return (fqdn);
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java
index 6ffb7604..3277408c 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java
@@ -20,6 +20,7 @@
* * ECOMP is a trademark and service mark of AT&T Intellectual Property.
* *
******************************************************************************/
+
package org.onap.dmaap.datarouter.node;
import com.att.eelf.configuration.EELFLogger;
@@ -35,16 +36,18 @@ import java.util.Arrays;
import java.util.TimerTask;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.jetbrains.annotations.NotNull;
/**
* Cleanup of old log files.
- * <p>
- * Periodically scan the log directory for log files that are older than the log file retention interval, and delete
+ *
+ * <p>Periodically scan the log directory for log files that are older than the log file retention interval, and delete
* them. In a future release, This class will also be responsible for uploading events logs to the log server to
* support the log query APIs.
*/
public class LogManager extends TimerTask {
+
private EELFLogger logger = EELFManager.getInstance().getLogger(LogManager.class);
private NodeConfigManager config;
private Matcher isnodelog;
@@ -53,8 +56,55 @@ public class LogManager extends TimerTask {
private String uploaddir;
private String logdir;
+ /**
+     * Construct a log manager.
+ *
+ * <p>The log manager will check for expired log files every 5 minutes at 20 seconds after the 5 minute boundary.
+ * (Actually, the interval is the event log rollover interval, which defaults to 5 minutes).
+ */
+ public LogManager(NodeConfigManager config) {
+ this.config = config;
+ try {
+ isnodelog = Pattern.compile("node\\.log\\.\\d{8}").matcher("");
+ iseventlog = Pattern.compile("events-\\d{12}\\.log").matcher("");
+ } catch (Exception e) {
+ logger.error("Exception", e);
+ }
+ logdir = config.getLogDir();
+ uploaddir = logdir + "/.spool";
+ (new File(uploaddir)).mkdirs();
+ long now = System.currentTimeMillis();
+ long intvl = StatusLog.parseInterval(config.getEventLogInterval(), 30000);
+ long when = now - now % intvl + intvl + 20000L;
+ config.getTimer().scheduleAtFixedRate(this, when - now, intvl);
+ worker = new Uploader();
+ }
+
+ /**
+ * Trigger check for expired log files and log files to upload.
+ */
+ public void run() {
+ worker.poke();
+ }
+
private class Uploader extends Thread implements DeliveryQueueHelper {
+
+ private static final String EXCEPTION = "Exception";
+ private static final String META = "/.meta";
private EELFLogger logger = EELFManager.getInstance().getLogger(Uploader.class);
+ private DeliveryQueue dq;
+
+ Uploader() {
+ dq = new DeliveryQueue(this,
+ new DestInfoBuilder().setName("LogUpload").setSpool(uploaddir).setSubid(null).setLogdata(null)
+ .setUrl(null).setAuthuser(config.getMyName()).setAuthentication(config.getMyAuth())
+ .setMetaonly(false).setUse100(false).setPrivilegedSubscriber(false)
+ .setFollowRedirects(false)
+ .setDecompress(false).createDestInfo());
+ setDaemon(true);
+ setName("Log Uploader");
+ start();
+ }
public long getInitFailureTimer() {
return (10000L);
@@ -89,6 +139,7 @@ public class LogManager extends TimerTask {
}
public void handleUnreachable(DestInfo destinationInfo) {
+ throw new UnsupportedOperationException();
}
public boolean handleRedirection(DestInfo destinationInfo, String location, String fileid) {
@@ -103,24 +154,11 @@ public class LogManager extends TimerTask {
return (null);
}
- private DeliveryQueue dq;
-
- public Uploader() {
- dq = new DeliveryQueue(this,
- new DestInfo.DestInfoBuilder().setName("LogUpload").setSpool(uploaddir).setSubid(null).setLogdata(null)
- .setUrl(null).setAuthuser(config.getMyName()).setAuthentication(config.getMyAuth())
- .setMetaonly(false).setUse100(false).setPrivilegedSubscriber(false).setFollowRedirects(false)
- .setDecompress(false).createDestInfo());
- setDaemon(true);
- setName("Log Uploader");
- start();
- }
-
private synchronized void snooze() {
try {
wait(10000);
} catch (Exception e) {
- logger.error("InterruptedException", e);
+ logger.error(EXCEPTION, e);
}
}
@@ -145,73 +183,48 @@ public class LogManager extends TimerTask {
String curlog = StatusLog.getCurLogFile();
curlog = curlog.substring(curlog.lastIndexOf('/') + 1);
try {
- Writer w = new FileWriter(uploaddir + "/.meta");
- w.write("POST\tlogdata\nContent-Type\ttext/plain\n");
- w.close();
+ Writer writer = new FileWriter(uploaddir + META);
+ writer.write("POST\tlogdata\nContent-Type\ttext/plain\n");
+ writer.close();
BufferedReader br = new BufferedReader(new FileReader(uploaddir + "/.lastqueued"));
lastqueued = br.readLine();
br.close();
} catch (Exception e) {
- logger.error("Exception", e);
+ logger.error(EXCEPTION, e);
}
for (String fn : fns) {
if (!isnodelog.reset(fn).matches()) {
if (!iseventlog.reset(fn).matches()) {
continue;
}
- if (lastqueued.compareTo(fn) < 0 && curlog.compareTo(fn) > 0) {
- lastqueued = fn;
- try {
- String pid = config.getPublishId();
- Files.createLink(Paths.get(uploaddir + "/" + pid), Paths.get(logdir + "/" + fn));
- Files.createLink(Paths.get(uploaddir + "/" + pid + ".M"), Paths.get(uploaddir + "/.meta"));
- } catch (Exception e) {
- logger.error("Exception", e);
- }
- }
+ lastqueued = setLastQueued(lastqueued, curlog, fn);
}
- File f = new File(dir, fn);
- if (f.lastModified() < threshold) {
- f.delete();
+ File file = new File(dir, fn);
+ if (file.lastModified() < threshold) {
+ file.delete();
}
}
try (Writer w = new FileWriter(uploaddir + "/.lastqueued")) {
- (new File(uploaddir + "/.meta")).delete();
+ (new File(uploaddir + META)).delete();
w.write(lastqueued + "\n");
} catch (Exception e) {
- logger.error("Exception", e);
+ logger.error(EXCEPTION, e);
}
}
- }
- /**
- * Construct a log manager
- * <p>
- * The log manager will check for expired log files every 5 minutes at 20 seconds after the 5 minute boundary.
- * (Actually, the interval is the event log rollover interval, which defaults to 5 minutes).
- */
- public LogManager(NodeConfigManager config) {
- this.config = config;
- try {
- isnodelog = Pattern.compile("node\\.log\\.\\d{8}").matcher("");
- iseventlog = Pattern.compile("events-\\d{12}\\.log").matcher("");
- } catch (Exception e) {
- logger.error("Exception", e);
+ @NotNull
+ private String setLastQueued(String lastqueued, String curlog, String fn) {
+ if (lastqueued.compareTo(fn) < 0 && curlog.compareTo(fn) > 0) {
+ lastqueued = fn;
+ try {
+ String pid = config.getPublishId();
+ Files.createLink(Paths.get(uploaddir + "/" + pid), Paths.get(logdir + "/" + fn));
+ Files.createLink(Paths.get(uploaddir + "/" + pid + ".M"), Paths.get(uploaddir + META));
+ } catch (Exception e) {
+ logger.error(EXCEPTION, e);
+ }
+ }
+ return lastqueued;
}
- logdir = config.getLogDir();
- uploaddir = logdir + "/.spool";
- (new File(uploaddir)).mkdirs();
- long now = System.currentTimeMillis();
- long intvl = StatusLog.parseInterval(config.getEventLogInterval(), 30000);
- long when = now - now % intvl + intvl + 20000L;
- config.getTimer().scheduleAtFixedRate(this, when - now, intvl);
- worker = new Uploader();
- }
-
- /**
- * Trigger check for expired log files and log files to upload
- */
- public void run() {
- worker.poke();
}
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java
index d455f2d9..7f018210 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java
@@ -26,22 +26,451 @@ package org.onap.dmaap.datarouter.node;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-
import java.io.File;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
-import java.util.Hashtable;
-import java.util.Vector;
/**
* Processed configuration for this node.
- * <p>
- * The NodeConfig represents a processed configuration from the Data Router provisioning server. Each time
+ *
+ * <p>The NodeConfig represents a processed configuration from the Data Router provisioning server. Each time
* configuration data is received from the provisioning server, a new NodeConfig is created and the previous one
* discarded.
*/
public class NodeConfig {
+
+ private static final String PUBLISHER_NOT_PERMITTED = "Publisher not permitted for this feed";
private static EELFLogger logger = EELFManager.getInstance().getLogger(NodeConfig.class);
+ private HashMap<String, String> params = new HashMap<>();
+ private HashMap<String, Feed> feeds = new HashMap<>();
+ private HashMap<String, DestInfo> nodeinfo = new HashMap<>();
+ private HashMap<String, DestInfo> subinfo = new HashMap<>();
+ private HashMap<String, IsFrom> nodes = new HashMap<>();
+ private HashMap<String, ProvSubscription> provSubscriptions = new HashMap<>();
+ private String myname;
+ private String myauth;
+ private DestInfo[] alldests;
+ private int rrcntr;
+
+ /**
+ * Process the raw provisioning data to configure this node.
+ *
+ * @param pd The parsed provisioning data
+ * @param myname My name as seen by external systems
+ * @param spooldir The directory where temporary files live
+ * @param port The port number for URLs
+ * @param nodeauthkey The keying string used to generate node authentication credentials
+ */
+ public NodeConfig(ProvData pd, String myname, String spooldir, int port, String nodeauthkey) {
+ this.myname = myname;
+ for (ProvParam p : pd.getParams()) {
+ params.put(p.getName(), p.getValue());
+ }
+ ArrayList<DestInfo> destInfos = new ArrayList<>();
+ myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey);
+ for (ProvNode pn : pd.getNodes()) {
+ String commonName = pn.getCName();
+ if (nodeinfo.get(commonName) != null) {
+ continue;
+ }
+ DestInfo di = new DestInfoBuilder().setName("n:" + commonName).setSpool(spooldir + "/n/" + commonName)
+ .setSubid(null)
+ .setLogdata("n2n-" + commonName).setUrl("https://" + commonName + ":" + port + "/internal/publish")
+ .setAuthuser(commonName).setAuthentication(myauth).setMetaonly(false).setUse100(true)
+ .setPrivilegedSubscriber(false).setFollowRedirects(false).setDecompress(false).createDestInfo();
+ (new File(di.getSpool())).mkdirs();
+ String auth = NodeUtils.getNodeAuthHdr(commonName, nodeauthkey);
+ destInfos.add(di);
+ nodeinfo.put(commonName, di);
+ nodes.put(auth, new IsFrom(commonName));
+ }
+ PathFinder pf = new PathFinder(myname, nodeinfo.keySet().toArray(new String[0]), pd.getHops());
+ HashMap<String, ArrayList<Redirection>> rdtab = new HashMap<>();
+ for (ProvForceIngress pfi : pd.getForceIngress()) {
+ ArrayList<Redirection> v = rdtab.get(pfi.getFeedId());
+ if (v == null) {
+ v = new ArrayList<>();
+ rdtab.put(pfi.getFeedId(), v);
+ }
+ Redirection r = new Redirection();
+ if (pfi.getSubnet() != null) {
+ r.snm = new SubnetMatcher(pfi.getSubnet());
+ }
+ r.user = pfi.getUser();
+ r.nodes = pfi.getNodes();
+ v.add(r);
+ }
+ HashMap<String, HashMap<String, String>> pfutab = new HashMap<>();
+ for (ProvFeedUser pfu : pd.getFeedUsers()) {
+ HashMap<String, String> t = pfutab.get(pfu.getFeedId());
+ if (t == null) {
+ t = new HashMap<>();
+ pfutab.put(pfu.getFeedId(), t);
+ }
+ t.put(pfu.getCredentials(), pfu.getUser());
+ }
+ HashMap<String, String> egrtab = new HashMap<>();
+ for (ProvForceEgress pfe : pd.getForceEgress()) {
+ if (pfe.getNode().equals(myname) || nodeinfo.get(pfe.getNode()) == null) {
+ continue;
+ }
+ egrtab.put(pfe.getSubId(), pfe.getNode());
+ }
+ HashMap<String, ArrayList<SubnetMatcher>> pfstab = new HashMap<>();
+ for (ProvFeedSubnet pfs : pd.getFeedSubnets()) {
+ ArrayList<SubnetMatcher> v = pfstab.get(pfs.getFeedId());
+ if (v == null) {
+ v = new ArrayList<>();
+ pfstab.put(pfs.getFeedId(), v);
+ }
+ v.add(new SubnetMatcher(pfs.getCidr()));
+ }
+ HashMap<String, StringBuilder> feedTargets = new HashMap<>();
+ HashSet<String> allfeeds = new HashSet<>();
+ for (ProvFeed pfx : pd.getFeeds()) {
+ if (pfx.getStatus() == null) {
+ allfeeds.add(pfx.getId());
+ }
+ }
+ for (ProvSubscription provSubscription : pd.getSubscriptions()) {
+ String subId = provSubscription.getSubId();
+ String feedId = provSubscription.getFeedId();
+ if (isFeedOrSubKnown(allfeeds, subId, feedId)) {
+ continue;
+ }
+ int sididx = 999;
+ try {
+ sididx = Integer.parseInt(subId);
+ sididx -= sididx % 100;
+ } catch (Exception e) {
+ logger.error("NODE0517 Exception NodeConfig: " + e);
+ }
+ String subscriptionDirectory = sididx + "/" + subId;
+ DestInfo destinationInfo = new DestInfo("s:" + subId,
+ spooldir + "/s/" + subscriptionDirectory, provSubscription);
+ (new File(destinationInfo.getSpool())).mkdirs();
+ destInfos.add(destinationInfo);
+ provSubscriptions.put(subId, provSubscription);
+ subinfo.put(subId, destinationInfo);
+ String egr = egrtab.get(subId);
+ if (egr != null) {
+ subId = pf.getPath(egr) + subId;
+ }
+ StringBuilder sb = feedTargets.get(feedId);
+ if (sb == null) {
+ sb = new StringBuilder();
+ feedTargets.put(feedId, sb);
+ }
+ sb.append(' ').append(subId);
+ }
+ alldests = destInfos.toArray(new DestInfo[0]);
+ for (ProvFeed pfx : pd.getFeeds()) {
+ String fid = pfx.getId();
+ Feed f = feeds.get(fid);
+ if (f != null) {
+ continue;
+ }
+ f = new Feed();
+ feeds.put(fid, f);
+ f.createdDate = pfx.getCreatedDate();
+ f.loginfo = pfx.getLogData();
+ f.status = pfx.getStatus();
+ /*
+ * AAF changes: TDP EPIC US# 307413
+ * Passing aafInstance from ProvFeed to identify legacy/AAF feeds
+ */
+ f.aafInstance = pfx.getAafInstance();
+ ArrayList<SubnetMatcher> v1 = pfstab.get(fid);
+ if (v1 == null) {
+ f.subnets = new SubnetMatcher[0];
+ } else {
+ f.subnets = v1.toArray(new SubnetMatcher[0]);
+ }
+ HashMap<String, String> h1 = pfutab.get(fid);
+ if (h1 == null) {
+                h1 = new HashMap<>();
+ }
+ f.authusers = h1;
+ ArrayList<Redirection> v2 = rdtab.get(fid);
+ if (v2 == null) {
+ f.redirections = new Redirection[0];
+ } else {
+ f.redirections = v2.toArray(new Redirection[0]);
+ }
+ StringBuilder sb = feedTargets.get(fid);
+ if (sb == null) {
+ f.targets = new Target[0];
+ } else {
+ f.targets = parseRouting(sb.toString());
+ }
+ }
+ }
+
+ /**
+     * Parse a target string into an array of targets.
+ *
+ * @param routing Target string
+ * @return Array of targets.
+ */
+ public Target[] parseRouting(String routing) {
+ routing = routing.trim();
+ if ("".equals(routing)) {
+ return (new Target[0]);
+ }
+ String[] xx = routing.split("\\s+");
+ HashMap<String, Target> tmap = new HashMap<>();
+ HashSet<String> subset = new HashSet<>();
+ ArrayList<Target> tv = new ArrayList<>();
+ for (int i = 0; i < xx.length; i++) {
+ String t = xx[i];
+ int j = t.indexOf('/');
+ if (j == -1) {
+ addTarget(subset, tv, t);
+ } else {
+ addTargetWithRouting(tmap, tv, t, j);
+ }
+ }
+ return (tv.toArray(new Target[0]));
+ }
+
+ /**
+     * Check whether this is a valid node-to-node transfer.
+ *
+ * @param credentials Credentials offered by the supposed node
+ * @param ip IP address the request came from
+ */
+ public boolean isAnotherNode(String credentials, String ip) {
+ IsFrom n = nodes.get(credentials);
+ return (n != null && n.isFrom(ip));
+ }
+
+ /**
+ * Check whether publication is allowed.
+ *
+ * @param feedid The ID of the feed being requested.
+ * @param credentials The offered credentials
+ * @param ip The requesting IP address
+ */
+ public String isPublishPermitted(String feedid, String credentials, String ip) {
+ Feed f = feeds.get(feedid);
+ String nf = "Feed does not exist";
+ if (f != null) {
+ nf = f.status;
+ }
+ if (nf != null) {
+ return (nf);
+ }
+ String user = f.authusers.get(credentials);
+ if (user == null) {
+ return (PUBLISHER_NOT_PERMITTED);
+ }
+ if (f.subnets.length == 0) {
+ return (null);
+ }
+ byte[] addr = NodeUtils.getInetAddress(ip);
+ for (SubnetMatcher snm : f.subnets) {
+ if (snm.matches(addr)) {
+ return (null);
+ }
+ }
+ return (PUBLISHER_NOT_PERMITTED);
+ }
+
+ /**
+ * Check whether delete file is allowed.
+ *
+ * @param subId The ID of the subscription being requested.
+ */
+ public boolean isDeletePermitted(String subId) {
+ ProvSubscription provSubscription = provSubscriptions.get(subId);
+ return provSubscription.isPrivilegedSubscriber();
+ }
+
+ /**
+ * Check whether publication is allowed for AAF Feed.
+ *
+ * @param feedid The ID of the feed being requested.
+ * @param ip The requesting IP address
+ */
+ public String isPublishPermitted(String feedid, String ip) {
+ Feed f = feeds.get(feedid);
+ String nf = "Feed does not exist";
+ if (f != null) {
+ nf = f.status;
+ }
+ if (nf != null) {
+ return nf;
+ }
+ if (f.subnets.length == 0) {
+ return null;
+ }
+ byte[] addr = NodeUtils.getInetAddress(ip);
+ for (SubnetMatcher snm : f.subnets) {
+ if (snm.matches(addr)) {
+ return null;
+ }
+ }
+ return PUBLISHER_NOT_PERMITTED;
+ }
+
+ /**
+     * Get authenticated user.
+ */
+ public String getAuthUser(String feedid, String credentials) {
+ return (feeds.get(feedid).authusers.get(credentials));
+ }
+
+ /**
+     * AAF changes (TDP EPIC US# 307413): check AAF_instance for the given feed ID.
+ *
+ * @param feedid The ID of the feed specified
+ */
+ public String getAafInstance(String feedid) {
+ Feed f = feeds.get(feedid);
+ return f.aafInstance;
+ }
+
+ /**
+     * Check if the request should be redirected to a different ingress node.
+ */
+ public String getIngressNode(String feedid, String user, String ip) {
+ Feed f = feeds.get(feedid);
+ if (f.redirections.length == 0) {
+ return (null);
+ }
+ byte[] addr = NodeUtils.getInetAddress(ip);
+ for (Redirection r : f.redirections) {
+ if ((r.user != null && !user.equals(r.user)) || (r.snm != null && !r.snm.matches(addr))) {
+ continue;
+ }
+ for (String n : r.nodes) {
+ if (myname.equals(n)) {
+ return (null);
+ }
+ }
+ if (r.nodes.length == 0) {
+ return (null);
+ }
+ return (r.nodes[rrcntr++ % r.nodes.length]);
+ }
+ return (null);
+ }
+
+ /**
+     * Get a provisioned configuration parameter.
+ */
+ public String getProvParam(String name) {
+ return (params.get(name));
+ }
+
+ /**
+     * Get all the DestInfos.
+ */
+ public DestInfo[] getAllDests() {
+ return (alldests);
+ }
+
+ /**
+     * Get the targets for a feed.
+ *
+ * @param feedid The feed ID
+ * @return The targets this feed should be delivered to
+ */
+ public Target[] getTargets(String feedid) {
+ if (feedid == null) {
+ return (new Target[0]);
+ }
+ Feed f = feeds.get(feedid);
+ if (f == null) {
+ return (new Target[0]);
+ }
+ return (f.targets);
+ }
+
+ /**
+     * Get the creation date for a feed.
+ *
+ * @param feedid The feed ID
+ * @return the timestamp of creation date of feed id passed
+ */
+ public String getCreatedDate(String feedid) {
+ Feed f = feeds.get(feedid);
+ return (f.createdDate);
+ }
+
+ /**
+     * Get the feed ID for a subscription.
+ *
+ * @param subid The subscription ID
+ * @return The feed ID
+ */
+ public String getFeedId(String subid) {
+ DestInfo di = subinfo.get(subid);
+ if (di == null) {
+ return (null);
+ }
+ return (di.getLogData());
+ }
+
+ /**
+     * Get the spool directory for a subscription.
+ *
+ * @param subid The subscription ID
+ * @return The spool directory
+ */
+ public String getSpoolDir(String subid) {
+ DestInfo di = subinfo.get(subid);
+ if (di == null) {
+ return (null);
+ }
+ return (di.getSpool());
+ }
+
+ /**
+     * Get the Authorization value this node uses.
+ *
+ * @return The Authorization header value for this node
+ */
+ public String getMyAuth() {
+ return (myauth);
+ }
+
+ private boolean isFeedOrSubKnown(HashSet<String> allfeeds, String subId, String feedId) {
+ return !allfeeds.contains(feedId) || subinfo.get(subId) != null;
+ }
+
+ private void addTargetWithRouting(HashMap<String, Target> tmap, ArrayList<Target> tv, String t, int j) {
+ String node = t.substring(0, j);
+ String rtg = t.substring(j + 1);
+ DestInfo di = nodeinfo.get(node);
+ if (di == null) {
+ tv.add(new Target(null, t));
+ } else {
+ Target tt = tmap.get(node);
+ if (tt == null) {
+ tt = new Target(di, rtg);
+ tmap.put(node, tt);
+ tv.add(tt);
+ } else {
+ tt.addRouting(rtg);
+ }
+ }
+ }
+
+ private void addTarget(HashSet<String> subset, ArrayList<Target> tv, String t) {
+ DestInfo di = subinfo.get(t);
+ if (di == null) {
+ tv.add(new Target(null, t));
+ } else {
+ if (!subset.contains(t)) {
+ subset.add(t);
+ tv.add(new Target(di, null));
+ }
+ }
+ }
+
/**
* Raw configuration entry for a data router node
*/
@@ -134,9 +563,8 @@ public class NodeConfig {
/**
* Get the created date of the data feed.
*/
- public String getCreatedDate()
- {
- return(createdDate);
+ public String getCreatedDate() {
+ return (createdDate);
}
/**
@@ -277,7 +705,9 @@ public class NodeConfig {
* @param followRedirect Is follow redirect of destination enabled?
* @param decompress To see if they want their information compressed or decompressed
*/
- public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials, boolean metaonly, boolean use100, boolean privilegedSubscriber, boolean followRedirect, boolean decompress) {
+ public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials,
+ boolean metaonly, boolean use100, boolean privilegedSubscriber, boolean followRedirect,
+ boolean decompress) {
this.subid = subid;
this.feedid = feedid;
this.url = url;
@@ -348,17 +778,17 @@ public class NodeConfig {
/**
* Should i decompress the file before sending it on
- */
+ */
public boolean isDecompress() {
return (decompress);
}
/**
- * New field is added - FOLLOW_REDIRECTS feature iTrack:DATARTR-17 - 1706
- * Get the followRedirect of this destination
+ * New field is added - FOLLOW_REDIRECTS feature iTrack:DATARTR-17 - 1706 Get the followRedirect of this
+ * destination
*/
boolean getFollowRedirect() {
- return(followRedirect);
+ return (followRedirect);
}
}
@@ -386,7 +816,7 @@ public class NodeConfig {
this.subnet = subnet;
this.user = user;
//Sonar fix
- if(nodes == null) {
+ if (nodes == null) {
this.nodes = new String[0];
} else {
this.nodes = Arrays.copyOf(nodes, nodes.length);
@@ -466,13 +896,6 @@ public class NodeConfig {
private String via;
/**
- * A human readable description of this entry
- */
- public String toString() {
- return ("Hop " + from + "->" + to + " via " + via);
- }
-
- /**
* Construct a hop entry
*
* @param from The FQDN of the node with the data to be delivered
@@ -486,6 +909,13 @@ public class NodeConfig {
}
/**
+ * A human readable description of this entry
+ */
+ public String toString() {
+ return ("Hop " + from + "->" + to + " via " + via);
+ }
+
+ /**
* Get the from node
*/
public String getFrom() {
@@ -519,431 +949,10 @@ public class NodeConfig {
String loginfo;
String status;
SubnetMatcher[] subnets;
- Hashtable<String, String> authusers = new Hashtable<String, String>();
+ HashMap<String, String> authusers = new HashMap<>();
Redirection[] redirections;
Target[] targets;
String createdDate;
String aafInstance;
}
-
- private Hashtable<String, String> params = new Hashtable<>();
- private Hashtable<String, Feed> feeds = new Hashtable<>();
- private Hashtable<String, DestInfo> nodeinfo = new Hashtable<>();
- private Hashtable<String, DestInfo> subinfo = new Hashtable<>();
- private Hashtable<String, IsFrom> nodes = new Hashtable<>();
- private Hashtable<String, ProvSubscription> provSubscriptions = new Hashtable<>();
- private String myname;
- private String myauth;
- private DestInfo[] alldests;
- private int rrcntr;
-
- /**
- * Process the raw provisioning data to configure this node
- *
- * @param pd The parsed provisioning data
- * @param myname My name as seen by external systems
- * @param spooldir The directory where temporary files live
- * @param port The port number for URLs
- * @param nodeauthkey The keying string used to generate node authentication credentials
- */
- public NodeConfig(ProvData pd, String myname, String spooldir, int port, String nodeauthkey) {
- this.myname = myname;
- for (ProvParam p : pd.getParams()) {
- params.put(p.getName(), p.getValue());
- }
- Vector<DestInfo> destInfos = new Vector<>();
- myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey);
- for (ProvNode pn : pd.getNodes()) {
- String cName = pn.getCName();
- if (nodeinfo.get(cName) != null) {
- continue;
- }
- String auth = NodeUtils.getNodeAuthHdr(cName, nodeauthkey);
- DestInfo di = new DestInfo.DestInfoBuilder().setName("n:" + cName).setSpool(spooldir + "/n/" + cName).setSubid(null)
- .setLogdata("n2n-" + cName).setUrl("https://" + cName + ":" + port + "/internal/publish")
- .setAuthuser(cName).setAuthentication(myauth).setMetaonly(false).setUse100(true)
- .setPrivilegedSubscriber(false).setFollowRedirects(false).setDecompress(false).createDestInfo();
- (new File(di.getSpool())).mkdirs();
- destInfos.add(di);
- nodeinfo.put(cName, di);
- nodes.put(auth, new IsFrom(cName));
- }
- PathFinder pf = new PathFinder(myname, nodeinfo.keySet().toArray(new String[0]), pd.getHops());
- Hashtable<String, Vector<Redirection>> rdtab = new Hashtable<>();
- for (ProvForceIngress pfi : pd.getForceIngress()) {
- Vector<Redirection> v = rdtab.get(pfi.getFeedId());
- if (v == null) {
- v = new Vector<>();
- rdtab.put(pfi.getFeedId(), v);
- }
- Redirection r = new Redirection();
- if (pfi.getSubnet() != null) {
- r.snm = new SubnetMatcher(pfi.getSubnet());
- }
- r.user = pfi.getUser();
- r.nodes = pfi.getNodes();
- v.add(r);
- }
- Hashtable<String, Hashtable<String, String>> pfutab = new Hashtable<>();
- for (ProvFeedUser pfu : pd.getFeedUsers()) {
- Hashtable<String, String> t = pfutab.get(pfu.getFeedId());
- if (t == null) {
- t = new Hashtable<>();
- pfutab.put(pfu.getFeedId(), t);
- }
- t.put(pfu.getCredentials(), pfu.getUser());
- }
- Hashtable<String, String> egrtab = new Hashtable<>();
- for (ProvForceEgress pfe : pd.getForceEgress()) {
- if (pfe.getNode().equals(myname) || nodeinfo.get(pfe.getNode()) == null) {
- continue;
- }
- egrtab.put(pfe.getSubId(), pfe.getNode());
- }
- Hashtable<String, Vector<SubnetMatcher>> pfstab = new Hashtable<>();
- for (ProvFeedSubnet pfs : pd.getFeedSubnets()) {
- Vector<SubnetMatcher> v = pfstab.get(pfs.getFeedId());
- if (v == null) {
- v = new Vector<>();
- pfstab.put(pfs.getFeedId(), v);
- }
- v.add(new SubnetMatcher(pfs.getCidr()));
- }
- Hashtable<String, StringBuffer> feedTargets = new Hashtable<>();
- HashSet<String> allfeeds = new HashSet<>();
- for (ProvFeed pfx : pd.getFeeds()) {
- if (pfx.getStatus() == null) {
- allfeeds.add(pfx.getId());
- }
- }
- for (ProvSubscription provSubscription : pd.getSubscriptions()) {
- String subId = provSubscription.getSubId();
- String feedId = provSubscription.getFeedId();
- if (!allfeeds.contains(feedId)) {
- continue;
- }
- if (subinfo.get(subId) != null) {
- continue;
- }
- int sididx = 999;
- try {
- sididx = Integer.parseInt(subId);
- sididx -= sididx % 100;
- } catch (Exception e) {
- logger.error("NODE0517 Exception NodeConfig: "+e);
- }
- String subscriptionDirectory = sididx + "/" + subId;
- DestInfo destinationInfo = new DestInfo("s:" + subId,
- spooldir + "/s/" + subscriptionDirectory, provSubscription);
- (new File(destinationInfo.getSpool())).mkdirs();
- destInfos.add(destinationInfo);
- provSubscriptions.put(subId, provSubscription);
- subinfo.put(subId, destinationInfo);
- String egr = egrtab.get(subId);
- if (egr != null) {
- subId = pf.getPath(egr) + subId;
- }
- StringBuffer sb = feedTargets.get(feedId);
- if (sb == null) {
- sb = new StringBuffer();
- feedTargets.put(feedId, sb);
- }
- sb.append(' ').append(subId);
- }
- alldests = destInfos.toArray(new DestInfo[0]);
- for (ProvFeed pfx : pd.getFeeds()) {
- String fid = pfx.getId();
- Feed f = feeds.get(fid);
- if (f != null) {
- continue;
- }
- f = new Feed();
- feeds.put(fid, f);
- f.createdDate = pfx.getCreatedDate();
- f.loginfo = pfx.getLogData();
- f.status = pfx.getStatus();
- /*
- * AAF changes: TDP EPIC US# 307413
- * Passing aafInstance from ProvFeed to identify legacy/AAF feeds
- */
- f.aafInstance = pfx.getAafInstance();
- Vector<SubnetMatcher> v1 = pfstab.get(fid);
- if (v1 == null) {
- f.subnets = new SubnetMatcher[0];
- } else {
- f.subnets = v1.toArray(new SubnetMatcher[0]);
- }
- Hashtable<String, String> h1 = pfutab.get(fid);
- if (h1 == null) {
- h1 = new Hashtable<String, String>();
- }
- f.authusers = h1;
- Vector<Redirection> v2 = rdtab.get(fid);
- if (v2 == null) {
- f.redirections = new Redirection[0];
- } else {
- f.redirections = v2.toArray(new Redirection[0]);
- }
- StringBuffer sb = feedTargets.get(fid);
- if (sb == null) {
- f.targets = new Target[0];
- } else {
- f.targets = parseRouting(sb.toString());
- }
- }
- }
-
- /**
- * Parse a target string into an array of targets
- *
- * @param routing Target string
- * @return Array of targets.
- */
- public Target[] parseRouting(String routing) {
- routing = routing.trim();
- if ("".equals(routing)) {
- return (new Target[0]);
- }
- String[] xx = routing.split("\\s+");
- Hashtable<String, Target> tmap = new Hashtable<String, Target>();
- HashSet<String> subset = new HashSet<String>();
- Vector<Target> tv = new Vector<Target>();
- Target[] ret = new Target[xx.length];
- for (int i = 0; i < xx.length; i++) {
- String t = xx[i];
- int j = t.indexOf('/');
- if (j == -1) {
- DestInfo di = subinfo.get(t);
- if (di == null) {
- tv.add(new Target(null, t));
- } else {
- if (!subset.contains(t)) {
- subset.add(t);
- tv.add(new Target(di, null));
- }
- }
- } else {
- String node = t.substring(0, j);
- String rtg = t.substring(j + 1);
- DestInfo di = nodeinfo.get(node);
- if (di == null) {
- tv.add(new Target(null, t));
- } else {
- Target tt = tmap.get(node);
- if (tt == null) {
- tt = new Target(di, rtg);
- tmap.put(node, tt);
- tv.add(tt);
- } else {
- tt.addRouting(rtg);
- }
- }
- }
- }
- return (tv.toArray(new Target[0]));
- }
-
- /**
- * Check whether this is a valid node-to-node transfer
- *
- * @param credentials Credentials offered by the supposed node
- * @param ip IP address the request came from
- */
- public boolean isAnotherNode(String credentials, String ip) {
- IsFrom n = nodes.get(credentials);
- return (n != null && n.isFrom(ip));
- }
-
- /**
- * Check whether publication is allowed.
- *
- * @param feedid The ID of the feed being requested.
- * @param credentials The offered credentials
- * @param ip The requesting IP address
- */
- public String isPublishPermitted(String feedid, String credentials, String ip) {
- Feed f = feeds.get(feedid);
- String nf = "Feed does not exist";
- if (f != null) {
- nf = f.status;
- }
- if (nf != null) {
- return (nf);
- }
- String user = f.authusers.get(credentials);
- if (user == null) {
- return ("Publisher not permitted for this feed");
- }
- if (f.subnets.length == 0) {
- return (null);
- }
- byte[] addr = NodeUtils.getInetAddress(ip);
- for (SubnetMatcher snm : f.subnets) {
- if (snm.matches(addr)) {
- return (null);
- }
- }
- return ("Publisher not permitted for this feed");
- }
-
- /**
- * Check whether delete file is allowed.
- *
- * @param subId The ID of the subscription being requested.
- */
- public boolean isDeletePermitted(String subId) {
- ProvSubscription provSubscription = provSubscriptions.get(subId);
- return provSubscription.isPrivilegedSubscriber();
- }
-
- /**
- * Check whether publication is allowed for AAF Feed.
- * @param feedid The ID of the feed being requested.
- * @param ip The requesting IP address
- */
- public String isPublishPermitted(String feedid, String ip) {
- Feed f = feeds.get(feedid);
- String nf = "Feed does not exist";
- if (f != null) {
- nf = f.status;
- }
- if (nf != null) {
- return(nf);
- }
- if (f.subnets.length == 0) {
- return(null);
- }
- byte[] addr = NodeUtils.getInetAddress(ip);
- for (SubnetMatcher snm: f.subnets) {
- if (snm.matches(addr)) {
- return(null);
- }
- }
- return("Publisher not permitted for this feed");
- }
-
- /**
- * Get authenticated user
- */
- public String getAuthUser(String feedid, String credentials) {
- return (feeds.get(feedid).authusers.get(credentials));
- }
-
- /**
- * AAF changes: TDP EPIC US# 307413
- * Check AAF_instance for feed ID
- * @param feedid The ID of the feed specified
- */
- public String getAafInstance(String feedid) {
- Feed f = feeds.get(feedid);
- return f.aafInstance;
- }
-
- /**
- * Check if the request should be redirected to a different ingress node
- */
- public String getIngressNode(String feedid, String user, String ip) {
- Feed f = feeds.get(feedid);
- if (f.redirections.length == 0) {
- return (null);
- }
- byte[] addr = NodeUtils.getInetAddress(ip);
- for (Redirection r : f.redirections) {
- if (r.user != null && !user.equals(r.user)) {
- continue;
- }
- if (r.snm != null && !r.snm.matches(addr)) {
- continue;
- }
- for (String n : r.nodes) {
- if (myname.equals(n)) {
- return (null);
- }
- }
- if (r.nodes.length == 0) {
- return (null);
- }
- return (r.nodes[rrcntr++ % r.nodes.length]);
- }
- return (null);
- }
-
- /**
- * Get a provisioned configuration parameter
- */
- public String getProvParam(String name) {
- return (params.get(name));
- }
-
- /**
- * Get all the DestInfos
- */
- public DestInfo[] getAllDests() {
- return (alldests);
- }
-
- /**
- * Get the targets for a feed
- *
- * @param feedid The feed ID
- * @return The targets this feed should be delivered to
- */
- public Target[] getTargets(String feedid) {
- if (feedid == null) {
- return (new Target[0]);
- }
- Feed f = feeds.get(feedid);
- if (f == null) {
- return (new Target[0]);
- }
- return (f.targets);
- }
-
- /**
- * Get the creation date for a feed
- * @param feedid The feed ID
- * @return the timestamp of creation date of feed id passed
- */
- public String getCreatedDate(String feedid) {
- Feed f = feeds.get(feedid);
- return(f.createdDate);
- }
-
- /**
- * Get the feed ID for a subscription
- *
- * @param subid The subscription ID
- * @return The feed ID
- */
- public String getFeedId(String subid) {
- DestInfo di = subinfo.get(subid);
- if (di == null) {
- return (null);
- }
- return (di.getLogData());
- }
-
- /**
- * Get the spool directory for a subscription
- *
- * @param subid The subscription ID
- * @return The spool directory
- */
- public String getSpoolDir(String subid) {
- DestInfo di = subinfo.get(subid);
- if (di == null) {
- return (null);
- }
- return (di.getSpool());
- }
-
- /**
- * Get the Authorization value this node uses
- *
- * @return The Authorization header value for this node
- */
- public String getMyAuth() {
- return (myauth);
- }
-
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java
index 884f7bf9..8a0b0b86 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java
@@ -26,8 +26,6 @@ package org.onap.dmaap.datarouter.node;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
-
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
@@ -35,6 +33,7 @@ import java.io.Reader;
import java.net.URL;
import java.util.Properties;
import java.util.Timer;
+import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
/**
@@ -49,8 +48,9 @@ import java.util.Timer;
*/
public class NodeConfigManager implements DeliveryQueueHelper {
- private static EELFLogger eelfLogger = EELFManager.getInstance()
- .getLogger(NodeConfigManager.class);
+ private static final String CHANGE_ME = "changeme";
+ private static final String NODE_CONFIG_MANAGER = "NodeConfigManager";
+ private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(NodeConfigManager.class);
private static NodeConfigManager base = new NodeConfigManager();
private Timer timer = new Timer("Node Configuration Timer", true);
@@ -94,7 +94,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
private String eventlogsuffix;
private String eventloginterval;
private boolean followredirects;
- private String [] enabledprotocols;
+ private String[] enabledprotocols;
private String aafType;
private String aafInstance;
private String aafAction;
@@ -103,13 +103,6 @@ public class NodeConfigManager implements DeliveryQueueHelper {
/**
- * Get the default node configuration manager
- */
- public static NodeConfigManager getInstance() {
- return base;
- }
-
- /**
* Initialize the configuration of a Data Router node
*/
private NodeConfigManager() {
@@ -120,8 +113,10 @@ public class NodeConfigManager implements DeliveryQueueHelper {
drNodeProperties.load(new FileInputStream(System
.getProperty("org.onap.dmaap.datarouter.node.properties", "/opt/app/datartr/etc/node.properties")));
} catch (Exception e) {
- NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
- eelfLogger.error(EelfMsgs.MESSAGE_PROPERTIES_LOAD_ERROR, e, System.getProperty("org.onap.dmaap.datarouter.node.properties", "/opt/app/datartr/etc/node.properties"));
+ NodeUtils.setIpAndFqdnForEelf(NODE_CONFIG_MANAGER);
+ eelfLogger.error(EelfMsgs.MESSAGE_PROPERTIES_LOAD_ERROR, e,
+ System.getProperty("org.onap.dmaap.datarouter.node.properties",
+ "/opt/app/datartr/etc/node.properties"));
}
provurl = drNodeProperties.getProperty("ProvisioningURL", "https://dmaap-dr-prov:8443/internal/prov");
/*
@@ -143,7 +138,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
try {
provhost = (new URL(provurl)).getHost();
} catch (Exception e) {
- NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
+ NodeUtils.setIpAndFqdnForEelf(NODE_CONFIG_MANAGER);
eelfLogger.error(EelfMsgs.MESSAGE_BAD_PROV_URL, e, provurl);
System.exit(1);
}
@@ -168,14 +163,14 @@ public class NodeConfigManager implements DeliveryQueueHelper {
logretention = Long.parseLong(drNodeProperties.getProperty("LogRetention", "30")) * 86400000L;
eventlogprefix = logdir + "/events";
eventlogsuffix = ".log";
- String redirfile = drNodeProperties.getProperty("RedirectionFile", "etc/redirections.dat");
+ redirfile = drNodeProperties.getProperty("RedirectionFile", "etc/redirections.dat");
kstype = drNodeProperties.getProperty("KeyStoreType", "jks");
ksfile = drNodeProperties.getProperty("KeyStoreFile", "etc/keystore");
- kspass = drNodeProperties.getProperty("KeyStorePassword", "changeme");
- kpass = drNodeProperties.getProperty("KeyPassword", "changeme");
+ kspass = drNodeProperties.getProperty("KeyStorePassword", CHANGE_ME);
+ kpass = drNodeProperties.getProperty("KeyPassword", CHANGE_ME);
tstype = drNodeProperties.getProperty("TrustStoreType", "jks");
tsfile = drNodeProperties.getProperty("TrustStoreFile");
- tspass = drNodeProperties.getProperty("TrustStorePassword", "changeme");
+ tspass = drNodeProperties.getProperty("TrustStorePassword", CHANGE_ME);
if (tsfile != null && tsfile.length() > 0) {
System.setProperty("javax.net.ssl.trustStoreType", tstype);
System.setProperty("javax.net.ssl.trustStore", tsfile);
@@ -185,7 +180,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
quiesce = new File(drNodeProperties.getProperty("QuiesceFile", "etc/SHUTDOWN"));
myname = NodeUtils.getCanonicalName(kstype, ksfile, kspass);
if (myname == null) {
- NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
+ NodeUtils.setIpAndFqdnForEelf(NODE_CONFIG_MANAGER);
eelfLogger.error(EelfMsgs.MESSAGE_KEYSTORE_FETCH_ERROR, ksfile);
eelfLogger.error("NODE0309 Unable to fetch canonical name from keystore file " + ksfile);
System.exit(1);
@@ -202,6 +197,13 @@ public class NodeConfigManager implements DeliveryQueueHelper {
pfetcher.request();
}
+ /**
+ * Get the default node configuration manager
+ */
+ public static NodeConfigManager getInstance() {
+ return base;
+ }
+
private void localconfig() {
followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false"));
eventloginterval = getProvParam("LOGROLL_INTERVAL", "30s");
@@ -218,52 +220,53 @@ public class NodeConfigManager implements DeliveryQueueHelper {
try {
initfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_INIT_RETRY_INTERVAL")) * 1000);
} catch (Exception e) {
- eelfLogger.error("Error parsing DELIVERY_INIT_RETRY_INTERVAL", e);
+ eelfLogger.trace("Error parsing DELIVERY_INIT_RETRY_INTERVAL", e);
}
try {
- waitForFileProcessFailureTimer = (long) (Double.parseDouble(getProvParam("DELIVERY_FILE_PROCESS_INTERVAL")) * 1000);
+ waitForFileProcessFailureTimer = (long) (Double.parseDouble(getProvParam("DELIVERY_FILE_PROCESS_INTERVAL"))
+ * 1000);
} catch (Exception e) {
- eelfLogger.error("Error parsing DELIVERY_FILE_PROCESS_INTERVAL", e);
+ eelfLogger.trace("Error parsing DELIVERY_FILE_PROCESS_INTERVAL", e);
}
try {
maxfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000);
} catch (Exception e) {
- eelfLogger.error("Error parsing DELIVERY_MAX_RETRY_INTERVAL", e);
+ eelfLogger.trace("Error parsing DELIVERY_MAX_RETRY_INTERVAL", e);
}
try {
expirationtimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_AGE")) * 1000);
} catch (Exception e) {
- eelfLogger.error("Error parsing DELIVERY_MAX_AGE", e);
+ eelfLogger.trace("Error parsing DELIVERY_MAX_AGE", e);
}
try {
failurebackoff = Double.parseDouble(getProvParam("DELIVERY_RETRY_RATIO"));
} catch (Exception e) {
- eelfLogger.error("Error parsing DELIVERY_RETRY_RATIO", e);
+ eelfLogger.trace("Error parsing DELIVERY_RETRY_RATIO", e);
}
try {
deliverythreads = Integer.parseInt(getProvParam("DELIVERY_THREADS"));
} catch (Exception e) {
- eelfLogger.error("Error parsing DELIVERY_THREADS", e);
+ eelfLogger.trace("Error parsing DELIVERY_THREADS", e);
}
try {
fairfilelimit = Integer.parseInt(getProvParam("FAIR_FILE_LIMIT"));
} catch (Exception e) {
- eelfLogger.error("Error parsing FAIR_FILE_LIMIT", e);
+ eelfLogger.trace("Error parsing FAIR_FILE_LIMIT", e);
}
try {
fairtimelimit = (long) (Double.parseDouble(getProvParam("FAIR_TIME_LIMIT")) * 1000);
} catch (Exception e) {
- eelfLogger.error("Error parsing FAIR_TIME_LIMIT", e);
+ eelfLogger.trace("Error parsing FAIR_TIME_LIMIT", e);
}
try {
fdpstart = Double.parseDouble(getProvParam("FREE_DISK_RED_PERCENT")) / 100.0;
} catch (Exception e) {
- eelfLogger.error("Error parsing FREE_DISK_RED_PERCENT", e);
+ eelfLogger.trace("Error parsing FREE_DISK_RED_PERCENT", e);
}
try {
fdpstop = Double.parseDouble(getProvParam("FREE_DISK_YELLOW_PERCENT")) / 100.0;
} catch (Exception e) {
- eelfLogger.error("Error parsing FREE_DISK_YELLOW_PERCENT", e);
+ eelfLogger.trace("Error parsing FREE_DISK_YELLOW_PERCENT", e);
}
if (fdpstart < 0.01) {
fdpstart = 0.01;
@@ -286,14 +289,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
config = new NodeConfig(new ProvData(r), myname, spooldir, port, nak);
localconfig();
configtasks.startRun();
- Runnable rr;
- while ((rr = configtasks.next()) != null) {
- try {
- rr.run();
- } catch (Exception e) {
- eelfLogger.error("NODE0518 Exception fetchconfig: " + e);
- }
- }
+ runTasks();
} catch (Exception e) {
NodeUtils.setIpAndFqdnForEelf("fetchconfigs");
eelfLogger.error(EelfMsgs.MESSAGE_CONF_FAILED, e.toString());
@@ -302,6 +298,17 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
}
+ private void runTasks() {
+ Runnable rr;
+ while ((rr = configtasks.next()) != null) {
+ try {
+ rr.run();
+ } catch (Exception e) {
+ eelfLogger.error("NODE0518 Exception fetchconfig: " + e);
+ }
+ }
+ }
+
/**
* Process a gofetch request from a particular IP address. If the IP address is not an IP address we would go to to
* fetch the provisioning data, ignore the request. If the data has been fetched very recently (default 10
@@ -381,7 +388,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
* @return True if the IP and credentials are valid for the specified feed.
*/
public String isPublishPermitted(String feedid, String ip) {
- return(config.isPublishPermitted(feedid, ip));
+ return (config.isPublishPermitted(feedid, ip));
}
/**
@@ -396,12 +403,12 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
- * AAF changes: TDP EPIC US# 307413
- * Check AAF_instance for feed ID in NodeConfig
+ * AAF changes: TDP EPIC US# 307413 Check AAF_instance for feed ID in NodeConfig
+ *
* @param feedid The ID of the feed specified
*/
public String getAafInstance(String feedid) {
- return(config.getAafInstance(feedid));
+ return (config.getAafInstance(feedid));
}
/**
@@ -597,11 +604,12 @@ public class NodeConfigManager implements DeliveryQueueHelper {
/**
* Get the creation date for a feed
+ *
* @param feedid The feed ID
* @return the timestamp of creation date of feed id passed
*/
public String getCreatedDate(String feedid) {
- return(config.getCreatedDate(feedid));
+ return (config.getCreatedDate(feedid));
}
/**
@@ -774,10 +782,11 @@ public class NodeConfigManager implements DeliveryQueueHelper {
/**
* Disable and enable protocols
- * */
+ */
public String[] getEnabledprotocols() {
return enabledprotocols;
}
+
public void setEnabledprotocols(String[] enabledprotocols) {
this.enabledprotocols = enabledprotocols.clone();
}
@@ -805,34 +814,42 @@ public class NodeConfigManager implements DeliveryQueueHelper {
public String getAafType() {
return aafType;
}
+
public void setAafType(String aafType) {
this.aafType = aafType;
}
+
public String getAafInstance() {
return aafInstance;
}
+
public void setAafInstance(String aafInstance) {
this.aafInstance = aafInstance;
}
+
public String getAafAction() {
return aafAction;
}
+
public void setAafAction(String aafAction) {
this.aafAction = aafAction;
}
+
/*
* Get aafURL from SWM variable
* */
public String getAafURL() {
return aafURL;
}
+
public void setAafURL(String aafURL) {
this.aafURL = aafURL;
}
- public boolean getCadiEnabeld() {
+ public boolean getCadiEnabled() {
return cadiEnabled;
}
+
public void setCadiEnabled(boolean cadiEnabled) {
this.cadiEnabled = cadiEnabled;
}
@@ -847,7 +864,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
try {
String type = getAafType();
String action = getAafAction();
- if (aafInstance == null || aafInstance.equals("")) {
+            if (aafInstance == null || "".equals(aafInstance)) {
aafInstance = getAafInstance();
}
return type + "|" + aafInstance + "|" + action;
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java
index 7a2691e4..9eaea283 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java
@@ -26,74 +26,38 @@ package org.onap.dmaap.datarouter.node;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.EnumSet;
+import java.util.Properties;
+import javax.servlet.DispatcherType;
+import javax.servlet.ServletException;
import org.eclipse.jetty.http.HttpVersion;
-import org.eclipse.jetty.server.*;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.onap.aaf.cadi.PropAccess;
-import javax.servlet.DispatcherType;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.EnumSet;
-import java.util.Properties;
-
/**
* The main starting point for the Data Router node
*/
public class NodeMain {
- private NodeMain() {
- }
-
private static EELFLogger nodeMainLogger = EELFManager.getInstance().getLogger(NodeMain.class);
-
- class Inner {
- InputStream getCadiProps() {
- InputStream in = null;
- try {
- in = getClass().getClassLoader().getResourceAsStream("drNodeCadi.properties");
- } catch (Exception e) {
- nodeMainLogger.error("Exception in Inner.getCadiProps() method ", e);
- }
- return in;
- }
- }
-
- private static class WaitForConfig implements Runnable {
-
- private NodeConfigManager localNodeConfigManager;
-
- WaitForConfig(NodeConfigManager ncm) {
- this.localNodeConfigManager = ncm;
- }
-
- public synchronized void run() {
- notify();
- }
-
- synchronized void waitForConfig() {
- localNodeConfigManager.registerConfigTask(this);
- while (!localNodeConfigManager.isConfigured()) {
- nodeMainLogger.info("NODE0003 Waiting for Node Configuration");
- try {
- wait();
- } catch (Exception exception) {
- nodeMainLogger
- .error("NodeMain: waitForConfig exception. Exception Message:- " + exception.toString(),
- exception);
- }
- }
- localNodeConfigManager.deregisterConfigTask(this);
- nodeMainLogger.info("NODE0004 Node Configuration Data Received");
- }
- }
-
private static Delivery delivery;
private static NodeConfigManager nodeConfigManager;
+ private NodeMain() {
+ }
+
/**
* Reset the retry timer for a subscription
*/
@@ -123,7 +87,8 @@ public class NodeMain {
httpConfiguration.setRequestHeaderSize(2048);
// HTTP connector
- try (ServerConnector httpServerConnector = new ServerConnector(server, new HttpConnectionFactory(httpConfiguration))) {
+ try (ServerConnector httpServerConnector = new ServerConnector(server,
+ new HttpConnectionFactory(httpConfiguration))) {
httpServerConnector.setPort(nodeConfigManager.getHttpPort());
httpServerConnector.setIdleTimeout(2000);
@@ -147,9 +112,12 @@ public class NodeMain {
sslContextFactory.addExcludeProtocols("SSLv3");
sslContextFactory.setIncludeProtocols(nodeConfigManager.getEnabledprotocols());
- nodeMainLogger.info("NODE00004 Unsupported protocols node server:-" + String.join(",", sslContextFactory.getExcludeProtocols()));
- nodeMainLogger.info("NODE00004 Supported protocols node server:-" + String.join(",", sslContextFactory.getIncludeProtocols()));
- nodeMainLogger.info("NODE00004 Unsupported ciphers node server:-" + String.join(",", sslContextFactory.getExcludeCipherSuites()));
+ nodeMainLogger.info("NODE00004 Unsupported protocols node server:-" +
+ String.join(",", sslContextFactory.getExcludeProtocols()));
+ nodeMainLogger.info("NODE00004 Supported protocols node server:-" +
+ String.join(",", sslContextFactory.getIncludeProtocols()));
+ nodeMainLogger.info("NODE00004 Unsupported ciphers node server:-" +
+ String.join(",", sslContextFactory.getExcludeCipherSuites()));
HttpConfiguration httpsConfiguration = new HttpConfiguration(httpConfiguration);
httpsConfiguration.setRequestHeaderSize(8192);
@@ -174,20 +142,8 @@ public class NodeMain {
servletContextHandler.addServlet(new ServletHolder(new NodeServlet(delivery)), "/*");
//CADI Filter activation check
- if (nodeConfigManager.getCadiEnabeld()) {
- Properties cadiProperties = new Properties();
- try {
- Inner obj = new NodeMain().new Inner();
- InputStream in = obj.getCadiProps();
- cadiProperties.load(in);
- } catch (IOException e1) {
- nodeMainLogger.error("NODE00005 Exception in NodeMain.Main() loading CADI properties ", e1);
- }
- cadiProperties.setProperty("aaf_locate_url", nodeConfigManager.getAafURL());
- nodeMainLogger.info("NODE00005 aaf_url set to - " + cadiProperties.getProperty("aaf_url"));
-
- PropAccess access = new PropAccess(cadiProperties);
- servletContextHandler.addFilter(new FilterHolder(new DRNodeCadiFilter(true, access)), "/*", EnumSet.of(DispatcherType.REQUEST));
+ if (nodeConfigManager.getCadiEnabled()) {
+ enableCadi(servletContextHandler);
}
server.setHandler(servletContextHandler);
@@ -199,9 +155,68 @@ public class NodeMain {
server.start();
nodeMainLogger.info("NODE00006 Node Server started-" + server.getState());
} catch (Exception e) {
- nodeMainLogger.info("NODE00006 Jetty failed to start. Reporting will we unavailable", e);
+            nodeMainLogger.info("NODE00006 Jetty failed to start. Reporting will be unavailable", e.getMessage());
}
server.join();
nodeMainLogger.info("NODE00007 Node Server joined - " + server.getState());
}
+
+ private static void enableCadi(ServletContextHandler servletContextHandler) throws ServletException {
+ Properties cadiProperties = new Properties();
+ try {
+ Inner obj = new NodeMain().new Inner();
+ InputStream in = obj.getCadiProps();
+ cadiProperties.load(in);
+ } catch (IOException e1) {
+ nodeMainLogger.error("NODE00005 Exception in NodeMain.Main() loading CADI properties " + e1.getMessage());
+ }
+ cadiProperties.setProperty("aaf_locate_url", nodeConfigManager.getAafURL());
+ nodeMainLogger.info("NODE00005 aaf_url set to - " + cadiProperties.getProperty("aaf_url"));
+
+ PropAccess access = new PropAccess(cadiProperties);
+ servletContextHandler.addFilter(new FilterHolder(new DRNodeCadiFilter(true, access)), "/*", EnumSet
+ .of(DispatcherType.REQUEST));
+ }
+
+ private static class WaitForConfig implements Runnable {
+
+ private NodeConfigManager localNodeConfigManager;
+
+ WaitForConfig(NodeConfigManager ncm) {
+ this.localNodeConfigManager = ncm;
+ }
+
+ public synchronized void run() {
+ notify();
+ }
+
+ synchronized void waitForConfig() {
+ localNodeConfigManager.registerConfigTask(this);
+ while (!localNodeConfigManager.isConfigured()) {
+ nodeMainLogger.info("NODE0003 Waiting for Node Configuration");
+ try {
+ wait();
+ } catch (Exception exception) {
+ nodeMainLogger
+ .error("NodeMain: waitForConfig exception. Exception Message:- " + exception.toString(),
+ exception);
+ }
+ }
+ localNodeConfigManager.deregisterConfigTask(this);
+ nodeMainLogger.info("NODE0004 Node Configuration Data Received");
+ }
+ }
+
+ class Inner {
+
+ InputStream getCadiProps() {
+ InputStream in = null;
+ try {
+ in = getClass().getClassLoader().getResourceAsStream("drNodeCadi.properties");
+ } catch (Exception e) {
+ nodeMainLogger.error("Exception in Inner.getCadiProps() method ", e);
+ }
+ return in;
+ }
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java
index d665080b..3f2fc09f 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java
@@ -24,15 +24,10 @@
package org.onap.dmaap.datarouter.node;
+import static org.onap.dmaap.datarouter.node.NodeUtils.sendResponseError;
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-import org.jetbrains.annotations.Nullable;
-import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
-import org.slf4j.MDC;
-
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
@@ -45,8 +40,12 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Enumeration;
import java.util.regex.Pattern;
-
-import static org.onap.dmaap.datarouter.node.NodeUtils.sendResponseError;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
+import org.slf4j.MDC;
/**
* Servlet for handling all http and https requests to the data router node
@@ -61,23 +60,27 @@ import static org.onap.dmaap.datarouter.node.NodeUtils.sendResponseError;
*/
public class NodeServlet extends HttpServlet {
+ private static final String FROM = " from ";
+ private static final String INVALID_REQUEST_URI = "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.";
+ private static final String IO_EXCEPTION = "IOException";
+ private static final String ON_BEHALF_OF = "X-DMAAP-DR-ON-BEHALF-OF";
private static NodeConfigManager config;
- private static Pattern MetaDataPattern;
+ private static Pattern metaDataPattern;
private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(NodeServlet.class);
- private final Delivery delivery;
static {
final String ws = "\\s*";
// assume that \\ and \" have been replaced by X
final String string = "\"[^\"]*\"";
- //String string = "\"(?:[^\"\\\\]|\\\\.)*\"";
final String number = "[+-]?(?:\\.\\d+|(?:0|[1-9]\\d*)(?:\\.\\d*)?)(?:[eE][+-]?\\d+)?";
final String value = "(?:" + string + "|" + number + "|null|true|false)";
final String item = string + ws + ":" + ws + value + ws;
final String object = ws + "\\{" + ws + "(?:" + item + "(?:" + "," + ws + item + ")*)?\\}" + ws;
- MetaDataPattern = Pattern.compile(object, Pattern.DOTALL);
+ metaDataPattern = Pattern.compile(object, Pattern.DOTALL);
}
+ private final Delivery delivery;
+
NodeServlet(Delivery delivery) {
this.delivery = delivery;
}
@@ -91,7 +94,7 @@ public class NodeServlet extends HttpServlet {
eelfLogger.info("NODE0101 Node Servlet Configured");
}
- private boolean down(HttpServletResponse resp) throws IOException {
+ private boolean down(HttpServletResponse resp) {
if (config.isShutdown() || !config.isConfigured()) {
sendResponseError(resp, HttpServletResponse.SC_SERVICE_UNAVAILABLE, eelfLogger);
eelfLogger.info("NODE0102 Rejecting request: Service is being quiesced");
@@ -109,15 +112,10 @@ public class NodeServlet extends HttpServlet {
NodeUtils.setRequestIdAndInvocationId(req);
eelfLogger.info(EelfMsgs.ENTRY);
try {
- eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-DMAAP-DR-ON-BEHALF-OF"),
+ eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader(ON_BEHALF_OF),
getIdFromPath(req) + "");
- try {
- if (down(resp)) {
- return;
- }
-
- } catch (IOException ioe) {
- eelfLogger.error("IOException", ioe);
+ if (down(resp)) {
+ return;
}
String path = req.getPathInfo();
String qs = req.getQueryString();
@@ -138,7 +136,7 @@ public class NodeServlet extends HttpServlet {
}
}
- eelfLogger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip);
+ eelfLogger.info("NODE0103 Rejecting invalid GET of " + path + FROM + ip);
sendResponseError(resp, HttpServletResponse.SC_NOT_FOUND, eelfLogger);
} finally {
eelfLogger.info(EelfMsgs.EXIT);
@@ -153,12 +151,12 @@ public class NodeServlet extends HttpServlet {
NodeUtils.setIpAndFqdnForEelf("doPut");
NodeUtils.setRequestIdAndInvocationId(req);
eelfLogger.info(EelfMsgs.ENTRY);
- eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-DMAAP-DR-ON-BEHALF-OF"),
+ eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader(ON_BEHALF_OF),
getIdFromPath(req) + "");
try {
common(req, resp, true);
} catch (IOException ioe) {
- eelfLogger.error("IOException", ioe);
+ eelfLogger.error(IO_EXCEPTION, ioe);
eelfLogger.info(EelfMsgs.EXIT);
}
}
@@ -171,25 +169,30 @@ public class NodeServlet extends HttpServlet {
NodeUtils.setIpAndFqdnForEelf("doDelete");
NodeUtils.setRequestIdAndInvocationId(req);
eelfLogger.info(EelfMsgs.ENTRY);
- eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-DMAAP-DR-ON-BEHALF-OF"),
+ eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader(ON_BEHALF_OF),
getIdFromPath(req) + "");
try {
common(req, resp, false);
} catch (IOException ioe) {
- eelfLogger.error("IOException", ioe);
+ eelfLogger.error(IO_EXCEPTION, ioe);
eelfLogger.info(EelfMsgs.EXIT);
}
}
private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) throws IOException {
+ final String PUBLISH = "/publish/";
+ final String INTERNAL_PUBLISH = "/internal/publish/";
+ final String HTTPS = "https://";
+ final String USER = " user ";
String fileid = getFileId(req, resp);
- if (fileid == null) return;
+ if (fileid == null) {
+ return;
+ }
String feedid = null;
String user = null;
String ip = req.getRemoteAddr();
String lip = req.getLocalAddr();
String pubid = null;
- String xpubid = null;
String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip;
Target[] targets = null;
boolean isAAFFeed = false;
@@ -199,17 +202,17 @@ public class NodeServlet extends HttpServlet {
}
String credentials = req.getHeader("Authorization");
if (credentials == null) {
- eelfLogger.error("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req
+ eelfLogger.error("NODE0306 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + FROM + req
.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_FORBIDDEN, "Authorization header required");
eelfLogger.info(EelfMsgs.EXIT);
return;
}
- if (fileid.startsWith("/publish/")) {
+ if (fileid.startsWith(PUBLISH)) {
fileid = fileid.substring(9);
int i = fileid.indexOf('/');
if (i == -1 || i == fileid.length() - 1) {
- eelfLogger.error("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
+ eelfLogger.error("NODE0205 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + FROM + req
.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_NOT_FOUND,
"Invalid request URI. Expecting <feed-publishing-url>/<fileid>. Possible missing fileid.");
@@ -218,18 +221,19 @@ public class NodeServlet extends HttpServlet {
}
feedid = fileid.substring(0, i);
- if (config.getCadiEnabeld()) {
+ if (config.getCadiEnabled()) {
String path = req.getPathInfo();
if (!path.startsWith("/internal") && feedid != null) {
String aafInstance = config.getAafInstance(feedid);
- if (!(aafInstance.equalsIgnoreCase("legacy"))) {
+ if (!("legacy".equalsIgnoreCase(aafInstance))) {
isAAFFeed = true;
String permission = config.getPermission(aafInstance);
eelfLogger.info("NodeServlet.common() permission string - " + permission);
//Check in CADI Framework API if user has AAF permission or not
if (!req.isUserInRole(permission)) {
String message = "AAF disallows access to permission string - " + permission;
- eelfLogger.error("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());
+ eelfLogger.error("NODE0307 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo()
+ + FROM + req.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_FORBIDDEN, message);
eelfLogger.info(EelfMsgs.EXIT);
return;
@@ -240,9 +244,8 @@ public class NodeServlet extends HttpServlet {
fileid = fileid.substring(i + 1);
pubid = config.getPublishId();
- xpubid = req.getHeader("X-DMAAP-DR-PUBLISH-ID");
targets = config.getTargets(feedid);
- } else if (fileid.startsWith("/internal/publish/")) {
+ } else if (fileid.startsWith(INTERNAL_PUBLISH)) {
if (!config.isAnotherNode(credentials, ip)) {
eelfLogger.error("NODE0107 Rejecting unauthorized node-to-node transfer attempt from " + ip);
resp.sendError(HttpServletResponse.SC_FORBIDDEN);
@@ -254,18 +257,18 @@ public class NodeServlet extends HttpServlet {
user = "datartr"; // SP6 : Added usr as datartr to avoid null entries for internal routing
targets = config.parseRouting(req.getHeader("X-DMAAP-DR-ROUTING"));
} else {
- eelfLogger.error("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
+ eelfLogger.error("NODE0204 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + FROM + req
.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_NOT_FOUND,
- "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");
+ INVALID_REQUEST_URI);
eelfLogger.info(EelfMsgs.EXIT);
return;
}
if (fileid.indexOf('/') != -1) {
- eelfLogger.error("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
+ eelfLogger.error("NODE0202 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + FROM + req
.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_NOT_FOUND,
- "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");
+ INVALID_REQUEST_URI);
eelfLogger.info(EelfMsgs.EXIT);
return;
}
@@ -278,14 +281,16 @@ public class NodeServlet extends HttpServlet {
if (xp != 443) {
hp = hp + ":" + xp;
}
- String logurl = "https://" + hp + "/internal/publish/" + fileid;
+ String logurl = HTTPS + hp + INTERNAL_PUBLISH + fileid;
if (feedid != null) {
- logurl = "https://" + hp + "/publish/" + feedid + "/" + fileid;
+ logurl = HTTPS + hp + PUBLISH + feedid + "/" + fileid;
//Cadi code starts
if (!isAAFFeed) {
String reason = config.isPublishPermitted(feedid, credentials, ip);
if (reason != null) {
- eelfLogger.error("NODE0111 Rejecting unauthorized publish attempt to feed " + PathUtil.cleanString(feedid) + " fileid " + PathUtil.cleanString(fileid) + " from " + PathUtil.cleanString(ip) + " reason " + PathUtil.cleanString(reason));
+ eelfLogger.error("NODE0111 Rejecting unauthorized publish attempt to feed " + PathUtil
+ .cleanString(feedid) + " fileid " + PathUtil.cleanString(fileid) + FROM + PathUtil
+ .cleanString(ip) + " reason " + PathUtil.cleanString(reason));
resp.sendError(HttpServletResponse.SC_FORBIDDEN, reason);
eelfLogger.info(EelfMsgs.EXIT);
return;
@@ -294,9 +299,12 @@ public class NodeServlet extends HttpServlet {
} else {
String reason = config.isPublishPermitted(feedid, ip);
if (reason != null) {
- eelfLogger.error("NODE0111 Rejecting unauthorized publish attempt to feed " + PathUtil.cleanString(feedid) + " fileid " + PathUtil.cleanString(fileid) + " from " + PathUtil.cleanString(ip) + " reason Invalid AAF user- " + PathUtil.cleanString(reason));
+ eelfLogger.error("NODE0111 Rejecting unauthorized publish attempt to feed " + PathUtil
+ .cleanString(feedid) + " fileid " + PathUtil.cleanString(fileid) + FROM + PathUtil
+ .cleanString(ip) + " reason Invalid AAF user- " + PathUtil.cleanString(reason));
String message = "Invalid AAF user- " + PathUtil.cleanString(reason);
- eelfLogger.info("NODE0106 Rejecting unauthenticated PUT or DELETE of " + PathUtil.cleanString(req.getPathInfo()) + " from " + PathUtil.cleanString(req.getRemoteAddr()));
+ eelfLogger.info("NODE0308 Rejecting unauthenticated PUT or DELETE of " + PathUtil
+ .cleanString(req.getPathInfo()) + FROM + PathUtil.cleanString(req.getRemoteAddr()));
resp.sendError(HttpServletResponse.SC_FORBIDDEN, message);
return;
}
@@ -316,25 +324,26 @@ public class NodeServlet extends HttpServlet {
if (iport != 443) {
port = ":" + iport;
}
- String redirto = "https://" + newnode + port + "/publish/" + feedid + "/" + fileid;
- eelfLogger.info("NODE0108 Redirecting publish attempt for feed " + PathUtil.cleanString(feedid) + " user " + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip) + " to " + PathUtil.cleanString(redirto)); //Fortify scan fixes - log forging
+ String redirto = HTTPS + newnode + port + PUBLISH + feedid + "/" + fileid;
+ eelfLogger
+ .info("NODE0108 Redirecting publish attempt for feed " + PathUtil.cleanString(feedid) + USER
+ + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip) + " to " + PathUtil
+ .cleanString(redirto)); //Fortify scan fixes - log forging
resp.sendRedirect(PathUtil.cleanString(redirto)); //Fortify scan fixes-open redirect - 2 issues
eelfLogger.info(EelfMsgs.EXIT);
return;
}
resp.setHeader("X-DMAAP-DR-PUBLISH-ID", pubid);
}
- if (req.getPathInfo().startsWith("/internal/publish/")) {
+ if (req.getPathInfo().startsWith(INTERNAL_PUBLISH)) {
feedid = req.getHeader("X-DMAAP-DR-FEED-ID");
}
String fbase = PathUtil.cleanString(config.getSpoolDir() + "/" + pubid); //Fortify scan fixes-Path manipulation
File data = new File(fbase);
File meta = new File(fbase + ".M");
- OutputStream dos = null;
Writer mw = null;
- InputStream is = null;
try {
- StringBuffer mx = new StringBuffer();
+ StringBuilder mx = new StringBuilder();
mx.append(req.getMethod()).append('\t').append(fileid).append('\n');
Enumeration hnames = req.getHeaderNames();
String ctype = null;
@@ -364,13 +373,17 @@ public class NodeServlet extends HttpServlet {
}
if ("x-dmaap-dr-meta".equals(hnlc)) {
if (hv.length() > 4096) {
- eelfLogger.error("NODE0109 Rejecting publish attempt with metadata too long for feed " + PathUtil.cleanString(feedid) + " user " + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip)); //Fortify scan fixes - log forging
+ eelfLogger.error("NODE0109 Rejecting publish attempt with metadata too long for feed "
+ + PathUtil.cleanString(feedid) + USER + PathUtil.cleanString(user) + " ip "
+ + PathUtil.cleanString(ip)); //Fortify scan fixes - log forging
resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Metadata too long");
eelfLogger.info(EelfMsgs.EXIT);
return;
}
- if (!MetaDataPattern.matcher(hv.replaceAll("\\\\.", "X")).matches()) {
- eelfLogger.error("NODE0109 Rejecting publish attempt with malformed metadata for feed " + PathUtil.cleanString(feedid) + " user " + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip)); //Fortify scan fixes - log forging
+ if (!metaDataPattern.matcher(hv.replaceAll("\\\\.", "X")).matches()) {
+ eelfLogger.error("NODE0109 Rejecting publish attempt with malformed metadata for feed "
+ + PathUtil.cleanString(feedid) + USER + PathUtil.cleanString(user) + " ip "
+ + PathUtil.cleanString(ip)); //Fortify scan fixes - log forging
resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Malformed metadata");
eelfLogger.info(EelfMsgs.EXIT);
return;
@@ -388,28 +401,12 @@ public class NodeServlet extends HttpServlet {
}
mx.append("X-DMAAP-DR-RECEIVED\t").append(rcvd).append('\n');
String metadata = mx.toString();
- byte[] buf = new byte[1024 * 1024];
- int i;
- try {
- is = req.getInputStream();
- dos = new FileOutputStream(data);
- while ((i = is.read(buf)) > 0) {
- dos.write(buf, 0, i);
- }
- is.close();
- is = null;
- dos.close();
- dos = null;
- } catch (IOException ioe) {
- long exlen = -1;
- try {
- exlen = Long.parseLong(req.getHeader("Content-Length"));
- } catch (Exception e) {
- eelfLogger.error("NODE0529 Exception common: " + e);
- }
- StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user, ioe.getMessage());
- eelfLogger.info(EelfMsgs.EXIT);
- throw ioe;
+ long exlen = getExlen(req);
+ String message = writeInputStreamToFile(req, data);
+ if (message != null) {
+ StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user,
+ message);
+ throw new IOException(message);
}
Path dpath = Paths.get(fbase);
for (Target t : targets) {
@@ -418,7 +415,8 @@ public class NodeServlet extends HttpServlet {
// TODO: unknown destination
continue;
}
- String dbase = PathUtil.cleanString(di.getSpool() + "/" + pubid); //Fortify scan fixes-Path Manipulation
+ String dbase = PathUtil
+ .cleanString(di.getSpool() + "/" + pubid); //Fortify scan fixes-Path Manipulation
Files.createLink(Paths.get(dbase), dpath);
mw = new FileWriter(meta);
mw.write(metadata);
@@ -427,45 +425,28 @@ public class NodeServlet extends HttpServlet {
}
mw.close();
meta.renameTo(new File(dbase + ".M"));
-
}
resp.setStatus(HttpServletResponse.SC_NO_CONTENT);
try {
resp.getOutputStream().close();
} catch (IOException ioe) {
- long exlen = -1;
- try {
- exlen = Long.parseLong(req.getHeader("Content-Length"));
- } catch (Exception e) {
- eelfLogger.error("NODE00000 Exception common", e);
- }
- StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user, ioe.getMessage());
+ StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user,
+ ioe.getMessage());
//Fortify scan fixes - log forging
- eelfLogger.error("NODE0110 IO Exception while closing IO stream " + PathUtil.cleanString(feedid) + " user " + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip) + " " + ioe.toString(), ioe);
-
+ eelfLogger.error("NODE0110 IO Exception while closing IO stream " + PathUtil.cleanString(feedid)
+ + USER + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip) + " " + ioe
+ .toString(), ioe);
throw ioe;
}
- StatusLog.logPub(pubid, feedid, logurl, req.getMethod(), ctype, data.length(), ip, user, HttpServletResponse.SC_NO_CONTENT);
+ StatusLog.logPub(pubid, feedid, logurl, req.getMethod(), ctype, data.length(), ip, user,
+ HttpServletResponse.SC_NO_CONTENT);
} catch (IOException ioe) {
- eelfLogger.error("NODE0110 IO Exception receiving publish attempt for feed " + feedid + " user " + user + " ip " + ip + " " + ioe.toString(), ioe);
+ eelfLogger.error("NODE0110 IO Exception receiving publish attempt for feed " + feedid + USER + user
+ + " ip " + ip + " " + ioe.toString(), ioe);
eelfLogger.info(EelfMsgs.EXIT);
throw ioe;
} finally {
- if (is != null) {
- try {
- is.close();
- } catch (Exception e) {
- eelfLogger.error("NODE0530 Exception common: " + e);
- }
- }
- if (dos != null) {
- try {
- dos.close();
- } catch (Exception e) {
- eelfLogger.error("NODE0531 Exception common: " + e);
- }
- }
if (mw != null) {
try {
mw.close();
@@ -486,12 +467,39 @@ public class NodeServlet extends HttpServlet {
}
}
+ private String writeInputStreamToFile(HttpServletRequest req, File data) {
+ byte[] buf = new byte[1024 * 1024];
+ int i;
+ try (OutputStream dos = new FileOutputStream(data);
+ InputStream is = req.getInputStream()) {
+ while ((i = is.read(buf)) > 0) {
+ dos.write(buf, 0, i);
+ }
+ } catch (IOException ioe) {
+ eelfLogger.error("NODE0530 Exception common: " + ioe, ioe);
+ eelfLogger.info(EelfMsgs.EXIT);
+ return ioe.getMessage();
+ }
+ return null;
+ }
+
+ private long getExlen(HttpServletRequest req) {
+ long exlen = -1;
+ try {
+ exlen = Long.parseLong(req.getHeader("Content-Length"));
+ } catch (Exception e) {
+ eelfLogger.error("NODE0529 Exception common: " + e);
+ }
+ return exlen;
+ }
+
private void deleteFile(HttpServletRequest req, HttpServletResponse resp, String fileid, String pubid) {
+ final String FROM_DR_MESSAGE = ".M) from DR Node: ";
try {
fileid = fileid.substring(8);
int i = fileid.indexOf('/');
if (i == -1 || i == fileid.length() - 1) {
- eelfLogger.error("NODE0112 Rejecting bad URI for DELETE of " + req.getPathInfo() + " from " + req
+ eelfLogger.error("NODE0112 Rejecting bad URI for DELETE of " + req.getPathInfo() + FROM + req
.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_NOT_FOUND,
"Invalid request URI. Expecting <subId>/<pubId>.");
@@ -501,7 +509,7 @@ public class NodeServlet extends HttpServlet {
String subscriptionId = fileid.substring(0, i);
int subId = Integer.parseInt(subscriptionId);
pubid = fileid.substring(i + 1);
- String errorMessage = "Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: "
+ String errorMessage = "Unable to delete files (" + pubid + ", " + pubid + FROM_DR_MESSAGE
+ config.getMyName() + ".";
int subIdDir = subId - (subId % 100);
if (!isAuthorizedToDelete(resp, subscriptionId, errorMessage)) {
@@ -509,7 +517,7 @@ public class NodeServlet extends HttpServlet {
}
boolean result = delivery.markTaskSuccess(config.getSpoolBase() + "/s/" + subIdDir + "/" + subId, pubid);
if (result) {
- eelfLogger.info("NODE0115 Successfully deleted files (" + pubid + ", " + pubid + ".M) from DR Node: "
+ eelfLogger.info("NODE0115 Successfully deleted files (" + pubid + ", " + pubid + FROM_DR_MESSAGE
+ config.getMyName());
resp.setStatus(HttpServletResponse.SC_OK);
eelfLogger.info(EelfMsgs.EXIT);
@@ -519,7 +527,7 @@ public class NodeServlet extends HttpServlet {
eelfLogger.info(EelfMsgs.EXIT);
}
} catch (IOException ioe) {
- eelfLogger.error("NODE0117 Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: "
+ eelfLogger.error("NODE0117 Unable to delete files (" + pubid + ", " + pubid + FROM_DR_MESSAGE
+ config.getMyName(), ioe);
eelfLogger.info(EelfMsgs.EXIT);
}
@@ -533,7 +541,7 @@ public class NodeServlet extends HttpServlet {
}
if (!req.isSecure()) {
eelfLogger.error(
- "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req
+ "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + FROM + req
.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests");
eelfLogger.info(EelfMsgs.EXIT);
@@ -541,17 +549,18 @@ public class NodeServlet extends HttpServlet {
}
String fileid = req.getPathInfo();
if (fileid == null) {
- eelfLogger.error("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
+ eelfLogger.error("NODE0201 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + FROM + req
.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_NOT_FOUND,
- "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");
+ INVALID_REQUEST_URI);
eelfLogger.info(EelfMsgs.EXIT);
return null;
}
return fileid;
}
- private boolean isAuthorizedToDelete(HttpServletResponse resp, String subscriptionId, String errorMessage) throws IOException {
+ private boolean isAuthorizedToDelete(HttpServletResponse resp, String subscriptionId, String errorMessage)
+ throws IOException {
try {
boolean deletePermitted = config.isDeletePermitted(subscriptionId);
if (!deletePermitted) {
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java
index da84ae54..e79e2ee3 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java
@@ -24,15 +24,22 @@
package org.onap.dmaap.datarouter.node;
+import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID;
+import static com.att.eelf.configuration.Configuration.MDC_SERVER_FQDN;
+import static com.att.eelf.configuration.Configuration.MDC_SERVER_IP_ADDRESS;
+import static com.att.eelf.configuration.Configuration.MDC_SERVICE_NAME;
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.security.KeyStore;
+import java.security.KeyStoreException;
import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.text.SimpleDateFormat;
import java.util.Date;
@@ -47,8 +54,6 @@ import org.apache.commons.lang3.StringUtils;
import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
import org.slf4j.MDC;
-import static com.att.eelf.configuration.Configuration.*;
-
/**
* Utility functions for the data router node
*/
@@ -63,7 +68,7 @@ public class NodeUtils {
/**
* Base64 encode a byte array
*
- * @param raw The bytes to be encoded
+ * @param raw The bytes to be encoded
* @return The encoded string
*/
public static String base64Encode(byte[] raw) {
@@ -117,11 +122,7 @@ public class NodeUtils {
KeyStore ks;
try {
ks = KeyStore.getInstance(kstype);
- try (FileInputStream fileInputStream = new FileInputStream(ksfile)) {
- ks.load(fileInputStream, kspass.toCharArray());
- } catch (IOException ioException) {
- eelfLogger.error("IOException occurred while opening FileInputStream: " + ioException.getMessage(),
- ioException);
+ if (loadKeyStore(ksfile, kspass, ks)) {
return (null);
}
} catch (Exception e) {
@@ -142,22 +143,9 @@ public class NodeUtils {
try {
Enumeration<String> aliases = ks.aliases();
while (aliases.hasMoreElements()) {
- String s = aliases.nextElement();
- if (ks.entryInstanceOf(s, KeyStore.PrivateKeyEntry.class)) {
- X509Certificate c = (X509Certificate) ks.getCertificate(s);
- if (c != null) {
- String subject = c.getSubjectX500Principal().getName();
- String[] parts = subject.split(",");
- if (parts.length < 1) {
- return (null);
- }
- subject = parts[5].trim();
- if (!subject.startsWith("CN=")) {
- return (null);
-
- }
- return (subject.substring(3));
- }
+ String name = getNameFromSubject(ks, aliases);
+ if (name != null) {
+ return name;
}
}
} catch (Exception e) {
@@ -290,19 +278,51 @@ public class NodeUtils {
/**
* Method to check to see if file is of type gzip
*
- * @param file The name of the file to be checked
- * @return True if the file is of type gzip
+ * @param file The name of the file to be checked
+ * @return True if the file is of type gzip
*/
- public static boolean isFiletypeGzip(File file){
- try(FileInputStream fileInputStream = new FileInputStream(file);
- GZIPInputStream gzip = new GZIPInputStream(fileInputStream)) {
+ public static boolean isFiletypeGzip(File file) {
+ try (FileInputStream fileInputStream = new FileInputStream(file);
+ GZIPInputStream gzip = new GZIPInputStream(fileInputStream)) {
return true;
- }catch (IOException e){
+ } catch (IOException e) {
eelfLogger.error("NODE0403 " + file.toString() + " Not in gzip(gz) format: " + e.toString() + e);
return false;
}
}
+ private static boolean loadKeyStore(String ksfile, String kspass, KeyStore ks)
+ throws NoSuchAlgorithmException, CertificateException {
+ try (FileInputStream fileInputStream = new FileInputStream(ksfile)) {
+ ks.load(fileInputStream, kspass.toCharArray());
+ } catch (IOException ioException) {
+ eelfLogger.error("IOException occurred while opening FileInputStream: " + ioException.getMessage(),
+ ioException);
+ return true;
+ }
+ return false;
+ }
+
+
+ private static String getNameFromSubject(KeyStore ks, Enumeration<String> aliases) throws KeyStoreException {
+ String s = aliases.nextElement();
+ if (ks.entryInstanceOf(s, KeyStore.PrivateKeyEntry.class)) {
+ X509Certificate c = (X509Certificate) ks.getCertificate(s);
+ if (c != null) {
+ String subject = c.getSubjectX500Principal().getName();
+ String[] parts = subject.split(",");
+ if (parts.length < 1) {
+ return null;
+ }
+ subject = parts[5].trim();
+ if (!subject.startsWith("CN=")) {
+ return null;
+ }
+ return subject.substring(3);
+ }
+ }
+ return null;
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java
index fec2ca39..d8beab5a 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java
@@ -24,9 +24,11 @@
package org.onap.dmaap.datarouter.node;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
-import java.util.Hashtable;
-import java.util.Vector;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvHop;
/**
* Given a set of node names and next hops, identify and ignore any cycles and figure out the sequence of next hops to
@@ -35,16 +37,41 @@ import java.util.Vector;
public class PathFinder {
- private static class Hop {
+ private ArrayList<String> errors = new ArrayList<>();
+ private HashMap<String, String> routes = new HashMap<>();
- boolean mark;
- boolean bad;
- NodeConfig.ProvHop basis;
+ /**
+ * Find routes from a specified origin to all of the nodes given a set of specified next hops.
+ *
+ * @param origin where we start
+ * @param nodes where we can go
+ * @param hops detours along the way
+ */
+ public PathFinder(String origin, String[] nodes, NodeConfig.ProvHop[] hops) {
+ HashSet<String> known = new HashSet<>();
+ HashMap<String, HashMap<String, Hop>> ht = new HashMap<>();
+ for (String n : nodes) {
+ known.add(n);
+ ht.put(n, new HashMap<>());
+ }
+ for (NodeConfig.ProvHop ph : hops) {
+ Hop h = getHop(known, ht, ph);
+ if (h == null) {
+ continue;
+ }
+ if (ph.getVia().equals(ph.getTo())) {
+ errors.add(ph + " gives destination as via");
+ h.bad = true;
+ }
+ }
+ for (String n : known) {
+ if (n.equals(origin)) {
+ routes.put(n, "");
+ }
+ routes.put(n, plot(origin, n, ht.get(n)) + "/");
+ }
}
- private Vector<String> errors = new Vector<String>();
- private Hashtable<String, String> routes = new Hashtable<String, String>();
-
/**
* Get list of errors encountered while finding paths
*
@@ -68,13 +95,12 @@ public class PathFinder {
return (ret);
}
- private String plot(String from, String to, Hashtable<String, Hop> info) {
+ private String plot(String from, String to, HashMap<String, Hop> info) {
Hop nh = info.get(from);
if (nh == null || nh.bad) {
return (to);
}
if (nh.mark) {
- // loop detected;
while (!nh.bad) {
nh.bad = true;
errors.add(nh.basis + " is part of a cycle");
@@ -91,55 +117,38 @@ public class PathFinder {
return (nh.basis.getVia() + "/" + x);
}
- /**
- * Find routes from a specified origin to all of the nodes given a set of specified next hops.
- *
- * @param origin where we start
- * @param nodes where we can go
- * @param hops detours along the way
- */
- public PathFinder(String origin, String[] nodes, NodeConfig.ProvHop[] hops) {
- HashSet<String> known = new HashSet<String>();
- Hashtable<String, Hashtable<String, Hop>> ht = new Hashtable<String, Hashtable<String, Hop>>();
- for (String n : nodes) {
- known.add(n);
- ht.put(n, new Hashtable<String, Hop>());
+ @Nullable
+ private Hop getHop(HashSet<String> known, HashMap<String, HashMap<String, Hop>> ht, ProvHop ph) {
+ if (!known.contains(ph.getFrom())) {
+ errors.add(ph + " references unknown from node");
+ return null;
}
- for (NodeConfig.ProvHop ph : hops) {
- if (!known.contains(ph.getFrom())) {
- errors.add(ph + " references unknown from node");
- continue;
- }
- if (!known.contains(ph.getTo())) {
- errors.add(ph + " references unknown destination node");
- continue;
- }
- Hashtable<String, Hop> ht2 = ht.get(ph.getTo());
- Hop h = ht2.get(ph.getFrom());
- if (h != null) {
- h.bad = true;
- errors.add(ph + " gives duplicate next hop - previous via was " + h.basis.getVia());
- continue;
- }
- h = new Hop();
- h.basis = ph;
- ht2.put(ph.getFrom(), h);
- if (!known.contains(ph.getVia())) {
- errors.add(ph + " references unknown via node");
- h.bad = true;
- continue;
- }
- if (ph.getVia().equals(ph.getTo())) {
- errors.add(ph + " gives destination as via");
- h.bad = true;
- continue;
- }
+ if (!known.contains(ph.getTo())) {
+ errors.add(ph + " references unknown destination node");
+ return null;
}
- for (String n : known) {
- if (n.equals(origin)) {
- routes.put(n, "");
- }
- routes.put(n, plot(origin, n, ht.get(n)) + "/");
+ HashMap<String, Hop> ht2 = ht.get(ph.getTo());
+ Hop h = ht2.get(ph.getFrom());
+ if (h != null) {
+ h.bad = true;
+ errors.add(ph + " gives duplicate next hop - previous via was " + h.basis.getVia());
+ return null;
}
+ h = new Hop();
+ h.basis = ph;
+ ht2.put(ph.getFrom(), h);
+ if (!known.contains(ph.getVia())) {
+ errors.add(ph + " references unknown via node");
+ h.bad = true;
+ return null;
+ }
+ return h;
+ }
+
+ private static class Hop {
+
+ boolean mark;
+ boolean bad;
+ NodeConfig.ProvHop basis;
}
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathUtil.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathUtil.java
index a4034410..16f8033b 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathUtil.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathUtil.java
@@ -1,65 +1,82 @@
-/**
- * -
+/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * <p>
+ *
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
+
package org.onap.dmaap.datarouter.node;
/**
* FORTIFY SCAN FIXES
* <p>This Utility is used for Fortify fixes. It Validates the path url formed from
- * the string passed in the request parameters.</p>
- *
+ * the string passed in the request parameters.</p>
*/
class PathUtil {
+ private PathUtil() {
+ throw new IllegalStateException("Utility Class");
+ }
+
/**
* This method takes String as the parameter and return the filtered path string.
+ *
* @param aString String to clean
* @return A cleaned String
*/
static String cleanString(String aString) {
- if (aString == null) return null;
- String cleanString = "";
+ if (aString == null) {
+ return null;
+ }
+ StringBuilder cleanString = new StringBuilder();
for (int i = 0; i < aString.length(); ++i) {
- cleanString += cleanChar(aString.charAt(i));
+ cleanString.append(cleanChar(aString.charAt(i)));
}
- return cleanString;
+ return cleanString.toString();
}
/**
* This method filters the valid special characters in path string.
+ *
* @param aChar The char to be cleaned
* @return The cleaned char
*/
private static char cleanChar(char aChar) {
// 0 - 9
for (int i = 48; i < 58; ++i) {
- if (aChar == i) return (char) i;
+ if (aChar == i) {
+ return (char) i;
+ }
}
// 'A' - 'Z'
for (int i = 65; i < 91; ++i) {
- if (aChar == i) return (char) i;
+ if (aChar == i) {
+ return (char) i;
+ }
}
// 'a' - 'z'
for (int i = 97; i < 123; ++i) {
- if (aChar == i) return (char) i;
+ if (aChar == i) {
+ return (char) i;
+ }
}
+ return getValidCharacter(aChar);
+ }
+
+ private static char getValidCharacter(char aChar) {
// other valid characters
switch (aChar) {
case '/':
@@ -82,7 +99,8 @@ class PathUtil {
return '_';
case ' ':
return ' ';
+ default:
+ return '%';
}
- return '%';
}
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java
index 1af7dda4..bb9ddc3b 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java
@@ -24,35 +24,91 @@
package org.onap.dmaap.datarouter.node;
-import java.io.*;
-import java.util.*;
-
-import org.json.*;
-import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
-
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.ArrayList;
+import org.jetbrains.annotations.Nullable;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvFeed;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvFeedSubnet;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvFeedUser;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvForceEgress;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvForceIngress;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvHop;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvNode;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvParam;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvSubscription;
+import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
/**
* Parser for provisioning data from the provisioning server.
* <p>
- * The ProvData class uses a Reader for the text configuration from the
- * provisioning server to construct arrays of raw configuration entries.
+ * The ProvData class uses a Reader for the text configuration from the provisioning server to construct arrays of raw
+ * configuration entries.
*/
public class ProvData {
+
+ private static final String FEED_ID = "feedid";
private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(ProvData.class);
- private NodeConfig.ProvNode[] pn;
- private NodeConfig.ProvParam[] pp;
- private NodeConfig.ProvFeed[] pf;
- private NodeConfig.ProvFeedUser[] pfu;
- private NodeConfig.ProvFeedSubnet[] pfsn;
- private NodeConfig.ProvSubscription[] ps;
- private NodeConfig.ProvForceIngress[] pfi;
- private NodeConfig.ProvForceEgress[] pfe;
- private NodeConfig.ProvHop[] ph;
-
- private static String[] gvasa(JSONArray a, int index) {
- return (gvasa(a.get(index)));
+ private NodeConfig.ProvNode[] provNodes;
+ private NodeConfig.ProvParam[] provParams;
+ private NodeConfig.ProvFeed[] provFeeds;
+ private NodeConfig.ProvFeedUser[] provFeedUsers;
+ private NodeConfig.ProvFeedSubnet[] provFeedSubnets;
+ private NodeConfig.ProvSubscription[] provSubscriptions;
+ private NodeConfig.ProvForceIngress[] provForceIngresses;
+ private NodeConfig.ProvForceEgress[] provForceEgresses;
+ private NodeConfig.ProvHop[] provHops;
+
+ /**
+ * Construct raw provisioing data entries from the text (JSON) provisioning document received from the provisioning
+ * server
+ *
+ * @param r The reader for the JSON text.
+ */
+ public ProvData(Reader r) throws IOException {
+ ArrayList<ProvNode> provNodes1 = new ArrayList<>();
+ ArrayList<NodeConfig.ProvParam> provParams1 = new ArrayList<>();
+ ArrayList<NodeConfig.ProvFeed> provFeeds1 = new ArrayList<>();
+ ArrayList<NodeConfig.ProvFeedUser> provFeedUsers1 = new ArrayList<>();
+ ArrayList<NodeConfig.ProvFeedSubnet> provFeedSubnets1 = new ArrayList<>();
+ ArrayList<NodeConfig.ProvSubscription> provSubscriptions1 = new ArrayList<>();
+ ArrayList<NodeConfig.ProvForceIngress> provForceIngresses1 = new ArrayList<>();
+ ArrayList<NodeConfig.ProvForceEgress> provForceEgresses1 = new ArrayList<>();
+ ArrayList<NodeConfig.ProvHop> provHops1 = new ArrayList<>();
+ try {
+ JSONTokener jtx = new JSONTokener(r);
+ JSONObject jcfg = new JSONObject(jtx);
+ char c = jtx.nextClean();
+ if (c != '\0') {
+ throw new JSONException("Spurious characters following configuration");
+ }
+ r.close();
+ addJSONFeeds(provFeeds1, provFeedUsers1, provFeedSubnets1, jcfg);
+ addJSONSubs(provSubscriptions1, jcfg);
+ addJSONParams(provNodes1, provParams1, jcfg);
+ addJSONRoutingInformation(provForceIngresses1, provForceEgresses1, provHops1, jcfg);
+ } catch (JSONException jse) {
+ NodeUtils.setIpAndFqdnForEelf("ProvData");
+ eelfLogger.error(EelfMsgs.MESSAGE_PARSING_ERROR, jse.toString());
+ eelfLogger
+ .error("NODE0201 Error parsing configuration data from provisioning server " + jse.toString(), jse);
+ throw new IOException(jse.toString(), jse);
+ }
+ provNodes = provNodes1.toArray(new NodeConfig.ProvNode[provNodes1.size()]);
+ provParams = provParams1.toArray(new NodeConfig.ProvParam[provParams1.size()]);
+ provFeeds = provFeeds1.toArray(new NodeConfig.ProvFeed[provFeeds1.size()]);
+ provFeedUsers = provFeedUsers1.toArray(new NodeConfig.ProvFeedUser[provFeedUsers1.size()]);
+ provFeedSubnets = provFeedSubnets1.toArray(new NodeConfig.ProvFeedSubnet[provFeedSubnets1.size()]);
+ provSubscriptions = provSubscriptions1.toArray(new NodeConfig.ProvSubscription[provSubscriptions1.size()]);
+ provForceIngresses = provForceIngresses1.toArray(new NodeConfig.ProvForceIngress[provForceIngresses1.size()]);
+ provForceEgresses = provForceEgresses1.toArray(new NodeConfig.ProvForceEgress[provForceEgresses1.size()]);
+ provHops = provHops1.toArray(new NodeConfig.ProvHop[provHops1.size()]);
}
private static String[] gvasa(JSONObject o, String key) {
@@ -62,7 +118,7 @@ public class ProvData {
private static String[] gvasa(Object o) {
if (o instanceof JSONArray) {
JSONArray a = (JSONArray) o;
- Vector<String> v = new Vector<String>();
+ ArrayList<String> v = new ArrayList<>();
for (int i = 0; i < a.length(); i++) {
String s = gvas(a, i);
if (s != null) {
@@ -96,236 +152,256 @@ public class ProvData {
}
/**
- * Construct raw provisioing data entries from the text (JSON)
- * provisioning document received from the provisioning server
- *
- * @param r The reader for the JSON text.
- */
- public ProvData(Reader r) throws IOException {
- Vector<NodeConfig.ProvNode> pnv = new Vector<NodeConfig.ProvNode>();
- Vector<NodeConfig.ProvParam> ppv = new Vector<NodeConfig.ProvParam>();
- Vector<NodeConfig.ProvFeed> pfv = new Vector<NodeConfig.ProvFeed>();
- Vector<NodeConfig.ProvFeedUser> pfuv = new Vector<NodeConfig.ProvFeedUser>();
- Vector<NodeConfig.ProvFeedSubnet> pfsnv = new Vector<NodeConfig.ProvFeedSubnet>();
- Vector<NodeConfig.ProvSubscription> psv = new Vector<NodeConfig.ProvSubscription>();
- Vector<NodeConfig.ProvForceIngress> pfiv = new Vector<NodeConfig.ProvForceIngress>();
- Vector<NodeConfig.ProvForceEgress> pfev = new Vector<NodeConfig.ProvForceEgress>();
- Vector<NodeConfig.ProvHop> phv = new Vector<NodeConfig.ProvHop>();
- try {
- JSONTokener jtx = new JSONTokener(r);
- JSONObject jcfg = new JSONObject(jtx);
- char c = jtx.nextClean();
- if (c != '\0') {
- throw new JSONException("Spurious characters following configuration");
- }
- r.close();
- JSONArray jfeeds = jcfg.optJSONArray("feeds");
- if (jfeeds != null) {
- for (int fx = 0; fx < jfeeds.length(); fx++) {
- JSONObject jfeed = jfeeds.getJSONObject(fx);
- String stat = null;
- if (jfeed.optBoolean("suspend", false)) {
- stat = "Feed is suspended";
- }
- if (jfeed.optBoolean("deleted", false)) {
- stat = "Feed is deleted";
- }
- String fid = gvas(jfeed, "feedid");
- String fname = gvas(jfeed, "name");
- String fver = gvas(jfeed, "version");
- String createdDate = gvas(jfeed, "created_date");
- /*
- * START - AAF changes
- * TDP EPIC US# 307413
- * Passing aafInstance to ProvFeed from feeds json passed by prov to identify legacy/AAF feeds
- */
- String aafInstance = gvas(jfeed, "aaf_instance");
- pfv.add(new NodeConfig.ProvFeed(fid, fname + "//" + fver, stat,createdDate, aafInstance));
- /*
- * END - AAF changes
- */
- JSONObject jauth = jfeed.optJSONObject("authorization");
- if (jauth == null) {
- continue;
- }
- JSONArray jeids = jauth.optJSONArray("endpoint_ids");
- if (jeids != null) {
- for (int ux = 0; ux < jeids.length(); ux++) {
- JSONObject ju = jeids.getJSONObject(ux);
- String login = gvas(ju, "id");
- String password = gvas(ju, "password");
- pfuv.add(new NodeConfig.ProvFeedUser(fid, login, NodeUtils.getAuthHdr(login, password)));
- }
- }
- JSONArray jeips = jauth.optJSONArray("endpoint_addrs");
- if (jeips != null) {
- for (int ix = 0; ix < jeips.length(); ix++) {
- String sn = gvas(jeips, ix);
- pfsnv.add(new NodeConfig.ProvFeedSubnet(fid, sn));
- }
- }
- }
- }
- JSONArray jsubs = jcfg.optJSONArray("subscriptions");
- if (jsubs != null) {
- for (int sx = 0; sx < jsubs.length(); sx++) {
- JSONObject jsub = jsubs.getJSONObject(sx);
- if (jsub.optBoolean("suspend", false)) {
- continue;
- }
- String sid = gvas(jsub, "subid");
- String fid = gvas(jsub, "feedid");
- JSONObject jdel = jsub.getJSONObject("delivery");
- String delurl = gvas(jdel, "url");
- String id = gvas(jdel, "user");
- String password = gvas(jdel, "password");
- boolean monly = jsub.getBoolean("metadataOnly");
- boolean use100 = jdel.getBoolean("use100");
- boolean privilegedSubscriber = jsub.getBoolean("privilegedSubscriber");
- boolean decompress = jsub.getBoolean("decompress");
- boolean followRedirect = jsub.getBoolean("follow_redirect");
- psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100, privilegedSubscriber, followRedirect, decompress));
- }
- }
- JSONObject jparams = jcfg.optJSONObject("parameters");
- if (jparams != null) {
- for (String pname : JSONObject.getNames(jparams)) {
- String pvalue = gvas(jparams, pname);
- if (pvalue != null) {
- ppv.add(new NodeConfig.ProvParam(pname, pvalue));
- }
- }
- String sfx = gvas(jparams, "PROV_DOMAIN");
- JSONArray jnodes = jparams.optJSONArray("NODES");
- if (jnodes != null) {
- for (int nx = 0; nx < jnodes.length(); nx++) {
- String nn = gvas(jnodes, nx);
- if (nn.indexOf('.') == -1) {
- nn = nn + "." + sfx;
- }
- pnv.add(new NodeConfig.ProvNode(nn));
- }
- }
- }
- JSONArray jingresses = jcfg.optJSONArray("ingress");
- if (jingresses != null) {
- for (int fx = 0; fx < jingresses.length(); fx++) {
- JSONObject jingress = jingresses.getJSONObject(fx);
- String fid = gvas(jingress, "feedid");
- String subnet = gvas(jingress, "subnet");
- String user = gvas(jingress, "user");
- String[] nodes = gvasa(jingress, "node");
- if (fid == null || "".equals(fid)) {
- continue;
- }
- if ("".equals(subnet)) {
- subnet = null;
- }
- if ("".equals(user)) {
- user = null;
- }
- pfiv.add(new NodeConfig.ProvForceIngress(fid, subnet, user, nodes));
- }
- }
- JSONObject jegresses = jcfg.optJSONObject("egress");
- if (jegresses != null && JSONObject.getNames(jegresses) != null) {
- for (String esid : JSONObject.getNames(jegresses)) {
- String enode = gvas(jegresses, esid);
- if (esid != null && enode != null && !"".equals(esid) && !"".equals(enode)) {
- pfev.add(new NodeConfig.ProvForceEgress(esid, enode));
- }
- }
- }
- JSONArray jhops = jcfg.optJSONArray("routing");
- if (jhops != null) {
- for (int fx = 0; fx < jhops.length(); fx++) {
- JSONObject jhop = jhops.getJSONObject(fx);
- String from = gvas(jhop, "from");
- String to = gvas(jhop, "to");
- String via = gvas(jhop, "via");
- if (from == null || to == null || via == null || "".equals(from) || "".equals(to) || "".equals(via)) {
- continue;
- }
- phv.add(new NodeConfig.ProvHop(from, to, via));
- }
- }
- } catch (JSONException jse) {
- NodeUtils.setIpAndFqdnForEelf("ProvData");
- eelfLogger.error(EelfMsgs.MESSAGE_PARSING_ERROR, jse.toString());
- eelfLogger.error("NODE0201 Error parsing configuration data from provisioning server " + jse.toString(), jse);
- throw new IOException(jse.toString(), jse);
- }
- pn = pnv.toArray(new NodeConfig.ProvNode[pnv.size()]);
- pp = ppv.toArray(new NodeConfig.ProvParam[ppv.size()]);
- pf = pfv.toArray(new NodeConfig.ProvFeed[pfv.size()]);
- pfu = pfuv.toArray(new NodeConfig.ProvFeedUser[pfuv.size()]);
- pfsn = pfsnv.toArray(new NodeConfig.ProvFeedSubnet[pfsnv.size()]);
- ps = psv.toArray(new NodeConfig.ProvSubscription[psv.size()]);
- pfi = pfiv.toArray(new NodeConfig.ProvForceIngress[pfiv.size()]);
- pfe = pfev.toArray(new NodeConfig.ProvForceEgress[pfev.size()]);
- ph = phv.toArray(new NodeConfig.ProvHop[phv.size()]);
- }
-
- /**
* Get the raw node configuration entries
*/
public NodeConfig.ProvNode[] getNodes() {
- return (pn);
+ return (provNodes);
}
/**
* Get the raw parameter configuration entries
*/
public NodeConfig.ProvParam[] getParams() {
- return (pp);
+ return (provParams);
}
/**
* Ge the raw feed configuration entries
*/
public NodeConfig.ProvFeed[] getFeeds() {
- return (pf);
+ return (provFeeds);
}
/**
* Get the raw feed user configuration entries
*/
public NodeConfig.ProvFeedUser[] getFeedUsers() {
- return (pfu);
+ return (provFeedUsers);
}
/**
* Get the raw feed subnet configuration entries
*/
public NodeConfig.ProvFeedSubnet[] getFeedSubnets() {
- return (pfsn);
+ return (provFeedSubnets);
}
/**
* Get the raw subscription entries
*/
public NodeConfig.ProvSubscription[] getSubscriptions() {
- return (ps);
+ return (provSubscriptions);
}
/**
* Get the raw forced ingress entries
*/
public NodeConfig.ProvForceIngress[] getForceIngress() {
- return (pfi);
+ return (provForceIngresses);
}
/**
* Get the raw forced egress entries
*/
public NodeConfig.ProvForceEgress[] getForceEgress() {
- return (pfe);
+ return (provForceEgresses);
}
/**
* Get the raw next hop entries
*/
public NodeConfig.ProvHop[] getHops() {
- return (ph);
+ return (provHops);
+ }
+
+ @Nullable
+ private String getFeedStatus(JSONObject jfeed) {
+ String stat = null;
+ if (jfeed.optBoolean("suspend", false)) {
+ stat = "Feed is suspended";
+ }
+ if (jfeed.optBoolean("deleted", false)) {
+ stat = "Feed is deleted";
+ }
+ return stat;
+ }
+
+ private void addJSONFeeds(ArrayList<ProvFeed> provFeeds1, ArrayList<ProvFeedUser> provFeedUsers1,
+ ArrayList<ProvFeedSubnet> provFeedSubnets1,
+ JSONObject jsonConfig) {
+ JSONArray jfeeds = jsonConfig.optJSONArray("feeds");
+ if (jfeeds != null) {
+ for (int fx = 0; fx < jfeeds.length(); fx++) {
+ addJSONFeed(provFeeds1, provFeedUsers1, provFeedSubnets1, jfeeds, fx);
+ }
+ }
+ }
+
+ private void addJSONFeed(ArrayList<ProvFeed> provFeeds1, ArrayList<ProvFeedUser> provFeedUsers1,
+ ArrayList<ProvFeedSubnet> provFeedSubnets1, JSONArray jfeeds, int feedIndex) {
+ JSONObject jfeed = jfeeds.getJSONObject(feedIndex);
+ String stat = getFeedStatus(jfeed);
+ String fid = gvas(jfeed, FEED_ID);
+ String fname = gvas(jfeed, "name");
+ String fver = gvas(jfeed, "version");
+ String createdDate = gvas(jfeed, "created_date");
+ /*
+ * START - AAF changes
+ * TDP EPIC US# 307413
+ * Passing aafInstance to ProvFeed from feeds json passed by prov to identify legacy/AAF feeds
+ */
+ String aafInstance = gvas(jfeed, "aaf_instance");
+ provFeeds1.add(new ProvFeed(fid, fname + "//" + fver, stat, createdDate, aafInstance));
+ /*
+ * END - AAF changes
+ */
+ addJSONFeedAuthArrays(provFeedUsers1, provFeedSubnets1, jfeed, fid);
+ }
+
+ private void addJSONFeedAuthArrays(ArrayList<ProvFeedUser> provFeedUsers1,
+ ArrayList<ProvFeedSubnet> provFeedSubnets1, JSONObject jfeed, String fid) {
+ JSONObject jauth = jfeed.optJSONObject("authorization");
+ if (jauth == null) {
+ return;
+ }
+ JSONArray jeids = jauth.optJSONArray("endpoint_ids");
+ if (jeids != null) {
+ for (int ux = 0; ux < jeids.length(); ux++) {
+ JSONObject ju = jeids.getJSONObject(ux);
+ String login = gvas(ju, "id");
+ String password = gvas(ju, "password");
+ provFeedUsers1.add(new ProvFeedUser(fid, login, NodeUtils.getAuthHdr(login, password)));
+ }
+ }
+ JSONArray jeips = jauth.optJSONArray("endpoint_addrs");
+ if (jeips != null) {
+ for (int ix = 0; ix < jeips.length(); ix++) {
+ String sn = gvas(jeips, ix);
+ provFeedSubnets1.add(new ProvFeedSubnet(fid, sn));
+ }
+ }
+ }
+
+ private void addJSONSubs(ArrayList<ProvSubscription> provSubscriptions1, JSONObject jsonConfig) {
+ JSONArray jsubs = jsonConfig.optJSONArray("subscriptions");
+ if (jsubs != null) {
+ for (int sx = 0; sx < jsubs.length(); sx++) {
+ addJSONSub(provSubscriptions1, jsubs, sx);
+ }
+ }
+ }
+
+ private void addJSONSub(ArrayList<ProvSubscription> provSubscriptions1, JSONArray jsubs, int sx) {
+ JSONObject jsub = jsubs.getJSONObject(sx);
+ if (jsub.optBoolean("suspend", false)) {
+ return;
+ }
+ String sid = gvas(jsub, "subid");
+ String fid = gvas(jsub, FEED_ID);
+ JSONObject jdel = jsub.getJSONObject("delivery");
+ String delurl = gvas(jdel, "url");
+ String id = gvas(jdel, "user");
+ String password = gvas(jdel, "password");
+ boolean monly = jsub.getBoolean("metadataOnly");
+ boolean use100 = jdel.getBoolean("use100");
+ boolean privilegedSubscriber = jsub.getBoolean("privilegedSubscriber");
+ boolean decompress = jsub.getBoolean("decompress");
+ boolean followRedirect = jsub.getBoolean("follow_redirect");
+ provSubscriptions1
+ .add(new ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100,
+ privilegedSubscriber, followRedirect, decompress));
+ }
+
+ private void addJSONParams(ArrayList<ProvNode> provNodes1, ArrayList<ProvParam> provParams1,
+ JSONObject jsonconfig) {
+ JSONObject jparams = jsonconfig.optJSONObject("parameters");
+ if (jparams != null) {
+ for (String pname : JSONObject.getNames(jparams)) {
+ addJSONParam(provParams1, jparams, pname);
+ }
+ addJSONNodesToParams(provNodes1, jparams);
+ }
+ }
+
+ private void addJSONParam(ArrayList<ProvParam> provParams1, JSONObject jparams, String pname) {
+ String pvalue = gvas(jparams, pname);
+ if (pvalue != null) {
+ provParams1.add(new ProvParam(pname, pvalue));
+ }
+ }
+
+ private void addJSONNodesToParams(ArrayList<ProvNode> provNodes1, JSONObject jparams) {
+ String sfx = gvas(jparams, "PROV_DOMAIN");
+ JSONArray jnodes = jparams.optJSONArray("NODES");
+ if (jnodes != null) {
+ for (int nx = 0; nx < jnodes.length(); nx++) {
+ String nn = gvas(jnodes, nx);
+ if (nn == null) {
+ continue;
+ }
+ if (nn.indexOf('.') == -1) {
+ nn = nn + "." + sfx;
+ }
+ provNodes1.add(new ProvNode(nn));
+ }
+ }
+ }
+
+ private void addJSONRoutingInformation(ArrayList<ProvForceIngress> provForceIngresses1,
+ ArrayList<ProvForceEgress> provForceEgresses1, ArrayList<ProvHop> provHops1, JSONObject jsonConfig) {
+ JSONArray jingresses = jsonConfig.optJSONArray("ingress");
+ if (jingresses != null) {
+ for (int fx = 0; fx < jingresses.length(); fx++) {
+ addJSONIngressRoute(provForceIngresses1, jingresses, fx);
+ }
+ }
+ JSONObject jegresses = jsonConfig.optJSONObject("egress");
+ if (jegresses != null && JSONObject.getNames(jegresses) != null) {
+ for (String esid : JSONObject.getNames(jegresses)) {
+ addJSONEgressRoute(provForceEgresses1, jegresses, esid);
+ }
+ }
+ JSONArray jhops = jsonConfig.optJSONArray("routing");
+ if (jhops != null) {
+ for (int fx = 0; fx < jhops.length(); fx++) {
+ addJSONRoutes(provHops1, jhops, fx);
+ }
+ }
+ }
+
+ private void addJSONIngressRoute(ArrayList<ProvForceIngress> provForceIngresses1, JSONArray jingresses, int fx) {
+ JSONObject jingress = jingresses.getJSONObject(fx);
+ String fid = gvas(jingress, FEED_ID);
+ String subnet = gvas(jingress, "subnet");
+ String user = gvas(jingress, "user");
+ String[] nodes = gvasa(jingress, "node");
+ if (fid == null || "".equals(fid)) {
+ return;
+ }
+ if ("".equals(subnet)) {
+ subnet = null;
+ }
+ if ("".equals(user)) {
+ user = null;
+ }
+ provForceIngresses1.add(new ProvForceIngress(fid, subnet, user, nodes));
+ }
+
+ private void addJSONEgressRoute(ArrayList<ProvForceEgress> provForceEgresses1, JSONObject jegresses, String esid) {
+ String enode = gvas(jegresses, esid);
+ if (esid != null && enode != null && !"".equals(esid) && !"".equals(enode)) {
+ provForceEgresses1.add(new ProvForceEgress(esid, enode));
+ }
+ }
+
+ private void addJSONRoutes(ArrayList<ProvHop> provHops1, JSONArray jhops, int fx) {
+ JSONObject jhop = jhops.getJSONObject(fx);
+ String from = gvas(jhop, "from");
+ String to = gvas(jhop, "to");
+ String via = gvas(jhop, "via");
+ if (from == null || to == null || via == null || "".equals(from) || "".equals(to) || "".equals(via)) {
+ return;
+ }
+ provHops1.add(new ProvHop(from, to, via));
}
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java
index 3d4908e8..5b7248af 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java
@@ -28,6 +28,7 @@ package org.onap.dmaap.datarouter.node;
* Generate publish IDs
*/
public class PublishId {
+
private long nextuid;
private String myname;
@@ -41,7 +42,8 @@ public class PublishId {
}
/**
- * Generate a Data Router Publish ID that uniquely identifies the particular invocation of the Publish API for log correlation purposes.
+ * Generate a Data Router Publish ID that uniquely identifies the particular invocation of the Publish API for log
+ * correlation purposes.
*/
public synchronized String next() {
long now = System.currentTimeMillis();
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java
index 42af8ca0..94b694d4 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java
@@ -24,13 +24,15 @@
package org.onap.dmaap.datarouter.node;
-import java.util.*;
+import java.util.Timer;
+import java.util.TimerTask;
/**
* Execute an operation no more frequently than a specified interval
*/
public abstract class RateLimitedOperation implements Runnable {
+
private boolean marked; // a timer task exists
private boolean executing; // the operation is currently in progress
private boolean remark; // a request was made while the operation was in progress
@@ -41,29 +43,15 @@ public abstract class RateLimitedOperation implements Runnable {
/**
* Create a rate limited operation
*
- * @param mininterval The minimum number of milliseconds after the last execution starts before a new execution can begin
- * @param timer The timer used to perform deferred executions
+ * @param mininterval The minimum number of milliseconds after the last execution starts before a new execution can
+ * begin
+ * @param timer The timer used to perform deferred executions
*/
public RateLimitedOperation(long mininterval, Timer timer) {
this.timer = timer;
this.mininterval = mininterval;
}
- private class deferred extends TimerTask {
- public void run() {
- execute();
- }
- }
-
- private synchronized void unmark() {
- marked = false;
- }
-
- private void execute() {
- unmark();
- request();
- }
-
/**
* Request that the operation be performed by this thread or at a later time by the timer
*/
@@ -90,7 +78,7 @@ public abstract class RateLimitedOperation implements Runnable {
if (last + mininterval > now) {
// too soon - schedule a timer
marked = true;
- timer.schedule(new deferred(), last + mininterval - now);
+ timer.schedule(new Deferred(), last + mininterval - now);
return (true);
}
last = now;
@@ -107,4 +95,20 @@ public abstract class RateLimitedOperation implements Runnable {
}
return (false);
}
+
+ private class Deferred extends TimerTask {
+
+ public void run() {
+ execute();
+ }
+
+ private void execute() {
+ unmark();
+ request();
+ }
+
+ private synchronized void unmark() {
+ marked = false;
+ }
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java
index 4cd650b0..83e3c30d 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java
@@ -29,27 +29,27 @@ import com.att.eelf.configuration.EELFManager;
import java.io.BufferedReader;
import java.io.FileOutputStream;
import java.io.FileReader;
-import java.io.IOException;
import java.io.OutputStream;
-import java.util.Hashtable;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Timer;
/**
* Track redirections of subscriptions
*/
public class RedirManager {
+
private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(RedirManager.class);
- private Hashtable<String, String> sid2primary = new Hashtable<String, String>();
- private Hashtable<String, String> sid2secondary = new Hashtable<String, String>();
- private String redirfile;
RateLimitedOperation op;
+ private HashMap<String, String> sid2primary = new HashMap<>();
+ private HashMap<String, String> sid2secondary = new HashMap<>();
+ private String redirfile;
/**
* Create a mechanism for maintaining subscription redirections.
*
* @param redirfile The file to store the redirection information.
- * @param mininterval The minimum number of milliseconds between writes to the redirection
- * information file.
+ * @param mininterval The minimum number of milliseconds between writes to the redirection information file.
* @param timer The timer thread used to run delayed file writes.
*/
public RedirManager(String redirfile, long mininterval, Timer timer) {
@@ -57,10 +57,12 @@ public class RedirManager {
op = new RateLimitedOperation(mininterval, timer) {
public void run() {
try {
- StringBuffer sb = new StringBuffer();
- for (String s : sid2primary.keySet()) {
- sb.append(s).append(' ').append(sid2primary.get(s)).append(' ')
- .append(sid2secondary.get(s)).append('\n');
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<String, String> entry : sid2primary.entrySet()) {
+ String s = entry.getKey();
+ String value = entry.getValue();
+ sb.append(s).append(' ').append(value).append(' ')
+ .append(sid2secondary.get(s)).append('\n');
}
try (OutputStream os = new FileOutputStream(RedirManager.this.redirfile)) {
os.write(sb.toString().getBytes());
@@ -74,23 +76,17 @@ public class RedirManager {
String s;
try (BufferedReader br = new BufferedReader(new FileReader(redirfile))) {
while ((s = br.readLine()) != null) {
- s = s.trim();
- String[] sx = s.split(" ");
- if (s.startsWith("#") || sx.length != 3) {
- continue;
- }
- sid2primary.put(sx[0], sx[1]);
- sid2secondary.put(sx[0], sx[2]);
+ addSubRedirInfo(s);
}
}
} catch (Exception e) {
- eelfLogger.error("Missing file is normal", e);
+ eelfLogger.debug("Missing file is normal", e);
}
}
/**
- * Set up redirection. If a request is to be sent to subscription ID sid, and that is
- * configured to go to URL primary, instead, go to secondary.
+ * Set up redirection. If a request is to be sent to subscription ID sid, and that is configured to go to URL
+ * primary, instead, go to secondary.
*
* @param sid The subscription ID to be redirected
* @param primary The URL associated with that subscription ID
@@ -103,8 +99,7 @@ public class RedirManager {
}
/**
- * Cancel redirection. If a request is to be sent to subscription ID sid, send it to its
- * primary URL.
+ * Cancel redirection. If a request is to be sent to subscription ID sid, send it to its primary URL.
*
* @param sid The subscription ID to remove from the table.
*/
@@ -115,8 +110,8 @@ public class RedirManager {
}
/**
- * Look up where to send a subscription. If the primary has changed or there is no redirection,
- * use the primary. Otherwise, redirect to the secondary URL.
+ * Look up where to send a subscription. If the primary has changed or there is no redirection, use the primary.
+ * Otherwise, redirect to the secondary URL.
*
* @param sid The subscription ID to look up.
* @param primary The configured primary URL.
@@ -138,4 +133,14 @@ public class RedirManager {
public synchronized boolean isRedirected(String sid) {
return (sid != null && sid2secondary.get(sid) != null);
}
+
+ private void addSubRedirInfo(String s) {
+ s = s.trim();
+ String[] sx = s.split(" ");
+ if (s.startsWith("#") || sx.length != 3) {
+ return;
+ }
+ sid2primary.put(sx[0], sx[1]);
+ sid2secondary.put(sx[0], sx[2]);
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java
index 1be3408a..e6165588 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java
@@ -25,20 +25,27 @@ package org.onap.dmaap.datarouter.node;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-import java.util.regex.*;
-import java.util.*;
-import java.io.*;
-import java.nio.file.*;
-import java.text.*;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* Logging for data router delivery events (PUB/DEL/EXP)
*/
public class StatusLog {
+
+ private static final String EXCEPTION = "Exception";
private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(StatusLog.class);
private static StatusLog instance = new StatusLog();
- private HashSet<String> toship = new HashSet<String>();
- private SimpleDateFormat filedate;
+ private SimpleDateFormat filedate = new SimpleDateFormat("-yyyyMMddHHmm");
+
private String prefix = "logs/events";
private String suffix = ".log";
private String plainfile;
@@ -48,85 +55,75 @@ public class StatusLog {
private long intvl;
private NodeConfigManager config = NodeConfigManager.getInstance();
- {
- try {
- filedate = new SimpleDateFormat("-yyyyMMddHHmm");
- } catch (Exception e) {
- eelfLogger.error("Exception", e);
- }
+ private StatusLog() {
}
/**
- * Parse an interval of the form xxhyymzzs and round it to the nearest whole fraction of 24 hours.If no units are specified, assume seconds.
+ * Parse an interval of the form xxhyymzzs and round it to the nearest whole fraction of 24 hours. If no units are
+ * specified, assume seconds.
*/
public static long parseInterval(String interval, int def) {
try {
Matcher m = Pattern.compile("(?:(\\d+)[Hh])?(?:(\\d+)[Mm])?(?:(\\d+)[Ss]?)?").matcher(interval);
if (m.matches()) {
- int dur = 0;
- String x = m.group(1);
- if (x != null) {
- dur += 3600 * Integer.parseInt(x);
- }
- x = m.group(2);
- if (x != null) {
- dur += 60 * Integer.parseInt(x);
- }
- x = m.group(3);
- if (x != null) {
- dur += Integer.parseInt(x);
- }
- if (dur < 60) {
- dur = 60;
- }
+ int dur = getDur(m);
int best = 86400;
int dist = best - dur;
if (dur > best) {
dist = dur - best;
}
- int base = 1;
- for (int i = 0; i < 8; i++) {
- int base2 = base;
- base *= 2;
- for (int j = 0; j < 4; j++) {
- int base3 = base2;
- base2 *= 3;
- for (int k = 0; k < 3; k++) {
- int cur = base3;
- base3 *= 5;
- int ndist = cur - dur;
- if (dur > cur) {
- ndist = dur - cur;
- }
- if (ndist < dist) {
- best = cur;
- dist = ndist;
- }
- }
- }
- }
+ best = getBest(dur, best, dist);
def = best * 1000;
}
} catch (Exception e) {
- eelfLogger.error("Exception", e);
+ eelfLogger.error(EXCEPTION, e);
}
return (def);
}
- private synchronized void checkRoll(long now) throws IOException {
- if (now >= nexttime) {
- if (os != null) {
- os.close();
- os = null;
+ private static int getBest(int dur, int best, int dist) {
+ int base = 1;
+ for (int i = 0; i < 8; i++) {
+ int base2 = base;
+ base *= 2;
+ for (int j = 0; j < 4; j++) {
+ int base3 = base2;
+ base2 *= 3;
+ for (int k = 0; k < 3; k++) {
+ int cur = base3;
+ base3 *= 5;
+ int ndist = cur - dur;
+ if (dur > cur) {
+ ndist = dur - cur;
+ }
+ if (ndist < dist) {
+ best = cur;
+ dist = ndist;
+ }
+ }
}
- intvl = parseInterval(config.getEventLogInterval(), 300000);
- prefix = config.getEventLogPrefix();
- suffix = config.getEventLogSuffix();
- nexttime = now - now % intvl + intvl;
- curfile = prefix + filedate.format(new Date(nexttime - intvl)) + suffix;
- plainfile = prefix + suffix;
- notify();
}
+ return best;
+ }
+
+ private static int getDur(Matcher m) {
+ int dur = 0;
+ String x = m.group(1);
+ if (x != null) {
+ dur += 3600 * Integer.parseInt(x);
+ }
+ x = m.group(2);
+ if (x != null) {
+ dur += 60 * Integer.parseInt(x);
+ }
+ x = m.group(3);
+ if (x != null) {
+ dur += Integer.parseInt(x);
+ }
+ if (dur < 60) {
+ dur = 60;
+ }
+ return dur;
}
/**
@@ -138,111 +135,107 @@ public class StatusLog {
try {
instance.checkRoll(System.currentTimeMillis());
} catch (Exception e) {
- eelfLogger.error("Exception", e);
+ eelfLogger.error(EXCEPTION, e);
}
return (instance.curfile);
}
- private synchronized void log(String s) {
- try {
- long now = System.currentTimeMillis();
- checkRoll(now);
- if (os == null) {
- os = new FileOutputStream(curfile, true);
- (new File(plainfile)).delete();
- Files.createLink(Paths.get(plainfile), Paths.get(curfile));
- }
- os.write((NodeUtils.logts(new Date(now)) + '|' + s + '\n').getBytes());
- os.flush();
- } catch (IOException ioe) {
- eelfLogger.error("IOException", ioe);
- }
- }
-
/**
* Log a received publication attempt.
*
- * @param pubid The publish ID assigned by the node
+ * @param pubid The publish ID assigned by the node
* @param feedid The feed id given by the publisher
* @param requrl The URL of the received request
* @param method The method (DELETE or PUT) in the received request
- * @param ctype The content type (if method is PUT and clen > 0)
- * @param clen The content length (if method is PUT)
- * @param srcip The IP address of the publisher
- * @param user The identity of the publisher
+ * @param ctype The content type (if method is PUT and clen > 0)
+ * @param clen The content length (if method is PUT)
+ * @param srcip The IP address of the publisher
+ * @param user The identity of the publisher
* @param status The status returned to the publisher
*/
- public static void logPub(String pubid, String feedid, String requrl, String method, String ctype, long clen, String srcip, String user, int status) {
- instance.log("PUB|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + srcip + "|" + user + "|" + status);
+ public static void logPub(String pubid, String feedid, String requrl, String method, String ctype, long clen,
+ String srcip, String user, int status) {
+ instance.log(
+ "PUB|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + srcip
+ + "|" + user + "|" + status);
}
/**
* Log a data transfer error receiving a publication attempt
*
- * @param pubid The publish ID assigned by the node
+ * @param pubid The publish ID assigned by the node
* @param feedid The feed id given by the publisher
* @param requrl The URL of the received request
* @param method The method (DELETE or PUT) in the received request
- * @param ctype The content type (if method is PUT and clen > 0)
- * @param clen The expected content length (if method is PUT)
- * @param rcvd The content length received
- * @param srcip The IP address of the publisher
- * @param user The identity of the publisher
- * @param error The error message from the IO exception
+ * @param ctype The content type (if method is PUT and clen > 0)
+ * @param clen The expected content length (if method is PUT)
+ * @param rcvd The content length received
+ * @param srcip The IP address of the publisher
+ * @param user The identity of the publisher
+ * @param error The error message from the IO exception
*/
- public static void logPubFail(String pubid, String feedid, String requrl, String method, String ctype, long clen, long rcvd, String srcip, String user, String error) {
- instance.log("PBF|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + rcvd + "|" + srcip + "|" + user + "|" + error);
+ public static void logPubFail(String pubid, String feedid, String requrl, String method, String ctype, long clen,
+ long rcvd, String srcip, String user, String error) {
+ instance.log("PBF|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + rcvd
+ + "|" + srcip + "|" + user + "|" + error);
}
/**
* Log a delivery attempt.
*
- * @param pubid The publish ID assigned by the node
+ * @param pubid The publish ID assigned by the node
* @param feedid The feed ID
- * @param subid The (space delimited list of) subscription ID
+ * @param subid The (space delimited list of) subscription ID
* @param requrl The URL used in the attempt
* @param method The method (DELETE or PUT) in the attempt
- * @param ctype The content type (if method is PUT, not metaonly, and clen > 0)
- * @param clen The content length (if PUT and not metaonly)
- * @param user The identity given to the subscriber
+ * @param ctype The content type (if method is PUT, not metaonly, and clen > 0)
+ * @param clen The content length (if PUT and not metaonly)
+ * @param user The identity given to the subscriber
* @param status The status returned by the subscriber or -1 if an exeception occured trying to connect
* @param xpubid The publish ID returned by the subscriber
*/
- public static void logDel(String pubid, String feedid, String subid, String requrl, String method, String ctype, long clen, String user, int status, String xpubid) {
+ public static void logDel(String pubid, String feedid, String subid, String requrl, String method, String ctype,
+ long clen, String user, int status, String xpubid) {
if (feedid == null) {
return;
}
- instance.log("DEL|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + user + "|" + status + "|" + xpubid);
+ instance.log(
+ "DEL|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen
+ + "|" + user + "|" + status + "|" + xpubid);
}
/**
* Log delivery attempts expired
*
- * @param pubid The publish ID assigned by the node
- * @param feedid The feed ID
- * @param subid The (space delimited list of) subscription ID
- * @param requrl The URL that would be delivered to
- * @param method The method (DELETE or PUT) in the request
- * @param ctype The content type (if method is PUT, not metaonly, and clen > 0)
- * @param clen The content length (if PUT and not metaonly)
- * @param reason The reason the attempts were discontinued
+ * @param pubid The publish ID assigned by the node
+ * @param feedid The feed ID
+ * @param subid The (space delimited list of) subscription ID
+ * @param requrl The URL that would be delivered to
+ * @param method The method (DELETE or PUT) in the request
+ * @param ctype The content type (if method is PUT, not metaonly, and clen > 0)
+ * @param clen The content length (if PUT and not metaonly)
+ * @param reason The reason the attempts were discontinued
* @param attempts The number of attempts made
*/
- public static void logExp(String pubid, String feedid, String subid, String requrl, String method, String ctype, long clen, String reason, int attempts) {
+ public static void logExp(String pubid, String feedid, String subid, String requrl, String method, String ctype,
+ long clen, String reason, int attempts) {
if (feedid == null) {
return;
}
- instance.log("EXP|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + reason + "|" + attempts);
+ instance.log(
+ "EXP|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen
+ + "|" + reason + "|" + attempts);
}
/**
* Log extra statistics about unsuccessful delivery attempts.
*
- * @param pubid The publish ID assigned by the node
+ * @param pubid The publish ID assigned by the node
* @param feedid The feed ID
- * @param subid The (space delimited list of) subscription ID
- * @param clen The content length
- * @param sent The # of bytes sent or -1 if subscriber returned an error instead of 100 Continue, otherwise, the number of bytes sent before an error occurred.
+ * @param subid The (space delimited list of) subscription ID
+ * @param clen The content length
+ * @param sent The # of bytes sent or -1 if subscriber returned an error instead of 100 Continue, otherwise, the
+ * number of bytes sent before an error occurred.
*/
public static void logDelExtra(String pubid, String feedid, String subid, long clen, long sent) {
if (feedid == null) {
@@ -251,6 +244,35 @@ public class StatusLog {
instance.log("DLX|" + pubid + "|" + feedid + "|" + subid + "|" + clen + "|" + sent);
}
- private StatusLog() {
+ private synchronized void checkRoll(long now) throws IOException {
+ if (now >= nexttime) {
+ if (os != null) {
+ os.close();
+ os = null;
+ }
+ intvl = parseInterval(config.getEventLogInterval(), 300000);
+ prefix = config.getEventLogPrefix();
+ suffix = config.getEventLogSuffix();
+ nexttime = now - now % intvl + intvl;
+ curfile = prefix + filedate.format(new Date(nexttime - intvl)) + suffix;
+ plainfile = prefix + suffix;
+ notify();
+ }
+ }
+
+ private synchronized void log(String s) {
+ try {
+ long now = System.currentTimeMillis();
+ checkRoll(now);
+ if (os == null) {
+ os = new FileOutputStream(curfile, true);
+ (new File(plainfile)).delete();
+ Files.createLink(Paths.get(plainfile), Paths.get(curfile));
+ }
+ os.write((NodeUtils.logts(new Date(now)) + '|' + s + '\n').getBytes());
+ os.flush();
+ } catch (IOException ioe) {
+ eelfLogger.error("IOException", ioe);
+ }
}
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java
index 6f74df48..fd5a6bc6 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java
@@ -28,6 +28,7 @@ package org.onap.dmaap.datarouter.node;
* Compare IP addresses as byte arrays to a subnet specified as a CIDR
*/
public class SubnetMatcher {
+
private byte[] sn;
private int len;
private int mask;
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java
index eb10876e..d86b2e92 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java
@@ -28,6 +28,7 @@ package org.onap.dmaap.datarouter.node;
* A destination to deliver a message
*/
public class Target {
+
private DestInfo destinfo;
private String routing;
@@ -35,7 +36,7 @@ public class Target {
* A destination to deliver a message
*
* @param destinfo Either info for a subscription ID or info for a node-to-node transfer
- * @param routing For a node-to-node transfer, what to do when it gets there.
+ * @param routing For a node-to-node transfer, what to do when it gets there.
*/
public Target(DestInfo destinfo, String routing) {
this.destinfo = destinfo;
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java
index 33e4f801..1eb73c69 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java
@@ -24,41 +24,37 @@
package org.onap.dmaap.datarouter.node;
-import java.util.*;
+import java.util.HashSet;
+import java.util.Iterator;
/**
- * Manage a list of tasks to be executed when an event occurs.
- * This makes the following guarantees:
+ * Manage a list of tasks to be executed when an event occurs. This makes the following guarantees:
* <ul>
* <li>Tasks can be safely added and removed in the middle of a run.</li>
* <li>No task will be returned more than once during a run.</li>
* <li>No task will be returned when it is not, at that moment, in the list of tasks.</li>
* <li>At the moment when next() returns null, all tasks on the list have been returned during the run.</li>
- * <li>Initially and once next() returns null during a run, next() will continue to return null until startRun() is called.
+ * <li>Initially and once next() returns null during a run, next() will continue to return null until startRun() is
+ * called.
* </ul>
*/
public class TaskList {
+
private Iterator<Runnable> runlist;
- private HashSet<Runnable> tasks = new HashSet<Runnable>();
+ private HashSet<Runnable> tasks = new HashSet<>();
private HashSet<Runnable> togo;
private HashSet<Runnable> sofar;
private HashSet<Runnable> added;
private HashSet<Runnable> removed;
/**
- * Construct a new TaskList
- */
- public TaskList() {
- }
-
- /**
* Start executing the sequence of tasks.
*/
public synchronized void startRun() {
- sofar = new HashSet<Runnable>();
- added = new HashSet<Runnable>();
- removed = new HashSet<Runnable>();
- togo = new HashSet<Runnable>(tasks);
+ sofar = new HashSet<>();
+ added = new HashSet<>();
+ removed = new HashSet<>();
+ togo = new HashSet<>(tasks);
runlist = togo.iterator();
}
@@ -69,18 +65,13 @@ public class TaskList {
while (runlist != null) {
if (runlist.hasNext()) {
Runnable task = runlist.next();
- if (removed.contains(task)) {
- continue;
+ if (addTaskToSoFar(task)) {
+ return task;
}
- if (sofar.contains(task)) {
- continue;
- }
- sofar.add(task);
- return (task);
}
- if (added.size() != 0) {
+ if (!added.isEmpty()) {
togo = added;
- added = new HashSet<Runnable>();
+ added = new HashSet<>();
removed.clear();
runlist = togo.iterator();
continue;
@@ -115,4 +106,15 @@ public class TaskList {
}
tasks.remove(task);
}
+
+ private boolean addTaskToSoFar(Runnable task) {
+ if (removed.contains(task)) {
+ return false;
+ }
+ if (sofar.contains(task)) {
+ return false;
+ }
+ sofar.add(task);
+ return true;
+ }
}
diff --git a/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/DeliveryTest.java b/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/DeliveryTest.java
index 08120073..c21bdecc 100644
--- a/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/DeliveryTest.java
+++ b/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/DeliveryTest.java
@@ -97,7 +97,7 @@ public class DeliveryTest {
private DestInfo[] createDestInfoObjects() {
DestInfo[] destInfos = new DestInfo[1];
- DestInfo destInfo = new DestInfo.DestInfoBuilder().setName("node.datarouternew.com").setSpool("spool/s/0/1").setSubid("1")
+ DestInfo destInfo = new DestInfoBuilder().setName("node.datarouternew.com").setSpool("spool/s/0/1").setSubid("1")
.setLogdata("logs/").setUrl("/subs/1").setAuthuser("user1").setAuthentication("Basic dXNlcjE6cGFzc3dvcmQx")
.setMetaonly(false).setUse100(true).setPrivilegedSubscriber(false).setFollowRedirects(false)
.setDecompress(false).createDestInfo();
diff --git a/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeServletTest.java b/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeServletTest.java
index db71ceae..a375f026 100644
--- a/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeServletTest.java
+++ b/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeServletTest.java
@@ -219,7 +219,7 @@ public class NodeServletTest {
@Test
public void Given_Request_Is_HTTP_PUT_On_Publish_On_AAF_Feed_And_Cadi_Enabled_And_No_Permissions_Then_Forbidden_Response_Is_Generated() throws Exception {
- when(config.getCadiEnabeld()).thenReturn(true);
+ when(config.getCadiEnabled()).thenReturn(true);
when(config.getAafInstance("1")).thenReturn("*");
when(request.getPathInfo()).thenReturn("/publish/1/fileName");
setHeadersForValidRequest(true);