summaryrefslogtreecommitdiffstats
path: root/datarouter-node/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'datarouter-node/src/main/java')
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DRNodeCadiFilter.java58
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java202
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java124
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java330
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java27
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java150
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfoBuilder.java149
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java39
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java146
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java1017
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java311
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java172
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java282
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java151
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java143
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathUtil.java66
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java489
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java8
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java51
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java77
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java273
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java15
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java13
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java62
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/AuditFilter.java38
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/EELFFilter.java43
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/JettyFilter.java37
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/MetricsFilter.java42
28 files changed, 2509 insertions, 2006 deletions
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DRNodeCadiFilter.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DRNodeCadiFilter.java
index 30ad1618..245dbccd 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DRNodeCadiFilter.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DRNodeCadiFilter.java
@@ -17,23 +17,24 @@
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
+
package org.onap.dmaap.datarouter.node;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-import org.onap.aaf.cadi.PropAccess;
-import org.onap.aaf.cadi.filter.CadiFilter;
-
+import java.io.IOException;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
+import org.onap.aaf.cadi.PropAccess;
+import org.onap.aaf.cadi.filter.CadiFilter;
public class DRNodeCadiFilter extends CadiFilter {
+
private static EELFLogger logger = EELFManager.getInstance().getLogger(NodeServlet.class);
DRNodeCadiFilter(boolean init, PropAccess access) throws ServletException {
@@ -41,23 +42,16 @@ public class DRNodeCadiFilter extends CadiFilter {
}
@Override
- public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
+ public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
+ throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) request;
String path = httpRequest.getPathInfo();
if (!(path.startsWith("/internal"))) {
- if (!(httpRequest.getMethod().equalsIgnoreCase("POST"))) {
- if (httpRequest.getMethod().equalsIgnoreCase("DELETE") && path.startsWith("/delete")) {
+ if (!("POST".equalsIgnoreCase(httpRequest.getMethod()))) {
+ if ("DELETE".equalsIgnoreCase(httpRequest.getMethod()) && path.startsWith("/delete")) {
chain.doFilter(request, response);
} else {
- String feedId = getFeedId(request, response);
- String aafDbInstance = NodeConfigManager.getInstance().getAafInstance(feedId);
- if (aafDbInstance != null && !aafDbInstance.equals("") && !aafDbInstance.equalsIgnoreCase("legacy")) {
- logger.info("DRNodeCadiFilter - doFilter: FeedId - " + feedId + ":" + "AAF Instance -" + aafDbInstance);
- super.doFilter(request, response, chain);
- } else {
- logger.info("DRNodeCadiFilter - doFilter: FeedId - " + feedId + ":" + "Legacy Feed");
- chain.doFilter(request, response);
- }
+ doFilterWithFeedId(request, response, chain);
}
}
} else {
@@ -72,9 +66,10 @@ public class DRNodeCadiFilter extends CadiFilter {
if (fileid == null) {
logger.error("NODE0105 Rejecting bad URI for PUT " + req.getPathInfo() + " from " + req.getRemoteAddr());
try {
- resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND,
+ "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");
} catch (IOException e) {
- logger.error("NODE0541 DRNodeCadiFilter.getFeedId: ", e.getMessage());
+ logger.error("NODE0541 DRNodeCadiFilter.getFeedId: ", e);
}
return null;
}
@@ -82,19 +77,34 @@ public class DRNodeCadiFilter extends CadiFilter {
if (fileid.startsWith("/publish/")) {
fileid = fileid.substring(9);
- int i = fileid.indexOf('/');
- if (i == -1 || i == fileid.length() - 1) {
- logger.error("NODE0105 Rejecting bad URI for PUT (publish) of " + req.getPathInfo() + " from " + req.getRemoteAddr());
+ int index = fileid.indexOf('/');
+ if (index == -1 || index == fileid.length() - 1) {
+ logger.error("NODE0105 Rejecting bad URI for PUT (publish) of " + req.getPathInfo() + " from " + req
+ .getRemoteAddr());
try {
- resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting <feed-publishing-url>/<fileid>. Possible missing fileid.");
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND,
+ "Invalid request URI. Expecting <feed-publishing-url>/<fileid>. "
+ + "Possible missing fileid.");
} catch (IOException e) {
- logger.error("NODE0542 DRNodeCadiFilter.getFeedId: ", e.getMessage());
+ logger.error("NODE0542 DRNodeCadiFilter.getFeedId: ", e);
}
return null;
}
- feedid = fileid.substring(0, i);
+ feedid = fileid.substring(0, index);
}
return feedid;
}
+ private void doFilterWithFeedId(ServletRequest request, ServletResponse response, FilterChain chain)
+ throws IOException, ServletException {
+ String feedId = getFeedId(request, response);
+ String aafDbInstance = NodeConfigManager.getInstance().getAafInstance(feedId);
+ if (aafDbInstance != null && !"".equals(aafDbInstance) && !"legacy".equalsIgnoreCase(aafDbInstance)) {
+ logger.info("DRNodeCadiFilter - doFilter: FeedId - " + feedId + ":" + "AAF Instance -" + aafDbInstance);
+ super.doFilter(request, response, chain);
+ } else {
+ logger.info("DRNodeCadiFilter - doFilter: FeedId - " + feedId + ":" + "Legacy Feed");
+ chain.doFilter(request, response);
+ }
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java
index 501e489c..150d2aa2 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java
@@ -23,67 +23,37 @@
package org.onap.dmaap.datarouter.node;
-import java.util.*;
-import java.io.*;
-
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Objects;
/**
* Main control point for delivering files to destinations.
- * <p>
- * The Delivery class manages assignment of delivery threads to delivery
- * queues and creation and destruction of delivery queues as
- * configuration changes. DeliveryQueues are assigned threads based on a
- * modified round-robin approach giving priority to queues with more work
- * as measured by both bytes to deliver and files to deliver and lower
- * priority to queues that already have delivery threads working.
- * A delivery thread continues to work for a delivery queue as long as
- * that queue has more files to deliver.
+ *
+ * <p>The Delivery class manages assignment of delivery threads to delivery queues and creation and destruction of
+ * delivery queues as configuration changes. DeliveryQueues are assigned threads based on a modified round-robin
+ * approach giving priority to queues with more work as measured by both bytes to deliver and files to deliver and lower
+ * priority to queues that already have delivery threads working. A delivery thread continues to work for a delivery
+ * queue as long as that queue has more files to deliver.
*/
public class Delivery {
- private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class);
-
- private static class DelItem implements Comparable<DelItem> {
- private String pubid;
- private String spool;
-
- public int compareTo(DelItem x) {
- int i = pubid.compareTo(x.pubid);
- if (i == 0) {
- i = spool.compareTo(x.spool);
- }
- return (i);
- }
-
- public String getPublishId() {
- return (pubid);
- }
-
- public String getSpool() {
- return (spool);
- }
-
- public DelItem(String pubid, String spool) {
- this.pubid = pubid;
- this.spool = spool;
- }
- }
+ private static final String TOTAL = " total=";
+ private static final String YELLOW = " yellow=";
+ private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class);
private double fdstart;
private double fdstop;
private int threads;
private int curthreads;
private NodeConfigManager config;
- private Hashtable<String, DeliveryQueue> dqs = new Hashtable<String, DeliveryQueue>();
+ private HashMap<String, DeliveryQueue> dqs = new HashMap<>();
private DeliveryQueue[] queues = new DeliveryQueue[0];
private int qpos = 0;
private long nextcheck;
- private Runnable cmon = new Runnable() {
- public void run() {
- checkconfig();
- }
- };
/**
* Constructs a new Delivery system using the specified configuration manager.
@@ -92,10 +62,37 @@ public class Delivery {
*/
public Delivery(NodeConfigManager config) {
this.config = config;
+ Runnable cmon = this::checkconfig;
config.registerConfigTask(cmon);
checkconfig();
}
+ /**
+ * Reset the retry timer for a delivery queue.
+ */
+ public synchronized void resetQueue(String spool) {
+ if (spool != null) {
+ DeliveryQueue dq = dqs.get(spool);
+ if (dq != null) {
+ dq.resetQueue();
+ }
+ }
+ }
+
+ /**
+ * Mark the task in spool a success.
+ */
+ public synchronized boolean markTaskSuccess(String spool, String pubId) {
+ boolean succeeded = false;
+ if (spool != null) {
+ DeliveryQueue dq = dqs.get(spool);
+ if (dq != null) {
+ succeeded = dq.markTaskSuccess(pubId);
+ }
+ }
+ return succeeded;
+ }
+
private void cleardir(String dir) {
if (dqs.get(dir) != null) {
return;
@@ -113,12 +110,11 @@ public class Delivery {
File spoolfile = new File(config.getSpoolBase());
long tspace = spoolfile.getTotalSpace();
long start = (long) (tspace * fdstart);
- long stop = (long) (tspace * fdstop);
long cur = spoolfile.getUsableSpace();
if (cur >= start) {
return;
}
- Vector<DelItem> cv = new Vector<DelItem>();
+ ArrayList<DelItem> cv = new ArrayList<>();
for (String sdir : dqs.keySet()) {
for (String meta : (new File(sdir)).list()) {
if (!meta.endsWith(".M") || meta.charAt(0) == '.') {
@@ -129,27 +125,21 @@ public class Delivery {
}
DelItem[] items = cv.toArray(new DelItem[cv.size()]);
Arrays.sort(items);
- logger.info("NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + " total=" + tspace);
- for (DelItem item : items) {
- long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
- logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk");
- if (amount > 0) {
- cur += amount;
- if (cur >= stop) {
- cur = spoolfile.getUsableSpace();
- }
- if (cur >= stop) {
- logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace);
- return;
- }
- }
+ long stop = (long) (tspace * fdstop);
+ logger.info(
+ "NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + TOTAL + tspace);
+ if (determineFreeDiskSpace(spoolfile, tspace, stop, cur, items)) {
+ return;
}
cur = spoolfile.getUsableSpace();
if (cur >= stop) {
- logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace);
+ logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + YELLOW + stop
+ + TOTAL + tspace);
return;
}
- logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status. current=" + cur + " yellow=" + stop + " total=" + tspace);
+ logger.warn(
+ "NODE0504 Unable to recover sufficient disk space to reach green status. current=" + cur + YELLOW
+ + stop + TOTAL + tspace);
}
private void cleardirs() {
@@ -188,7 +178,7 @@ public class Delivery {
DestInfo[] alldis = config.getAllDests();
DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];
qpos = 0;
- Hashtable<String, DeliveryQueue> ndqs = new Hashtable<String, DeliveryQueue>();
+ HashMap<String, DeliveryQueue> ndqs = new HashMap<>();
for (DestInfo di : alldis) {
String spl = di.getSpool();
DeliveryQueue dq = dqs.get(spl);
@@ -205,11 +195,8 @@ public class Delivery {
cleardirs();
while (curthreads < threads) {
curthreads++;
- (new Thread() {
- {
- setName("Delivery Thread");
- }
-
+ (new Thread("del-thread-" + curthreads) {
+ @Override
public void run() {
dodelivery();
}
@@ -246,6 +233,7 @@ public class Delivery {
try {
wait(nextcheck + 500 - now);
} catch (Exception e) {
+ logger.error("InterruptedException", e);
}
now = System.currentTimeMillis();
}
@@ -257,29 +245,69 @@ public class Delivery {
}
}
- /**
- * Reset the retry timer for a delivery queue
- */
- public synchronized void resetQueue(String spool) {
- if (spool != null) {
- DeliveryQueue dq = dqs.get(spool);
- if (dq != null) {
- dq.resetQueue();
+ private boolean determineFreeDiskSpace(File spoolfile, long tspace, long stop, long cur, DelItem[] items) {
+ for (DelItem item : items) {
+ long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
+ logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId()
+ + " to free up disk");
+ if (amount > 0) {
+ cur += amount;
+ if (cur >= stop) {
+ cur = spoolfile.getUsableSpace();
+ }
+ if (cur >= stop) {
+ logger.info(
+ "NODE0503 Free disk space at or above yellow threshold. current=" + cur + YELLOW + stop
+ + TOTAL + tspace);
+ return true;
+ }
}
}
+ return false;
}
- /**
- * Mark the task in spool a success
- */
- public synchronized boolean markTaskSuccess(String spool, String pubId) {
- boolean succeeded = false;
- if (spool != null) {
- DeliveryQueue dq = dqs.get(spool);
- if (dq != null) {
- succeeded = dq.markTaskSuccess(pubId);
+ static class DelItem implements Comparable<DelItem> {
+
+ private String pubid;
+ private String spool;
+
+ public DelItem(String pubid, String spool) {
+ this.pubid = pubid;
+ this.spool = spool;
+ }
+
+ public int compareTo(DelItem other) {
+ int diff = pubid.compareTo(other.pubid);
+ if (diff == 0) {
+ diff = spool.compareTo(other.spool);
}
+ return (diff);
+ }
+
+ public String getPublishId() {
+ return (pubid);
+ }
+
+ public String getSpool() {
+ return (spool);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ DelItem delItem = (DelItem) object;
+ return Objects.equals(pubid, delItem.pubid)
+ && Objects.equals(getSpool(), delItem.getSpool());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pubid, getSpool());
}
- return succeeded;
}
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java
index bef8dab2..0ba9ecfd 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java
@@ -24,8 +24,11 @@
package org.onap.dmaap.datarouter.node;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
import java.io.*;
import java.util.*;
+import org.jetbrains.annotations.Nullable;
/**
* Mechanism for monitoring and controlling delivery of files to a destination.
@@ -64,6 +67,7 @@ import java.util.*;
* failure timer is active or if no files are found in a directory scan.
*/
public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
+ private static EELFLogger logger = EELFManager.getInstance().getLogger(DeliveryQueue.class);
private DeliveryQueueHelper deliveryQueueHelper;
private DestInfo destinationInfo;
private Hashtable<String, DeliveryTask> working = new Hashtable<>();
@@ -113,6 +117,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
*/
private synchronized void markSuccess(DeliveryTask task) {
working.remove(task.getPublishId());
+ logger.debug(task.getPublishId() + " marked as success.");
task.clean();
failed = false;
failduration = 0;
@@ -122,6 +127,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
* Mark that a delivery task has expired.
*/
private synchronized void markExpired(DeliveryTask task) {
+ logger.debug(task.getPublishId() + " marked as expired.");
task.clean();
}
@@ -130,6 +136,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
*/
private synchronized void markFailNoRetry(DeliveryTask task) {
working.remove(task.getPublishId());
+ logger.debug(task.getPublishId() + " marked as failed permanently");
task.clean();
failed = false;
failduration = 0;
@@ -159,6 +166,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
*/
private synchronized void markRedirect(DeliveryTask task) {
working.remove(task.getPublishId());
+ logger.debug(task.getPublishId() + " marked as redirected.");
retry.put(task.getPublishId(), task);
}
@@ -167,6 +175,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
*/
private synchronized void markFailWithRetry(DeliveryTask task) {
working.remove(task.getPublishId());
+ logger.debug(task.getPublishId() + " marked as temporarily failed.");
retry.put(task.getPublishId(), task);
fdupdate();
}
@@ -202,53 +211,15 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
todo = new Vector<>();
String[] files = dir.list();
Arrays.sort(files);
- for (String fname : files) {
- if (!fname.endsWith(".M")) {
- continue;
- }
- String fname2 = fname.substring(0, fname.length() - 2);
- long pidtime = 0;
- int dot = fname2.indexOf('.');
- if (dot < 1) {
- continue;
- }
- try {
- pidtime = Long.parseLong(fname2.substring(0, dot));
- } catch (Exception e) {
- }
- if (pidtime < 1000000000000L) {
- continue;
- }
- if (working.get(fname2) != null) {
- continue;
- }
- DeliveryTask dt = retry.get(fname2);
- if (dt == null) {
- dt = new DeliveryTask(this, fname2);
- }
- todo.add(dt);
- }
+ scanForNextTask(files);
retry = new Hashtable<>();
}
- if (todoindex < todo.size()) {
- DeliveryTask dt = todo.get(todoindex);
- if (dt.isCleaned()) {
- todoindex++;
- continue;
- }
- if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) {
- retry.put(dt.getPublishId(), dt);
- todoindex++;
- continue;
- }
- if (dt.getDate() >= mindate) {
- return (dt);
- }
- todoindex++;
- reportExpiry(dt);
- continue;
+ DeliveryTask dt = getDeliveryTask(mindate);
+ if (dt != null) {
+ return dt;
}
- return (null);
+ return null;
+
}
}
@@ -359,11 +330,12 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
* files to deliver
*/
public void run() {
- DeliveryTask t;
+ DeliveryTask task;
long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit();
int filestogo = deliveryQueueHelper.getFairFileLimit();
- while ((t = getNext()) != null) {
- t.run();
+ while ((task = getNext()) != null) {
+ logger.debug("Processing file: " + task.getPublishId());
+ task.run();
if (--filestogo <= 0 || System.currentTimeMillis() > endtime) {
break;
}
@@ -403,4 +375,62 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
}
return false;
}
+ private void scanForNextTask(String[] files) {
+ for (String fname : files) {
+ String pubId = getPubId(fname);
+ if (pubId == null) {
+ continue;
+ }
+ DeliveryTask dt = retry.get(pubId);
+ if (dt == null) {
+ dt = new DeliveryTask(this, pubId);
+ }
+ todo.add(dt);
+ }
+ }
+
+ @Nullable
+ private DeliveryTask getDeliveryTask(long mindate) {
+ if (todoindex < todo.size()) {
+ DeliveryTask dt = todo.get(todoindex);
+ if (dt.isCleaned()) {
+ todoindex++;
+ }
+ if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) {
+ retry.put(dt.getPublishId(), dt);
+ todoindex++;
+ }
+ if (dt.getDate() >= mindate) {
+ return (dt);
+ }
+ todoindex++;
+ reportExpiry(dt);
+ }
+ return null;
+ }
+
+ @Nullable
+ private String getPubId(String fname) {
+ if (!fname.endsWith(".M")) {
+ return null;
+ }
+ String fname2 = fname.substring(0, fname.length() - 2);
+ long pidtime = 0;
+ int dot = fname2.indexOf('.');
+ if (dot < 1) {
+ return null;
+ }
+ try {
+ pidtime = Long.parseLong(fname2.substring(0, dot));
+ } catch (Exception e) {
+ logger.error("Exception", e);
+ }
+ if (pidtime < 1000000000000L) {
+ return null;
+ }
+ if (working.get(fname2) != null) {
+ return null;
+ }
+ return fname2;
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
index cca61707..7ed35928 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
@@ -24,29 +24,40 @@
package org.onap.dmaap.datarouter.node;
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.zip.GZIPInputStream;
+import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID;
+import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.ProtocolException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.zip.GZIPInputStream;
+import org.jetbrains.annotations.Nullable;
import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
import org.slf4j.MDC;
-import static com.att.eelf.configuration.Configuration.*;
-import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
-
/**
* A file to be delivered to a destination.
- * <p>
- * A Delivery task represents a work item for the data router - a file that
- * needs to be delivered and provides mechanisms to get information about
- * the file and its delivery data as well as to attempt delivery.
+ *
+ * <p>A Delivery task represents a work item for the data router - a file that needs to be delivered and provides
+ * mechanisms to get information about the file and its delivery data as well as to attempt delivery.
*/
public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
- private static EELFLogger eelfLogger = EELFManager.getInstance()
- .getLogger(DeliveryTask.class);
+
+ private static final String DECOMPRESSION_STATUS = "Decompression_Status";
+ private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(DeliveryTask.class);
private DeliveryTaskHelper deliveryTaskHelper;
private String pubid;
private DestInfo destInfo;
@@ -69,12 +80,11 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
/**
- * Create a delivery task for a given delivery queue and pub ID
+ * Create a delivery task for a given delivery queue and pub ID.
*
* @param deliveryTaskHelper The delivery task helper for the queue this task is in.
- * @param pubid The publish ID for this file. This is used as
- * the base for the file name in the spool directory and is of
- * the form <milliseconds since 1970>.<fqdn of initial data router node>
+ * @param pubid The publish ID for this file. This is used as the base for the file name in the spool directory and
+ * is of the form (milliseconds since 1970).(fqdn of initial data router node)
*/
DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
this.deliveryTaskHelper = deliveryTaskHelper;
@@ -84,70 +94,70 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
this.followRedirects = destInfo.isFollowRedirects();
feedid = destInfo.getLogData();
spool = destInfo.getSpool();
- String dfn = spool + "/" + pubid;
+ String dfn = spool + File.separator + pubid;
String mfn = dfn + ".M";
- datafile = new File(spool + "/" + pubid);
+ datafile = new File(spool + File.separator + pubid);
metafile = new File(mfn);
boolean monly = destInfo.isMetaDataOnly();
date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
resumeTime = System.currentTimeMillis();
- Vector<String[]> hdrv = new Vector<>();
+ ArrayList<String[]> hdrv = new ArrayList<>();
try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
- String s = br.readLine();
- int i = s.indexOf('\t');
- method = s.substring(0, i);
+ String line = br.readLine();
+ int index = line.indexOf('\t');
+ method = line.substring(0, index);
NodeUtils.setIpAndFqdnForEelf(method);
if (!"DELETE".equals(method) && !monly) {
length = datafile.length();
}
- fileid = s.substring(i + 1);
- while ((s = br.readLine()) != null) {
- i = s.indexOf('\t');
- String h = s.substring(0, i);
- String v = s.substring(i + 1);
- if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) {
- subid = v.replaceAll("[^ ]*/", "");
+ fileid = line.substring(index + 1);
+ while ((line = br.readLine()) != null) {
+ index = line.indexOf('\t');
+ String header = line.substring(0, index);
+ String headerValue = line.substring(index + 1);
+ if ("x-dmaap-dr-routing".equalsIgnoreCase(header)) {
+ subid = headerValue.replaceAll("[^ ]*/", "");
feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
}
- if (length == 0 && h.toLowerCase().startsWith("content-")) {
+ if (length == 0 && header.toLowerCase().startsWith("content-")) {
continue;
}
- if (h.equalsIgnoreCase("content-type")) {
- ctype = v;
+ if ("content-type".equalsIgnoreCase(header)) {
+ ctype = headerValue;
}
- if (h.equalsIgnoreCase("x-onap-requestid")) {
- MDC.put(MDC_KEY_REQUEST_ID, v);
+ if ("x-onap-requestid".equalsIgnoreCase(header)) {
+ MDC.put(MDC_KEY_REQUEST_ID, headerValue);
}
- if (h.equalsIgnoreCase("x-invocationid")) {
- MDC.put("InvocationId", v);
- v = UUID.randomUUID().toString();
- newInvocationId = v;
+ if ("x-invocationid".equalsIgnoreCase(header)) {
+ MDC.put("InvocationId", headerValue);
+ headerValue = UUID.randomUUID().toString();
+ newInvocationId = headerValue;
}
- hdrv.add(new String[]{h, v});
+ hdrv.add(new String[]{header, headerValue});
}
} catch (Exception e) {
- eelfLogger.error("Exception "+ Arrays.toString(e.getStackTrace()), e.getMessage());
+ eelfLogger.error("Exception", e);
}
hdrs = hdrv.toArray(new String[hdrv.size()][]);
url = deliveryTaskHelper.getDestURL(fileid);
}
/**
- * Is the object a DeliveryTask with the same publication ID?
+ * Is the object a DeliveryTask with the same publication ID.
*/
- public boolean equals(Object o) {
- if (!(o instanceof DeliveryTask)) {
+ public boolean equals(Object object) {
+ if (!(object instanceof DeliveryTask)) {
return (false);
}
- return (pubid.equals(((DeliveryTask) o).pubid));
+ return (pubid.equals(((DeliveryTask) object).pubid));
}
/**
* Compare the publication IDs.
*/
- public int compareTo(DeliveryTask o) {
- return (pubid.compareTo(o.pubid));
+ public int compareTo(DeliveryTask other) {
+ return (pubid.compareTo(other.pubid));
}
/**
@@ -165,79 +175,49 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
}
/**
- * Get the publish ID
+ * Get the publish ID.
*/
String getPublishId() {
return (pubid);
}
/**
- * Attempt delivery
+ * Attempt delivery.
*/
public void run() {
attempts++;
try {
destInfo = deliveryTaskHelper.getDestinationInfo();
- boolean expect100 = destInfo.isUsing100();
boolean monly = destInfo.isMetaDataOnly();
length = 0;
if (!"DELETE".equals(method) && !monly) {
length = datafile.length();
}
- if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){
- fileid = fileid.replace(".gz", "");
- }
+ stripSuffixIfIsDecompress();
url = deliveryTaskHelper.getDestURL(fileid);
- URL u = new URL(url);
- HttpURLConnection uc = (HttpURLConnection) u.openConnection();
- uc.setConnectTimeout(60000);
- uc.setReadTimeout(60000);
- uc.setInstanceFollowRedirects(false);
- uc.setRequestMethod(method);
- uc.setRequestProperty("Content-Length", Long.toString(length));
- uc.setRequestProperty("Authorization", destInfo.getAuth());
- uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
- for (String[] nv : hdrs) {
- uc.addRequestProperty(nv[0], nv[1]);
- }
- if (length > 0) {
- if (expect100) {
- uc.setRequestProperty("Expect", "100-continue");
- }
- uc.setDoOutput(true);
- if (destInfo.isDecompress()) {
- if (isFiletypeGzip(datafile)) {
- sendDecompressedFile(uc);
- } else {
- uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT");
- sendFile(uc);
- }
- } else {
- sendFile(uc);
- }
- }
- int rc = uc.getResponseCode();
- String rmsg = uc.getResponseMessage();
- if (rmsg == null) {
- String h0 = uc.getHeaderField(0);
- if (h0 != null) {
- int i = h0.indexOf(' ');
- int j = h0.indexOf(' ', i + 1);
- if (i != -1 && j != -1) {
- rmsg = h0.substring(j + 1);
- }
- }
- }
+ URL urlObj = new URL(url);
+ HttpURLConnection urlConnection = (HttpURLConnection) urlObj.openConnection();
+ urlConnection.setConnectTimeout(60000);
+ urlConnection.setReadTimeout(60000);
+ urlConnection.setInstanceFollowRedirects(false);
+ urlConnection.setRequestMethod(method);
+ urlConnection.setRequestProperty("Content-Length", Long.toString(length));
+ urlConnection.setRequestProperty("Authorization", destInfo.getAuth());
+ urlConnection.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
+ boolean expect100 = destInfo.isUsing100();
+ int rc = deliverFileToSubscriber(expect100, urlConnection);
+ String rmsg = urlConnection.getResponseMessage();
+ rmsg = getResponseMessage(urlConnection, rmsg);
String xpubid = null;
InputStream is;
if (rc >= 200 && rc <= 299) {
- is = uc.getInputStream();
- xpubid = uc.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
+ is = urlConnection.getInputStream();
+ xpubid = urlConnection.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
} else {
if (rc >= 300 && rc <= 399) {
- rmsg = uc.getHeaderField("Location");
+ rmsg = urlConnection.getHeaderField("Location");
}
- is = uc.getErrorStream();
+ is = urlConnection.getErrorStream();
}
byte[] buf = new byte[4096];
if (is != null) {
@@ -247,23 +227,22 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
}
deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
} catch (Exception e) {
- eelfLogger.error("Exception "+ Arrays.toString(e.getStackTrace()),e);
+ eelfLogger.error("Exception " + Arrays.toString(e.getStackTrace()), e);
deliveryTaskHelper.reportException(this, e);
}
}
/**
- * To send decompressed gzip to the subscribers
+ * To send decompressed gzip to the subscribers.
*
* @param httpURLConnection connection used to make request
- * @throws IOException
*/
private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
byte[] buffer = new byte[8164];
- httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS");
+ httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "SUCCESS");
OutputStream outputStream = getOutputStream(httpURLConnection);
if (outputStream != null) {
- int bytesRead = 0;
+ int bytesRead;
try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
int bufferLength = buffer.length;
while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
@@ -271,8 +250,8 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
}
outputStream.close();
} catch (IOException e) {
- httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE");
- eelfLogger.info("Could not decompress file");
+ httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "FAILURE");
+ eelfLogger.info("Could not decompress file", e);
sendFile(httpURLConnection);
}
@@ -283,44 +262,42 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
* To send any file to the subscriber.
*
* @param httpURLConnection connection used to make request
- * @throws IOException
*/
private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
OutputStream os = getOutputStream(httpURLConnection);
- if (os != null) {
- long sofar = 0;
- try (InputStream is = new FileInputStream(datafile)) {
- byte[] buf = new byte[1024 * 1024];
- while (sofar < length) {
- int i = buf.length;
- if (sofar + i > length) {
- i = (int) (length - sofar);
- }
- i = is.read(buf, 0, i);
- if (i <= 0) {
- throw new IOException("Unexpected problem reading data file " + datafile);
- }
- sofar += i;
- os.write(buf, 0, i);
+ if (os == null) {
+ return;
+ }
+ long sofar = 0;
+ try (InputStream is = new FileInputStream(datafile)) {
+ byte[] buf = new byte[1024 * 1024];
+ while (sofar < length) {
+ int len = buf.length;
+ if (sofar + len > length) {
+ len = (int) (length - sofar);
}
- os.close();
- } catch (IOException ioe) {
- deliveryTaskHelper.reportDeliveryExtra(this, sofar);
- throw ioe;
+ len = is.read(buf, 0, len);
+ if (len <= 0) {
+ throw new IOException("Unexpected problem reading data file " + datafile);
+ }
+ sofar += len;
+ os.write(buf, 0, len);
}
+ os.close();
+ } catch (IOException ioe) {
+ deliveryTaskHelper.reportDeliveryExtra(this, sofar);
+ throw ioe;
}
}
/**
- * Get the outputstream that will be used to send data
+ * Get the outputstream that will be used to send data.
*
* @param httpURLConnection connection used to make request
* @return AN Outpustream that can be used to send your data.
- * @throws IOException
*/
private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
OutputStream outputStream = null;
-
try {
outputStream = httpURLConnection.getOutputStream();
} catch (ProtocolException pe) {
@@ -331,22 +308,74 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
return outputStream;
}
+ private void stripSuffixIfIsDecompress() {
+ if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")) {
+ fileid = fileid.replace(".gz", "");
+ }
+ }
+
+ private int deliverFileToSubscriber(boolean expect100, HttpURLConnection uc) throws IOException {
+ for (String[] nv : hdrs) {
+ uc.addRequestProperty(nv[0], nv[1]);
+ }
+ if (length > 0) {
+ if (expect100) {
+ uc.setRequestProperty("Expect", "100-continue");
+ }
+ uc.setDoOutput(true);
+ if (destInfo.isDecompress()) {
+ if (isFiletypeGzip(datafile)) {
+ sendDecompressedFile(uc);
+ } else {
+ uc.setRequestProperty(DECOMPRESSION_STATUS, "UNSUPPORTED_FORMAT");
+ sendFile(uc);
+ }
+ } else {
+ sendFile(uc);
+ }
+ }
+ return uc.getResponseCode();
+ }
+
+ @Nullable
+ private String getResponseMessage(HttpURLConnection uc, String rmsg) {
+ if (rmsg == null) {
+ String h0 = uc.getHeaderField(0);
+ if (h0 != null) {
+ int indexOfSpace1 = h0.indexOf(' ');
+ int indexOfSpace2 = h0.indexOf(' ', indexOfSpace1 + 1);
+ if (indexOfSpace1 != -1 && indexOfSpace2 != -1) {
+ rmsg = h0.substring(indexOfSpace2 + 1);
+ }
+ }
+ }
+ return rmsg;
+ }
+
/**
- * Remove meta and data files
+ * Remove meta and data files.
*/
void clean() {
- datafile.delete();
- metafile.delete();
+ deleteWithRetry(datafile);
+ deleteWithRetry(metafile);
eelfLogger.info(EelfMsgs.INVOKE, newInvocationId);
eelfLogger.info(EelfMsgs.EXIT);
hdrs = null;
}
- /**
- * Set the resume time for a delivery task.
- */
- void setResumeTime(long resumeTime) {
- this.resumeTime = resumeTime;
+ private void deleteWithRetry(File file) {
+ int maxTries = 3;
+ int tryCount = 1;
+ while (tryCount <= maxTries) {
+ try {
+ Files.deleteIfExists(file.toPath());
+ break;
+ } catch (IOException e) {
+ eelfLogger.error("IOException : Failed to delete file :"
+ + file.getName() + " on attempt " + tryCount, e);
+ }
+ tryCount++;
+ }
}
/**
@@ -357,14 +386,21 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
}
/**
- * Has this delivery task been cleaned?
+ * Set the resume time for a delivery task.
+ */
+ void setResumeTime(long resumeTime) {
+ this.resumeTime = resumeTime;
+ }
+
+ /**
+ * Has this delivery task been cleaned.
*/
boolean isCleaned() {
return (hdrs == null);
}
/**
- * Get length of body
+ * Get length of body.
*/
public long getLength() {
return (length);
@@ -378,58 +414,58 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
}
/**
- * Get the most recent delivery attempt URL
+ * Get the most recent delivery attempt URL.
*/
public String getURL() {
return (url);
}
/**
- * Get the content type
+ * Get the content type.
*/
String getCType() {
return (ctype);
}
/**
- * Get the method
+ * Get the method.
*/
String getMethod() {
return (method);
}
/**
- * Get the file ID
+ * Get the file ID.
*/
String getFileId() {
return (fileid);
}
/**
- * Get the number of delivery attempts
+ * Get the number of delivery attempts.
*/
int getAttempts() {
return (attempts);
}
/**
- * Get the (space delimited list of) subscription ID for this delivery task
+ * Get the (space delimited list of) subscription ID for this delivery task.
*/
String getSubId() {
return (subid);
}
/**
- * Get the feed ID for this delivery task
+ * Get the feed ID for this delivery task.
*/
String getFeedId() {
return (feedid);
}
/**
- * Get the followRedirects for this delivery task
+ * Get the followRedirects for this delivery task.
*/
- public boolean getFollowRedirects() {
- return(followRedirects);
+ boolean getFollowRedirects() {
+ return (followRedirects);
}
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java
index d4ac8bd6..b9068f2f 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java
@@ -26,32 +26,33 @@ package org.onap.dmaap.datarouter.node;
/**
* Interface to allow independent testing of the DeliveryTask code.
- * <p>
- * This interface represents all the configuraiton information and
- * feedback mechanisms that a delivery task needs.
+ *
+ * <p>This interface represents all the configuraiton information and feedback mechanisms that a delivery task needs.
*/
public interface DeliveryTaskHelper {
+
/**
- * Report that a delivery attempt failed due to an exception (like can't connect to remote host)
+ * Report that a delivery attempt failed due to an exception (like can't connect to remote host).
*
- * @param task The task that failed
+ * @param task The task that failed
* @param exception The exception that occurred
*/
void reportException(DeliveryTask task, Exception exception);
/**
- * Report that a delivery attempt completed (successfully or unsuccessfully)
+ * Report that a delivery attempt completed (successfully or unsuccessfully).
*
- * @param task The task that failed
- * @param status The HTTP status
- * @param xpubid The publish ID from the far end (if any)
+ * @param task The task that failed
+ * @param status The HTTP status
+ * @param xpubid The publish ID from the far end (if any)
* @param location The redirection location for a 3XX response
*/
void reportStatus(DeliveryTask task, int status, String xpubid, String location);
/**
- * Report that a delivery attempt either failed while sending data or that an error was returned instead of a 100 Continue.
+ * Report that a delivery attempt either failed while sending data or that an error was returned instead of a 100
+ * Continue.
*
* @param task The task that failed
* @param sent The number of bytes sent or -1 if an error was returned instead of 100 Continue.
@@ -59,14 +60,14 @@ public interface DeliveryTaskHelper {
void reportDeliveryExtra(DeliveryTask task, long sent);
/**
- * Get the destination information for the delivery queue
+ * Get the destination information for the delivery queue.
*
* @return The destination information
*/
DestInfo getDestinationInfo();
/**
- * Given a file ID, get the URL to deliver to
+ * Given a file ID, get the URL to deliver to.
*
* @param fileid The file id
* @return The URL to deliver to
@@ -74,7 +75,7 @@ public interface DeliveryTaskHelper {
String getDestURL(String fileid);
/**
- * Get the feed ID for a subscription
+ * Get the feed ID for a subscription.
*
* @param subid The subscription ID
* @return The feed iD
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java
index 8890fe96..f5fa6e98 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java
@@ -25,9 +25,10 @@
package org.onap.dmaap.datarouter.node;
/**
- * Information for a delivery destination that doesn't change from message to message
+ * Information for a delivery destination that doesn't change from message to message.
*/
public class DestInfo {
+
private String name;
private String spool;
private String subid;
@@ -40,114 +41,33 @@ public class DestInfo {
private boolean privilegedSubscriber;
private boolean decompress;
private boolean followRedirects;
- private String aafInstance;
-
- public static class DestInfoBuilder {
- private String name;
- private String spool;
- private String subid;
- private String logdata;
- private String url;
- private String authuser;
- private String authentication;
- private boolean metaonly;
- private boolean use100;
- private boolean privilegedSubscriber;
- private boolean followRedirects;
- private boolean decompress;
- private NodeConfig.ProvSubscription subscription;
-
- public DestInfoBuilder setName(String name) {
- this.name = name;
- return this;
- }
-
- public DestInfoBuilder setSpool(String spool) {
- this.spool = spool;
- return this;
- }
-
- public DestInfoBuilder setSubid(String subid) {
- this.subid = subid;
- return this;
- }
-
- public DestInfoBuilder setLogdata(String logdata) {
- this.logdata = logdata;
- return this;
- }
-
- public DestInfoBuilder setUrl(String url) {
- this.url = url;
- return this;
- }
-
- public DestInfoBuilder setAuthuser(String authuser) {
- this.authuser = authuser;
- return this;
- }
-
- public DestInfoBuilder setAuthentication(String authentication) {
- this.authentication = authentication;
- return this;
- }
-
- public DestInfoBuilder setMetaonly(boolean metaonly) {
- this.metaonly = metaonly;
- return this;
- }
-
- public DestInfoBuilder setUse100(boolean use100) {
- this.use100 = use100;
- return this;
- }
-
- public DestInfoBuilder setPrivilegedSubscriber(boolean privilegedSubscriber) {
- this.privilegedSubscriber = privilegedSubscriber;
- return this;
- }
-
- public DestInfoBuilder setFollowRedirects(boolean followRedirects) {
- this.followRedirects = followRedirects;
- return this;
- }
-
- public DestInfoBuilder setDecompress(boolean decompress) {
- this.decompress = decompress;
- return this;
- }
-
- public DestInfoBuilder setSubscription(NodeConfig.ProvSubscription subscription) {
- this.subscription = subscription;
- return this;
- }
-
- public DestInfo createDestInfo() {
- return new DestInfo(this);
- }
- }
+ /**
+ * Create a destination information object.
+ *
+ * @param destInfoBuilder DestInfo Object Builder
+ */
public DestInfo(DestInfoBuilder destInfoBuilder) {
- this.name = destInfoBuilder.name;
- this.spool = destInfoBuilder.spool;
- this.subid = destInfoBuilder.subid;
- this.logdata = destInfoBuilder.logdata;
- this.url = destInfoBuilder.url;
- this.authuser = destInfoBuilder.authuser;
- this.authentication = destInfoBuilder.authentication;
- this.metaonly = destInfoBuilder.metaonly;
- this.use100 = destInfoBuilder.use100;
- this.privilegedSubscriber = destInfoBuilder.privilegedSubscriber;
- this.followRedirects = destInfoBuilder.followRedirects;
- this.decompress = destInfoBuilder.decompress;
+ this.name = destInfoBuilder.getName();
+ this.spool = destInfoBuilder.getSpool();
+ this.subid = destInfoBuilder.getSubid();
+ this.logdata = destInfoBuilder.getLogdata();
+ this.url = destInfoBuilder.getUrl();
+ this.authuser = destInfoBuilder.getAuthuser();
+ this.authentication = destInfoBuilder.getAuthentication();
+ this.metaonly = destInfoBuilder.isMetaonly();
+ this.use100 = destInfoBuilder.isUse100();
+ this.privilegedSubscriber = destInfoBuilder.isPrivilegedSubscriber();
+ this.followRedirects = destInfoBuilder.isFollowRedirects();
+ this.decompress = destInfoBuilder.isDecompress();
}
/**
* Create a destination information object.
*
- * @param name n:fqdn or s:subid
- * @param spool The directory where files are spooled.
- * @param subscription The subscription.
+ * @param name n:fqdn or s:subid
+ * @param spool The directory where files are spooled.
+ * @param subscription The subscription.
*/
public DestInfo(String name, String spool, NodeConfig.ProvSubscription subscription) {
this.name = name;
@@ -164,8 +84,8 @@ public class DestInfo {
this.decompress = subscription.isDecompress();
}
- public boolean equals(Object o) {
- return ((o instanceof DestInfo) && ((DestInfo) o).spool.equals(spool));
+ public boolean equals(Object object) {
+ return ((object instanceof DestInfo) && ((DestInfo) object).spool.equals(spool));
}
public int hashCode() {
@@ -173,7 +93,7 @@ public class DestInfo {
}
/**
- * Get the name of this destination
+ * Get the name of this destination.
*/
public String getName() {
return (name);
@@ -217,7 +137,7 @@ public class DestInfo {
}
/**
- * Get the user for authentication
+ * Get the user for authentication.
*
* @return The name of the user for logging
*/
@@ -226,7 +146,7 @@ public class DestInfo {
}
/**
- * Get the authentication header
+ * Get the authentication header.
*
* @return The string to use to authenticate to the recipient.
*/
@@ -235,7 +155,7 @@ public class DestInfo {
}
/**
- * Is this a metadata only delivery?
+ * Is this a metadata only delivery.
*
* @return True if this is a metadata only delivery
*/
@@ -244,7 +164,7 @@ public class DestInfo {
}
/**
- * Should I send expect 100-continue header?
+ * Should I send expect 100-continue header.
*
* @return True if I should.
*/
@@ -253,23 +173,23 @@ public class DestInfo {
}
/**
- * Should we wait to receive a file processed acknowledgement before deleting file
+ * Should we wait to receive a file processed acknowledgement before deleting file.
*/
public boolean isPrivilegedSubscriber() {
return (privilegedSubscriber);
}
/**
- * Should I follow redirects?
- *
- * @return True if I should.
- */
+ * Should I follow redirects.
+ *
+ * @return True if I should.
+ */
public boolean isFollowRedirects() {
return (followRedirects);
}
/**
- * Should i decompress the file before sending it on
+ * Should i decompress the file before sending it on.
*
* @return True if I should.
*/
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfoBuilder.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfoBuilder.java
new file mode 100644
index 00000000..00c5cd8b
--- /dev/null
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfoBuilder.java
@@ -0,0 +1,149 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dmaap.datarouter.node;
+
+public class DestInfoBuilder {
+
+ private String destInfoName;
+ private String destInfoSpool;
+ private String destInfoSubId;
+ private String destInfoLogData;
+ private String destInfoUrl;
+ private String destInfoAuthUser;
+ private String destInfoAuthentication;
+ private boolean destInfoMetaOnly;
+ private boolean destInfoUse100;
+ private boolean destInfoPrivilegedSubscriber;
+ private boolean destInfoFollowRedirects;
+ private boolean destInfoDecompress;
+
+ public String getName() {
+ return destInfoName;
+ }
+
+ public DestInfoBuilder setName(String name) {
+ this.destInfoName = name;
+ return this;
+ }
+
+ public String getSpool() {
+ return destInfoSpool;
+ }
+
+ public DestInfoBuilder setSpool(String spool) {
+ this.destInfoSpool = spool;
+ return this;
+ }
+
+ public String getSubid() {
+ return destInfoSubId;
+ }
+
+ public DestInfoBuilder setSubid(String subid) {
+ this.destInfoSubId = subid;
+ return this;
+ }
+
+ String getLogdata() {
+ return destInfoLogData;
+ }
+
+ DestInfoBuilder setLogdata(String logdata) {
+ this.destInfoLogData = logdata;
+ return this;
+ }
+
+ public String getUrl() {
+ return destInfoUrl;
+ }
+
+ public DestInfoBuilder setUrl(String url) {
+ this.destInfoUrl = url;
+ return this;
+ }
+
+ String getAuthuser() {
+ return destInfoAuthUser;
+ }
+
+ DestInfoBuilder setAuthuser(String authuser) {
+ this.destInfoAuthUser = authuser;
+ return this;
+ }
+
+ String getAuthentication() {
+ return destInfoAuthentication;
+ }
+
+ DestInfoBuilder setAuthentication(String authentication) {
+ this.destInfoAuthentication = authentication;
+ return this;
+ }
+
+ boolean isMetaonly() {
+ return destInfoMetaOnly;
+ }
+
+ DestInfoBuilder setMetaonly(boolean metaonly) {
+ this.destInfoMetaOnly = metaonly;
+ return this;
+ }
+
+ boolean isUse100() {
+ return destInfoUse100;
+ }
+
+ DestInfoBuilder setUse100(boolean use100) {
+ this.destInfoUse100 = use100;
+ return this;
+ }
+
+ boolean isPrivilegedSubscriber() {
+ return destInfoPrivilegedSubscriber;
+ }
+
+ DestInfoBuilder setPrivilegedSubscriber(boolean privilegedSubscriber) {
+ this.destInfoPrivilegedSubscriber = privilegedSubscriber;
+ return this;
+ }
+
+ boolean isFollowRedirects() {
+ return destInfoFollowRedirects;
+ }
+
+ DestInfoBuilder setFollowRedirects(boolean followRedirects) {
+ this.destInfoFollowRedirects = followRedirects;
+ return this;
+ }
+
+ boolean isDecompress() {
+ return destInfoDecompress;
+ }
+
+ DestInfoBuilder setDecompress(boolean decompress) {
+ this.destInfoDecompress = decompress;
+ return this;
+ }
+
+ DestInfo createDestInfo() {
+ return new DestInfo(this);
+ }
+} \ No newline at end of file
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java
index f7cedd22..49852680 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java
@@ -26,39 +26,40 @@ package org.onap.dmaap.datarouter.node;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-
import java.io.IOException;
-import java.util.*;
-import java.net.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
/**
- * Determine if an IP address is from a machine
+ * Determine if an IP address is from a machine.
*/
public class IsFrom {
+
+ private static EELFLogger logger = EELFManager.getInstance().getLogger(IsFrom.class);
private long nextcheck;
private String[] ips;
private String fqdn;
- private static EELFLogger logger = EELFManager.getInstance().getLogger(IsFrom.class);
/**
- * Configure the JVM DNS cache to have a 10 second TTL. This needs to be called very very early or it won't have any effect.
+ * Create an IsFrom for the specified fully qualified domain name.
*/
- public static void setDNSCache() {
- java.security.Security.setProperty("networkaddress.cache.ttl", "10");
+ public IsFrom(String fqdn) {
+ this.fqdn = fqdn;
}
/**
- * Create an IsFrom for the specified fully qualified domain name.
+ * Configure the JVM DNS cache to have a 10 second TTL. This needs to be called very very early or it won't have
+ * any effect.
*/
- public IsFrom(String fqdn) {
- this.fqdn = fqdn;
+ public static void setDNSCache() {
+ java.security.Security.setProperty("networkaddress.cache.ttl", "10");
}
/**
- * Check if an IP address matches. If it has been more than
- * 10 seconds since DNS was last checked for changes to the
- * IP address(es) of this FQDN, check again. Then check
- * if the specified IP address belongs to the FQDN.
+ * Check if an IP address matches. If it has been more than 10 seconds since DNS was last checked for changes to
+ * the IP address(es) of this FQDN, check again. Then check if the specified IP address belongs to the FQDN.
*/
public synchronized boolean isFrom(String ip) {
long now = System.currentTimeMillis();
@@ -71,7 +72,7 @@ public class IsFrom {
hostAddrArray.add(addr.getHostAddress());
}
} catch (UnknownHostException e) {
- logger.error("IsFrom: UnknownHostEx: " + e.toString(), e.getMessage());
+ logger.error("IsFrom: UnknownHostEx: " + e.toString(), e);
}
ips = hostAddrArray.toArray(new String[0]);
logger.info("IsFrom: DNS ENTRIES FOR FQDN " + fqdn + " : " + Arrays.toString(ips));
@@ -90,15 +91,15 @@ public class IsFrom {
return true;
}
} catch (UnknownHostException e) {
- logger.error("IsFrom: UnknownHostEx: " + e.toString(), e.getMessage());
+ logger.error("IsFrom: UnknownHostEx: " + e.toString(), e);
} catch (IOException e) {
- logger.error("IsFrom: Failed to parse IP : " + ip + " : " + e.toString(), e.getMessage());
+ logger.error("IsFrom: Failed to parse IP : " + ip + " : " + e.toString(), e);
}
return false;
}
/**
- * Return the fully qualified domain name
+ * Return the fully qualified domain name.
*/
public String toString() {
return (fqdn);
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java
index 78a195b1..cf3b29a5 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java
@@ -20,8 +20,11 @@
* * ECOMP is a trademark and service mark of AT&T Intellectual Property.
* *
******************************************************************************/
+
package org.onap.dmaap.datarouter.node;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
@@ -33,17 +36,20 @@ import java.util.Arrays;
import java.util.TimerTask;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.jetbrains.annotations.NotNull;
/**
* Cleanup of old log files.
- * <p>
- * Periodically scan the log directory for log files that are older than the log file retention interval, and delete
+ *
+ * <p>Periodically scan the log directory for log files that are older than the log file retention interval, and delete
* them. In a future release, This class will also be responsible for uploading events logs to the log server to
* support the log query APIs.
*/
public class LogManager extends TimerTask {
+ private static final String EXCEPTION = "Exception";
+ private EELFLogger logger = EELFManager.getInstance().getLogger(LogManager.class);
private NodeConfigManager config;
private Matcher isnodelog;
private Matcher iseventlog;
@@ -51,7 +57,58 @@ public class LogManager extends TimerTask {
private String uploaddir;
private String logdir;
- private class Uploader extends Thread implements DeliveryQueueHelper {
+ /**
+ * Construct a log manager
+ *
+ * <p>The log manager will check for expired log files every 5 minutes at 20 seconds after the 5 minute boundary.
+ * (Actually, the interval is the event log rollover interval, which defaults to 5 minutes).
+ */
+ public LogManager(NodeConfigManager config) {
+ this.config = config;
+ try {
+ isnodelog = Pattern.compile("node\\.log\\.\\d{8}").matcher("");
+ iseventlog = Pattern.compile("events-\\d{12}\\.log").matcher("");
+ } catch (Exception e) {
+ logger.error(EXCEPTION, e);
+ }
+ logdir = config.getLogDir();
+ uploaddir = logdir + "/.spool";
+ (new File(uploaddir)).mkdirs();
+ long now = System.currentTimeMillis();
+ long intvl = StatusLog.parseInterval(config.getEventLogInterval(), 30000);
+ long when = now - now % intvl + intvl + 20000L;
+ config.getTimer().scheduleAtFixedRate(this, when - now, intvl);
+ worker = new Uploader();
+ }
+
+ /**
+ * Trigger check for expired log files and log files to upload.
+ */
+ public void run() {
+ worker.poke();
+ }
+
+ public Uploader getWorker() {
+ return worker;
+ }
+
+ class Uploader extends Thread implements DeliveryQueueHelper {
+
+ private static final String META = "/.meta";
+ private EELFLogger logger = EELFManager.getInstance().getLogger(Uploader.class);
+ private DeliveryQueue dq;
+
+ Uploader() {
+ dq = new DeliveryQueue(this,
+ new DestInfoBuilder().setName("LogUpload").setSpool(uploaddir).setSubid(null).setLogdata(null)
+ .setUrl(null).setAuthuser(config.getMyName()).setAuthentication(config.getMyAuth())
+ .setMetaonly(false).setUse100(false).setPrivilegedSubscriber(false)
+ .setFollowRedirects(false)
+ .setDecompress(false).createDestInfo());
+ setDaemon(true);
+ setName("Log Uploader");
+ start();
+ }
public long getInitFailureTimer() {
return (10000L);
@@ -86,6 +143,7 @@ public class LogManager extends TimerTask {
}
public void handleUnreachable(DestInfo destinationInfo) {
+ throw new UnsupportedOperationException();
}
public boolean handleRedirection(DestInfo destinationInfo, String location, String fileid) {
@@ -100,23 +158,11 @@ public class LogManager extends TimerTask {
return (null);
}
- private DeliveryQueue dq;
-
- public Uploader() {
- dq = new DeliveryQueue(this,
- new DestInfo.DestInfoBuilder().setName("LogUpload").setSpool(uploaddir).setSubid(null).setLogdata(null)
- .setUrl(null).setAuthuser(config.getMyName()).setAuthentication(config.getMyAuth())
- .setMetaonly(false).setUse100(false).setPrivilegedSubscriber(false).setFollowRedirects(false)
- .setDecompress(false).createDestInfo());
- setDaemon(true);
- setName("Log Uploader");
- start();
- }
-
private synchronized void snooze() {
try {
wait(10000);
} catch (Exception e) {
+ logger.error(EXCEPTION, e);
}
}
@@ -124,6 +170,7 @@ public class LogManager extends TimerTask {
notify();
}
+ @Override
public void run() {
while (true) {
scan();
@@ -141,69 +188,48 @@ public class LogManager extends TimerTask {
String curlog = StatusLog.getCurLogFile();
curlog = curlog.substring(curlog.lastIndexOf('/') + 1);
try {
- Writer w = new FileWriter(uploaddir + "/.meta");
- w.write("POST\tlogdata\nContent-Type\ttext/plain\n");
- w.close();
+ Writer writer = new FileWriter(uploaddir + META);
+ writer.write("POST\tlogdata\nContent-Type\ttext/plain\n");
+ writer.close();
BufferedReader br = new BufferedReader(new FileReader(uploaddir + "/.lastqueued"));
lastqueued = br.readLine();
br.close();
} catch (Exception e) {
+ logger.error(EXCEPTION, e);
}
for (String fn : fns) {
if (!isnodelog.reset(fn).matches()) {
if (!iseventlog.reset(fn).matches()) {
continue;
}
- if (lastqueued.compareTo(fn) < 0 && curlog.compareTo(fn) > 0) {
- lastqueued = fn;
- try {
- String pid = config.getPublishId();
- Files.createLink(Paths.get(uploaddir + "/" + pid), Paths.get(logdir + "/" + fn));
- Files.createLink(Paths.get(uploaddir + "/" + pid + ".M"), Paths.get(uploaddir + "/.meta"));
- } catch (Exception e) {
- }
- }
+ lastqueued = setLastQueued(lastqueued, curlog, fn);
}
- File f = new File(dir, fn);
- if (f.lastModified() < threshold) {
- f.delete();
+ File file = new File(dir, fn);
+ if (file.lastModified() < threshold) {
+ file.delete();
}
}
try (Writer w = new FileWriter(uploaddir + "/.lastqueued")) {
- (new File(uploaddir + "/.meta")).delete();
+ (new File(uploaddir + META)).delete();
w.write(lastqueued + "\n");
} catch (Exception e) {
+ logger.error(EXCEPTION, e);
}
}
- }
- /**
- * Construct a log manager
- * <p>
- * The log manager will check for expired log files every 5 minutes at 20 seconds after the 5 minute boundary.
- * (Actually, the interval is the event log rollover interval, which defaults to 5 minutes).
- */
- public LogManager(NodeConfigManager config) {
- this.config = config;
- try {
- isnodelog = Pattern.compile("node\\.log\\.\\d{8}").matcher("");
- iseventlog = Pattern.compile("events-\\d{12}\\.log").matcher("");
- } catch (Exception e) {
+ @NotNull
+ private String setLastQueued(String lastqueued, String curlog, String fn) {
+ if (lastqueued.compareTo(fn) < 0 && curlog.compareTo(fn) > 0) {
+ lastqueued = fn;
+ try {
+ String pid = config.getPublishId();
+ Files.createLink(Paths.get(uploaddir + "/" + pid), Paths.get(logdir + "/" + fn));
+ Files.createLink(Paths.get(uploaddir + "/" + pid + ".M"), Paths.get(uploaddir + META));
+ } catch (Exception e) {
+ logger.error(EXCEPTION, e);
+ }
+ }
+ return lastqueued;
}
- logdir = config.getLogDir();
- uploaddir = logdir + "/.spool";
- (new File(uploaddir)).mkdirs();
- long now = System.currentTimeMillis();
- long intvl = StatusLog.parseInterval(config.getEventLogInterval(), 30000);
- long when = now - now % intvl + intvl + 20000L;
- config.getTimer().scheduleAtFixedRate(this, when - now, intvl);
- worker = new Uploader();
- }
-
- /**
- * Trigger check for expired log files and log files to upload
- */
- public void run() {
- worker.poke();
}
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java
index d455f2d9..127668ff 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java
@@ -26,24 +26,505 @@ package org.onap.dmaap.datarouter.node;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-
import java.io.File;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
-import java.util.Hashtable;
-import java.util.Vector;
+import org.jetbrains.annotations.NotNull;
/**
* Processed configuration for this node.
- * <p>
- * The NodeConfig represents a processed configuration from the Data Router provisioning server. Each time
+ *
+ * <p>The NodeConfig represents a processed configuration from the Data Router provisioning server. Each time
* configuration data is received from the provisioning server, a new NodeConfig is created and the previous one
* discarded.
*/
public class NodeConfig {
+
+ private static final String PUBLISHER_NOT_PERMITTED = "Publisher not permitted for this feed";
private static EELFLogger logger = EELFManager.getInstance().getLogger(NodeConfig.class);
+ private HashMap<String, String> params = new HashMap<>();
+ private HashMap<String, Feed> feeds = new HashMap<>();
+ private HashMap<String, DestInfo> nodeinfo = new HashMap<>();
+ private HashMap<String, DestInfo> subinfo = new HashMap<>();
+ private HashMap<String, IsFrom> nodes = new HashMap<>();
+ private HashMap<String, ProvSubscription> provSubscriptions = new HashMap<>();
+ private String myname;
+ private String myauth;
+ private DestInfo[] alldests;
+ private int rrcntr;
+
+ /**
+ * Process the raw provisioning data to configure this node.
+ *
+ * @param pd The parsed provisioning data
+ * @param myname My name as seen by external systems
+ * @param spooldir The directory where temporary files live
+ * @param port The port number for URLs
+ * @param nodeauthkey The keying string used to generate node authentication credentials
+ */
+ public NodeConfig(ProvData pd, String myname, String spooldir, int port, String nodeauthkey) {
+ this.myname = myname;
+ for (ProvParam p : pd.getParams()) {
+ params.put(p.getName(), p.getValue());
+ }
+ ArrayList<DestInfo> destInfos = addDestInfoToNodeConfig(pd, myname, spooldir, port, nodeauthkey);
+ PathFinder pf = new PathFinder(myname, nodeinfo.keySet().toArray(new String[0]), pd.getHops());
+ HashMap<String, ArrayList<Redirection>> rdtab = addSubRedirInfoToNodeConfig(pd);
+ HashMap<String, HashMap<String, String>> pfutab = addFeedUsersToNodeConfig(pd);
+ HashMap<String, String> egrtab = addEgressRoutesToNodeConfig(pd, myname);
+ HashMap<String, ArrayList<SubnetMatcher>> pfstab = addFeedSubnetToNodeConfig(pd);
+ HashSet<String> allfeeds = addFeedsToNodeConfig(pd);
+ HashMap<String, StringBuilder> feedTargets = addSubsToNodeConfig(pd, spooldir, destInfos, pf, egrtab, allfeeds);
+ alldests = destInfos.toArray(new DestInfo[0]);
+ addFeedTargetsToNodeConfig(pd, rdtab, pfutab, pfstab, feedTargets);
+ }
+
+ @NotNull
+ private ArrayList<DestInfo> addDestInfoToNodeConfig(ProvData pd, String myname, String spooldir, int port,
+ String nodeauthkey) {
+ ArrayList<DestInfo> destInfos = new ArrayList<>();
+ myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey);
+ for (ProvNode pn : pd.getNodes()) {
+ String commonName = pn.getCName();
+ if (nodeinfo.get(commonName) != null) {
+ continue;
+ }
+ DestInfo di = new DestInfoBuilder().setName("n:" + commonName).setSpool(spooldir + "/n/" + commonName)
+ .setSubid(null)
+ .setLogdata("n2n-" + commonName).setUrl("https://" + commonName + ":" + port + "/internal/publish")
+ .setAuthuser(commonName).setAuthentication(myauth).setMetaonly(false).setUse100(true)
+ .setPrivilegedSubscriber(false).setFollowRedirects(false).setDecompress(false).createDestInfo();
+ (new File(di.getSpool())).mkdirs();
+ String auth = NodeUtils.getNodeAuthHdr(commonName, nodeauthkey);
+ destInfos.add(di);
+ nodeinfo.put(commonName, di);
+ nodes.put(auth, new IsFrom(commonName));
+ }
+ return destInfos;
+ }
+
+ @NotNull
+ private HashMap<String, ArrayList<Redirection>> addSubRedirInfoToNodeConfig(ProvData pd) {
+ HashMap<String, ArrayList<Redirection>> rdtab = new HashMap<>();
+ for (ProvForceIngress pfi : pd.getForceIngress()) {
+ ArrayList<Redirection> redirections = rdtab.get(pfi.getFeedId());
+ if (redirections == null) {
+ redirections = new ArrayList<>();
+ rdtab.put(pfi.getFeedId(), redirections);
+ }
+ Redirection redirection = new Redirection();
+ if (pfi.getSubnet() != null) {
+ redirection.snm = new SubnetMatcher(pfi.getSubnet());
+ }
+ redirection.user = pfi.getUser();
+ redirection.nodes = pfi.getNodes();
+ redirections.add(redirection);
+ }
+ return rdtab;
+ }
+
+ @NotNull
+ private HashMap<String, HashMap<String, String>> addFeedUsersToNodeConfig(ProvData pd) {
+ HashMap<String, HashMap<String, String>> pfutab = new HashMap<>();
+ for (ProvFeedUser pfu : pd.getFeedUsers()) {
+ HashMap<String, String> userInfo = pfutab.get(pfu.getFeedId());
+ if (userInfo == null) {
+ userInfo = new HashMap<>();
+ pfutab.put(pfu.getFeedId(), userInfo);
+ }
+ userInfo.put(pfu.getCredentials(), pfu.getUser());
+ }
+ return pfutab;
+ }
+
+ @NotNull
+ private HashMap<String, String> addEgressRoutesToNodeConfig(ProvData pd, String myname) {
+ HashMap<String, String> egrtab = new HashMap<>();
+ for (ProvForceEgress pfe : pd.getForceEgress()) {
+ if (pfe.getNode().equals(myname) || nodeinfo.get(pfe.getNode()) == null) {
+ continue;
+ }
+ egrtab.put(pfe.getSubId(), pfe.getNode());
+ }
+ return egrtab;
+ }
+
+ @NotNull
+ private HashMap<String, ArrayList<SubnetMatcher>> addFeedSubnetToNodeConfig(ProvData pd) {
+ HashMap<String, ArrayList<SubnetMatcher>> pfstab = new HashMap<>();
+ for (ProvFeedSubnet pfs : pd.getFeedSubnets()) {
+ ArrayList<SubnetMatcher> subnetMatchers = pfstab.get(pfs.getFeedId());
+ if (subnetMatchers == null) {
+ subnetMatchers = new ArrayList<>();
+ pfstab.put(pfs.getFeedId(), subnetMatchers);
+ }
+ subnetMatchers.add(new SubnetMatcher(pfs.getCidr()));
+ }
+ return pfstab;
+ }
+
+ @NotNull
+ private HashSet<String> addFeedsToNodeConfig(ProvData pd) {
+ HashSet<String> allfeeds = new HashSet<>();
+ for (ProvFeed pfx : pd.getFeeds()) {
+ if (pfx.getStatus() == null) {
+ allfeeds.add(pfx.getId());
+ }
+ }
+ return allfeeds;
+ }
+
+ @NotNull
+ private HashMap<String, StringBuilder> addSubsToNodeConfig(ProvData pd, String spooldir,
+ ArrayList<DestInfo> destInfos, PathFinder pf, HashMap<String, String> egrtab, HashSet<String> allfeeds) {
+ HashMap<String, StringBuilder> feedTargets = new HashMap<>();
+ for (ProvSubscription provSubscription : pd.getSubscriptions()) {
+ String subId = provSubscription.getSubId();
+ String feedId = provSubscription.getFeedId();
+ if (isFeedOrSubKnown(allfeeds, subId, feedId)) {
+ continue;
+ }
+ int sididx = 999;
+ try {
+ sididx = Integer.parseInt(subId);
+ sididx -= sididx % 100;
+ } catch (Exception e) {
+ logger.error("NODE0517 Exception NodeConfig: " + e);
+ }
+ String subscriptionDirectory = sididx + "/" + subId;
+ DestInfo destinationInfo = new DestInfo("s:" + subId,
+ spooldir + "/s/" + subscriptionDirectory, provSubscription);
+ (new File(destinationInfo.getSpool())).mkdirs();
+ destInfos.add(destinationInfo);
+ provSubscriptions.put(subId, provSubscription);
+ subinfo.put(subId, destinationInfo);
+ String egr = egrtab.get(subId);
+ if (egr != null) {
+ subId = pf.getPath(egr) + subId;
+ }
+ StringBuilder sb = feedTargets.get(feedId);
+ if (sb == null) {
+ sb = new StringBuilder();
+ feedTargets.put(feedId, sb);
+ }
+ sb.append(' ').append(subId);
+ }
+ return feedTargets;
+ }
+
+ private void addFeedTargetsToNodeConfig(ProvData pd, HashMap<String, ArrayList<Redirection>> rdtab,
+ HashMap<String, HashMap<String, String>> pfutab, HashMap<String, ArrayList<SubnetMatcher>> pfstab,
+ HashMap<String, StringBuilder> feedTargets) {
+ for (ProvFeed pfx : pd.getFeeds()) {
+ String fid = pfx.getId();
+ Feed feed = feeds.get(fid);
+ if (feed != null) {
+ continue;
+ }
+ feed = new Feed();
+ feeds.put(fid, feed);
+ feed.createdDate = pfx.getCreatedDate();
+ feed.loginfo = pfx.getLogData();
+ feed.status = pfx.getStatus();
+ /*
+ * AAF changes: TDP EPIC US# 307413
+ * Passing aafInstance from ProvFeed to identify legacy/AAF feeds
+ */
+ feed.aafInstance = pfx.getAafInstance();
+ ArrayList<SubnetMatcher> v1 = pfstab.get(fid);
+ if (v1 == null) {
+ feed.subnets = new SubnetMatcher[0];
+ } else {
+ feed.subnets = v1.toArray(new SubnetMatcher[0]);
+ }
+ HashMap<String, String> h1 = pfutab.get(fid);
+ if (h1 == null) {
+ h1 = new HashMap();
+ }
+ feed.authusers = h1;
+ ArrayList<Redirection> v2 = rdtab.get(fid);
+ if (v2 == null) {
+ feed.redirections = new Redirection[0];
+ } else {
+ feed.redirections = v2.toArray(new Redirection[0]);
+ }
+ StringBuilder sb = feedTargets.get(fid);
+ if (sb == null) {
+ feed.targets = new Target[0];
+ } else {
+ feed.targets = parseRouting(sb.toString());
+ }
+ }
+ }
+
+ /**
+ * Parse a target string into an array of targets.
+ *
+ * @param routing Target string
+ * @return Array of targets.
+ */
+ public Target[] parseRouting(String routing) {
+ routing = routing.trim();
+ if ("".equals(routing)) {
+ return (new Target[0]);
+ }
+ String[] routingTable = routing.split("\\s+");
+ HashMap<String, Target> tmap = new HashMap<>();
+ HashSet<String> subset = new HashSet<>();
+ ArrayList<Target> targets = new ArrayList<>();
+ for (int i = 0; i < routingTable.length; i++) {
+ String target = routingTable[i];
+ int index = target.indexOf('/');
+ if (index == -1) {
+ addTarget(subset, targets, target);
+ } else {
+ addTargetWithRouting(tmap, targets, target, index);
+ }
+ }
+ return (targets.toArray(new Target[0]));
+ }
+
+ /**
+ * Check whether this is a valid node-to-node transfer.
+ *
+ * @param credentials Credentials offered by the supposed node
+ * @param ip IP address the request came from
+ */
+ public boolean isAnotherNode(String credentials, String ip) {
+ IsFrom node = nodes.get(credentials);
+ return (node != null && node.isFrom(ip));
+ }
+
+ /**
+ * Check whether publication is allowed.
+ *
+ * @param feedid The ID of the feed being requested.
+ * @param credentials The offered credentials
+ * @param ip The requesting IP address
+ */
+ public String isPublishPermitted(String feedid, String credentials, String ip) {
+ Feed feed = feeds.get(feedid);
+ String nf = "Feed does not exist";
+ if (feed != null) {
+ nf = feed.status;
+ }
+ if (nf != null) {
+ return (nf);
+ }
+ String user = feed.authusers.get(credentials);
+ if (user == null) {
+ return (PUBLISHER_NOT_PERMITTED);
+ }
+ if (feed.subnets.length == 0) {
+ return (null);
+ }
+ byte[] addr = NodeUtils.getInetAddress(ip);
+ for (SubnetMatcher snm : feed.subnets) {
+ if (snm.matches(addr)) {
+ return (null);
+ }
+ }
+ return (PUBLISHER_NOT_PERMITTED);
+ }
+
+ /**
+ * Check whether publication is allowed for AAF Feed.
+ *
+ * @param feedid The ID of the feed being requested.
+ * @param ip The requesting IP address
+ */
+ public String isPublishPermitted(String feedid, String ip) {
+ Feed feed = feeds.get(feedid);
+ String nf = "Feed does not exist";
+ if (feed != null) {
+ nf = feed.status;
+ }
+ if (nf != null) {
+ return nf;
+ }
+ if (feed.subnets.length == 0) {
+ return null;
+ }
+ byte[] addr = NodeUtils.getInetAddress(ip);
+ for (SubnetMatcher snm : feed.subnets) {
+ if (snm.matches(addr)) {
+ return null;
+ }
+ }
+ return PUBLISHER_NOT_PERMITTED;
+ }
+
/**
- * Raw configuration entry for a data router node
+ * Check whether delete file is allowed.
+ *
+ * @param subId The ID of the subscription being requested.
+ */
+ public boolean isDeletePermitted(String subId) {
+ ProvSubscription provSubscription = provSubscriptions.get(subId);
+ return provSubscription.isPrivilegedSubscriber();
+ }
+
+ /**
+ * Get authenticated user.
+ */
+ public String getAuthUser(String feedid, String credentials) {
+ return (feeds.get(feedid).authusers.get(credentials));
+ }
+
+ /**
+ * AAF changes: TDP EPIC US# 307413 Check AAF_instance for feed ID.
+ *
+ * @param feedid The ID of the feed specified
+ */
+ public String getAafInstance(String feedid) {
+ Feed feed = feeds.get(feedid);
+ return feed.aafInstance;
+ }
+
+ /**
+ * Check if the request should be redirected to a different ingress node.
+ */
+ public String getIngressNode(String feedid, String user, String ip) {
+ Feed feed = feeds.get(feedid);
+ if (feed.redirections.length == 0) {
+ return (null);
+ }
+ byte[] addr = NodeUtils.getInetAddress(ip);
+ for (Redirection r : feed.redirections) {
+ if ((r.user != null && !user.equals(r.user)) || (r.snm != null && !r.snm.matches(addr))) {
+ continue;
+ }
+ for (String n : r.nodes) {
+ if (myname.equals(n)) {
+ return (null);
+ }
+ }
+ if (r.nodes.length == 0) {
+ return (null);
+ }
+ return (r.nodes[rrcntr++ % r.nodes.length]);
+ }
+ return (null);
+ }
+
+ /**
+ * Get a provisioned configuration parameter.
+ */
+ public String getProvParam(String name) {
+ return (params.get(name));
+ }
+
+ /**
+ * Get all the DestInfos.
+ */
+ public DestInfo[] getAllDests() {
+ return (alldests);
+ }
+
+ /**
+ * Get the targets for a feed.
+ *
+ * @param feedid The feed ID
+ * @return The targets this feed should be delivered to
+ */
+ public Target[] getTargets(String feedid) {
+ if (feedid == null) {
+ return (new Target[0]);
+ }
+ Feed feed = feeds.get(feedid);
+ if (feed == null) {
+ return (new Target[0]);
+ }
+ return (feed.targets);
+ }
+
+ /**
+ * Get the creation date for a feed.
+ *
+ * @param feedid The feed ID
+ * @return the timestamp of creation date of feed id passed
+ */
+ public String getCreatedDate(String feedid) {
+ Feed feed = feeds.get(feedid);
+ return (feed.createdDate);
+ }
+
+ /**
+ * Get the feed ID for a subscription.
+ *
+ * @param subid The subscription ID
+ * @return The feed ID
+ */
+ public String getFeedId(String subid) {
+ DestInfo di = subinfo.get(subid);
+ if (di == null) {
+ return (null);
+ }
+ return (di.getLogData());
+ }
+
+ /**
+ * Get the spool directory for a subscription.
+ *
+ * @param subid The subscription ID
+ * @return The spool directory
+ */
+ public String getSpoolDir(String subid) {
+ DestInfo di = subinfo.get(subid);
+ if (di == null) {
+ return (null);
+ }
+ return (di.getSpool());
+ }
+
+ /**
+ * Get the Authorization value this node uses.
+ *
+ * @return The Authorization header value for this node
+ */
+ public String getMyAuth() {
+ return (myauth);
+ }
+
+ private boolean isFeedOrSubKnown(HashSet<String> allfeeds, String subId, String feedId) {
+ return !allfeeds.contains(feedId) || subinfo.get(subId) != null;
+ }
+
+ private void addTargetWithRouting(HashMap<String, Target> tmap, ArrayList<Target> targets, String target,
+ int index) {
+ String node = target.substring(0, index);
+ String rtg = target.substring(index + 1);
+ DestInfo di = nodeinfo.get(node);
+ if (di == null) {
+ targets.add(new Target(null, target));
+ } else {
+ Target tt = tmap.get(node);
+ if (tt == null) {
+ tt = new Target(di, rtg);
+ tmap.put(node, tt);
+ targets.add(tt);
+ } else {
+ tt.addRouting(rtg);
+ }
+ }
+ }
+
+ private void addTarget(HashSet<String> subset, ArrayList<Target> targets, String target) {
+ DestInfo destInfo = subinfo.get(target);
+ if (destInfo == null) {
+ targets.add(new Target(null, target));
+ } else {
+ if (!subset.contains(target)) {
+ subset.add(target);
+ targets.add(new Target(destInfo, null));
+ }
+ }
+ }
+
+ /**
+ * Raw configuration entry for a data router node.
*/
public static class ProvNode {
@@ -59,7 +540,7 @@ public class NodeConfig {
}
/**
- * Get the cname of the node
+ * Get the cname of the node.
*/
public String getCName() {
return (cname);
@@ -67,7 +548,7 @@ public class NodeConfig {
}
/**
- * Raw configuration entry for a provisioning parameter
+ * Raw configuration entry for a provisioning parameter.
*/
public static class ProvParam {
@@ -121,7 +602,7 @@ public class NodeConfig {
* @param id The feed ID of the entry.
* @param logdata String for log entries about the entry.
* @param status The reason why this feed cannot be used (Feed has been deleted, Feed has been suspended) or
- * null if it is valid.
+ * null if it is valid.
*/
public ProvFeed(String id, String logdata, String status, String createdDate, String aafInstance) {
this.id = id;
@@ -134,9 +615,8 @@ public class NodeConfig {
/**
* Get the created date of the data feed.
*/
- public String getCreatedDate()
- {
- return(createdDate);
+ public String getCreatedDate() {
+ return (createdDate);
}
/**
@@ -178,7 +658,7 @@ public class NodeConfig {
private String credentials;
/**
- * Construct a feed user configuration entry
+ * Construct a feed user configuration entry.
*
* @param feedid The feed id.
* @param user The user that will publish to the feed.
@@ -213,7 +693,7 @@ public class NodeConfig {
}
/**
- * Raw configuration entry for a feed subnet
+ * Raw configuration entry for a feed subnet.
*/
public static class ProvFeedSubnet {
@@ -221,7 +701,7 @@ public class NodeConfig {
private String cidr;
/**
- * Construct a feed subnet configuration entry
+ * Construct a feed subnet configuration entry.
*
* @param feedid The feed ID
* @param cidr The CIDR allowed to publish to the feed.
@@ -247,7 +727,7 @@ public class NodeConfig {
}
/**
- * Raw configuration entry for a subscription
+ * Raw configuration entry for a subscription.
*/
public static class ProvSubscription {
@@ -263,21 +743,23 @@ public class NodeConfig {
private boolean decompress;
/**
- * Construct a subscription configuration entry
+ * Construct a subscription configuration entry.
*
* @param subid The subscription ID
* @param feedid The feed ID
* @param url The base delivery URL (not including the fileid)
* @param authuser The user in the credentials used to deliver
* @param credentials The credentials used to authenticate to the delivery URL exactly as they go in the
- * Authorization header.
+ * Authorization header.
* @param metaonly Is this a meta data only subscription?
* @param use100 Should we send Expect: 100-continue?
* @param privilegedSubscriber Can we wait to receive a delete file call before deleting file
* @param followRedirect Is follow redirect of destination enabled?
* @param decompress To see if they want their information compressed or decompressed
*/
- public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials, boolean metaonly, boolean use100, boolean privilegedSubscriber, boolean followRedirect, boolean decompress) {
+ public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials,
+ boolean metaonly, boolean use100, boolean privilegedSubscriber, boolean followRedirect,
+ boolean decompress) {
this.subid = subid;
this.feedid = feedid;
this.url = url;
@@ -291,79 +773,79 @@ public class NodeConfig {
}
/**
- * Get the subscription ID
+ * Get the subscription ID.
*/
public String getSubId() {
return (subid);
}
/**
- * Get the feed ID
+ * Get the feed ID.
*/
public String getFeedId() {
return (feedid);
}
/**
- * Get the delivery URL
+ * Get the delivery URL.
*/
public String getURL() {
return (url);
}
/**
- * Get the user
+ * Get the user.
*/
public String getAuthUser() {
return (authuser);
}
/**
- * Get the delivery credentials
+ * Get the delivery credentials.
*/
public String getCredentials() {
return (credentials);
}
/**
- * Is this a meta data only subscription?
+ * Is this a meta data only subscription.
*/
public boolean isMetaDataOnly() {
return (metaonly);
}
/**
- * Should we send Expect: 100-continue?
+ * Should we send Expect: 100-continue.
*/
public boolean isUsing100() {
return (use100);
}
/**
- * Can we wait to receive a delete file call before deleting file
+ * Can we wait to receive a delete file call before deleting file.
*/
public boolean isPrivilegedSubscriber() {
return (privilegedSubscriber);
}
/**
- * Should i decompress the file before sending it on
- */
+ * Should I decompress the file before sending it on.
+ */
public boolean isDecompress() {
return (decompress);
}
/**
- * New field is added - FOLLOW_REDIRECTS feature iTrack:DATARTR-17 - 1706
- * Get the followRedirect of this destination
+ * New field is added - FOLLOW_REDIRECTS feature iTrack:DATARTR-17 - 1706 Get the followRedirect of this
+ * destination.
*/
boolean getFollowRedirect() {
- return(followRedirect);
+ return (followRedirect);
}
}
/**
- * Raw configuration entry for controlled ingress to the data router node
+ * Raw configuration entry for controlled ingress to the data router node.
*/
public static class ProvForceIngress {
@@ -373,11 +855,11 @@ public class NodeConfig {
private String[] nodes;
/**
- * Construct a forced ingress configuration entry
+ * Construct a forced ingress configuration entry.
*
* @param feedid The feed ID that this entry applies to
* @param subnet The CIDR for which publisher IP addresses this entry applies to or "" if it applies to all
- * publisher IP addresses
+ * publisher IP addresses
* @param user The publishing user this entry applies to or "" if it applies to all publishing users.
* @param nodes The array of FQDNs of the data router nodes to redirect publication attempts to.
*/
@@ -386,7 +868,7 @@ public class NodeConfig {
this.subnet = subnet;
this.user = user;
//Sonar fix
- if(nodes == null) {
+ if (nodes == null) {
this.nodes = new String[0];
} else {
this.nodes = Arrays.copyOf(nodes, nodes.length);
@@ -394,28 +876,28 @@ public class NodeConfig {
}
/**
- * Get the feed ID
+ * Get the feed ID.
*/
public String getFeedId() {
return (feedid);
}
/**
- * Get the subnet
+ * Get the subnet.
*/
public String getSubnet() {
return (subnet);
}
/**
- * Get the user
+ * Get the user.
*/
public String getUser() {
return (user);
}
/**
- * Get the node
+ * Get the node.
*/
public String[] getNodes() {
return (nodes);
@@ -423,7 +905,7 @@ public class NodeConfig {
}
/**
- * Raw configuration entry for controlled egress from the data router
+ * Raw configuration entry for controlled egress from the data router.
*/
public static class ProvForceEgress {
@@ -431,7 +913,7 @@ public class NodeConfig {
private String node;
/**
- * Construct a forced egress configuration entry
+ * Construct a forced egress configuration entry.
*
* @param subid The subscription ID the subscription with forced egress
* @param node The node handling deliveries for this subscription
@@ -442,14 +924,14 @@ public class NodeConfig {
}
/**
- * Get the subscription ID
+ * Get the subscription ID.
*/
public String getSubId() {
return (subid);
}
/**
- * Get the node
+ * Get the node.
*/
public String getNode() {
return (node);
@@ -457,7 +939,7 @@ public class NodeConfig {
}
/**
- * Raw configuration entry for routing within the data router network
+ * Raw configuration entry for routing within the data router network.
*/
public static class ProvHop {
@@ -466,14 +948,7 @@ public class NodeConfig {
private String via;
/**
- * A human readable description of this entry
- */
- public String toString() {
- return ("Hop " + from + "->" + to + " via " + via);
- }
-
- /**
- * Construct a hop entry
+ * Construct a hop entry.
*
* @param from The FQDN of the node with the data to be delivered
* @param to The FQDN of the node that will deliver to the subscriber
@@ -486,21 +961,28 @@ public class NodeConfig {
}
/**
- * Get the from node
+ * A human readable description of this entry.
+ */
+ public String toString() {
+ return ("Hop " + from + "->" + to + " via " + via);
+ }
+
+ /**
+ * Get the from node.
*/
public String getFrom() {
return (from);
}
/**
- * Get the to node
+ * Get the to node.
*/
public String getTo() {
return (to);
}
/**
- * Get the next intermediate node
+ * Get the next intermediate node.
*/
public String getVia() {
return (via);
@@ -519,431 +1001,10 @@ public class NodeConfig {
String loginfo;
String status;
SubnetMatcher[] subnets;
- Hashtable<String, String> authusers = new Hashtable<String, String>();
+ HashMap<String, String> authusers = new HashMap<>();
Redirection[] redirections;
Target[] targets;
String createdDate;
String aafInstance;
}
-
- private Hashtable<String, String> params = new Hashtable<>();
- private Hashtable<String, Feed> feeds = new Hashtable<>();
- private Hashtable<String, DestInfo> nodeinfo = new Hashtable<>();
- private Hashtable<String, DestInfo> subinfo = new Hashtable<>();
- private Hashtable<String, IsFrom> nodes = new Hashtable<>();
- private Hashtable<String, ProvSubscription> provSubscriptions = new Hashtable<>();
- private String myname;
- private String myauth;
- private DestInfo[] alldests;
- private int rrcntr;
-
- /**
- * Process the raw provisioning data to configure this node
- *
- * @param pd The parsed provisioning data
- * @param myname My name as seen by external systems
- * @param spooldir The directory where temporary files live
- * @param port The port number for URLs
- * @param nodeauthkey The keying string used to generate node authentication credentials
- */
- public NodeConfig(ProvData pd, String myname, String spooldir, int port, String nodeauthkey) {
- this.myname = myname;
- for (ProvParam p : pd.getParams()) {
- params.put(p.getName(), p.getValue());
- }
- Vector<DestInfo> destInfos = new Vector<>();
- myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey);
- for (ProvNode pn : pd.getNodes()) {
- String cName = pn.getCName();
- if (nodeinfo.get(cName) != null) {
- continue;
- }
- String auth = NodeUtils.getNodeAuthHdr(cName, nodeauthkey);
- DestInfo di = new DestInfo.DestInfoBuilder().setName("n:" + cName).setSpool(spooldir + "/n/" + cName).setSubid(null)
- .setLogdata("n2n-" + cName).setUrl("https://" + cName + ":" + port + "/internal/publish")
- .setAuthuser(cName).setAuthentication(myauth).setMetaonly(false).setUse100(true)
- .setPrivilegedSubscriber(false).setFollowRedirects(false).setDecompress(false).createDestInfo();
- (new File(di.getSpool())).mkdirs();
- destInfos.add(di);
- nodeinfo.put(cName, di);
- nodes.put(auth, new IsFrom(cName));
- }
- PathFinder pf = new PathFinder(myname, nodeinfo.keySet().toArray(new String[0]), pd.getHops());
- Hashtable<String, Vector<Redirection>> rdtab = new Hashtable<>();
- for (ProvForceIngress pfi : pd.getForceIngress()) {
- Vector<Redirection> v = rdtab.get(pfi.getFeedId());
- if (v == null) {
- v = new Vector<>();
- rdtab.put(pfi.getFeedId(), v);
- }
- Redirection r = new Redirection();
- if (pfi.getSubnet() != null) {
- r.snm = new SubnetMatcher(pfi.getSubnet());
- }
- r.user = pfi.getUser();
- r.nodes = pfi.getNodes();
- v.add(r);
- }
- Hashtable<String, Hashtable<String, String>> pfutab = new Hashtable<>();
- for (ProvFeedUser pfu : pd.getFeedUsers()) {
- Hashtable<String, String> t = pfutab.get(pfu.getFeedId());
- if (t == null) {
- t = new Hashtable<>();
- pfutab.put(pfu.getFeedId(), t);
- }
- t.put(pfu.getCredentials(), pfu.getUser());
- }
- Hashtable<String, String> egrtab = new Hashtable<>();
- for (ProvForceEgress pfe : pd.getForceEgress()) {
- if (pfe.getNode().equals(myname) || nodeinfo.get(pfe.getNode()) == null) {
- continue;
- }
- egrtab.put(pfe.getSubId(), pfe.getNode());
- }
- Hashtable<String, Vector<SubnetMatcher>> pfstab = new Hashtable<>();
- for (ProvFeedSubnet pfs : pd.getFeedSubnets()) {
- Vector<SubnetMatcher> v = pfstab.get(pfs.getFeedId());
- if (v == null) {
- v = new Vector<>();
- pfstab.put(pfs.getFeedId(), v);
- }
- v.add(new SubnetMatcher(pfs.getCidr()));
- }
- Hashtable<String, StringBuffer> feedTargets = new Hashtable<>();
- HashSet<String> allfeeds = new HashSet<>();
- for (ProvFeed pfx : pd.getFeeds()) {
- if (pfx.getStatus() == null) {
- allfeeds.add(pfx.getId());
- }
- }
- for (ProvSubscription provSubscription : pd.getSubscriptions()) {
- String subId = provSubscription.getSubId();
- String feedId = provSubscription.getFeedId();
- if (!allfeeds.contains(feedId)) {
- continue;
- }
- if (subinfo.get(subId) != null) {
- continue;
- }
- int sididx = 999;
- try {
- sididx = Integer.parseInt(subId);
- sididx -= sididx % 100;
- } catch (Exception e) {
- logger.error("NODE0517 Exception NodeConfig: "+e);
- }
- String subscriptionDirectory = sididx + "/" + subId;
- DestInfo destinationInfo = new DestInfo("s:" + subId,
- spooldir + "/s/" + subscriptionDirectory, provSubscription);
- (new File(destinationInfo.getSpool())).mkdirs();
- destInfos.add(destinationInfo);
- provSubscriptions.put(subId, provSubscription);
- subinfo.put(subId, destinationInfo);
- String egr = egrtab.get(subId);
- if (egr != null) {
- subId = pf.getPath(egr) + subId;
- }
- StringBuffer sb = feedTargets.get(feedId);
- if (sb == null) {
- sb = new StringBuffer();
- feedTargets.put(feedId, sb);
- }
- sb.append(' ').append(subId);
- }
- alldests = destInfos.toArray(new DestInfo[0]);
- for (ProvFeed pfx : pd.getFeeds()) {
- String fid = pfx.getId();
- Feed f = feeds.get(fid);
- if (f != null) {
- continue;
- }
- f = new Feed();
- feeds.put(fid, f);
- f.createdDate = pfx.getCreatedDate();
- f.loginfo = pfx.getLogData();
- f.status = pfx.getStatus();
- /*
- * AAF changes: TDP EPIC US# 307413
- * Passing aafInstance from ProvFeed to identify legacy/AAF feeds
- */
- f.aafInstance = pfx.getAafInstance();
- Vector<SubnetMatcher> v1 = pfstab.get(fid);
- if (v1 == null) {
- f.subnets = new SubnetMatcher[0];
- } else {
- f.subnets = v1.toArray(new SubnetMatcher[0]);
- }
- Hashtable<String, String> h1 = pfutab.get(fid);
- if (h1 == null) {
- h1 = new Hashtable<String, String>();
- }
- f.authusers = h1;
- Vector<Redirection> v2 = rdtab.get(fid);
- if (v2 == null) {
- f.redirections = new Redirection[0];
- } else {
- f.redirections = v2.toArray(new Redirection[0]);
- }
- StringBuffer sb = feedTargets.get(fid);
- if (sb == null) {
- f.targets = new Target[0];
- } else {
- f.targets = parseRouting(sb.toString());
- }
- }
- }
-
- /**
- * Parse a target string into an array of targets
- *
- * @param routing Target string
- * @return Array of targets.
- */
- public Target[] parseRouting(String routing) {
- routing = routing.trim();
- if ("".equals(routing)) {
- return (new Target[0]);
- }
- String[] xx = routing.split("\\s+");
- Hashtable<String, Target> tmap = new Hashtable<String, Target>();
- HashSet<String> subset = new HashSet<String>();
- Vector<Target> tv = new Vector<Target>();
- Target[] ret = new Target[xx.length];
- for (int i = 0; i < xx.length; i++) {
- String t = xx[i];
- int j = t.indexOf('/');
- if (j == -1) {
- DestInfo di = subinfo.get(t);
- if (di == null) {
- tv.add(new Target(null, t));
- } else {
- if (!subset.contains(t)) {
- subset.add(t);
- tv.add(new Target(di, null));
- }
- }
- } else {
- String node = t.substring(0, j);
- String rtg = t.substring(j + 1);
- DestInfo di = nodeinfo.get(node);
- if (di == null) {
- tv.add(new Target(null, t));
- } else {
- Target tt = tmap.get(node);
- if (tt == null) {
- tt = new Target(di, rtg);
- tmap.put(node, tt);
- tv.add(tt);
- } else {
- tt.addRouting(rtg);
- }
- }
- }
- }
- return (tv.toArray(new Target[0]));
- }
-
- /**
- * Check whether this is a valid node-to-node transfer
- *
- * @param credentials Credentials offered by the supposed node
- * @param ip IP address the request came from
- */
- public boolean isAnotherNode(String credentials, String ip) {
- IsFrom n = nodes.get(credentials);
- return (n != null && n.isFrom(ip));
- }
-
- /**
- * Check whether publication is allowed.
- *
- * @param feedid The ID of the feed being requested.
- * @param credentials The offered credentials
- * @param ip The requesting IP address
- */
- public String isPublishPermitted(String feedid, String credentials, String ip) {
- Feed f = feeds.get(feedid);
- String nf = "Feed does not exist";
- if (f != null) {
- nf = f.status;
- }
- if (nf != null) {
- return (nf);
- }
- String user = f.authusers.get(credentials);
- if (user == null) {
- return ("Publisher not permitted for this feed");
- }
- if (f.subnets.length == 0) {
- return (null);
- }
- byte[] addr = NodeUtils.getInetAddress(ip);
- for (SubnetMatcher snm : f.subnets) {
- if (snm.matches(addr)) {
- return (null);
- }
- }
- return ("Publisher not permitted for this feed");
- }
-
- /**
- * Check whether delete file is allowed.
- *
- * @param subId The ID of the subscription being requested.
- */
- public boolean isDeletePermitted(String subId) {
- ProvSubscription provSubscription = provSubscriptions.get(subId);
- return provSubscription.isPrivilegedSubscriber();
- }
-
- /**
- * Check whether publication is allowed for AAF Feed.
- * @param feedid The ID of the feed being requested.
- * @param ip The requesting IP address
- */
- public String isPublishPermitted(String feedid, String ip) {
- Feed f = feeds.get(feedid);
- String nf = "Feed does not exist";
- if (f != null) {
- nf = f.status;
- }
- if (nf != null) {
- return(nf);
- }
- if (f.subnets.length == 0) {
- return(null);
- }
- byte[] addr = NodeUtils.getInetAddress(ip);
- for (SubnetMatcher snm: f.subnets) {
- if (snm.matches(addr)) {
- return(null);
- }
- }
- return("Publisher not permitted for this feed");
- }
-
- /**
- * Get authenticated user
- */
- public String getAuthUser(String feedid, String credentials) {
- return (feeds.get(feedid).authusers.get(credentials));
- }
-
- /**
- * AAF changes: TDP EPIC US# 307413
- * Check AAF_instance for feed ID
- * @param feedid The ID of the feed specified
- */
- public String getAafInstance(String feedid) {
- Feed f = feeds.get(feedid);
- return f.aafInstance;
- }
-
- /**
- * Check if the request should be redirected to a different ingress node
- */
- public String getIngressNode(String feedid, String user, String ip) {
- Feed f = feeds.get(feedid);
- if (f.redirections.length == 0) {
- return (null);
- }
- byte[] addr = NodeUtils.getInetAddress(ip);
- for (Redirection r : f.redirections) {
- if (r.user != null && !user.equals(r.user)) {
- continue;
- }
- if (r.snm != null && !r.snm.matches(addr)) {
- continue;
- }
- for (String n : r.nodes) {
- if (myname.equals(n)) {
- return (null);
- }
- }
- if (r.nodes.length == 0) {
- return (null);
- }
- return (r.nodes[rrcntr++ % r.nodes.length]);
- }
- return (null);
- }
-
- /**
- * Get a provisioned configuration parameter
- */
- public String getProvParam(String name) {
- return (params.get(name));
- }
-
- /**
- * Get all the DestInfos
- */
- public DestInfo[] getAllDests() {
- return (alldests);
- }
-
- /**
- * Get the targets for a feed
- *
- * @param feedid The feed ID
- * @return The targets this feed should be delivered to
- */
- public Target[] getTargets(String feedid) {
- if (feedid == null) {
- return (new Target[0]);
- }
- Feed f = feeds.get(feedid);
- if (f == null) {
- return (new Target[0]);
- }
- return (f.targets);
- }
-
- /**
- * Get the creation date for a feed
- * @param feedid The feed ID
- * @return the timestamp of creation date of feed id passed
- */
- public String getCreatedDate(String feedid) {
- Feed f = feeds.get(feedid);
- return(f.createdDate);
- }
-
- /**
- * Get the feed ID for a subscription
- *
- * @param subid The subscription ID
- * @return The feed ID
- */
- public String getFeedId(String subid) {
- DestInfo di = subinfo.get(subid);
- if (di == null) {
- return (null);
- }
- return (di.getLogData());
- }
-
- /**
- * Get the spool directory for a subscription
- *
- * @param subid The subscription ID
- * @return The spool directory
- */
- public String getSpoolDir(String subid) {
- DestInfo di = subinfo.get(subid);
- if (di == null) {
- return (null);
- }
- return (di.getSpool());
- }
-
- /**
- * Get the Authorization value this node uses
- *
- * @return The Authorization header value for this node
- */
- public String getMyAuth() {
- return (myauth);
- }
-
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java
index 16099e62..786befce 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java
@@ -26,8 +26,6 @@ package org.onap.dmaap.datarouter.node;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
-
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
@@ -35,22 +33,24 @@ import java.io.Reader;
import java.net.URL;
import java.util.Properties;
import java.util.Timer;
+import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
/**
* Maintain the configuration of a Data Router node
- * <p>
- * The NodeConfigManager is the single point of contact for servlet, delivery, event logging, and log retention
+ *
+ * <p>The NodeConfigManager is the single point of contact for servlet, delivery, event logging, and log retention
* subsystems to access configuration information.
- * <p>
- * There are two basic sets of configuration data. The static local configuration data, stored in a local configuration
- * file (created as part of installation by SWM), and the dynamic global configuration data fetched from the data router
- * provisioning server.
+ *
+ * <p>There are two basic sets of configuration data. The static local configuration data, stored in a local
+ * configuration file (created as part of installation by SWM), and the dynamic global configuration data fetched from
+ * the data router provisioning server.
*/
public class NodeConfigManager implements DeliveryQueueHelper {
- private static EELFLogger eelfLogger = EELFManager.getInstance()
- .getLogger(NodeConfigManager.class);
+ private static final String CHANGE_ME = "changeme";
+ private static final String NODE_CONFIG_MANAGER = "NodeConfigManager";
+ private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(NodeConfigManager.class);
private static NodeConfigManager base = new NodeConfigManager();
private Timer timer = new Timer("Node Configuration Timer", true);
@@ -94,7 +94,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
private String eventlogsuffix;
private String eventloginterval;
private boolean followredirects;
- private String [] enabledprotocols;
+ private String[] enabledprotocols;
private String aafType;
private String aafInstance;
private String aafAction;
@@ -103,14 +103,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
/**
- * Get the default node configuration manager
- */
- public static NodeConfigManager getInstance() {
- return base;
- }
-
- /**
- * Initialize the configuration of a Data Router node
+ * Initialize the configuration of a Data Router node.
*/
private NodeConfigManager() {
@@ -120,8 +113,10 @@ public class NodeConfigManager implements DeliveryQueueHelper {
drNodeProperties.load(new FileInputStream(System
.getProperty("org.onap.dmaap.datarouter.node.properties", "/opt/app/datartr/etc/node.properties")));
} catch (Exception e) {
- NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
- eelfLogger.error(EelfMsgs.MESSAGE_PROPERTIES_LOAD_ERROR, System.getProperty("org.onap.dmaap.datarouter.node.properties", "/opt/app/datartr/etc/node.properties"));
+ NodeUtils.setIpAndFqdnForEelf(NODE_CONFIG_MANAGER);
+ eelfLogger.error(EelfMsgs.MESSAGE_PROPERTIES_LOAD_ERROR, e,
+ System.getProperty("org.onap.dmaap.datarouter.node.properties",
+ "/opt/app/datartr/etc/node.properties"));
}
provurl = drNodeProperties.getProperty("ProvisioningURL", "https://dmaap-dr-prov:8443/internal/prov");
/*
@@ -143,8 +138,8 @@ public class NodeConfigManager implements DeliveryQueueHelper {
try {
provhost = (new URL(provurl)).getHost();
} catch (Exception e) {
- NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
- eelfLogger.error(EelfMsgs.MESSAGE_BAD_PROV_URL, provurl);
+ NodeUtils.setIpAndFqdnForEelf(NODE_CONFIG_MANAGER);
+ eelfLogger.error(EelfMsgs.MESSAGE_BAD_PROV_URL, e, provurl);
System.exit(1);
}
eelfLogger.info("NODE0303 Provisioning server is " + provhost);
@@ -153,8 +148,6 @@ public class NodeConfigManager implements DeliveryQueueHelper {
gfport = Integer.parseInt(drNodeProperties.getProperty("IntHttpPort", "8080"));
svcport = Integer.parseInt(drNodeProperties.getProperty("IntHttpsPort", "8443"));
port = Integer.parseInt(drNodeProperties.getProperty("ExtHttpsPort", "443"));
- long minpfinterval = Long.parseLong(drNodeProperties.getProperty("MinProvFetchInterval", "10000"));
- long minrsinterval = Long.parseLong(drNodeProperties.getProperty("MinRedirSaveInterval", "10000"));
spooldir = drNodeProperties.getProperty("SpoolDir", "spool");
File fdir = new File(spooldir + "/f");
fdir.mkdirs();
@@ -168,14 +161,14 @@ public class NodeConfigManager implements DeliveryQueueHelper {
logretention = Long.parseLong(drNodeProperties.getProperty("LogRetention", "30")) * 86400000L;
eventlogprefix = logdir + "/events";
eventlogsuffix = ".log";
- String redirfile = drNodeProperties.getProperty("RedirectionFile", "etc/redirections.dat");
+ redirfile = drNodeProperties.getProperty("RedirectionFile", "etc/redirections.dat");
kstype = drNodeProperties.getProperty("KeyStoreType", "jks");
ksfile = drNodeProperties.getProperty("KeyStoreFile", "etc/keystore");
- kspass = drNodeProperties.getProperty("KeyStorePassword", "changeme");
- kpass = drNodeProperties.getProperty("KeyPassword", "changeme");
+ kspass = drNodeProperties.getProperty("KeyStorePassword", CHANGE_ME);
+ kpass = drNodeProperties.getProperty("KeyPassword", CHANGE_ME);
tstype = drNodeProperties.getProperty("TrustStoreType", "jks");
tsfile = drNodeProperties.getProperty("TrustStoreFile");
- tspass = drNodeProperties.getProperty("TrustStorePassword", "changeme");
+ tspass = drNodeProperties.getProperty("TrustStorePassword", CHANGE_ME);
if (tsfile != null && tsfile.length() > 0) {
System.setProperty("javax.net.ssl.trustStoreType", tstype);
System.setProperty("javax.net.ssl.trustStore", tsfile);
@@ -185,13 +178,15 @@ public class NodeConfigManager implements DeliveryQueueHelper {
quiesce = new File(drNodeProperties.getProperty("QuiesceFile", "etc/SHUTDOWN"));
myname = NodeUtils.getCanonicalName(kstype, ksfile, kspass);
if (myname == null) {
- NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
+ NodeUtils.setIpAndFqdnForEelf(NODE_CONFIG_MANAGER);
eelfLogger.error(EelfMsgs.MESSAGE_KEYSTORE_FETCH_ERROR, ksfile);
eelfLogger.error("NODE0309 Unable to fetch canonical name from keystore file " + ksfile);
System.exit(1);
}
eelfLogger.info("NODE0304 My certificate says my name is " + myname);
pid = new PublishId(myname);
+ long minrsinterval = Long.parseLong(drNodeProperties.getProperty("MinRedirSaveInterval", "10000"));
+ long minpfinterval = Long.parseLong(drNodeProperties.getProperty("MinProvFetchInterval", "10000"));
rdmgr = new RedirManager(redirfile, minrsinterval, timer);
pfetcher = new RateLimitedOperation(minpfinterval, timer) {
public void run() {
@@ -202,6 +197,13 @@ public class NodeConfigManager implements DeliveryQueueHelper {
pfetcher.request();
}
+ /**
+ * Get the default node configuration manager.
+ */
+ public static NodeConfigManager getInstance() {
+ return base;
+ }
+
private void localconfig() {
followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false"));
eventloginterval = getProvParam("LOGROLL_INTERVAL", "30s");
@@ -218,42 +220,53 @@ public class NodeConfigManager implements DeliveryQueueHelper {
try {
initfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_INIT_RETRY_INTERVAL")) * 1000);
} catch (Exception e) {
+ eelfLogger.trace("Error parsing DELIVERY_INIT_RETRY_INTERVAL", e);
}
try {
- waitForFileProcessFailureTimer = (long) (Double.parseDouble(getProvParam("DELIVERY_FILE_PROCESS_INTERVAL")) * 1000);
+ waitForFileProcessFailureTimer = (long) (Double.parseDouble(getProvParam("DELIVERY_FILE_PROCESS_INTERVAL"))
+ * 1000);
} catch (Exception e) {
+ eelfLogger.trace("Error parsing DELIVERY_FILE_PROCESS_INTERVAL", e);
}
try {
maxfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000);
} catch (Exception e) {
+ eelfLogger.trace("Error parsing DELIVERY_MAX_RETRY_INTERVAL", e);
}
try {
expirationtimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_AGE")) * 1000);
} catch (Exception e) {
+ eelfLogger.trace("Error parsing DELIVERY_MAX_AGE", e);
}
try {
failurebackoff = Double.parseDouble(getProvParam("DELIVERY_RETRY_RATIO"));
} catch (Exception e) {
+ eelfLogger.trace("Error parsing DELIVERY_RETRY_RATIO", e);
}
try {
deliverythreads = Integer.parseInt(getProvParam("DELIVERY_THREADS"));
} catch (Exception e) {
+ eelfLogger.trace("Error parsing DELIVERY_THREADS", e);
}
try {
fairfilelimit = Integer.parseInt(getProvParam("FAIR_FILE_LIMIT"));
} catch (Exception e) {
+ eelfLogger.trace("Error parsing FAIR_FILE_LIMIT", e);
}
try {
fairtimelimit = (long) (Double.parseDouble(getProvParam("FAIR_TIME_LIMIT")) * 1000);
} catch (Exception e) {
+ eelfLogger.trace("Error parsing FAIR_TIME_LIMIT", e);
}
try {
fdpstart = Double.parseDouble(getProvParam("FREE_DISK_RED_PERCENT")) / 100.0;
} catch (Exception e) {
+ eelfLogger.trace("Error parsing FREE_DISK_RED_PERCENT", e);
}
try {
fdpstop = Double.parseDouble(getProvParam("FREE_DISK_YELLOW_PERCENT")) / 100.0;
} catch (Exception e) {
+ eelfLogger.trace("Error parsing FREE_DISK_YELLOW_PERCENT", e);
}
if (fdpstart < 0.01) {
fdpstart = 0.01;
@@ -272,26 +285,30 @@ public class NodeConfigManager implements DeliveryQueueHelper {
private void fetchconfig() {
try {
eelfLogger.info("NodeConfigMan.fetchConfig: provurl:: " + provurl);
- Reader r = new InputStreamReader((new URL(provurl)).openStream());
- config = new NodeConfig(new ProvData(r), myname, spooldir, port, nak);
+ Reader reader = new InputStreamReader((new URL(provurl)).openStream());
+ config = new NodeConfig(new ProvData(reader), myname, spooldir, port, nak);
localconfig();
configtasks.startRun();
- Runnable rr;
- while ((rr = configtasks.next()) != null) {
- try {
- rr.run();
- } catch (Exception e) {
- eelfLogger.error("NODE0518 Exception fetchconfig: " + e);
- }
- }
+ runTasks();
} catch (Exception e) {
NodeUtils.setIpAndFqdnForEelf("fetchconfigs");
eelfLogger.error(EelfMsgs.MESSAGE_CONF_FAILED, e.toString());
- eelfLogger.error("NODE0306 Configuration failed " + e.toString() + " - try again later", e.getMessage());
+ eelfLogger.error("NODE0306 Configuration failed " + e.toString() + " - try again later", e);
pfetcher.request();
}
}
+ private void runTasks() {
+ Runnable rr;
+ while ((rr = configtasks.next()) != null) {
+ try {
+ rr.run();
+ } catch (Exception e) {
+ eelfLogger.error("NODE0518 Exception fetchconfig: " + e);
+ }
+ }
+ }
+
/**
* Process a gofetch request from a particular IP address. If the IP address is not an IP address we would go to to
* fetch the provisioning data, ignore the request. If the data has been fetched very recently (default 10
@@ -307,14 +324,14 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
- * Am I configured?
+ * Am I configured.
*/
public boolean isConfigured() {
return (config != null);
}
/**
- * Am I shut down?
+ * Am I shut down.
*/
public boolean isShutdown() {
return (quiesce.exists());
@@ -331,7 +348,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
- * Given a set of credentials and an IP address, is this request from another node?
+ * Given a set of credentials and an IP address, is this request from another node.
*
* @param credentials Credentials offered by the supposed node
* @param ip IP address the request came from
@@ -354,16 +371,6 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
- * Check whether delete file is allowed.
- *
- * @param subId The ID of the subscription being requested
- * @return True if the delete file is permitted for the subscriber.
- */
- public boolean isDeletePermitted(String subId) {
- return (config.isDeletePermitted(subId));
- }
-
- /**
* Check whether publication is allowed for AAF Feed.
*
* @param feedid The ID of the feed being requested
@@ -371,7 +378,17 @@ public class NodeConfigManager implements DeliveryQueueHelper {
* @return True if the IP and credentials are valid for the specified feed.
*/
public String isPublishPermitted(String feedid, String ip) {
- return(config.isPublishPermitted(feedid, ip));
+ return (config.isPublishPermitted(feedid, ip));
+ }
+
+ /**
+ * Check whether delete file is allowed.
+ *
+ * @param subId The ID of the subscription being requested
+ * @return True if the delete file is permitted for the subscriber.
+ */
+ public boolean isDeletePermitted(String subId) {
+ return (config.isDeletePermitted(subId));
}
/**
@@ -386,12 +403,16 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
- * AAF changes: TDP EPIC US# 307413
- * Check AAF_instance for feed ID in NodeConfig
+ * AAF changes: TDP EPIC US# 307413 Check AAF_instance for feed ID in NodeConfig.
+ *
* @param feedid The ID of the feed specified
*/
public String getAafInstance(String feedid) {
- return(config.getAafInstance(feedid));
+ return (config.getAafInstance(feedid));
+ }
+
+ public String getAafInstance() {
+ return aafInstance;
}
/**
@@ -407,7 +428,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
- * Get a provisioned configuration parameter (from the provisioning server configuration)
+ * Get a provisioned configuration parameter (from the provisioning server configuration).
*
* @param name The name of the parameter
* @return The value of the parameter or null if it is not defined.
@@ -417,7 +438,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
- * Get a provisioned configuration parameter (from the provisioning server configuration)
+ * Get a provisioned configuration parameter (from the provisioning server configuration).
*
* @param name The name of the parameter
* @param defaultValue The value to use if the parameter is not defined
@@ -432,7 +453,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
- * Generate a publish ID
+ * Generate a publish ID.
*/
public String getPublishId() {
return (pid.next());
@@ -446,14 +467,14 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
- * Register a task to run whenever the configuration changes
+ * Register a task to run whenever the configuration changes.
*/
public void registerConfigTask(Runnable task) {
configtasks.addTask(task);
}
/**
- * Deregister a task to run whenever the configuration changes
+ * Deregister a task to run whenever the configuration changes.
*/
public void deregisterConfigTask(Runnable task) {
configtasks.removeTask(task);
@@ -476,14 +497,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
- * Is a destination redirected?
- */
- public boolean isDestRedirected(DestInfo destinfo) {
- return (followredirects && rdmgr.isRedirected(destinfo.getSubId()));
- }
-
- /**
- * Set up redirection on receipt of a 3XX from a target URL
+ * Set up redirection on receipt of a 3XX from a target URL.
*/
public boolean handleRedirection(DestInfo destinationInfo, String redirto, String fileid) {
fileid = "/" + fileid;
@@ -500,24 +514,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
- * Set up redirection on receipt of a 3XX from a target URL
- */
- public boolean handleRedirectionSubLevel(DeliveryTask task, DestInfo destinfo, String redirto, String fileid) {
- fileid = "/" + fileid;
- String subid = destinfo.getSubId();
- String purl = destinfo.getURL();
- if (task.getFollowRedirects() && subid != null && redirto.endsWith(fileid)) {
- redirto = redirto.substring(0, redirto.length() - fileid.length());
- if (!redirto.equals(purl)) {
- rdmgr.redirect(subid, purl, redirto);
- return true;
- }
- }
- return false;
- }
-
- /**
- * Handle unreachable target URL
+ * Handle unreachable target URL.
*/
public void handleUnreachable(DestInfo destinationInfo) {
String subid = destinationInfo.getSubId();
@@ -527,35 +524,35 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
- * Get the timeout before retrying after an initial delivery failure
+ * Get the timeout before retrying after an initial delivery failure.
*/
public long getInitFailureTimer() {
return (initfailuretimer);
}
/**
- * Get the timeout before retrying after delivery and wait for file processing
+ * Get the timeout before retrying after delivery and wait for file processing.
*/
public long getWaitForFileProcessFailureTimer() {
return (waitForFileProcessFailureTimer);
}
/**
- * Get the maximum timeout between delivery attempts
+ * Get the maximum timeout between delivery attempts.
*/
public long getMaxFailureTimer() {
return (maxfailuretimer);
}
/**
- * Get the ratio between consecutive delivery attempts
+ * Get the ratio between consecutive delivery attempts.
*/
public double getFailureBackoff() {
return (failurebackoff);
}
/**
- * Get the expiration timer for deliveries
+ * Get the expiration timer for deliveries.
*/
public long getExpirationTimer() {
return (expirationtimer);
@@ -576,7 +573,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
- * Get the targets for a feed
+ * Get the targets for a feed.
*
* @param feedid The feed ID
* @return The targets this feed should be delivered to
@@ -586,149 +583,160 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
- * Get the creation date for a feed
- * @param feedid The feed ID
- * @return the timestamp of creation date of feed id passed
+ * Get the spool directory for temporary files.
*/
- public String getCreatedDate(String feedid) {
- return(config.getCreatedDate(feedid));
+ public String getSpoolDir() {
+ return (spooldir + "/f");
}
/**
- * Get the spool directory for temporary files
+ * Get the spool directory for a subscription.
*/
- public String getSpoolDir() {
- return (spooldir + "/f");
+ public String getSpoolDir(String subid, String remoteaddr) {
+ if (provcheck.isFrom(remoteaddr)) {
+ String sdir = config.getSpoolDir(subid);
+ if (sdir != null) {
+ eelfLogger.info("NODE0310 Received subscription reset request for subscription " + subid
+ + " from provisioning server " + remoteaddr);
+ } else {
+ eelfLogger.info("NODE0311 Received subscription reset request for unknown subscription " + subid
+ + " from provisioning server " + remoteaddr);
+ }
+ return (sdir);
+ } else {
+ eelfLogger.info("NODE0312 Received subscription reset request from unexpected server " + remoteaddr);
+ return (null);
+ }
}
/**
- * Get the base directory for spool directories
+ * Get the base directory for spool directories.
*/
public String getSpoolBase() {
return (spooldir);
}
/**
- * Get the key store type
+ * Get the key store type.
*/
public String getKSType() {
return (kstype);
}
/**
- * Get the key store file
+ * Get the key store file.
*/
public String getKSFile() {
return (ksfile);
}
/**
- * Get the key store password
+ * Get the key store password.
*/
public String getKSPass() {
return (kspass);
}
/**
- * Get the key password
+ * Get the key password.
*/
public String getKPass() {
return (kpass);
}
/**
- * Get the http port
+ * Get the http port.
*/
public int getHttpPort() {
return (gfport);
}
/**
- * Get the https port
+ * Get the https port.
*/
public int getHttpsPort() {
return (svcport);
}
/**
- * Get the externally visible https port
+ * Get the externally visible https port.
*/
public int getExtHttpsPort() {
return (port);
}
/**
- * Get the external name of this machine
+ * Get the external name of this machine.
*/
public String getMyName() {
return (myname);
}
/**
- * Get the number of threads to use for delivery
+ * Get the number of threads to use for delivery.
*/
public int getDeliveryThreads() {
return (deliverythreads);
}
/**
- * Get the URL for uploading the event log data
+ * Get the URL for uploading the event log data.
*/
public String getEventLogUrl() {
return (eventlogurl);
}
/**
- * Get the prefix for the names of event log files
+ * Get the prefix for the names of event log files.
*/
public String getEventLogPrefix() {
return (eventlogprefix);
}
/**
- * Get the suffix for the names of the event log files
+ * Get the suffix for the names of the event log files.
*/
public String getEventLogSuffix() {
return (eventlogsuffix);
}
/**
- * Get the interval between event log file rollovers
+ * Get the interval between event log file rollovers.
*/
public String getEventLogInterval() {
return (eventloginterval);
}
/**
- * Should I follow redirects from subscribers?
+ * Should I follow redirects from subscribers.
*/
public boolean isFollowRedirects() {
return (followredirects);
}
/**
- * Get the directory where the event and node log files live
+ * Get the directory where the event and node log files live.
*/
public String getLogDir() {
return (logdir);
}
/**
- * How long do I keep log files (in milliseconds)
+ * How long do I keep log files (in milliseconds).
*/
public long getLogRetention() {
return (logretention);
}
/**
- * Get the timer
+ * Get the timer.
*/
public Timer getTimer() {
return (timer);
}
/**
- * Get the feed ID for a subscription
+ * Get the feed ID for a subscription.
*
* @param subid The subscription ID
* @return The feed ID
@@ -738,7 +746,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
- * Get the authorization string this node uses
+ * Get the authorization string this node uses.
*
* @return The Authorization string for this node
*/
@@ -763,72 +771,33 @@ public class NodeConfigManager implements DeliveryQueueHelper {
}
/**
- * Disable and enable protocols
- * */
+ * Disable and enable protocols.
+ */
public String[] getEnabledprotocols() {
return enabledprotocols;
}
- public void setEnabledprotocols(String[] enabledprotocols) {
- this.enabledprotocols = enabledprotocols.clone();
- }
-
- /**
- * Get the spool directory for a subscription
- */
- public String getSpoolDir(String subid, String remoteaddr) {
- if (provcheck.isFrom(remoteaddr)) {
- String sdir = config.getSpoolDir(subid);
- if (sdir != null) {
- eelfLogger.info("NODE0310 Received subscription reset request for subscription " + subid
- + " from provisioning server " + remoteaddr);
- } else {
- eelfLogger.info("NODE0311 Received subscription reset request for unknown subscription " + subid
- + " from provisioning server " + remoteaddr);
- }
- return (sdir);
- } else {
- eelfLogger.info("NODE0312 Received subscription reset request from unexpected server " + remoteaddr);
- return (null);
- }
- }
public String getAafType() {
return aafType;
}
- public void setAafType(String aafType) {
- this.aafType = aafType;
- }
- public String getAafInstance() {
- return aafInstance;
- }
- public void setAafInstance(String aafInstance) {
- this.aafInstance = aafInstance;
- }
+
public String getAafAction() {
return aafAction;
}
- public void setAafAction(String aafAction) {
- this.aafAction = aafAction;
- }
+
/*
* Get aafURL from SWM variable
* */
public String getAafURL() {
return aafURL;
}
- public void setAafURL(String aafURL) {
- this.aafURL = aafURL;
- }
- public boolean getCadiEnabeld() {
+ public boolean getCadiEnabled() {
return cadiEnabled;
}
- public void setCadiEnabled(boolean cadiEnabled) {
- this.cadiEnabled = cadiEnabled;
- }
/**
- * Builds the permissions string to be verified
+ * Builds the permissions string to be verified.
*
* @param aafInstance The aaf instance
* @return The permissions
@@ -837,12 +806,12 @@ public class NodeConfigManager implements DeliveryQueueHelper {
try {
String type = getAafType();
String action = getAafAction();
- if (aafInstance == null || aafInstance.equals("")) {
+ if ("".equals(aafInstance)) {
aafInstance = getAafInstance();
}
return type + "|" + aafInstance + "|" + action;
} catch (Exception e) {
- eelfLogger.error("NODE0543 NodeConfigManager.getPermission: ", e.getMessage());
+ eelfLogger.error("NODE0543 NodeConfigManager.getPermission: ", e);
}
return null;
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java
index 058295d3..abec7393 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java
@@ -23,79 +23,42 @@
package org.onap.dmaap.datarouter.node;
-
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.EnumSet;
+import java.util.Properties;
+import javax.servlet.DispatcherType;
+import javax.servlet.ServletException;
import org.eclipse.jetty.http.HttpVersion;
-import org.eclipse.jetty.server.*;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.onap.aaf.cadi.PropAccess;
-import javax.servlet.DispatcherType;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.EnumSet;
-import java.util.Properties;
-
/**
- * The main starting point for the Data Router node
+ * The main starting point for the Data Router node.
*/
public class NodeMain {
- private NodeMain() {
- }
-
private static EELFLogger nodeMainLogger = EELFManager.getInstance().getLogger(NodeMain.class);
-
- class Inner {
- InputStream getCadiProps() {
- InputStream in = null;
- try {
- in = getClass().getClassLoader().getResourceAsStream("drNodeCadi.properties");
- } catch (Exception e) {
- nodeMainLogger.error("Exception in Inner.getCadiProps() method " + e.getMessage());
- }
- return in;
- }
- }
-
- private static class WaitForConfig implements Runnable {
-
- private NodeConfigManager localNodeConfigManager;
-
- WaitForConfig(NodeConfigManager ncm) {
- this.localNodeConfigManager = ncm;
- }
-
- public synchronized void run() {
- notify();
- }
-
- synchronized void waitForConfig() {
- localNodeConfigManager.registerConfigTask(this);
- while (!localNodeConfigManager.isConfigured()) {
- nodeMainLogger.info("NODE0003 Waiting for Node Configuration");
- try {
- wait();
- } catch (Exception exception) {
- nodeMainLogger
- .error("NodeMain: waitForConfig exception. Exception Message:- " + exception.toString(),
- exception);
- }
- }
- localNodeConfigManager.deregisterConfigTask(this);
- nodeMainLogger.info("NODE0004 Node Configuration Data Received");
- }
- }
-
private static Delivery delivery;
private static NodeConfigManager nodeConfigManager;
+ private NodeMain() {
+ }
+
/**
- * Reset the retry timer for a subscription
+ * Reset the retry timer for a subscription.
*/
static void resetQueue(String subid, String ip) {
delivery.resetQueue(nodeConfigManager.getSpoolDir(subid, ip));
@@ -103,9 +66,9 @@ public class NodeMain {
/**
* Start the data router.
- * <p>
- * The location of the node configuration file can be set using the org.onap.dmaap.datarouter.node.properties system
- * property. By default, it is "/opt/app/datartr/etc/node.properties".
+ *
+ * <p>The location of the node configuration file can be set using the org.onap.dmaap.datarouter.node.properties
+ * system property. By default, it is "/opt/app/datartr/etc/node.properties".
*/
public static void main(String[] args) throws Exception {
nodeMainLogger.info("NODE0001 Data Router Node Starting");
@@ -123,7 +86,8 @@ public class NodeMain {
httpConfiguration.setRequestHeaderSize(2048);
// HTTP connector
- try (ServerConnector httpServerConnector = new ServerConnector(server, new HttpConnectionFactory(httpConfiguration))) {
+ try (ServerConnector httpServerConnector = new ServerConnector(server,
+ new HttpConnectionFactory(httpConfiguration))) {
httpServerConnector.setPort(nodeConfigManager.getHttpPort());
httpServerConnector.setIdleTimeout(2000);
@@ -134,7 +98,8 @@ public class NodeMain {
sslContextFactory.setKeyStorePassword(nodeConfigManager.getKSPass());
sslContextFactory.setKeyManagerPassword(nodeConfigManager.getKPass());
- //SP-6 : Fixes for SDV scan to exclude/remove DES/3DES ciphers are taken care by upgrading jdk in descriptor.xml
+ //SP-6: Fixes for SDV scan to exclude/remove DES/3DES
+ // ciphers are taken care by upgrading jdk in descriptor.xml
sslContextFactory.setExcludeCipherSuites(
"SSL_RSA_WITH_DES_CBC_SHA",
"SSL_DHE_RSA_WITH_DES_CBC_SHA",
@@ -147,9 +112,12 @@ public class NodeMain {
sslContextFactory.addExcludeProtocols("SSLv3");
sslContextFactory.setIncludeProtocols(nodeConfigManager.getEnabledprotocols());
- nodeMainLogger.info("NODE00004 Unsupported protocols node server:-" + String.join(",", sslContextFactory.getExcludeProtocols()));
- nodeMainLogger.info("NODE00004 Supported protocols node server:-" + String.join(",", sslContextFactory.getIncludeProtocols()));
- nodeMainLogger.info("NODE00004 Unsupported ciphers node server:-" + String.join(",", sslContextFactory.getExcludeCipherSuites()));
+ nodeMainLogger.info("NODE00004 Unsupported protocols node server:-"
+ + String.join(",", sslContextFactory.getExcludeProtocols()));
+ nodeMainLogger.info("NODE00004 Supported protocols node server:-"
+ + String.join(",", sslContextFactory.getIncludeProtocols()));
+ nodeMainLogger.info("NODE00004 Unsupported ciphers node server:-"
+ + String.join(",", sslContextFactory.getExcludeCipherSuites()));
HttpConfiguration httpsConfiguration = new HttpConfiguration(httpConfiguration);
httpsConfiguration.setRequestHeaderSize(8192);
@@ -174,20 +142,8 @@ public class NodeMain {
servletContextHandler.addServlet(new ServletHolder(new NodeServlet(delivery)), "/*");
//CADI Filter activation check
- if (nodeConfigManager.getCadiEnabeld()) {
- Properties cadiProperties = new Properties();
- try {
- Inner obj = new NodeMain().new Inner();
- InputStream in = obj.getCadiProps();
- cadiProperties.load(in);
- } catch (IOException e1) {
- nodeMainLogger.error("NODE00005 Exception in NodeMain.Main() loading CADI properties " + e1.getMessage());
- }
- cadiProperties.setProperty("aaf_locate_url", nodeConfigManager.getAafURL());
- nodeMainLogger.info("NODE00005 aaf_url set to - " + cadiProperties.getProperty("aaf_url"));
-
- PropAccess access = new PropAccess(cadiProperties);
- servletContextHandler.addFilter(new FilterHolder(new DRNodeCadiFilter(true, access)), "/*", EnumSet.of(DispatcherType.REQUEST));
+ if (nodeConfigManager.getCadiEnabled()) {
+ enableCadi(servletContextHandler);
}
server.setHandler(servletContextHandler);
@@ -199,9 +155,69 @@ public class NodeMain {
server.start();
nodeMainLogger.info("NODE00006 Node Server started-" + server.getState());
} catch (Exception e) {
- nodeMainLogger.info("NODE00006 Jetty failed to start. Reporting will we unavailable", e.getMessage());
+ nodeMainLogger.info("NODE00006 Jetty failed to start. Reporting will we unavailable: " + e.getMessage(), e);
}
server.join();
nodeMainLogger.info("NODE00007 Node Server joined - " + server.getState());
}
+
+ private static void enableCadi(ServletContextHandler servletContextHandler) throws ServletException {
+ Properties cadiProperties = new Properties();
+ try {
+ Inner obj = new NodeMain().new Inner();
+ InputStream in = obj.getCadiProps();
+ cadiProperties.load(in);
+ } catch (IOException e1) {
+ nodeMainLogger
+ .error("NODE00005 Exception in NodeMain.Main() loading CADI properties " + e1.getMessage(), e1);
+ }
+ cadiProperties.setProperty("aaf_locate_url", nodeConfigManager.getAafURL());
+ nodeMainLogger.info("NODE00005 aaf_url set to - " + cadiProperties.getProperty("aaf_url"));
+
+ PropAccess access = new PropAccess(cadiProperties);
+ servletContextHandler.addFilter(new FilterHolder(new DRNodeCadiFilter(true, access)), "/*", EnumSet
+ .of(DispatcherType.REQUEST));
+ }
+
+ private static class WaitForConfig implements Runnable {
+
+ private NodeConfigManager localNodeConfigManager;
+
+ WaitForConfig(NodeConfigManager ncm) {
+ this.localNodeConfigManager = ncm;
+ }
+
+ public synchronized void run() {
+ notify();
+ }
+
+ synchronized void waitForConfig() {
+ localNodeConfigManager.registerConfigTask(this);
+ while (!localNodeConfigManager.isConfigured()) {
+ nodeMainLogger.info("NODE0003 Waiting for Node Configuration");
+ try {
+ wait();
+ } catch (Exception exception) {
+ nodeMainLogger
+ .error("NodeMain: waitForConfig exception. Exception Message:- " + exception.toString(),
+ exception);
+ }
+ }
+ localNodeConfigManager.deregisterConfigTask(this);
+ nodeMainLogger.info("NODE0004 Node Configuration Data Received");
+ }
+ }
+
+ class Inner {
+
+ InputStream getCadiProps() {
+ InputStream in = null;
+ try {
+ in = getClass().getClassLoader().getResourceAsStream("drNodeCadi.properties");
+ } catch (Exception e) {
+ nodeMainLogger.error("Exception in Inner.getCadiProps() method ", e);
+ }
+ return in;
+ }
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java
index a9842116..3b82484a 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java
@@ -24,15 +24,10 @@
package org.onap.dmaap.datarouter.node;
+import static org.onap.dmaap.datarouter.node.NodeUtils.sendResponseError;
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-import org.jetbrains.annotations.Nullable;
-import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
-import org.slf4j.MDC;
-
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
@@ -45,13 +40,17 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Enumeration;
import java.util.regex.Pattern;
-
-import static org.onap.dmaap.datarouter.node.NodeUtils.sendResponseError;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
+import org.slf4j.MDC;
/**
- * Servlet for handling all http and https requests to the data router node
- * <p>
- * Handled requests are:
+ * Servlet for handling all http and https requests to the data router node.
+ *
+ * <p>Handled requests are:
* <br>
* GET http://<i>node</i>/internal/fetchProv - fetch the provisioning data
* <br>
@@ -61,29 +60,33 @@ import static org.onap.dmaap.datarouter.node.NodeUtils.sendResponseError;
*/
public class NodeServlet extends HttpServlet {
+ private static final String FROM = " from ";
+ private static final String INVALID_REQUEST_URI = "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.";
+ private static final String IO_EXCEPTION = "IOException";
+ private static final String ON_BEHALF_OF = "X-DMAAP-DR-ON-BEHALF-OF";
private static NodeConfigManager config;
- private static Pattern MetaDataPattern;
+ private static Pattern metaDataPattern;
private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(NodeServlet.class);
- private final Delivery delivery;
static {
final String ws = "\\s*";
// assume that \\ and \" have been replaced by X
final String string = "\"[^\"]*\"";
- //String string = "\"(?:[^\"\\\\]|\\\\.)*\"";
final String number = "[+-]?(?:\\.\\d+|(?:0|[1-9]\\d*)(?:\\.\\d*)?)(?:[eE][+-]?\\d+)?";
final String value = "(?:" + string + "|" + number + "|null|true|false)";
final String item = string + ws + ":" + ws + value + ws;
final String object = ws + "\\{" + ws + "(?:" + item + "(?:" + "," + ws + item + ")*)?\\}" + ws;
- MetaDataPattern = Pattern.compile(object, Pattern.DOTALL);
+ metaDataPattern = Pattern.compile(object, Pattern.DOTALL);
}
+ private final Delivery delivery;
+
NodeServlet(Delivery delivery) {
this.delivery = delivery;
}
/**
- * Get the NodeConfigurationManager
+ * Get the NodeConfigurationManager.
*/
@Override
public void init() {
@@ -91,7 +94,7 @@ public class NodeServlet extends HttpServlet {
eelfLogger.info("NODE0101 Node Servlet Configured");
}
- private boolean down(HttpServletResponse resp) throws IOException {
+ private boolean down(HttpServletResponse resp) {
if (config.isShutdown() || !config.isConfigured()) {
sendResponseError(resp, HttpServletResponse.SC_SERVICE_UNAVAILABLE, eelfLogger);
eelfLogger.info("NODE0102 Rejecting request: Service is being quiesced");
@@ -101,7 +104,7 @@ public class NodeServlet extends HttpServlet {
}
/**
- * Handle a GET for /internal/fetchProv
+ * Handle a GET for /internal/fetchProv.
*/
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
@@ -109,15 +112,10 @@ public class NodeServlet extends HttpServlet {
NodeUtils.setRequestIdAndInvocationId(req);
eelfLogger.info(EelfMsgs.ENTRY);
try {
- eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-DMAAP-DR-ON-BEHALF-OF"),
+ eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader(ON_BEHALF_OF),
getIdFromPath(req) + "");
- try {
- if (down(resp)) {
- return;
- }
-
- } catch (IOException ioe) {
- eelfLogger.error("IOException" + ioe.getMessage());
+ if (down(resp)) {
+ return;
}
String path = req.getPathInfo();
String qs = req.getQueryString();
@@ -138,7 +136,7 @@ public class NodeServlet extends HttpServlet {
}
}
- eelfLogger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip);
+ eelfLogger.info("NODE0103 Rejecting invalid GET of " + path + FROM + ip);
sendResponseError(resp, HttpServletResponse.SC_NOT_FOUND, eelfLogger);
} finally {
eelfLogger.info(EelfMsgs.EXIT);
@@ -146,50 +144,55 @@ public class NodeServlet extends HttpServlet {
}
/**
- * Handle all PUT requests
+ * Handle all PUT requests.
*/
@Override
protected void doPut(HttpServletRequest req, HttpServletResponse resp) {
NodeUtils.setIpAndFqdnForEelf("doPut");
NodeUtils.setRequestIdAndInvocationId(req);
eelfLogger.info(EelfMsgs.ENTRY);
- eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-DMAAP-DR-ON-BEHALF-OF"),
+ eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader(ON_BEHALF_OF),
getIdFromPath(req) + "");
try {
common(req, resp, true);
} catch (IOException ioe) {
- eelfLogger.error("IOException" + ioe.getMessage());
+ eelfLogger.error(IO_EXCEPTION, ioe);
eelfLogger.info(EelfMsgs.EXIT);
}
}
/**
- * Handle all DELETE requests
+ * Handle all DELETE requests.
*/
@Override
protected void doDelete(HttpServletRequest req, HttpServletResponse resp) {
NodeUtils.setIpAndFqdnForEelf("doDelete");
NodeUtils.setRequestIdAndInvocationId(req);
eelfLogger.info(EelfMsgs.ENTRY);
- eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-DMAAP-DR-ON-BEHALF-OF"),
+ eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader(ON_BEHALF_OF),
getIdFromPath(req) + "");
try {
common(req, resp, false);
} catch (IOException ioe) {
- eelfLogger.error("IOException " + ioe.getMessage());
+ eelfLogger.error(IO_EXCEPTION, ioe);
eelfLogger.info(EelfMsgs.EXIT);
}
}
private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) throws IOException {
+ final String PUBLISH = "/publish/";
+ final String INTERNAL_PUBLISH = "/internal/publish/";
+ final String HTTPS = "https://";
+ final String USER = " user ";
String fileid = getFileId(req, resp);
- if (fileid == null) return;
+ if (fileid == null) {
+ return;
+ }
String feedid = null;
String user = null;
String ip = req.getRemoteAddr();
String lip = req.getLocalAddr();
String pubid = null;
- String xpubid = null;
String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip;
Target[] targets = null;
boolean isAAFFeed = false;
@@ -199,37 +202,38 @@ public class NodeServlet extends HttpServlet {
}
String credentials = req.getHeader("Authorization");
if (credentials == null) {
- eelfLogger.error("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req
+ eelfLogger.error("NODE0306 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + FROM + req
.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_FORBIDDEN, "Authorization header required");
eelfLogger.info(EelfMsgs.EXIT);
return;
}
- if (fileid.startsWith("/publish/")) {
+ if (fileid.startsWith(PUBLISH)) {
fileid = fileid.substring(9);
- int i = fileid.indexOf('/');
- if (i == -1 || i == fileid.length() - 1) {
- eelfLogger.error("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
+ int index = fileid.indexOf('/');
+ if (index == -1 || index == fileid.length() - 1) {
+ eelfLogger.error("NODE0205 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + FROM + req
.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_NOT_FOUND,
"Invalid request URI. Expecting <feed-publishing-url>/<fileid>. Possible missing fileid.");
eelfLogger.info(EelfMsgs.EXIT);
return;
}
- feedid = fileid.substring(0, i);
+ feedid = fileid.substring(0, index);
- if (config.getCadiEnabeld()) {
+ if (config.getCadiEnabled()) {
String path = req.getPathInfo();
if (!path.startsWith("/internal") && feedid != null) {
String aafInstance = config.getAafInstance(feedid);
- if (!(aafInstance.equalsIgnoreCase("legacy"))) {
+ if (!("legacy".equalsIgnoreCase(aafInstance))) {
isAAFFeed = true;
String permission = config.getPermission(aafInstance);
eelfLogger.info("NodeServlet.common() permission string - " + permission);
//Check in CADI Framework API if user has AAF permission or not
if (!req.isUserInRole(permission)) {
String message = "AAF disallows access to permission string - " + permission;
- eelfLogger.error("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());
+ eelfLogger.error("NODE0307 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo()
+ + FROM + req.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_FORBIDDEN, message);
eelfLogger.info(EelfMsgs.EXIT);
return;
@@ -238,11 +242,10 @@ public class NodeServlet extends HttpServlet {
}
}
- fileid = fileid.substring(i + 1);
+ fileid = fileid.substring(index + 1);
pubid = config.getPublishId();
- xpubid = req.getHeader("X-DMAAP-DR-PUBLISH-ID");
targets = config.getTargets(feedid);
- } else if (fileid.startsWith("/internal/publish/")) {
+ } else if (fileid.startsWith(INTERNAL_PUBLISH)) {
if (!config.isAnotherNode(credentials, ip)) {
eelfLogger.error("NODE0107 Rejecting unauthorized node-to-node transfer attempt from " + ip);
resp.sendError(HttpServletResponse.SC_FORBIDDEN);
@@ -254,18 +257,18 @@ public class NodeServlet extends HttpServlet {
user = "datartr"; // SP6 : Added usr as datartr to avoid null entries for internal routing
targets = config.parseRouting(req.getHeader("X-DMAAP-DR-ROUTING"));
} else {
- eelfLogger.error("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
+ eelfLogger.error("NODE0204 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + FROM + req
.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_NOT_FOUND,
- "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");
+ INVALID_REQUEST_URI);
eelfLogger.info(EelfMsgs.EXIT);
return;
}
if (fileid.indexOf('/') != -1) {
- eelfLogger.error("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
+ eelfLogger.error("NODE0202 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + FROM + req
.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_NOT_FOUND,
- "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");
+ INVALID_REQUEST_URI);
eelfLogger.info(EelfMsgs.EXIT);
return;
}
@@ -278,14 +281,16 @@ public class NodeServlet extends HttpServlet {
if (xp != 443) {
hp = hp + ":" + xp;
}
- String logurl = "https://" + hp + "/internal/publish/" + fileid;
+ String logurl = HTTPS + hp + INTERNAL_PUBLISH + fileid;
if (feedid != null) {
- logurl = "https://" + hp + "/publish/" + feedid + "/" + fileid;
+ logurl = HTTPS + hp + PUBLISH + feedid + "/" + fileid;
//Cadi code starts
if (!isAAFFeed) {
String reason = config.isPublishPermitted(feedid, credentials, ip);
if (reason != null) {
- eelfLogger.error("NODE0111 Rejecting unauthorized publish attempt to feed " + PathUtil.cleanString(feedid) + " fileid " + PathUtil.cleanString(fileid) + " from " + PathUtil.cleanString(ip) + " reason " + PathUtil.cleanString(reason));
+ eelfLogger.error("NODE0111 Rejecting unauthorized publish attempt to feed " + PathUtil
+ .cleanString(feedid) + " fileid " + PathUtil.cleanString(fileid) + FROM + PathUtil
+ .cleanString(ip) + " reason " + PathUtil.cleanString(reason));
resp.sendError(HttpServletResponse.SC_FORBIDDEN, reason);
eelfLogger.info(EelfMsgs.EXIT);
return;
@@ -294,9 +299,12 @@ public class NodeServlet extends HttpServlet {
} else {
String reason = config.isPublishPermitted(feedid, ip);
if (reason != null) {
- eelfLogger.error("NODE0111 Rejecting unauthorized publish attempt to feed " + PathUtil.cleanString(feedid) + " fileid " + PathUtil.cleanString(fileid) + " from " + PathUtil.cleanString(ip) + " reason Invalid AAF user- " + PathUtil.cleanString(reason));
+ eelfLogger.error("NODE0111 Rejecting unauthorized publish attempt to feed " + PathUtil
+ .cleanString(feedid) + " fileid " + PathUtil.cleanString(fileid) + FROM + PathUtil
+ .cleanString(ip) + " reason Invalid AAF user- " + PathUtil.cleanString(reason));
String message = "Invalid AAF user- " + PathUtil.cleanString(reason);
- eelfLogger.info("NODE0106 Rejecting unauthenticated PUT or DELETE of " + PathUtil.cleanString(req.getPathInfo()) + " from " + PathUtil.cleanString(req.getRemoteAddr()));
+ eelfLogger.info("NODE0308 Rejecting unauthenticated PUT or DELETE of " + PathUtil
+ .cleanString(req.getPathInfo()) + FROM + PathUtil.cleanString(req.getRemoteAddr()));
resp.sendError(HttpServletResponse.SC_FORBIDDEN, message);
return;
}
@@ -316,25 +324,26 @@ public class NodeServlet extends HttpServlet {
if (iport != 443) {
port = ":" + iport;
}
- String redirto = "https://" + newnode + port + "/publish/" + feedid + "/" + fileid;
- eelfLogger.info("NODE0108 Redirecting publish attempt for feed " + PathUtil.cleanString(feedid) + " user " + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip) + " to " + PathUtil.cleanString(redirto)); //Fortify scan fixes - log forging
+ String redirto = HTTPS + newnode + port + PUBLISH + feedid + "/" + fileid;
+ eelfLogger
+ .info("NODE0108 Redirecting publish attempt for feed " + PathUtil.cleanString(feedid) + USER
+ + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip) + " to " + PathUtil
+ .cleanString(redirto)); //Fortify scan fixes - log forging
resp.sendRedirect(PathUtil.cleanString(redirto)); //Fortify scan fixes-open redirect - 2 issues
eelfLogger.info(EelfMsgs.EXIT);
return;
}
resp.setHeader("X-DMAAP-DR-PUBLISH-ID", pubid);
}
- if (req.getPathInfo().startsWith("/internal/publish/")) {
+ if (req.getPathInfo().startsWith(INTERNAL_PUBLISH)) {
feedid = req.getHeader("X-DMAAP-DR-FEED-ID");
}
String fbase = PathUtil.cleanString(config.getSpoolDir() + "/" + pubid); //Fortify scan fixes-Path manipulation
File data = new File(fbase);
File meta = new File(fbase + ".M");
- OutputStream dos = null;
Writer mw = null;
- InputStream is = null;
try {
- StringBuffer mx = new StringBuffer();
+ StringBuilder mx = new StringBuilder();
mx.append(req.getMethod()).append('\t').append(fileid).append('\n');
Enumeration hnames = req.getHeaderNames();
String ctype = null;
@@ -343,13 +352,13 @@ public class NodeServlet extends HttpServlet {
while (hnames.hasMoreElements()) {
String hn = (String) hnames.nextElement();
String hnlc = hn.toLowerCase();
- if ((isput && ("content-type".equals(hnlc) ||
- "content-language".equals(hnlc) ||
- "content-md5".equals(hnlc) ||
- "content-range".equals(hnlc))) ||
- "x-dmaap-dr-meta".equals(hnlc) ||
- (feedid == null && "x-dmaap-dr-received".equals(hnlc)) ||
- (hnlc.startsWith("x-") && !hnlc.startsWith("x-dmaap-dr-"))) {
+ if ((isput && ("content-type".equals(hnlc)
+ || "content-language".equals(hnlc)
+ || "content-md5".equals(hnlc)
+ || "content-range".equals(hnlc)))
+ || "x-dmaap-dr-meta".equals(hnlc)
+ || (feedid == null && "x-dmaap-dr-received".equals(hnlc))
+ || (hnlc.startsWith("x-") && !hnlc.startsWith("x-dmaap-dr-"))) {
Enumeration hvals = req.getHeaders(hn);
while (hvals.hasMoreElements()) {
String hv = (String) hvals.nextElement();
@@ -364,13 +373,17 @@ public class NodeServlet extends HttpServlet {
}
if ("x-dmaap-dr-meta".equals(hnlc)) {
if (hv.length() > 4096) {
- eelfLogger.error("NODE0109 Rejecting publish attempt with metadata too long for feed " + PathUtil.cleanString(feedid) + " user " + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip)); //Fortify scan fixes - log forging
+ eelfLogger.error("NODE0109 Rejecting publish attempt with metadata too long for feed "
+ + PathUtil.cleanString(feedid) + USER + PathUtil.cleanString(user) + " ip "
+ + PathUtil.cleanString(ip)); //Fortify scan fixes - log forging
resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Metadata too long");
eelfLogger.info(EelfMsgs.EXIT);
return;
}
- if (!MetaDataPattern.matcher(hv.replaceAll("\\\\.", "X")).matches()) {
- eelfLogger.error("NODE0109 Rejecting publish attempt with malformed metadata for feed " + PathUtil.cleanString(feedid) + " user " + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip)); //Fortify scan fixes - log forging
+ if (!metaDataPattern.matcher(hv.replaceAll("\\\\.", "X")).matches()) {
+ eelfLogger.error("NODE0109 Rejecting publish attempt with malformed metadata for feed "
+ + PathUtil.cleanString(feedid) + USER + PathUtil.cleanString(user) + " ip "
+ + PathUtil.cleanString(ip)); //Fortify scan fixes - log forging
resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Malformed metadata");
eelfLogger.info(EelfMsgs.EXIT);
return;
@@ -388,28 +401,12 @@ public class NodeServlet extends HttpServlet {
}
mx.append("X-DMAAP-DR-RECEIVED\t").append(rcvd).append('\n');
String metadata = mx.toString();
- byte[] buf = new byte[1024 * 1024];
- int i;
- try {
- is = req.getInputStream();
- dos = new FileOutputStream(data);
- while ((i = is.read(buf)) > 0) {
- dos.write(buf, 0, i);
- }
- is.close();
- is = null;
- dos.close();
- dos = null;
- } catch (IOException ioe) {
- long exlen = -1;
- try {
- exlen = Long.parseLong(req.getHeader("Content-Length"));
- } catch (Exception e) {
- eelfLogger.error("NODE0529 Exception common: " + e);
- }
- StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user, ioe.getMessage());
- eelfLogger.info(EelfMsgs.EXIT);
- throw ioe;
+ long exlen = getExlen(req);
+ String message = writeInputStreamToFile(req, data);
+ if (message != null) {
+ StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user,
+ message);
+ throw new IOException(message);
}
Path dpath = Paths.get(fbase);
for (Target t : targets) {
@@ -418,7 +415,8 @@ public class NodeServlet extends HttpServlet {
// TODO: unknown destination
continue;
}
- String dbase = PathUtil.cleanString(di.getSpool() + "/" + pubid); //Fortify scan fixes-Path Manipulation
+ String dbase = PathUtil
+ .cleanString(di.getSpool() + "/" + pubid); //Fortify scan fixes-Path Manipulation
Files.createLink(Paths.get(dbase), dpath);
mw = new FileWriter(meta);
mw.write(metadata);
@@ -427,45 +425,28 @@ public class NodeServlet extends HttpServlet {
}
mw.close();
meta.renameTo(new File(dbase + ".M"));
-
}
resp.setStatus(HttpServletResponse.SC_NO_CONTENT);
try {
resp.getOutputStream().close();
} catch (IOException ioe) {
- long exlen = -1;
- try {
- exlen = Long.parseLong(req.getHeader("Content-Length"));
- } catch (Exception e) {
- eelfLogger.error("NODE00000 Exception common: " + e.getMessage());
- }
- StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user, ioe.getMessage());
+ StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user,
+ ioe.getMessage());
//Fortify scan fixes - log forging
- eelfLogger.error("NODE0110 IO Exception while closing IO stream " + PathUtil.cleanString(feedid) + " user " + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip) + " " + ioe.toString(), ioe);
-
+ eelfLogger.error("NODE0110 IO Exception while closing IO stream " + PathUtil.cleanString(feedid)
+ + USER + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip) + " " + ioe
+ .toString(), ioe);
throw ioe;
}
- StatusLog.logPub(pubid, feedid, logurl, req.getMethod(), ctype, data.length(), ip, user, HttpServletResponse.SC_NO_CONTENT);
+ StatusLog.logPub(pubid, feedid, logurl, req.getMethod(), ctype, data.length(), ip, user,
+ HttpServletResponse.SC_NO_CONTENT);
} catch (IOException ioe) {
- eelfLogger.error("NODE0110 IO Exception receiving publish attempt for feed " + feedid + " user " + user + " ip " + ip + " " + ioe.toString(), ioe);
+ eelfLogger.error("NODE0110 IO Exception receiving publish attempt for feed " + feedid + USER + user
+ + " ip " + ip + " " + ioe.toString(), ioe);
eelfLogger.info(EelfMsgs.EXIT);
throw ioe;
} finally {
- if (is != null) {
- try {
- is.close();
- } catch (Exception e) {
- eelfLogger.error("NODE0530 Exception common: " + e);
- }
- }
- if (dos != null) {
- try {
- dos.close();
- } catch (Exception e) {
- eelfLogger.error("NODE0531 Exception common: " + e);
- }
- }
if (mw != null) {
try {
mw.close();
@@ -486,22 +467,49 @@ public class NodeServlet extends HttpServlet {
}
}
+ private String writeInputStreamToFile(HttpServletRequest req, File data) {
+ byte[] buf = new byte[1024 * 1024];
+ int bytesRead;
+ try (OutputStream dos = new FileOutputStream(data);
+ InputStream is = req.getInputStream()) {
+ while ((bytesRead = is.read(buf)) > 0) {
+ dos.write(buf, 0, bytesRead);
+ }
+ } catch (IOException ioe) {
+ eelfLogger.error("NODE0530 Exception common: " + ioe, ioe);
+ eelfLogger.info(EelfMsgs.EXIT);
+ return ioe.getMessage();
+ }
+ return null;
+ }
+
+ private long getExlen(HttpServletRequest req) {
+ long exlen = -1;
+ try {
+ exlen = Long.parseLong(req.getHeader("Content-Length"));
+ } catch (Exception e) {
+ eelfLogger.error("NODE0529 Exception common: " + e);
+ }
+ return exlen;
+ }
+
private void deleteFile(HttpServletRequest req, HttpServletResponse resp, String fileid, String pubid) {
+ final String FROM_DR_MESSAGE = ".M) from DR Node: ";
try {
fileid = fileid.substring(8);
- int i = fileid.indexOf('/');
- if (i == -1 || i == fileid.length() - 1) {
- eelfLogger.error("NODE0112 Rejecting bad URI for DELETE of " + req.getPathInfo() + " from " + req
+ int index = fileid.indexOf('/');
+ if (index == -1 || index == fileid.length() - 1) {
+ eelfLogger.error("NODE0112 Rejecting bad URI for DELETE of " + req.getPathInfo() + FROM + req
.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_NOT_FOUND,
"Invalid request URI. Expecting <subId>/<pubId>.");
eelfLogger.info(EelfMsgs.EXIT);
return;
}
- String subscriptionId = fileid.substring(0, i);
+ String subscriptionId = fileid.substring(0, index);
int subId = Integer.parseInt(subscriptionId);
- pubid = fileid.substring(i + 1);
- String errorMessage = "Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: "
+ pubid = fileid.substring(index + 1);
+ String errorMessage = "Unable to delete files (" + pubid + ", " + pubid + FROM_DR_MESSAGE
+ config.getMyName() + ".";
int subIdDir = subId - (subId % 100);
if (!isAuthorizedToDelete(resp, subscriptionId, errorMessage)) {
@@ -509,7 +517,7 @@ public class NodeServlet extends HttpServlet {
}
boolean result = delivery.markTaskSuccess(config.getSpoolBase() + "/s/" + subIdDir + "/" + subId, pubid);
if (result) {
- eelfLogger.info("NODE0115 Successfully deleted files (" + pubid + ", " + pubid + ".M) from DR Node: "
+ eelfLogger.info("NODE0115 Successfully deleted files (" + pubid + ", " + pubid + FROM_DR_MESSAGE
+ config.getMyName());
resp.setStatus(HttpServletResponse.SC_OK);
eelfLogger.info(EelfMsgs.EXIT);
@@ -519,8 +527,8 @@ public class NodeServlet extends HttpServlet {
eelfLogger.info(EelfMsgs.EXIT);
}
} catch (IOException ioe) {
- eelfLogger.error("NODE0117 Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: "
- + config.getMyName() + ". Error: " + ioe.getMessage());
+ eelfLogger.error("NODE0117 Unable to delete files (" + pubid + ", " + pubid + FROM_DR_MESSAGE
+ + config.getMyName(), ioe);
eelfLogger.info(EelfMsgs.EXIT);
}
}
@@ -533,7 +541,7 @@ public class NodeServlet extends HttpServlet {
}
if (!req.isSecure()) {
eelfLogger.error(
- "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req
+ "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + FROM + req
.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests");
eelfLogger.info(EelfMsgs.EXIT);
@@ -541,17 +549,18 @@ public class NodeServlet extends HttpServlet {
}
String fileid = req.getPathInfo();
if (fileid == null) {
- eelfLogger.error("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
+ eelfLogger.error("NODE0201 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + FROM + req
.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_NOT_FOUND,
- "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");
+ INVALID_REQUEST_URI);
eelfLogger.info(EelfMsgs.EXIT);
return null;
}
return fileid;
}
- private boolean isAuthorizedToDelete(HttpServletResponse resp, String subscriptionId, String errorMessage) throws IOException {
+ private boolean isAuthorizedToDelete(HttpServletResponse resp, String subscriptionId, String errorMessage)
+ throws IOException {
try {
boolean deletePermitted = config.isDeletePermitted(subscriptionId);
if (!deletePermitted) {
@@ -562,7 +571,8 @@ public class NodeServlet extends HttpServlet {
return false;
}
} catch (NullPointerException npe) {
- eelfLogger.error("NODE0114 " + errorMessage + " Error: Subscription " + subscriptionId + " does not exist");
+ eelfLogger.error("NODE0114 " + errorMessage + " Error: Subscription " + subscriptionId
+ + " does not exist", npe);
resp.sendError(HttpServletResponse.SC_NOT_FOUND);
eelfLogger.info(EelfMsgs.EXIT);
return false;
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java
index 4601f99c..d4fc7dbe 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java
@@ -24,15 +24,22 @@
package org.onap.dmaap.datarouter.node;
+import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID;
+import static com.att.eelf.configuration.Configuration.MDC_SERVER_FQDN;
+import static com.att.eelf.configuration.Configuration.MDC_SERVER_IP_ADDRESS;
+import static com.att.eelf.configuration.Configuration.MDC_SERVICE_NAME;
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.security.KeyStore;
+import java.security.KeyStoreException;
import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.text.SimpleDateFormat;
import java.util.Date;
@@ -47,10 +54,8 @@ import org.apache.commons.lang3.StringUtils;
import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
import org.slf4j.MDC;
-import static com.att.eelf.configuration.Configuration.*;
-
/**
- * Utility functions for the data router node
+ * Utility functions for the data router node.
*/
public class NodeUtils {
@@ -61,9 +66,9 @@ public class NodeUtils {
}
/**
- * Base64 encode a byte array
+ * Base64 encode a byte array.
*
- * @param raw The bytes to be encoded
+ * @param raw The bytes to be encoded
* @return The encoded string
*/
public static String base64Encode(byte[] raw) {
@@ -71,7 +76,7 @@ public class NodeUtils {
}
/**
- * Given a user and password, generate the credentials
+ * Given a user and password, generate the credentials.
*
* @param user User name
* @param password User password
@@ -85,7 +90,7 @@ public class NodeUtils {
}
/**
- * Given a node name, generate the credentials
+ * Given a node name, generate the credentials.
*
* @param node Node name
*/
@@ -117,16 +122,12 @@ public class NodeUtils {
KeyStore ks;
try {
ks = KeyStore.getInstance(kstype);
- try (FileInputStream fileInputStream = new FileInputStream(ksfile)) {
- ks.load(fileInputStream, kspass.toCharArray());
- } catch (IOException ioException) {
- eelfLogger.error("IOException occurred while opening FileInputStream: " + ioException.getMessage(),
- ioException);
+ if (loadKeyStore(ksfile, kspass, ks)) {
return (null);
}
} catch (Exception e) {
setIpAndFqdnForEelf("getCanonicalName");
- eelfLogger.error(EelfMsgs.MESSAGE_KEYSTORE_LOAD_ERROR, ksfile, e.toString());
+ eelfLogger.error(EelfMsgs.MESSAGE_KEYSTORE_LOAD_ERROR, e, ksfile);
return (null);
}
return (getCanonicalName(ks));
@@ -142,32 +143,19 @@ public class NodeUtils {
try {
Enumeration<String> aliases = ks.aliases();
while (aliases.hasMoreElements()) {
- String s = aliases.nextElement();
- if (ks.entryInstanceOf(s, KeyStore.PrivateKeyEntry.class)) {
- X509Certificate c = (X509Certificate) ks.getCertificate(s);
- if (c != null) {
- String subject = c.getSubjectX500Principal().getName();
- String[] parts = subject.split(",");
- if (parts.length < 1) {
- return (null);
- }
- subject = parts[5].trim();
- if (!subject.startsWith("CN=")) {
- return (null);
-
- }
- return (subject.substring(3));
- }
+ String name = getNameFromSubject(ks, aliases);
+ if (name != null) {
+ return name;
}
}
} catch (Exception e) {
- eelfLogger.error("NODE0402 Error extracting my name from my keystore file " + e.toString(), e.getMessage());
+ eelfLogger.error("NODE0402 Error extracting my name from my keystore file " + e.toString(), e);
}
return (null);
}
/**
- * Given a string representation of an IP address, get the corresponding byte array
+ * Given a string representation of an IP address, get the corresponding byte array.
*
* @param ip The IP address as a string
* @return The IP address as a byte array or null if the address is invalid
@@ -184,48 +172,48 @@ public class NodeUtils {
}
/**
- * Given a uri with parameters, split out the feed ID and file ID
+ * Given a uri with parameters, split out the feed ID and file ID.
*/
public static String[] getFeedAndFileID(String uriandparams) {
int end = uriandparams.length();
- int i = uriandparams.indexOf('#');
- if (i != -1 && i < end) {
- end = i;
+ int index = uriandparams.indexOf('#');
+ if (index != -1 && index < end) {
+ end = index;
}
- i = uriandparams.indexOf('?');
- if (i != -1 && i < end) {
- end = i;
+ index = uriandparams.indexOf('?');
+ if (index != -1 && index < end) {
+ end = index;
}
end = uriandparams.lastIndexOf('/', end);
if (end < 2) {
return (null);
}
- i = uriandparams.lastIndexOf('/', end - 1);
- if (i == -1) {
+ index = uriandparams.lastIndexOf('/', end - 1);
+ if (index == -1) {
return (null);
}
- return (new String[]{uriandparams.substring(i + 1, end), uriandparams.substring(end + 1)});
+ return (new String[]{uriandparams.substring(index + 1, end), uriandparams.substring(end + 1)});
}
/**
* Escape fields that might contain vertical bar, backslash, or newline by replacing them with backslash p,
* backslash e and backslash n.
*/
- public static String loge(String s) {
- if (s == null) {
- return (s);
+ public static String loge(String string) {
+ if (string == null) {
+ return (string);
}
- return (s.replaceAll("\\\\", "\\\\e").replaceAll("\\|", "\\\\p").replaceAll("\n", "\\\\n"));
+ return (string.replaceAll("\\\\", "\\\\e").replaceAll("\\|", "\\\\p").replaceAll("\n", "\\\\n"));
}
/**
* Undo what loge does.
*/
- public static String unloge(String s) {
- if (s == null) {
- return (s);
+ public static String unloge(String string) {
+ if (string == null) {
+ return (string);
}
- return (s.replaceAll("\\\\p", "\\|").replaceAll("\\\\n", "\n").replaceAll("\\\\e", "\\\\"));
+ return (string.replaceAll("\\\\p", "\\|").replaceAll("\\\\n", "\n").replaceAll("\\\\e", "\\\\"));
}
/**
@@ -244,9 +232,9 @@ public class NodeUtils {
return (logDate.format(when));
}
- /* Method prints method name, server FQDN and IP Address of the machine in EELF logs
- * @Method - setIpAndFqdnForEelf - Rally:US664892
- * @Params - method, prints method name in EELF log.
+ /** Method prints method name, server FQDN and IP Address of the machine in EELF logs.
+ *
+ * @param method Prints method name in EELF log.
*/
public static void setIpAndFqdnForEelf(String method) {
MDC.clear();
@@ -262,9 +250,9 @@ public class NodeUtils {
}
- /* Method sets RequestIs and InvocationId for se in EELF logs
- * @Method - setIpAndFqdnForEelf
- * @Params - Req, Request used to get RequestId and InvocationId
+ /** Method sets RequestIs and InvocationId for se in EELF logs.
+ *
+ * @param req Request used to get RequestId and InvocationId.
*/
public static void setRequestIdAndInvocationId(HttpServletRequest req) {
String reqId = req.getHeader("X-ONAP-RequestID");
@@ -279,30 +267,65 @@ public class NodeUtils {
MDC.put("InvocationId", invId);
}
+ /**
+ * Sends error as response with error code input.
+ */
public static void sendResponseError(HttpServletResponse response, int errorCode, EELFLogger intlogger) {
try {
response.sendError(errorCode);
} catch (IOException ioe) {
- intlogger.error("IOException" + ioe.getMessage());
+ intlogger.error("IOException", ioe);
}
}
/**
- * Method to check to see if file is of type gzip
+ * Method to check to see if file is of type gzip.
*
- * @param file The name of the file to be checked
- * @return True if the file is of type gzip
+ * @param file The name of the file to be checked
+ * @return True if the file is of type gzip
*/
- public static boolean isFiletypeGzip(File file){
- try(FileInputStream fileInputStream = new FileInputStream(file);
- GZIPInputStream gzip = new GZIPInputStream(fileInputStream)) {
+ public static boolean isFiletypeGzip(File file) {
+ try (FileInputStream fileInputStream = new FileInputStream(file);
+ GZIPInputStream gzip = new GZIPInputStream(fileInputStream)) {
return true;
- }catch (IOException e){
+ } catch (IOException e) {
eelfLogger.error("NODE0403 " + file.toString() + " Not in gzip(gz) format: " + e.toString() + e);
return false;
}
}
+ private static boolean loadKeyStore(String ksfile, String kspass, KeyStore ks)
+ throws NoSuchAlgorithmException, CertificateException {
+ try (FileInputStream fileInputStream = new FileInputStream(ksfile)) {
+ ks.load(fileInputStream, kspass.toCharArray());
+ } catch (IOException ioException) {
+ eelfLogger.error("IOException occurred while opening FileInputStream: " + ioException.getMessage(),
+ ioException);
+ return true;
+ }
+ return false;
+ }
+
+
+ private static String getNameFromSubject(KeyStore ks, Enumeration<String> aliases) throws KeyStoreException {
+ String alias = aliases.nextElement();
+ if (ks.entryInstanceOf(alias, KeyStore.PrivateKeyEntry.class)) {
+ X509Certificate cert = (X509Certificate) ks.getCertificate(alias);
+ if (cert != null) {
+ String subject = cert.getSubjectX500Principal().getName();
+ String[] parts = subject.split(",");
+ if (parts.length < 1) {
+ return null;
+ }
+ subject = parts[5].trim();
+ if (!subject.startsWith("CN=")) {
+ return null;
+ }
+ return subject.substring(3);
+ }
+ }
+ return null;
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java
index fec2ca39..fe3fdb6e 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java
@@ -24,43 +24,70 @@
package org.onap.dmaap.datarouter.node;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
-import java.util.Hashtable;
-import java.util.Vector;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvHop;
/**
* Given a set of node names and next hops, identify and ignore any cycles and figure out the sequence of next hops to
- * get from this node to any other node
+ * get from this node to any other node.
*/
-public class PathFinder {
+class PathFinder {
- private static class Hop {
+ private ArrayList<String> errors = new ArrayList<>();
+ private HashMap<String, String> routes = new HashMap<>();
- boolean mark;
- boolean bad;
- NodeConfig.ProvHop basis;
+ /**
+ * Find routes from a specified origin to all of the nodes given a set of specified next hops.
+ *
+ * @param origin where we start
+ * @param nodes where we can go
+ * @param hops detours along the way
+ */
+ PathFinder(String origin, String[] nodes, NodeConfig.ProvHop[] hops) {
+ HashSet<String> known = new HashSet<>();
+ HashMap<String, HashMap<String, Hop>> ht = new HashMap<>();
+ for (String n : nodes) {
+ known.add(n);
+ ht.put(n, new HashMap<>());
+ }
+ for (NodeConfig.ProvHop ph : hops) {
+ Hop hop = getHop(known, ht, ph);
+ if (hop == null) {
+ continue;
+ }
+ if (ph.getVia().equals(ph.getTo())) {
+ errors.add(ph + " gives destination as via");
+ hop.bad = true;
+ }
+ }
+ for (String n : known) {
+ if (n.equals(origin)) {
+ routes.put(n, "");
+ }
+ routes.put(n, plot(origin, n, ht.get(n)) + "/");
+ }
}
- private Vector<String> errors = new Vector<String>();
- private Hashtable<String, String> routes = new Hashtable<String, String>();
-
/**
- * Get list of errors encountered while finding paths
+ * Get list of errors encountered while finding paths.
*
* @return array of error descriptions
*/
- public String[] getErrors() {
- return (errors.toArray(new String[errors.size()]));
+ String[] getErrors() {
+ return (errors.toArray(new String[0]));
}
/**
- * Get the route from this node to the specified node
+ * Get the route from this node to the specified node.
*
* @param destination node
* @return list of node names separated by and ending with "/"
*/
- public String getPath(String destination) {
+ String getPath(String destination) {
String ret = routes.get(destination);
if (ret == null) {
return ("");
@@ -68,13 +95,12 @@ public class PathFinder {
return (ret);
}
- private String plot(String from, String to, Hashtable<String, Hop> info) {
+ private String plot(String from, String to, HashMap<String, Hop> info) {
Hop nh = info.get(from);
if (nh == null || nh.bad) {
return (to);
}
if (nh.mark) {
- // loop detected;
while (!nh.bad) {
nh.bad = true;
errors.add(nh.basis + " is part of a cycle");
@@ -83,63 +109,46 @@ public class PathFinder {
return (to);
}
nh.mark = true;
- String x = plot(nh.basis.getVia(), to, info);
+ String route = plot(nh.basis.getVia(), to, info);
nh.mark = false;
if (nh.bad) {
return (to);
}
- return (nh.basis.getVia() + "/" + x);
+ return (nh.basis.getVia() + "/" + route);
}
- /**
- * Find routes from a specified origin to all of the nodes given a set of specified next hops.
- *
- * @param origin where we start
- * @param nodes where we can go
- * @param hops detours along the way
- */
- public PathFinder(String origin, String[] nodes, NodeConfig.ProvHop[] hops) {
- HashSet<String> known = new HashSet<String>();
- Hashtable<String, Hashtable<String, Hop>> ht = new Hashtable<String, Hashtable<String, Hop>>();
- for (String n : nodes) {
- known.add(n);
- ht.put(n, new Hashtable<String, Hop>());
+ @Nullable
+ private Hop getHop(HashSet<String> known, HashMap<String, HashMap<String, Hop>> ht, ProvHop ph) {
+ if (!known.contains(ph.getFrom())) {
+ errors.add(ph + " references unknown from node");
+ return null;
}
- for (NodeConfig.ProvHop ph : hops) {
- if (!known.contains(ph.getFrom())) {
- errors.add(ph + " references unknown from node");
- continue;
- }
- if (!known.contains(ph.getTo())) {
- errors.add(ph + " references unknown destination node");
- continue;
- }
- Hashtable<String, Hop> ht2 = ht.get(ph.getTo());
- Hop h = ht2.get(ph.getFrom());
- if (h != null) {
- h.bad = true;
- errors.add(ph + " gives duplicate next hop - previous via was " + h.basis.getVia());
- continue;
- }
- h = new Hop();
- h.basis = ph;
- ht2.put(ph.getFrom(), h);
- if (!known.contains(ph.getVia())) {
- errors.add(ph + " references unknown via node");
- h.bad = true;
- continue;
- }
- if (ph.getVia().equals(ph.getTo())) {
- errors.add(ph + " gives destination as via");
- h.bad = true;
- continue;
- }
+ if (!known.contains(ph.getTo())) {
+ errors.add(ph + " references unknown destination node");
+ return null;
}
- for (String n : known) {
- if (n.equals(origin)) {
- routes.put(n, "");
- }
- routes.put(n, plot(origin, n, ht.get(n)) + "/");
+ HashMap<String, Hop> ht2 = ht.get(ph.getTo());
+ Hop hop = ht2.get(ph.getFrom());
+ if (hop != null) {
+ hop.bad = true;
+ errors.add(ph + " gives duplicate next hop - previous via was " + hop.basis.getVia());
+ return null;
+ }
+ hop = new Hop();
+ hop.basis = ph;
+ ht2.put(ph.getFrom(), hop);
+ if (!known.contains(ph.getVia())) {
+ errors.add(ph + " references unknown via node");
+ hop.bad = true;
+ return null;
}
+ return hop;
+ }
+
+ private static class Hop {
+
+ boolean mark;
+ boolean bad;
+ NodeConfig.ProvHop basis;
}
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathUtil.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathUtil.java
index a4034410..d67c9094 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathUtil.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathUtil.java
@@ -1,67 +1,84 @@
-/**
- * -
+/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * <p>
+ *
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
+
package org.onap.dmaap.datarouter.node;
/**
- * FORTIFY SCAN FIXES
+ * FORTIFY SCAN FIXES.
* <p>This Utility is used for Fortify fixes. It Validates the path url formed from
- * the string passed in the request parameters.</p>
- *
+ * the string passed in the request parameters.</p>
*/
class PathUtil {
+ private PathUtil() {
+ throw new IllegalStateException("Utility Class");
+ }
+
/**
* This method takes String as the parameter and return the filtered path string.
- * @param aString String to clean
+ *
+ * @param string String to clean
* @return A cleaned String
*/
- static String cleanString(String aString) {
- if (aString == null) return null;
- String cleanString = "";
- for (int i = 0; i < aString.length(); ++i) {
- cleanString += cleanChar(aString.charAt(i));
+ static String cleanString(String string) {
+ if (string == null) {
+ return null;
+ }
+ StringBuilder cleanString = new StringBuilder();
+ for (int i = 0; i < string.length(); ++i) {
+ cleanString.append(cleanChar(string.charAt(i)));
}
- return cleanString;
+ return cleanString.toString();
}
/**
* This method filters the valid special characters in path string.
- * @param aChar The char to be cleaned
+ *
+ * @param character The char to be cleaned
* @return The cleaned char
*/
- private static char cleanChar(char aChar) {
+ private static char cleanChar(char character) {
// 0 - 9
for (int i = 48; i < 58; ++i) {
- if (aChar == i) return (char) i;
+ if (character == i) {
+ return (char) i;
+ }
}
// 'A' - 'Z'
for (int i = 65; i < 91; ++i) {
- if (aChar == i) return (char) i;
+ if (character == i) {
+ return (char) i;
+ }
}
// 'a' - 'z'
for (int i = 97; i < 123; ++i) {
- if (aChar == i) return (char) i;
+ if (character == i) {
+ return (char) i;
+ }
}
+ return getValidCharacter(character);
+ }
+
+ private static char getValidCharacter(char character) {
// other valid characters
- switch (aChar) {
+ switch (character) {
case '/':
return '/';
case '.':
@@ -82,7 +99,8 @@ class PathUtil {
return '_';
case ' ':
return ' ';
+ default:
+ return '%';
}
- return '%';
}
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java
index 1af7dda4..03e952c1 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java
@@ -24,22 +24,36 @@
package org.onap.dmaap.datarouter.node;
-import java.io.*;
-import java.util.*;
-
-import org.json.*;
-import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
-
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.ArrayList;
+import org.jetbrains.annotations.Nullable;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvFeed;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvFeedSubnet;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvFeedUser;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvForceEgress;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvForceIngress;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvHop;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvNode;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvParam;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvSubscription;
+import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
/**
* Parser for provisioning data from the provisioning server.
- * <p>
- * The ProvData class uses a Reader for the text configuration from the
- * provisioning server to construct arrays of raw configuration entries.
+ *
+ * <p>The ProvData class uses a Reader for the text configuration from the provisioning server to construct arrays of
+ * raw configuration entries.
*/
public class ProvData {
+
+ private static final String FEED_ID = "feedid";
private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(ProvData.class);
private NodeConfig.ProvNode[] pn;
private NodeConfig.ProvParam[] pp;
@@ -51,208 +65,39 @@ public class ProvData {
private NodeConfig.ProvForceEgress[] pfe;
private NodeConfig.ProvHop[] ph;
- private static String[] gvasa(JSONArray a, int index) {
- return (gvasa(a.get(index)));
- }
-
- private static String[] gvasa(JSONObject o, String key) {
- return (gvasa(o.opt(key)));
- }
-
- private static String[] gvasa(Object o) {
- if (o instanceof JSONArray) {
- JSONArray a = (JSONArray) o;
- Vector<String> v = new Vector<String>();
- for (int i = 0; i < a.length(); i++) {
- String s = gvas(a, i);
- if (s != null) {
- v.add(s);
- }
- }
- return (v.toArray(new String[v.size()]));
- } else {
- String s = gvas(o);
- if (s == null) {
- return (new String[0]);
- } else {
- return (new String[]{s});
- }
- }
- }
-
- private static String gvas(JSONArray a, int index) {
- return (gvas(a.get(index)));
- }
-
- private static String gvas(JSONObject o, String key) {
- return (gvas(o.opt(key)));
- }
-
- private static String gvas(Object o) {
- if (o instanceof Boolean || o instanceof Number || o instanceof String) {
- return (o.toString());
- }
- return (null);
- }
-
/**
- * Construct raw provisioing data entries from the text (JSON)
- * provisioning document received from the provisioning server
+ * Construct raw provisioing data entries from the text (JSON) provisioning document received from the provisioning
+ * server.
*
- * @param r The reader for the JSON text.
+ * @param reader The reader for the JSON text.
*/
- public ProvData(Reader r) throws IOException {
- Vector<NodeConfig.ProvNode> pnv = new Vector<NodeConfig.ProvNode>();
- Vector<NodeConfig.ProvParam> ppv = new Vector<NodeConfig.ProvParam>();
- Vector<NodeConfig.ProvFeed> pfv = new Vector<NodeConfig.ProvFeed>();
- Vector<NodeConfig.ProvFeedUser> pfuv = new Vector<NodeConfig.ProvFeedUser>();
- Vector<NodeConfig.ProvFeedSubnet> pfsnv = new Vector<NodeConfig.ProvFeedSubnet>();
- Vector<NodeConfig.ProvSubscription> psv = new Vector<NodeConfig.ProvSubscription>();
- Vector<NodeConfig.ProvForceIngress> pfiv = new Vector<NodeConfig.ProvForceIngress>();
- Vector<NodeConfig.ProvForceEgress> pfev = new Vector<NodeConfig.ProvForceEgress>();
- Vector<NodeConfig.ProvHop> phv = new Vector<NodeConfig.ProvHop>();
+ public ProvData(Reader reader) throws IOException {
+ ArrayList<ProvNode> pnv = new ArrayList<>();
+ ArrayList<NodeConfig.ProvParam> ppv = new ArrayList<>();
+ ArrayList<NodeConfig.ProvFeed> pfv = new ArrayList<>();
+ ArrayList<NodeConfig.ProvFeedUser> pfuv = new ArrayList<>();
+ ArrayList<NodeConfig.ProvFeedSubnet> pfsnv = new ArrayList<>();
+ ArrayList<NodeConfig.ProvSubscription> psv = new ArrayList<>();
+ ArrayList<NodeConfig.ProvForceIngress> pfiv = new ArrayList<>();
+ ArrayList<NodeConfig.ProvForceEgress> pfev = new ArrayList<>();
+ ArrayList<NodeConfig.ProvHop> phv = new ArrayList<>();
try {
- JSONTokener jtx = new JSONTokener(r);
+ JSONTokener jtx = new JSONTokener(reader);
JSONObject jcfg = new JSONObject(jtx);
char c = jtx.nextClean();
if (c != '\0') {
throw new JSONException("Spurious characters following configuration");
}
- r.close();
- JSONArray jfeeds = jcfg.optJSONArray("feeds");
- if (jfeeds != null) {
- for (int fx = 0; fx < jfeeds.length(); fx++) {
- JSONObject jfeed = jfeeds.getJSONObject(fx);
- String stat = null;
- if (jfeed.optBoolean("suspend", false)) {
- stat = "Feed is suspended";
- }
- if (jfeed.optBoolean("deleted", false)) {
- stat = "Feed is deleted";
- }
- String fid = gvas(jfeed, "feedid");
- String fname = gvas(jfeed, "name");
- String fver = gvas(jfeed, "version");
- String createdDate = gvas(jfeed, "created_date");
- /*
- * START - AAF changes
- * TDP EPIC US# 307413
- * Passing aafInstance to ProvFeed from feeds json passed by prov to identify legacy/AAF feeds
- */
- String aafInstance = gvas(jfeed, "aaf_instance");
- pfv.add(new NodeConfig.ProvFeed(fid, fname + "//" + fver, stat,createdDate, aafInstance));
- /*
- * END - AAF changes
- */
- JSONObject jauth = jfeed.optJSONObject("authorization");
- if (jauth == null) {
- continue;
- }
- JSONArray jeids = jauth.optJSONArray("endpoint_ids");
- if (jeids != null) {
- for (int ux = 0; ux < jeids.length(); ux++) {
- JSONObject ju = jeids.getJSONObject(ux);
- String login = gvas(ju, "id");
- String password = gvas(ju, "password");
- pfuv.add(new NodeConfig.ProvFeedUser(fid, login, NodeUtils.getAuthHdr(login, password)));
- }
- }
- JSONArray jeips = jauth.optJSONArray("endpoint_addrs");
- if (jeips != null) {
- for (int ix = 0; ix < jeips.length(); ix++) {
- String sn = gvas(jeips, ix);
- pfsnv.add(new NodeConfig.ProvFeedSubnet(fid, sn));
- }
- }
- }
- }
- JSONArray jsubs = jcfg.optJSONArray("subscriptions");
- if (jsubs != null) {
- for (int sx = 0; sx < jsubs.length(); sx++) {
- JSONObject jsub = jsubs.getJSONObject(sx);
- if (jsub.optBoolean("suspend", false)) {
- continue;
- }
- String sid = gvas(jsub, "subid");
- String fid = gvas(jsub, "feedid");
- JSONObject jdel = jsub.getJSONObject("delivery");
- String delurl = gvas(jdel, "url");
- String id = gvas(jdel, "user");
- String password = gvas(jdel, "password");
- boolean monly = jsub.getBoolean("metadataOnly");
- boolean use100 = jdel.getBoolean("use100");
- boolean privilegedSubscriber = jsub.getBoolean("privilegedSubscriber");
- boolean decompress = jsub.getBoolean("decompress");
- boolean followRedirect = jsub.getBoolean("follow_redirect");
- psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100, privilegedSubscriber, followRedirect, decompress));
- }
- }
- JSONObject jparams = jcfg.optJSONObject("parameters");
- if (jparams != null) {
- for (String pname : JSONObject.getNames(jparams)) {
- String pvalue = gvas(jparams, pname);
- if (pvalue != null) {
- ppv.add(new NodeConfig.ProvParam(pname, pvalue));
- }
- }
- String sfx = gvas(jparams, "PROV_DOMAIN");
- JSONArray jnodes = jparams.optJSONArray("NODES");
- if (jnodes != null) {
- for (int nx = 0; nx < jnodes.length(); nx++) {
- String nn = gvas(jnodes, nx);
- if (nn.indexOf('.') == -1) {
- nn = nn + "." + sfx;
- }
- pnv.add(new NodeConfig.ProvNode(nn));
- }
- }
- }
- JSONArray jingresses = jcfg.optJSONArray("ingress");
- if (jingresses != null) {
- for (int fx = 0; fx < jingresses.length(); fx++) {
- JSONObject jingress = jingresses.getJSONObject(fx);
- String fid = gvas(jingress, "feedid");
- String subnet = gvas(jingress, "subnet");
- String user = gvas(jingress, "user");
- String[] nodes = gvasa(jingress, "node");
- if (fid == null || "".equals(fid)) {
- continue;
- }
- if ("".equals(subnet)) {
- subnet = null;
- }
- if ("".equals(user)) {
- user = null;
- }
- pfiv.add(new NodeConfig.ProvForceIngress(fid, subnet, user, nodes));
- }
- }
- JSONObject jegresses = jcfg.optJSONObject("egress");
- if (jegresses != null && JSONObject.getNames(jegresses) != null) {
- for (String esid : JSONObject.getNames(jegresses)) {
- String enode = gvas(jegresses, esid);
- if (esid != null && enode != null && !"".equals(esid) && !"".equals(enode)) {
- pfev.add(new NodeConfig.ProvForceEgress(esid, enode));
- }
- }
- }
- JSONArray jhops = jcfg.optJSONArray("routing");
- if (jhops != null) {
- for (int fx = 0; fx < jhops.length(); fx++) {
- JSONObject jhop = jhops.getJSONObject(fx);
- String from = gvas(jhop, "from");
- String to = gvas(jhop, "to");
- String via = gvas(jhop, "via");
- if (from == null || to == null || via == null || "".equals(from) || "".equals(to) || "".equals(via)) {
- continue;
- }
- phv.add(new NodeConfig.ProvHop(from, to, via));
- }
- }
+ reader.close();
+ addJSONFeeds(pfv, pfuv, pfsnv, jcfg);
+ addJSONSubs(psv, jcfg);
+ addJSONParams(pnv, ppv, jcfg);
+ addJSONRoutingInformation(pfiv, pfev, phv, jcfg);
} catch (JSONException jse) {
NodeUtils.setIpAndFqdnForEelf("ProvData");
eelfLogger.error(EelfMsgs.MESSAGE_PARSING_ERROR, jse.toString());
- eelfLogger.error("NODE0201 Error parsing configuration data from provisioning server " + jse.toString(), jse);
+ eelfLogger
+ .error("NODE0201 Error parsing configuration data from provisioning server " + jse.toString(), jse);
throw new IOException(jse.toString(), jse);
}
pn = pnv.toArray(new NodeConfig.ProvNode[pnv.size()]);
@@ -266,66 +111,294 @@ public class ProvData {
ph = phv.toArray(new NodeConfig.ProvHop[phv.size()]);
}
+ private static String[] gvasa(JSONObject object, String key) {
+ return (gvasa(object.opt(key)));
+ }
+
+ private static String[] gvasa(Object object) {
+ if (object instanceof JSONArray) {
+ JSONArray jsonArray = (JSONArray) object;
+ ArrayList<String> array = new ArrayList<>();
+ for (int i = 0; i < jsonArray.length(); i++) {
+ String string = gvas(jsonArray, i);
+ if (string != null) {
+ array.add(string);
+ }
+ }
+ return (array.toArray(new String[array.size()]));
+ } else {
+ String string = gvas(object);
+ if (string == null) {
+ return (new String[0]);
+ } else {
+ return (new String[]{string});
+ }
+ }
+ }
+
+ private static String gvas(JSONArray array, int index) {
+ return (gvas(array.get(index)));
+ }
+
+ private static String gvas(JSONObject object, String key) {
+ return (gvas(object.opt(key)));
+ }
+
+ private static String gvas(Object object) {
+ if (object instanceof Boolean || object instanceof Number || object instanceof String) {
+ return (object.toString());
+ }
+ return (null);
+ }
+
/**
- * Get the raw node configuration entries
+ * Get the raw node configuration entries.
*/
public NodeConfig.ProvNode[] getNodes() {
return (pn);
}
/**
- * Get the raw parameter configuration entries
+ * Get the raw parameter configuration entries.
*/
public NodeConfig.ProvParam[] getParams() {
return (pp);
}
/**
- * Ge the raw feed configuration entries
+ * Ge the raw feed configuration entries.
*/
public NodeConfig.ProvFeed[] getFeeds() {
return (pf);
}
/**
- * Get the raw feed user configuration entries
+ * Get the raw feed user configuration entries.
*/
public NodeConfig.ProvFeedUser[] getFeedUsers() {
return (pfu);
}
/**
- * Get the raw feed subnet configuration entries
+ * Get the raw feed subnet configuration entries.
*/
public NodeConfig.ProvFeedSubnet[] getFeedSubnets() {
return (pfsn);
}
/**
- * Get the raw subscription entries
+ * Get the raw subscription entries.
*/
public NodeConfig.ProvSubscription[] getSubscriptions() {
return (ps);
}
/**
- * Get the raw forced ingress entries
+ * Get the raw forced ingress entries.
*/
public NodeConfig.ProvForceIngress[] getForceIngress() {
return (pfi);
}
/**
- * Get the raw forced egress entries
+ * Get the raw forced egress entries.
*/
public NodeConfig.ProvForceEgress[] getForceEgress() {
return (pfe);
}
/**
- * Get the raw next hop entries
+ * Get the raw next hop entries.
*/
public NodeConfig.ProvHop[] getHops() {
return (ph);
}
+
+ @Nullable
+ private String getFeedStatus(JSONObject jfeed) {
+ String stat = null;
+ if (jfeed.optBoolean("suspend", false)) {
+ stat = "Feed is suspended";
+ }
+ if (jfeed.optBoolean("deleted", false)) {
+ stat = "Feed is deleted";
+ }
+ return stat;
+ }
+
+ private void addJSONFeeds(ArrayList<ProvFeed> pfv, ArrayList<ProvFeedUser> pfuv, ArrayList<ProvFeedSubnet> pfsnv,
+ JSONObject jcfg) {
+ JSONArray jfeeds = jcfg.optJSONArray("feeds");
+ if (jfeeds != null) {
+ for (int fx = 0; fx < jfeeds.length(); fx++) {
+ addJSONFeed(pfv, pfuv, pfsnv, jfeeds, fx);
+ }
+ }
+ }
+
+ private void addJSONFeed(ArrayList<ProvFeed> pfv, ArrayList<ProvFeedUser> pfuv, ArrayList<ProvFeedSubnet> pfsnv,
+ JSONArray jfeeds, int fx) {
+ JSONObject jfeed = jfeeds.getJSONObject(fx);
+ String stat = getFeedStatus(jfeed);
+ String fid = gvas(jfeed, FEED_ID);
+ String fname = gvas(jfeed, "name");
+ String fver = gvas(jfeed, "version");
+ String createdDate = gvas(jfeed, "created_date");
+ /*
+ * START - AAF changes
+ * TDP EPIC US# 307413
+ * Passing aafInstance to ProvFeed from feeds json passed by prov to identify legacy/AAF feeds
+ */
+ String aafInstance = gvas(jfeed, "aaf_instance");
+ pfv.add(new ProvFeed(fid, fname + "//" + fver, stat, createdDate, aafInstance));
+ /*
+ * END - AAF changes
+ */
+ addJSONFeedAuthArrays(pfuv, pfsnv, jfeed, fid);
+ }
+
+ private void addJSONFeedAuthArrays(ArrayList<ProvFeedUser> pfuv, ArrayList<ProvFeedSubnet> pfsnv, JSONObject jfeed,
+ String fid) {
+ JSONObject jauth = jfeed.optJSONObject("authorization");
+ if (jauth == null) {
+ return;
+ }
+ JSONArray jeids = jauth.optJSONArray("endpoint_ids");
+ if (jeids != null) {
+ for (int ux = 0; ux < jeids.length(); ux++) {
+ JSONObject ju = jeids.getJSONObject(ux);
+ String login = gvas(ju, "id");
+ String password = gvas(ju, "password");
+ pfuv.add(new ProvFeedUser(fid, login, NodeUtils.getAuthHdr(login, password)));
+ }
+ }
+ JSONArray jeips = jauth.optJSONArray("endpoint_addrs");
+ if (jeips != null) {
+ for (int ix = 0; ix < jeips.length(); ix++) {
+ String sn = gvas(jeips, ix);
+ pfsnv.add(new ProvFeedSubnet(fid, sn));
+ }
+ }
+ }
+
+ private void addJSONSubs(ArrayList<ProvSubscription> psv, JSONObject jcfg) {
+ JSONArray jsubs = jcfg.optJSONArray("subscriptions");
+ if (jsubs != null) {
+ for (int sx = 0; sx < jsubs.length(); sx++) {
+ addJSONSub(psv, jsubs, sx);
+ }
+ }
+ }
+
+ private void addJSONSub(ArrayList<ProvSubscription> psv, JSONArray jsubs, int sx) {
+ JSONObject jsub = jsubs.getJSONObject(sx);
+ if (jsub.optBoolean("suspend", false)) {
+ return;
+ }
+ String sid = gvas(jsub, "subid");
+ String fid = gvas(jsub, FEED_ID);
+ JSONObject jdel = jsub.getJSONObject("delivery");
+ String delurl = gvas(jdel, "url");
+ String id = gvas(jdel, "user");
+ String password = gvas(jdel, "password");
+ boolean monly = jsub.getBoolean("metadataOnly");
+ boolean use100 = jdel.getBoolean("use100");
+ boolean privilegedSubscriber = jsub.getBoolean("privilegedSubscriber");
+ boolean decompress = jsub.getBoolean("decompress");
+ boolean followRedirect = jsub.getBoolean("follow_redirect");
+ psv.add(new ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100,
+ privilegedSubscriber, followRedirect, decompress));
+ }
+
+ private void addJSONParams(ArrayList<ProvNode> pnv, ArrayList<ProvParam> ppv, JSONObject jcfg) {
+ JSONObject jparams = jcfg.optJSONObject("parameters");
+ if (jparams != null) {
+ for (String pname : JSONObject.getNames(jparams)) {
+ addJSONParam(ppv, jparams, pname);
+ }
+ addJSONNodesToParams(pnv, jparams);
+ }
+ }
+
+ private void addJSONParam(ArrayList<ProvParam> ppv, JSONObject jparams, String pname) {
+ String pvalue = gvas(jparams, pname);
+ if (pvalue != null) {
+ ppv.add(new ProvParam(pname, pvalue));
+ }
+ }
+
+ private void addJSONNodesToParams(ArrayList<ProvNode> pnv, JSONObject jparams) {
+ String sfx = gvas(jparams, "PROV_DOMAIN");
+ JSONArray jnodes = jparams.optJSONArray("NODES");
+ if (jnodes != null) {
+ for (int nx = 0; nx < jnodes.length(); nx++) {
+ String nn = gvas(jnodes, nx);
+ if (nn == null) {
+ continue;
+ }
+ if (nn.indexOf('.') == -1) {
+ nn = nn + "." + sfx;
+ }
+ pnv.add(new ProvNode(nn));
+ }
+ }
+ }
+
+ private void addJSONRoutingInformation(ArrayList<ProvForceIngress> pfiv, ArrayList<ProvForceEgress> pfev,
+ ArrayList<ProvHop> phv, JSONObject jcfg) {
+ JSONArray jingresses = jcfg.optJSONArray("ingress");
+ if (jingresses != null) {
+ for (int fx = 0; fx < jingresses.length(); fx++) {
+ addJSONIngressRoute(pfiv, jingresses, fx);
+ }
+ }
+ JSONObject jegresses = jcfg.optJSONObject("egress");
+ if (jegresses != null && JSONObject.getNames(jegresses) != null) {
+ for (String esid : JSONObject.getNames(jegresses)) {
+ addJSONEgressRoute(pfev, jegresses, esid);
+ }
+ }
+ JSONArray jhops = jcfg.optJSONArray("routing");
+ if (jhops != null) {
+ for (int fx = 0; fx < jhops.length(); fx++) {
+ addJSONRoutes(phv, jhops, fx);
+ }
+ }
+ }
+
+ private void addJSONIngressRoute(ArrayList<ProvForceIngress> pfiv, JSONArray jingresses, int fx) {
+ JSONObject jingress = jingresses.getJSONObject(fx);
+ String fid = gvas(jingress, FEED_ID);
+ String subnet = gvas(jingress, "subnet");
+ String user = gvas(jingress, "user");
+ if (fid == null || "".equals(fid)) {
+ return;
+ }
+ if ("".equals(subnet)) {
+ subnet = null;
+ }
+ if ("".equals(user)) {
+ user = null;
+ }
+ String[] nodes = gvasa(jingress, "node");
+ pfiv.add(new ProvForceIngress(fid, subnet, user, nodes));
+ }
+
+ private void addJSONEgressRoute(ArrayList<ProvForceEgress> pfev, JSONObject jegresses, String esid) {
+ String enode = gvas(jegresses, esid);
+ if (esid != null && enode != null && !"".equals(esid) && !"".equals(enode)) {
+ pfev.add(new ProvForceEgress(esid, enode));
+ }
+ }
+
+ private void addJSONRoutes(ArrayList<ProvHop> phv, JSONArray jhops, int fx) {
+ JSONObject jhop = jhops.getJSONObject(fx);
+ String from = gvas(jhop, "from");
+ String to = gvas(jhop, "to");
+ String via = gvas(jhop, "via");
+ if (from == null || to == null || via == null || "".equals(from) || "".equals(to) || "".equals(via)) {
+ return;
+ }
+ phv.add(new ProvHop(from, to, via));
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java
index 3d4908e8..d1d2abb3 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java
@@ -25,14 +25,15 @@
package org.onap.dmaap.datarouter.node;
/**
- * Generate publish IDs
+ * Generate publish IDs.
*/
public class PublishId {
+
private long nextuid;
private String myname;
/**
- * Generate publish IDs for the specified name
+ * Generate publish IDs for the specified name.
*
* @param myname Unique identifier for this publish ID generator (usually fqdn of server)
*/
@@ -41,7 +42,8 @@ public class PublishId {
}
/**
- * Generate a Data Router Publish ID that uniquely identifies the particular invocation of the Publish API for log correlation purposes.
+ * Generate a Data Router Publish ID that uniquely identifies the particular invocation of the Publish API for log
+ * correlation purposes.
*/
public synchronized String next() {
long now = System.currentTimeMillis();
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java
index 42af8ca0..02704553 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java
@@ -24,13 +24,15 @@
package org.onap.dmaap.datarouter.node;
-import java.util.*;
+import java.util.Timer;
+import java.util.TimerTask;
/**
- * Execute an operation no more frequently than a specified interval
+ * Execute an operation no more frequently than a specified interval.
*/
public abstract class RateLimitedOperation implements Runnable {
+
private boolean marked; // a timer task exists
private boolean executing; // the operation is currently in progress
private boolean remark; // a request was made while the operation was in progress
@@ -39,33 +41,19 @@ public abstract class RateLimitedOperation implements Runnable {
private long mininterval;
/**
- * Create a rate limited operation
+ * Create a rate limited operation.
*
- * @param mininterval The minimum number of milliseconds after the last execution starts before a new execution can begin
- * @param timer The timer used to perform deferred executions
+ * @param mininterval The minimum number of milliseconds after the last execution starts before a new execution can
+ * begin
+ * @param timer The timer used to perform deferred executions
*/
public RateLimitedOperation(long mininterval, Timer timer) {
this.timer = timer;
this.mininterval = mininterval;
}
- private class deferred extends TimerTask {
- public void run() {
- execute();
- }
- }
-
- private synchronized void unmark() {
- marked = false;
- }
-
- private void execute() {
- unmark();
- request();
- }
-
/**
- * Request that the operation be performed by this thread or at a later time by the timer
+ * Request that the operation be performed by this thread or at a later time by the timer.
*/
public void request() {
if (premark()) {
@@ -73,7 +61,8 @@ public abstract class RateLimitedOperation implements Runnable {
}
do {
run();
- } while (demark());
+ }
+ while (demark());
}
private synchronized boolean premark() {
@@ -90,7 +79,7 @@ public abstract class RateLimitedOperation implements Runnable {
if (last + mininterval > now) {
// too soon - schedule a timer
marked = true;
- timer.schedule(new deferred(), last + mininterval - now);
+ timer.schedule(new Deferred(), last + mininterval - now);
return (true);
}
last = now;
@@ -107,4 +96,20 @@ public abstract class RateLimitedOperation implements Runnable {
}
return (false);
}
+
+ private class Deferred extends TimerTask {
+
+ public void run() {
+ execute();
+ }
+
+ private void execute() {
+ unmark();
+ request();
+ }
+
+ private synchronized void unmark() {
+ marked = false;
+ }
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java
index 7e4078f8..b4a3f0a7 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java
@@ -24,101 +24,100 @@
package org.onap.dmaap.datarouter.node;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
import java.io.BufferedReader;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.OutputStream;
-import java.util.Hashtable;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Timer;
/**
- * Track redirections of subscriptions
+ * Track redirections of subscriptions.
*/
-public class RedirManager {
+class RedirManager {
- private Hashtable<String, String> sid2primary = new Hashtable<String, String>();
- private Hashtable<String, String> sid2secondary = new Hashtable<String, String>();
+ private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(RedirManager.class);
+ private RateLimitedOperation op;
+ private HashMap<String, String> sid2primary = new HashMap<>();
+ private HashMap<String, String> sid2secondary = new HashMap<>();
private String redirfile;
- RateLimitedOperation op;
/**
* Create a mechanism for maintaining subscription redirections.
*
* @param redirfile The file to store the redirection information.
- * @param mininterval The minimum number of milliseconds between writes to the redirection
- * information file.
+ * @param mininterval The minimum number of milliseconds between writes to the redirection information file.
* @param timer The timer thread used to run delayed file writes.
*/
- public RedirManager(String redirfile, long mininterval, Timer timer) {
+ RedirManager(String redirfile, long mininterval, Timer timer) {
this.redirfile = redirfile;
op = new RateLimitedOperation(mininterval, timer) {
public void run() {
try {
- StringBuffer sb = new StringBuffer();
- for (String s : sid2primary.keySet()) {
- sb.append(s).append(' ').append(sid2primary.get(s)).append(' ')
- .append(sid2secondary.get(s)).append('\n');
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<String, String> entry : sid2primary.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ sb.append(key).append(' ').append(value).append(' ')
+ .append(sid2secondary.get(key)).append('\n');
}
try (OutputStream os = new FileOutputStream(RedirManager.this.redirfile)) {
os.write(sb.toString().getBytes());
}
} catch (Exception e) {
+ eelfLogger.error("Exception", e);
}
}
};
try {
- String s;
+ String line;
try (BufferedReader br = new BufferedReader(new FileReader(redirfile))) {
- while ((s = br.readLine()) != null) {
- s = s.trim();
- String[] sx = s.split(" ");
- if (s.startsWith("#") || sx.length != 3) {
- continue;
- }
- sid2primary.put(sx[0], sx[1]);
- sid2secondary.put(sx[0], sx[2]);
+ while ((line = br.readLine()) != null) {
+ addSubRedirInfo(line);
}
}
} catch (Exception e) {
- // missing file is normal
+ eelfLogger.debug("Missing file is normal", e);
}
}
/**
- * Set up redirection. If a request is to be sent to subscription ID sid, and that is
- * configured to go to URL primary, instead, go to secondary.
+ * Set up redirection. If a request is to be sent to subscription ID sid, and that is configured to go to URL
+ * primary, instead, go to secondary.
*
* @param sid The subscription ID to be redirected
* @param primary The URL associated with that subscription ID
* @param secondary The replacement URL to use instead
*/
- public synchronized void redirect(String sid, String primary, String secondary) {
+ synchronized void redirect(String sid, String primary, String secondary) {
sid2primary.put(sid, primary);
sid2secondary.put(sid, secondary);
op.request();
}
/**
- * Cancel redirection. If a request is to be sent to subscription ID sid, send it to its
- * primary URL.
+ * Cancel redirection. If a request is to be sent to subscription ID sid, send it to its primary URL.
*
* @param sid The subscription ID to remove from the table.
*/
- public synchronized void forget(String sid) {
+ synchronized void forget(String sid) {
sid2primary.remove(sid);
sid2secondary.remove(sid);
op.request();
}
/**
- * Look up where to send a subscription. If the primary has changed or there is no redirection,
- * use the primary. Otherwise, redirect to the secondary URL.
+ * Look up where to send a subscription. If the primary has changed or there is no redirection, use the primary.
+ * Otherwise, redirect to the secondary URL.
*
* @param sid The subscription ID to look up.
* @param primary The configured primary URL.
* @return The destination URL to really use.
*/
- public synchronized String lookup(String sid, String primary) {
+ synchronized String lookup(String sid, String primary) {
String oprim = sid2primary.get(sid);
if (primary.equals(oprim)) {
return (sid2secondary.get(sid));
@@ -129,9 +128,19 @@ public class RedirManager {
}
/**
- * Is a subscription redirected?
+ * Is a subscription redirected.
*/
- public synchronized boolean isRedirected(String sid) {
+ synchronized boolean isRedirected(String sid) {
return (sid != null && sid2secondary.get(sid) != null);
}
+
+ private void addSubRedirInfo(String subRedirInfo) {
+ subRedirInfo = subRedirInfo.trim();
+ String[] sx = subRedirInfo.split(" ");
+ if (subRedirInfo.startsWith("#") || sx.length != 3) {
+ return;
+ }
+ sid2primary.put(sx[0], sx[1]);
+ sid2secondary.put(sx[0], sx[2]);
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java
index c8a7bd0c..53e53145 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java
@@ -23,19 +23,29 @@
package org.onap.dmaap.datarouter.node;
-import java.util.regex.*;
-import java.util.*;
-import java.io.*;
-import java.nio.file.*;
-import java.text.*;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
- * Logging for data router delivery events (PUB/DEL/EXP)
+ * Logging for data router delivery events (PUB/DEL/EXP).
*/
public class StatusLog {
+
+ private static final String EXCEPTION = "Exception";
+ private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(StatusLog.class);
private static StatusLog instance = new StatusLog();
- private HashSet<String> toship = new HashSet<String>();
- private SimpleDateFormat filedate;
+ private SimpleDateFormat filedate = new SimpleDateFormat("-yyyyMMddHHmm");
+
private String prefix = "logs/events";
private String suffix = ".log";
private String plainfile;
@@ -43,89 +53,81 @@ public class StatusLog {
private long nexttime;
private OutputStream os;
private long intvl;
- private NodeConfigManager config = NodeConfigManager.getInstance();
+ private static NodeConfigManager config = NodeConfigManager.getInstance();
- {
- try {
- filedate = new SimpleDateFormat("-yyyyMMddHHmm");
- } catch (Exception e) {
- }
+ private StatusLog() {
}
/**
- * Parse an interval of the form xxhyymzzs and round it to the nearest whole fraction of 24 hours.If no units are specified, assume seconds.
+ * Parse an interval of the form xxhyymzzs and round it to the nearest whole fraction of 24 hours.If no units are
+ * specified, assume seconds.
*/
public static long parseInterval(String interval, int def) {
try {
- Matcher m = Pattern.compile("(?:(\\d+)[Hh])?(?:(\\d+)[Mm])?(?:(\\d+)[Ss]?)?").matcher(interval);
- if (m.matches()) {
- int dur = 0;
- String x = m.group(1);
- if (x != null) {
- dur += 3600 * Integer.parseInt(x);
- }
- x = m.group(2);
- if (x != null) {
- dur += 60 * Integer.parseInt(x);
- }
- x = m.group(3);
- if (x != null) {
- dur += Integer.parseInt(x);
- }
- if (dur < 60) {
- dur = 60;
- }
+ Matcher matcher = Pattern.compile("(?:(\\d+)[Hh])?(?:(\\d+)[Mm])?(?:(\\d+)[Ss]?)?").matcher(interval);
+ if (matcher.matches()) {
+ int dur = getDur(matcher);
int best = 86400;
int dist = best - dur;
if (dur > best) {
dist = dur - best;
}
- int base = 1;
- for (int i = 0; i < 8; i++) {
- int base2 = base;
- base *= 2;
- for (int j = 0; j < 4; j++) {
- int base3 = base2;
- base2 *= 3;
- for (int k = 0; k < 3; k++) {
- int cur = base3;
- base3 *= 5;
- int ndist = cur - dur;
- if (dur > cur) {
- ndist = dur - cur;
- }
- if (ndist < dist) {
- best = cur;
- dist = ndist;
- }
- }
- }
- }
+ best = getBest(dur, best, dist);
def = best * 1000;
}
} catch (Exception e) {
+ eelfLogger.error(EXCEPTION, e);
}
return (def);
}
- private synchronized void checkRoll(long now) throws IOException {
- if (now >= nexttime) {
- if (os != null) {
- os.close();
- os = null;
+ private static int getBest(int dur, int best, int dist) {
+ int base = 1;
+ for (int i = 0; i < 8; i++) {
+ int base2 = base;
+ base *= 2;
+ for (int j = 0; j < 4; j++) {
+ int base3 = base2;
+ base2 *= 3;
+ for (int k = 0; k < 3; k++) {
+ int cur = base3;
+ base3 *= 5;
+ int ndist = cur - dur;
+ if (dur > cur) {
+ ndist = dur - cur;
+ }
+ if (ndist < dist) {
+ best = cur;
+ dist = ndist;
+ }
+ }
}
- intvl = parseInterval(config.getEventLogInterval(), 300000);
- prefix = config.getEventLogPrefix();
- suffix = config.getEventLogSuffix();
- nexttime = now - now % intvl + intvl;
- curfile = prefix + filedate.format(new Date(nexttime - intvl)) + suffix;
- plainfile = prefix + suffix;
- notify();
}
+ return best;
+ }
+
+ private static int getDur(Matcher matcher) {
+ int dur = 0;
+ String match = matcher.group(1);
+ if (match != null) {
+ dur += 3600 * Integer.parseInt(match);
+ }
+ match = matcher.group(2);
+ if (match != null) {
+ dur += 60 * Integer.parseInt(match);
+ }
+ match = matcher.group(3);
+ if (match != null) {
+ dur += Integer.parseInt(match);
+ }
+ if (dur < 60) {
+ dur = 60;
+ }
+ return dur;
}
/**
- * Get the name of the current log file
+ * Get the name of the current log file.
*
* @return The full path name of the current event log file
*/
@@ -133,109 +135,107 @@ public class StatusLog {
try {
instance.checkRoll(System.currentTimeMillis());
} catch (Exception e) {
+ eelfLogger.error(EXCEPTION, e);
}
return (instance.curfile);
}
- private synchronized void log(String s) {
- try {
- long now = System.currentTimeMillis();
- checkRoll(now);
- if (os == null) {
- os = new FileOutputStream(curfile, true);
- (new File(plainfile)).delete();
- Files.createLink(Paths.get(plainfile), Paths.get(curfile));
- }
- os.write((NodeUtils.logts(new Date(now)) + '|' + s + '\n').getBytes());
- os.flush();
- } catch (IOException ioe) {
- }
- }
-
/**
* Log a received publication attempt.
*
- * @param pubid The publish ID assigned by the node
+ * @param pubid The publish ID assigned by the node
* @param feedid The feed id given by the publisher
* @param requrl The URL of the received request
* @param method The method (DELETE or PUT) in the received request
- * @param ctype The content type (if method is PUT and clen > 0)
- * @param clen The content length (if method is PUT)
- * @param srcip The IP address of the publisher
- * @param user The identity of the publisher
+ * @param ctype The content type (if method is PUT and clen > 0)
+ * @param clen The content length (if method is PUT)
+ * @param srcip The IP address of the publisher
+ * @param user The identity of the publisher
* @param status The status returned to the publisher
*/
- public static void logPub(String pubid, String feedid, String requrl, String method, String ctype, long clen, String srcip, String user, int status) {
- instance.log("PUB|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + srcip + "|" + user + "|" + status);
+ public static void logPub(String pubid, String feedid, String requrl, String method, String ctype, long clen,
+ String srcip, String user, int status) {
+ instance.log(
+ "PUB|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + srcip
+ + "|" + user + "|" + status);
}
/**
- * Log a data transfer error receiving a publication attempt
+ * Log a data transfer error receiving a publication attempt.
*
- * @param pubid The publish ID assigned by the node
+ * @param pubid The publish ID assigned by the node
* @param feedid The feed id given by the publisher
* @param requrl The URL of the received request
* @param method The method (DELETE or PUT) in the received request
- * @param ctype The content type (if method is PUT and clen > 0)
- * @param clen The expected content length (if method is PUT)
- * @param rcvd The content length received
- * @param srcip The IP address of the publisher
- * @param user The identity of the publisher
- * @param error The error message from the IO exception
+ * @param ctype The content type (if method is PUT and clen > 0)
+ * @param clen The expected content length (if method is PUT)
+ * @param rcvd The content length received
+ * @param srcip The IP address of the publisher
+ * @param user The identity of the publisher
+ * @param error The error message from the IO exception
*/
- public static void logPubFail(String pubid, String feedid, String requrl, String method, String ctype, long clen, long rcvd, String srcip, String user, String error) {
- instance.log("PBF|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + rcvd + "|" + srcip + "|" + user + "|" + error);
+ public static void logPubFail(String pubid, String feedid, String requrl, String method, String ctype, long clen,
+ long rcvd, String srcip, String user, String error) {
+ instance.log("PBF|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + rcvd
+ + "|" + srcip + "|" + user + "|" + error);
}
/**
* Log a delivery attempt.
*
- * @param pubid The publish ID assigned by the node
+ * @param pubid The publish ID assigned by the node
* @param feedid The feed ID
- * @param subid The (space delimited list of) subscription ID
+ * @param subid The (space delimited list of) subscription ID
* @param requrl The URL used in the attempt
* @param method The method (DELETE or PUT) in the attempt
- * @param ctype The content type (if method is PUT, not metaonly, and clen > 0)
- * @param clen The content length (if PUT and not metaonly)
- * @param user The identity given to the subscriber
+ * @param ctype The content type (if method is PUT, not metaonly, and clen > 0)
+ * @param clen The content length (if PUT and not metaonly)
+ * @param user The identity given to the subscriber
* @param status The status returned by the subscriber or -1 if an exeception occured trying to connect
* @param xpubid The publish ID returned by the subscriber
*/
- public static void logDel(String pubid, String feedid, String subid, String requrl, String method, String ctype, long clen, String user, int status, String xpubid) {
+ public static void logDel(String pubid, String feedid, String subid, String requrl, String method, String ctype,
+ long clen, String user, int status, String xpubid) {
if (feedid == null) {
return;
}
- instance.log("DEL|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + user + "|" + status + "|" + xpubid);
+ instance.log(
+ "DEL|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen
+ + "|" + user + "|" + status + "|" + xpubid);
}
/**
- * Log delivery attempts expired
+ * Log delivery attempts expired.
*
- * @param pubid The publish ID assigned by the node
- * @param feedid The feed ID
- * @param subid The (space delimited list of) subscription ID
- * @param requrl The URL that would be delivered to
- * @param method The method (DELETE or PUT) in the request
- * @param ctype The content type (if method is PUT, not metaonly, and clen > 0)
- * @param clen The content length (if PUT and not metaonly)
- * @param reason The reason the attempts were discontinued
+ * @param pubid The publish ID assigned by the node
+ * @param feedid The feed ID
+ * @param subid The (space delimited list of) subscription ID
+ * @param requrl The URL that would be delivered to
+ * @param method The method (DELETE or PUT) in the request
+ * @param ctype The content type (if method is PUT, not metaonly, and clen > 0)
+ * @param clen The content length (if PUT and not metaonly)
+ * @param reason The reason the attempts were discontinued
* @param attempts The number of attempts made
*/
- public static void logExp(String pubid, String feedid, String subid, String requrl, String method, String ctype, long clen, String reason, int attempts) {
+ public static void logExp(String pubid, String feedid, String subid, String requrl, String method, String ctype,
+ long clen, String reason, int attempts) {
if (feedid == null) {
return;
}
- instance.log("EXP|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + reason + "|" + attempts);
+ instance.log(
+ "EXP|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen
+ + "|" + reason + "|" + attempts);
}
/**
* Log extra statistics about unsuccessful delivery attempts.
*
- * @param pubid The publish ID assigned by the node
+ * @param pubid The publish ID assigned by the node
* @param feedid The feed ID
- * @param subid The (space delimited list of) subscription ID
- * @param clen The content length
- * @param sent The # of bytes sent or -1 if subscriber returned an error instead of 100 Continue, otherwise, the number of bytes sent before an error occurred.
+ * @param subid The (space delimited list of) subscription ID
+ * @param clen The content length
+ * @param sent The # of bytes sent or -1 if subscriber returned an error instead of 100 Continue, otherwise, the
+ * number of bytes sent before an error occurred.
*/
public static void logDelExtra(String pubid, String feedid, String subid, long clen, long sent) {
if (feedid == null) {
@@ -244,6 +244,35 @@ public class StatusLog {
instance.log("DLX|" + pubid + "|" + feedid + "|" + subid + "|" + clen + "|" + sent);
}
- private StatusLog() {
+ private synchronized void checkRoll(long now) throws IOException {
+ if (now >= nexttime) {
+ if (os != null) {
+ os.close();
+ os = null;
+ }
+ intvl = parseInterval(config.getEventLogInterval(), 300000);
+ prefix = config.getEventLogPrefix();
+ suffix = config.getEventLogSuffix();
+ nexttime = now - now % intvl + intvl;
+ curfile = prefix + filedate.format(new Date(nexttime - intvl)) + suffix;
+ plainfile = prefix + suffix;
+ notify();
+ }
+ }
+
+ private synchronized void log(String string) {
+ try {
+ long now = System.currentTimeMillis();
+ checkRoll(now);
+ if (os == null) {
+ os = new FileOutputStream(curfile, true);
+ (new File(plainfile)).delete();
+ Files.createLink(Paths.get(plainfile), Paths.get(curfile));
+ }
+ os.write((NodeUtils.logts(new Date(now)) + '|' + string + '\n').getBytes());
+ os.flush();
+ } catch (IOException ioe) {
+ eelfLogger.error("IOException", ioe);
+ }
}
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java
index 6f74df48..2f510120 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java
@@ -25,33 +25,34 @@
package org.onap.dmaap.datarouter.node;
/**
- * Compare IP addresses as byte arrays to a subnet specified as a CIDR
+ * Compare IP addresses as byte arrays to a subnet specified as a CIDR.
*/
public class SubnetMatcher {
+
private byte[] sn;
private int len;
private int mask;
/**
- * Construct a subnet matcher given a CIDR
+ * Construct a subnet matcher given a CIDR.
*
* @param subnet The CIDR to match
*/
public SubnetMatcher(String subnet) {
- int i = subnet.lastIndexOf('/');
- if (i == -1) {
+ int index = subnet.lastIndexOf('/');
+ if (index == -1) {
sn = NodeUtils.getInetAddress(subnet);
len = sn.length;
} else {
- len = Integer.parseInt(subnet.substring(i + 1));
- sn = NodeUtils.getInetAddress(subnet.substring(0, i));
+ len = Integer.parseInt(subnet.substring(index + 1));
+ sn = NodeUtils.getInetAddress(subnet.substring(0, index));
mask = ((0xff00) >> (len % 8)) & 0xff;
len /= 8;
}
}
/**
- * Is the IP address in the CIDR?
+ * Is the IP address in the CIDR.
*
* @param addr the IP address as bytes in network byte order
* @return true if the IP address matches.
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java
index eb10876e..475c876c 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java
@@ -25,17 +25,18 @@
package org.onap.dmaap.datarouter.node;
/**
- * A destination to deliver a message
+ * A destination to deliver a message.
*/
public class Target {
+
private DestInfo destinfo;
private String routing;
/**
- * A destination to deliver a message
+ * A destination to deliver a message.
*
* @param destinfo Either info for a subscription ID or info for a node-to-node transfer
- * @param routing For a node-to-node transfer, what to do when it gets there.
+ * @param routing For a node-to-node transfer, what to do when it gets there.
*/
public Target(DestInfo destinfo, String routing) {
this.destinfo = destinfo;
@@ -43,21 +44,21 @@ public class Target {
}
/**
- * Add additional routing
+ * Add additional routing.
*/
public void addRouting(String routing) {
this.routing = this.routing + " " + routing;
}
/**
- * Get the destination information for this target
+ * Get the destination information for this target.
*/
public DestInfo getDestInfo() {
return (destinfo);
}
/**
- * Get the next hop information for this target
+ * Get the next hop information for this target.
*/
public String getRouting() {
return (routing);
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java
index 33e4f801..a77277f2 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java
@@ -24,63 +24,54 @@
package org.onap.dmaap.datarouter.node;
-import java.util.*;
+import java.util.HashSet;
+import java.util.Iterator;
/**
- * Manage a list of tasks to be executed when an event occurs.
- * This makes the following guarantees:
+ * Manage a list of tasks to be executed when an event occurs. This makes the following guarantees:
* <ul>
* <li>Tasks can be safely added and removed in the middle of a run.</li>
* <li>No task will be returned more than once during a run.</li>
* <li>No task will be returned when it is not, at that moment, in the list of tasks.</li>
* <li>At the moment when next() returns null, all tasks on the list have been returned during the run.</li>
- * <li>Initially and once next() returns null during a run, next() will continue to return null until startRun() is called.
+ * <li>Initially and once next() returns null during a run, next() will continue to return null until startRun() is
+ * called.
* </ul>
*/
-public class TaskList {
+class TaskList {
+
private Iterator<Runnable> runlist;
- private HashSet<Runnable> tasks = new HashSet<Runnable>();
+ private HashSet<Runnable> tasks = new HashSet<>();
private HashSet<Runnable> togo;
private HashSet<Runnable> sofar;
private HashSet<Runnable> added;
private HashSet<Runnable> removed;
/**
- * Construct a new TaskList
- */
- public TaskList() {
- }
-
- /**
* Start executing the sequence of tasks.
*/
- public synchronized void startRun() {
- sofar = new HashSet<Runnable>();
- added = new HashSet<Runnable>();
- removed = new HashSet<Runnable>();
- togo = new HashSet<Runnable>(tasks);
+ synchronized void startRun() {
+ sofar = new HashSet<>();
+ added = new HashSet<>();
+ removed = new HashSet<>();
+ togo = new HashSet<>(tasks);
runlist = togo.iterator();
}
/**
- * Get the next task to execute
+ * Get the next task to execute.
*/
- public synchronized Runnable next() {
+ synchronized Runnable next() {
while (runlist != null) {
if (runlist.hasNext()) {
Runnable task = runlist.next();
- if (removed.contains(task)) {
- continue;
+ if (addTaskToSoFar(task)) {
+ return task;
}
- if (sofar.contains(task)) {
- continue;
- }
- sofar.add(task);
- return (task);
}
- if (added.size() != 0) {
+ if (!added.isEmpty()) {
togo = added;
- added = new HashSet<Runnable>();
+ added = new HashSet<>();
removed.clear();
runlist = togo.iterator();
continue;
@@ -97,7 +88,7 @@ public class TaskList {
/**
* Add a task to the list of tasks to run whenever the event occurs.
*/
- public synchronized void addTask(Runnable task) {
+ synchronized void addTask(Runnable task) {
if (runlist != null) {
added.add(task);
removed.remove(task);
@@ -108,11 +99,22 @@ public class TaskList {
/**
* Remove a task from the list of tasks to run whenever the event occurs.
*/
- public synchronized void removeTask(Runnable task) {
+ synchronized void removeTask(Runnable task) {
if (runlist != null) {
removed.add(task);
added.remove(task);
}
tasks.remove(task);
}
+
+ private boolean addTaskToSoFar(Runnable task) {
+ if (removed.contains(task)) {
+ return false;
+ }
+ if (sofar.contains(task)) {
+ return false;
+ }
+ sofar.add(task);
+ return true;
+ }
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/AuditFilter.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/AuditFilter.java
new file mode 100644
index 00000000..a278c2e3
--- /dev/null
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/AuditFilter.java
@@ -0,0 +1,38 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dmaap.datarouter.node.eelf;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.filter.Filter;
+import ch.qos.logback.core.spi.FilterReply;
+
+
+public class AuditFilter extends Filter<ILoggingEvent> {
+ @Override
+ public FilterReply decide(ILoggingEvent event) {
+ if (event.getMessage().contains("DEL|") || event.getMessage().contains("PUB|") || event.getMessage().contains("PBF|")
+ || event.getMessage().contains("EXP|") || event.getMessage().contains("DLX|")) {
+ return FilterReply.ACCEPT;
+ } else {
+ return FilterReply.DENY;
+ }
+ }
+}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/EELFFilter.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/EELFFilter.java
deleted file mode 100644
index b733e7e4..00000000
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/EELFFilter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START==================================================
- * * org.onap.dmaap
- * * ===========================================================================
- * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * * ===========================================================================
- * * Licensed under the Apache License, Version 2.0 (the "License");
- * * you may not use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- * * ============LICENSE_END====================================================
- * *
- * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * *
- ******************************************************************************/
-package org.onap.dmaap.datarouter.node.eelf;
-
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.filter.Filter;
-import ch.qos.logback.core.spi.FilterReply;
-
-/*
- * When EELF functionality added it default started logging Jetty logs as well which in turn stopped existing functionality of logging jetty statements in node.log
- * added code in logback.xml to add jetty statements in node.log.
- * This class removes extran EELF statements from node.log since they are being logged in apicalls.log
- */
-public class EELFFilter extends Filter<ILoggingEvent> {
- @Override
- public FilterReply decide(ILoggingEvent event) {
- if (event.getMessage().contains("EELF")) {
- return FilterReply.DENY;
- } else {
- return FilterReply.ACCEPT;
- }
- }
-}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/JettyFilter.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/JettyFilter.java
new file mode 100644
index 00000000..69f51d82
--- /dev/null
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/JettyFilter.java
@@ -0,0 +1,37 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dmaap.datarouter.node.eelf;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.filter.Filter;
+import ch.qos.logback.core.spi.FilterReply;
+
+
+public class JettyFilter extends Filter<ILoggingEvent> {
+ @Override
+ public FilterReply decide(ILoggingEvent event) {
+ if (event.getMessage().contains("org.eclipse.jetty")) {
+ return FilterReply.ACCEPT;
+ } else {
+ return FilterReply.DENY;
+ }
+ }
+}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/MetricsFilter.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/MetricsFilter.java
new file mode 100644
index 00000000..0fa57d4a
--- /dev/null
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/MetricsFilter.java
@@ -0,0 +1,42 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dmaap.datarouter.node.eelf;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.filter.Filter;
+import ch.qos.logback.core.spi.FilterReply;
+
+
+public class MetricsFilter extends Filter<ILoggingEvent> {
+ @Override
+ public FilterReply decide(ILoggingEvent event) {
+ if (event.getLevel().equals(Level.INFO) && !event.getMessage().contains("jetty")) {
+ if (!event.getMessage().contains("DEL|") && !event.getMessage().contains("PUB|") && !event.getMessage().contains(
+ "PBF|") && !event.getMessage().contains("EXP|") && !event.getMessage().contains("DLX|")) {
+ return FilterReply.ACCEPT;
+ }
+ } else {
+ return FilterReply.DENY;
+ }
+ return FilterReply.DENY;
+ }
+}