diff options
Diffstat (limited to 'datarouter-node/src/main/java/org')
24 files changed, 2121 insertions, 1762 deletions
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DRNodeCadiFilter.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DRNodeCadiFilter.java index 991d8660..245dbccd 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DRNodeCadiFilter.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DRNodeCadiFilter.java @@ -17,23 +17,24 @@ * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ + package org.onap.dmaap.datarouter.node; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; -import org.onap.aaf.cadi.PropAccess; -import org.onap.aaf.cadi.filter.CadiFilter; - +import java.io.IOException; import javax.servlet.FilterChain; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import java.io.IOException; +import org.onap.aaf.cadi.PropAccess; +import org.onap.aaf.cadi.filter.CadiFilter; public class DRNodeCadiFilter extends CadiFilter { + private static EELFLogger logger = EELFManager.getInstance().getLogger(NodeServlet.class); DRNodeCadiFilter(boolean init, PropAccess access) throws ServletException { @@ -41,23 +42,16 @@ public class DRNodeCadiFilter extends CadiFilter { } @Override - public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) + throws IOException, ServletException { HttpServletRequest httpRequest = (HttpServletRequest) request; String path = httpRequest.getPathInfo(); if (!(path.startsWith("/internal"))) { - if (!(httpRequest.getMethod().equalsIgnoreCase("POST"))) { - if (httpRequest.getMethod().equalsIgnoreCase("DELETE") && path.startsWith("/delete")) { + if (!("POST".equalsIgnoreCase(httpRequest.getMethod()))) { + if ("DELETE".equalsIgnoreCase(httpRequest.getMethod()) && path.startsWith("/delete")) { chain.doFilter(request, response); } else { - String feedId = getFeedId(request, response); - String aafDbInstance = NodeConfigManager.getInstance().getAafInstance(feedId); - if (aafDbInstance != null && !aafDbInstance.equals("") && !aafDbInstance.equalsIgnoreCase("legacy")) { - logger.info("DRNodeCadiFilter - doFilter: FeedId - " + feedId + ":" + "AAF Instance -" + aafDbInstance); - super.doFilter(request, response, chain); - } else { - logger.info("DRNodeCadiFilter - doFilter: FeedId - " + feedId + ":" + "Legacy Feed"); - chain.doFilter(request, response); - } + doFilterWithFeedId(request, response, chain); } } } else { @@ -72,7 +66,8 @@ public class DRNodeCadiFilter extends CadiFilter { if (fileid == null) { logger.error("NODE0105 Rejecting bad URI for PUT " + req.getPathInfo() + " from " + req.getRemoteAddr()); try { - resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting <feed-publishing-url>/<fileid>."); + resp.sendError(HttpServletResponse.SC_NOT_FOUND, + "Invalid request URI. Expecting <feed-publishing-url>/<fileid>."); } catch (IOException e) { logger.error("NODE0541 DRNodeCadiFilter.getFeedId: ", e); } @@ -82,19 +77,34 @@ public class DRNodeCadiFilter extends CadiFilter { if (fileid.startsWith("/publish/")) { fileid = fileid.substring(9); - int i = fileid.indexOf('/'); - if (i == -1 || i == fileid.length() - 1) { - logger.error("NODE0105 Rejecting bad URI for PUT (publish) of " + req.getPathInfo() + " from " + req.getRemoteAddr()); + int index = fileid.indexOf('/'); + if (index == -1 || index == fileid.length() - 1) { + logger.error("NODE0105 Rejecting bad URI for PUT (publish) of " + req.getPathInfo() + " from " + req + .getRemoteAddr()); try { - resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting <feed-publishing-url>/<fileid>. Possible missing fileid."); + resp.sendError(HttpServletResponse.SC_NOT_FOUND, + "Invalid request URI. Expecting <feed-publishing-url>/<fileid>. " + + "Possible missing fileid."); } catch (IOException e) { logger.error("NODE0542 DRNodeCadiFilter.getFeedId: ", e); } return null; } - feedid = fileid.substring(0, i); + feedid = fileid.substring(0, index); } return feedid; } + private void doFilterWithFeedId(ServletRequest request, ServletResponse response, FilterChain chain) + throws IOException, ServletException { + String feedId = getFeedId(request, response); + String aafDbInstance = NodeConfigManager.getInstance().getAafInstance(feedId); + if (aafDbInstance != null && !"".equals(aafDbInstance) && !"legacy".equalsIgnoreCase(aafDbInstance)) { + logger.info("DRNodeCadiFilter - doFilter: FeedId - " + feedId + ":" + "AAF Instance -" + aafDbInstance); + super.doFilter(request, response, chain); + } else { + logger.info("DRNodeCadiFilter - doFilter: FeedId - " + feedId + ":" + "Legacy Feed"); + chain.doFilter(request, response); + } + } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java index 4c21b342..df73c1e9 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java @@ -23,85 +23,37 @@ package org.onap.dmaap.datarouter.node; -import java.util.*; -import java.io.*; - import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Hashtable; +import java.util.Objects; /** * Main control point for delivering files to destinations. - * <p> - * The Delivery class manages assignment of delivery threads to delivery - * queues and creation and destruction of delivery queues as - * configuration changes. DeliveryQueues are assigned threads based on a - * modified round-robin approach giving priority to queues with more work - * as measured by both bytes to deliver and files to deliver and lower - * priority to queues that already have delivery threads working. - * A delivery thread continues to work for a delivery queue as long as - * that queue has more files to deliver. + * + * <p>The Delivery class manages assignment of delivery threads to delivery queues and creation and destruction of + * delivery queues as configuration changes. DeliveryQueues are assigned threads based on a modified round-robin + * approach giving priority to queues with more work as measured by both bytes to deliver and files to deliver and lower + * priority to queues that already have delivery threads working. A delivery thread continues to work for a delivery + * queue as long as that queue has more files to deliver. */ public class Delivery { - private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class); - - private static class DelItem implements Comparable<DelItem> { - private String pubid; - private String spool; - - public int compareTo(DelItem x) { - int i = pubid.compareTo(x.pubid); - if (i == 0) { - i = spool.compareTo(x.spool); - } - return (i); - } - - public String getPublishId() { - return (pubid); - } - - public String getSpool() { - return (spool); - } - - public DelItem(String pubid, String spool) { - this.pubid = pubid; - this.spool = spool; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - DelItem delItem = (DelItem) o; - return Objects.equals(pubid, delItem.pubid) && - Objects.equals(getSpool(), delItem.getSpool()); - } - - @Override - public int hashCode() { - return Objects.hash(pubid, getSpool()); - } - } + private static final String TOTAL = " total="; + private static final String YELLOW = " yellow="; + private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class); private double fdstart; private double fdstop; private int threads; private int curthreads; private NodeConfigManager config; - private Hashtable<String, DeliveryQueue> dqs = new Hashtable<String, DeliveryQueue>(); + private Hashtable<String, DeliveryQueue> dqs = new Hashtable<>(); private DeliveryQueue[] queues = new DeliveryQueue[0]; private int qpos = 0; private long nextcheck; - private Runnable cmon = new Runnable() { - public void run() { - checkconfig(); - } - }; /** * Constructs a new Delivery system using the specified configuration manager. @@ -110,10 +62,37 @@ public class Delivery { */ public Delivery(NodeConfigManager config) { this.config = config; + Runnable cmon = this::checkconfig; config.registerConfigTask(cmon); checkconfig(); } + /** + * Reset the retry timer for a delivery queue. + */ + public synchronized void resetQueue(String spool) { + if (spool != null) { + DeliveryQueue dq = dqs.get(spool); + if (dq != null) { + dq.resetQueue(); + } + } + } + + /** + * Mark the task in spool a success. + */ + public synchronized boolean markTaskSuccess(String spool, String pubId) { + boolean succeeded = false; + if (spool != null) { + DeliveryQueue dq = dqs.get(spool); + if (dq != null) { + succeeded = dq.markTaskSuccess(pubId); + } + } + return succeeded; + } + private void cleardir(String dir) { if (dqs.get(dir) != null) { return; @@ -131,12 +110,11 @@ public class Delivery { File spoolfile = new File(config.getSpoolBase()); long tspace = spoolfile.getTotalSpace(); long start = (long) (tspace * fdstart); - long stop = (long) (tspace * fdstop); long cur = spoolfile.getUsableSpace(); if (cur >= start) { return; } - Vector<DelItem> cv = new Vector<DelItem>(); + ArrayList<DelItem> cv = new ArrayList<>(); for (String sdir : dqs.keySet()) { for (String meta : (new File(sdir)).list()) { if (!meta.endsWith(".M") || meta.charAt(0) == '.') { @@ -147,27 +125,21 @@ public class Delivery { } DelItem[] items = cv.toArray(new DelItem[cv.size()]); Arrays.sort(items); - logger.info("NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + " total=" + tspace); - for (DelItem item : items) { - long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId()); - logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk"); - if (amount > 0) { - cur += amount; - if (cur >= stop) { - cur = spoolfile.getUsableSpace(); - } - if (cur >= stop) { - logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace); - return; - } - } + long stop = (long) (tspace * fdstop); + logger.info( + "NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + TOTAL + tspace); + if (determineFreeDiskSpace(spoolfile, tspace, stop, cur, items)) { + return; } cur = spoolfile.getUsableSpace(); if (cur >= stop) { - logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace); + logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + YELLOW + stop + + TOTAL + tspace); return; } - logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status. current=" + cur + " yellow=" + stop + " total=" + tspace); + logger.warn( + "NODE0504 Unable to recover sufficient disk space to reach green status. current=" + cur + YELLOW + + stop + TOTAL + tspace); } private void cleardirs() { @@ -206,7 +178,7 @@ public class Delivery { DestInfo[] alldis = config.getAllDests(); DeliveryQueue[] nqs = new DeliveryQueue[alldis.length]; qpos = 0; - Hashtable<String, DeliveryQueue> ndqs = new Hashtable<String, DeliveryQueue>(); + Hashtable<String, DeliveryQueue> ndqs = new Hashtable<>(); for (DestInfo di : alldis) { String spl = di.getSpool(); DeliveryQueue dq = dqs.get(spl); @@ -223,11 +195,8 @@ public class Delivery { cleardirs(); while (curthreads < threads) { curthreads++; - (new Thread() { - { - setName("Delivery Thread"); - } - + (new Thread("Delivery Thread") { + @Override public void run() { dodelivery(); } @@ -276,29 +245,69 @@ public class Delivery { } } - /** - * Reset the retry timer for a delivery queue - */ - public synchronized void resetQueue(String spool) { - if (spool != null) { - DeliveryQueue dq = dqs.get(spool); - if (dq != null) { - dq.resetQueue(); + private boolean determineFreeDiskSpace(File spoolfile, long tspace, long stop, long cur, DelItem[] items) { + for (DelItem item : items) { + long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId()); + logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + + " to free up disk"); + if (amount > 0) { + cur += amount; + if (cur >= stop) { + cur = spoolfile.getUsableSpace(); + } + if (cur >= stop) { + logger.info( + "NODE0503 Free disk space at or above yellow threshold. current=" + cur + YELLOW + stop + + TOTAL + tspace); + return true; + } } } + return false; } - /** - * Mark the task in spool a success - */ - public synchronized boolean markTaskSuccess(String spool, String pubId) { - boolean succeeded = false; - if (spool != null) { - DeliveryQueue dq = dqs.get(spool); - if (dq != null) { - succeeded = dq.markTaskSuccess(pubId); + private static class DelItem implements Comparable<DelItem> { + + private String pubid; + private String spool; + + public DelItem(String pubid, String spool) { + this.pubid = pubid; + this.spool = spool; + } + + public int compareTo(DelItem other) { + int diff = pubid.compareTo(other.pubid); + if (diff == 0) { + diff = spool.compareTo(other.spool); } + return (diff); + } + + public String getPublishId() { + return (pubid); + } + + public String getSpool() { + return (spool); + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + DelItem delItem = (DelItem) object; + return Objects.equals(pubid, delItem.pubid) + && Objects.equals(getSpool(), delItem.getSpool()); + } + + @Override + public int hashCode() { + return Objects.hash(pubid, getSpool()); } - return succeeded; } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java index 3d485878..8cdafd6f 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java @@ -26,46 +26,41 @@ package org.onap.dmaap.datarouter.node; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; -import java.io.*; -import java.util.*; +import java.io.File; +import java.util.Arrays; +import java.util.Hashtable; +import java.util.Vector; +import org.jetbrains.annotations.Nullable; /** * Mechanism for monitoring and controlling delivery of files to a destination. - * <p> - * The DeliveryQueue class maintains lists of DeliveryTasks for a single - * destination (a subscription or another data router node) and assigns - * delivery threads to try to deliver them. It also maintains a delivery - * status that causes it to back off on delivery attempts after a failure. - * <p> - * If the most recent delivery result was a failure, then no more attempts - * will be made for a period of time. Initially, and on the first failure - * following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer() (milliseconds). - * If, after this delay, additional failures occur, each failure will - * multiply the delay by DeliveryQueueHelper.getFailureBackoff() up to a - * maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer(). - * Note that this behavior applies to the delivery queue as a whole and not - * to individual files in the queue. If multiple files are being - * delivered and one fails, the delay will be started. If a second - * delivery fails while the delay was active, it will not change the delay - * or change the duration of any subsequent delay. - * If, however, it succeeds, it will cancel the delay. - * <p> - * The queue maintains 3 collections of files to deliver: A todo list of - * files that will be attempted, a working set of files that are being - * attempted, and a retry set of files that were attempted and failed. - * Whenever the todo list is empty and needs to be refilled, a scan of the - * spool directory is made and the file names sorted. Any files in the working set are ignored. - * If a DeliveryTask for the file is in the retry set, then that delivery - * task is placed on the todo list. Otherwise, a new DeliveryTask for the - * file is created and placed on the todo list. - * If, when a DeliveryTask is about to be removed from the todo list, its - * age exceeds DeliveryQueueHelper.getExpirationTimer(), then it is instead - * marked as expired. - * <p> - * A delivery queue also maintains a skip flag. This flag is true if the - * failure timer is active or if no files are found in a directory scan. + * + * <p>The DeliveryQueue class maintains lists of DeliveryTasks for a single destination (a subscription or another data + * router node) and assigns delivery threads to try to deliver them. It also maintains a delivery status that causes it + * to back off on delivery attempts after a failure. + * + * <p>If the most recent delivery result was a failure, then no more attempts will be made for a period of time. + * Initially, and on the first failure following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer() + * (milliseconds). If, after this delay, additional failures occur, each failure will multiply the delay by + * DeliveryQueueHelper.getFailureBackoff() up to a maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer(). + * Note that this behavior applies to the delivery queue as a whole and not to individual files in the queue. If + * multiple files are being delivered and one fails, the delay will be started. If a second delivery fails while the + * delay was active, it will not change the delay or change the duration of any subsequent delay. If, however, it + * succeeds, it will cancel the delay. + * + * <p>The queue maintains 3 collections of files to deliver: A to do list of files that will be attempted, a working + * set of files that are being attempted, and a retry set of files that were attempted and failed. Whenever the to do + * list is empty and needs to be refilled, a scan of the spool directory is made and the file names sorted. Any files + * in the working set are ignored. If a DeliveryTask for the file is in the retry set, then that delivery task is placed + * on the to do list. Otherwise, a new DeliveryTask for the file is created and placed on the to do list. If, when a + * DeliveryTask is about to be removed from the to do list, its age exceeds DeliveryQueueHelper.getExpirationTimer(), + * then it is instead marked as expired. + * + * <p>A delivery queue also maintains a skip flag. This flag is true if the failure timer is active or if no files are + * found in a directory scan. */ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { + private static EELFLogger logger = EELFManager.getInstance().getLogger(DeliveryQueue.class); private DeliveryQueueHelper deliveryQueueHelper; private DestInfo destinationInfo; @@ -79,6 +74,16 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { private Vector<DeliveryTask> todo = new Vector<>(); /** + * Create a delivery queue for a given destination info. + */ + DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) { + this.deliveryQueueHelper = deliveryQueueHelper; + this.destinationInfo = destinationInfo; + dir = new File(destinationInfo.getSpool()); + dir.mkdirs(); + } + + /** * Try to cancel a delivery task. * * @return The length of the task in bytes or 0 if the task cannot be cancelled. @@ -106,7 +111,8 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { if (dt.isCleaned()) { return (0); } - StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(), dt.getMethod(), dt.getCType(), dt.getLength(), "diskFull", dt.getAttempts()); + StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(), dt.getMethod(), dt.getCType(), + dt.getLength(), "diskFull", dt.getAttempts()); dt.clean(); return (dt.getLength()); } @@ -144,7 +150,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { if (failduration == 0) { if (destinationInfo.isPrivilegedSubscriber()) { failduration = deliveryQueueHelper.getWaitForFileProcessFailureTimer(); - } else{ + } else { failduration = deliveryQueueHelper.getInitFailureTimer(); } } @@ -205,144 +211,106 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { todo = new Vector<>(); String[] files = dir.list(); Arrays.sort(files); - for (String fname : files) { - if (!fname.endsWith(".M")) { - continue; - } - String fname2 = fname.substring(0, fname.length() - 2); - long pidtime = 0; - int dot = fname2.indexOf('.'); - if (dot < 1) { - continue; - } - try { - pidtime = Long.parseLong(fname2.substring(0, dot)); - } catch (Exception e) { - logger.error("Exception", e); - } - if (pidtime < 1000000000000L) { - continue; - } - if (working.get(fname2) != null) { - continue; - } - DeliveryTask dt = retry.get(fname2); - if (dt == null) { - dt = new DeliveryTask(this, fname2); - } - todo.add(dt); - } + scanForNextTask(files); retry = new Hashtable<>(); } - if (todoindex < todo.size()) { - DeliveryTask dt = todo.get(todoindex); - if (dt.isCleaned()) { - todoindex++; - continue; - } - if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) { - retry.put(dt.getPublishId(), dt); - todoindex++; - continue; - } - if (dt.getDate() >= mindate) { - return (dt); - } - todoindex++; - reportExpiry(dt); - continue; + DeliveryTask dt = getDeliveryTask(mindate); + if (dt != null) { + return dt; } - return (null); + return null; } } /** - * Create a delivery queue for a given destination info - */ - DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) { - this.deliveryQueueHelper = deliveryQueueHelper; - this.destinationInfo = destinationInfo; - dir = new File(destinationInfo.getSpool()); - dir.mkdirs(); - } - - /** - * Update the destination info for this delivery queue + * Update the destination info for this delivery queue. */ public void config(DestInfo destinationInfo) { this.destinationInfo = destinationInfo; } /** - * Get the dest info + * Get the dest info. */ public DestInfo getDestinationInfo() { return (destinationInfo); } /** - * Get the config manager + * Get the config manager. */ public DeliveryQueueHelper getConfig() { return (deliveryQueueHelper); } /** - * Exceptional condition occurred during delivery + * Exceptional condition occurred during delivery. */ public void reportDeliveryExtra(DeliveryTask task, long sent) { StatusLog.logDelExtra(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getLength(), sent); } /** - * Message too old to deliver + * Message too old to deliver. */ void reportExpiry(DeliveryTask task) { - StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts()); + StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), + task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts()); markExpired(task); } /** - * Completed a delivery attempt + * Completed a delivery attempt. */ public void reportStatus(DeliveryTask task, int status, String xpubid, String location) { if (status < 300) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid); + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), + task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid); if (destinationInfo.isPrivilegedSubscriber()) { - task.setResumeTime(System.currentTimeMillis() + deliveryQueueHelper.getWaitForFileProcessFailureTimer()); + task.setResumeTime( + System.currentTimeMillis() + deliveryQueueHelper.getWaitForFileProcessFailureTimer()); markFailWithRetry(task); } else { markSuccess(task); } } else if (status < 400 && deliveryQueueHelper.isFollowRedirects()) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location); + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), + task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location); if (deliveryQueueHelper.handleRedirection(destinationInfo, location, task.getFileId())) { markRedirect(task); } else { - StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts()); + StatusLog + .logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), + task.getCType(), task.getLength(), "notRetryable", task.getAttempts()); markFailNoRetry(task); } - } else if (status < 500 && status != 429) { // Status 429 is the standard response for Too Many Requests and indicates that a file needs to be delivered again at a later time. - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location); - StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts()); + } else if (status < 500 && status != 429) { + // Status 429 is the standard response for Too Many Requests and indicates + // that a file needs to be delivered again at a later time. + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), + task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location); + StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), + task.getCType(), task.getLength(), "notRetryable", task.getAttempts()); markFailNoRetry(task); } else { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location); + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), + task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location); markFailWithRetry(task); } } /** - * Delivery failed by reason of an exception + * Delivery failed by reason of an exception. */ public void reportException(DeliveryTask task, Exception exception) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString()); + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), + task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString()); deliveryQueueHelper.handleUnreachable(destinationInfo); markFailWithRetry(task); } /** - * Get the feed ID for a subscription + * Get the feed ID for a subscription. * * @param subid The subscription ID * @return The feed ID @@ -352,22 +320,21 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { } /** - * Get the URL to deliver a message to given the file ID + * Get the URL to deliver a message to given the file ID. */ public String getDestURL(String fileid) { return (deliveryQueueHelper.getDestURL(destinationInfo, fileid)); } /** - * Deliver files until there's a failure or there are no more - * files to deliver + * Deliver files until there's a failure or there are no more files to deliver. */ public void run() { - DeliveryTask t; + DeliveryTask task; long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit(); int filestogo = deliveryQueueHelper.getFairFileLimit(); - while ((t = getNext()) != null) { - t.run(); + while ((task = getNext()) != null) { + task.run(); if (--filestogo <= 0 || System.currentTimeMillis() > endtime) { break; } @@ -375,21 +342,21 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { } /** - * Is there no work to do for this queue right now? + * Is there no work to do for this queue right now. */ synchronized boolean isSkipSet() { return (peekNext() == null); } /** - * Reset the retry timer + * Reset the retry timer. */ void resetQueue() { resumetime = System.currentTimeMillis(); } /** - * Get task if in queue and mark as success + * Get task if in queue and mark as success. */ boolean markTaskSuccess(String pubId) { DeliveryTask task = working.get(pubId); @@ -407,4 +374,63 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { } return false; } + + private void scanForNextTask(String[] files) { + for (String fname : files) { + String pubId = getPubId(fname); + if (pubId == null) { + continue; + } + DeliveryTask dt = retry.get(pubId); + if (dt == null) { + dt = new DeliveryTask(this, pubId); + } + todo.add(dt); + } + } + + @Nullable + private DeliveryTask getDeliveryTask(long mindate) { + if (todoindex < todo.size()) { + DeliveryTask dt = todo.get(todoindex); + if (dt.isCleaned()) { + todoindex++; + } + if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) { + retry.put(dt.getPublishId(), dt); + todoindex++; + } + if (dt.getDate() >= mindate) { + return (dt); + } + todoindex++; + reportExpiry(dt); + } + return null; + } + + @Nullable + private String getPubId(String fname) { + if (!fname.endsWith(".M")) { + return null; + } + String fname2 = fname.substring(0, fname.length() - 2); + long pidtime = 0; + int dot = fname2.indexOf('.'); + if (dot < 1) { + return null; + } + try { + pidtime = Long.parseLong(fname2.substring(0, dot)); + } catch (Exception e) { + logger.error("Exception", e); + } + if (pidtime < 1000000000000L) { + return null; + } + if (working.get(fname2) != null) { + return null; + } + return fname2; + } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java index 018c3aff..2093d6d4 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java @@ -24,29 +24,39 @@ package org.onap.dmaap.datarouter.node; -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.zip.GZIPInputStream; +import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID; +import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.ProtocolException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.UUID; +import java.util.zip.GZIPInputStream; +import org.jetbrains.annotations.Nullable; import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; import org.slf4j.MDC; -import static com.att.eelf.configuration.Configuration.*; -import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip; - /** * A file to be delivered to a destination. - * <p> - * A Delivery task represents a work item for the data router - a file that - * needs to be delivered and provides mechanisms to get information about - * the file and its delivery data as well as to attempt delivery. + * + * <p>A Delivery task represents a work item for the data router - a file that needs to be delivered and provides + * mechanisms to get information about the file and its delivery data as well as to attempt delivery. */ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { - private static EELFLogger eelfLogger = EELFManager.getInstance() - .getLogger(DeliveryTask.class); + + private static final String DECOMPRESSION_STATUS = "Decompression_Status"; + private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(DeliveryTask.class); private DeliveryTaskHelper deliveryTaskHelper; private String pubid; private DestInfo destInfo; @@ -72,9 +82,8 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { * Create a delivery task for a given delivery queue and pub ID * * @param deliveryTaskHelper The delivery task helper for the queue this task is in. - * @param pubid The publish ID for this file. This is used as - * the base for the file name in the spool directory and is of - * the form <milliseconds since 1970>.<fqdn of initial data router node> + * @param pubid The publish ID for this file. This is used as the base for the file name in the spool directory and + * is of the form <milliseconds since 1970>.<fqdn of initial data router node> */ DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) { this.deliveryTaskHelper = deliveryTaskHelper; @@ -91,40 +100,40 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { boolean monly = destInfo.isMetaDataOnly(); date = Long.parseLong(pubid.substring(0, pubid.indexOf('.'))); resumeTime = System.currentTimeMillis(); - Vector<String[]> hdrv = new Vector<>(); + ArrayList<String[]> hdrv = new ArrayList<>(); try (BufferedReader br = new BufferedReader(new FileReader(metafile))) { - String s = br.readLine(); - int i = s.indexOf('\t'); - method = s.substring(0, i); + String line = br.readLine(); + int index = line.indexOf('\t'); + method = line.substring(0, index); NodeUtils.setIpAndFqdnForEelf(method); if (!"DELETE".equals(method) && !monly) { length = datafile.length(); } - fileid = s.substring(i + 1); - while ((s = br.readLine()) != null) { - i = s.indexOf('\t'); - String h = s.substring(0, i); - String v = s.substring(i + 1); - if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) { - subid = v.replaceAll("[^ ]*/", ""); + fileid = line.substring(index + 1); + while ((line = br.readLine()) != null) { + index = line.indexOf('\t'); + String header = line.substring(0, index); + String headerValue = line.substring(index + 1); + if ("x-dmaap-dr-routing".equalsIgnoreCase(header)) { + subid = headerValue.replaceAll("[^ ]*/", ""); feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", "")); } - if (length == 0 && h.toLowerCase().startsWith("content-")) { + if (length == 0 && header.toLowerCase().startsWith("content-")) { continue; } - if (h.equalsIgnoreCase("content-type")) { - ctype = v; + if ("content-type".equalsIgnoreCase(header)) { + ctype = headerValue; } - if (h.equalsIgnoreCase("x-onap-requestid")) { - MDC.put(MDC_KEY_REQUEST_ID, v); + if ("x-onap-requestid".equalsIgnoreCase(header)) { + MDC.put(MDC_KEY_REQUEST_ID, headerValue); } - if (h.equalsIgnoreCase("x-invocationid")) { - MDC.put("InvocationId", v); - v = UUID.randomUUID().toString(); - newInvocationId = v; + if ("x-invocationid".equalsIgnoreCase(header)) { + MDC.put("InvocationId", headerValue); + headerValue = UUID.randomUUID().toString(); + newInvocationId = headerValue; } - hdrv.add(new String[]{h, v}); + hdrv.add(new String[]{header, headerValue}); } } catch (Exception e) { eelfLogger.error("Exception", e); @@ -134,20 +143,20 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { } /** - * Is the object a DeliveryTask with the same publication ID? + * Is the object a DeliveryTask with the same publication ID. */ - public boolean equals(Object o) { - if (!(o instanceof DeliveryTask)) { + public boolean equals(Object object) { + if (!(object instanceof DeliveryTask)) { return (false); } - return (pubid.equals(((DeliveryTask) o).pubid)); + return (pubid.equals(((DeliveryTask) object).pubid)); } /** * Compare the publication IDs. */ - public int compareTo(DeliveryTask o) { - return (pubid.compareTo(o.pubid)); + public int compareTo(DeliveryTask other) { + return (pubid.compareTo(other.pubid)); } /** @@ -165,79 +174,49 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { } /** - * Get the publish ID + * Get the publish ID. */ String getPublishId() { return (pubid); } /** - * Attempt delivery + * Attempt delivery. */ public void run() { attempts++; try { destInfo = deliveryTaskHelper.getDestinationInfo(); - boolean expect100 = destInfo.isUsing100(); boolean monly = destInfo.isMetaDataOnly(); length = 0; if (!"DELETE".equals(method) && !monly) { length = datafile.length(); } - if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){ - fileid = fileid.replace(".gz", ""); - } + stripSuffixIfIsDecompress(); url = deliveryTaskHelper.getDestURL(fileid); - URL u = new URL(url); - HttpURLConnection uc = (HttpURLConnection) u.openConnection(); - uc.setConnectTimeout(60000); - uc.setReadTimeout(60000); - uc.setInstanceFollowRedirects(false); - uc.setRequestMethod(method); - uc.setRequestProperty("Content-Length", Long.toString(length)); - uc.setRequestProperty("Authorization", destInfo.getAuth()); - uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid); - for (String[] nv : hdrs) { - uc.addRequestProperty(nv[0], nv[1]); - } - if (length > 0) { - if (expect100) { - uc.setRequestProperty("Expect", "100-continue"); - } - uc.setDoOutput(true); - if (destInfo.isDecompress()) { - if (isFiletypeGzip(datafile)) { - sendDecompressedFile(uc); - } else { - uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT"); - sendFile(uc); - } - } else { - sendFile(uc); - } - } - int rc = uc.getResponseCode(); - String rmsg = uc.getResponseMessage(); - if (rmsg == null) { - String h0 = uc.getHeaderField(0); - if (h0 != null) { - int i = h0.indexOf(' '); - int j = h0.indexOf(' ', i + 1); - if (i != -1 && j != -1) { - rmsg = h0.substring(j + 1); - } - } - } + URL urlObj = new URL(url); + HttpURLConnection urlConnection = (HttpURLConnection) urlObj.openConnection(); + urlConnection.setConnectTimeout(60000); + urlConnection.setReadTimeout(60000); + urlConnection.setInstanceFollowRedirects(false); + urlConnection.setRequestMethod(method); + urlConnection.setRequestProperty("Content-Length", Long.toString(length)); + urlConnection.setRequestProperty("Authorization", destInfo.getAuth()); + urlConnection.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid); + boolean expect100 = destInfo.isUsing100(); + int rc = deliverFileToSubscriber(expect100, urlConnection); + String rmsg = urlConnection.getResponseMessage(); + rmsg = getResponseMessage(urlConnection, rmsg); String xpubid = null; InputStream is; if (rc >= 200 && rc <= 299) { - is = uc.getInputStream(); - xpubid = uc.getHeaderField("X-DMAAP-DR-PUBLISH-ID"); + is = urlConnection.getInputStream(); + xpubid = urlConnection.getHeaderField("X-DMAAP-DR-PUBLISH-ID"); } else { if (rc >= 300 && rc <= 399) { - rmsg = uc.getHeaderField("Location"); + rmsg = urlConnection.getHeaderField("Location"); } - is = uc.getErrorStream(); + is = urlConnection.getErrorStream(); } byte[] buf = new byte[4096]; if (is != null) { @@ -247,20 +226,19 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { } deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg); } catch (Exception e) { - eelfLogger.error("Exception "+ Arrays.toString(e.getStackTrace()),e); + eelfLogger.error("Exception " + Arrays.toString(e.getStackTrace()), e); deliveryTaskHelper.reportException(this, e); } } /** - * To send decompressed gzip to the subscribers + * To send decompressed gzip to the subscribers. * * @param httpURLConnection connection used to make request - * @throws IOException */ private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException { byte[] buffer = new byte[8164]; - httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS"); + httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "SUCCESS"); OutputStream outputStream = getOutputStream(httpURLConnection); if (outputStream != null) { int bytesRead = 0; @@ -271,7 +249,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { } outputStream.close(); } catch (IOException e) { - httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE"); + httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "FAILURE"); eelfLogger.info("Could not decompress file", e); sendFile(httpURLConnection); } @@ -283,44 +261,42 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { * To send any file to the subscriber. * * @param httpURLConnection connection used to make request - * @throws IOException */ private void sendFile(HttpURLConnection httpURLConnection) throws IOException { OutputStream os = getOutputStream(httpURLConnection); - if (os != null) { - long sofar = 0; - try (InputStream is = new FileInputStream(datafile)) { - byte[] buf = new byte[1024 * 1024]; - while (sofar < length) { - int i = buf.length; - if (sofar + i > length) { - i = (int) (length - sofar); - } - i = is.read(buf, 0, i); - if (i <= 0) { - throw new IOException("Unexpected problem reading data file " + datafile); - } - sofar += i; - os.write(buf, 0, i); + if (os == null) { + return; + } + long sofar = 0; + try (InputStream is = new FileInputStream(datafile)) { + byte[] buf = new byte[1024 * 1024]; + while (sofar < length) { + int len = buf.length; + if (sofar + len > length) { + len = (int) (length - sofar); + } + len = is.read(buf, 0, len); + if (len <= 0) { + throw new IOException("Unexpected problem reading data file " + datafile); } - os.close(); - } catch (IOException ioe) { - deliveryTaskHelper.reportDeliveryExtra(this, sofar); - throw ioe; + sofar += len; + os.write(buf, 0, len); } + os.close(); + } catch (IOException ioe) { + deliveryTaskHelper.reportDeliveryExtra(this, sofar); + throw ioe; } } /** - * Get the outputstream that will be used to send data + * Get the outputstream that will be used to send data. * * @param httpURLConnection connection used to make request * @return AN Outpustream that can be used to send your data. - * @throws IOException */ private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException { OutputStream outputStream = null; - try { outputStream = httpURLConnection.getOutputStream(); } catch (ProtocolException pe) { @@ -331,8 +307,52 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { return outputStream; } + private void stripSuffixIfIsDecompress() { + if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")) { + fileid = fileid.replace(".gz", ""); + } + } + + private int deliverFileToSubscriber(boolean expect100, HttpURLConnection uc) throws IOException { + for (String[] nv : hdrs) { + uc.addRequestProperty(nv[0], nv[1]); + } + if (length > 0) { + if (expect100) { + uc.setRequestProperty("Expect", "100-continue"); + } + uc.setDoOutput(true); + if (destInfo.isDecompress()) { + if (isFiletypeGzip(datafile)) { + sendDecompressedFile(uc); + } else { + uc.setRequestProperty(DECOMPRESSION_STATUS, "UNSUPPORTED_FORMAT"); + sendFile(uc); + } + } else { + sendFile(uc); + } + } + return uc.getResponseCode(); + } + + @Nullable + private String getResponseMessage(HttpURLConnection uc, String rmsg) { + if (rmsg == null) { + String h0 = uc.getHeaderField(0); + if (h0 != null) { + int indexOfSpace1 = h0.indexOf(' '); + int indexOfSpace2 = h0.indexOf(' ', indexOfSpace1 + 1); + if (indexOfSpace1 != -1 && indexOfSpace2 != -1) { + rmsg = h0.substring(indexOfSpace2 + 1); + } + } + } + return rmsg; + } + /** - * Remove meta and data files + * Remove meta and data files. */ void clean() { datafile.delete(); @@ -343,28 +363,28 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { } /** - * Set the resume time for a delivery task. + * Get the resume time for a delivery task. */ - void setResumeTime(long resumeTime) { - this.resumeTime = resumeTime; + long getResumeTime() { + return resumeTime; } /** - * Get the resume time for a delivery task. + * Set the resume time for a delivery task. */ - long getResumeTime() { - return resumeTime; + void setResumeTime(long resumeTime) { + this.resumeTime = resumeTime; } /** - * Has this delivery task been cleaned? + * Has this delivery task been cleaned. */ boolean isCleaned() { return (hdrs == null); } /** - * Get length of body + * Get length of body. */ public long getLength() { return (length); @@ -378,58 +398,58 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { } /** - * Get the most recent delivery attempt URL + * Get the most recent delivery attempt URL. */ public String getURL() { return (url); } /** - * Get the content type + * Get the content type. */ String getCType() { return (ctype); } /** - * Get the method + * Get the method. */ String getMethod() { return (method); } /** - * Get the file ID + * Get the file ID. */ String getFileId() { return (fileid); } /** - * Get the number of delivery attempts + * Get the number of delivery attempts. */ int getAttempts() { return (attempts); } /** - * Get the (space delimited list of) subscription ID for this delivery task + * Get the (space delimited list of) subscription ID for this delivery task. */ String getSubId() { return (subid); } /** - * Get the feed ID for this delivery task + * Get the feed ID for this delivery task. */ String getFeedId() { return (feedid); } /** - * Get the followRedirects for this delivery task + * Get the followRedirects for this delivery task. */ public boolean getFollowRedirects() { - return(followRedirects); + return (followRedirects); } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java index d4ac8bd6..b9068f2f 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java @@ -26,32 +26,33 @@ package org.onap.dmaap.datarouter.node; /** * Interface to allow independent testing of the DeliveryTask code. - * <p> - * This interface represents all the configuraiton information and - * feedback mechanisms that a delivery task needs. + * + * <p>This interface represents all the configuraiton information and feedback mechanisms that a delivery task needs. */ public interface DeliveryTaskHelper { + /** - * Report that a delivery attempt failed due to an exception (like can't connect to remote host) + * Report that a delivery attempt failed due to an exception (like can't connect to remote host). * - * @param task The task that failed + * @param task The task that failed * @param exception The exception that occurred */ void reportException(DeliveryTask task, Exception exception); /** - * Report that a delivery attempt completed (successfully or unsuccessfully) + * Report that a delivery attempt completed (successfully or unsuccessfully). * - * @param task The task that failed - * @param status The HTTP status - * @param xpubid The publish ID from the far end (if any) + * @param task The task that failed + * @param status The HTTP status + * @param xpubid The publish ID from the far end (if any) * @param location The redirection location for a 3XX response */ void reportStatus(DeliveryTask task, int status, String xpubid, String location); /** - * Report that a delivery attempt either failed while sending data or that an error was returned instead of a 100 Continue. + * Report that a delivery attempt either failed while sending data or that an error was returned instead of a 100 + * Continue. * * @param task The task that failed * @param sent The number of bytes sent or -1 if an error was returned instead of 100 Continue. @@ -59,14 +60,14 @@ public interface DeliveryTaskHelper { void reportDeliveryExtra(DeliveryTask task, long sent); /** - * Get the destination information for the delivery queue + * Get the destination information for the delivery queue. * * @return The destination information */ DestInfo getDestinationInfo(); /** - * Given a file ID, get the URL to deliver to + * Given a file ID, get the URL to deliver to. * * @param fileid The file id * @return The URL to deliver to @@ -74,7 +75,7 @@ public interface DeliveryTaskHelper { String getDestURL(String fileid); /** - * Get the feed ID for a subscription + * Get the feed ID for a subscription. * * @param subid The subscription ID * @return The feed iD diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java index 8890fe96..f5fa6e98 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java @@ -25,9 +25,10 @@ package org.onap.dmaap.datarouter.node; /** - * Information for a delivery destination that doesn't change from message to message + * Information for a delivery destination that doesn't change from message to message. */ public class DestInfo { + private String name; private String spool; private String subid; @@ -40,114 +41,33 @@ public class DestInfo { private boolean privilegedSubscriber; private boolean decompress; private boolean followRedirects; - private String aafInstance; - - public static class DestInfoBuilder { - private String name; - private String spool; - private String subid; - private String logdata; - private String url; - private String authuser; - private String authentication; - private boolean metaonly; - private boolean use100; - private boolean privilegedSubscriber; - private boolean followRedirects; - private boolean decompress; - private NodeConfig.ProvSubscription subscription; - - public DestInfoBuilder setName(String name) { - this.name = name; - return this; - } - - public DestInfoBuilder setSpool(String spool) { - this.spool = spool; - return this; - } - - public DestInfoBuilder setSubid(String subid) { - this.subid = subid; - return this; - } - - public DestInfoBuilder setLogdata(String logdata) { - this.logdata = logdata; - return this; - } - - public DestInfoBuilder setUrl(String url) { - this.url = url; - return this; - } - - public DestInfoBuilder setAuthuser(String authuser) { - this.authuser = authuser; - return this; - } - - public DestInfoBuilder setAuthentication(String authentication) { - this.authentication = authentication; - return this; - } - - public DestInfoBuilder setMetaonly(boolean metaonly) { - this.metaonly = metaonly; - return this; - } - - public DestInfoBuilder setUse100(boolean use100) { - this.use100 = use100; - return this; - } - - public DestInfoBuilder setPrivilegedSubscriber(boolean privilegedSubscriber) { - this.privilegedSubscriber = privilegedSubscriber; - return this; - } - - public DestInfoBuilder setFollowRedirects(boolean followRedirects) { - this.followRedirects = followRedirects; - return this; - } - - public DestInfoBuilder setDecompress(boolean decompress) { - this.decompress = decompress; - return this; - } - - public DestInfoBuilder setSubscription(NodeConfig.ProvSubscription subscription) { - this.subscription = subscription; - return this; - } - - public DestInfo createDestInfo() { - return new DestInfo(this); - } - } + /** + * Create a destination information object. + * + * @param destInfoBuilder DestInfo Object Builder + */ public DestInfo(DestInfoBuilder destInfoBuilder) { - this.name = destInfoBuilder.name; - this.spool = destInfoBuilder.spool; - this.subid = destInfoBuilder.subid; - this.logdata = destInfoBuilder.logdata; - this.url = destInfoBuilder.url; - this.authuser = destInfoBuilder.authuser; - this.authentication = destInfoBuilder.authentication; - this.metaonly = destInfoBuilder.metaonly; - this.use100 = destInfoBuilder.use100; - this.privilegedSubscriber = destInfoBuilder.privilegedSubscriber; - this.followRedirects = destInfoBuilder.followRedirects; - this.decompress = destInfoBuilder.decompress; + this.name = destInfoBuilder.getName(); + this.spool = destInfoBuilder.getSpool(); + this.subid = destInfoBuilder.getSubid(); + this.logdata = destInfoBuilder.getLogdata(); + this.url = destInfoBuilder.getUrl(); + this.authuser = destInfoBuilder.getAuthuser(); + this.authentication = destInfoBuilder.getAuthentication(); + this.metaonly = destInfoBuilder.isMetaonly(); + this.use100 = destInfoBuilder.isUse100(); + this.privilegedSubscriber = destInfoBuilder.isPrivilegedSubscriber(); + this.followRedirects = destInfoBuilder.isFollowRedirects(); + this.decompress = destInfoBuilder.isDecompress(); } /** * Create a destination information object. * - * @param name n:fqdn or s:subid - * @param spool The directory where files are spooled. - * @param subscription The subscription. + * @param name n:fqdn or s:subid + * @param spool The directory where files are spooled. + * @param subscription The subscription. */ public DestInfo(String name, String spool, NodeConfig.ProvSubscription subscription) { this.name = name; @@ -164,8 +84,8 @@ public class DestInfo { this.decompress = subscription.isDecompress(); } - public boolean equals(Object o) { - return ((o instanceof DestInfo) && ((DestInfo) o).spool.equals(spool)); + public boolean equals(Object object) { + return ((object instanceof DestInfo) && ((DestInfo) object).spool.equals(spool)); } public int hashCode() { @@ -173,7 +93,7 @@ public class DestInfo { } /** - * Get the name of this destination + * Get the name of this destination. */ public String getName() { return (name); @@ -217,7 +137,7 @@ public class DestInfo { } /** - * Get the user for authentication + * Get the user for authentication. * * @return The name of the user for logging */ @@ -226,7 +146,7 @@ public class DestInfo { } /** - * Get the authentication header + * Get the authentication header. * * @return The string to use to authenticate to the recipient. */ @@ -235,7 +155,7 @@ public class DestInfo { } /** - * Is this a metadata only delivery? + * Is this a metadata only delivery. * * @return True if this is a metadata only delivery */ @@ -244,7 +164,7 @@ public class DestInfo { } /** - * Should I send expect 100-continue header? + * Should I send expect 100-continue header. * * @return True if I should. */ @@ -253,23 +173,23 @@ public class DestInfo { } /** - * Should we wait to receive a file processed acknowledgement before deleting file + * Should we wait to receive a file processed acknowledgement before deleting file. */ public boolean isPrivilegedSubscriber() { return (privilegedSubscriber); } /** - * Should I follow redirects? - * - * @return True if I should. - */ + * Should I follow redirects. + * + * @return True if I should. + */ public boolean isFollowRedirects() { return (followRedirects); } /** - * Should i decompress the file before sending it on + * Should i decompress the file before sending it on. * * @return True if I should. */ diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfoBuilder.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfoBuilder.java new file mode 100644 index 00000000..00c5cd8b --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfoBuilder.java @@ -0,0 +1,149 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dmaap.datarouter.node; + +public class DestInfoBuilder { + + private String destInfoName; + private String destInfoSpool; + private String destInfoSubId; + private String destInfoLogData; + private String destInfoUrl; + private String destInfoAuthUser; + private String destInfoAuthentication; + private boolean destInfoMetaOnly; + private boolean destInfoUse100; + private boolean destInfoPrivilegedSubscriber; + private boolean destInfoFollowRedirects; + private boolean destInfoDecompress; + + public String getName() { + return destInfoName; + } + + public DestInfoBuilder setName(String name) { + this.destInfoName = name; + return this; + } + + public String getSpool() { + return destInfoSpool; + } + + public DestInfoBuilder setSpool(String spool) { + this.destInfoSpool = spool; + return this; + } + + public String getSubid() { + return destInfoSubId; + } + + public DestInfoBuilder setSubid(String subid) { + this.destInfoSubId = subid; + return this; + } + + String getLogdata() { + return destInfoLogData; + } + + DestInfoBuilder setLogdata(String logdata) { + this.destInfoLogData = logdata; + return this; + } + + public String getUrl() { + return destInfoUrl; + } + + public DestInfoBuilder setUrl(String url) { + this.destInfoUrl = url; + return this; + } + + String getAuthuser() { + return destInfoAuthUser; + } + + DestInfoBuilder setAuthuser(String authuser) { + this.destInfoAuthUser = authuser; + return this; + } + + String getAuthentication() { + return destInfoAuthentication; + } + + DestInfoBuilder setAuthentication(String authentication) { + this.destInfoAuthentication = authentication; + return this; + } + + boolean isMetaonly() { + return destInfoMetaOnly; + } + + DestInfoBuilder setMetaonly(boolean metaonly) { + this.destInfoMetaOnly = metaonly; + return this; + } + + boolean isUse100() { + return destInfoUse100; + } + + DestInfoBuilder setUse100(boolean use100) { + this.destInfoUse100 = use100; + return this; + } + + boolean isPrivilegedSubscriber() { + return destInfoPrivilegedSubscriber; + } + + DestInfoBuilder setPrivilegedSubscriber(boolean privilegedSubscriber) { + this.destInfoPrivilegedSubscriber = privilegedSubscriber; + return this; + } + + boolean isFollowRedirects() { + return destInfoFollowRedirects; + } + + DestInfoBuilder setFollowRedirects(boolean followRedirects) { + this.destInfoFollowRedirects = followRedirects; + return this; + } + + boolean isDecompress() { + return destInfoDecompress; + } + + DestInfoBuilder setDecompress(boolean decompress) { + this.destInfoDecompress = decompress; + return this; + } + + DestInfo createDestInfo() { + return new DestInfo(this); + } +}
\ No newline at end of file diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java index 534b2b35..49852680 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java @@ -26,39 +26,40 @@ package org.onap.dmaap.datarouter.node; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; - import java.io.IOException; -import java.util.*; -import java.net.*; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; /** - * Determine if an IP address is from a machine + * Determine if an IP address is from a machine. */ public class IsFrom { + + private static EELFLogger logger = EELFManager.getInstance().getLogger(IsFrom.class); private long nextcheck; private String[] ips; private String fqdn; - private static EELFLogger logger = EELFManager.getInstance().getLogger(IsFrom.class); /** - * Configure the JVM DNS cache to have a 10 second TTL. This needs to be called very very early or it won't have any effect. + * Create an IsFrom for the specified fully qualified domain name. */ - public static void setDNSCache() { - java.security.Security.setProperty("networkaddress.cache.ttl", "10"); + public IsFrom(String fqdn) { + this.fqdn = fqdn; } /** - * Create an IsFrom for the specified fully qualified domain name. + * Configure the JVM DNS cache to have a 10 second TTL. This needs to be called very very early or it won't have + * any effect. */ - public IsFrom(String fqdn) { - this.fqdn = fqdn; + public static void setDNSCache() { + java.security.Security.setProperty("networkaddress.cache.ttl", "10"); } /** - * Check if an IP address matches. If it has been more than - * 10 seconds since DNS was last checked for changes to the - * IP address(es) of this FQDN, check again. Then check - * if the specified IP address belongs to the FQDN. + * Check if an IP address matches. If it has been more than 10 seconds since DNS was last checked for changes to + * the IP address(es) of this FQDN, check again. Then check if the specified IP address belongs to the FQDN. */ public synchronized boolean isFrom(String ip) { long now = System.currentTimeMillis(); @@ -98,7 +99,7 @@ public class IsFrom { } /** - * Return the fully qualified domain name + * Return the fully qualified domain name. */ public String toString() { return (fqdn); diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java index 6ffb7604..3277408c 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java @@ -20,6 +20,7 @@ * * ECOMP is a trademark and service mark of AT&T Intellectual Property. * * ******************************************************************************/ + package org.onap.dmaap.datarouter.node; import com.att.eelf.configuration.EELFLogger; @@ -35,16 +36,18 @@ import java.util.Arrays; import java.util.TimerTask; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.jetbrains.annotations.NotNull; /** * Cleanup of old log files. - * <p> - * Periodically scan the log directory for log files that are older than the log file retention interval, and delete + * + * <p>Periodically scan the log directory for log files that are older than the log file retention interval, and delete * them. In a future release, This class will also be responsible for uploading events logs to the log server to * support the log query APIs. */ public class LogManager extends TimerTask { + private EELFLogger logger = EELFManager.getInstance().getLogger(LogManager.class); private NodeConfigManager config; private Matcher isnodelog; @@ -53,8 +56,55 @@ public class LogManager extends TimerTask { private String uploaddir; private String logdir; + /** + * Construct a log manager + * + * <p>The log manager will check for expired log files every 5 minutes at 20 seconds after the 5 minute boundary. + * (Actually, the interval is the event log rollover interval, which defaults to 5 minutes). + */ + public LogManager(NodeConfigManager config) { + this.config = config; + try { + isnodelog = Pattern.compile("node\\.log\\.\\d{8}").matcher(""); + iseventlog = Pattern.compile("events-\\d{12}\\.log").matcher(""); + } catch (Exception e) { + logger.error("Exception", e); + } + logdir = config.getLogDir(); + uploaddir = logdir + "/.spool"; + (new File(uploaddir)).mkdirs(); + long now = System.currentTimeMillis(); + long intvl = StatusLog.parseInterval(config.getEventLogInterval(), 30000); + long when = now - now % intvl + intvl + 20000L; + config.getTimer().scheduleAtFixedRate(this, when - now, intvl); + worker = new Uploader(); + } + + /** + * Trigger check for expired log files and log files to upload. + */ + public void run() { + worker.poke(); + } + private class Uploader extends Thread implements DeliveryQueueHelper { + + private static final String EXCEPTION = "Exception"; + private static final String META = "/.meta"; private EELFLogger logger = EELFManager.getInstance().getLogger(Uploader.class); + private DeliveryQueue dq; + + Uploader() { + dq = new DeliveryQueue(this, + new DestInfoBuilder().setName("LogUpload").setSpool(uploaddir).setSubid(null).setLogdata(null) + .setUrl(null).setAuthuser(config.getMyName()).setAuthentication(config.getMyAuth()) + .setMetaonly(false).setUse100(false).setPrivilegedSubscriber(false) + .setFollowRedirects(false) + .setDecompress(false).createDestInfo()); + setDaemon(true); + setName("Log Uploader"); + start(); + } public long getInitFailureTimer() { return (10000L); @@ -89,6 +139,7 @@ public class LogManager extends TimerTask { } public void handleUnreachable(DestInfo destinationInfo) { + throw new UnsupportedOperationException(); } public boolean handleRedirection(DestInfo destinationInfo, String location, String fileid) { @@ -103,24 +154,11 @@ public class LogManager extends TimerTask { return (null); } - private DeliveryQueue dq; - - public Uploader() { - dq = new DeliveryQueue(this, - new DestInfo.DestInfoBuilder().setName("LogUpload").setSpool(uploaddir).setSubid(null).setLogdata(null) - .setUrl(null).setAuthuser(config.getMyName()).setAuthentication(config.getMyAuth()) - .setMetaonly(false).setUse100(false).setPrivilegedSubscriber(false).setFollowRedirects(false) - .setDecompress(false).createDestInfo()); - setDaemon(true); - setName("Log Uploader"); - start(); - } - private synchronized void snooze() { try { wait(10000); } catch (Exception e) { - logger.error("InterruptedException", e); + logger.error(EXCEPTION, e); } } @@ -145,73 +183,48 @@ public class LogManager extends TimerTask { String curlog = StatusLog.getCurLogFile(); curlog = curlog.substring(curlog.lastIndexOf('/') + 1); try { - Writer w = new FileWriter(uploaddir + "/.meta"); - w.write("POST\tlogdata\nContent-Type\ttext/plain\n"); - w.close(); + Writer writer = new FileWriter(uploaddir + META); + writer.write("POST\tlogdata\nContent-Type\ttext/plain\n"); + writer.close(); BufferedReader br = new BufferedReader(new FileReader(uploaddir + "/.lastqueued")); lastqueued = br.readLine(); br.close(); } catch (Exception e) { - logger.error("Exception", e); + logger.error(EXCEPTION, e); } for (String fn : fns) { if (!isnodelog.reset(fn).matches()) { if (!iseventlog.reset(fn).matches()) { continue; } - if (lastqueued.compareTo(fn) < 0 && curlog.compareTo(fn) > 0) { - lastqueued = fn; - try { - String pid = config.getPublishId(); - Files.createLink(Paths.get(uploaddir + "/" + pid), Paths.get(logdir + "/" + fn)); - Files.createLink(Paths.get(uploaddir + "/" + pid + ".M"), Paths.get(uploaddir + "/.meta")); - } catch (Exception e) { - logger.error("Exception", e); - } - } + lastqueued = setLastQueued(lastqueued, curlog, fn); } - File f = new File(dir, fn); - if (f.lastModified() < threshold) { - f.delete(); + File file = new File(dir, fn); + if (file.lastModified() < threshold) { + file.delete(); } } try (Writer w = new FileWriter(uploaddir + "/.lastqueued")) { - (new File(uploaddir + "/.meta")).delete(); + (new File(uploaddir + META)).delete(); w.write(lastqueued + "\n"); } catch (Exception e) { - logger.error("Exception", e); + logger.error(EXCEPTION, e); } } - } - /** - * Construct a log manager - * <p> - * The log manager will check for expired log files every 5 minutes at 20 seconds after the 5 minute boundary. - * (Actually, the interval is the event log rollover interval, which defaults to 5 minutes). - */ - public LogManager(NodeConfigManager config) { - this.config = config; - try { - isnodelog = Pattern.compile("node\\.log\\.\\d{8}").matcher(""); - iseventlog = Pattern.compile("events-\\d{12}\\.log").matcher(""); - } catch (Exception e) { - logger.error("Exception", e); + @NotNull + private String setLastQueued(String lastqueued, String curlog, String fn) { + if (lastqueued.compareTo(fn) < 0 && curlog.compareTo(fn) > 0) { + lastqueued = fn; + try { + String pid = config.getPublishId(); + Files.createLink(Paths.get(uploaddir + "/" + pid), Paths.get(logdir + "/" + fn)); + Files.createLink(Paths.get(uploaddir + "/" + pid + ".M"), Paths.get(uploaddir + META)); + } catch (Exception e) { + logger.error(EXCEPTION, e); + } + } + return lastqueued; } - logdir = config.getLogDir(); - uploaddir = logdir + "/.spool"; - (new File(uploaddir)).mkdirs(); - long now = System.currentTimeMillis(); - long intvl = StatusLog.parseInterval(config.getEventLogInterval(), 30000); - long when = now - now % intvl + intvl + 20000L; - config.getTimer().scheduleAtFixedRate(this, when - now, intvl); - worker = new Uploader(); - } - - /** - * Trigger check for expired log files and log files to upload - */ - public void run() { - worker.poke(); } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java index d455f2d9..7f018210 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java @@ -26,22 +26,451 @@ package org.onap.dmaap.datarouter.node; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; - import java.io.File; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; -import java.util.Hashtable; -import java.util.Vector; /** * Processed configuration for this node. - * <p> - * The NodeConfig represents a processed configuration from the Data Router provisioning server. Each time + * + * <p>The NodeConfig represents a processed configuration from the Data Router provisioning server. Each time * configuration data is received from the provisioning server, a new NodeConfig is created and the previous one * discarded. */ public class NodeConfig { + + private static final String PUBLISHER_NOT_PERMITTED = "Publisher not permitted for this feed"; private static EELFLogger logger = EELFManager.getInstance().getLogger(NodeConfig.class); + private HashMap<String, String> params = new HashMap<>(); + private HashMap<String, Feed> feeds = new HashMap<>(); + private HashMap<String, DestInfo> nodeinfo = new HashMap<>(); + private HashMap<String, DestInfo> subinfo = new HashMap<>(); + private HashMap<String, IsFrom> nodes = new HashMap<>(); + private HashMap<String, ProvSubscription> provSubscriptions = new HashMap<>(); + private String myname; + private String myauth; + private DestInfo[] alldests; + private int rrcntr; + + /** + * Process the raw provisioning data to configure this node. + * + * @param pd The parsed provisioning data + * @param myname My name as seen by external systems + * @param spooldir The directory where temporary files live + * @param port The port number for URLs + * @param nodeauthkey The keying string used to generate node authentication credentials + */ + public NodeConfig(ProvData pd, String myname, String spooldir, int port, String nodeauthkey) { + this.myname = myname; + for (ProvParam p : pd.getParams()) { + params.put(p.getName(), p.getValue()); + } + ArrayList<DestInfo> destInfos = new ArrayList<>(); + myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey); + for (ProvNode pn : pd.getNodes()) { + String commonName = pn.getCName(); + if (nodeinfo.get(commonName) != null) { + continue; + } + DestInfo di = new DestInfoBuilder().setName("n:" + commonName).setSpool(spooldir + "/n/" + commonName) + .setSubid(null) + .setLogdata("n2n-" + commonName).setUrl("https://" + commonName + ":" + port + "/internal/publish") + .setAuthuser(commonName).setAuthentication(myauth).setMetaonly(false).setUse100(true) + .setPrivilegedSubscriber(false).setFollowRedirects(false).setDecompress(false).createDestInfo(); + (new File(di.getSpool())).mkdirs(); + String auth = NodeUtils.getNodeAuthHdr(commonName, nodeauthkey); + destInfos.add(di); + nodeinfo.put(commonName, di); + nodes.put(auth, new IsFrom(commonName)); + } + PathFinder pf = new PathFinder(myname, nodeinfo.keySet().toArray(new String[0]), pd.getHops()); + HashMap<String, ArrayList<Redirection>> rdtab = new HashMap<>(); + for (ProvForceIngress pfi : pd.getForceIngress()) { + ArrayList<Redirection> v = rdtab.get(pfi.getFeedId()); + if (v == null) { + v = new ArrayList<>(); + rdtab.put(pfi.getFeedId(), v); + } + Redirection r = new Redirection(); + if (pfi.getSubnet() != null) { + r.snm = new SubnetMatcher(pfi.getSubnet()); + } + r.user = pfi.getUser(); + r.nodes = pfi.getNodes(); + v.add(r); + } + HashMap<String, HashMap<String, String>> pfutab = new HashMap<>(); + for (ProvFeedUser pfu : pd.getFeedUsers()) { + HashMap<String, String> t = pfutab.get(pfu.getFeedId()); + if (t == null) { + t = new HashMap<>(); + pfutab.put(pfu.getFeedId(), t); + } + t.put(pfu.getCredentials(), pfu.getUser()); + } + HashMap<String, String> egrtab = new HashMap<>(); + for (ProvForceEgress pfe : pd.getForceEgress()) { + if (pfe.getNode().equals(myname) || nodeinfo.get(pfe.getNode()) == null) { + continue; + } + egrtab.put(pfe.getSubId(), pfe.getNode()); + } + HashMap<String, ArrayList<SubnetMatcher>> pfstab = new HashMap<>(); + for (ProvFeedSubnet pfs : pd.getFeedSubnets()) { + ArrayList<SubnetMatcher> v = pfstab.get(pfs.getFeedId()); + if (v == null) { + v = new ArrayList<>(); + pfstab.put(pfs.getFeedId(), v); + } + v.add(new SubnetMatcher(pfs.getCidr())); + } + HashMap<String, StringBuilder> feedTargets = new HashMap<>(); + HashSet<String> allfeeds = new HashSet<>(); + for (ProvFeed pfx : pd.getFeeds()) { + if (pfx.getStatus() == null) { + allfeeds.add(pfx.getId()); + } + } + for (ProvSubscription provSubscription : pd.getSubscriptions()) { + String subId = provSubscription.getSubId(); + String feedId = provSubscription.getFeedId(); + if (isFeedOrSubKnown(allfeeds, subId, feedId)) { + continue; + } + int sididx = 999; + try { + sididx = Integer.parseInt(subId); + sididx -= sididx % 100; + } catch (Exception e) { + logger.error("NODE0517 Exception NodeConfig: " + e); + } + String subscriptionDirectory = sididx + "/" + subId; + DestInfo destinationInfo = new DestInfo("s:" + subId, + spooldir + "/s/" + subscriptionDirectory, provSubscription); + (new File(destinationInfo.getSpool())).mkdirs(); + destInfos.add(destinationInfo); + provSubscriptions.put(subId, provSubscription); + subinfo.put(subId, destinationInfo); + String egr = egrtab.get(subId); + if (egr != null) { + subId = pf.getPath(egr) + subId; + } + StringBuilder sb = feedTargets.get(feedId); + if (sb == null) { + sb = new StringBuilder(); + feedTargets.put(feedId, sb); + } + sb.append(' ').append(subId); + } + alldests = destInfos.toArray(new DestInfo[0]); + for (ProvFeed pfx : pd.getFeeds()) { + String fid = pfx.getId(); + Feed f = feeds.get(fid); + if (f != null) { + continue; + } + f = new Feed(); + feeds.put(fid, f); + f.createdDate = pfx.getCreatedDate(); + f.loginfo = pfx.getLogData(); + f.status = pfx.getStatus(); + /* + * AAF changes: TDP EPIC US# 307413 + * Passing aafInstance from ProvFeed to identify legacy/AAF feeds + */ + f.aafInstance = pfx.getAafInstance(); + ArrayList<SubnetMatcher> v1 = pfstab.get(fid); + if (v1 == null) { + f.subnets = new SubnetMatcher[0]; + } else { + f.subnets = v1.toArray(new SubnetMatcher[0]); + } + HashMap<String, String> h1 = pfutab.get(fid); + if (h1 == null) { + h1 = new HashMap(); + } + f.authusers = h1; + ArrayList<Redirection> v2 = rdtab.get(fid); + if (v2 == null) { + f.redirections = new Redirection[0]; + } else { + f.redirections = v2.toArray(new Redirection[0]); + } + StringBuilder sb = feedTargets.get(fid); + if (sb == null) { + f.targets = new Target[0]; + } else { + f.targets = parseRouting(sb.toString()); + } + } + } + + /** + * Parse a target string into an array of targets + * + * @param routing Target string + * @return Array of targets. + */ + public Target[] parseRouting(String routing) { + routing = routing.trim(); + if ("".equals(routing)) { + return (new Target[0]); + } + String[] xx = routing.split("\\s+"); + HashMap<String, Target> tmap = new HashMap<>(); + HashSet<String> subset = new HashSet<>(); + ArrayList<Target> tv = new ArrayList<>(); + for (int i = 0; i < xx.length; i++) { + String t = xx[i]; + int j = t.indexOf('/'); + if (j == -1) { + addTarget(subset, tv, t); + } else { + addTargetWithRouting(tmap, tv, t, j); + } + } + return (tv.toArray(new Target[0])); + } + + /** + * Check whether this is a valid node-to-node transfer + * + * @param credentials Credentials offered by the supposed node + * @param ip IP address the request came from + */ + public boolean isAnotherNode(String credentials, String ip) { + IsFrom n = nodes.get(credentials); + return (n != null && n.isFrom(ip)); + } + + /** + * Check whether publication is allowed. + * + * @param feedid The ID of the feed being requested. + * @param credentials The offered credentials + * @param ip The requesting IP address + */ + public String isPublishPermitted(String feedid, String credentials, String ip) { + Feed f = feeds.get(feedid); + String nf = "Feed does not exist"; + if (f != null) { + nf = f.status; + } + if (nf != null) { + return (nf); + } + String user = f.authusers.get(credentials); + if (user == null) { + return (PUBLISHER_NOT_PERMITTED); + } + if (f.subnets.length == 0) { + return (null); + } + byte[] addr = NodeUtils.getInetAddress(ip); + for (SubnetMatcher snm : f.subnets) { + if (snm.matches(addr)) { + return (null); + } + } + return (PUBLISHER_NOT_PERMITTED); + } + + /** + * Check whether delete file is allowed. + * + * @param subId The ID of the subscription being requested. + */ + public boolean isDeletePermitted(String subId) { + ProvSubscription provSubscription = provSubscriptions.get(subId); + return provSubscription.isPrivilegedSubscriber(); + } + + /** + * Check whether publication is allowed for AAF Feed. + * + * @param feedid The ID of the feed being requested. + * @param ip The requesting IP address + */ + public String isPublishPermitted(String feedid, String ip) { + Feed f = feeds.get(feedid); + String nf = "Feed does not exist"; + if (f != null) { + nf = f.status; + } + if (nf != null) { + return nf; + } + if (f.subnets.length == 0) { + return null; + } + byte[] addr = NodeUtils.getInetAddress(ip); + for (SubnetMatcher snm : f.subnets) { + if (snm.matches(addr)) { + return null; + } + } + return PUBLISHER_NOT_PERMITTED; + } + + /** + * Get authenticated user + */ + public String getAuthUser(String feedid, String credentials) { + return (feeds.get(feedid).authusers.get(credentials)); + } + + /** + * AAF changes: TDP EPIC US# 307413 Check AAF_instance for feed ID + * + * @param feedid The ID of the feed specified + */ + public String getAafInstance(String feedid) { + Feed f = feeds.get(feedid); + return f.aafInstance; + } + + /** + * Check if the request should be redirected to a different ingress node + */ + public String getIngressNode(String feedid, String user, String ip) { + Feed f = feeds.get(feedid); + if (f.redirections.length == 0) { + return (null); + } + byte[] addr = NodeUtils.getInetAddress(ip); + for (Redirection r : f.redirections) { + if ((r.user != null && !user.equals(r.user)) || (r.snm != null && !r.snm.matches(addr))) { + continue; + } + for (String n : r.nodes) { + if (myname.equals(n)) { + return (null); + } + } + if (r.nodes.length == 0) { + return (null); + } + return (r.nodes[rrcntr++ % r.nodes.length]); + } + return (null); + } + + /** + * Get a provisioned configuration parameter + */ + public String getProvParam(String name) { + return (params.get(name)); + } + + /** + * Get all the DestInfos + */ + public DestInfo[] getAllDests() { + return (alldests); + } + + /** + * Get the targets for a feed + * + * @param feedid The feed ID + * @return The targets this feed should be delivered to + */ + public Target[] getTargets(String feedid) { + if (feedid == null) { + return (new Target[0]); + } + Feed f = feeds.get(feedid); + if (f == null) { + return (new Target[0]); + } + return (f.targets); + } + + /** + * Get the creation date for a feed + * + * @param feedid The feed ID + * @return the timestamp of creation date of feed id passed + */ + public String getCreatedDate(String feedid) { + Feed f = feeds.get(feedid); + return (f.createdDate); + } + + /** + * Get the feed ID for a subscription + * + * @param subid The subscription ID + * @return The feed ID + */ + public String getFeedId(String subid) { + DestInfo di = subinfo.get(subid); + if (di == null) { + return (null); + } + return (di.getLogData()); + } + + /** + * Get the spool directory for a subscription + * + * @param subid The subscription ID + * @return The spool directory + */ + public String getSpoolDir(String subid) { + DestInfo di = subinfo.get(subid); + if (di == null) { + return (null); + } + return (di.getSpool()); + } + + /** + * Get the Authorization value this node uses + * + * @return The Authorization header value for this node + */ + public String getMyAuth() { + return (myauth); + } + + private boolean isFeedOrSubKnown(HashSet<String> allfeeds, String subId, String feedId) { + return !allfeeds.contains(feedId) || subinfo.get(subId) != null; + } + + private void addTargetWithRouting(HashMap<String, Target> tmap, ArrayList<Target> tv, String t, int j) { + String node = t.substring(0, j); + String rtg = t.substring(j + 1); + DestInfo di = nodeinfo.get(node); + if (di == null) { + tv.add(new Target(null, t)); + } else { + Target tt = tmap.get(node); + if (tt == null) { + tt = new Target(di, rtg); + tmap.put(node, tt); + tv.add(tt); + } else { + tt.addRouting(rtg); + } + } + } + + private void addTarget(HashSet<String> subset, ArrayList<Target> tv, String t) { + DestInfo di = subinfo.get(t); + if (di == null) { + tv.add(new Target(null, t)); + } else { + if (!subset.contains(t)) { + subset.add(t); + tv.add(new Target(di, null)); + } + } + } + /** * Raw configuration entry for a data router node */ @@ -134,9 +563,8 @@ public class NodeConfig { /** * Get the created date of the data feed. */ - public String getCreatedDate() - { - return(createdDate); + public String getCreatedDate() { + return (createdDate); } /** @@ -277,7 +705,9 @@ public class NodeConfig { * @param followRedirect Is follow redirect of destination enabled? * @param decompress To see if they want their information compressed or decompressed */ - public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials, boolean metaonly, boolean use100, boolean privilegedSubscriber, boolean followRedirect, boolean decompress) { + public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials, + boolean metaonly, boolean use100, boolean privilegedSubscriber, boolean followRedirect, + boolean decompress) { this.subid = subid; this.feedid = feedid; this.url = url; @@ -348,17 +778,17 @@ public class NodeConfig { /** * Should i decompress the file before sending it on - */ + */ public boolean isDecompress() { return (decompress); } /** - * New field is added - FOLLOW_REDIRECTS feature iTrack:DATARTR-17 - 1706 - * Get the followRedirect of this destination + * New field is added - FOLLOW_REDIRECTS feature iTrack:DATARTR-17 - 1706 Get the followRedirect of this + * destination */ boolean getFollowRedirect() { - return(followRedirect); + return (followRedirect); } } @@ -386,7 +816,7 @@ public class NodeConfig { this.subnet = subnet; this.user = user; //Sonar fix - if(nodes == null) { + if (nodes == null) { this.nodes = new String[0]; } else { this.nodes = Arrays.copyOf(nodes, nodes.length); @@ -466,13 +896,6 @@ public class NodeConfig { private String via; /** - * A human readable description of this entry - */ - public String toString() { - return ("Hop " + from + "->" + to + " via " + via); - } - - /** * Construct a hop entry * * @param from The FQDN of the node with the data to be delivered @@ -486,6 +909,13 @@ public class NodeConfig { } /** + * A human readable description of this entry + */ + public String toString() { + return ("Hop " + from + "->" + to + " via " + via); + } + + /** * Get the from node */ public String getFrom() { @@ -519,431 +949,10 @@ public class NodeConfig { String loginfo; String status; SubnetMatcher[] subnets; - Hashtable<String, String> authusers = new Hashtable<String, String>(); + HashMap<String, String> authusers = new HashMap<>(); Redirection[] redirections; Target[] targets; String createdDate; String aafInstance; } - - private Hashtable<String, String> params = new Hashtable<>(); - private Hashtable<String, Feed> feeds = new Hashtable<>(); - private Hashtable<String, DestInfo> nodeinfo = new Hashtable<>(); - private Hashtable<String, DestInfo> subinfo = new Hashtable<>(); - private Hashtable<String, IsFrom> nodes = new Hashtable<>(); - private Hashtable<String, ProvSubscription> provSubscriptions = new Hashtable<>(); - private String myname; - private String myauth; - private DestInfo[] alldests; - private int rrcntr; - - /** - * Process the raw provisioning data to configure this node - * - * @param pd The parsed provisioning data - * @param myname My name as seen by external systems - * @param spooldir The directory where temporary files live - * @param port The port number for URLs - * @param nodeauthkey The keying string used to generate node authentication credentials - */ - public NodeConfig(ProvData pd, String myname, String spooldir, int port, String nodeauthkey) { - this.myname = myname; - for (ProvParam p : pd.getParams()) { - params.put(p.getName(), p.getValue()); - } - Vector<DestInfo> destInfos = new Vector<>(); - myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey); - for (ProvNode pn : pd.getNodes()) { - String cName = pn.getCName(); - if (nodeinfo.get(cName) != null) { - continue; - } - String auth = NodeUtils.getNodeAuthHdr(cName, nodeauthkey); - DestInfo di = new DestInfo.DestInfoBuilder().setName("n:" + cName).setSpool(spooldir + "/n/" + cName).setSubid(null) - .setLogdata("n2n-" + cName).setUrl("https://" + cName + ":" + port + "/internal/publish") - .setAuthuser(cName).setAuthentication(myauth).setMetaonly(false).setUse100(true) - .setPrivilegedSubscriber(false).setFollowRedirects(false).setDecompress(false).createDestInfo(); - (new File(di.getSpool())).mkdirs(); - destInfos.add(di); - nodeinfo.put(cName, di); - nodes.put(auth, new IsFrom(cName)); - } - PathFinder pf = new PathFinder(myname, nodeinfo.keySet().toArray(new String[0]), pd.getHops()); - Hashtable<String, Vector<Redirection>> rdtab = new Hashtable<>(); - for (ProvForceIngress pfi : pd.getForceIngress()) { - Vector<Redirection> v = rdtab.get(pfi.getFeedId()); - if (v == null) { - v = new Vector<>(); - rdtab.put(pfi.getFeedId(), v); - } - Redirection r = new Redirection(); - if (pfi.getSubnet() != null) { - r.snm = new SubnetMatcher(pfi.getSubnet()); - } - r.user = pfi.getUser(); - r.nodes = pfi.getNodes(); - v.add(r); - } - Hashtable<String, Hashtable<String, String>> pfutab = new Hashtable<>(); - for (ProvFeedUser pfu : pd.getFeedUsers()) { - Hashtable<String, String> t = pfutab.get(pfu.getFeedId()); - if (t == null) { - t = new Hashtable<>(); - pfutab.put(pfu.getFeedId(), t); - } - t.put(pfu.getCredentials(), pfu.getUser()); - } - Hashtable<String, String> egrtab = new Hashtable<>(); - for (ProvForceEgress pfe : pd.getForceEgress()) { - if (pfe.getNode().equals(myname) || nodeinfo.get(pfe.getNode()) == null) { - continue; - } - egrtab.put(pfe.getSubId(), pfe.getNode()); - } - Hashtable<String, Vector<SubnetMatcher>> pfstab = new Hashtable<>(); - for (ProvFeedSubnet pfs : pd.getFeedSubnets()) { - Vector<SubnetMatcher> v = pfstab.get(pfs.getFeedId()); - if (v == null) { - v = new Vector<>(); - pfstab.put(pfs.getFeedId(), v); - } - v.add(new SubnetMatcher(pfs.getCidr())); - } - Hashtable<String, StringBuffer> feedTargets = new Hashtable<>(); - HashSet<String> allfeeds = new HashSet<>(); - for (ProvFeed pfx : pd.getFeeds()) { - if (pfx.getStatus() == null) { - allfeeds.add(pfx.getId()); - } - } - for (ProvSubscription provSubscription : pd.getSubscriptions()) { - String subId = provSubscription.getSubId(); - String feedId = provSubscription.getFeedId(); - if (!allfeeds.contains(feedId)) { - continue; - } - if (subinfo.get(subId) != null) { - continue; - } - int sididx = 999; - try { - sididx = Integer.parseInt(subId); - sididx -= sididx % 100; - } catch (Exception e) { - logger.error("NODE0517 Exception NodeConfig: "+e); - } - String subscriptionDirectory = sididx + "/" + subId; - DestInfo destinationInfo = new DestInfo("s:" + subId, - spooldir + "/s/" + subscriptionDirectory, provSubscription); - (new File(destinationInfo.getSpool())).mkdirs(); - destInfos.add(destinationInfo); - provSubscriptions.put(subId, provSubscription); - subinfo.put(subId, destinationInfo); - String egr = egrtab.get(subId); - if (egr != null) { - subId = pf.getPath(egr) + subId; - } - StringBuffer sb = feedTargets.get(feedId); - if (sb == null) { - sb = new StringBuffer(); - feedTargets.put(feedId, sb); - } - sb.append(' ').append(subId); - } - alldests = destInfos.toArray(new DestInfo[0]); - for (ProvFeed pfx : pd.getFeeds()) { - String fid = pfx.getId(); - Feed f = feeds.get(fid); - if (f != null) { - continue; - } - f = new Feed(); - feeds.put(fid, f); - f.createdDate = pfx.getCreatedDate(); - f.loginfo = pfx.getLogData(); - f.status = pfx.getStatus(); - /* - * AAF changes: TDP EPIC US# 307413 - * Passing aafInstance from ProvFeed to identify legacy/AAF feeds - */ - f.aafInstance = pfx.getAafInstance(); - Vector<SubnetMatcher> v1 = pfstab.get(fid); - if (v1 == null) { - f.subnets = new SubnetMatcher[0]; - } else { - f.subnets = v1.toArray(new SubnetMatcher[0]); - } - Hashtable<String, String> h1 = pfutab.get(fid); - if (h1 == null) { - h1 = new Hashtable<String, String>(); - } - f.authusers = h1; - Vector<Redirection> v2 = rdtab.get(fid); - if (v2 == null) { - f.redirections = new Redirection[0]; - } else { - f.redirections = v2.toArray(new Redirection[0]); - } - StringBuffer sb = feedTargets.get(fid); - if (sb == null) { - f.targets = new Target[0]; - } else { - f.targets = parseRouting(sb.toString()); - } - } - } - - /** - * Parse a target string into an array of targets - * - * @param routing Target string - * @return Array of targets. - */ - public Target[] parseRouting(String routing) { - routing = routing.trim(); - if ("".equals(routing)) { - return (new Target[0]); - } - String[] xx = routing.split("\\s+"); - Hashtable<String, Target> tmap = new Hashtable<String, Target>(); - HashSet<String> subset = new HashSet<String>(); - Vector<Target> tv = new Vector<Target>(); - Target[] ret = new Target[xx.length]; - for (int i = 0; i < xx.length; i++) { - String t = xx[i]; - int j = t.indexOf('/'); - if (j == -1) { - DestInfo di = subinfo.get(t); - if (di == null) { - tv.add(new Target(null, t)); - } else { - if (!subset.contains(t)) { - subset.add(t); - tv.add(new Target(di, null)); - } - } - } else { - String node = t.substring(0, j); - String rtg = t.substring(j + 1); - DestInfo di = nodeinfo.get(node); - if (di == null) { - tv.add(new Target(null, t)); - } else { - Target tt = tmap.get(node); - if (tt == null) { - tt = new Target(di, rtg); - tmap.put(node, tt); - tv.add(tt); - } else { - tt.addRouting(rtg); - } - } - } - } - return (tv.toArray(new Target[0])); - } - - /** - * Check whether this is a valid node-to-node transfer - * - * @param credentials Credentials offered by the supposed node - * @param ip IP address the request came from - */ - public boolean isAnotherNode(String credentials, String ip) { - IsFrom n = nodes.get(credentials); - return (n != null && n.isFrom(ip)); - } - - /** - * Check whether publication is allowed. - * - * @param feedid The ID of the feed being requested. - * @param credentials The offered credentials - * @param ip The requesting IP address - */ - public String isPublishPermitted(String feedid, String credentials, String ip) { - Feed f = feeds.get(feedid); - String nf = "Feed does not exist"; - if (f != null) { - nf = f.status; - } - if (nf != null) { - return (nf); - } - String user = f.authusers.get(credentials); - if (user == null) { - return ("Publisher not permitted for this feed"); - } - if (f.subnets.length == 0) { - return (null); - } - byte[] addr = NodeUtils.getInetAddress(ip); - for (SubnetMatcher snm : f.subnets) { - if (snm.matches(addr)) { - return (null); - } - } - return ("Publisher not permitted for this feed"); - } - - /** - * Check whether delete file is allowed. - * - * @param subId The ID of the subscription being requested. - */ - public boolean isDeletePermitted(String subId) { - ProvSubscription provSubscription = provSubscriptions.get(subId); - return provSubscription.isPrivilegedSubscriber(); - } - - /** - * Check whether publication is allowed for AAF Feed. - * @param feedid The ID of the feed being requested. - * @param ip The requesting IP address - */ - public String isPublishPermitted(String feedid, String ip) { - Feed f = feeds.get(feedid); - String nf = "Feed does not exist"; - if (f != null) { - nf = f.status; - } - if (nf != null) { - return(nf); - } - if (f.subnets.length == 0) { - return(null); - } - byte[] addr = NodeUtils.getInetAddress(ip); - for (SubnetMatcher snm: f.subnets) { - if (snm.matches(addr)) { - return(null); - } - } - return("Publisher not permitted for this feed"); - } - - /** - * Get authenticated user - */ - public String getAuthUser(String feedid, String credentials) { - return (feeds.get(feedid).authusers.get(credentials)); - } - - /** - * AAF changes: TDP EPIC US# 307413 - * Check AAF_instance for feed ID - * @param feedid The ID of the feed specified - */ - public String getAafInstance(String feedid) { - Feed f = feeds.get(feedid); - return f.aafInstance; - } - - /** - * Check if the request should be redirected to a different ingress node - */ - public String getIngressNode(String feedid, String user, String ip) { - Feed f = feeds.get(feedid); - if (f.redirections.length == 0) { - return (null); - } - byte[] addr = NodeUtils.getInetAddress(ip); - for (Redirection r : f.redirections) { - if (r.user != null && !user.equals(r.user)) { - continue; - } - if (r.snm != null && !r.snm.matches(addr)) { - continue; - } - for (String n : r.nodes) { - if (myname.equals(n)) { - return (null); - } - } - if (r.nodes.length == 0) { - return (null); - } - return (r.nodes[rrcntr++ % r.nodes.length]); - } - return (null); - } - - /** - * Get a provisioned configuration parameter - */ - public String getProvParam(String name) { - return (params.get(name)); - } - - /** - * Get all the DestInfos - */ - public DestInfo[] getAllDests() { - return (alldests); - } - - /** - * Get the targets for a feed - * - * @param feedid The feed ID - * @return The targets this feed should be delivered to - */ - public Target[] getTargets(String feedid) { - if (feedid == null) { - return (new Target[0]); - } - Feed f = feeds.get(feedid); - if (f == null) { - return (new Target[0]); - } - return (f.targets); - } - - /** - * Get the creation date for a feed - * @param feedid The feed ID - * @return the timestamp of creation date of feed id passed - */ - public String getCreatedDate(String feedid) { - Feed f = feeds.get(feedid); - return(f.createdDate); - } - - /** - * Get the feed ID for a subscription - * - * @param subid The subscription ID - * @return The feed ID - */ - public String getFeedId(String subid) { - DestInfo di = subinfo.get(subid); - if (di == null) { - return (null); - } - return (di.getLogData()); - } - - /** - * Get the spool directory for a subscription - * - * @param subid The subscription ID - * @return The spool directory - */ - public String getSpoolDir(String subid) { - DestInfo di = subinfo.get(subid); - if (di == null) { - return (null); - } - return (di.getSpool()); - } - - /** - * Get the Authorization value this node uses - * - * @return The Authorization header value for this node - */ - public String getMyAuth() { - return (myauth); - } - } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java index 884f7bf9..8a0b0b86 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java @@ -26,8 +26,6 @@ package org.onap.dmaap.datarouter.node; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; -import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; - import java.io.File; import java.io.FileInputStream; import java.io.InputStreamReader; @@ -35,6 +33,7 @@ import java.io.Reader; import java.net.URL; import java.util.Properties; import java.util.Timer; +import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; /** @@ -49,8 +48,9 @@ import java.util.Timer; */ public class NodeConfigManager implements DeliveryQueueHelper { - private static EELFLogger eelfLogger = EELFManager.getInstance() - .getLogger(NodeConfigManager.class); + private static final String CHANGE_ME = "changeme"; + private static final String NODE_CONFIG_MANAGER = "NodeConfigManager"; + private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(NodeConfigManager.class); private static NodeConfigManager base = new NodeConfigManager(); private Timer timer = new Timer("Node Configuration Timer", true); @@ -94,7 +94,7 @@ public class NodeConfigManager implements DeliveryQueueHelper { private String eventlogsuffix; private String eventloginterval; private boolean followredirects; - private String [] enabledprotocols; + private String[] enabledprotocols; private String aafType; private String aafInstance; private String aafAction; @@ -103,13 +103,6 @@ public class NodeConfigManager implements DeliveryQueueHelper { /** - * Get the default node configuration manager - */ - public static NodeConfigManager getInstance() { - return base; - } - - /** * Initialize the configuration of a Data Router node */ private NodeConfigManager() { @@ -120,8 +113,10 @@ public class NodeConfigManager implements DeliveryQueueHelper { drNodeProperties.load(new FileInputStream(System .getProperty("org.onap.dmaap.datarouter.node.properties", "/opt/app/datartr/etc/node.properties"))); } catch (Exception e) { - NodeUtils.setIpAndFqdnForEelf("NodeConfigManager"); - eelfLogger.error(EelfMsgs.MESSAGE_PROPERTIES_LOAD_ERROR, e, System.getProperty("org.onap.dmaap.datarouter.node.properties", "/opt/app/datartr/etc/node.properties")); + NodeUtils.setIpAndFqdnForEelf(NODE_CONFIG_MANAGER); + eelfLogger.error(EelfMsgs.MESSAGE_PROPERTIES_LOAD_ERROR, e, + System.getProperty("org.onap.dmaap.datarouter.node.properties", + "/opt/app/datartr/etc/node.properties")); } provurl = drNodeProperties.getProperty("ProvisioningURL", "https://dmaap-dr-prov:8443/internal/prov"); /* @@ -143,7 +138,7 @@ public class NodeConfigManager implements DeliveryQueueHelper { try { provhost = (new URL(provurl)).getHost(); } catch (Exception e) { - NodeUtils.setIpAndFqdnForEelf("NodeConfigManager"); + NodeUtils.setIpAndFqdnForEelf(NODE_CONFIG_MANAGER); eelfLogger.error(EelfMsgs.MESSAGE_BAD_PROV_URL, e, provurl); System.exit(1); } @@ -168,14 +163,14 @@ public class NodeConfigManager implements DeliveryQueueHelper { logretention = Long.parseLong(drNodeProperties.getProperty("LogRetention", "30")) * 86400000L; eventlogprefix = logdir + "/events"; eventlogsuffix = ".log"; - String redirfile = drNodeProperties.getProperty("RedirectionFile", "etc/redirections.dat"); + redirfile = drNodeProperties.getProperty("RedirectionFile", "etc/redirections.dat"); kstype = drNodeProperties.getProperty("KeyStoreType", "jks"); ksfile = drNodeProperties.getProperty("KeyStoreFile", "etc/keystore"); - kspass = drNodeProperties.getProperty("KeyStorePassword", "changeme"); - kpass = drNodeProperties.getProperty("KeyPassword", "changeme"); + kspass = drNodeProperties.getProperty("KeyStorePassword", CHANGE_ME); + kpass = drNodeProperties.getProperty("KeyPassword", CHANGE_ME); tstype = drNodeProperties.getProperty("TrustStoreType", "jks"); tsfile = drNodeProperties.getProperty("TrustStoreFile"); - tspass = drNodeProperties.getProperty("TrustStorePassword", "changeme"); + tspass = drNodeProperties.getProperty("TrustStorePassword", CHANGE_ME); if (tsfile != null && tsfile.length() > 0) { System.setProperty("javax.net.ssl.trustStoreType", tstype); System.setProperty("javax.net.ssl.trustStore", tsfile); @@ -185,7 +180,7 @@ public class NodeConfigManager implements DeliveryQueueHelper { quiesce = new File(drNodeProperties.getProperty("QuiesceFile", "etc/SHUTDOWN")); myname = NodeUtils.getCanonicalName(kstype, ksfile, kspass); if (myname == null) { - NodeUtils.setIpAndFqdnForEelf("NodeConfigManager"); + NodeUtils.setIpAndFqdnForEelf(NODE_CONFIG_MANAGER); eelfLogger.error(EelfMsgs.MESSAGE_KEYSTORE_FETCH_ERROR, ksfile); eelfLogger.error("NODE0309 Unable to fetch canonical name from keystore file " + ksfile); System.exit(1); @@ -202,6 +197,13 @@ public class NodeConfigManager implements DeliveryQueueHelper { pfetcher.request(); } + /** + * Get the default node configuration manager + */ + public static NodeConfigManager getInstance() { + return base; + } + private void localconfig() { followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false")); eventloginterval = getProvParam("LOGROLL_INTERVAL", "30s"); @@ -218,52 +220,53 @@ public class NodeConfigManager implements DeliveryQueueHelper { try { initfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_INIT_RETRY_INTERVAL")) * 1000); } catch (Exception e) { - eelfLogger.error("Error parsing DELIVERY_INIT_RETRY_INTERVAL", e); + eelfLogger.trace("Error parsing DELIVERY_INIT_RETRY_INTERVAL", e); } try { - waitForFileProcessFailureTimer = (long) (Double.parseDouble(getProvParam("DELIVERY_FILE_PROCESS_INTERVAL")) * 1000); + waitForFileProcessFailureTimer = (long) (Double.parseDouble(getProvParam("DELIVERY_FILE_PROCESS_INTERVAL")) + * 1000); } catch (Exception e) { - eelfLogger.error("Error parsing DELIVERY_FILE_PROCESS_INTERVAL", e); + eelfLogger.trace("Error parsing DELIVERY_FILE_PROCESS_INTERVAL", e); } try { maxfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000); } catch (Exception e) { - eelfLogger.error("Error parsing DELIVERY_MAX_RETRY_INTERVAL", e); + eelfLogger.trace("Error parsing DELIVERY_MAX_RETRY_INTERVAL", e); } try { expirationtimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_AGE")) * 1000); } catch (Exception e) { - eelfLogger.error("Error parsing DELIVERY_MAX_AGE", e); + eelfLogger.trace("Error parsing DELIVERY_MAX_AGE", e); } try { failurebackoff = Double.parseDouble(getProvParam("DELIVERY_RETRY_RATIO")); } catch (Exception e) { - eelfLogger.error("Error parsing DELIVERY_RETRY_RATIO", e); + eelfLogger.trace("Error parsing DELIVERY_RETRY_RATIO", e); } try { deliverythreads = Integer.parseInt(getProvParam("DELIVERY_THREADS")); } catch (Exception e) { - eelfLogger.error("Error parsing DELIVERY_THREADS", e); + eelfLogger.trace("Error parsing DELIVERY_THREADS", e); } try { fairfilelimit = Integer.parseInt(getProvParam("FAIR_FILE_LIMIT")); } catch (Exception e) { - eelfLogger.error("Error parsing FAIR_FILE_LIMIT", e); + eelfLogger.trace("Error parsing FAIR_FILE_LIMIT", e); } try { fairtimelimit = (long) (Double.parseDouble(getProvParam("FAIR_TIME_LIMIT")) * 1000); } catch (Exception e) { - eelfLogger.error("Error parsing FAIR_TIME_LIMIT", e); + eelfLogger.trace("Error parsing FAIR_TIME_LIMIT", e); } try { fdpstart = Double.parseDouble(getProvParam("FREE_DISK_RED_PERCENT")) / 100.0; } catch (Exception e) { - eelfLogger.error("Error parsing FREE_DISK_RED_PERCENT", e); + eelfLogger.trace("Error parsing FREE_DISK_RED_PERCENT", e); } try { fdpstop = Double.parseDouble(getProvParam("FREE_DISK_YELLOW_PERCENT")) / 100.0; } catch (Exception e) { - eelfLogger.error("Error parsing FREE_DISK_YELLOW_PERCENT", e); + eelfLogger.trace("Error parsing FREE_DISK_YELLOW_PERCENT", e); } if (fdpstart < 0.01) { fdpstart = 0.01; @@ -286,14 +289,7 @@ public class NodeConfigManager implements DeliveryQueueHelper { config = new NodeConfig(new ProvData(r), myname, spooldir, port, nak); localconfig(); configtasks.startRun(); - Runnable rr; - while ((rr = configtasks.next()) != null) { - try { - rr.run(); - } catch (Exception e) { - eelfLogger.error("NODE0518 Exception fetchconfig: " + e); - } - } + runTasks(); } catch (Exception e) { NodeUtils.setIpAndFqdnForEelf("fetchconfigs"); eelfLogger.error(EelfMsgs.MESSAGE_CONF_FAILED, e.toString()); @@ -302,6 +298,17 @@ public class NodeConfigManager implements DeliveryQueueHelper { } } + private void runTasks() { + Runnable rr; + while ((rr = configtasks.next()) != null) { + try { + rr.run(); + } catch (Exception e) { + eelfLogger.error("NODE0518 Exception fetchconfig: " + e); + } + } + } + /** * Process a gofetch request from a particular IP address. If the IP address is not an IP address we would go to to * fetch the provisioning data, ignore the request. If the data has been fetched very recently (default 10 @@ -381,7 +388,7 @@ public class NodeConfigManager implements DeliveryQueueHelper { * @return True if the IP and credentials are valid for the specified feed. */ public String isPublishPermitted(String feedid, String ip) { - return(config.isPublishPermitted(feedid, ip)); + return (config.isPublishPermitted(feedid, ip)); } /** @@ -396,12 +403,12 @@ public class NodeConfigManager implements DeliveryQueueHelper { } /** - * AAF changes: TDP EPIC US# 307413 - * Check AAF_instance for feed ID in NodeConfig + * AAF changes: TDP EPIC US# 307413 Check AAF_instance for feed ID in NodeConfig + * * @param feedid The ID of the feed specified */ public String getAafInstance(String feedid) { - return(config.getAafInstance(feedid)); + return (config.getAafInstance(feedid)); } /** @@ -597,11 +604,12 @@ public class NodeConfigManager implements DeliveryQueueHelper { /** * Get the creation date for a feed + * * @param feedid The feed ID * @return the timestamp of creation date of feed id passed */ public String getCreatedDate(String feedid) { - return(config.getCreatedDate(feedid)); + return (config.getCreatedDate(feedid)); } /** @@ -774,10 +782,11 @@ public class NodeConfigManager implements DeliveryQueueHelper { /** * Disable and enable protocols - * */ + */ public String[] getEnabledprotocols() { return enabledprotocols; } + public void setEnabledprotocols(String[] enabledprotocols) { this.enabledprotocols = enabledprotocols.clone(); } @@ -805,34 +814,42 @@ public class NodeConfigManager implements DeliveryQueueHelper { public String getAafType() { return aafType; } + public void setAafType(String aafType) { this.aafType = aafType; } + public String getAafInstance() { return aafInstance; } + public void setAafInstance(String aafInstance) { this.aafInstance = aafInstance; } + public String getAafAction() { return aafAction; } + public void setAafAction(String aafAction) { this.aafAction = aafAction; } + /* * Get aafURL from SWM variable * */ public String getAafURL() { return aafURL; } + public void setAafURL(String aafURL) { this.aafURL = aafURL; } - public boolean getCadiEnabeld() { + public boolean getCadiEnabled() { return cadiEnabled; } + public void setCadiEnabled(boolean cadiEnabled) { this.cadiEnabled = cadiEnabled; } @@ -847,7 +864,7 @@ public class NodeConfigManager implements DeliveryQueueHelper { try { String type = getAafType(); String action = getAafAction(); - if (aafInstance == null || aafInstance.equals("")) { + if ("".equals(aafInstance)) { aafInstance = getAafInstance(); } return type + "|" + aafInstance + "|" + action; diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java index 7a2691e4..9eaea283 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java @@ -26,74 +26,38 @@ package org.onap.dmaap.datarouter.node; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; +import java.io.IOException; +import java.io.InputStream; +import java.util.EnumSet; +import java.util.Properties; +import javax.servlet.DispatcherType; +import javax.servlet.ServletException; import org.eclipse.jetty.http.HttpVersion; -import org.eclipse.jetty.server.*; +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.SecureRequestCustomizer; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.onap.aaf.cadi.PropAccess; -import javax.servlet.DispatcherType; -import java.io.IOException; -import java.io.InputStream; -import java.util.EnumSet; -import java.util.Properties; - /** * The main starting point for the Data Router node */ public class NodeMain { - private NodeMain() { - } - private static EELFLogger nodeMainLogger = EELFManager.getInstance().getLogger(NodeMain.class); - - class Inner { - InputStream getCadiProps() { - InputStream in = null; - try { - in = getClass().getClassLoader().getResourceAsStream("drNodeCadi.properties"); - } catch (Exception e) { - nodeMainLogger.error("Exception in Inner.getCadiProps() method ", e); - } - return in; - } - } - - private static class WaitForConfig implements Runnable { - - private NodeConfigManager localNodeConfigManager; - - WaitForConfig(NodeConfigManager ncm) { - this.localNodeConfigManager = ncm; - } - - public synchronized void run() { - notify(); - } - - synchronized void waitForConfig() { - localNodeConfigManager.registerConfigTask(this); - while (!localNodeConfigManager.isConfigured()) { - nodeMainLogger.info("NODE0003 Waiting for Node Configuration"); - try { - wait(); - } catch (Exception exception) { - nodeMainLogger - .error("NodeMain: waitForConfig exception. Exception Message:- " + exception.toString(), - exception); - } - } - localNodeConfigManager.deregisterConfigTask(this); - nodeMainLogger.info("NODE0004 Node Configuration Data Received"); - } - } - private static Delivery delivery; private static NodeConfigManager nodeConfigManager; + private NodeMain() { + } + /** * Reset the retry timer for a subscription */ @@ -123,7 +87,8 @@ public class NodeMain { httpConfiguration.setRequestHeaderSize(2048); // HTTP connector - try (ServerConnector httpServerConnector = new ServerConnector(server, new HttpConnectionFactory(httpConfiguration))) { + try (ServerConnector httpServerConnector = new ServerConnector(server, + new HttpConnectionFactory(httpConfiguration))) { httpServerConnector.setPort(nodeConfigManager.getHttpPort()); httpServerConnector.setIdleTimeout(2000); @@ -147,9 +112,12 @@ public class NodeMain { sslContextFactory.addExcludeProtocols("SSLv3"); sslContextFactory.setIncludeProtocols(nodeConfigManager.getEnabledprotocols()); - nodeMainLogger.info("NODE00004 Unsupported protocols node server:-" + String.join(",", sslContextFactory.getExcludeProtocols())); - nodeMainLogger.info("NODE00004 Supported protocols node server:-" + String.join(",", sslContextFactory.getIncludeProtocols())); - nodeMainLogger.info("NODE00004 Unsupported ciphers node server:-" + String.join(",", sslContextFactory.getExcludeCipherSuites())); + nodeMainLogger.info("NODE00004 Unsupported protocols node server:-" + + String.join(",", sslContextFactory.getExcludeProtocols())); + nodeMainLogger.info("NODE00004 Supported protocols node server:-" + + String.join(",", sslContextFactory.getIncludeProtocols())); + nodeMainLogger.info("NODE00004 Unsupported ciphers node server:-" + + String.join(",", sslContextFactory.getExcludeCipherSuites())); HttpConfiguration httpsConfiguration = new HttpConfiguration(httpConfiguration); httpsConfiguration.setRequestHeaderSize(8192); @@ -174,20 +142,8 @@ public class NodeMain { servletContextHandler.addServlet(new ServletHolder(new NodeServlet(delivery)), "/*"); //CADI Filter activation check - if (nodeConfigManager.getCadiEnabeld()) { - Properties cadiProperties = new Properties(); - try { - Inner obj = new NodeMain().new Inner(); - InputStream in = obj.getCadiProps(); - cadiProperties.load(in); - } catch (IOException e1) { - nodeMainLogger.error("NODE00005 Exception in NodeMain.Main() loading CADI properties ", e1); - } - cadiProperties.setProperty("aaf_locate_url", nodeConfigManager.getAafURL()); - nodeMainLogger.info("NODE00005 aaf_url set to - " + cadiProperties.getProperty("aaf_url")); - - PropAccess access = new PropAccess(cadiProperties); - servletContextHandler.addFilter(new FilterHolder(new DRNodeCadiFilter(true, access)), "/*", EnumSet.of(DispatcherType.REQUEST)); + if (nodeConfigManager.getCadiEnabled()) { + enableCadi(servletContextHandler); } server.setHandler(servletContextHandler); @@ -199,9 +155,68 @@ public class NodeMain { server.start(); nodeMainLogger.info("NODE00006 Node Server started-" + server.getState()); } catch (Exception e) { - nodeMainLogger.info("NODE00006 Jetty failed to start. Reporting will we unavailable", e); + nodeMainLogger.info("NODE00006 Jetty failed to start. Reporting will we unavailable", e.getMessage()); } server.join(); nodeMainLogger.info("NODE00007 Node Server joined - " + server.getState()); } + + private static void enableCadi(ServletContextHandler servletContextHandler) throws ServletException { + Properties cadiProperties = new Properties(); + try { + Inner obj = new NodeMain().new Inner(); + InputStream in = obj.getCadiProps(); + cadiProperties.load(in); + } catch (IOException e1) { + nodeMainLogger.error("NODE00005 Exception in NodeMain.Main() loading CADI properties " + e1.getMessage()); + } + cadiProperties.setProperty("aaf_locate_url", nodeConfigManager.getAafURL()); + nodeMainLogger.info("NODE00005 aaf_url set to - " + cadiProperties.getProperty("aaf_url")); + + PropAccess access = new PropAccess(cadiProperties); + servletContextHandler.addFilter(new FilterHolder(new DRNodeCadiFilter(true, access)), "/*", EnumSet + .of(DispatcherType.REQUEST)); + } + + private static class WaitForConfig implements Runnable { + + private NodeConfigManager localNodeConfigManager; + + WaitForConfig(NodeConfigManager ncm) { + this.localNodeConfigManager = ncm; + } + + public synchronized void run() { + notify(); + } + + synchronized void waitForConfig() { + localNodeConfigManager.registerConfigTask(this); + while (!localNodeConfigManager.isConfigured()) { + nodeMainLogger.info("NODE0003 Waiting for Node Configuration"); + try { + wait(); + } catch (Exception exception) { + nodeMainLogger + .error("NodeMain: waitForConfig exception. Exception Message:- " + exception.toString(), + exception); + } + } + localNodeConfigManager.deregisterConfigTask(this); + nodeMainLogger.info("NODE0004 Node Configuration Data Received"); + } + } + + class Inner { + + InputStream getCadiProps() { + InputStream in = null; + try { + in = getClass().getClassLoader().getResourceAsStream("drNodeCadi.properties"); + } catch (Exception e) { + nodeMainLogger.error("Exception in Inner.getCadiProps() method ", e); + } + return in; + } + } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java index d665080b..3f2fc09f 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java @@ -24,15 +24,10 @@ package org.onap.dmaap.datarouter.node; +import static org.onap.dmaap.datarouter.node.NodeUtils.sendResponseError; + import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; -import org.jetbrains.annotations.Nullable; -import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; -import org.slf4j.MDC; - -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; import java.io.File; import java.io.FileOutputStream; import java.io.FileWriter; @@ -45,8 +40,12 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.Enumeration; import java.util.regex.Pattern; - -import static org.onap.dmaap.datarouter.node.NodeUtils.sendResponseError; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.jetbrains.annotations.Nullable; +import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; +import org.slf4j.MDC; /** * Servlet for handling all http and https requests to the data router node @@ -61,23 +60,27 @@ import static org.onap.dmaap.datarouter.node.NodeUtils.sendResponseError; */ public class NodeServlet extends HttpServlet { + private static final String FROM = " from "; + private static final String INVALID_REQUEST_URI = "Invalid request URI. Expecting <feed-publishing-url>/<fileid>."; + private static final String IO_EXCEPTION = "IOException"; + private static final String ON_BEHALF_OF = "X-DMAAP-DR-ON-BEHALF-OF"; private static NodeConfigManager config; - private static Pattern MetaDataPattern; + private static Pattern metaDataPattern; private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(NodeServlet.class); - private final Delivery delivery; static { final String ws = "\\s*"; // assume that \\ and \" have been replaced by X final String string = "\"[^\"]*\""; - //String string = "\"(?:[^\"\\\\]|\\\\.)*\""; final String number = "[+-]?(?:\\.\\d+|(?:0|[1-9]\\d*)(?:\\.\\d*)?)(?:[eE][+-]?\\d+)?"; final String value = "(?:" + string + "|" + number + "|null|true|false)"; final String item = string + ws + ":" + ws + value + ws; final String object = ws + "\\{" + ws + "(?:" + item + "(?:" + "," + ws + item + ")*)?\\}" + ws; - MetaDataPattern = Pattern.compile(object, Pattern.DOTALL); + metaDataPattern = Pattern.compile(object, Pattern.DOTALL); } + private final Delivery delivery; + NodeServlet(Delivery delivery) { this.delivery = delivery; } @@ -91,7 +94,7 @@ public class NodeServlet extends HttpServlet { eelfLogger.info("NODE0101 Node Servlet Configured"); } - private boolean down(HttpServletResponse resp) throws IOException { + private boolean down(HttpServletResponse resp) { if (config.isShutdown() || !config.isConfigured()) { sendResponseError(resp, HttpServletResponse.SC_SERVICE_UNAVAILABLE, eelfLogger); eelfLogger.info("NODE0102 Rejecting request: Service is being quiesced"); @@ -109,15 +112,10 @@ public class NodeServlet extends HttpServlet { NodeUtils.setRequestIdAndInvocationId(req); eelfLogger.info(EelfMsgs.ENTRY); try { - eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-DMAAP-DR-ON-BEHALF-OF"), + eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader(ON_BEHALF_OF), getIdFromPath(req) + ""); - try { - if (down(resp)) { - return; - } - - } catch (IOException ioe) { - eelfLogger.error("IOException", ioe); + if (down(resp)) { + return; } String path = req.getPathInfo(); String qs = req.getQueryString(); @@ -138,7 +136,7 @@ public class NodeServlet extends HttpServlet { } } - eelfLogger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip); + eelfLogger.info("NODE0103 Rejecting invalid GET of " + path + FROM + ip); sendResponseError(resp, HttpServletResponse.SC_NOT_FOUND, eelfLogger); } finally { eelfLogger.info(EelfMsgs.EXIT); @@ -153,12 +151,12 @@ public class NodeServlet extends HttpServlet { NodeUtils.setIpAndFqdnForEelf("doPut"); NodeUtils.setRequestIdAndInvocationId(req); eelfLogger.info(EelfMsgs.ENTRY); - eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-DMAAP-DR-ON-BEHALF-OF"), + eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader(ON_BEHALF_OF), getIdFromPath(req) + ""); try { common(req, resp, true); } catch (IOException ioe) { - eelfLogger.error("IOException", ioe); + eelfLogger.error(IO_EXCEPTION, ioe); eelfLogger.info(EelfMsgs.EXIT); } } @@ -171,25 +169,30 @@ public class NodeServlet extends HttpServlet { NodeUtils.setIpAndFqdnForEelf("doDelete"); NodeUtils.setRequestIdAndInvocationId(req); eelfLogger.info(EelfMsgs.ENTRY); - eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-DMAAP-DR-ON-BEHALF-OF"), + eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader(ON_BEHALF_OF), getIdFromPath(req) + ""); try { common(req, resp, false); } catch (IOException ioe) { - eelfLogger.error("IOException", ioe); + eelfLogger.error(IO_EXCEPTION, ioe); eelfLogger.info(EelfMsgs.EXIT); } } private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) throws IOException { + final String PUBLISH = "/publish/"; + final String INTERNAL_PUBLISH = "/internal/publish/"; + final String HTTPS = "https://"; + final String USER = " user "; String fileid = getFileId(req, resp); - if (fileid == null) return; + if (fileid == null) { + return; + } String feedid = null; String user = null; String ip = req.getRemoteAddr(); String lip = req.getLocalAddr(); String pubid = null; - String xpubid = null; String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip; Target[] targets = null; boolean isAAFFeed = false; @@ -199,17 +202,17 @@ public class NodeServlet extends HttpServlet { } String credentials = req.getHeader("Authorization"); if (credentials == null) { - eelfLogger.error("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req + eelfLogger.error("NODE0306 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + FROM + req .getRemoteAddr()); resp.sendError(HttpServletResponse.SC_FORBIDDEN, "Authorization header required"); eelfLogger.info(EelfMsgs.EXIT); return; } - if (fileid.startsWith("/publish/")) { + if (fileid.startsWith(PUBLISH)) { fileid = fileid.substring(9); int i = fileid.indexOf('/'); if (i == -1 || i == fileid.length() - 1) { - eelfLogger.error("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req + eelfLogger.error("NODE0205 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + FROM + req .getRemoteAddr()); resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting <feed-publishing-url>/<fileid>. Possible missing fileid."); @@ -218,18 +221,19 @@ public class NodeServlet extends HttpServlet { } feedid = fileid.substring(0, i); - if (config.getCadiEnabeld()) { + if (config.getCadiEnabled()) { String path = req.getPathInfo(); if (!path.startsWith("/internal") && feedid != null) { String aafInstance = config.getAafInstance(feedid); - if (!(aafInstance.equalsIgnoreCase("legacy"))) { + if (!("legacy".equalsIgnoreCase(aafInstance))) { isAAFFeed = true; String permission = config.getPermission(aafInstance); eelfLogger.info("NodeServlet.common() permission string - " + permission); //Check in CADI Framework API if user has AAF permission or not if (!req.isUserInRole(permission)) { String message = "AAF disallows access to permission string - " + permission; - eelfLogger.error("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); + eelfLogger.error("NODE0307 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + + FROM + req.getRemoteAddr()); resp.sendError(HttpServletResponse.SC_FORBIDDEN, message); eelfLogger.info(EelfMsgs.EXIT); return; @@ -240,9 +244,8 @@ public class NodeServlet extends HttpServlet { fileid = fileid.substring(i + 1); pubid = config.getPublishId(); - xpubid = req.getHeader("X-DMAAP-DR-PUBLISH-ID"); targets = config.getTargets(feedid); - } else if (fileid.startsWith("/internal/publish/")) { + } else if (fileid.startsWith(INTERNAL_PUBLISH)) { if (!config.isAnotherNode(credentials, ip)) { eelfLogger.error("NODE0107 Rejecting unauthorized node-to-node transfer attempt from " + ip); resp.sendError(HttpServletResponse.SC_FORBIDDEN); @@ -254,18 +257,18 @@ public class NodeServlet extends HttpServlet { user = "datartr"; // SP6 : Added usr as datartr to avoid null entries for internal routing targets = config.parseRouting(req.getHeader("X-DMAAP-DR-ROUTING")); } else { - eelfLogger.error("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req + eelfLogger.error("NODE0204 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + FROM + req .getRemoteAddr()); resp.sendError(HttpServletResponse.SC_NOT_FOUND, - "Invalid request URI. Expecting <feed-publishing-url>/<fileid>."); + INVALID_REQUEST_URI); eelfLogger.info(EelfMsgs.EXIT); return; } if (fileid.indexOf('/') != -1) { - eelfLogger.error("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req + eelfLogger.error("NODE0202 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + FROM + req .getRemoteAddr()); resp.sendError(HttpServletResponse.SC_NOT_FOUND, - "Invalid request URI. Expecting <feed-publishing-url>/<fileid>."); + INVALID_REQUEST_URI); eelfLogger.info(EelfMsgs.EXIT); return; } @@ -278,14 +281,16 @@ public class NodeServlet extends HttpServlet { if (xp != 443) { hp = hp + ":" + xp; } - String logurl = "https://" + hp + "/internal/publish/" + fileid; + String logurl = HTTPS + hp + INTERNAL_PUBLISH + fileid; if (feedid != null) { - logurl = "https://" + hp + "/publish/" + feedid + "/" + fileid; + logurl = HTTPS + hp + PUBLISH + feedid + "/" + fileid; //Cadi code starts if (!isAAFFeed) { String reason = config.isPublishPermitted(feedid, credentials, ip); if (reason != null) { - eelfLogger.error("NODE0111 Rejecting unauthorized publish attempt to feed " + PathUtil.cleanString(feedid) + " fileid " + PathUtil.cleanString(fileid) + " from " + PathUtil.cleanString(ip) + " reason " + PathUtil.cleanString(reason)); + eelfLogger.error("NODE0111 Rejecting unauthorized publish attempt to feed " + PathUtil + .cleanString(feedid) + " fileid " + PathUtil.cleanString(fileid) + FROM + PathUtil + .cleanString(ip) + " reason " + PathUtil.cleanString(reason)); resp.sendError(HttpServletResponse.SC_FORBIDDEN, reason); eelfLogger.info(EelfMsgs.EXIT); return; @@ -294,9 +299,12 @@ public class NodeServlet extends HttpServlet { } else { String reason = config.isPublishPermitted(feedid, ip); if (reason != null) { - eelfLogger.error("NODE0111 Rejecting unauthorized publish attempt to feed " + PathUtil.cleanString(feedid) + " fileid " + PathUtil.cleanString(fileid) + " from " + PathUtil.cleanString(ip) + " reason Invalid AAF user- " + PathUtil.cleanString(reason)); + eelfLogger.error("NODE0111 Rejecting unauthorized publish attempt to feed " + PathUtil + .cleanString(feedid) + " fileid " + PathUtil.cleanString(fileid) + FROM + PathUtil + .cleanString(ip) + " reason Invalid AAF user- " + PathUtil.cleanString(reason)); String message = "Invalid AAF user- " + PathUtil.cleanString(reason); - eelfLogger.info("NODE0106 Rejecting unauthenticated PUT or DELETE of " + PathUtil.cleanString(req.getPathInfo()) + " from " + PathUtil.cleanString(req.getRemoteAddr())); + eelfLogger.info("NODE0308 Rejecting unauthenticated PUT or DELETE of " + PathUtil + .cleanString(req.getPathInfo()) + FROM + PathUtil.cleanString(req.getRemoteAddr())); resp.sendError(HttpServletResponse.SC_FORBIDDEN, message); return; } @@ -316,25 +324,26 @@ public class NodeServlet extends HttpServlet { if (iport != 443) { port = ":" + iport; } - String redirto = "https://" + newnode + port + "/publish/" + feedid + "/" + fileid; - eelfLogger.info("NODE0108 Redirecting publish attempt for feed " + PathUtil.cleanString(feedid) + " user " + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip) + " to " + PathUtil.cleanString(redirto)); //Fortify scan fixes - log forging + String redirto = HTTPS + newnode + port + PUBLISH + feedid + "/" + fileid; + eelfLogger + .info("NODE0108 Redirecting publish attempt for feed " + PathUtil.cleanString(feedid) + USER + + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip) + " to " + PathUtil + .cleanString(redirto)); //Fortify scan fixes - log forging resp.sendRedirect(PathUtil.cleanString(redirto)); //Fortify scan fixes-open redirect - 2 issues eelfLogger.info(EelfMsgs.EXIT); return; } resp.setHeader("X-DMAAP-DR-PUBLISH-ID", pubid); } - if (req.getPathInfo().startsWith("/internal/publish/")) { + if (req.getPathInfo().startsWith(INTERNAL_PUBLISH)) { feedid = req.getHeader("X-DMAAP-DR-FEED-ID"); } String fbase = PathUtil.cleanString(config.getSpoolDir() + "/" + pubid); //Fortify scan fixes-Path manipulation File data = new File(fbase); File meta = new File(fbase + ".M"); - OutputStream dos = null; Writer mw = null; - InputStream is = null; try { - StringBuffer mx = new StringBuffer(); + StringBuilder mx = new StringBuilder(); mx.append(req.getMethod()).append('\t').append(fileid).append('\n'); Enumeration hnames = req.getHeaderNames(); String ctype = null; @@ -364,13 +373,17 @@ public class NodeServlet extends HttpServlet { } if ("x-dmaap-dr-meta".equals(hnlc)) { if (hv.length() > 4096) { - eelfLogger.error("NODE0109 Rejecting publish attempt with metadata too long for feed " + PathUtil.cleanString(feedid) + " user " + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip)); //Fortify scan fixes - log forging + eelfLogger.error("NODE0109 Rejecting publish attempt with metadata too long for feed " + + PathUtil.cleanString(feedid) + USER + PathUtil.cleanString(user) + " ip " + + PathUtil.cleanString(ip)); //Fortify scan fixes - log forging resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Metadata too long"); eelfLogger.info(EelfMsgs.EXIT); return; } - if (!MetaDataPattern.matcher(hv.replaceAll("\\\\.", "X")).matches()) { - eelfLogger.error("NODE0109 Rejecting publish attempt with malformed metadata for feed " + PathUtil.cleanString(feedid) + " user " + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip)); //Fortify scan fixes - log forging + if (!metaDataPattern.matcher(hv.replaceAll("\\\\.", "X")).matches()) { + eelfLogger.error("NODE0109 Rejecting publish attempt with malformed metadata for feed " + + PathUtil.cleanString(feedid) + USER + PathUtil.cleanString(user) + " ip " + + PathUtil.cleanString(ip)); //Fortify scan fixes - log forging resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Malformed metadata"); eelfLogger.info(EelfMsgs.EXIT); return; @@ -388,28 +401,12 @@ public class NodeServlet extends HttpServlet { } mx.append("X-DMAAP-DR-RECEIVED\t").append(rcvd).append('\n'); String metadata = mx.toString(); - byte[] buf = new byte[1024 * 1024]; - int i; - try { - is = req.getInputStream(); - dos = new FileOutputStream(data); - while ((i = is.read(buf)) > 0) { - dos.write(buf, 0, i); - } - is.close(); - is = null; - dos.close(); - dos = null; - } catch (IOException ioe) { - long exlen = -1; - try { - exlen = Long.parseLong(req.getHeader("Content-Length")); - } catch (Exception e) { - eelfLogger.error("NODE0529 Exception common: " + e); - } - StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user, ioe.getMessage()); - eelfLogger.info(EelfMsgs.EXIT); - throw ioe; + long exlen = getExlen(req); + String message = writeInputStreamToFile(req, data); + if (message != null) { + StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user, + message); + throw new IOException(message); } Path dpath = Paths.get(fbase); for (Target t : targets) { @@ -418,7 +415,8 @@ public class NodeServlet extends HttpServlet { // TODO: unknown destination continue; } - String dbase = PathUtil.cleanString(di.getSpool() + "/" + pubid); //Fortify scan fixes-Path Manipulation + String dbase = PathUtil + .cleanString(di.getSpool() + "/" + pubid); //Fortify scan fixes-Path Manipulation Files.createLink(Paths.get(dbase), dpath); mw = new FileWriter(meta); mw.write(metadata); @@ -427,45 +425,28 @@ public class NodeServlet extends HttpServlet { } mw.close(); meta.renameTo(new File(dbase + ".M")); - } resp.setStatus(HttpServletResponse.SC_NO_CONTENT); try { resp.getOutputStream().close(); } catch (IOException ioe) { - long exlen = -1; - try { - exlen = Long.parseLong(req.getHeader("Content-Length")); - } catch (Exception e) { - eelfLogger.error("NODE00000 Exception common", e); - } - StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user, ioe.getMessage()); + StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user, + ioe.getMessage()); //Fortify scan fixes - log forging - eelfLogger.error("NODE0110 IO Exception while closing IO stream " + PathUtil.cleanString(feedid) + " user " + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip) + " " + ioe.toString(), ioe); - + eelfLogger.error("NODE0110 IO Exception while closing IO stream " + PathUtil.cleanString(feedid) + + USER + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip) + " " + ioe + .toString(), ioe); throw ioe; } - StatusLog.logPub(pubid, feedid, logurl, req.getMethod(), ctype, data.length(), ip, user, HttpServletResponse.SC_NO_CONTENT); + StatusLog.logPub(pubid, feedid, logurl, req.getMethod(), ctype, data.length(), ip, user, + HttpServletResponse.SC_NO_CONTENT); } catch (IOException ioe) { - eelfLogger.error("NODE0110 IO Exception receiving publish attempt for feed " + feedid + " user " + user + " ip " + ip + " " + ioe.toString(), ioe); + eelfLogger.error("NODE0110 IO Exception receiving publish attempt for feed " + feedid + USER + user + + " ip " + ip + " " + ioe.toString(), ioe); eelfLogger.info(EelfMsgs.EXIT); throw ioe; } finally { - if (is != null) { - try { - is.close(); - } catch (Exception e) { - eelfLogger.error("NODE0530 Exception common: " + e); - } - } - if (dos != null) { - try { - dos.close(); - } catch (Exception e) { - eelfLogger.error("NODE0531 Exception common: " + e); - } - } if (mw != null) { try { mw.close(); @@ -486,12 +467,39 @@ public class NodeServlet extends HttpServlet { } } + private String writeInputStreamToFile(HttpServletRequest req, File data) { + byte[] buf = new byte[1024 * 1024]; + int i; + try (OutputStream dos = new FileOutputStream(data); + InputStream is = req.getInputStream()) { + while ((i = is.read(buf)) > 0) { + dos.write(buf, 0, i); + } + } catch (IOException ioe) { + eelfLogger.error("NODE0530 Exception common: " + ioe, ioe); + eelfLogger.info(EelfMsgs.EXIT); + return ioe.getMessage(); + } + return null; + } + + private long getExlen(HttpServletRequest req) { + long exlen = -1; + try { + exlen = Long.parseLong(req.getHeader("Content-Length")); + } catch (Exception e) { + eelfLogger.error("NODE0529 Exception common: " + e); + } + return exlen; + } + private void deleteFile(HttpServletRequest req, HttpServletResponse resp, String fileid, String pubid) { + final String FROM_DR_MESSAGE = ".M) from DR Node: "; try { fileid = fileid.substring(8); int i = fileid.indexOf('/'); if (i == -1 || i == fileid.length() - 1) { - eelfLogger.error("NODE0112 Rejecting bad URI for DELETE of " + req.getPathInfo() + " from " + req + eelfLogger.error("NODE0112 Rejecting bad URI for DELETE of " + req.getPathInfo() + FROM + req .getRemoteAddr()); resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting <subId>/<pubId>."); @@ -501,7 +509,7 @@ public class NodeServlet extends HttpServlet { String subscriptionId = fileid.substring(0, i); int subId = Integer.parseInt(subscriptionId); pubid = fileid.substring(i + 1); - String errorMessage = "Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: " + String errorMessage = "Unable to delete files (" + pubid + ", " + pubid + FROM_DR_MESSAGE + config.getMyName() + "."; int subIdDir = subId - (subId % 100); if (!isAuthorizedToDelete(resp, subscriptionId, errorMessage)) { @@ -509,7 +517,7 @@ public class NodeServlet extends HttpServlet { } boolean result = delivery.markTaskSuccess(config.getSpoolBase() + "/s/" + subIdDir + "/" + subId, pubid); if (result) { - eelfLogger.info("NODE0115 Successfully deleted files (" + pubid + ", " + pubid + ".M) from DR Node: " + eelfLogger.info("NODE0115 Successfully deleted files (" + pubid + ", " + pubid + FROM_DR_MESSAGE + config.getMyName()); resp.setStatus(HttpServletResponse.SC_OK); eelfLogger.info(EelfMsgs.EXIT); @@ -519,7 +527,7 @@ public class NodeServlet extends HttpServlet { eelfLogger.info(EelfMsgs.EXIT); } } catch (IOException ioe) { - eelfLogger.error("NODE0117 Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: " + eelfLogger.error("NODE0117 Unable to delete files (" + pubid + ", " + pubid + FROM_DR_MESSAGE + config.getMyName(), ioe); eelfLogger.info(EelfMsgs.EXIT); } @@ -533,7 +541,7 @@ public class NodeServlet extends HttpServlet { } if (!req.isSecure()) { eelfLogger.error( - "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req + "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + FROM + req .getRemoteAddr()); resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests"); eelfLogger.info(EelfMsgs.EXIT); @@ -541,17 +549,18 @@ public class NodeServlet extends HttpServlet { } String fileid = req.getPathInfo(); if (fileid == null) { - eelfLogger.error("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req + eelfLogger.error("NODE0201 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + FROM + req .getRemoteAddr()); resp.sendError(HttpServletResponse.SC_NOT_FOUND, - "Invalid request URI. Expecting <feed-publishing-url>/<fileid>."); + INVALID_REQUEST_URI); eelfLogger.info(EelfMsgs.EXIT); return null; } return fileid; } - private boolean isAuthorizedToDelete(HttpServletResponse resp, String subscriptionId, String errorMessage) throws IOException { + private boolean isAuthorizedToDelete(HttpServletResponse resp, String subscriptionId, String errorMessage) + throws IOException { try { boolean deletePermitted = config.isDeletePermitted(subscriptionId); if (!deletePermitted) { diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java index da84ae54..e79e2ee3 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java @@ -24,15 +24,22 @@ package org.onap.dmaap.datarouter.node; +import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID; +import static com.att.eelf.configuration.Configuration.MDC_SERVER_FQDN; +import static com.att.eelf.configuration.Configuration.MDC_SERVER_IP_ADDRESS; +import static com.att.eelf.configuration.Configuration.MDC_SERVICE_NAME; + import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; - import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.net.InetAddress; import java.security.KeyStore; +import java.security.KeyStoreException; import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.text.SimpleDateFormat; import java.util.Date; @@ -47,8 +54,6 @@ import org.apache.commons.lang3.StringUtils; import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; import org.slf4j.MDC; -import static com.att.eelf.configuration.Configuration.*; - /** * Utility functions for the data router node */ @@ -63,7 +68,7 @@ public class NodeUtils { /** * Base64 encode a byte array * - * @param raw The bytes to be encoded + * @param raw The bytes to be encoded * @return The encoded string */ public static String base64Encode(byte[] raw) { @@ -117,11 +122,7 @@ public class NodeUtils { KeyStore ks; try { ks = KeyStore.getInstance(kstype); - try (FileInputStream fileInputStream = new FileInputStream(ksfile)) { - ks.load(fileInputStream, kspass.toCharArray()); - } catch (IOException ioException) { - eelfLogger.error("IOException occurred while opening FileInputStream: " + ioException.getMessage(), - ioException); + if (loadKeyStore(ksfile, kspass, ks)) { return (null); } } catch (Exception e) { @@ -142,22 +143,9 @@ public class NodeUtils { try { Enumeration<String> aliases = ks.aliases(); while (aliases.hasMoreElements()) { - String s = aliases.nextElement(); - if (ks.entryInstanceOf(s, KeyStore.PrivateKeyEntry.class)) { - X509Certificate c = (X509Certificate) ks.getCertificate(s); - if (c != null) { - String subject = c.getSubjectX500Principal().getName(); - String[] parts = subject.split(","); - if (parts.length < 1) { - return (null); - } - subject = parts[5].trim(); - if (!subject.startsWith("CN=")) { - return (null); - - } - return (subject.substring(3)); - } + String name = getNameFromSubject(ks, aliases); + if (name != null) { + return name; } } } catch (Exception e) { @@ -290,19 +278,51 @@ public class NodeUtils { /** * Method to check to see if file is of type gzip * - * @param file The name of the file to be checked - * @return True if the file is of type gzip + * @param file The name of the file to be checked + * @return True if the file is of type gzip */ - public static boolean isFiletypeGzip(File file){ - try(FileInputStream fileInputStream = new FileInputStream(file); - GZIPInputStream gzip = new GZIPInputStream(fileInputStream)) { + public static boolean isFiletypeGzip(File file) { + try (FileInputStream fileInputStream = new FileInputStream(file); + GZIPInputStream gzip = new GZIPInputStream(fileInputStream)) { return true; - }catch (IOException e){ + } catch (IOException e) { eelfLogger.error("NODE0403 " + file.toString() + " Not in gzip(gz) format: " + e.toString() + e); return false; } } + private static boolean loadKeyStore(String ksfile, String kspass, KeyStore ks) + throws NoSuchAlgorithmException, CertificateException { + try (FileInputStream fileInputStream = new FileInputStream(ksfile)) { + ks.load(fileInputStream, kspass.toCharArray()); + } catch (IOException ioException) { + eelfLogger.error("IOException occurred while opening FileInputStream: " + ioException.getMessage(), + ioException); + return true; + } + return false; + } + + + private static String getNameFromSubject(KeyStore ks, Enumeration<String> aliases) throws KeyStoreException { + String s = aliases.nextElement(); + if (ks.entryInstanceOf(s, KeyStore.PrivateKeyEntry.class)) { + X509Certificate c = (X509Certificate) ks.getCertificate(s); + if (c != null) { + String subject = c.getSubjectX500Principal().getName(); + String[] parts = subject.split(","); + if (parts.length < 1) { + return null; + } + subject = parts[5].trim(); + if (!subject.startsWith("CN=")) { + return null; + } + return subject.substring(3); + } + } + return null; + } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java index fec2ca39..d8beab5a 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java @@ -24,9 +24,11 @@ package org.onap.dmaap.datarouter.node; +import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; -import java.util.Hashtable; -import java.util.Vector; +import org.jetbrains.annotations.Nullable; +import org.onap.dmaap.datarouter.node.NodeConfig.ProvHop; /** * Given a set of node names and next hops, identify and ignore any cycles and figure out the sequence of next hops to @@ -35,16 +37,41 @@ import java.util.Vector; public class PathFinder { - private static class Hop { + private ArrayList<String> errors = new ArrayList<>(); + private HashMap<String, String> routes = new HashMap<>(); - boolean mark; - boolean bad; - NodeConfig.ProvHop basis; + /** + * Find routes from a specified origin to all of the nodes given a set of specified next hops. + * + * @param origin where we start + * @param nodes where we can go + * @param hops detours along the way + */ + public PathFinder(String origin, String[] nodes, NodeConfig.ProvHop[] hops) { + HashSet<String> known = new HashSet<>(); + HashMap<String, HashMap<String, Hop>> ht = new HashMap<>(); + for (String n : nodes) { + known.add(n); + ht.put(n, new HashMap<>()); + } + for (NodeConfig.ProvHop ph : hops) { + Hop h = getHop(known, ht, ph); + if (h == null) { + continue; + } + if (ph.getVia().equals(ph.getTo())) { + errors.add(ph + " gives destination as via"); + h.bad = true; + } + } + for (String n : known) { + if (n.equals(origin)) { + routes.put(n, ""); + } + routes.put(n, plot(origin, n, ht.get(n)) + "/"); + } } - private Vector<String> errors = new Vector<String>(); - private Hashtable<String, String> routes = new Hashtable<String, String>(); - /** * Get list of errors encountered while finding paths * @@ -68,13 +95,12 @@ public class PathFinder { return (ret); } - private String plot(String from, String to, Hashtable<String, Hop> info) { + private String plot(String from, String to, HashMap<String, Hop> info) { Hop nh = info.get(from); if (nh == null || nh.bad) { return (to); } if (nh.mark) { - // loop detected; while (!nh.bad) { nh.bad = true; errors.add(nh.basis + " is part of a cycle"); @@ -91,55 +117,38 @@ public class PathFinder { return (nh.basis.getVia() + "/" + x); } - /** - * Find routes from a specified origin to all of the nodes given a set of specified next hops. - * - * @param origin where we start - * @param nodes where we can go - * @param hops detours along the way - */ - public PathFinder(String origin, String[] nodes, NodeConfig.ProvHop[] hops) { - HashSet<String> known = new HashSet<String>(); - Hashtable<String, Hashtable<String, Hop>> ht = new Hashtable<String, Hashtable<String, Hop>>(); - for (String n : nodes) { - known.add(n); - ht.put(n, new Hashtable<String, Hop>()); + @Nullable + private Hop getHop(HashSet<String> known, HashMap<String, HashMap<String, Hop>> ht, ProvHop ph) { + if (!known.contains(ph.getFrom())) { + errors.add(ph + " references unknown from node"); + return null; } - for (NodeConfig.ProvHop ph : hops) { - if (!known.contains(ph.getFrom())) { - errors.add(ph + " references unknown from node"); - continue; - } - if (!known.contains(ph.getTo())) { - errors.add(ph + " references unknown destination node"); - continue; - } - Hashtable<String, Hop> ht2 = ht.get(ph.getTo()); - Hop h = ht2.get(ph.getFrom()); - if (h != null) { - h.bad = true; - errors.add(ph + " gives duplicate next hop - previous via was " + h.basis.getVia()); - continue; - } - h = new Hop(); - h.basis = ph; - ht2.put(ph.getFrom(), h); - if (!known.contains(ph.getVia())) { - errors.add(ph + " references unknown via node"); - h.bad = true; - continue; - } - if (ph.getVia().equals(ph.getTo())) { - errors.add(ph + " gives destination as via"); - h.bad = true; - continue; - } + if (!known.contains(ph.getTo())) { + errors.add(ph + " references unknown destination node"); + return null; } - for (String n : known) { - if (n.equals(origin)) { - routes.put(n, ""); - } - routes.put(n, plot(origin, n, ht.get(n)) + "/"); + HashMap<String, Hop> ht2 = ht.get(ph.getTo()); + Hop h = ht2.get(ph.getFrom()); + if (h != null) { + h.bad = true; + errors.add(ph + " gives duplicate next hop - previous via was " + h.basis.getVia()); + return null; } + h = new Hop(); + h.basis = ph; + ht2.put(ph.getFrom(), h); + if (!known.contains(ph.getVia())) { + errors.add(ph + " references unknown via node"); + h.bad = true; + return null; + } + return h; + } + + private static class Hop { + + boolean mark; + boolean bad; + NodeConfig.ProvHop basis; } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathUtil.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathUtil.java index a4034410..16f8033b 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathUtil.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathUtil.java @@ -1,65 +1,82 @@ -/**
- * -
+/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * <p>
+ *
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
+
package org.onap.dmaap.datarouter.node;
/**
* FORTIFY SCAN FIXES
* <p>This Utility is used for Fortify fixes. It Validates the path url formed from
- * the string passed in the request parameters.</p>
- *
+ * the string passed in the request parameters.</p>
*/
class PathUtil {
+ private PathUtil() {
+ throw new IllegalStateException("Utility Class");
+ }
+
/**
* This method takes String as the parameter and return the filtered path string.
+ *
* @param aString String to clean
* @return A cleaned String
*/
static String cleanString(String aString) {
- if (aString == null) return null;
- String cleanString = "";
+ if (aString == null) {
+ return null;
+ }
+ StringBuilder cleanString = new StringBuilder();
for (int i = 0; i < aString.length(); ++i) {
- cleanString += cleanChar(aString.charAt(i));
+ cleanString.append(cleanChar(aString.charAt(i)));
}
- return cleanString;
+ return cleanString.toString();
}
/**
* This method filters the valid special characters in path string.
+ *
* @param aChar The char to be cleaned
* @return The cleaned char
*/
private static char cleanChar(char aChar) {
// 0 - 9
for (int i = 48; i < 58; ++i) {
- if (aChar == i) return (char) i;
+ if (aChar == i) {
+ return (char) i;
+ }
}
// 'A' - 'Z'
for (int i = 65; i < 91; ++i) {
- if (aChar == i) return (char) i;
+ if (aChar == i) {
+ return (char) i;
+ }
}
// 'a' - 'z'
for (int i = 97; i < 123; ++i) {
- if (aChar == i) return (char) i;
+ if (aChar == i) {
+ return (char) i;
+ }
}
+ return getValidCharacter(aChar);
+ }
+
+ private static char getValidCharacter(char aChar) {
// other valid characters
switch (aChar) {
case '/':
@@ -82,7 +99,8 @@ class PathUtil { return '_';
case ' ':
return ' ';
+ default:
+ return '%';
}
- return '%';
}
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java index 1af7dda4..bb9ddc3b 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java @@ -24,35 +24,91 @@ package org.onap.dmaap.datarouter.node; -import java.io.*; -import java.util.*; - -import org.json.*; -import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; - import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; +import java.io.IOException; +import java.io.Reader; +import java.util.ArrayList; +import org.jetbrains.annotations.Nullable; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.onap.dmaap.datarouter.node.NodeConfig.ProvFeed; +import org.onap.dmaap.datarouter.node.NodeConfig.ProvFeedSubnet; +import org.onap.dmaap.datarouter.node.NodeConfig.ProvFeedUser; +import org.onap.dmaap.datarouter.node.NodeConfig.ProvForceEgress; +import org.onap.dmaap.datarouter.node.NodeConfig.ProvForceIngress; +import org.onap.dmaap.datarouter.node.NodeConfig.ProvHop; +import org.onap.dmaap.datarouter.node.NodeConfig.ProvNode; +import org.onap.dmaap.datarouter.node.NodeConfig.ProvParam; +import org.onap.dmaap.datarouter.node.NodeConfig.ProvSubscription; +import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; /** * Parser for provisioning data from the provisioning server. * <p> - * The ProvData class uses a Reader for the text configuration from the - * provisioning server to construct arrays of raw configuration entries. + * The ProvData class uses a Reader for the text configuration from the provisioning server to construct arrays of raw + * configuration entries. */ public class ProvData { + + private static final String FEED_ID = "feedid"; private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(ProvData.class); - private NodeConfig.ProvNode[] pn; - private NodeConfig.ProvParam[] pp; - private NodeConfig.ProvFeed[] pf; - private NodeConfig.ProvFeedUser[] pfu; - private NodeConfig.ProvFeedSubnet[] pfsn; - private NodeConfig.ProvSubscription[] ps; - private NodeConfig.ProvForceIngress[] pfi; - private NodeConfig.ProvForceEgress[] pfe; - private NodeConfig.ProvHop[] ph; - - private static String[] gvasa(JSONArray a, int index) { - return (gvasa(a.get(index))); + private NodeConfig.ProvNode[] provNodes; + private NodeConfig.ProvParam[] provParams; + private NodeConfig.ProvFeed[] provFeeds; + private NodeConfig.ProvFeedUser[] provFeedUsers; + private NodeConfig.ProvFeedSubnet[] provFeedSubnets; + private NodeConfig.ProvSubscription[] provSubscriptions; + private NodeConfig.ProvForceIngress[] provForceIngresses; + private NodeConfig.ProvForceEgress[] provForceEgresses; + private NodeConfig.ProvHop[] provHops; + + /** + * Construct raw provisioing data entries from the text (JSON) provisioning document received from the provisioning + * server + * + * @param r The reader for the JSON text. + */ + public ProvData(Reader r) throws IOException { + ArrayList<ProvNode> provNodes1 = new ArrayList<>(); + ArrayList<NodeConfig.ProvParam> provParams1 = new ArrayList<>(); + ArrayList<NodeConfig.ProvFeed> provFeeds1 = new ArrayList<>(); + ArrayList<NodeConfig.ProvFeedUser> provFeedUsers1 = new ArrayList<>(); + ArrayList<NodeConfig.ProvFeedSubnet> provFeedSubnets1 = new ArrayList<>(); + ArrayList<NodeConfig.ProvSubscription> provSubscriptions1 = new ArrayList<>(); + ArrayList<NodeConfig.ProvForceIngress> provForceIngresses1 = new ArrayList<>(); + ArrayList<NodeConfig.ProvForceEgress> provForceEgresses1 = new ArrayList<>(); + ArrayList<NodeConfig.ProvHop> provHops1 = new ArrayList<>(); + try { + JSONTokener jtx = new JSONTokener(r); + JSONObject jcfg = new JSONObject(jtx); + char c = jtx.nextClean(); + if (c != '\0') { + throw new JSONException("Spurious characters following configuration"); + } + r.close(); + addJSONFeeds(provFeeds1, provFeedUsers1, provFeedSubnets1, jcfg); + addJSONSubs(provSubscriptions1, jcfg); + addJSONParams(provNodes1, provParams1, jcfg); + addJSONRoutingInformation(provForceIngresses1, provForceEgresses1, provHops1, jcfg); + } catch (JSONException jse) { + NodeUtils.setIpAndFqdnForEelf("ProvData"); + eelfLogger.error(EelfMsgs.MESSAGE_PARSING_ERROR, jse.toString()); + eelfLogger + .error("NODE0201 Error parsing configuration data from provisioning server " + jse.toString(), jse); + throw new IOException(jse.toString(), jse); + } + provNodes = provNodes1.toArray(new NodeConfig.ProvNode[provNodes1.size()]); + provParams = provParams1.toArray(new NodeConfig.ProvParam[provParams1.size()]); + provFeeds = provFeeds1.toArray(new NodeConfig.ProvFeed[provFeeds1.size()]); + provFeedUsers = provFeedUsers1.toArray(new NodeConfig.ProvFeedUser[provFeedUsers1.size()]); + provFeedSubnets = provFeedSubnets1.toArray(new NodeConfig.ProvFeedSubnet[provFeedSubnets1.size()]); + provSubscriptions = provSubscriptions1.toArray(new NodeConfig.ProvSubscription[provSubscriptions1.size()]); + provForceIngresses = provForceIngresses1.toArray(new NodeConfig.ProvForceIngress[provForceIngresses1.size()]); + provForceEgresses = provForceEgresses1.toArray(new NodeConfig.ProvForceEgress[provForceEgresses1.size()]); + provHops = provHops1.toArray(new NodeConfig.ProvHop[provHops1.size()]); } private static String[] gvasa(JSONObject o, String key) { @@ -62,7 +118,7 @@ public class ProvData { private static String[] gvasa(Object o) { if (o instanceof JSONArray) { JSONArray a = (JSONArray) o; - Vector<String> v = new Vector<String>(); + ArrayList<String> v = new ArrayList<>(); for (int i = 0; i < a.length(); i++) { String s = gvas(a, i); if (s != null) { @@ -96,236 +152,256 @@ public class ProvData { } /** - * Construct raw provisioing data entries from the text (JSON) - * provisioning document received from the provisioning server - * - * @param r The reader for the JSON text. - */ - public ProvData(Reader r) throws IOException { - Vector<NodeConfig.ProvNode> pnv = new Vector<NodeConfig.ProvNode>(); - Vector<NodeConfig.ProvParam> ppv = new Vector<NodeConfig.ProvParam>(); - Vector<NodeConfig.ProvFeed> pfv = new Vector<NodeConfig.ProvFeed>(); - Vector<NodeConfig.ProvFeedUser> pfuv = new Vector<NodeConfig.ProvFeedUser>(); - Vector<NodeConfig.ProvFeedSubnet> pfsnv = new Vector<NodeConfig.ProvFeedSubnet>(); - Vector<NodeConfig.ProvSubscription> psv = new Vector<NodeConfig.ProvSubscription>(); - Vector<NodeConfig.ProvForceIngress> pfiv = new Vector<NodeConfig.ProvForceIngress>(); - Vector<NodeConfig.ProvForceEgress> pfev = new Vector<NodeConfig.ProvForceEgress>(); - Vector<NodeConfig.ProvHop> phv = new Vector<NodeConfig.ProvHop>(); - try { - JSONTokener jtx = new JSONTokener(r); - JSONObject jcfg = new JSONObject(jtx); - char c = jtx.nextClean(); - if (c != '\0') { - throw new JSONException("Spurious characters following configuration"); - } - r.close(); - JSONArray jfeeds = jcfg.optJSONArray("feeds"); - if (jfeeds != null) { - for (int fx = 0; fx < jfeeds.length(); fx++) { - JSONObject jfeed = jfeeds.getJSONObject(fx); - String stat = null; - if (jfeed.optBoolean("suspend", false)) { - stat = "Feed is suspended"; - } - if (jfeed.optBoolean("deleted", false)) { - stat = "Feed is deleted"; - } - String fid = gvas(jfeed, "feedid"); - String fname = gvas(jfeed, "name"); - String fver = gvas(jfeed, "version"); - String createdDate = gvas(jfeed, "created_date"); - /* - * START - AAF changes - * TDP EPIC US# 307413 - * Passing aafInstance to ProvFeed from feeds json passed by prov to identify legacy/AAF feeds - */ - String aafInstance = gvas(jfeed, "aaf_instance"); - pfv.add(new NodeConfig.ProvFeed(fid, fname + "//" + fver, stat,createdDate, aafInstance)); - /* - * END - AAF changes - */ - JSONObject jauth = jfeed.optJSONObject("authorization"); - if (jauth == null) { - continue; - } - JSONArray jeids = jauth.optJSONArray("endpoint_ids"); - if (jeids != null) { - for (int ux = 0; ux < jeids.length(); ux++) { - JSONObject ju = jeids.getJSONObject(ux); - String login = gvas(ju, "id"); - String password = gvas(ju, "password"); - pfuv.add(new NodeConfig.ProvFeedUser(fid, login, NodeUtils.getAuthHdr(login, password))); - } - } - JSONArray jeips = jauth.optJSONArray("endpoint_addrs"); - if (jeips != null) { - for (int ix = 0; ix < jeips.length(); ix++) { - String sn = gvas(jeips, ix); - pfsnv.add(new NodeConfig.ProvFeedSubnet(fid, sn)); - } - } - } - } - JSONArray jsubs = jcfg.optJSONArray("subscriptions"); - if (jsubs != null) { - for (int sx = 0; sx < jsubs.length(); sx++) { - JSONObject jsub = jsubs.getJSONObject(sx); - if (jsub.optBoolean("suspend", false)) { - continue; - } - String sid = gvas(jsub, "subid"); - String fid = gvas(jsub, "feedid"); - JSONObject jdel = jsub.getJSONObject("delivery"); - String delurl = gvas(jdel, "url"); - String id = gvas(jdel, "user"); - String password = gvas(jdel, "password"); - boolean monly = jsub.getBoolean("metadataOnly"); - boolean use100 = jdel.getBoolean("use100"); - boolean privilegedSubscriber = jsub.getBoolean("privilegedSubscriber"); - boolean decompress = jsub.getBoolean("decompress"); - boolean followRedirect = jsub.getBoolean("follow_redirect"); - psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100, privilegedSubscriber, followRedirect, decompress)); - } - } - JSONObject jparams = jcfg.optJSONObject("parameters"); - if (jparams != null) { - for (String pname : JSONObject.getNames(jparams)) { - String pvalue = gvas(jparams, pname); - if (pvalue != null) { - ppv.add(new NodeConfig.ProvParam(pname, pvalue)); - } - } - String sfx = gvas(jparams, "PROV_DOMAIN"); - JSONArray jnodes = jparams.optJSONArray("NODES"); - if (jnodes != null) { - for (int nx = 0; nx < jnodes.length(); nx++) { - String nn = gvas(jnodes, nx); - if (nn.indexOf('.') == -1) { - nn = nn + "." + sfx; - } - pnv.add(new NodeConfig.ProvNode(nn)); - } - } - } - JSONArray jingresses = jcfg.optJSONArray("ingress"); - if (jingresses != null) { - for (int fx = 0; fx < jingresses.length(); fx++) { - JSONObject jingress = jingresses.getJSONObject(fx); - String fid = gvas(jingress, "feedid"); - String subnet = gvas(jingress, "subnet"); - String user = gvas(jingress, "user"); - String[] nodes = gvasa(jingress, "node"); - if (fid == null || "".equals(fid)) { - continue; - } - if ("".equals(subnet)) { - subnet = null; - } - if ("".equals(user)) { - user = null; - } - pfiv.add(new NodeConfig.ProvForceIngress(fid, subnet, user, nodes)); - } - } - JSONObject jegresses = jcfg.optJSONObject("egress"); - if (jegresses != null && JSONObject.getNames(jegresses) != null) { - for (String esid : JSONObject.getNames(jegresses)) { - String enode = gvas(jegresses, esid); - if (esid != null && enode != null && !"".equals(esid) && !"".equals(enode)) { - pfev.add(new NodeConfig.ProvForceEgress(esid, enode)); - } - } - } - JSONArray jhops = jcfg.optJSONArray("routing"); - if (jhops != null) { - for (int fx = 0; fx < jhops.length(); fx++) { - JSONObject jhop = jhops.getJSONObject(fx); - String from = gvas(jhop, "from"); - String to = gvas(jhop, "to"); - String via = gvas(jhop, "via"); - if (from == null || to == null || via == null || "".equals(from) || "".equals(to) || "".equals(via)) { - continue; - } - phv.add(new NodeConfig.ProvHop(from, to, via)); - } - } - } catch (JSONException jse) { - NodeUtils.setIpAndFqdnForEelf("ProvData"); - eelfLogger.error(EelfMsgs.MESSAGE_PARSING_ERROR, jse.toString()); - eelfLogger.error("NODE0201 Error parsing configuration data from provisioning server " + jse.toString(), jse); - throw new IOException(jse.toString(), jse); - } - pn = pnv.toArray(new NodeConfig.ProvNode[pnv.size()]); - pp = ppv.toArray(new NodeConfig.ProvParam[ppv.size()]); - pf = pfv.toArray(new NodeConfig.ProvFeed[pfv.size()]); - pfu = pfuv.toArray(new NodeConfig.ProvFeedUser[pfuv.size()]); - pfsn = pfsnv.toArray(new NodeConfig.ProvFeedSubnet[pfsnv.size()]); - ps = psv.toArray(new NodeConfig.ProvSubscription[psv.size()]); - pfi = pfiv.toArray(new NodeConfig.ProvForceIngress[pfiv.size()]); - pfe = pfev.toArray(new NodeConfig.ProvForceEgress[pfev.size()]); - ph = phv.toArray(new NodeConfig.ProvHop[phv.size()]); - } - - /** * Get the raw node configuration entries */ public NodeConfig.ProvNode[] getNodes() { - return (pn); + return (provNodes); } /** * Get the raw parameter configuration entries */ public NodeConfig.ProvParam[] getParams() { - return (pp); + return (provParams); } /** * Ge the raw feed configuration entries */ public NodeConfig.ProvFeed[] getFeeds() { - return (pf); + return (provFeeds); } /** * Get the raw feed user configuration entries */ public NodeConfig.ProvFeedUser[] getFeedUsers() { - return (pfu); + return (provFeedUsers); } /** * Get the raw feed subnet configuration entries */ public NodeConfig.ProvFeedSubnet[] getFeedSubnets() { - return (pfsn); + return (provFeedSubnets); } /** * Get the raw subscription entries */ public NodeConfig.ProvSubscription[] getSubscriptions() { - return (ps); + return (provSubscriptions); } /** * Get the raw forced ingress entries */ public NodeConfig.ProvForceIngress[] getForceIngress() { - return (pfi); + return (provForceIngresses); } /** * Get the raw forced egress entries */ public NodeConfig.ProvForceEgress[] getForceEgress() { - return (pfe); + return (provForceEgresses); } /** * Get the raw next hop entries */ public NodeConfig.ProvHop[] getHops() { - return (ph); + return (provHops); + } + + @Nullable + private String getFeedStatus(JSONObject jfeed) { + String stat = null; + if (jfeed.optBoolean("suspend", false)) { + stat = "Feed is suspended"; + } + if (jfeed.optBoolean("deleted", false)) { + stat = "Feed is deleted"; + } + return stat; + } + + private void addJSONFeeds(ArrayList<ProvFeed> provFeeds1, ArrayList<ProvFeedUser> provFeedUsers1, + ArrayList<ProvFeedSubnet> provFeedSubnets1, + JSONObject jsonConfig) { + JSONArray jfeeds = jsonConfig.optJSONArray("feeds"); + if (jfeeds != null) { + for (int fx = 0; fx < jfeeds.length(); fx++) { + addJSONFeed(provFeeds1, provFeedUsers1, provFeedSubnets1, jfeeds, fx); + } + } + } + + private void addJSONFeed(ArrayList<ProvFeed> provFeeds1, ArrayList<ProvFeedUser> provFeedUsers1, + ArrayList<ProvFeedSubnet> provFeedSubnets1, JSONArray jfeeds, int feedIndex) { + JSONObject jfeed = jfeeds.getJSONObject(feedIndex); + String stat = getFeedStatus(jfeed); + String fid = gvas(jfeed, FEED_ID); + String fname = gvas(jfeed, "name"); + String fver = gvas(jfeed, "version"); + String createdDate = gvas(jfeed, "created_date"); + /* + * START - AAF changes + * TDP EPIC US# 307413 + * Passing aafInstance to ProvFeed from feeds json passed by prov to identify legacy/AAF feeds + */ + String aafInstance = gvas(jfeed, "aaf_instance"); + provFeeds1.add(new ProvFeed(fid, fname + "//" + fver, stat, createdDate, aafInstance)); + /* + * END - AAF changes + */ + addJSONFeedAuthArrays(provFeedUsers1, provFeedSubnets1, jfeed, fid); + } + + private void addJSONFeedAuthArrays(ArrayList<ProvFeedUser> provFeedUsers1, + ArrayList<ProvFeedSubnet> provFeedSubnets1, JSONObject jfeed, String fid) { + JSONObject jauth = jfeed.optJSONObject("authorization"); + if (jauth == null) { + return; + } + JSONArray jeids = jauth.optJSONArray("endpoint_ids"); + if (jeids != null) { + for (int ux = 0; ux < jeids.length(); ux++) { + JSONObject ju = jeids.getJSONObject(ux); + String login = gvas(ju, "id"); + String password = gvas(ju, "password"); + provFeedUsers1.add(new ProvFeedUser(fid, login, NodeUtils.getAuthHdr(login, password))); + } + } + JSONArray jeips = jauth.optJSONArray("endpoint_addrs"); + if (jeips != null) { + for (int ix = 0; ix < jeips.length(); ix++) { + String sn = gvas(jeips, ix); + provFeedSubnets1.add(new ProvFeedSubnet(fid, sn)); + } + } + } + + private void addJSONSubs(ArrayList<ProvSubscription> provSubscriptions1, JSONObject jsonConfig) { + JSONArray jsubs = jsonConfig.optJSONArray("subscriptions"); + if (jsubs != null) { + for (int sx = 0; sx < jsubs.length(); sx++) { + addJSONSub(provSubscriptions1, jsubs, sx); + } + } + } + + private void addJSONSub(ArrayList<ProvSubscription> provSubscriptions1, JSONArray jsubs, int sx) { + JSONObject jsub = jsubs.getJSONObject(sx); + if (jsub.optBoolean("suspend", false)) { + return; + } + String sid = gvas(jsub, "subid"); + String fid = gvas(jsub, FEED_ID); + JSONObject jdel = jsub.getJSONObject("delivery"); + String delurl = gvas(jdel, "url"); + String id = gvas(jdel, "user"); + String password = gvas(jdel, "password"); + boolean monly = jsub.getBoolean("metadataOnly"); + boolean use100 = jdel.getBoolean("use100"); + boolean privilegedSubscriber = jsub.getBoolean("privilegedSubscriber"); + boolean decompress = jsub.getBoolean("decompress"); + boolean followRedirect = jsub.getBoolean("follow_redirect"); + provSubscriptions1 + .add(new ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100, + privilegedSubscriber, followRedirect, decompress)); + } + + private void addJSONParams(ArrayList<ProvNode> provNodes1, ArrayList<ProvParam> provParams1, + JSONObject jsonconfig) { + JSONObject jparams = jsonconfig.optJSONObject("parameters"); + if (jparams != null) { + for (String pname : JSONObject.getNames(jparams)) { + addJSONParam(provParams1, jparams, pname); + } + addJSONNodesToParams(provNodes1, jparams); + } + } + + private void addJSONParam(ArrayList<ProvParam> provParams1, JSONObject jparams, String pname) { + String pvalue = gvas(jparams, pname); + if (pvalue != null) { + provParams1.add(new ProvParam(pname, pvalue)); + } + } + + private void addJSONNodesToParams(ArrayList<ProvNode> provNodes1, JSONObject jparams) { + String sfx = gvas(jparams, "PROV_DOMAIN"); + JSONArray jnodes = jparams.optJSONArray("NODES"); + if (jnodes != null) { + for (int nx = 0; nx < jnodes.length(); nx++) { + String nn = gvas(jnodes, nx); + if (nn == null) { + continue; + } + if (nn.indexOf('.') == -1) { + nn = nn + "." + sfx; + } + provNodes1.add(new ProvNode(nn)); + } + } + } + + private void addJSONRoutingInformation(ArrayList<ProvForceIngress> provForceIngresses1, + ArrayList<ProvForceEgress> provForceEgresses1, ArrayList<ProvHop> provHops1, JSONObject jsonConfig) { + JSONArray jingresses = jsonConfig.optJSONArray("ingress"); + if (jingresses != null) { + for (int fx = 0; fx < jingresses.length(); fx++) { + addJSONIngressRoute(provForceIngresses1, jingresses, fx); + } + } + JSONObject jegresses = jsonConfig.optJSONObject("egress"); + if (jegresses != null && JSONObject.getNames(jegresses) != null) { + for (String esid : JSONObject.getNames(jegresses)) { + addJSONEgressRoute(provForceEgresses1, jegresses, esid); + } + } + JSONArray jhops = jsonConfig.optJSONArray("routing"); + if (jhops != null) { + for (int fx = 0; fx < jhops.length(); fx++) { + addJSONRoutes(provHops1, jhops, fx); + } + } + } + + private void addJSONIngressRoute(ArrayList<ProvForceIngress> provForceIngresses1, JSONArray jingresses, int fx) { + JSONObject jingress = jingresses.getJSONObject(fx); + String fid = gvas(jingress, FEED_ID); + String subnet = gvas(jingress, "subnet"); + String user = gvas(jingress, "user"); + String[] nodes = gvasa(jingress, "node"); + if (fid == null || "".equals(fid)) { + return; + } + if ("".equals(subnet)) { + subnet = null; + } + if ("".equals(user)) { + user = null; + } + provForceIngresses1.add(new ProvForceIngress(fid, subnet, user, nodes)); + } + + private void addJSONEgressRoute(ArrayList<ProvForceEgress> provForceEgresses1, JSONObject jegresses, String esid) { + String enode = gvas(jegresses, esid); + if (esid != null && enode != null && !"".equals(esid) && !"".equals(enode)) { + provForceEgresses1.add(new ProvForceEgress(esid, enode)); + } + } + + private void addJSONRoutes(ArrayList<ProvHop> provHops1, JSONArray jhops, int fx) { + JSONObject jhop = jhops.getJSONObject(fx); + String from = gvas(jhop, "from"); + String to = gvas(jhop, "to"); + String via = gvas(jhop, "via"); + if (from == null || to == null || via == null || "".equals(from) || "".equals(to) || "".equals(via)) { + return; + } + provHops1.add(new ProvHop(from, to, via)); } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java index 3d4908e8..5b7248af 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java @@ -28,6 +28,7 @@ package org.onap.dmaap.datarouter.node; * Generate publish IDs */ public class PublishId { + private long nextuid; private String myname; @@ -41,7 +42,8 @@ public class PublishId { } /** - * Generate a Data Router Publish ID that uniquely identifies the particular invocation of the Publish API for log correlation purposes. + * Generate a Data Router Publish ID that uniquely identifies the particular invocation of the Publish API for log + * correlation purposes. */ public synchronized String next() { long now = System.currentTimeMillis(); diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java index 42af8ca0..94b694d4 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java @@ -24,13 +24,15 @@ package org.onap.dmaap.datarouter.node; -import java.util.*; +import java.util.Timer; +import java.util.TimerTask; /** * Execute an operation no more frequently than a specified interval */ public abstract class RateLimitedOperation implements Runnable { + private boolean marked; // a timer task exists private boolean executing; // the operation is currently in progress private boolean remark; // a request was made while the operation was in progress @@ -41,29 +43,15 @@ public abstract class RateLimitedOperation implements Runnable { /** * Create a rate limited operation * - * @param mininterval The minimum number of milliseconds after the last execution starts before a new execution can begin - * @param timer The timer used to perform deferred executions + * @param mininterval The minimum number of milliseconds after the last execution starts before a new execution can + * begin + * @param timer The timer used to perform deferred executions */ public RateLimitedOperation(long mininterval, Timer timer) { this.timer = timer; this.mininterval = mininterval; } - private class deferred extends TimerTask { - public void run() { - execute(); - } - } - - private synchronized void unmark() { - marked = false; - } - - private void execute() { - unmark(); - request(); - } - /** * Request that the operation be performed by this thread or at a later time by the timer */ @@ -90,7 +78,7 @@ public abstract class RateLimitedOperation implements Runnable { if (last + mininterval > now) { // too soon - schedule a timer marked = true; - timer.schedule(new deferred(), last + mininterval - now); + timer.schedule(new Deferred(), last + mininterval - now); return (true); } last = now; @@ -107,4 +95,20 @@ public abstract class RateLimitedOperation implements Runnable { } return (false); } + + private class Deferred extends TimerTask { + + public void run() { + execute(); + } + + private void execute() { + unmark(); + request(); + } + + private synchronized void unmark() { + marked = false; + } + } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java index 4cd650b0..83e3c30d 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java @@ -29,27 +29,27 @@ import com.att.eelf.configuration.EELFManager; import java.io.BufferedReader; import java.io.FileOutputStream; import java.io.FileReader; -import java.io.IOException; import java.io.OutputStream; -import java.util.Hashtable; +import java.util.HashMap; +import java.util.Map; import java.util.Timer; /** * Track redirections of subscriptions */ public class RedirManager { + private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(RedirManager.class); - private Hashtable<String, String> sid2primary = new Hashtable<String, String>(); - private Hashtable<String, String> sid2secondary = new Hashtable<String, String>(); - private String redirfile; RateLimitedOperation op; + private HashMap<String, String> sid2primary = new HashMap<>(); + private HashMap<String, String> sid2secondary = new HashMap<>(); + private String redirfile; /** * Create a mechanism for maintaining subscription redirections. * * @param redirfile The file to store the redirection information. - * @param mininterval The minimum number of milliseconds between writes to the redirection - * information file. + * @param mininterval The minimum number of milliseconds between writes to the redirection information file. * @param timer The timer thread used to run delayed file writes. */ public RedirManager(String redirfile, long mininterval, Timer timer) { @@ -57,10 +57,12 @@ public class RedirManager { op = new RateLimitedOperation(mininterval, timer) { public void run() { try { - StringBuffer sb = new StringBuffer(); - for (String s : sid2primary.keySet()) { - sb.append(s).append(' ').append(sid2primary.get(s)).append(' ') - .append(sid2secondary.get(s)).append('\n'); + StringBuilder sb = new StringBuilder(); + for (Map.Entry<String, String> entry : sid2primary.entrySet()) { + String s = entry.getKey(); + String value = entry.getValue(); + sb.append(s).append(' ').append(value).append(' ') + .append(sid2secondary.get(s)).append('\n'); } try (OutputStream os = new FileOutputStream(RedirManager.this.redirfile)) { os.write(sb.toString().getBytes()); @@ -74,23 +76,17 @@ public class RedirManager { String s; try (BufferedReader br = new BufferedReader(new FileReader(redirfile))) { while ((s = br.readLine()) != null) { - s = s.trim(); - String[] sx = s.split(" "); - if (s.startsWith("#") || sx.length != 3) { - continue; - } - sid2primary.put(sx[0], sx[1]); - sid2secondary.put(sx[0], sx[2]); + addSubRedirInfo(s); } } } catch (Exception e) { - eelfLogger.error("Missing file is normal", e); + eelfLogger.debug("Missing file is normal", e); } } /** - * Set up redirection. If a request is to be sent to subscription ID sid, and that is - * configured to go to URL primary, instead, go to secondary. + * Set up redirection. If a request is to be sent to subscription ID sid, and that is configured to go to URL + * primary, instead, go to secondary. * * @param sid The subscription ID to be redirected * @param primary The URL associated with that subscription ID @@ -103,8 +99,7 @@ public class RedirManager { } /** - * Cancel redirection. If a request is to be sent to subscription ID sid, send it to its - * primary URL. + * Cancel redirection. If a request is to be sent to subscription ID sid, send it to its primary URL. * * @param sid The subscription ID to remove from the table. */ @@ -115,8 +110,8 @@ public class RedirManager { } /** - * Look up where to send a subscription. If the primary has changed or there is no redirection, - * use the primary. Otherwise, redirect to the secondary URL. + * Look up where to send a subscription. If the primary has changed or there is no redirection, use the primary. + * Otherwise, redirect to the secondary URL. * * @param sid The subscription ID to look up. * @param primary The configured primary URL. @@ -138,4 +133,14 @@ public class RedirManager { public synchronized boolean isRedirected(String sid) { return (sid != null && sid2secondary.get(sid) != null); } + + private void addSubRedirInfo(String s) { + s = s.trim(); + String[] sx = s.split(" "); + if (s.startsWith("#") || sx.length != 3) { + return; + } + sid2primary.put(sx[0], sx[1]); + sid2secondary.put(sx[0], sx[2]); + } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java index 1be3408a..e6165588 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java @@ -25,20 +25,27 @@ package org.onap.dmaap.datarouter.node; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; -import java.util.regex.*; -import java.util.*; -import java.io.*; -import java.nio.file.*; -import java.text.*; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * Logging for data router delivery events (PUB/DEL/EXP) */ public class StatusLog { + + private static final String EXCEPTION = "Exception"; private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(StatusLog.class); private static StatusLog instance = new StatusLog(); - private HashSet<String> toship = new HashSet<String>(); - private SimpleDateFormat filedate; + private SimpleDateFormat filedate = new SimpleDateFormat("-yyyyMMddHHmm"); + private String prefix = "logs/events"; private String suffix = ".log"; private String plainfile; @@ -48,85 +55,75 @@ public class StatusLog { private long intvl; private NodeConfigManager config = NodeConfigManager.getInstance(); - { - try { - filedate = new SimpleDateFormat("-yyyyMMddHHmm"); - } catch (Exception e) { - eelfLogger.error("Exception", e); - } + private StatusLog() { } /** - * Parse an interval of the form xxhyymzzs and round it to the nearest whole fraction of 24 hours.If no units are specified, assume seconds. + * Parse an interval of the form xxhyymzzs and round it to the nearest whole fraction of 24 hours.If no units are + * specified, assume seconds. */ public static long parseInterval(String interval, int def) { try { Matcher m = Pattern.compile("(?:(\\d+)[Hh])?(?:(\\d+)[Mm])?(?:(\\d+)[Ss]?)?").matcher(interval); if (m.matches()) { - int dur = 0; - String x = m.group(1); - if (x != null) { - dur += 3600 * Integer.parseInt(x); - } - x = m.group(2); - if (x != null) { - dur += 60 * Integer.parseInt(x); - } - x = m.group(3); - if (x != null) { - dur += Integer.parseInt(x); - } - if (dur < 60) { - dur = 60; - } + int dur = getDur(m); int best = 86400; int dist = best - dur; if (dur > best) { dist = dur - best; } - int base = 1; - for (int i = 0; i < 8; i++) { - int base2 = base; - base *= 2; - for (int j = 0; j < 4; j++) { - int base3 = base2; - base2 *= 3; - for (int k = 0; k < 3; k++) { - int cur = base3; - base3 *= 5; - int ndist = cur - dur; - if (dur > cur) { - ndist = dur - cur; - } - if (ndist < dist) { - best = cur; - dist = ndist; - } - } - } - } + best = getBest(dur, best, dist); def = best * 1000; } } catch (Exception e) { - eelfLogger.error("Exception", e); + eelfLogger.error(EXCEPTION, e); } return (def); } - private synchronized void checkRoll(long now) throws IOException { - if (now >= nexttime) { - if (os != null) { - os.close(); - os = null; + private static int getBest(int dur, int best, int dist) { + int base = 1; + for (int i = 0; i < 8; i++) { + int base2 = base; + base *= 2; + for (int j = 0; j < 4; j++) { + int base3 = base2; + base2 *= 3; + for (int k = 0; k < 3; k++) { + int cur = base3; + base3 *= 5; + int ndist = cur - dur; + if (dur > cur) { + ndist = dur - cur; + } + if (ndist < dist) { + best = cur; + dist = ndist; + } + } } - intvl = parseInterval(config.getEventLogInterval(), 300000); - prefix = config.getEventLogPrefix(); - suffix = config.getEventLogSuffix(); - nexttime = now - now % intvl + intvl; - curfile = prefix + filedate.format(new Date(nexttime - intvl)) + suffix; - plainfile = prefix + suffix; - notify(); } + return best; + } + + private static int getDur(Matcher m) { + int dur = 0; + String x = m.group(1); + if (x != null) { + dur += 3600 * Integer.parseInt(x); + } + x = m.group(2); + if (x != null) { + dur += 60 * Integer.parseInt(x); + } + x = m.group(3); + if (x != null) { + dur += Integer.parseInt(x); + } + if (dur < 60) { + dur = 60; + } + return dur; } /** @@ -138,111 +135,107 @@ public class StatusLog { try { instance.checkRoll(System.currentTimeMillis()); } catch (Exception e) { - eelfLogger.error("Exception", e); + eelfLogger.error(EXCEPTION, e); } return (instance.curfile); } - private synchronized void log(String s) { - try { - long now = System.currentTimeMillis(); - checkRoll(now); - if (os == null) { - os = new FileOutputStream(curfile, true); - (new File(plainfile)).delete(); - Files.createLink(Paths.get(plainfile), Paths.get(curfile)); - } - os.write((NodeUtils.logts(new Date(now)) + '|' + s + '\n').getBytes()); - os.flush(); - } catch (IOException ioe) { - eelfLogger.error("IOException", ioe); - } - } - /** * Log a received publication attempt. * - * @param pubid The publish ID assigned by the node + * @param pubid The publish ID assigned by the node * @param feedid The feed id given by the publisher * @param requrl The URL of the received request * @param method The method (DELETE or PUT) in the received request - * @param ctype The content type (if method is PUT and clen > 0) - * @param clen The content length (if method is PUT) - * @param srcip The IP address of the publisher - * @param user The identity of the publisher + * @param ctype The content type (if method is PUT and clen > 0) + * @param clen The content length (if method is PUT) + * @param srcip The IP address of the publisher + * @param user The identity of the publisher * @param status The status returned to the publisher */ - public static void logPub(String pubid, String feedid, String requrl, String method, String ctype, long clen, String srcip, String user, int status) { - instance.log("PUB|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + srcip + "|" + user + "|" + status); + public static void logPub(String pubid, String feedid, String requrl, String method, String ctype, long clen, + String srcip, String user, int status) { + instance.log( + "PUB|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + srcip + + "|" + user + "|" + status); } /** * Log a data transfer error receiving a publication attempt * - * @param pubid The publish ID assigned by the node + * @param pubid The publish ID assigned by the node * @param feedid The feed id given by the publisher * @param requrl The URL of the received request * @param method The method (DELETE or PUT) in the received request - * @param ctype The content type (if method is PUT and clen > 0) - * @param clen The expected content length (if method is PUT) - * @param rcvd The content length received - * @param srcip The IP address of the publisher - * @param user The identity of the publisher - * @param error The error message from the IO exception + * @param ctype The content type (if method is PUT and clen > 0) + * @param clen The expected content length (if method is PUT) + * @param rcvd The content length received + * @param srcip The IP address of the publisher + * @param user The identity of the publisher + * @param error The error message from the IO exception */ - public static void logPubFail(String pubid, String feedid, String requrl, String method, String ctype, long clen, long rcvd, String srcip, String user, String error) { - instance.log("PBF|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + rcvd + "|" + srcip + "|" + user + "|" + error); + public static void logPubFail(String pubid, String feedid, String requrl, String method, String ctype, long clen, + long rcvd, String srcip, String user, String error) { + instance.log("PBF|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + rcvd + + "|" + srcip + "|" + user + "|" + error); } /** * Log a delivery attempt. * - * @param pubid The publish ID assigned by the node + * @param pubid The publish ID assigned by the node * @param feedid The feed ID - * @param subid The (space delimited list of) subscription ID + * @param subid The (space delimited list of) subscription ID * @param requrl The URL used in the attempt * @param method The method (DELETE or PUT) in the attempt - * @param ctype The content type (if method is PUT, not metaonly, and clen > 0) - * @param clen The content length (if PUT and not metaonly) - * @param user The identity given to the subscriber + * @param ctype The content type (if method is PUT, not metaonly, and clen > 0) + * @param clen The content length (if PUT and not metaonly) + * @param user The identity given to the subscriber * @param status The status returned by the subscriber or -1 if an exeception occured trying to connect * @param xpubid The publish ID returned by the subscriber */ - public static void logDel(String pubid, String feedid, String subid, String requrl, String method, String ctype, long clen, String user, int status, String xpubid) { + public static void logDel(String pubid, String feedid, String subid, String requrl, String method, String ctype, + long clen, String user, int status, String xpubid) { if (feedid == null) { return; } - instance.log("DEL|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + user + "|" + status + "|" + xpubid); + instance.log( + "DEL|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + + "|" + user + "|" + status + "|" + xpubid); } /** * Log delivery attempts expired * - * @param pubid The publish ID assigned by the node - * @param feedid The feed ID - * @param subid The (space delimited list of) subscription ID - * @param requrl The URL that would be delivered to - * @param method The method (DELETE or PUT) in the request - * @param ctype The content type (if method is PUT, not metaonly, and clen > 0) - * @param clen The content length (if PUT and not metaonly) - * @param reason The reason the attempts were discontinued + * @param pubid The publish ID assigned by the node + * @param feedid The feed ID + * @param subid The (space delimited list of) subscription ID + * @param requrl The URL that would be delivered to + * @param method The method (DELETE or PUT) in the request + * @param ctype The content type (if method is PUT, not metaonly, and clen > 0) + * @param clen The content length (if PUT and not metaonly) + * @param reason The reason the attempts were discontinued * @param attempts The number of attempts made */ - public static void logExp(String pubid, String feedid, String subid, String requrl, String method, String ctype, long clen, String reason, int attempts) { + public static void logExp(String pubid, String feedid, String subid, String requrl, String method, String ctype, + long clen, String reason, int attempts) { if (feedid == null) { return; } - instance.log("EXP|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + reason + "|" + attempts); + instance.log( + "EXP|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + + "|" + reason + "|" + attempts); } /** * Log extra statistics about unsuccessful delivery attempts. * - * @param pubid The publish ID assigned by the node + * @param pubid The publish ID assigned by the node * @param feedid The feed ID - * @param subid The (space delimited list of) subscription ID - * @param clen The content length - * @param sent The # of bytes sent or -1 if subscriber returned an error instead of 100 Continue, otherwise, the number of bytes sent before an error occurred. + * @param subid The (space delimited list of) subscription ID + * @param clen The content length + * @param sent The # of bytes sent or -1 if subscriber returned an error instead of 100 Continue, otherwise, the + * number of bytes sent before an error occurred. */ public static void logDelExtra(String pubid, String feedid, String subid, long clen, long sent) { if (feedid == null) { @@ -251,6 +244,35 @@ public class StatusLog { instance.log("DLX|" + pubid + "|" + feedid + "|" + subid + "|" + clen + "|" + sent); } - private StatusLog() { + private synchronized void checkRoll(long now) throws IOException { + if (now >= nexttime) { + if (os != null) { + os.close(); + os = null; + } + intvl = parseInterval(config.getEventLogInterval(), 300000); + prefix = config.getEventLogPrefix(); + suffix = config.getEventLogSuffix(); + nexttime = now - now % intvl + intvl; + curfile = prefix + filedate.format(new Date(nexttime - intvl)) + suffix; + plainfile = prefix + suffix; + notify(); + } + } + + private synchronized void log(String s) { + try { + long now = System.currentTimeMillis(); + checkRoll(now); + if (os == null) { + os = new FileOutputStream(curfile, true); + (new File(plainfile)).delete(); + Files.createLink(Paths.get(plainfile), Paths.get(curfile)); + } + os.write((NodeUtils.logts(new Date(now)) + '|' + s + '\n').getBytes()); + os.flush(); + } catch (IOException ioe) { + eelfLogger.error("IOException", ioe); + } } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java index 6f74df48..fd5a6bc6 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java @@ -28,6 +28,7 @@ package org.onap.dmaap.datarouter.node; * Compare IP addresses as byte arrays to a subnet specified as a CIDR */ public class SubnetMatcher { + private byte[] sn; private int len; private int mask; diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java index eb10876e..d86b2e92 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java @@ -28,6 +28,7 @@ package org.onap.dmaap.datarouter.node; * A destination to deliver a message */ public class Target { + private DestInfo destinfo; private String routing; @@ -35,7 +36,7 @@ public class Target { * A destination to deliver a message * * @param destinfo Either info for a subscription ID or info for a node-to-node transfer - * @param routing For a node-to-node transfer, what to do when it gets there. + * @param routing For a node-to-node transfer, what to do when it gets there. */ public Target(DestInfo destinfo, String routing) { this.destinfo = destinfo; diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java index 33e4f801..1eb73c69 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java @@ -24,41 +24,37 @@ package org.onap.dmaap.datarouter.node; -import java.util.*; +import java.util.HashSet; +import java.util.Iterator; /** - * Manage a list of tasks to be executed when an event occurs. - * This makes the following guarantees: + * Manage a list of tasks to be executed when an event occurs. This makes the following guarantees: * <ul> * <li>Tasks can be safely added and removed in the middle of a run.</li> * <li>No task will be returned more than once during a run.</li> * <li>No task will be returned when it is not, at that moment, in the list of tasks.</li> * <li>At the moment when next() returns null, all tasks on the list have been returned during the run.</li> - * <li>Initially and once next() returns null during a run, next() will continue to return null until startRun() is called. + * <li>Initially and once next() returns null during a run, next() will continue to return null until startRun() is + * called. * </ul> */ public class TaskList { + private Iterator<Runnable> runlist; - private HashSet<Runnable> tasks = new HashSet<Runnable>(); + private HashSet<Runnable> tasks = new HashSet<>(); private HashSet<Runnable> togo; private HashSet<Runnable> sofar; private HashSet<Runnable> added; private HashSet<Runnable> removed; /** - * Construct a new TaskList - */ - public TaskList() { - } - - /** * Start executing the sequence of tasks. */ public synchronized void startRun() { - sofar = new HashSet<Runnable>(); - added = new HashSet<Runnable>(); - removed = new HashSet<Runnable>(); - togo = new HashSet<Runnable>(tasks); + sofar = new HashSet<>(); + added = new HashSet<>(); + removed = new HashSet<>(); + togo = new HashSet<>(tasks); runlist = togo.iterator(); } @@ -69,18 +65,13 @@ public class TaskList { while (runlist != null) { if (runlist.hasNext()) { Runnable task = runlist.next(); - if (removed.contains(task)) { - continue; + if (addTaskToSoFar(task)) { + return task; } - if (sofar.contains(task)) { - continue; - } - sofar.add(task); - return (task); } - if (added.size() != 0) { + if (!added.isEmpty()) { togo = added; - added = new HashSet<Runnable>(); + added = new HashSet<>(); removed.clear(); runlist = togo.iterator(); continue; @@ -115,4 +106,15 @@ public class TaskList { } tasks.remove(task); } + + private boolean addTaskToSoFar(Runnable task) { + if (removed.contains(task)) { + return false; + } + if (sofar.contains(task)) { + return false; + } + sofar.add(task); + return true; + } } |