From e4b20cc6f7c31f48ddd0de5bcd054b09a35cd510 Mon Sep 17 00:00:00 2001 From: sg481n Date: Wed, 23 Aug 2017 16:30:52 -0400 Subject: Update project structure to org.onap Update project structure of dmaap/datarouter from com.att to org.onap and add distribution management and nexus repositires details. Issue-id: DMAAP-52 Change-Id: Ibafee1cba43c7c5a3f227a02417998d36ecaae6b Signed-off-by: sg481n --- .../com/att/research/datarouter/node/Delivery.java | 253 -------- .../research/datarouter/node/DeliveryQueue.java | 348 ---------- .../datarouter/node/DeliveryQueueHelper.java | 89 --- .../att/research/datarouter/node/DeliveryTask.java | 308 --------- .../datarouter/node/DeliveryTaskHelper.java | 72 -- .../com/att/research/datarouter/node/DestInfo.java | 132 ---- .../com/att/research/datarouter/node/IsFrom.java | 82 --- .../att/research/datarouter/node/LogManager.java | 159 ----- .../att/research/datarouter/node/NodeConfig.java | 722 --------------------- .../datarouter/node/NodeConfigManager.java | 599 ----------------- .../com/att/research/datarouter/node/NodeMain.java | 113 ---- .../att/research/datarouter/node/NodeServlet.java | 380 ----------- .../att/research/datarouter/node/NodeUtils.java | 226 ------- .../att/research/datarouter/node/PathFinder.java | 132 ---- .../com/att/research/datarouter/node/ProvData.java | 302 --------- .../att/research/datarouter/node/PublishId.java | 52 -- .../datarouter/node/RateLimitedOperation.java | 102 --- .../att/research/datarouter/node/RedirManager.java | 118 ---- .../att/research/datarouter/node/StatusLog.java | 229 ------- .../research/datarouter/node/SubnetMatcher.java | 71 -- .../com/att/research/datarouter/node/Target.java | 60 -- .../com/att/research/datarouter/node/TaskList.java | 113 ---- .../research/datarouter/node/eelf/EELFFilter.java | 43 -- .../research/datarouter/node/eelf/EelfMsgs.java | 96 --- .../org/onap/dmaap/datarouter/node/Delivery.java | 253 ++++++++ .../onap/dmaap/datarouter/node/DeliveryQueue.java | 348 ++++++++++ .../dmaap/datarouter/node/DeliveryQueueHelper.java | 89 +++ .../onap/dmaap/datarouter/node/DeliveryTask.java | 308 +++++++++ .../dmaap/datarouter/node/DeliveryTaskHelper.java | 72 ++ .../org/onap/dmaap/datarouter/node/DestInfo.java | 132 ++++ .../org/onap/dmaap/datarouter/node/IsFrom.java | 82 +++ .../org/onap/dmaap/datarouter/node/LogManager.java | 159 +++++ .../org/onap/dmaap/datarouter/node/NodeConfig.java | 722 +++++++++++++++++++++ .../dmaap/datarouter/node/NodeConfigManager.java | 599 +++++++++++++++++ .../org/onap/dmaap/datarouter/node/NodeMain.java | 113 ++++ .../onap/dmaap/datarouter/node/NodeServlet.java | 380 +++++++++++ .../org/onap/dmaap/datarouter/node/NodeUtils.java | 226 +++++++ .../org/onap/dmaap/datarouter/node/PathFinder.java | 132 ++++ .../org/onap/dmaap/datarouter/node/ProvData.java | 302 +++++++++ .../org/onap/dmaap/datarouter/node/PublishId.java | 52 ++ .../datarouter/node/RateLimitedOperation.java | 102 +++ .../onap/dmaap/datarouter/node/RedirManager.java | 118 ++++ .../org/onap/dmaap/datarouter/node/StatusLog.java | 229 +++++++ .../onap/dmaap/datarouter/node/SubnetMatcher.java | 71 ++ .../org/onap/dmaap/datarouter/node/Target.java | 60 ++ .../org/onap/dmaap/datarouter/node/TaskList.java | 113 ++++ .../dmaap/datarouter/node/eelf/EELFFilter.java | 43 ++ .../onap/dmaap/datarouter/node/eelf/EelfMsgs.java | 96 +++ 48 files changed, 4801 insertions(+), 4801 deletions(-) delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/Delivery.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueue.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueueHelper.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTask.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTaskHelper.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/DestInfo.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/IsFrom.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/LogManager.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfig.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfigManager.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/NodeMain.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/NodeServlet.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/NodeUtils.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/PathFinder.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/ProvData.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/PublishId.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/RateLimitedOperation.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/RedirManager.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/StatusLog.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/SubnetMatcher.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/Target.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/TaskList.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EELFFilter.java delete mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EelfMsgs.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/EELFFilter.java create mode 100644 datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/EelfMsgs.java (limited to 'datarouter-node/src') diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/Delivery.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/Delivery.java deleted file mode 100644 index d0e88ec9..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/Delivery.java +++ /dev/null @@ -1,253 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - -package com.att.research.datarouter.node; - -import java.util.*; -import java.io.*; -import org.apache.log4j.Logger; - -/** - * Main control point for delivering files to destinations. - *

- * The Delivery class manages assignment of delivery threads to delivery - * queues and creation and destruction of delivery queues as - * configuration changes. DeliveryQueues are assigned threads based on a - * modified round-robin approach giving priority to queues with more work - * as measured by both bytes to deliver and files to deliver and lower - * priority to queues that already have delivery threads working. - * A delivery thread continues to work for a delivery queue as long as - * that queue has more files to deliver. - */ -public class Delivery { - private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.Delivery"); - private static class DelItem implements Comparable { - private String pubid; - private String spool; - public int compareTo(DelItem x) { - int i = pubid.compareTo(x.pubid); - if (i == 0) { - i = spool.compareTo(x.spool); - } - return(i); - } - public String getPublishId() { - return(pubid); - } - public String getSpool() { - return(spool); - } - public DelItem(String pubid, String spool) { - this.pubid = pubid; - this.spool = spool; - } - } - private double fdstart; - private double fdstop; - private int threads; - private int curthreads; - private NodeConfigManager config; - private Hashtable dqs = new Hashtable(); - private DeliveryQueue[] queues = new DeliveryQueue[0]; - private int qpos = 0; - private long nextcheck; - private Runnable cmon = new Runnable() { - public void run() { - checkconfig(); - } - }; - /** - * Constructs a new Delivery system using the specified configuration manager. - * @param config The configuration manager for this delivery system. - */ - public Delivery(NodeConfigManager config) { - this.config = config; - config.registerConfigTask(cmon); - checkconfig(); - } - private void cleardir(String dir) { - if (dqs.get(dir) != null) { - return; - } - File fdir = new File(dir); - for (File junk: fdir.listFiles()) { - if (junk.isFile()) { - junk.delete(); - } - } - fdir.delete(); - } - private void freeDiskCheck() { - File spoolfile = new File(config.getSpoolBase()); - long tspace = spoolfile.getTotalSpace(); - long start = (long)(tspace * fdstart); - long stop = (long)(tspace * fdstop); - long cur = spoolfile.getUsableSpace(); - if (cur >= start) { - return; - } - Vector cv = new Vector(); - for (String sdir: dqs.keySet()) { - for (String meta: (new File(sdir)).list()) { - if (!meta.endsWith(".M") || meta.charAt(0) == '.') { - continue; - } - cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir)); - } - } - DelItem[] items = cv.toArray(new DelItem[cv.size()]); - Arrays.sort(items); - logger.info("NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + " total=" + tspace); - for (DelItem item: items) { - long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId()); - logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk"); - if (amount > 0) { - cur += amount; - if (cur >= stop) { - cur = spoolfile.getUsableSpace(); - } - if (cur >= stop) { - logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace); - return; - } - } - } - cur = spoolfile.getUsableSpace(); - if (cur >= stop) { - logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace); - return; - } - logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status. current=" + cur + " yellow=" + stop + " total=" + tspace); - } - private void cleardirs() { - String basedir = config.getSpoolBase(); - String nbase = basedir + "/n"; - for (String nodedir: (new File(nbase)).list()) { - if (!nodedir.startsWith(".")) { - cleardir(nbase + "/" + nodedir); - } - } - String sxbase = basedir + "/s"; - for (String sxdir: (new File(sxbase)).list()) { - if (sxdir.startsWith(".")) { - continue; - } - File sxf = new File(sxbase + "/" + sxdir); - for (String sdir: sxf.list()) { - if (!sdir.startsWith(".")) { - cleardir(sxbase + "/" + sxdir + "/" + sdir); - } - } - sxf.delete(); // won't if anything still in it - } - } - private synchronized void checkconfig() { - if (!config.isConfigured()) { - return; - } - fdstart = config.getFreeDiskStart(); - fdstop = config.getFreeDiskStop(); - threads = config.getDeliveryThreads(); - if (threads < 1) { - threads = 1; - } - DestInfo[] alldis = config.getAllDests(); - DeliveryQueue[] nqs = new DeliveryQueue[alldis.length]; - qpos = 0; - Hashtable ndqs = new Hashtable(); - for (DestInfo di: alldis) { - String spl = di.getSpool(); - DeliveryQueue dq = dqs.get(spl); - if (dq == null) { - dq = new DeliveryQueue(config, di); - } else { - dq.config(di); - } - ndqs.put(spl, dq); - nqs[qpos++] = dq; - } - queues = nqs; - dqs = ndqs; - cleardirs(); - while (curthreads < threads) { - curthreads++; - (new Thread() { - { - setName("Delivery Thread"); - } - public void run() { - dodelivery(); - } - }).start(); - } - nextcheck = 0; - notify(); - } - private void dodelivery() { - DeliveryQueue dq; - while ((dq = getNextQueue()) != null) { - dq.run(); - } - } - private synchronized DeliveryQueue getNextQueue() { - while (true) { - if (curthreads > threads) { - curthreads--; - return(null); - } - if (qpos < queues.length) { - DeliveryQueue dq = queues[qpos++]; - if (dq.isSkipSet()) { - continue; - } - nextcheck = 0; - notify(); - return(dq); - } - long now = System.currentTimeMillis(); - if (now < nextcheck) { - try { - wait(nextcheck + 500 - now); - } catch (Exception e) { - } - now = System.currentTimeMillis(); - } - if (now >= nextcheck) { - nextcheck = now + 5000; - qpos = 0; - freeDiskCheck(); - } - } - } - /** - * Reset the retry timer for a delivery queue - */ - public synchronized void resetQueue(String spool) { - if (spool != null) { - DeliveryQueue dq = dqs.get(spool); - if (dq != null) { - dq.resetQueue(); - } - } - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueue.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueue.java deleted file mode 100644 index 71c77978..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueue.java +++ /dev/null @@ -1,348 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -import java.io.*; -import java.util.*; - -/** - * Mechanism for monitoring and controlling delivery of files to a destination. - *

- * The DeliveryQueue class maintains lists of DeliveryTasks for a single - * destination (a subscription or another data router node) and assigns - * delivery threads to try to deliver them. It also maintains a delivery - * status that causes it to back off on delivery attempts after a failure. - *

- * If the most recent delivery result was a failure, then no more attempts - * will be made for a period of time. Initially, and on the first failure - * following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer() (milliseconds). - * If, after this delay, additional failures occur, each failure will - * multiply the delay by DeliveryQueueHelper.getFailureBackoff() up to a - * maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer(). - * Note that this behavior applies to the delivery queue as a whole and not - * to individual files in the queue. If multiple files are being - * delivered and one fails, the delay will be started. If a second - * delivery fails while the delay was active, it will not change the delay - * or change the duration of any subsequent delay. - * If, however, it succeeds, it will cancel the delay. - *

- * The queue maintains 3 collections of files to deliver: A todo list of - * files that will be attempted, a working set of files that are being - * attempted, and a retry set of files that were attempted and failed. - * Whenever the todo list is empty and needs to be refilled, a scan of the - * spool directory is made and the file names sorted. Any files in the working set are ignored. - * If a DeliveryTask for the file is in the retry set, then that delivery - * task is placed on the todo list. Otherwise, a new DeliveryTask for the - * file is created and placed on the todo list. - * If, when a DeliveryTask is about to be removed from the todo list, its - * age exceeds DeliveryQueueHelper.getExpirationTimer(), then it is instead - * marked as expired. - *

- * A delivery queue also maintains a skip flag. This flag is true if the - * failure timer is active or if no files are found in a directory scan. - */ -public class DeliveryQueue implements Runnable, DeliveryTaskHelper { - private DeliveryQueueHelper dqh; - private DestInfo di; - private Hashtable working = new Hashtable(); - private Hashtable retry = new Hashtable(); - private int todoindex; - private boolean failed; - private long failduration; - private long resumetime; - File dir; - private Vector todo = new Vector(); - /** - * Try to cancel a delivery task. - * @return The length of the task in bytes or 0 if the task cannot be cancelled. - */ - public synchronized long cancelTask(String pubid) { - if (working.get(pubid) != null) { - return(0); - } - DeliveryTask dt = retry.get(pubid); - if (dt == null) { - for (int i = todoindex; i < todo.size(); i++) { - DeliveryTask xdt = todo.get(i); - if (xdt.getPublishId().equals(pubid)) { - dt = xdt; - break; - } - } - } - if (dt == null) { - dt = new DeliveryTask(this, pubid); - if (dt.getFileId() == null) { - return(0); - } - } - if (dt.isCleaned()) { - return(0); - } - StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(), dt.getMethod(), dt.getCType(), dt.getLength(), "diskFull", dt.getAttempts()); - dt.clean(); - return(dt.getLength()); - } - /** - * Mark that a delivery task has succeeded. - */ - public synchronized void markSuccess(DeliveryTask task) { - working.remove(task.getPublishId()); - task.clean(); - failed = false; - failduration = 0; - } - /** - * Mark that a delivery task has expired. - */ - public synchronized void markExpired(DeliveryTask task) { - task.clean(); - } - /** - * Mark that a delivery task has failed permanently. - */ - public synchronized void markFailNoRetry(DeliveryTask task) { - working.remove(task.getPublishId()); - task.clean(); - failed = false; - failduration = 0; - } - private void fdupdate() { - if (!failed) { - failed = true; - if (failduration == 0) { - failduration = dqh.getInitFailureTimer(); - } - resumetime = System.currentTimeMillis() + failduration; - long maxdur = dqh.getMaxFailureTimer(); - failduration = (long)(failduration * dqh.getFailureBackoff()); - if (failduration > maxdur) { - failduration = maxdur; - } - } - } - /** - * Mark that a delivery task has been redirected. - */ - public synchronized void markRedirect(DeliveryTask task) { - working.remove(task.getPublishId()); - retry.put(task.getPublishId(), task); - } - /** - * Mark that a delivery task has temporarily failed. - */ - public synchronized void markFailWithRetry(DeliveryTask task) { - working.remove(task.getPublishId()); - retry.put(task.getPublishId(), task); - fdupdate(); - } - /** - * Get the next task. - */ - public synchronized DeliveryTask getNext() { - DeliveryTask ret = peekNext(); - if (ret != null) { - todoindex++; - working.put(ret.getPublishId(), ret); - } - return(ret); - } - /** - * Peek at the next task. - */ - public synchronized DeliveryTask peekNext() { - long now = System.currentTimeMillis(); - long mindate = now - dqh.getExpirationTimer(); - if (failed) { - if (now > resumetime) { - failed = false; - } else { - return(null); - } - } - while (true) { - if (todoindex >= todo.size()) { - todoindex = 0; - todo = new Vector(); - String[] files = dir.list(); - Arrays.sort(files); - for (String fname: files) { - if (!fname.endsWith(".M")) { - continue; - } - String fname2 = fname.substring(0, fname.length() - 2); - long pidtime = 0; - int dot = fname2.indexOf('.'); - if (dot < 1) { - continue; - } - try { - pidtime = Long.parseLong(fname2.substring(0, dot)); - } catch (Exception e) { - } - if (pidtime < 1000000000000L) { - continue; - } - if (working.get(fname2) != null) { - continue; - } - DeliveryTask dt = retry.get(fname2); - if (dt == null) { - dt = new DeliveryTask(this, fname2); - } - todo.add(dt); - } - retry = new Hashtable(); - } - if (todoindex < todo.size()) { - DeliveryTask dt = todo.get(todoindex); - if (dt.isCleaned()) { - todoindex++; - continue; - } - if (dt.getDate() >= mindate) { - return(dt); - } - todoindex++; - reportExpiry(dt); - continue; - } - return(null); - } - } - /** - * Create a delivery queue for a given destination info - */ - public DeliveryQueue(DeliveryQueueHelper dqh, DestInfo di) { - this.dqh = dqh; - this.di = di; - dir = new File(di.getSpool()); - dir.mkdirs(); - } - /** - * Update the destination info for this delivery queue - */ - public void config(DestInfo di) { - this.di = di; - } - /** - * Get the dest info - */ - public DestInfo getDestInfo() { - return(di); - } - /** - * Get the config manager - */ - public DeliveryQueueHelper getConfig() { - return(dqh); - } - /** - * Exceptional condition occurred during delivery - */ - public void reportDeliveryExtra(DeliveryTask task, long sent) { - StatusLog.logDelExtra(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getLength(), sent); - } - /** - * Message too old to deliver - */ - public void reportExpiry(DeliveryTask task) { - StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts()); - markExpired(task); - } - /** - * Completed a delivery attempt - */ - public void reportStatus(DeliveryTask task, int status, String xpubid, String location) { - if (status < 300) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, xpubid); - markSuccess(task); - } else if (status < 400 && dqh.isFollowRedirects()) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location); - if (dqh.handleRedirection(di, location, task.getFileId())) { - markRedirect(task); - } else { - StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts()); - markFailNoRetry(task); - } - } else if (status < 500) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location); - StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts()); - markFailNoRetry(task); - } else { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location); - markFailWithRetry(task); - } - } - /** - * Delivery failed by reason of an exception - */ - public void reportException(DeliveryTask task, Exception exception) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), -1, exception.toString()); - dqh.handleUnreachable(di); - markFailWithRetry(task); - } - /** - * Get the feed ID for a subscription - * @param subid The subscription ID - * @return The feed ID - */ - public String getFeedId(String subid) { - return(dqh.getFeedId(subid)); - } - /** - * Get the URL to deliver a message to given the file ID - */ - public String getDestURL(String fileid) { - return(dqh.getDestURL(di, fileid)); - } - /** - * Deliver files until there's a failure or there are no more - * files to deliver - */ - public void run() { - DeliveryTask t; - long endtime = System.currentTimeMillis() + dqh.getFairTimeLimit(); - int filestogo = dqh.getFairFileLimit(); - while ((t = getNext()) != null) { - t.run(); - if (--filestogo <= 0 || System.currentTimeMillis() > endtime) { - break; - } - } - } - /** - * Is there no work to do for this queue right now? - */ - public synchronized boolean isSkipSet() { - return(peekNext() == null); - } - /** - * Reset the retry timer - */ - public void resetQueue() { - resumetime = System.currentTimeMillis(); - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueueHelper.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueueHelper.java deleted file mode 100644 index 770db1dc..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueueHelper.java +++ /dev/null @@ -1,89 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -/** - * Interface to allow independent testing of the DeliveryQueue code - *

- * This interface represents all of the configuration information and - * feedback mechanisms that a delivery queue needs. - */ -public interface DeliveryQueueHelper { - /** - * Get the timeout (milliseconds) before retrying after an initial delivery failure - */ - public long getInitFailureTimer(); - /** - * Get the ratio between timeouts on consecutive delivery attempts - */ - public double getFailureBackoff(); - /** - * Get the maximum timeout (milliseconds) between delivery attempts - */ - public long getMaxFailureTimer(); - /** - * Get the expiration timer (milliseconds) for deliveries - */ - public long getExpirationTimer(); - /** - * Get the maximum number of file delivery attempts before checking - * if another queue has work to be performed. - */ - public int getFairFileLimit(); - /** - * Get the maximum amount of time spent delivering files before checking if another queue has work to be performed. - */ - public long getFairTimeLimit(); - /** - * Get the URL for delivering a file - * @param dest The destination information for the file to be delivered. - * @param fileid The file id for the file to be delivered. - * @return The URL for delivering the file (typically, dest.getURL() + "/" + fileid). - */ - public String getDestURL(DestInfo dest, String fileid); - /** - * Forget redirections associated with a subscriber - * @param dest Destination information to forget - */ - public void handleUnreachable(DestInfo dest); - /** - * Post redirection for a subscriber - * @param dest Destination information to update - * @param location Location given by subscriber - * @param fileid File ID of request - * @return true if this 3xx response is retryable, otherwise, false. - */ - public boolean handleRedirection(DestInfo dest, String location, String fileid); - /** - * Should I handle 3xx responses differently than 4xx responses? - */ - public boolean isFollowRedirects(); - /** - * Get the feed ID for a subscription - * @param subid The subscription ID - * @return The feed ID - */ - public String getFeedId(String subid); -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTask.java deleted file mode 100644 index 3d72a417..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTask.java +++ /dev/null @@ -1,308 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -import java.io.*; -import java.net.*; -import java.util.*; -import org.apache.log4j.Logger; - -/** - * A file to be delivered to a destination. - *

- * A Delivery task represents a work item for the data router - a file that - * needs to be delivered and provides mechanisms to get information about - * the file and its delivery data as well as to attempt delivery. - */ -public class DeliveryTask implements Runnable, Comparable { - private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.DeliveryTask"); - private DeliveryTaskHelper dth; - private String pubid; - private DestInfo di; - private String spool; - private File datafile; - private File metafile; - private long length; - private long date; - private String method; - private String fileid; - private String ctype; - private String url; - private String feedid; - private String subid; - private int attempts; - private String[][] hdrs; - /** - * Is the object a DeliveryTask with the same publication ID? - */ - public boolean equals(Object o) { - if (!(o instanceof DeliveryTask)) { - return(false); - } - return(pubid.equals(((DeliveryTask)o).pubid)); - } - /** - * Compare the publication IDs. - */ - public int compareTo(DeliveryTask o) { - return(pubid.compareTo(o.pubid)); - } - /** - * Get the hash code of the publication ID. - */ - public int hashCode() { - return(pubid.hashCode()); - } - /** - * Return the publication ID. - */ - public String toString() { - return(pubid); - } - /** - * Create a delivery task for a given delivery queue and pub ID - * @param dth The delivery task helper for the queue this task is in. - * @param pubid The publish ID for this file. This is used as - * the base for the file name in the spool directory and is of - * the form . - */ - public DeliveryTask(DeliveryTaskHelper dth, String pubid) { - this.dth = dth; - this.pubid = pubid; - di = dth.getDestInfo(); - subid = di.getSubId(); - feedid = di.getLogData(); - spool = di.getSpool(); - String dfn = spool + "/" + pubid; - String mfn = dfn + ".M"; - datafile = new File(spool + "/" + pubid); - metafile = new File(mfn); - boolean monly = di.isMetaDataOnly(); - date = Long.parseLong(pubid.substring(0, pubid.indexOf('.'))); - Vector hdrv = new Vector(); - try { - BufferedReader br = new BufferedReader(new FileReader(metafile)); - String s = br.readLine(); - int i = s.indexOf('\t'); - method = s.substring(0, i); - if (!"DELETE".equals(method) && !monly) { - length = datafile.length(); - } - fileid = s.substring(i + 1); - while ((s = br.readLine()) != null) { - i = s.indexOf('\t'); - String h = s.substring(0, i); - String v = s.substring(i + 1); - if ("x-att-dr-routing".equalsIgnoreCase(h)) { - subid = v.replaceAll("[^ ]*/", ""); - feedid = dth.getFeedId(subid.replaceAll(" .*", "")); - } - if (length == 0 && h.toLowerCase().startsWith("content-")) { - continue; - } - if (h.equalsIgnoreCase("content-type")) { - ctype = v; - } - hdrv.add(new String[] {h, v}); - } - br.close(); - } catch (Exception e) { - } - hdrs = hdrv.toArray(new String[hdrv.size()][]); - url = dth.getDestURL(fileid); - } - /** - * Get the publish ID - */ - public String getPublishId() { - return(pubid); - } - /** - * Attempt delivery - */ - public void run() { - attempts++; - try { - di = dth.getDestInfo(); - boolean expect100 = di.isUsing100(); - boolean monly = di.isMetaDataOnly(); - length = 0; - if (!"DELETE".equals(method) && !monly) { - length = datafile.length(); - } - url = dth.getDestURL(fileid); - URL u = new URL(url); - HttpURLConnection uc = (HttpURLConnection)u.openConnection(); - uc.setConnectTimeout(60000); - uc.setReadTimeout(60000); - uc.setInstanceFollowRedirects(false); - uc.setRequestMethod(method); - uc.setRequestProperty("Content-Length", Long.toString(length)); - uc.setRequestProperty("Authorization", di.getAuth()); - uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid); - for (String[] nv: hdrs) { - uc.addRequestProperty(nv[0], nv[1]); - } - if (length > 0) { - if (expect100) { - uc.setRequestProperty("Expect", "100-continue"); - } - uc.setFixedLengthStreamingMode(length); - uc.setDoOutput(true); - OutputStream os = null; - try { - os = uc.getOutputStream(); - } catch (ProtocolException pe) { - dth.reportDeliveryExtra(this, -1L); - // Rcvd error instead of 100-continue - } - if (os != null) { - long sofar = 0; - try { - byte[] buf = new byte[1024 * 1024]; - InputStream is = new FileInputStream(datafile); - while (sofar < length) { - int i = buf.length; - if (sofar + i > length) { - i = (int)(length - sofar); - } - i = is.read(buf, 0, i); - if (i <= 0) { - throw new IOException("Unexpected problem reading data file " + datafile); - } - sofar += i; - os.write(buf, 0, i); - } - is.close(); - os.close(); - } catch (IOException ioe) { - dth.reportDeliveryExtra(this, sofar); - throw ioe; - } - } - } - int rc = uc.getResponseCode(); - String rmsg = uc.getResponseMessage(); - if (rmsg == null) { - String h0 = uc.getHeaderField(0); - if (h0 != null) { - int i = h0.indexOf(' '); - int j = h0.indexOf(' ', i + 1); - if (i != -1 && j != -1) { - rmsg = h0.substring(j + 1); - } - } - } - String xpubid = null; - InputStream is; - if (rc >= 200 && rc <= 299) { - is = uc.getInputStream(); - xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID"); - } else { - if (rc >= 300 && rc <= 399) { - rmsg = uc.getHeaderField("Location"); - } - is = uc.getErrorStream(); - } - byte[] buf = new byte[4096]; - if (is != null) { - while (is.read(buf) > 0) { - } - is.close(); - } - dth.reportStatus(this, rc, xpubid, rmsg); - } catch (Exception e) { - dth.reportException(this, e); - } - } - /** - * Remove meta and data files - */ - public void clean() { - datafile.delete(); - metafile.delete(); - hdrs = null; - } - /** - * Has this delivery task been cleaned? - */ - public boolean isCleaned() { - return(hdrs == null); - } - /** - * Get length of body - */ - public long getLength() { - return(length); - } - /** - * Get creation date as encoded in the publish ID. - */ - public long getDate() { - return(date); - } - /** - * Get the most recent delivery attempt URL - */ - public String getURL() { - return(url); - } - /** - * Get the content type - */ - public String getCType() { - return(ctype); - } - /** - * Get the method - */ - public String getMethod() { - return(method); - } - /** - * Get the file ID - */ - public String getFileId() { - return(fileid); - } - /** - * Get the number of delivery attempts - */ - public int getAttempts() { - return(attempts); - } - /** - * Get the (space delimited list of) subscription ID for this delivery task - */ - public String getSubId() { - return(subid); - } - /** - * Get the feed ID for this delivery task - */ - public String getFeedId() { - return(feedid); - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTaskHelper.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTaskHelper.java deleted file mode 100644 index 702bb29e..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTaskHelper.java +++ /dev/null @@ -1,72 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -/** - * Interface to allow independent testing of the DeliveryTask code. - *

- * This interface represents all the configuraiton information and - * feedback mechanisms that a delivery task needs. - */ - -public interface DeliveryTaskHelper { - /** - * Report that a delivery attempt failed due to an exception (like can't connect to remote host) - * @param task The task that failed - * @param exception The exception that occurred - */ - public void reportException(DeliveryTask task, Exception exception); - /** - * Report that a delivery attempt completed (successfully or unsuccessfully) - * @param task The task that failed - * @param status The HTTP status - * @param xpubid The publish ID from the far end (if any) - * @param location The redirection location for a 3XX response - */ - public void reportStatus(DeliveryTask task, int status, String xpubid, String location); - /** - * Report that a delivery attempt either failed while sending data or that an error was returned instead of a 100 Continue. - * @param task The task that failed - * @param sent The number of bytes sent or -1 if an error was returned instead of 100 Continue. - */ - public void reportDeliveryExtra(DeliveryTask task, long sent); - /** - * Get the destination information for the delivery queue - * @return The destination information - */ - public DestInfo getDestInfo(); - /** - * Given a file ID, get the URL to deliver to - * @param fileid The file id - * @return The URL to deliver to - */ - public String getDestURL(String fileid); - /** - * Get the feed ID for a subscription - * @param subid The subscription ID - * @return The feed iD - */ - public String getFeedId(String subid); -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DestInfo.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DestInfo.java deleted file mode 100644 index e57fef8b..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/DestInfo.java +++ /dev/null @@ -1,132 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -/** - * Information for a delivery destination that doesn't change from message to message - */ -public class DestInfo { - private String name; - private String spool; - private String subid; - private String logdata; - private String url; - private String authuser; - private String authentication; - private boolean metaonly; - private boolean use100; - /** - * Create a destination information object. - * @param name n:fqdn or s:subid - * @param spool The directory where files are spooled. - * @param subid The subscription ID (if applicable). - * @param logdata Text to be included in log messages - * @param url The URL to deliver to. - * @param authuser The auth user for logging. - * @param authentication The credentials. - * @param metaonly Is this a metadata only delivery? - * @param use100 Should I use expect 100-continue? - */ - public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100) { - this.name = name; - this.spool = spool; - this.subid = subid; - this.logdata = logdata; - this.url = url; - this.authuser = authuser; - this.authentication = authentication; - this.metaonly = metaonly; - this.use100 = use100; - } - public boolean equals(Object o) { - return((o instanceof DestInfo) && ((DestInfo)o).spool.equals(spool)); - } - public int hashCode() { - return(spool.hashCode()); - } - /** - * Get the name of this destination - */ - public String getName() { - return(name); - } - /** - * Get the spool directory for this destination. - * @return The spool directory - */ - public String getSpool() { - return(spool); - } - /** - * Get the subscription ID. - * @return Subscription ID or null if this is a node to node delivery. - */ - public String getSubId() { - return(subid); - } - /** - * Get the log data. - * @return Text to be included in a log message about delivery attempts. - */ - public String getLogData() { - return(logdata); - } - /** - * Get the delivery URL. - * @return The URL to deliver to (the primary URL). - */ - public String getURL() { - return(url); - - } - /** - * Get the user for authentication - * @return The name of the user for logging - */ - public String getAuthUser() { - return(authuser); - } - /** - * Get the authentication header - * @return The string to use to authenticate to the recipient. - */ - public String getAuth() { - return(authentication); - } - /** - * Is this a metadata only delivery? - * @return True if this is a metadata only delivery - */ - public boolean isMetaDataOnly() { - return(metaonly); - } - /** - * Should I send expect 100-continue header? - * @return True if I should. - */ - public boolean isUsing100() { - return(use100); - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/IsFrom.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/IsFrom.java deleted file mode 100644 index bb3e4137..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/IsFrom.java +++ /dev/null @@ -1,82 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -import java.util.*; -import java.net.*; - -/** - * Determine if an IP address is from a machine - */ -public class IsFrom { - private long nextcheck; - private String[] ips; - private String fqdn; - /** - * Configure the JVM DNS cache to have a 10 second TTL. This needs to be called very very early or it won't have any effect. - */ - public static void setDNSCache() { - java.security.Security.setProperty("networkaddress.cache.ttl", "10"); - } - /** - * Create an IsFrom for the specified fully qualified domain name. - */ - public IsFrom(String fqdn) { - this.fqdn = fqdn; - } - /** - * Check if an IP address matches. If it has been more than - * 10 seconds since DNS was last checked for changes to the - * IP address(es) of this FQDN, check again. Then check - * if the specified IP address belongs to the FQDN. - */ - public synchronized boolean isFrom(String ip) { - long now = System.currentTimeMillis(); - if (now > nextcheck) { - nextcheck = now + 10000; - Vector v = new Vector(); - try { - InetAddress[] addrs = InetAddress.getAllByName(fqdn); - for (InetAddress a: addrs) { - v.add(a.getHostAddress()); - } - } catch (Exception e) { - } - ips = v.toArray(new String[v.size()]); - } - for (String s: ips) { - if (s.equals(ip)) { - return(true); - } - } - return(false); - } - /** - * Return the fully qualified domain name - */ - public String toString() { - return(fqdn); - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/LogManager.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/LogManager.java deleted file mode 100644 index 078deaa1..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/LogManager.java +++ /dev/null @@ -1,159 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ -package com.att.research.datarouter.node; - -import java.util.*; -import java.util.regex.*; -import java.io.*; -import java.nio.file.*; -import java.text.*; - -/** - * Cleanup of old log files. - *

- * Periodically scan the log directory for log files that are older than - * the log file retention interval, and delete them. In a future release, - * This class will also be responsible for uploading events logs to the - * log server to support the log query APIs. - */ - -public class LogManager extends TimerTask { - private NodeConfigManager config; - private Matcher isnodelog; - private Matcher iseventlog; - private Uploader worker; - private String uploaddir; - private String logdir; - private class Uploader extends Thread implements DeliveryQueueHelper { - public long getInitFailureTimer() { return(10000L); } - public double getFailureBackoff() { return(2.0); } - public long getMaxFailureTimer() { return(150000L); } - public long getExpirationTimer() { return(604800000L); } - public int getFairFileLimit() { return(10000); } - public long getFairTimeLimit() { return(86400000); } - public String getDestURL(DestInfo dest, String fileid) { - return(config.getEventLogUrl()); - } - public void handleUnreachable(DestInfo dest) {} - public boolean handleRedirection(DestInfo dest, String location, String fileid) { return(false); } - public boolean isFollowRedirects() { return(false); } - public String getFeedId(String subid) { return(null); } - private DeliveryQueue dq; - public Uploader() { - dq = new DeliveryQueue(this, new DestInfo("LogUpload", uploaddir, null, null, null, config.getMyName(), config.getMyAuth(), false, false)); - setDaemon(true); - setName("Log Uploader"); - start(); - } - private synchronized void snooze() { - try { - wait(10000); - } catch (Exception e) { - } - } - private synchronized void poke() { - notify(); - } - public void run() { - while (true) { - scan(); - dq.run(); - snooze(); - } - } - private void scan() { - long threshold = System.currentTimeMillis() - config.getLogRetention(); - File dir = new File(logdir); - String[] fns = dir.list(); - Arrays.sort(fns); - String lastqueued = "events-000000000000.log"; - String curlog = StatusLog.getCurLogFile(); - curlog = curlog.substring(curlog.lastIndexOf('/') + 1); - try { - Writer w = new FileWriter(uploaddir + "/.meta"); - w.write("POST\tlogdata\nContent-Type\ttext/plain\n"); - w.close(); - BufferedReader br = new BufferedReader(new FileReader(uploaddir + "/.lastqueued")); - lastqueued = br.readLine(); - br.close(); - } catch (Exception e) { - } - for (String fn: fns) { - if (!isnodelog.reset(fn).matches()) { - if (!iseventlog.reset(fn).matches()) { - continue; - } - if (lastqueued.compareTo(fn) < 0 && curlog.compareTo(fn) > 0) { - lastqueued = fn; - try { - String pid = config.getPublishId(); - Files.createLink(Paths.get(uploaddir + "/" + pid), Paths.get(logdir + "/" + fn)); - Files.createLink(Paths.get(uploaddir + "/" + pid + ".M"), Paths.get(uploaddir + "/.meta")); - } catch (Exception e) { - } - } - } - File f = new File(dir, fn); - if (f.lastModified() < threshold) { - f.delete(); - } - } - try { - (new File(uploaddir + "/.meta")).delete(); - Writer w = new FileWriter(uploaddir + "/.lastqueued"); - w.write(lastqueued + "\n"); - w.close(); - } catch (Exception e) { - } - } - } - /** - * Construct a log manager - *

- * The log manager will check for expired log files every 5 minutes - * at 20 seconds after the 5 minute boundary. (Actually, the - * interval is the event log rollover interval, which - * defaults to 5 minutes). - */ - public LogManager(NodeConfigManager config) { - this.config = config; - try { - isnodelog = Pattern.compile("node\\.log\\.\\d{8}").matcher(""); - iseventlog = Pattern.compile("events-\\d{12}\\.log").matcher(""); - } catch (Exception e) {} - logdir = config.getLogDir(); - uploaddir = logdir + "/.spool"; - (new File(uploaddir)).mkdirs(); - long now = System.currentTimeMillis(); - long intvl = StatusLog.parseInterval(config.getEventLogInterval(), 300000); - long when = now - now % intvl + intvl + 20000L; - config.getTimer().scheduleAtFixedRate(this, when - now, intvl); - worker = new Uploader(); - } - /** - * Trigger check for expired log files and log files to upload - */ - public void run() { - worker.poke(); - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfig.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfig.java deleted file mode 100644 index 689f7653..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfig.java +++ /dev/null @@ -1,722 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -import java.util.*; -import java.io.*; - -/** - * Processed configuration for this node. - *

- * The NodeConfig represents a processed configuration from the Data Router - * provisioning server. Each time configuration data is received from the - * provisioning server, a new NodeConfig is created and the previous one - * discarded. - */ -public class NodeConfig { - /** - * Raw configuration entry for a data router node - */ - public static class ProvNode { - private String cname; - /** - * Construct a node configuration entry. - * @param cname The cname of the node. - */ - public ProvNode(String cname) { - this.cname = cname; - } - /** - * Get the cname of the node - */ - public String getCName() { - return(cname); - } - } - /** - * Raw configuration entry for a provisioning parameter - */ - public static class ProvParam { - private String name; - private String value; - /** - * Construct a provisioning parameter configuration entry. - * @param name The name of the parameter. - * @param value The value of the parameter. - */ - public ProvParam(String name, String value) { - this.name = name; - this.value = value; - } - /** - * Get the name of the parameter. - */ - public String getName() { - return(name); - } - /** - * Get the value of the parameter. - */ - public String getValue() { - return(value); - } - } - /** - * Raw configuration entry for a data feed. - */ - public static class ProvFeed { - private String id; - private String logdata; - private String status; - /** - * Construct a feed configuration entry. - * @param id The feed ID of the entry. - * @param logdata String for log entries about the entry. - * @param status The reason why this feed cannot be used (Feed has been deleted, Feed has been suspended) or null if it is valid. - */ - public ProvFeed(String id, String logdata, String status) { - this.id = id; - this.logdata = logdata; - this.status = status; - } - /** - * Get the feed id of the data feed. - */ - public String getId() { - return(id); - } - /** - * Get the log data of the data feed. - */ - public String getLogData() { - return(logdata); - } - /** - * Get the status of the data feed. - */ - public String getStatus() { - return(status); - } - } - /** - * Raw configuration entry for a feed user. - */ - public static class ProvFeedUser { - private String feedid; - private String user; - private String credentials; - /** - * Construct a feed user configuration entry - * @param feedid The feed id. - * @param user The user that will publish to the feed. - * @param credentials The Authorization header the user will use to publish. - */ - public ProvFeedUser(String feedid, String user, String credentials) { - this.feedid = feedid; - this.user = user; - this.credentials = credentials; - } - /** - * Get the feed id of the feed user. - */ - public String getFeedId() { - return(feedid); - } - /** - * Get the user for the feed user. - */ - public String getUser() { - return(user); - } - /** - * Get the credentials for the feed user. - */ - public String getCredentials() { - return(credentials); - } - } - /** - * Raw configuration entry for a feed subnet - */ - public static class ProvFeedSubnet { - private String feedid; - private String cidr; - /** - * Construct a feed subnet configuration entry - * @param feedid The feed ID - * @param cidr The CIDR allowed to publish to the feed. - */ - public ProvFeedSubnet(String feedid, String cidr) { - this.feedid = feedid; - this.cidr = cidr; - } - /** - * Get the feed id of the feed subnet. - */ - public String getFeedId() { - return(feedid); - } - /** - * Get the CIDR of the feed subnet. - */ - public String getCidr() { - return(cidr); - } - } - /** - * Raw configuration entry for a subscription - */ - public static class ProvSubscription { - private String subid; - private String feedid; - private String url; - private String authuser; - private String credentials; - private boolean metaonly; - private boolean use100; - /** - * Construct a subscription configuration entry - * @param subid The subscription ID - * @param feedid The feed ID - * @param url The base delivery URL (not including the fileid) - * @param authuser The user in the credentials used to deliver - * @param credentials The credentials used to authenticate to the delivery URL exactly as they go in the Authorization header. - * @param metaonly Is this a meta data only subscription? - * @param use100 Should we send Expect: 100-continue? - */ - public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials, boolean metaonly, boolean use100) { - this.subid = subid; - this.feedid = feedid; - this.url = url; - this.authuser = authuser; - this.credentials = credentials; - this.metaonly = metaonly; - this.use100 = use100; - } - /** - * Get the subscription ID - */ - public String getSubId() { - return(subid); - } - /** - * Get the feed ID - */ - public String getFeedId() { - return(feedid); - } - /** - * Get the delivery URL - */ - public String getURL() { - return(url); - } - /** - * Get the user - */ - public String getAuthUser() { - return(authuser); - } - /** - * Get the delivery credentials - */ - public String getCredentials() { - return(credentials); - } - /** - * Is this a meta data only subscription? - */ - public boolean isMetaDataOnly() { - return(metaonly); - } - /** - * Should we send Expect: 100-continue? - */ - public boolean isUsing100() { - return(use100); - } - } - /** - * Raw configuration entry for controlled ingress to the data router node - */ - public static class ProvForceIngress { - private String feedid; - private String subnet; - private String user; - private String[] nodes; - /** - * Construct a forced ingress configuration entry - * @param feedid The feed ID that this entry applies to - * @param subnet The CIDR for which publisher IP addresses this entry applies to or "" if it applies to all publisher IP addresses - * @param user The publishing user this entry applies to or "" if it applies to all publishing users. - * @param nodes The array of FQDNs of the data router nodes to redirect publication attempts to. - */ - public ProvForceIngress(String feedid, String subnet, String user, String[] nodes) { - this.feedid = feedid; - this.subnet = subnet; - this.user = user; - this.nodes = nodes; - } - /** - * Get the feed ID - */ - public String getFeedId() { - return(feedid); - } - /** - * Get the subnet - */ - public String getSubnet() { - return(subnet); - } - /** - * Get the user - */ - public String getUser() { - return(user); - } - /** - * Get the node - */ - public String[] getNodes() { - return(nodes); - } - } - /** - * Raw configuration entry for controlled egress from the data router - */ - public static class ProvForceEgress { - private String subid; - private String node; - /** - * Construct a forced egress configuration entry - * @param subid The subscription ID the subscription with forced egress - * @param node The node handling deliveries for this subscription - */ - public ProvForceEgress(String subid, String node) { - this.subid = subid; - this.node = node; - } - /** - * Get the subscription ID - */ - public String getSubId() { - return(subid); - } - /** - * Get the node - */ - public String getNode() { - return(node); - } - } - /** - * Raw configuration entry for routing within the data router network - */ - public static class ProvHop { - private String from; - private String to; - private String via; - /** - * A human readable description of this entry - */ - public String toString() { - return("Hop " + from + "->" + to + " via " + via); - } - /** - * Construct a hop entry - * @param from The FQDN of the node with the data to be delivered - * @param to The FQDN of the node that will deliver to the subscriber - * @param via The FQDN of the node where the from node should send the data - */ - public ProvHop(String from, String to, String via) { - this.from = from; - this.to = to; - this.via = via; - } - /** - * Get the from node - */ - public String getFrom() { - return(from); - } - /** - * Get the to node - */ - public String getTo() { - return(to); - } - /** - * Get the next intermediate node - */ - public String getVia() { - return(via); - } - } - private static class Redirection { - public SubnetMatcher snm; - public String user; - public String[] nodes; - } - private static class Feed { - public String loginfo; - public String status; - public SubnetMatcher[] subnets; - public Hashtable authusers = new Hashtable(); - public Redirection[] redirections; - public Target[] targets; - } - private Hashtable params = new Hashtable(); - private Hashtable feeds = new Hashtable(); - private Hashtable nodeinfo = new Hashtable(); - private Hashtable subinfo = new Hashtable(); - private Hashtable nodes = new Hashtable(); - private String myname; - private String myauth; - private DestInfo[] alldests; - private int rrcntr; - /** - * Process the raw provisioning data to configure this node - * @param pd The parsed provisioning data - * @param myname My name as seen by external systems - * @param spooldir The directory where temporary files live - * @param port The port number for URLs - * @param nodeauthkey The keying string used to generate node authentication credentials - */ - public NodeConfig(ProvData pd, String myname, String spooldir, int port, String nodeauthkey) { - this.myname = myname; - for (ProvParam p: pd.getParams()) { - params.put(p.getName(), p.getValue()); - } - Vector div = new Vector(); - myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey); - for (ProvNode pn: pd.getNodes()) { - String cn = pn.getCName(); - if (nodeinfo.get(cn) != null) { - continue; - } - String auth = NodeUtils.getNodeAuthHdr(cn, nodeauthkey); - DestInfo di = new DestInfo("n:" + cn, spooldir + "/n/" + cn, null, "n2n-" + cn, "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true); - (new File(di.getSpool())).mkdirs(); - div.add(di); - nodeinfo.put(cn, di); - nodes.put(auth, new IsFrom(cn)); - } - PathFinder pf = new PathFinder(myname, nodeinfo.keySet().toArray(new String[nodeinfo.size()]), pd.getHops()); - Hashtable> rdtab = new Hashtable>(); - for (ProvForceIngress pfi: pd.getForceIngress()) { - Vector v = rdtab.get(pfi.getFeedId()); - if (v == null) { - v = new Vector(); - rdtab.put(pfi.getFeedId(), v); - } - Redirection r = new Redirection(); - if (pfi.getSubnet() != null) { - r.snm = new SubnetMatcher(pfi.getSubnet()); - } - r.user = pfi.getUser(); - r.nodes = pfi.getNodes(); - v.add(r); - } - Hashtable> pfutab = new Hashtable>(); - for (ProvFeedUser pfu: pd.getFeedUsers()) { - Hashtable t = pfutab.get(pfu.getFeedId()); - if (t == null) { - t = new Hashtable(); - pfutab.put(pfu.getFeedId(), t); - } - t.put(pfu.getCredentials(), pfu.getUser()); - } - Hashtable egrtab = new Hashtable(); - for (ProvForceEgress pfe: pd.getForceEgress()) { - if (pfe.getNode().equals(myname) || nodeinfo.get(pfe.getNode()) == null) { - continue; - } - egrtab.put(pfe.getSubId(), pfe.getNode()); - } - Hashtable> pfstab = new Hashtable>(); - for (ProvFeedSubnet pfs: pd.getFeedSubnets()) { - Vector v = pfstab.get(pfs.getFeedId()); - if (v == null) { - v = new Vector(); - pfstab.put(pfs.getFeedId(), v); - } - v.add(new SubnetMatcher(pfs.getCidr())); - } - Hashtable ttab = new Hashtable(); - HashSet allfeeds = new HashSet(); - for (ProvFeed pfx: pd.getFeeds()) { - if (pfx.getStatus() == null) { - allfeeds.add(pfx.getId()); - } - } - for (ProvSubscription ps: pd.getSubscriptions()) { - String sid = ps.getSubId(); - String fid = ps.getFeedId(); - if (!allfeeds.contains(fid)) { - continue; - } - if (subinfo.get(sid) != null) { - continue; - } - int sididx = 999; - try { - sididx = Integer.parseInt(sid); - sididx -= sididx % 100; - } catch (Exception e) { - } - String siddir = sididx + "/" + sid; - DestInfo di = new DestInfo("s:" + sid, spooldir + "/s/" + siddir, sid, fid, ps.getURL(), ps.getAuthUser(), ps.getCredentials(), ps.isMetaDataOnly(), ps.isUsing100()); - (new File(di.getSpool())).mkdirs(); - div.add(di); - subinfo.put(sid, di); - String egr = egrtab.get(sid); - if (egr != null) { - sid = pf.getPath(egr) + sid; - } - StringBuffer sb = ttab.get(fid); - if (sb == null) { - sb = new StringBuffer(); - ttab.put(fid, sb); - } - sb.append(' ').append(sid); - } - alldests = div.toArray(new DestInfo[div.size()]); - for (ProvFeed pfx: pd.getFeeds()) { - String fid = pfx.getId(); - Feed f = feeds.get(fid); - if (f != null) { - continue; - } - f = new Feed(); - feeds.put(fid, f); - f.loginfo = pfx.getLogData(); - f.status = pfx.getStatus(); - Vector v1 = pfstab.get(fid); - if (v1 == null) { - f.subnets = new SubnetMatcher[0]; - } else { - f.subnets = v1.toArray(new SubnetMatcher[v1.size()]); - } - Hashtable h1 = pfutab.get(fid); - if (h1 == null) { - h1 = new Hashtable(); - } - f.authusers = h1; - Vector v2 = rdtab.get(fid); - if (v2 == null) { - f.redirections = new Redirection[0]; - } else { - f.redirections = v2.toArray(new Redirection[v2.size()]); - } - StringBuffer sb = ttab.get(fid); - if (sb == null) { - f.targets = new Target[0]; - } else { - f.targets = parseRouting(sb.toString()); - } - } - } - /** - * Parse a target string into an array of targets - * @param routing Target string - * @return Array of targets. - */ - public Target[] parseRouting(String routing) { - routing = routing.trim(); - if ("".equals(routing)) { - return(new Target[0]); - } - String[] xx = routing.split("\\s+"); - Hashtable tmap = new Hashtable(); - HashSet subset = new HashSet(); - Vector tv = new Vector(); - Target[] ret = new Target[xx.length]; - for (int i = 0; i < xx.length; i++) { - String t = xx[i]; - int j = t.indexOf('/'); - if (j == -1) { - DestInfo di = subinfo.get(t); - if (di == null) { - tv.add(new Target(null, t)); - } else { - if (!subset.contains(t)) { - subset.add(t); - tv.add(new Target(di, null)); - } - } - } else { - String node = t.substring(0, j); - String rtg = t.substring(j + 1); - DestInfo di = nodeinfo.get(node); - if (di == null) { - tv.add(new Target(null, t)); - } else { - Target tt = tmap.get(node); - if (tt == null) { - tt = new Target(di, rtg); - tmap.put(node, tt); - tv.add(tt); - } else { - tt.addRouting(rtg); - } - } - } - } - return(tv.toArray(new Target[tv.size()])); - } - /** - * Check whether this is a valid node-to-node transfer - * @param credentials Credentials offered by the supposed node - * @param ip IP address the request came from - */ - public boolean isAnotherNode(String credentials, String ip) { - IsFrom n = nodes.get(credentials); - return (n != null && n.isFrom(ip)); - } - /** - * Check whether publication is allowed. - * @param feedid The ID of the feed being requested. - * @param credentials The offered credentials - * @param ip The requesting IP address - */ - public String isPublishPermitted(String feedid, String credentials, String ip) { - Feed f = feeds.get(feedid); - String nf = "Feed does not exist"; - if (f != null) { - nf = f.status; - } - if (nf != null) { - return(nf); - } - String user = f.authusers.get(credentials); - if (user == null) { - return("Publisher not permitted for this feed"); - } - if (f.subnets.length == 0) { - return(null); - } - byte[] addr = NodeUtils.getInetAddress(ip); - for (SubnetMatcher snm: f.subnets) { - if (snm.matches(addr)) { - return(null); - } - } - return("Publisher not permitted for this feed"); - } - /** - * Get authenticated user - */ - public String getAuthUser(String feedid, String credentials) { - return(feeds.get(feedid).authusers.get(credentials)); - } - /** - * Check if the request should be redirected to a different ingress node - */ - public String getIngressNode(String feedid, String user, String ip) { - Feed f = feeds.get(feedid); - if (f.redirections.length == 0) { - return(null); - } - byte[] addr = NodeUtils.getInetAddress(ip); - for (Redirection r: f.redirections) { - if (r.user != null && !user.equals(r.user)) { - continue; - } - if (r.snm != null && !r.snm.matches(addr)) { - continue; - } - for (String n: r.nodes) { - if (myname.equals(n)) { - return(null); - } - } - if (r.nodes.length == 0) { - return(null); - } - return(r.nodes[rrcntr++ % r.nodes.length]); - } - return(null); - } - /** - * Get a provisioned configuration parameter - */ - public String getProvParam(String name) { - return(params.get(name)); - } - /** - * Get all the DestInfos - */ - public DestInfo[] getAllDests() { - return(alldests); - } - /** - * Get the targets for a feed - * @param feedid The feed ID - * @return The targets this feed should be delivered to - */ - public Target[] getTargets(String feedid) { - if (feedid == null) { - return(new Target[0]); - } - Feed f = feeds.get(feedid); - if (f == null) { - return(new Target[0]); - } - return(f.targets); - } - /** - * Get the feed ID for a subscription - * @param subid The subscription ID - * @return The feed ID - */ - public String getFeedId(String subid) { - DestInfo di = subinfo.get(subid); - if (di == null) { - return(null); - } - return(di.getLogData()); - } - /** - * Get the spool directory for a subscription - * @param subid The subscription ID - * @return The spool directory - */ - public String getSpoolDir(String subid) { - DestInfo di = subinfo.get(subid); - if (di == null) { - return(null); - } - return(di.getSpool()); - } - /** - * Get the Authorization value this node uses - * @return The Authorization header value for this node - */ - public String getMyAuth() { - return(myauth); - } - -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfigManager.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfigManager.java deleted file mode 100644 index 01ca4426..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfigManager.java +++ /dev/null @@ -1,599 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -import java.net.*; -import java.util.*; -import java.io.*; -import org.apache.log4j.Logger; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import com.att.research.datarouter.node.eelf.EelfMsgs; - - -/** - * Maintain the configuration of a Data Router node - *

- * The NodeConfigManager is the single point of contact for servlet, delivery, event logging, and log retention subsystems to access configuration information. (Log4J has its own configuration mechanism). - *

- * There are two basic sets of configuration data. The - * static local configuration data, stored in a local configuration file (created - * as part of installation by SWM), and the dynamic global - * configuration data fetched from the data router provisioning server. - */ -public class NodeConfigManager implements DeliveryQueueHelper { - private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.node.NodeConfigManager"); - private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.NodeConfigManager"); - private static NodeConfigManager base = new NodeConfigManager(); - - private Timer timer = new Timer("Node Configuration Timer", true); - private long maxfailuretimer; - private long initfailuretimer; - private long expirationtimer; - private double failurebackoff; - private long fairtimelimit; - private int fairfilelimit; - private double fdpstart; - private double fdpstop; - private int deliverythreads; - private String provurl; - private String provhost; - private IsFrom provcheck; - private int gfport; - private int svcport; - private int port; - private String spooldir; - private String logdir; - private long logretention; - private String redirfile; - private String kstype; - private String ksfile; - private String kspass; - private String kpass; - private String tstype; - private String tsfile; - private String tspass; - private String myname; - private RedirManager rdmgr; - private RateLimitedOperation pfetcher; - private NodeConfig config; - private File quiesce; - private PublishId pid; - private String nak; - private TaskList configtasks = new TaskList(); - private String eventlogurl; - private String eventlogprefix; - private String eventlogsuffix; - private String eventloginterval; - private boolean followredirects; - - - /** - * Get the default node configuration manager - */ - public static NodeConfigManager getInstance() { - return(base); - } - /** - * Initialize the configuration of a Data Router node - */ - private NodeConfigManager() { - Properties p = new Properties(); - try { - p.load(new FileInputStream(System.getProperty("com.att.research.datarouter.node.ConfigFile", "/opt/app/datartr/etc/node.properties"))); - } catch (Exception e) { - - NodeUtils.setIpAndFqdnForEelf("NodeConfigManager"); - eelflogger.error(EelfMsgs.MESSAGE_PROPERTIES_LOAD_ERROR); - logger.error("NODE0301 Unable to load local configuration file " + System.getProperty("com.att.research.datarouter.node.ConfigFile", "/opt/app/datartr/etc/node.properties"), e); - } - provurl = p.getProperty("ProvisioningURL", "https://feeds-drtr.web.att.com/internal/prov"); - try { - provhost = (new URL(provurl)).getHost(); - } catch (Exception e) { - NodeUtils.setIpAndFqdnForEelf("NodeConfigManager"); - eelflogger.error(EelfMsgs.MESSAGE_BAD_PROV_URL, provurl); - logger.error("NODE0302 Bad provisioning server URL " + provurl); - System.exit(1); - } - logger.info("NODE0303 Provisioning server is " + provhost); - eventlogurl = p.getProperty("LogUploadURL", "https://feeds-drtr.web.att.com/internal/logs"); - provcheck = new IsFrom(provhost); - gfport = Integer.parseInt(p.getProperty("IntHttpPort", "8080")); - svcport = Integer.parseInt(p.getProperty("IntHttpsPort", "8443")); - port = Integer.parseInt(p.getProperty("ExtHttpsPort", "443")); - long minpfinterval = Long.parseLong(p.getProperty("MinProvFetchInterval", "10000")); - long minrsinterval = Long.parseLong(p.getProperty("MinRedirSaveInterval", "10000")); - spooldir = p.getProperty("SpoolDir", "spool"); - File fdir = new File(spooldir + "/f"); - fdir.mkdirs(); - for (File junk: fdir.listFiles()) { - if (junk.isFile()) { - junk.delete(); - } - } - logdir = p.getProperty("LogDir", "logs"); - (new File(logdir)).mkdirs(); - logretention = Long.parseLong(p.getProperty("LogRetention", "30")) * 86400000L; - eventlogprefix = logdir + "/events"; - eventlogsuffix = ".log"; - String redirfile = p.getProperty("RedirectionFile", "etc/redirections.dat"); - kstype = p.getProperty("KeyStoreType", "jks"); - ksfile = p.getProperty("KeyStoreFile", "etc/keystore"); - kspass = p.getProperty("KeyStorePassword", "changeme"); - kpass = p.getProperty("KeyPassword", "changeme"); - tstype = p.getProperty("TrustStoreType", "jks"); - tsfile = p.getProperty("TrustStoreFile"); - tspass = p.getProperty("TrustStorePassword", "changeme"); - if (tsfile != null && tsfile.length() > 0) { - System.setProperty("javax.net.ssl.trustStoreType", tstype); - System.setProperty("javax.net.ssl.trustStore", tsfile); - System.setProperty("javax.net.ssl.trustStorePassword", tspass); - } - nak = p.getProperty("NodeAuthKey", "Node123!"); - quiesce = new File(p.getProperty("QuiesceFile", "etc/SHUTDOWN")); - myname = NodeUtils.getCanonicalName(kstype, ksfile, kspass); - if (myname == null) { - NodeUtils.setIpAndFqdnForEelf("NodeConfigManager"); - eelflogger.error(EelfMsgs.MESSAGE_KEYSTORE_FETCH_ERROR, ksfile); - logger.error("NODE0309 Unable to fetch canonical name from keystore file " + ksfile); - System.exit(1); - } - logger.info("NODE0304 My certificate says my name is " + myname); - pid = new PublishId(myname); - rdmgr = new RedirManager(redirfile, minrsinterval, timer); - pfetcher = new RateLimitedOperation(minpfinterval, timer) { - public void run() { - fetchconfig(); - } - }; - logger.info("NODE0305 Attempting to fetch configuration at " + provurl); - pfetcher.request(); - } - private void localconfig() { - followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false")); - eventloginterval = getProvParam("LOGROLL_INTERVAL", "5m"); - initfailuretimer = 10000; - maxfailuretimer = 3600000; - expirationtimer = 86400000; - failurebackoff = 2.0; - deliverythreads = 40; - fairfilelimit = 100; - fairtimelimit = 60000; - fdpstart = 0.05; - fdpstop = 0.2; - try { initfailuretimer = (long)(Double.parseDouble(getProvParam("DELIVERY_INIT_RETRY_INTERVAL")) * 1000); } catch (Exception e) {} - try { maxfailuretimer = (long)(Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000); } catch (Exception e) {} - try { expirationtimer = (long)(Double.parseDouble(getProvParam("DELIVERY_MAX_AGE")) * 1000); } catch (Exception e) {} - try { failurebackoff = Double.parseDouble(getProvParam("DELIVERY_RETRY_RATIO")); } catch (Exception e) {} - try { deliverythreads = Integer.parseInt(getProvParam("DELIVERY_THREADS")); } catch (Exception e) {} - try { fairfilelimit = Integer.parseInt(getProvParam("FAIR_FILE_LIMIT")); } catch (Exception e) {} - try { fairtimelimit = (long)(Double.parseDouble(getProvParam("FAIR_TIME_LIMIT")) * 1000); } catch (Exception e) {} - try { fdpstart = Double.parseDouble(getProvParam("FREE_DISK_RED_PERCENT")) / 100.0; } catch (Exception e) {} - try { fdpstop = Double.parseDouble(getProvParam("FREE_DISK_YELLOW_PERCENT")) / 100.0; } catch (Exception e) {} - if (fdpstart < 0.01) { - fdpstart = 0.01; - } - if (fdpstart > 0.5) { - fdpstart = 0.5; - } - if (fdpstop < fdpstart) { - fdpstop = fdpstart; - } - if (fdpstop > 0.5) { - fdpstop = 0.5; - } - } - private void fetchconfig() { - try { - System.out.println("provurl:: "+provurl); - Reader r = new InputStreamReader((new URL(provurl)).openStream()); - config = new NodeConfig(new ProvData(r), myname, spooldir, port, nak); - localconfig(); - configtasks.startRun(); - Runnable rr; - while ((rr = configtasks.next()) != null) { - try { - rr.run(); - } catch (Exception e) { - } - } - } catch (Exception e) { - e.printStackTrace(); - NodeUtils.setIpAndFqdnForEelf("fetchconfigs"); - eelflogger.error(EelfMsgs.MESSAGE_CONF_FAILED, e.toString()); - logger.error("NODE0306 Configuration failed " + e.toString() + " - try again later", e); - pfetcher.request(); - } - } - /** - * Process a gofetch request from a particular IP address. If the - * IP address is not an IP address we would go to to fetch the - * provisioning data, ignore the request. If the data has been - * fetched very recently (default 10 seconds), wait a while before fetching again. - */ - public synchronized void gofetch(String remoteaddr) { - if (provcheck.isFrom(remoteaddr)) { - logger.info("NODE0307 Received configuration fetch request from provisioning server " + remoteaddr); - pfetcher.request(); - } else { - logger.info("NODE0308 Received configuration fetch request from unexpected server " + remoteaddr); - } - } - /** - * Am I configured? - */ - public boolean isConfigured() { - return(config != null); - } - /** - * Am I shut down? - */ - public boolean isShutdown() { - return(quiesce.exists()); - } - /** - * Given a routing string, get the targets. - * @param routing Target string - * @return array of targets - */ - public Target[] parseRouting(String routing) { - return(config.parseRouting(routing)); - } - /** - * Given a set of credentials and an IP address, is this request from another node? - * @param credentials Credentials offered by the supposed node - * @param ip IP address the request came from - * @return If the credentials and IP address are recognized, true, otherwise false. - */ - public boolean isAnotherNode(String credentials, String ip) { - return(config.isAnotherNode(credentials, ip)); - } - /** - * Check whether publication is allowed. - * @param feedid The ID of the feed being requested - * @param credentials The offered credentials - * @param ip The requesting IP address - * @return True if the IP and credentials are valid for the specified feed. - */ - public String isPublishPermitted(String feedid, String credentials, String ip) { - return(config.isPublishPermitted(feedid, credentials, ip)); - } - /** - * Check who the user is given the feed ID and the offered credentials. - * @param feedid The ID of the feed specified - * @param credentials The offered credentials - * @return Null if the credentials are invalid or the user if they are valid. - */ - public String getAuthUser(String feedid, String credentials) { - return(config.getAuthUser(feedid, credentials)); - } - /** - * Check if the publish request should be sent to another node based on the feedid, user, and source IP address. - * @param feedid The ID of the feed specified - * @param user The publishing user - * @param ip The IP address of the publish endpoint - * @return Null if the request should be accepted or the correct hostname if it should be sent to another node. - */ - public String getIngressNode(String feedid, String user, String ip) { - return(config.getIngressNode(feedid, user, ip)); - } - /** - * Get a provisioned configuration parameter (from the provisioning server configuration) - * @param name The name of the parameter - * @return The value of the parameter or null if it is not defined. - */ - public String getProvParam(String name) { - return(config.getProvParam(name)); - } - /** - * Get a provisioned configuration parameter (from the provisioning server configuration) - * @param name The name of the parameter - * @param deflt The value to use if the parameter is not defined - * @return The value of the parameter or deflt if it is not defined. - */ - public String getProvParam(String name, String deflt) { - name = config.getProvParam(name); - if (name == null) { - name = deflt; - } - return(name); - } - /** - * Generate a publish ID - */ - public String getPublishId() { - return(pid.next()); - } - /** - * Get all the outbound spooling destinations. - * This will include both subscriptions and nodes. - */ - public DestInfo[] getAllDests() { - return(config.getAllDests()); - } - /** - * Register a task to run whenever the configuration changes - */ - public void registerConfigTask(Runnable task) { - configtasks.addTask(task); - } - /** - * Deregister a task to run whenever the configuration changes - */ - public void deregisterConfigTask(Runnable task) { - configtasks.removeTask(task); - } - /** - * Get the URL to deliver a message to. - * @param destinfo The destination information - * @param fileid The file ID - * @return The URL to deliver to - */ - public String getDestURL(DestInfo destinfo, String fileid) { - String subid = destinfo.getSubId(); - String purl = destinfo.getURL(); - if (followredirects && subid != null) { - purl = rdmgr.lookup(subid, purl); - } - return(purl + "/" + fileid); - } - /** - * Is a destination redirected? - */ - public boolean isDestRedirected(DestInfo destinfo) { - return(followredirects && rdmgr.isRedirected(destinfo.getSubId())); - } - /** - * Set up redirection on receipt of a 3XX from a target URL - */ - public boolean handleRedirection(DestInfo destinfo, String redirto, String fileid) { - fileid = "/" + fileid; - String subid = destinfo.getSubId(); - String purl = destinfo.getURL(); - if (followredirects && subid != null && redirto.endsWith(fileid)) { - redirto = redirto.substring(0, redirto.length() - fileid.length()); - if (!redirto.equals(purl)) { - rdmgr.redirect(subid, purl, redirto); - return(true); - } - } - return(false); - } - /** - * Handle unreachable target URL - */ - public void handleUnreachable(DestInfo destinfo) { - String subid = destinfo.getSubId(); - if (followredirects && subid != null) { - rdmgr.forget(subid); - } - } - /** - * Get the timeout before retrying after an initial delivery failure - */ - public long getInitFailureTimer() { - return(initfailuretimer); - } - /** - * Get the maximum timeout between delivery attempts - */ - public long getMaxFailureTimer() { - return(maxfailuretimer); - } - /** - * Get the ratio between consecutive delivery attempts - */ - public double getFailureBackoff() { - return(failurebackoff); - } - /** - * Get the expiration timer for deliveries - */ - public long getExpirationTimer() { - return(expirationtimer); - } - /** - * Get the maximum number of file delivery attempts before checking - * if another queue has work to be performed. - */ - public int getFairFileLimit() { - return(fairfilelimit); - } - /** - * Get the maximum amount of time spent delivering files before - * checking if another queue has work to be performed. - */ - public long getFairTimeLimit() { - return(fairtimelimit); - } - /** - * Get the targets for a feed - * @param feedid The feed ID - * @return The targets this feed should be delivered to - */ - public Target[] getTargets(String feedid) { - return(config.getTargets(feedid)); - } - /** - * Get the spool directory for temporary files - */ - public String getSpoolDir() { - return(spooldir + "/f"); - } - /** - * Get the base directory for spool directories - */ - public String getSpoolBase() { - return(spooldir); - } - /** - * Get the key store type - */ - public String getKSType() { - return(kstype); - } - /** - * Get the key store file - */ - public String getKSFile() { - return(ksfile); - } - /** - * Get the key store password - */ - public String getKSPass() { - return(kspass); - } - /** - * Get the key password - */ - public String getKPass() { - return(kpass); - } - /** - * Get the http port - */ - public int getHttpPort() { - return(gfport); - } - /** - * Get the https port - */ - public int getHttpsPort() { - return(svcport); - } - /** - * Get the externally visible https port - */ - public int getExtHttpsPort() { - return(port); - } - /** - * Get the external name of this machine - */ - public String getMyName() { - return(myname); - } - /** - * Get the number of threads to use for delivery - */ - public int getDeliveryThreads() { - return(deliverythreads); - } - /** - * Get the URL for uploading the event log data - */ - public String getEventLogUrl() { - return(eventlogurl); - } - /** - * Get the prefix for the names of event log files - */ - public String getEventLogPrefix() { - return(eventlogprefix); - } - /** - * Get the suffix for the names of the event log files - */ - public String getEventLogSuffix() { - return(eventlogsuffix); - } - /** - * Get the interval between event log file rollovers - */ - public String getEventLogInterval() { - return(eventloginterval); - } - /** - * Should I follow redirects from subscribers? - */ - public boolean isFollowRedirects() { - return(followredirects); - } - /** - * Get the directory where the event and node log files live - */ - public String getLogDir() { - return(logdir); - } - /** - * How long do I keep log files (in milliseconds) - */ - public long getLogRetention() { - return(logretention); - } - /** - * Get the timer - */ - public Timer getTimer() { - return(timer); - } - /** - * Get the feed ID for a subscription - * @param subid The subscription ID - * @return The feed ID - */ - public String getFeedId(String subid) { - return(config.getFeedId(subid)); - } - /** - * Get the authorization string this node uses - * @return The Authorization string for this node - */ - public String getMyAuth() { - return(config.getMyAuth()); - } - /** - * Get the fraction of free spool disk space where we start throwing away undelivered files. This is FREE_DISK_RED_PERCENT / 100.0. Default is 0.05. Limited by 0.01 <= FreeDiskStart <= 0.5. - */ - public double getFreeDiskStart() { - return(fdpstart); - } - /** - * Get the fraction of free spool disk space where we stop throwing away undelivered files. This is FREE_DISK_YELLOW_PERCENT / 100.0. Default is 0.2. Limited by FreeDiskStart <= FreeDiskStop <= 0.5. - */ - public double getFreeDiskStop() { - return(fdpstop); - } - /** - * Get the spool directory for a subscription - */ - public String getSpoolDir(String subid, String remoteaddr) { - if (provcheck.isFrom(remoteaddr)) { - String sdir = config.getSpoolDir(subid); - if (sdir != null) { - logger.info("NODE0310 Received subscription reset request for subscription " + subid + " from provisioning server " + remoteaddr); - } else { - logger.info("NODE0311 Received subscription reset request for unknown subscription " + subid + " from provisioning server " + remoteaddr); - } - return(sdir); - } else { - logger.info("NODE0312 Received subscription reset request from unexpected server " + remoteaddr); - return(null); - } - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeMain.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeMain.java deleted file mode 100644 index c9390419..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeMain.java +++ /dev/null @@ -1,113 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -import org.eclipse.jetty.servlet.*; -import org.eclipse.jetty.util.ssl.*; -import org.eclipse.jetty.server.*; -import org.eclipse.jetty.server.nio.*; -import org.eclipse.jetty.server.ssl.*; -import org.apache.log4j.Logger; - -/** - * The main starting point for the Data Router node - */ -public class NodeMain { - private NodeMain() {} - private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.NodeMain"); - private static class wfconfig implements Runnable { - private NodeConfigManager ncm; - public wfconfig(NodeConfigManager ncm) { - this.ncm = ncm; - } - public synchronized void run() { - notify(); - } - public synchronized void waitforconfig() { - ncm.registerConfigTask(this); - while (!ncm.isConfigured()) { - logger.info("NODE0003 Waiting for Node Configuration"); - try { - wait(); - } catch (Exception e) { - } - } - ncm.deregisterConfigTask(this); - logger.info("NODE0004 Node Configuration Data Received"); - } - } - private static Delivery d; - private static NodeConfigManager ncm; - /** - * Reset the retry timer for a subscription - */ - public static void resetQueue(String subid, String ip) { - d.resetQueue(ncm.getSpoolDir(subid, ip)); - } - /** - * Start the data router. - *

- * The location of the node configuration file can be set using the - * com.att.research.datarouter.node.ConfigFile system property. By - * default, it is "etc/node.properties". - */ - public static void main(String[] args) throws Exception { - logger.info("NODE0001 Data Router Node Starting"); - IsFrom.setDNSCache(); - ncm = NodeConfigManager.getInstance(); - logger.info("NODE0002 I am " + ncm.getMyName()); - (new wfconfig(ncm)).waitforconfig(); - d = new Delivery(ncm); - LogManager lm = new LogManager(ncm); - Server server = new Server(); - SelectChannelConnector http = new SelectChannelConnector(); - http.setPort(ncm.getHttpPort()); - http.setMaxIdleTime(2000); - http.setRequestHeaderSize(2048); - SslSelectChannelConnector https = new SslSelectChannelConnector(); - https.setPort(ncm.getHttpsPort()); - https.setMaxIdleTime(30000); - https.setRequestHeaderSize(8192); - SslContextFactory cf = https.getSslContextFactory(); - - /**Skip SSLv3 Fixes*/ - cf.addExcludeProtocols("SSLv3"); - logger.info("Excluded protocols node-"+cf.getExcludeProtocols()); - /**End of SSLv3 Fixes*/ - - cf.setKeyStoreType(ncm.getKSType()); - cf.setKeyStorePath(ncm.getKSFile()); - cf.setKeyStorePassword(ncm.getKSPass()); - cf.setKeyManagerPassword(ncm.getKPass()); - server.setConnectors(new Connector[] { http, https }); - ServletContextHandler ctxt = new ServletContextHandler(0); - ctxt.setContextPath("/"); - server.setHandler(ctxt); - ctxt.addServlet(new ServletHolder(new NodeServlet()), "/*"); - logger.info("NODE0005 Data Router Node Activating Service"); - server.start(); - server.join(); - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeServlet.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeServlet.java deleted file mode 100644 index e0ec1f5b..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeServlet.java +++ /dev/null @@ -1,380 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -import javax.servlet.*; -import javax.servlet.http.*; -import java.util.*; -import java.util.regex.*; -import java.io.*; -import java.nio.file.*; -import org.apache.log4j.Logger; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import com.att.research.datarouter.node.eelf.EelfMsgs; - -import java.net.*; - -/** - * Servlet for handling all http and https requests to the data router node - *

- * Handled requests are: - *
- * GET http://node/internal/fetchProv - fetch the provisioning data - *
- * PUT/DELETE https://node/internal/publish/fileid - n2n transfer - *
- * PUT/DELETE https://node/publish/feedid/fileid - publsh request - */ -public class NodeServlet extends HttpServlet { - private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.NodeServlet"); - private static NodeConfigManager config; - private static Pattern MetaDataPattern; - private static SubnetMatcher internalsubnet = new SubnetMatcher("135.207.136.128/25"); - //Adding EELF Logger Rally:US664892 - private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.node.NodeServlet"); - - static { - try { - String ws = "\\s*"; - // assume that \\ and \" have been replaced by X - String string = "\"[^\"]*\""; - //String string = "\"(?:[^\"\\\\]|\\\\.)*\""; - String number = "[+-]?(?:\\.\\d+|(?:0|[1-9]\\d*)(?:\\.\\d*)?)(?:[eE][+-]?\\d+)?"; - String value = "(?:" + string + "|" + number + "|null|true|false)"; - String item = string + ws + ":" + ws + value + ws; - String object = ws + "\\{" + ws + "(?:" + item + "(?:" + "," + ws + item + ")*)?\\}" + ws; - MetaDataPattern = Pattern.compile(object, Pattern.DOTALL); - } catch (Exception e) { - } - } - /** - * Get the NodeConfigurationManager - */ - public void init() { - config = NodeConfigManager.getInstance(); - logger.info("NODE0101 Node Servlet Configured"); - } - private boolean down(HttpServletResponse resp) throws IOException { - if (config.isShutdown() || !config.isConfigured()) { - resp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); - logger.info("NODE0102 Rejecting request: Service is being quiesced"); - return(true); - } - return(false); - } - /** - * Handle a GET for /internal/fetchProv - */ - protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - NodeUtils.setIpAndFqdnForEelf("doGet"); - eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+""); - if (down(resp)) { - return; - } - String path = req.getPathInfo(); - String qs = req.getQueryString(); - String ip = req.getRemoteAddr(); - if (qs != null) { - path = path + "?" + qs; - } - if ("/internal/fetchProv".equals(path)) { - config.gofetch(ip); - resp.setStatus(HttpServletResponse.SC_NO_CONTENT); - return; - } else if (path.startsWith("/internal/resetSubscription/")) { - String subid = path.substring(28); - if (subid.length() != 0 && subid.indexOf('/') == -1) { - NodeMain.resetQueue(subid, ip); - resp.setStatus(HttpServletResponse.SC_NO_CONTENT); - return; - } - } - if (internalsubnet.matches(NodeUtils.getInetAddress(ip))) { - if (path.startsWith("/internal/logs/")) { - String f = path.substring(15); - File fn = new File(config.getLogDir() + "/" + f); - if (f.indexOf('/') != -1 || !fn.isFile()) { - logger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip); - resp.sendError(HttpServletResponse.SC_NOT_FOUND); - return; - } - byte[] buf = new byte[65536]; - resp.setContentType("text/plain"); - resp.setContentLength((int)fn.length()); - resp.setStatus(200); - InputStream is = new FileInputStream(fn); - OutputStream os = resp.getOutputStream(); - int i; - while ((i = is.read(buf)) > 0) { - os.write(buf, 0, i); - } - is.close(); - return; - } - if (path.startsWith("/internal/rtt/")) { - String xip = path.substring(14); - long st = System.currentTimeMillis(); - String status = " unknown"; - try { - Socket s = new Socket(xip, 443); - s.close(); - status = " connected"; - } catch (Exception e) { - status = " error " + e.toString(); - } - long dur = System.currentTimeMillis() - st; - resp.setContentType("text/plain"); - resp.setStatus(200); - byte[] buf = (dur + status + "\n").getBytes(); - resp.setContentLength(buf.length); - resp.getOutputStream().write(buf); - return; - } - } - logger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip); - resp.sendError(HttpServletResponse.SC_NOT_FOUND); - return; - } - /** - * Handle all PUT requests - */ - protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - NodeUtils.setIpAndFqdnForEelf("doPut"); - eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+""); - common(req, resp, true); - } - /** - * Handle all DELETE requests - */ - protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - NodeUtils.setIpAndFqdnForEelf("doDelete"); - eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+""); - common(req, resp, false); - } - private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) throws ServletException, IOException { - if (down(resp)) { - return; - } - if (!req.isSecure()) { - logger.info("NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); - resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests"); - return; - } - String fileid = req.getPathInfo(); - if (fileid == null) { - logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); - resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting /."); - return; - } - String feedid = null; - String user = null; - String credentials = req.getHeader("Authorization"); - if (credentials == null) { - logger.info("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); - resp.sendError(HttpServletResponse.SC_FORBIDDEN, "Authorization header required"); - return; - } - String ip = req.getRemoteAddr(); - String lip = req.getLocalAddr(); - String pubid = null; - String xpubid = null; - String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip; - Target[] targets = null; - if (fileid.startsWith("/publish/")) { - fileid = fileid.substring(9); - int i = fileid.indexOf('/'); - if (i == -1 || i == fileid.length() - 1) { - logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); - resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting /. Possible missing fileid."); - return; - } - feedid = fileid.substring(0, i); - fileid = fileid.substring(i + 1); - pubid = config.getPublishId(); - xpubid = req.getHeader("X-ATT-DR-PUBLISH-ID"); - targets = config.getTargets(feedid); - } else if (fileid.startsWith("/internal/publish/")) { - if (!config.isAnotherNode(credentials, ip)) { - logger.info("NODE0107 Rejecting unauthorized node-to-node transfer attempt from " + ip); - resp.sendError(HttpServletResponse.SC_FORBIDDEN); - return; - } - fileid = fileid.substring(18); - pubid = req.getHeader("X-ATT-DR-PUBLISH-ID"); - targets = config.parseRouting(req.getHeader("X-ATT-DR-ROUTING")); - } else { - logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); - resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting /."); - return; - } - if (fileid.indexOf('/') != -1) { - logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); - resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting /."); - return; - } - String qs = req.getQueryString(); - if (qs != null) { - fileid = fileid + "?" + qs; - } - String hp = config.getMyName(); - int xp = config.getExtHttpsPort(); - if (xp != 443) { - hp = hp + ":" + xp; - } - String logurl = "https://" + hp + "/internal/publish/" + fileid; - if (feedid != null) { - logurl = "https://" + hp + "/publish/" + feedid + "/" + fileid; - String reason = config.isPublishPermitted(feedid, credentials, ip); - if (reason != null) { - logger.info("NODE0111 Rejecting unauthorized publish attempt to feed " + feedid + " fileid " + fileid + " from " + ip + " reason " + reason); - resp.sendError(HttpServletResponse.SC_FORBIDDEN,reason); - return; - } - user = config.getAuthUser(feedid, credentials); - String newnode = config.getIngressNode(feedid, user, ip); - if (newnode != null) { - String port = ""; - int iport = config.getExtHttpsPort(); - if (iport != 443) { - port = ":" + iport; - } - String redirto = "https://" + newnode + port + "/publish/" + feedid + "/" + fileid; - logger.info("NODE0108 Redirecting publish attempt for feed " + feedid + " user " + user + " ip " + ip + " to " + redirto); - resp.sendRedirect(redirto); - return; - } - resp.setHeader("X-ATT-DR-PUBLISH-ID", pubid); - } - String fbase = config.getSpoolDir() + "/" + pubid; - File data = new File(fbase); - File meta = new File(fbase + ".M"); - OutputStream dos = null; - Writer mw = null; - InputStream is = null; - try { - StringBuffer mx = new StringBuffer(); - mx.append(req.getMethod()).append('\t').append(fileid).append('\n'); - Enumeration hnames = req.getHeaderNames(); - String ctype = null; - while (hnames.hasMoreElements()) { - String hn = (String)hnames.nextElement(); - String hnlc = hn.toLowerCase(); - if ((isput && ("content-type".equals(hnlc) || - "content-language".equals(hnlc) || - "content-md5".equals(hnlc) || - "content-range".equals(hnlc))) || - "x-att-dr-meta".equals(hnlc) || - (feedid == null && "x-att-dr-received".equals(hnlc)) || - (hnlc.startsWith("x-") && !hnlc.startsWith("x-att-dr-"))) { - Enumeration hvals = req.getHeaders(hn); - while (hvals.hasMoreElements()) { - String hv = (String)hvals.nextElement(); - if ("content-type".equals(hnlc)) { - ctype = hv; - } - if ("x-att-dr-meta".equals(hnlc)) { - if (hv.length() > 4096) { - logger.info("NODE0109 Rejecting publish attempt with metadata too long for feed " + feedid + " user " + user + " ip " + ip); - resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Metadata too long"); - return; - } - if (!MetaDataPattern.matcher(hv.replaceAll("\\\\.", "X")).matches()) { - logger.info("NODE0109 Rejecting publish attempt with malformed metadata for feed " + feedid + " user " + user + " ip " + ip); - resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Malformed metadata"); - return; - } - } - mx.append(hn).append('\t').append(hv).append('\n'); - } - } - } - mx.append("X-ATT-DR-RECEIVED\t").append(rcvd).append('\n'); - String metadata = mx.toString(); - byte[] buf = new byte[1024 * 1024]; - int i; - try { - is = req.getInputStream(); - dos = new FileOutputStream(data); - while ((i = is.read(buf)) > 0) { - dos.write(buf, 0, i); - } - is.close(); - is = null; - dos.close(); - dos = null; - } catch (IOException ioe) { - long exlen = -1; - try { - exlen = Long.parseLong(req.getHeader("Content-Length")); - } catch (Exception e) { - } - StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user, ioe.getMessage()); - throw ioe; - } - Path dpath = Paths.get(fbase); - for (Target t: targets) { - DestInfo di = t.getDestInfo(); - if (di == null) { - // TODO: unknown destination - continue; - } - String dbase = di.getSpool() + "/" + pubid; - Files.createLink(Paths.get(dbase), dpath); - mw = new FileWriter(meta); - mw.write(metadata); - if (di.getSubId() == null) { - mw.write("X-ATT-DR-ROUTING\t" + t.getRouting() + "\n"); - } - mw.close(); - meta.renameTo(new File(dbase + ".M")); - } - resp.setStatus(HttpServletResponse.SC_NO_CONTENT); - resp.getOutputStream().close(); - StatusLog.logPub(pubid, feedid, logurl, req.getMethod(), ctype, data.length(), ip, user, HttpServletResponse.SC_NO_CONTENT); - } catch (IOException ioe) { - logger.info("NODE0110 IO Exception receiving publish attempt for feed " + feedid + " user " + user + " ip " + ip + " " + ioe.toString(), ioe); - throw ioe; - } finally { - if (is != null) { try { is.close(); } catch (Exception e) {}} - if (dos != null) { try { dos.close(); } catch (Exception e) {}} - if (mw != null) { try { mw.close(); } catch (Exception e) {}} - try { data.delete(); } catch (Exception e) {} - try { meta.delete(); } catch (Exception e) {} - } - } - - private int getIdFromPath(HttpServletRequest req) { - String path = req.getPathInfo(); - if (path == null || path.length() < 2) - return -1; - try { - return Integer.parseInt(path.substring(1)); - } catch (NumberFormatException e) { - return -1; - } - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeUtils.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeUtils.java deleted file mode 100644 index 5471c0d2..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeUtils.java +++ /dev/null @@ -1,226 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -import static com.att.eelf.configuration.Configuration.MDC_SERVER_FQDN; -import static com.att.eelf.configuration.Configuration.MDC_SERVER_IP_ADDRESS; -import static com.att.eelf.configuration.Configuration.MDC_SERVICE_NAME; - -import java.security.*; -import java.io.*; -import java.util.*; -import java.security.cert.*; -import java.net.*; -import java.text.*; -import org.apache.commons.codec.binary.Base64; -import org.apache.log4j.Logger; -import org.slf4j.MDC; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import com.att.research.datarouter.node.eelf.EelfMsgs; - -/** - * Utility functions for the data router node - */ -public class NodeUtils { - private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.node.NodeUtils"); - private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.NodeUtils"); - private static SimpleDateFormat logdate; - static { - logdate = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); - logdate.setTimeZone(TimeZone.getTimeZone("GMT")); - } - private NodeUtils() {} - /** - * Base64 encode a byte array - * @param raw The bytes to be encoded - * @return The encoded string - */ - public static String base64Encode(byte[] raw) { - return(Base64.encodeBase64String(raw)); - } - /** - * Given a user and password, generate the credentials - * @param user User name - * @param password User password - * @return Authorization header value - */ - public static String getAuthHdr(String user, String password) { - if (user == null || password == null) { - return(null); - } - return("Basic " + base64Encode((user + ":" + password).getBytes())); - } - /** - * Given a node name, generate the credentials - * @param node Node name - */ - public static String getNodeAuthHdr(String node, String key) { - try { - MessageDigest md = MessageDigest.getInstance("SHA"); - md.update(key.getBytes()); - md.update(node.getBytes()); - md.update(key.getBytes()); - return(getAuthHdr(node, base64Encode(md.digest()))); - } catch (Exception e) { - return(null); - } - } - /** - * Given a keystore file and its password, return the value of the CN of the first private key entry with a certificate. - * @param kstype The type of keystore - * @param ksfile The file name of the keystore - * @param kspass The password of the keystore - * @return CN of the certificate subject or null - */ - public static String getCanonicalName(String kstype, String ksfile, String kspass) { - try { - KeyStore ks = KeyStore.getInstance(kstype); - ks.load(new FileInputStream(ksfile), kspass.toCharArray()); - return(getCanonicalName(ks)); - } catch (Exception e) { - setIpAndFqdnForEelf("getCanonicalName"); - eelflogger.error(EelfMsgs.MESSAGE_KEYSTORE_LOAD_ERROR, ksfile, e.toString()); - logger.error("NODE0401 Error loading my keystore file + " + ksfile + " " + e.toString(), e); - return(null); - } - } - /** - * Given a keystore, return the value of the CN of the first private key entry with a certificate. - * @param ks The KeyStore - * @return CN of the certificate subject or null - */ - public static String getCanonicalName(KeyStore ks) { - try { - Enumeration aliases = ks.aliases(); - while (aliases.hasMoreElements()) { - String s = aliases.nextElement(); - if (ks.entryInstanceOf(s, KeyStore.PrivateKeyEntry.class)) { - X509Certificate c = (X509Certificate)ks.getCertificate(s); - if (c != null) { - String subject = c.getSubjectX500Principal().getName(); - String[] parts = subject.split(","); - if (parts.length < 1) { - return(null); - } - subject = parts[0].trim(); - if (!subject.startsWith("CN=")) { - return(null); - - } - return(subject.substring(3)); - } - } - } - } catch (Exception e) { - logger.error("NODE0402 Error extracting my name from my keystore file " + e.toString(), e); - } - return(null); - } - /** - * Given a string representation of an IP address, get the corresponding byte array - * @param ip The IP address as a string - * @return The IP address as a byte array or null if the address is invalid - */ - public static byte[] getInetAddress(String ip) { - try { - return(InetAddress.getByName(ip).getAddress()); - } catch (Exception e) { - } - return(null); - } - /** - * Given a uri with parameters, split out the feed ID and file ID - */ - public static String[] getFeedAndFileID(String uriandparams) { - int end = uriandparams.length(); - int i = uriandparams.indexOf('#'); - if (i != -1 && i < end) { - end = i; - } - i = uriandparams.indexOf('?'); - if (i != -1 && i < end) { - end = i; - } - end = uriandparams.lastIndexOf('/', end); - if (end < 2) { - return(null); - } - i = uriandparams.lastIndexOf('/', end - 1); - if (i == -1) { - return(null); - } - return(new String[] { uriandparams.substring(i + 1, end - 1), uriandparams.substring(end + 1) }); - } - /** - * Escape fields that might contain vertical bar, backslash, or newline by replacing them with backslash p, backslash e and backslash n. - */ - public static String loge(String s) { - if (s == null) { - return(s); - } - return(s.replaceAll("\\\\", "\\\\e").replaceAll("\\|", "\\\\p").replaceAll("\n", "\\\\n")); - } - /** - * Undo what loge does. - */ - public static String unloge(String s) { - if (s == null) { - return(s); - } - return(s.replaceAll("\\\\p", "\\|").replaceAll("\\\\n", "\n").replaceAll("\\\\e", "\\\\")); - } - /** - * Format a logging timestamp as yyyy-mm-ddThh:mm:ss.mmmZ - */ - public static String logts(long when) { - return(logts(new Date(when))); - } - /** - * Format a logging timestamp as yyyy-mm-ddThh:mm:ss.mmmZ - */ - public static synchronized String logts(Date when) { - return(logdate.format(when)); - } - - /* Method prints method name, server FQDN and IP Address of the machine in EELF logs - * @Method - setIpAndFqdnForEelf - Rally:US664892 - * @Params - method, prints method name in EELF log. - */ - public static void setIpAndFqdnForEelf(String method) { - MDC.clear(); - MDC.put(MDC_SERVICE_NAME, method); - try { - MDC.put(MDC_SERVER_FQDN, InetAddress.getLocalHost().getHostName()); - MDC.put(MDC_SERVER_IP_ADDRESS, InetAddress.getLocalHost().getHostAddress()); - } catch (Exception e) { - e.printStackTrace(); - } - - } - - -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/PathFinder.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/PathFinder.java deleted file mode 100644 index 7ff91839..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/PathFinder.java +++ /dev/null @@ -1,132 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -import java.util.*; - -/** - * Given a set of node names and next hops, identify and ignore any cycles and figure out the sequence of next hops to get from this node to any other node - */ - -public class PathFinder { - private static class Hop { - public boolean mark; - public boolean bad; - public NodeConfig.ProvHop basis; - } - private Vector errors = new Vector(); - private Hashtable routes = new Hashtable(); - /** - * Get list of errors encountered while finding paths - * @return array of error descriptions - */ - public String[] getErrors() { - return(errors.toArray(new String[errors.size()])); - } - /** - * Get the route from this node to the specified node - * @param destination node - * @return list of node names separated by and ending with "/" - */ - public String getPath(String destination) { - String ret = routes.get(destination); - if (ret == null) { - return(""); - } - return(ret); - } - private String plot(String from, String to, Hashtable info) { - Hop nh = info.get(from); - if (nh == null || nh.bad) { - return(to); - } - if (nh.mark) { - // loop detected; - while (!nh.bad) { - nh.bad = true; - errors.add(nh.basis + " is part of a cycle"); - nh = info.get(nh.basis.getVia()); - } - return(to); - } - nh.mark = true; - String x = plot(nh.basis.getVia(), to, info); - nh.mark = false; - if (nh.bad) { - return(to); - } - return(nh.basis.getVia() + "/" + x); - } - /** - * Find routes from a specified origin to all of the nodes given a set of specified next hops. - * @param origin where we start - * @param nodes where we can go - * @param hops detours along the way - */ - public PathFinder(String origin, String[] nodes, NodeConfig.ProvHop[] hops) { - HashSet known = new HashSet(); - Hashtable> ht = new Hashtable>(); - for (String n: nodes) { - known.add(n); - ht.put(n, new Hashtable()); - } - for (NodeConfig.ProvHop ph: hops) { - if (!known.contains(ph.getFrom())) { - errors.add(ph + " references unknown from node"); - continue; - } - if (!known.contains(ph.getTo())) { - errors.add(ph + " references unknown destination node"); - continue; - } - Hashtable ht2 = ht.get(ph.getTo()); - Hop h = ht2.get(ph.getFrom()); - if (h != null) { - h.bad = true; - errors.add(ph + " gives duplicate next hop - previous via was " + h.basis.getVia()); - continue; - } - h = new Hop(); - h.basis = ph; - ht2.put(ph.getFrom(), h); - if (!known.contains(ph.getVia())) { - errors.add(ph + " references unknown via node"); - h.bad = true; - continue; - } - if (ph.getVia().equals(ph.getTo())) { - errors.add(ph + " gives destination as via"); - h.bad = true; - continue; - } - } - for (String n: known) { - if (n.equals(origin)) { - routes.put(n, ""); - } - routes.put(n, plot(origin, n, ht.get(n)) + "/"); - } - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/ProvData.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/ProvData.java deleted file mode 100644 index 19cb8993..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/ProvData.java +++ /dev/null @@ -1,302 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -import java.io.*; -import java.util.*; -import org.json.*; -import org.apache.log4j.Logger; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import com.att.research.datarouter.node.eelf.EelfMsgs; - -/** - * Parser for provisioning data from the provisioning server. - *

- * The ProvData class uses a Reader for the text configuration from the - * provisioning server to construct arrays of raw configuration entries. - */ -public class ProvData { - private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.node.ProvData"); - private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.ProvData"); - private NodeConfig.ProvNode[] pn; - private NodeConfig.ProvParam[] pp; - private NodeConfig.ProvFeed[] pf; - private NodeConfig.ProvFeedUser[] pfu; - private NodeConfig.ProvFeedSubnet[] pfsn; - private NodeConfig.ProvSubscription[] ps; - private NodeConfig.ProvForceIngress[] pfi; - private NodeConfig.ProvForceEgress[] pfe; - private NodeConfig.ProvHop[] ph; - private static String[] gvasa(JSONArray a, int index) { - return(gvasa(a.get(index))); - } - private static String[] gvasa(JSONObject o, String key) { - return(gvasa(o.opt(key))); - } - private static String[] gvasa(Object o) { - if (o instanceof JSONArray) { - JSONArray a = (JSONArray)o; - Vector v = new Vector(); - for (int i = 0; i < a.length(); i++) { - String s = gvas(a, i); - if (s != null) { - v.add(s); - } - } - return(v.toArray(new String[v.size()])); - } else { - String s = gvas(o); - if (s == null) { - return(new String[0]); - } else { - return(new String[] { s }); - } - } - } - private static String gvas(JSONArray a, int index) { - return(gvas(a.get(index))); - } - private static String gvas(JSONObject o, String key) { - return(gvas(o.opt(key))); - } - private static String gvas(Object o) { - if (o instanceof Boolean || o instanceof Number || o instanceof String) { - return(o.toString()); - } - return(null); - } - /** - * Construct raw provisioing data entries from the text (JSON) - * provisioning document received from the provisioning server - * @param r The reader for the JSON text. - */ - public ProvData(Reader r) throws IOException { - Vector pnv = new Vector(); - Vector ppv = new Vector(); - Vector pfv = new Vector(); - Vector pfuv = new Vector(); - Vector pfsnv = new Vector(); - Vector psv = new Vector(); - Vector pfiv = new Vector(); - Vector pfev = new Vector(); - Vector phv = new Vector(); - try { - JSONTokener jtx = new JSONTokener(r); - JSONObject jcfg = new JSONObject(jtx); - char c = jtx.nextClean(); - if (c != '\0') { - throw new JSONException("Spurious characters following configuration"); - } - r.close(); - JSONArray jfeeds = jcfg.optJSONArray("feeds"); - if (jfeeds != null) { - for (int fx = 0; fx < jfeeds.length(); fx++) { - JSONObject jfeed = jfeeds.getJSONObject(fx); - String stat = null; - if (jfeed.optBoolean("suspend", false)) { - stat = "Feed is suspended"; - } - if (jfeed.optBoolean("deleted", false)) { - stat = "Feed is deleted"; - } - String fid = gvas(jfeed, "feedid"); - String fname = gvas(jfeed, "name"); - String fver = gvas(jfeed, "version"); - pfv.add(new NodeConfig.ProvFeed(fid, fname + "//" + fver, stat)); - JSONObject jauth = jfeed.optJSONObject("authorization"); - if (jauth == null) { - continue; - } - JSONArray jeids = jauth.optJSONArray("endpoint_ids"); - if (jeids != null) { - for (int ux = 0; ux < jeids.length(); ux++) { - JSONObject ju = jeids.getJSONObject(ux); - String login = gvas(ju, "id"); - String password = gvas(ju, "password"); - pfuv.add(new NodeConfig.ProvFeedUser(fid, login, NodeUtils.getAuthHdr(login, password))); - } - } - JSONArray jeips = jauth.optJSONArray("endpoint_addrs"); - if (jeips != null) { - for (int ix = 0; ix < jeips.length(); ix++) { - String sn = gvas(jeips, ix); - pfsnv.add(new NodeConfig.ProvFeedSubnet(fid, sn)); - } - } - } - } - JSONArray jsubs = jcfg.optJSONArray("subscriptions"); - if (jsubs != null) { - for (int sx = 0; sx < jsubs.length(); sx++) { - JSONObject jsub = jsubs.getJSONObject(sx); - if (jsub.optBoolean("suspend", false)) { - continue; - } - String sid = gvas(jsub, "subid"); - String fid = gvas(jsub, "feedid"); - JSONObject jdel = jsub.getJSONObject("delivery"); - String delurl = gvas(jdel, "url"); - String id = gvas(jdel, "user"); - String password = gvas(jdel, "password"); - boolean monly = jsub.getBoolean("metadataOnly"); - boolean use100 = jdel.getBoolean("use100"); - psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100)); - } - } - JSONObject jparams = jcfg.optJSONObject("parameters"); - if (jparams != null) { - for (String pname: JSONObject.getNames(jparams)) { - String pvalue = gvas(jparams, pname); - if (pvalue != null) { - ppv.add(new NodeConfig.ProvParam(pname, pvalue)); - } - } - String sfx = gvas(jparams, "PROV_DOMAIN"); - JSONArray jnodes = jparams.optJSONArray("NODES"); - if (jnodes != null) { - for (int nx = 0; nx < jnodes.length(); nx++) { - String nn = gvas(jnodes, nx); - if (nn.indexOf('.') == -1) { - nn = nn + "." + sfx; - } - pnv.add(new NodeConfig.ProvNode(nn)); - } - } - } - JSONArray jingresses = jcfg.optJSONArray("ingress"); - if (jingresses != null) { - for (int fx = 0; fx < jingresses.length(); fx++) { - JSONObject jingress = jingresses.getJSONObject(fx); - String fid = gvas(jingress, "feedid"); - String subnet = gvas(jingress, "subnet"); - String user = gvas(jingress, "user"); - String[] nodes = gvasa(jingress, "node"); - if (fid == null || "".equals(fid)) { - continue; - } - if ("".equals(subnet)) { - subnet = null; - } - if ("".equals(user)) { - user = null; - } - pfiv.add(new NodeConfig.ProvForceIngress(fid, subnet, user, nodes)); - } - } - JSONObject jegresses = jcfg.optJSONObject("egress"); - if (jegresses != null && JSONObject.getNames(jegresses) != null) { - for (String esid: JSONObject.getNames(jegresses)) { - String enode = gvas(jegresses, esid); - if (esid != null && enode != null && !"".equals(esid) && !"".equals(enode)) { - pfev.add(new NodeConfig.ProvForceEgress(esid, enode)); - } - } - } - JSONArray jhops = jcfg.optJSONArray("routing"); - if (jhops != null) { - for (int fx = 0; fx < jhops.length(); fx++) { - JSONObject jhop = jhops.getJSONObject(fx); - String from = gvas(jhop, "from"); - String to = gvas(jhop, "to"); - String via = gvas(jhop, "via"); - if (from == null || to == null || via == null || "".equals(from) || "".equals(to) || "".equals(via)) { - continue; - } - phv.add(new NodeConfig.ProvHop(from, to, via)); - } - } - } catch (JSONException jse) { - NodeUtils.setIpAndFqdnForEelf("ProvData"); - eelflogger.error(EelfMsgs.MESSAGE_PARSING_ERROR, jse.toString()); - logger.error("NODE0201 Error parsing configuration data from provisioning server " + jse.toString(), jse); - throw new IOException(jse.toString(), jse); - } - pn = pnv.toArray(new NodeConfig.ProvNode[pnv.size()]); - pp = ppv.toArray(new NodeConfig.ProvParam[ppv.size()]); - pf = pfv.toArray(new NodeConfig.ProvFeed[pfv.size()]); - pfu = pfuv.toArray(new NodeConfig.ProvFeedUser[pfuv.size()]); - pfsn = pfsnv.toArray(new NodeConfig.ProvFeedSubnet[pfsnv.size()]); - ps = psv.toArray(new NodeConfig.ProvSubscription[psv.size()]); - pfi = pfiv.toArray(new NodeConfig.ProvForceIngress[pfiv.size()]); - pfe = pfev.toArray(new NodeConfig.ProvForceEgress[pfev.size()]); - ph = phv.toArray(new NodeConfig.ProvHop[phv.size()]); - } - /** - * Get the raw node configuration entries - */ - public NodeConfig.ProvNode[] getNodes() { - return(pn); - } - /** - * Get the raw parameter configuration entries - */ - public NodeConfig.ProvParam[] getParams() { - return(pp); - } - /** - * Ge the raw feed configuration entries - */ - public NodeConfig.ProvFeed[] getFeeds() { - return(pf); - } - /** - * Get the raw feed user configuration entries - */ - public NodeConfig.ProvFeedUser[] getFeedUsers() { - return(pfu); - } - /** - * Get the raw feed subnet configuration entries - */ - public NodeConfig.ProvFeedSubnet[] getFeedSubnets() { - return(pfsn); - } - /** - * Get the raw subscription entries - */ - public NodeConfig.ProvSubscription[] getSubscriptions() { - return(ps); - } - /** - * Get the raw forced ingress entries - */ - public NodeConfig.ProvForceIngress[] getForceIngress() { - return(pfi); - } - /** - * Get the raw forced egress entries - */ - public NodeConfig.ProvForceEgress[] getForceEgress() { - return(pfe); - } - /** - * Get the raw next hop entries - */ - public NodeConfig.ProvHop[] getHops() { - return(ph); - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/PublishId.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/PublishId.java deleted file mode 100644 index 436adbad..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/PublishId.java +++ /dev/null @@ -1,52 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -/** - * Generate publish IDs - */ -public class PublishId { - private long nextuid; - private String myname; - - /** - * Generate publish IDs for the specified name - * @param myname Unique identifier for this publish ID generator (usually fqdn of server) - */ - public PublishId(String myname) { - this.myname = myname; - } - /** - * Generate a Data Router Publish ID that uniquely identifies the particular invocation of the Publish API for log correlation purposes. - */ - public synchronized String next() { - long now = System.currentTimeMillis(); - if (now < nextuid) { - now = nextuid; - } - nextuid = now + 1; - return(now + "." + myname); - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/RateLimitedOperation.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/RateLimitedOperation.java deleted file mode 100644 index 5bcbed83..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/RateLimitedOperation.java +++ /dev/null @@ -1,102 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -import java.util.*; - -/** - * Execute an operation no more frequently than a specified interval - */ - -public abstract class RateLimitedOperation implements Runnable { - private boolean marked; // a timer task exists - private boolean executing; // the operation is currently in progress - private boolean remark; // a request was made while the operation was in progress - private Timer timer; - private long last; // when the last operation started - private long mininterval; - /** - * Create a rate limited operation - * @param mininterval The minimum number of milliseconds after the last execution starts before a new execution can begin - * @param timer The timer used to perform deferred executions - */ - public RateLimitedOperation(long mininterval, Timer timer) { - this.timer = timer; - this.mininterval = mininterval; - } - private class deferred extends TimerTask { - public void run() { - execute(); - } - } - private synchronized void unmark() { - marked = false; - } - private void execute() { - unmark(); - request(); - } - /** - * Request that the operation be performed by this thread or at a later time by the timer - */ - public void request() { - if (premark()) { - return; - } - do { - run(); - } while (demark()); - } - private synchronized boolean premark() { - if (executing) { - // currently executing - wait until it finishes - remark = true; - return(true); - } - if (marked) { - // timer currently running - will run when it expires - return(true); - } - long now = System.currentTimeMillis(); - if (last + mininterval > now) { - // too soon - schedule a timer - marked = true; - timer.schedule(new deferred(), last + mininterval - now); - return(true); - } - last = now; - executing = true; - // start execution - return(false); - } - private synchronized boolean demark() { - executing = false; - if (remark) { - remark = false; - return(!premark()); - } - return(false); - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/RedirManager.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/RedirManager.java deleted file mode 100644 index 09473c14..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/RedirManager.java +++ /dev/null @@ -1,118 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -import java.util.*; -import java.io.*; - -/** - * Track redirections of subscriptions - */ -public class RedirManager { - private Hashtable sid2primary = new Hashtable(); - private Hashtable sid2secondary = new Hashtable(); - private String redirfile; - RateLimitedOperation op; - /** - * Create a mechanism for maintaining subscription redirections. - * @param redirfile The file to store the redirection information. - * @param mininterval The minimum number of milliseconds between writes to the redirection information file. - * @param timer The timer thread used to run delayed file writes. - */ - public RedirManager(String redirfile, long mininterval, Timer timer) { - this.redirfile = redirfile; - op = new RateLimitedOperation(mininterval, timer) { - public void run() { - try { - StringBuffer sb = new StringBuffer(); - for (String s: sid2primary.keySet()) { - sb.append(s).append(' ').append(sid2primary.get(s)).append(' ').append(sid2secondary.get(s)).append('\n'); - } - OutputStream os = new FileOutputStream(RedirManager.this.redirfile); - os.write(sb.toString().getBytes()); - os.close(); - } catch (Exception e) { - } - } - }; - try { - String s; - BufferedReader br = new BufferedReader(new FileReader(redirfile)); - while ((s = br.readLine()) != null) { - s = s.trim(); - String[] sx = s.split(" "); - if (s.startsWith("#") || sx.length != 3) { - continue; - } - sid2primary.put(sx[0], sx[1]); - sid2secondary.put(sx[0], sx[2]); - } - br.close(); - } catch (Exception e) { - // missing file is normal - } - } - /** - * Set up redirection. If a request is to be sent to subscription ID sid, and that is configured to go to URL primary, instead, go to secondary. - * @param sid The subscription ID to be redirected - * @param primary The URL associated with that subscription ID - * @param secondary The replacement URL to use instead - */ - public synchronized void redirect(String sid, String primary, String secondary) { - sid2primary.put(sid, primary); - sid2secondary.put(sid, secondary); - op.request(); - } - /** - * Cancel redirection. If a request is to be sent to subscription ID sid, send it to its primary URL. - * @param sid The subscription ID to remove from the table. - */ - public synchronized void forget(String sid) { - sid2primary.remove(sid); - sid2secondary.remove(sid); - op.request(); - } - /** - * Look up where to send a subscription. If the primary has changed or there is no redirection, use the primary. Otherwise, redirect to the secondary URL. - * @param sid The subscription ID to look up. - * @param primary The configured primary URL. - * @return The destination URL to really use. - */ - public synchronized String lookup(String sid, String primary) { - String oprim = sid2primary.get(sid); - if (primary.equals(oprim)) { - return(sid2secondary.get(sid)); - } else if (oprim != null) { - forget(sid); - } - return(primary); - } - /** - * Is a subscription redirected? - */ - public synchronized boolean isRedirected(String sid) { - return(sid != null && sid2secondary.get(sid) != null); - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/StatusLog.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/StatusLog.java deleted file mode 100644 index 66aa4add..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/StatusLog.java +++ /dev/null @@ -1,229 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - -package com.att.research.datarouter.node; - -import java.util.regex.*; -import java.util.*; -import java.io.*; -import java.nio.file.*; -import java.text.*; - -/** - * Logging for data router delivery events (PUB/DEL/EXP) - */ -public class StatusLog { - private static StatusLog instance = new StatusLog(); - private HashSet toship = new HashSet(); - private SimpleDateFormat filedate; - private String prefix = "logs/events"; - private String suffix = ".log"; - private String plainfile; - private String curfile; - private long nexttime; - private OutputStream os; - private long intvl; - private NodeConfigManager config = NodeConfigManager.getInstance(); - { - try { filedate = new SimpleDateFormat("-yyyyMMddHHmm"); } catch (Exception e) {} - } - /** - * Parse an interval of the form xxhyymzzs and round it to the nearest whole fraction of 24 hours. If no units are specified, assume seconds. - */ - public static long parseInterval(String interval, int def) { - try { - Matcher m = Pattern.compile("(?:(\\d+)[Hh])?(?:(\\d+)[Mm])?(?:(\\d+)[Ss]?)?").matcher(interval); - if (m.matches()) { - int dur = 0; - String x = m.group(1); - if (x != null) { - dur += 3600 * Integer.parseInt(x); - } - x = m.group(2); - if (x != null) { - dur += 60 * Integer.parseInt(x); - } - x = m.group(3); - if (x != null) { - dur += Integer.parseInt(x); - } - if (dur < 60) { - dur = 60; - } - int best = 86400; - int dist = best - dur; - if (dur > best) { - dist = dur - best; - } - int base = 1; - for (int i = 0; i < 8; i++) { - int base2 = base; - base *= 2; - for (int j = 0; j < 4; j++) { - int base3 = base2; - base2 *= 3; - for (int k = 0; k < 3; k++) { - int cur = base3; - base3 *= 5; - int ndist = cur - dur; - if (dur > cur) { - ndist = dur - cur; - } - if (ndist < dist) { - best = cur; - dist = ndist; - } - } - } - } - def = best * 1000; - } - } catch (Exception e) { - } - return(def); - } - private synchronized void checkRoll(long now) throws IOException { - if (now >= nexttime) { - if (os != null) { - os.close(); - os = null; - } - intvl = parseInterval(config.getEventLogInterval(), 300000); - prefix = config.getEventLogPrefix(); - suffix = config.getEventLogSuffix(); - nexttime = now - now % intvl + intvl; - curfile = prefix + filedate.format(new Date(nexttime - intvl)) + suffix; - plainfile = prefix + suffix; - notify(); - } - } - /** - * Get the name of the current log file - * @return The full path name of the current event log file - */ - public static synchronized String getCurLogFile() { - try { - instance.checkRoll(System.currentTimeMillis()); - } catch (Exception e) { - } - return(instance.curfile); - } - private synchronized void log(String s) { - try { - long now = System.currentTimeMillis(); - checkRoll(now); - if (os == null) { - os = new FileOutputStream(curfile, true); - (new File(plainfile)).delete(); - Files.createLink(Paths.get(plainfile), Paths.get(curfile)); - } - os.write((NodeUtils.logts(new Date(now)) + '|' + s + '\n').getBytes()); - os.flush(); - } catch (IOException ioe) { - } - } - /** - * Log a received publication attempt. - * @param pubid The publish ID assigned by the node - * @param feedid The feed id given by the publisher - * @param requrl The URL of the received request - * @param method The method (DELETE or PUT) in the received request - * @param ctype The content type (if method is PUT and clen > 0) - * @param clen The content length (if method is PUT) - * @param srcip The IP address of the publisher - * @param user The identity of the publisher - * @param status The status returned to the publisher - */ - public static void logPub(String pubid, String feedid, String requrl, String method, String ctype, long clen, String srcip, String user, int status) { - instance.log("PUB|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + srcip + "|" + user + "|" + status); - } - /** - * Log a data transfer error receiving a publication attempt - * @param pubid The publish ID assigned by the node - * @param feedid The feed id given by the publisher - * @param requrl The URL of the received request - * @param method The method (DELETE or PUT) in the received request - * @param ctype The content type (if method is PUT and clen > 0) - * @param clen The expected content length (if method is PUT) - * @param rcvd The content length received - * @param srcip The IP address of the publisher - * @param user The identity of the publisher - * @param error The error message from the IO exception - */ - public static void logPubFail(String pubid, String feedid, String requrl, String method, String ctype, long clen, long rcvd, String srcip, String user, String error) { - instance.log("PBF|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + rcvd + "|" + srcip + "|" + user + "|" + error); - } - /** - * Log a delivery attempt. - * @param pubid The publish ID assigned by the node - * @param feedid The feed ID - * @param subid The (space delimited list of) subscription ID - * @param requrl The URL used in the attempt - * @param method The method (DELETE or PUT) in the attempt - * @param ctype The content type (if method is PUT, not metaonly, and clen > 0) - * @param clen The content length (if PUT and not metaonly) - * @param user The identity given to the subscriber - * @param status The status returned by the subscriber or -1 if an exeception occured trying to connect - * @param xpubid The publish ID returned by the subscriber - */ - public static void logDel(String pubid, String feedid, String subid, String requrl, String method, String ctype, long clen, String user, int status, String xpubid) { - if (feedid == null) { - return; - } - instance.log("DEL|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + user + "|" + status + "|" + xpubid); - } - /** - * Log delivery attempts expired - * @param pubid The publish ID assigned by the node - * @param feedid The feed ID - * @param subid The (space delimited list of) subscription ID - * @param requrl The URL that would be delivered to - * @param method The method (DELETE or PUT) in the request - * @param ctype The content type (if method is PUT, not metaonly, and clen > 0) - * @param clen The content length (if PUT and not metaonly) - * @param reason The reason the attempts were discontinued - * @param attempts The number of attempts made - */ - public static void logExp(String pubid, String feedid, String subid, String requrl, String method, String ctype, long clen, String reason, int attempts) { - if (feedid == null) { - return; - } - instance.log("EXP|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + reason + "|" + attempts); - } - /** - * Log extra statistics about unsuccessful delivery attempts. - * @param pubid The publish ID assigned by the node - * @param feedid The feed ID - * @param subid The (space delimited list of) subscription ID - * @param clen The content length - * @param sent The # of bytes sent or -1 if subscriber returned an error instead of 100 Continue, otherwise, the number of bytes sent before an error occurred. - */ - public static void logDelExtra(String pubid, String feedid, String subid, long clen, long sent) { - if (feedid == null) { - return; - } - instance.log("DLX|" + pubid + "|" + feedid + "|" + subid + "|" + clen + "|" + sent); - } - private StatusLog() { - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/SubnetMatcher.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/SubnetMatcher.java deleted file mode 100644 index c1cfeaad..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/SubnetMatcher.java +++ /dev/null @@ -1,71 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -import java.net.*; - -/** - * Compare IP addresses as byte arrays to a subnet specified as a CIDR - */ -public class SubnetMatcher { - private byte[] sn; - private int len; - private int mask; - /** - * Construct a subnet matcher given a CIDR - * @param subnet The CIDR to match - */ - public SubnetMatcher(String subnet) { - int i = subnet.lastIndexOf('/'); - if (i == -1) { - sn = NodeUtils.getInetAddress(subnet); - len = sn.length; - } else { - len = Integer.parseInt(subnet.substring(i + 1)); - sn = NodeUtils.getInetAddress(subnet.substring(0, i)); - mask = ((0xff00) >> (len % 8)) & 0xff; - len /= 8; - } - } - /** - * Is the IP address in the CIDR? - * @param addr the IP address as bytes in network byte order - * @return true if the IP address matches. - */ - public boolean matches(byte[] addr) { - if (addr.length != sn.length) { - return(false); - } - for (int i = 0; i < len; i++) { - if (addr[i] != sn[i]) { - return(false); - } - } - if (mask != 0 && ((addr[len] ^ sn[len]) & mask) != 0) { - return(false); - } - return(true); - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/Target.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/Target.java deleted file mode 100644 index fe595d50..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/Target.java +++ /dev/null @@ -1,60 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -/** - * A destination to deliver a message - */ -public class Target { - private DestInfo destinfo; - private String routing; - /** - * A destination to deliver a message - * @param destinfo Either info for a subscription ID or info for a node-to-node transfer - * @param routing For a node-to-node transfer, what to do when it gets there. - */ - public Target(DestInfo destinfo, String routing) { - this.destinfo = destinfo; - this.routing = routing; - } - /** - * Add additional routing - */ - public void addRouting(String routing) { - this.routing = this.routing + " " + routing; - } - /** - * Get the destination information for this target - */ - public DestInfo getDestInfo() { - return(destinfo); - } - /** - * Get the next hop information for this target - */ - public String getRouting() { - return(routing); - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/TaskList.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/TaskList.java deleted file mode 100644 index 401c72a6..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/TaskList.java +++ /dev/null @@ -1,113 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package com.att.research.datarouter.node; - -import java.util.*; - -/** - * Manage a list of tasks to be executed when an event occurs. - * This makes the following guarantees: - *

- */ -public class TaskList { - private Iterator runlist; - private HashSet tasks = new HashSet(); - private HashSet togo; - private HashSet sofar; - private HashSet added; - private HashSet removed; - /** - * Construct a new TaskList - */ - public TaskList() { - } - /** - * Start executing the sequence of tasks. - */ - public synchronized void startRun() { - sofar = new HashSet(); - added = new HashSet(); - removed = new HashSet(); - togo = new HashSet(tasks); - runlist = togo.iterator(); - } - /** - * Get the next task to execute - */ - public synchronized Runnable next() { - while (runlist != null) { - if (runlist.hasNext()) { - Runnable task = runlist.next(); - if (removed.contains(task)) { - continue; - } - if (sofar.contains(task)) { - continue; - } - sofar.add(task); - return(task); - } - if (added.size() != 0) { - togo = added; - added = new HashSet(); - removed.clear(); - runlist = togo.iterator(); - continue; - } - togo = null; - added = null; - removed = null; - sofar = null; - runlist = null; - } - return(null); - } - /** - * Add a task to the list of tasks to run whenever the event occurs. - */ - public synchronized void addTask(Runnable task) { - if (runlist != null) { - added.add(task); - removed.remove(task); - } - tasks.add(task); - } - /** - * Remove a task from the list of tasks to run whenever the event occurs. - */ - public synchronized void removeTask(Runnable task) { - if (runlist != null) { - removed.add(task); - added.remove(task); - } - tasks.remove(task); - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EELFFilter.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EELFFilter.java deleted file mode 100644 index 9b006585..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EELFFilter.java +++ /dev/null @@ -1,43 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ -package com.att.research.datarouter.node.eelf; - -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.filter.Filter; -import ch.qos.logback.core.spi.FilterReply; - -/* - * When EELF functionality added it default started logging Jetty logs as well which in turn stopped existing functionality of logging jetty statements in node.log - * added code in logback.xml to add jetty statements in node.log. - * This class removes extran EELF statements from node.log since they are being logged in apicalls.log - */ -public class EELFFilter extends Filter{ - @Override - public FilterReply decide(ILoggingEvent event) { - if (event.getMessage().contains("EELF")) { - return FilterReply.DENY; - } else { - return FilterReply.ACCEPT; - } - } -} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EelfMsgs.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EelfMsgs.java deleted file mode 100644 index 9963f413..00000000 --- a/datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EelfMsgs.java +++ /dev/null @@ -1,96 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ -package com.att.research.datarouter.node.eelf; - -import com.att.eelf.i18n.EELFResolvableErrorEnum; -import com.att.eelf.i18n.EELFResourceManager; - -public enum EelfMsgs implements EELFResolvableErrorEnum { - - /** - * Application message prints user (accepts one argument) - */ - MESSAGE_WITH_BEHALF, - - /** - * Application message prints user and FeedID (accepts two arguments) - */ - - MESSAGE_WITH_BEHALF_AND_FEEDID, - - /** - * Application message prints keystore file error in EELF errors log - */ - - MESSAGE_KEYSTORE_LOAD_ERROR, - - /** - * Application message prints Error extracting my name from my keystore file - */ - - MESSAGE_KEYSORE_NAME_ERROR, - - /** - * Application message prints Error parsing configuration data from provisioning server. - */ - - - MESSAGE_PARSING_ERROR, - - /** - * Application message printsConfiguration failed - */ - - - MESSAGE_CONF_FAILED, - - /** - * Application message prints Bad provisioning server URL - */ - - - MESSAGE_BAD_PROV_URL, - - /** - * Application message prints Unable to fetch canonical name from keystore file - */ - - - MESSAGE_KEYSTORE_FETCH_ERROR, - - /** - * Application message prints Unable to load local configuration file. - */ - - - MESSAGE_PROPERTIES_LOAD_ERROR; - - - /** - * Static initializer to ensure the resource bundles for this class are loaded... - * Here this application loads messages from three bundles - */ - static { - EELFResourceManager.loadMessageBundle("EelfMessages"); - } -} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java new file mode 100644 index 00000000..4494024d --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java @@ -0,0 +1,253 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + +package org.onap.dmaap.datarouter.node; + +import java.util.*; +import java.io.*; +import org.apache.log4j.Logger; + +/** + * Main control point for delivering files to destinations. + *

+ * The Delivery class manages assignment of delivery threads to delivery + * queues and creation and destruction of delivery queues as + * configuration changes. DeliveryQueues are assigned threads based on a + * modified round-robin approach giving priority to queues with more work + * as measured by both bytes to deliver and files to deliver and lower + * priority to queues that already have delivery threads working. + * A delivery thread continues to work for a delivery queue as long as + * that queue has more files to deliver. + */ +public class Delivery { + private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.Delivery"); + private static class DelItem implements Comparable { + private String pubid; + private String spool; + public int compareTo(DelItem x) { + int i = pubid.compareTo(x.pubid); + if (i == 0) { + i = spool.compareTo(x.spool); + } + return(i); + } + public String getPublishId() { + return(pubid); + } + public String getSpool() { + return(spool); + } + public DelItem(String pubid, String spool) { + this.pubid = pubid; + this.spool = spool; + } + } + private double fdstart; + private double fdstop; + private int threads; + private int curthreads; + private NodeConfigManager config; + private Hashtable dqs = new Hashtable(); + private DeliveryQueue[] queues = new DeliveryQueue[0]; + private int qpos = 0; + private long nextcheck; + private Runnable cmon = new Runnable() { + public void run() { + checkconfig(); + } + }; + /** + * Constructs a new Delivery system using the specified configuration manager. + * @param config The configuration manager for this delivery system. + */ + public Delivery(NodeConfigManager config) { + this.config = config; + config.registerConfigTask(cmon); + checkconfig(); + } + private void cleardir(String dir) { + if (dqs.get(dir) != null) { + return; + } + File fdir = new File(dir); + for (File junk: fdir.listFiles()) { + if (junk.isFile()) { + junk.delete(); + } + } + fdir.delete(); + } + private void freeDiskCheck() { + File spoolfile = new File(config.getSpoolBase()); + long tspace = spoolfile.getTotalSpace(); + long start = (long)(tspace * fdstart); + long stop = (long)(tspace * fdstop); + long cur = spoolfile.getUsableSpace(); + if (cur >= start) { + return; + } + Vector cv = new Vector(); + for (String sdir: dqs.keySet()) { + for (String meta: (new File(sdir)).list()) { + if (!meta.endsWith(".M") || meta.charAt(0) == '.') { + continue; + } + cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir)); + } + } + DelItem[] items = cv.toArray(new DelItem[cv.size()]); + Arrays.sort(items); + logger.info("NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + " total=" + tspace); + for (DelItem item: items) { + long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId()); + logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk"); + if (amount > 0) { + cur += amount; + if (cur >= stop) { + cur = spoolfile.getUsableSpace(); + } + if (cur >= stop) { + logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace); + return; + } + } + } + cur = spoolfile.getUsableSpace(); + if (cur >= stop) { + logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace); + return; + } + logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status. current=" + cur + " yellow=" + stop + " total=" + tspace); + } + private void cleardirs() { + String basedir = config.getSpoolBase(); + String nbase = basedir + "/n"; + for (String nodedir: (new File(nbase)).list()) { + if (!nodedir.startsWith(".")) { + cleardir(nbase + "/" + nodedir); + } + } + String sxbase = basedir + "/s"; + for (String sxdir: (new File(sxbase)).list()) { + if (sxdir.startsWith(".")) { + continue; + } + File sxf = new File(sxbase + "/" + sxdir); + for (String sdir: sxf.list()) { + if (!sdir.startsWith(".")) { + cleardir(sxbase + "/" + sxdir + "/" + sdir); + } + } + sxf.delete(); // won't if anything still in it + } + } + private synchronized void checkconfig() { + if (!config.isConfigured()) { + return; + } + fdstart = config.getFreeDiskStart(); + fdstop = config.getFreeDiskStop(); + threads = config.getDeliveryThreads(); + if (threads < 1) { + threads = 1; + } + DestInfo[] alldis = config.getAllDests(); + DeliveryQueue[] nqs = new DeliveryQueue[alldis.length]; + qpos = 0; + Hashtable ndqs = new Hashtable(); + for (DestInfo di: alldis) { + String spl = di.getSpool(); + DeliveryQueue dq = dqs.get(spl); + if (dq == null) { + dq = new DeliveryQueue(config, di); + } else { + dq.config(di); + } + ndqs.put(spl, dq); + nqs[qpos++] = dq; + } + queues = nqs; + dqs = ndqs; + cleardirs(); + while (curthreads < threads) { + curthreads++; + (new Thread() { + { + setName("Delivery Thread"); + } + public void run() { + dodelivery(); + } + }).start(); + } + nextcheck = 0; + notify(); + } + private void dodelivery() { + DeliveryQueue dq; + while ((dq = getNextQueue()) != null) { + dq.run(); + } + } + private synchronized DeliveryQueue getNextQueue() { + while (true) { + if (curthreads > threads) { + curthreads--; + return(null); + } + if (qpos < queues.length) { + DeliveryQueue dq = queues[qpos++]; + if (dq.isSkipSet()) { + continue; + } + nextcheck = 0; + notify(); + return(dq); + } + long now = System.currentTimeMillis(); + if (now < nextcheck) { + try { + wait(nextcheck + 500 - now); + } catch (Exception e) { + } + now = System.currentTimeMillis(); + } + if (now >= nextcheck) { + nextcheck = now + 5000; + qpos = 0; + freeDiskCheck(); + } + } + } + /** + * Reset the retry timer for a delivery queue + */ + public synchronized void resetQueue(String spool) { + if (spool != null) { + DeliveryQueue dq = dqs.get(spool); + if (dq != null) { + dq.resetQueue(); + } + } + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java new file mode 100644 index 00000000..b2596a4d --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java @@ -0,0 +1,348 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +import java.io.*; +import java.util.*; + +/** + * Mechanism for monitoring and controlling delivery of files to a destination. + *

+ * The DeliveryQueue class maintains lists of DeliveryTasks for a single + * destination (a subscription or another data router node) and assigns + * delivery threads to try to deliver them. It also maintains a delivery + * status that causes it to back off on delivery attempts after a failure. + *

+ * If the most recent delivery result was a failure, then no more attempts + * will be made for a period of time. Initially, and on the first failure + * following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer() (milliseconds). + * If, after this delay, additional failures occur, each failure will + * multiply the delay by DeliveryQueueHelper.getFailureBackoff() up to a + * maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer(). + * Note that this behavior applies to the delivery queue as a whole and not + * to individual files in the queue. If multiple files are being + * delivered and one fails, the delay will be started. If a second + * delivery fails while the delay was active, it will not change the delay + * or change the duration of any subsequent delay. + * If, however, it succeeds, it will cancel the delay. + *

+ * The queue maintains 3 collections of files to deliver: A todo list of + * files that will be attempted, a working set of files that are being + * attempted, and a retry set of files that were attempted and failed. + * Whenever the todo list is empty and needs to be refilled, a scan of the + * spool directory is made and the file names sorted. Any files in the working set are ignored. + * If a DeliveryTask for the file is in the retry set, then that delivery + * task is placed on the todo list. Otherwise, a new DeliveryTask for the + * file is created and placed on the todo list. + * If, when a DeliveryTask is about to be removed from the todo list, its + * age exceeds DeliveryQueueHelper.getExpirationTimer(), then it is instead + * marked as expired. + *

+ * A delivery queue also maintains a skip flag. This flag is true if the + * failure timer is active or if no files are found in a directory scan. + */ +public class DeliveryQueue implements Runnable, DeliveryTaskHelper { + private DeliveryQueueHelper dqh; + private DestInfo di; + private Hashtable working = new Hashtable(); + private Hashtable retry = new Hashtable(); + private int todoindex; + private boolean failed; + private long failduration; + private long resumetime; + File dir; + private Vector todo = new Vector(); + /** + * Try to cancel a delivery task. + * @return The length of the task in bytes or 0 if the task cannot be cancelled. + */ + public synchronized long cancelTask(String pubid) { + if (working.get(pubid) != null) { + return(0); + } + DeliveryTask dt = retry.get(pubid); + if (dt == null) { + for (int i = todoindex; i < todo.size(); i++) { + DeliveryTask xdt = todo.get(i); + if (xdt.getPublishId().equals(pubid)) { + dt = xdt; + break; + } + } + } + if (dt == null) { + dt = new DeliveryTask(this, pubid); + if (dt.getFileId() == null) { + return(0); + } + } + if (dt.isCleaned()) { + return(0); + } + StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(), dt.getMethod(), dt.getCType(), dt.getLength(), "diskFull", dt.getAttempts()); + dt.clean(); + return(dt.getLength()); + } + /** + * Mark that a delivery task has succeeded. + */ + public synchronized void markSuccess(DeliveryTask task) { + working.remove(task.getPublishId()); + task.clean(); + failed = false; + failduration = 0; + } + /** + * Mark that a delivery task has expired. + */ + public synchronized void markExpired(DeliveryTask task) { + task.clean(); + } + /** + * Mark that a delivery task has failed permanently. + */ + public synchronized void markFailNoRetry(DeliveryTask task) { + working.remove(task.getPublishId()); + task.clean(); + failed = false; + failduration = 0; + } + private void fdupdate() { + if (!failed) { + failed = true; + if (failduration == 0) { + failduration = dqh.getInitFailureTimer(); + } + resumetime = System.currentTimeMillis() + failduration; + long maxdur = dqh.getMaxFailureTimer(); + failduration = (long)(failduration * dqh.getFailureBackoff()); + if (failduration > maxdur) { + failduration = maxdur; + } + } + } + /** + * Mark that a delivery task has been redirected. + */ + public synchronized void markRedirect(DeliveryTask task) { + working.remove(task.getPublishId()); + retry.put(task.getPublishId(), task); + } + /** + * Mark that a delivery task has temporarily failed. + */ + public synchronized void markFailWithRetry(DeliveryTask task) { + working.remove(task.getPublishId()); + retry.put(task.getPublishId(), task); + fdupdate(); + } + /** + * Get the next task. + */ + public synchronized DeliveryTask getNext() { + DeliveryTask ret = peekNext(); + if (ret != null) { + todoindex++; + working.put(ret.getPublishId(), ret); + } + return(ret); + } + /** + * Peek at the next task. + */ + public synchronized DeliveryTask peekNext() { + long now = System.currentTimeMillis(); + long mindate = now - dqh.getExpirationTimer(); + if (failed) { + if (now > resumetime) { + failed = false; + } else { + return(null); + } + } + while (true) { + if (todoindex >= todo.size()) { + todoindex = 0; + todo = new Vector(); + String[] files = dir.list(); + Arrays.sort(files); + for (String fname: files) { + if (!fname.endsWith(".M")) { + continue; + } + String fname2 = fname.substring(0, fname.length() - 2); + long pidtime = 0; + int dot = fname2.indexOf('.'); + if (dot < 1) { + continue; + } + try { + pidtime = Long.parseLong(fname2.substring(0, dot)); + } catch (Exception e) { + } + if (pidtime < 1000000000000L) { + continue; + } + if (working.get(fname2) != null) { + continue; + } + DeliveryTask dt = retry.get(fname2); + if (dt == null) { + dt = new DeliveryTask(this, fname2); + } + todo.add(dt); + } + retry = new Hashtable(); + } + if (todoindex < todo.size()) { + DeliveryTask dt = todo.get(todoindex); + if (dt.isCleaned()) { + todoindex++; + continue; + } + if (dt.getDate() >= mindate) { + return(dt); + } + todoindex++; + reportExpiry(dt); + continue; + } + return(null); + } + } + /** + * Create a delivery queue for a given destination info + */ + public DeliveryQueue(DeliveryQueueHelper dqh, DestInfo di) { + this.dqh = dqh; + this.di = di; + dir = new File(di.getSpool()); + dir.mkdirs(); + } + /** + * Update the destination info for this delivery queue + */ + public void config(DestInfo di) { + this.di = di; + } + /** + * Get the dest info + */ + public DestInfo getDestInfo() { + return(di); + } + /** + * Get the config manager + */ + public DeliveryQueueHelper getConfig() { + return(dqh); + } + /** + * Exceptional condition occurred during delivery + */ + public void reportDeliveryExtra(DeliveryTask task, long sent) { + StatusLog.logDelExtra(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getLength(), sent); + } + /** + * Message too old to deliver + */ + public void reportExpiry(DeliveryTask task) { + StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts()); + markExpired(task); + } + /** + * Completed a delivery attempt + */ + public void reportStatus(DeliveryTask task, int status, String xpubid, String location) { + if (status < 300) { + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, xpubid); + markSuccess(task); + } else if (status < 400 && dqh.isFollowRedirects()) { + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location); + if (dqh.handleRedirection(di, location, task.getFileId())) { + markRedirect(task); + } else { + StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts()); + markFailNoRetry(task); + } + } else if (status < 500) { + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location); + StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts()); + markFailNoRetry(task); + } else { + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location); + markFailWithRetry(task); + } + } + /** + * Delivery failed by reason of an exception + */ + public void reportException(DeliveryTask task, Exception exception) { + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), -1, exception.toString()); + dqh.handleUnreachable(di); + markFailWithRetry(task); + } + /** + * Get the feed ID for a subscription + * @param subid The subscription ID + * @return The feed ID + */ + public String getFeedId(String subid) { + return(dqh.getFeedId(subid)); + } + /** + * Get the URL to deliver a message to given the file ID + */ + public String getDestURL(String fileid) { + return(dqh.getDestURL(di, fileid)); + } + /** + * Deliver files until there's a failure or there are no more + * files to deliver + */ + public void run() { + DeliveryTask t; + long endtime = System.currentTimeMillis() + dqh.getFairTimeLimit(); + int filestogo = dqh.getFairFileLimit(); + while ((t = getNext()) != null) { + t.run(); + if (--filestogo <= 0 || System.currentTimeMillis() > endtime) { + break; + } + } + } + /** + * Is there no work to do for this queue right now? + */ + public synchronized boolean isSkipSet() { + return(peekNext() == null); + } + /** + * Reset the retry timer + */ + public void resetQueue() { + resumetime = System.currentTimeMillis(); + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java new file mode 100644 index 00000000..172678bd --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java @@ -0,0 +1,89 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +/** + * Interface to allow independent testing of the DeliveryQueue code + *

+ * This interface represents all of the configuration information and + * feedback mechanisms that a delivery queue needs. + */ +public interface DeliveryQueueHelper { + /** + * Get the timeout (milliseconds) before retrying after an initial delivery failure + */ + public long getInitFailureTimer(); + /** + * Get the ratio between timeouts on consecutive delivery attempts + */ + public double getFailureBackoff(); + /** + * Get the maximum timeout (milliseconds) between delivery attempts + */ + public long getMaxFailureTimer(); + /** + * Get the expiration timer (milliseconds) for deliveries + */ + public long getExpirationTimer(); + /** + * Get the maximum number of file delivery attempts before checking + * if another queue has work to be performed. + */ + public int getFairFileLimit(); + /** + * Get the maximum amount of time spent delivering files before checking if another queue has work to be performed. + */ + public long getFairTimeLimit(); + /** + * Get the URL for delivering a file + * @param dest The destination information for the file to be delivered. + * @param fileid The file id for the file to be delivered. + * @return The URL for delivering the file (typically, dest.getURL() + "/" + fileid). + */ + public String getDestURL(DestInfo dest, String fileid); + /** + * Forget redirections associated with a subscriber + * @param dest Destination information to forget + */ + public void handleUnreachable(DestInfo dest); + /** + * Post redirection for a subscriber + * @param dest Destination information to update + * @param location Location given by subscriber + * @param fileid File ID of request + * @return true if this 3xx response is retryable, otherwise, false. + */ + public boolean handleRedirection(DestInfo dest, String location, String fileid); + /** + * Should I handle 3xx responses differently than 4xx responses? + */ + public boolean isFollowRedirects(); + /** + * Get the feed ID for a subscription + * @param subid The subscription ID + * @return The feed ID + */ + public String getFeedId(String subid); +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java new file mode 100644 index 00000000..c07822d2 --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java @@ -0,0 +1,308 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +import java.io.*; +import java.net.*; +import java.util.*; +import org.apache.log4j.Logger; + +/** + * A file to be delivered to a destination. + *

+ * A Delivery task represents a work item for the data router - a file that + * needs to be delivered and provides mechanisms to get information about + * the file and its delivery data as well as to attempt delivery. + */ +public class DeliveryTask implements Runnable, Comparable { + private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask"); + private DeliveryTaskHelper dth; + private String pubid; + private DestInfo di; + private String spool; + private File datafile; + private File metafile; + private long length; + private long date; + private String method; + private String fileid; + private String ctype; + private String url; + private String feedid; + private String subid; + private int attempts; + private String[][] hdrs; + /** + * Is the object a DeliveryTask with the same publication ID? + */ + public boolean equals(Object o) { + if (!(o instanceof DeliveryTask)) { + return(false); + } + return(pubid.equals(((DeliveryTask)o).pubid)); + } + /** + * Compare the publication IDs. + */ + public int compareTo(DeliveryTask o) { + return(pubid.compareTo(o.pubid)); + } + /** + * Get the hash code of the publication ID. + */ + public int hashCode() { + return(pubid.hashCode()); + } + /** + * Return the publication ID. + */ + public String toString() { + return(pubid); + } + /** + * Create a delivery task for a given delivery queue and pub ID + * @param dth The delivery task helper for the queue this task is in. + * @param pubid The publish ID for this file. This is used as + * the base for the file name in the spool directory and is of + * the form . + */ + public DeliveryTask(DeliveryTaskHelper dth, String pubid) { + this.dth = dth; + this.pubid = pubid; + di = dth.getDestInfo(); + subid = di.getSubId(); + feedid = di.getLogData(); + spool = di.getSpool(); + String dfn = spool + "/" + pubid; + String mfn = dfn + ".M"; + datafile = new File(spool + "/" + pubid); + metafile = new File(mfn); + boolean monly = di.isMetaDataOnly(); + date = Long.parseLong(pubid.substring(0, pubid.indexOf('.'))); + Vector hdrv = new Vector(); + try { + BufferedReader br = new BufferedReader(new FileReader(metafile)); + String s = br.readLine(); + int i = s.indexOf('\t'); + method = s.substring(0, i); + if (!"DELETE".equals(method) && !monly) { + length = datafile.length(); + } + fileid = s.substring(i + 1); + while ((s = br.readLine()) != null) { + i = s.indexOf('\t'); + String h = s.substring(0, i); + String v = s.substring(i + 1); + if ("x-att-dr-routing".equalsIgnoreCase(h)) { + subid = v.replaceAll("[^ ]*/", ""); + feedid = dth.getFeedId(subid.replaceAll(" .*", "")); + } + if (length == 0 && h.toLowerCase().startsWith("content-")) { + continue; + } + if (h.equalsIgnoreCase("content-type")) { + ctype = v; + } + hdrv.add(new String[] {h, v}); + } + br.close(); + } catch (Exception e) { + } + hdrs = hdrv.toArray(new String[hdrv.size()][]); + url = dth.getDestURL(fileid); + } + /** + * Get the publish ID + */ + public String getPublishId() { + return(pubid); + } + /** + * Attempt delivery + */ + public void run() { + attempts++; + try { + di = dth.getDestInfo(); + boolean expect100 = di.isUsing100(); + boolean monly = di.isMetaDataOnly(); + length = 0; + if (!"DELETE".equals(method) && !monly) { + length = datafile.length(); + } + url = dth.getDestURL(fileid); + URL u = new URL(url); + HttpURLConnection uc = (HttpURLConnection)u.openConnection(); + uc.setConnectTimeout(60000); + uc.setReadTimeout(60000); + uc.setInstanceFollowRedirects(false); + uc.setRequestMethod(method); + uc.setRequestProperty("Content-Length", Long.toString(length)); + uc.setRequestProperty("Authorization", di.getAuth()); + uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid); + for (String[] nv: hdrs) { + uc.addRequestProperty(nv[0], nv[1]); + } + if (length > 0) { + if (expect100) { + uc.setRequestProperty("Expect", "100-continue"); + } + uc.setFixedLengthStreamingMode(length); + uc.setDoOutput(true); + OutputStream os = null; + try { + os = uc.getOutputStream(); + } catch (ProtocolException pe) { + dth.reportDeliveryExtra(this, -1L); + // Rcvd error instead of 100-continue + } + if (os != null) { + long sofar = 0; + try { + byte[] buf = new byte[1024 * 1024]; + InputStream is = new FileInputStream(datafile); + while (sofar < length) { + int i = buf.length; + if (sofar + i > length) { + i = (int)(length - sofar); + } + i = is.read(buf, 0, i); + if (i <= 0) { + throw new IOException("Unexpected problem reading data file " + datafile); + } + sofar += i; + os.write(buf, 0, i); + } + is.close(); + os.close(); + } catch (IOException ioe) { + dth.reportDeliveryExtra(this, sofar); + throw ioe; + } + } + } + int rc = uc.getResponseCode(); + String rmsg = uc.getResponseMessage(); + if (rmsg == null) { + String h0 = uc.getHeaderField(0); + if (h0 != null) { + int i = h0.indexOf(' '); + int j = h0.indexOf(' ', i + 1); + if (i != -1 && j != -1) { + rmsg = h0.substring(j + 1); + } + } + } + String xpubid = null; + InputStream is; + if (rc >= 200 && rc <= 299) { + is = uc.getInputStream(); + xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID"); + } else { + if (rc >= 300 && rc <= 399) { + rmsg = uc.getHeaderField("Location"); + } + is = uc.getErrorStream(); + } + byte[] buf = new byte[4096]; + if (is != null) { + while (is.read(buf) > 0) { + } + is.close(); + } + dth.reportStatus(this, rc, xpubid, rmsg); + } catch (Exception e) { + dth.reportException(this, e); + } + } + /** + * Remove meta and data files + */ + public void clean() { + datafile.delete(); + metafile.delete(); + hdrs = null; + } + /** + * Has this delivery task been cleaned? + */ + public boolean isCleaned() { + return(hdrs == null); + } + /** + * Get length of body + */ + public long getLength() { + return(length); + } + /** + * Get creation date as encoded in the publish ID. + */ + public long getDate() { + return(date); + } + /** + * Get the most recent delivery attempt URL + */ + public String getURL() { + return(url); + } + /** + * Get the content type + */ + public String getCType() { + return(ctype); + } + /** + * Get the method + */ + public String getMethod() { + return(method); + } + /** + * Get the file ID + */ + public String getFileId() { + return(fileid); + } + /** + * Get the number of delivery attempts + */ + public int getAttempts() { + return(attempts); + } + /** + * Get the (space delimited list of) subscription ID for this delivery task + */ + public String getSubId() { + return(subid); + } + /** + * Get the feed ID for this delivery task + */ + public String getFeedId() { + return(feedid); + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java new file mode 100644 index 00000000..c9f1ef84 --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java @@ -0,0 +1,72 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +/** + * Interface to allow independent testing of the DeliveryTask code. + *

+ * This interface represents all the configuraiton information and + * feedback mechanisms that a delivery task needs. + */ + +public interface DeliveryTaskHelper { + /** + * Report that a delivery attempt failed due to an exception (like can't connect to remote host) + * @param task The task that failed + * @param exception The exception that occurred + */ + public void reportException(DeliveryTask task, Exception exception); + /** + * Report that a delivery attempt completed (successfully or unsuccessfully) + * @param task The task that failed + * @param status The HTTP status + * @param xpubid The publish ID from the far end (if any) + * @param location The redirection location for a 3XX response + */ + public void reportStatus(DeliveryTask task, int status, String xpubid, String location); + /** + * Report that a delivery attempt either failed while sending data or that an error was returned instead of a 100 Continue. + * @param task The task that failed + * @param sent The number of bytes sent or -1 if an error was returned instead of 100 Continue. + */ + public void reportDeliveryExtra(DeliveryTask task, long sent); + /** + * Get the destination information for the delivery queue + * @return The destination information + */ + public DestInfo getDestInfo(); + /** + * Given a file ID, get the URL to deliver to + * @param fileid The file id + * @return The URL to deliver to + */ + public String getDestURL(String fileid); + /** + * Get the feed ID for a subscription + * @param subid The subscription ID + * @return The feed iD + */ + public String getFeedId(String subid); +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java new file mode 100644 index 00000000..2b54f70a --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java @@ -0,0 +1,132 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +/** + * Information for a delivery destination that doesn't change from message to message + */ +public class DestInfo { + private String name; + private String spool; + private String subid; + private String logdata; + private String url; + private String authuser; + private String authentication; + private boolean metaonly; + private boolean use100; + /** + * Create a destination information object. + * @param name n:fqdn or s:subid + * @param spool The directory where files are spooled. + * @param subid The subscription ID (if applicable). + * @param logdata Text to be included in log messages + * @param url The URL to deliver to. + * @param authuser The auth user for logging. + * @param authentication The credentials. + * @param metaonly Is this a metadata only delivery? + * @param use100 Should I use expect 100-continue? + */ + public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100) { + this.name = name; + this.spool = spool; + this.subid = subid; + this.logdata = logdata; + this.url = url; + this.authuser = authuser; + this.authentication = authentication; + this.metaonly = metaonly; + this.use100 = use100; + } + public boolean equals(Object o) { + return((o instanceof DestInfo) && ((DestInfo)o).spool.equals(spool)); + } + public int hashCode() { + return(spool.hashCode()); + } + /** + * Get the name of this destination + */ + public String getName() { + return(name); + } + /** + * Get the spool directory for this destination. + * @return The spool directory + */ + public String getSpool() { + return(spool); + } + /** + * Get the subscription ID. + * @return Subscription ID or null if this is a node to node delivery. + */ + public String getSubId() { + return(subid); + } + /** + * Get the log data. + * @return Text to be included in a log message about delivery attempts. + */ + public String getLogData() { + return(logdata); + } + /** + * Get the delivery URL. + * @return The URL to deliver to (the primary URL). + */ + public String getURL() { + return(url); + + } + /** + * Get the user for authentication + * @return The name of the user for logging + */ + public String getAuthUser() { + return(authuser); + } + /** + * Get the authentication header + * @return The string to use to authenticate to the recipient. + */ + public String getAuth() { + return(authentication); + } + /** + * Is this a metadata only delivery? + * @return True if this is a metadata only delivery + */ + public boolean isMetaDataOnly() { + return(metaonly); + } + /** + * Should I send expect 100-continue header? + * @return True if I should. + */ + public boolean isUsing100() { + return(use100); + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java new file mode 100644 index 00000000..36dbc235 --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java @@ -0,0 +1,82 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +import java.util.*; +import java.net.*; + +/** + * Determine if an IP address is from a machine + */ +public class IsFrom { + private long nextcheck; + private String[] ips; + private String fqdn; + /** + * Configure the JVM DNS cache to have a 10 second TTL. This needs to be called very very early or it won't have any effect. + */ + public static void setDNSCache() { + java.security.Security.setProperty("networkaddress.cache.ttl", "10"); + } + /** + * Create an IsFrom for the specified fully qualified domain name. + */ + public IsFrom(String fqdn) { + this.fqdn = fqdn; + } + /** + * Check if an IP address matches. If it has been more than + * 10 seconds since DNS was last checked for changes to the + * IP address(es) of this FQDN, check again. Then check + * if the specified IP address belongs to the FQDN. + */ + public synchronized boolean isFrom(String ip) { + long now = System.currentTimeMillis(); + if (now > nextcheck) { + nextcheck = now + 10000; + Vector v = new Vector(); + try { + InetAddress[] addrs = InetAddress.getAllByName(fqdn); + for (InetAddress a: addrs) { + v.add(a.getHostAddress()); + } + } catch (Exception e) { + } + ips = v.toArray(new String[v.size()]); + } + for (String s: ips) { + if (s.equals(ip)) { + return(true); + } + } + return(false); + } + /** + * Return the fully qualified domain name + */ + public String toString() { + return(fqdn); + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java new file mode 100644 index 00000000..0e89f417 --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java @@ -0,0 +1,159 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ +package org.onap.dmaap.datarouter.node; + +import java.util.*; +import java.util.regex.*; +import java.io.*; +import java.nio.file.*; +import java.text.*; + +/** + * Cleanup of old log files. + *

+ * Periodically scan the log directory for log files that are older than + * the log file retention interval, and delete them. In a future release, + * This class will also be responsible for uploading events logs to the + * log server to support the log query APIs. + */ + +public class LogManager extends TimerTask { + private NodeConfigManager config; + private Matcher isnodelog; + private Matcher iseventlog; + private Uploader worker; + private String uploaddir; + private String logdir; + private class Uploader extends Thread implements DeliveryQueueHelper { + public long getInitFailureTimer() { return(10000L); } + public double getFailureBackoff() { return(2.0); } + public long getMaxFailureTimer() { return(150000L); } + public long getExpirationTimer() { return(604800000L); } + public int getFairFileLimit() { return(10000); } + public long getFairTimeLimit() { return(86400000); } + public String getDestURL(DestInfo dest, String fileid) { + return(config.getEventLogUrl()); + } + public void handleUnreachable(DestInfo dest) {} + public boolean handleRedirection(DestInfo dest, String location, String fileid) { return(false); } + public boolean isFollowRedirects() { return(false); } + public String getFeedId(String subid) { return(null); } + private DeliveryQueue dq; + public Uploader() { + dq = new DeliveryQueue(this, new DestInfo("LogUpload", uploaddir, null, null, null, config.getMyName(), config.getMyAuth(), false, false)); + setDaemon(true); + setName("Log Uploader"); + start(); + } + private synchronized void snooze() { + try { + wait(10000); + } catch (Exception e) { + } + } + private synchronized void poke() { + notify(); + } + public void run() { + while (true) { + scan(); + dq.run(); + snooze(); + } + } + private void scan() { + long threshold = System.currentTimeMillis() - config.getLogRetention(); + File dir = new File(logdir); + String[] fns = dir.list(); + Arrays.sort(fns); + String lastqueued = "events-000000000000.log"; + String curlog = StatusLog.getCurLogFile(); + curlog = curlog.substring(curlog.lastIndexOf('/') + 1); + try { + Writer w = new FileWriter(uploaddir + "/.meta"); + w.write("POST\tlogdata\nContent-Type\ttext/plain\n"); + w.close(); + BufferedReader br = new BufferedReader(new FileReader(uploaddir + "/.lastqueued")); + lastqueued = br.readLine(); + br.close(); + } catch (Exception e) { + } + for (String fn: fns) { + if (!isnodelog.reset(fn).matches()) { + if (!iseventlog.reset(fn).matches()) { + continue; + } + if (lastqueued.compareTo(fn) < 0 && curlog.compareTo(fn) > 0) { + lastqueued = fn; + try { + String pid = config.getPublishId(); + Files.createLink(Paths.get(uploaddir + "/" + pid), Paths.get(logdir + "/" + fn)); + Files.createLink(Paths.get(uploaddir + "/" + pid + ".M"), Paths.get(uploaddir + "/.meta")); + } catch (Exception e) { + } + } + } + File f = new File(dir, fn); + if (f.lastModified() < threshold) { + f.delete(); + } + } + try { + (new File(uploaddir + "/.meta")).delete(); + Writer w = new FileWriter(uploaddir + "/.lastqueued"); + w.write(lastqueued + "\n"); + w.close(); + } catch (Exception e) { + } + } + } + /** + * Construct a log manager + *

+ * The log manager will check for expired log files every 5 minutes + * at 20 seconds after the 5 minute boundary. (Actually, the + * interval is the event log rollover interval, which + * defaults to 5 minutes). + */ + public LogManager(NodeConfigManager config) { + this.config = config; + try { + isnodelog = Pattern.compile("node\\.log\\.\\d{8}").matcher(""); + iseventlog = Pattern.compile("events-\\d{12}\\.log").matcher(""); + } catch (Exception e) {} + logdir = config.getLogDir(); + uploaddir = logdir + "/.spool"; + (new File(uploaddir)).mkdirs(); + long now = System.currentTimeMillis(); + long intvl = StatusLog.parseInterval(config.getEventLogInterval(), 300000); + long when = now - now % intvl + intvl + 20000L; + config.getTimer().scheduleAtFixedRate(this, when - now, intvl); + worker = new Uploader(); + } + /** + * Trigger check for expired log files and log files to upload + */ + public void run() { + worker.poke(); + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java new file mode 100644 index 00000000..c196d46c --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java @@ -0,0 +1,722 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +import java.util.*; +import java.io.*; + +/** + * Processed configuration for this node. + *

+ * The NodeConfig represents a processed configuration from the Data Router + * provisioning server. Each time configuration data is received from the + * provisioning server, a new NodeConfig is created and the previous one + * discarded. + */ +public class NodeConfig { + /** + * Raw configuration entry for a data router node + */ + public static class ProvNode { + private String cname; + /** + * Construct a node configuration entry. + * @param cname The cname of the node. + */ + public ProvNode(String cname) { + this.cname = cname; + } + /** + * Get the cname of the node + */ + public String getCName() { + return(cname); + } + } + /** + * Raw configuration entry for a provisioning parameter + */ + public static class ProvParam { + private String name; + private String value; + /** + * Construct a provisioning parameter configuration entry. + * @param name The name of the parameter. + * @param value The value of the parameter. + */ + public ProvParam(String name, String value) { + this.name = name; + this.value = value; + } + /** + * Get the name of the parameter. + */ + public String getName() { + return(name); + } + /** + * Get the value of the parameter. + */ + public String getValue() { + return(value); + } + } + /** + * Raw configuration entry for a data feed. + */ + public static class ProvFeed { + private String id; + private String logdata; + private String status; + /** + * Construct a feed configuration entry. + * @param id The feed ID of the entry. + * @param logdata String for log entries about the entry. + * @param status The reason why this feed cannot be used (Feed has been deleted, Feed has been suspended) or null if it is valid. + */ + public ProvFeed(String id, String logdata, String status) { + this.id = id; + this.logdata = logdata; + this.status = status; + } + /** + * Get the feed id of the data feed. + */ + public String getId() { + return(id); + } + /** + * Get the log data of the data feed. + */ + public String getLogData() { + return(logdata); + } + /** + * Get the status of the data feed. + */ + public String getStatus() { + return(status); + } + } + /** + * Raw configuration entry for a feed user. + */ + public static class ProvFeedUser { + private String feedid; + private String user; + private String credentials; + /** + * Construct a feed user configuration entry + * @param feedid The feed id. + * @param user The user that will publish to the feed. + * @param credentials The Authorization header the user will use to publish. + */ + public ProvFeedUser(String feedid, String user, String credentials) { + this.feedid = feedid; + this.user = user; + this.credentials = credentials; + } + /** + * Get the feed id of the feed user. + */ + public String getFeedId() { + return(feedid); + } + /** + * Get the user for the feed user. + */ + public String getUser() { + return(user); + } + /** + * Get the credentials for the feed user. + */ + public String getCredentials() { + return(credentials); + } + } + /** + * Raw configuration entry for a feed subnet + */ + public static class ProvFeedSubnet { + private String feedid; + private String cidr; + /** + * Construct a feed subnet configuration entry + * @param feedid The feed ID + * @param cidr The CIDR allowed to publish to the feed. + */ + public ProvFeedSubnet(String feedid, String cidr) { + this.feedid = feedid; + this.cidr = cidr; + } + /** + * Get the feed id of the feed subnet. + */ + public String getFeedId() { + return(feedid); + } + /** + * Get the CIDR of the feed subnet. + */ + public String getCidr() { + return(cidr); + } + } + /** + * Raw configuration entry for a subscription + */ + public static class ProvSubscription { + private String subid; + private String feedid; + private String url; + private String authuser; + private String credentials; + private boolean metaonly; + private boolean use100; + /** + * Construct a subscription configuration entry + * @param subid The subscription ID + * @param feedid The feed ID + * @param url The base delivery URL (not including the fileid) + * @param authuser The user in the credentials used to deliver + * @param credentials The credentials used to authenticate to the delivery URL exactly as they go in the Authorization header. + * @param metaonly Is this a meta data only subscription? + * @param use100 Should we send Expect: 100-continue? + */ + public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials, boolean metaonly, boolean use100) { + this.subid = subid; + this.feedid = feedid; + this.url = url; + this.authuser = authuser; + this.credentials = credentials; + this.metaonly = metaonly; + this.use100 = use100; + } + /** + * Get the subscription ID + */ + public String getSubId() { + return(subid); + } + /** + * Get the feed ID + */ + public String getFeedId() { + return(feedid); + } + /** + * Get the delivery URL + */ + public String getURL() { + return(url); + } + /** + * Get the user + */ + public String getAuthUser() { + return(authuser); + } + /** + * Get the delivery credentials + */ + public String getCredentials() { + return(credentials); + } + /** + * Is this a meta data only subscription? + */ + public boolean isMetaDataOnly() { + return(metaonly); + } + /** + * Should we send Expect: 100-continue? + */ + public boolean isUsing100() { + return(use100); + } + } + /** + * Raw configuration entry for controlled ingress to the data router node + */ + public static class ProvForceIngress { + private String feedid; + private String subnet; + private String user; + private String[] nodes; + /** + * Construct a forced ingress configuration entry + * @param feedid The feed ID that this entry applies to + * @param subnet The CIDR for which publisher IP addresses this entry applies to or "" if it applies to all publisher IP addresses + * @param user The publishing user this entry applies to or "" if it applies to all publishing users. + * @param nodes The array of FQDNs of the data router nodes to redirect publication attempts to. + */ + public ProvForceIngress(String feedid, String subnet, String user, String[] nodes) { + this.feedid = feedid; + this.subnet = subnet; + this.user = user; + this.nodes = nodes; + } + /** + * Get the feed ID + */ + public String getFeedId() { + return(feedid); + } + /** + * Get the subnet + */ + public String getSubnet() { + return(subnet); + } + /** + * Get the user + */ + public String getUser() { + return(user); + } + /** + * Get the node + */ + public String[] getNodes() { + return(nodes); + } + } + /** + * Raw configuration entry for controlled egress from the data router + */ + public static class ProvForceEgress { + private String subid; + private String node; + /** + * Construct a forced egress configuration entry + * @param subid The subscription ID the subscription with forced egress + * @param node The node handling deliveries for this subscription + */ + public ProvForceEgress(String subid, String node) { + this.subid = subid; + this.node = node; + } + /** + * Get the subscription ID + */ + public String getSubId() { + return(subid); + } + /** + * Get the node + */ + public String getNode() { + return(node); + } + } + /** + * Raw configuration entry for routing within the data router network + */ + public static class ProvHop { + private String from; + private String to; + private String via; + /** + * A human readable description of this entry + */ + public String toString() { + return("Hop " + from + "->" + to + " via " + via); + } + /** + * Construct a hop entry + * @param from The FQDN of the node with the data to be delivered + * @param to The FQDN of the node that will deliver to the subscriber + * @param via The FQDN of the node where the from node should send the data + */ + public ProvHop(String from, String to, String via) { + this.from = from; + this.to = to; + this.via = via; + } + /** + * Get the from node + */ + public String getFrom() { + return(from); + } + /** + * Get the to node + */ + public String getTo() { + return(to); + } + /** + * Get the next intermediate node + */ + public String getVia() { + return(via); + } + } + private static class Redirection { + public SubnetMatcher snm; + public String user; + public String[] nodes; + } + private static class Feed { + public String loginfo; + public String status; + public SubnetMatcher[] subnets; + public Hashtable authusers = new Hashtable(); + public Redirection[] redirections; + public Target[] targets; + } + private Hashtable params = new Hashtable(); + private Hashtable feeds = new Hashtable(); + private Hashtable nodeinfo = new Hashtable(); + private Hashtable subinfo = new Hashtable(); + private Hashtable nodes = new Hashtable(); + private String myname; + private String myauth; + private DestInfo[] alldests; + private int rrcntr; + /** + * Process the raw provisioning data to configure this node + * @param pd The parsed provisioning data + * @param myname My name as seen by external systems + * @param spooldir The directory where temporary files live + * @param port The port number for URLs + * @param nodeauthkey The keying string used to generate node authentication credentials + */ + public NodeConfig(ProvData pd, String myname, String spooldir, int port, String nodeauthkey) { + this.myname = myname; + for (ProvParam p: pd.getParams()) { + params.put(p.getName(), p.getValue()); + } + Vector div = new Vector(); + myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey); + for (ProvNode pn: pd.getNodes()) { + String cn = pn.getCName(); + if (nodeinfo.get(cn) != null) { + continue; + } + String auth = NodeUtils.getNodeAuthHdr(cn, nodeauthkey); + DestInfo di = new DestInfo("n:" + cn, spooldir + "/n/" + cn, null, "n2n-" + cn, "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true); + (new File(di.getSpool())).mkdirs(); + div.add(di); + nodeinfo.put(cn, di); + nodes.put(auth, new IsFrom(cn)); + } + PathFinder pf = new PathFinder(myname, nodeinfo.keySet().toArray(new String[nodeinfo.size()]), pd.getHops()); + Hashtable> rdtab = new Hashtable>(); + for (ProvForceIngress pfi: pd.getForceIngress()) { + Vector v = rdtab.get(pfi.getFeedId()); + if (v == null) { + v = new Vector(); + rdtab.put(pfi.getFeedId(), v); + } + Redirection r = new Redirection(); + if (pfi.getSubnet() != null) { + r.snm = new SubnetMatcher(pfi.getSubnet()); + } + r.user = pfi.getUser(); + r.nodes = pfi.getNodes(); + v.add(r); + } + Hashtable> pfutab = new Hashtable>(); + for (ProvFeedUser pfu: pd.getFeedUsers()) { + Hashtable t = pfutab.get(pfu.getFeedId()); + if (t == null) { + t = new Hashtable(); + pfutab.put(pfu.getFeedId(), t); + } + t.put(pfu.getCredentials(), pfu.getUser()); + } + Hashtable egrtab = new Hashtable(); + for (ProvForceEgress pfe: pd.getForceEgress()) { + if (pfe.getNode().equals(myname) || nodeinfo.get(pfe.getNode()) == null) { + continue; + } + egrtab.put(pfe.getSubId(), pfe.getNode()); + } + Hashtable> pfstab = new Hashtable>(); + for (ProvFeedSubnet pfs: pd.getFeedSubnets()) { + Vector v = pfstab.get(pfs.getFeedId()); + if (v == null) { + v = new Vector(); + pfstab.put(pfs.getFeedId(), v); + } + v.add(new SubnetMatcher(pfs.getCidr())); + } + Hashtable ttab = new Hashtable(); + HashSet allfeeds = new HashSet(); + for (ProvFeed pfx: pd.getFeeds()) { + if (pfx.getStatus() == null) { + allfeeds.add(pfx.getId()); + } + } + for (ProvSubscription ps: pd.getSubscriptions()) { + String sid = ps.getSubId(); + String fid = ps.getFeedId(); + if (!allfeeds.contains(fid)) { + continue; + } + if (subinfo.get(sid) != null) { + continue; + } + int sididx = 999; + try { + sididx = Integer.parseInt(sid); + sididx -= sididx % 100; + } catch (Exception e) { + } + String siddir = sididx + "/" + sid; + DestInfo di = new DestInfo("s:" + sid, spooldir + "/s/" + siddir, sid, fid, ps.getURL(), ps.getAuthUser(), ps.getCredentials(), ps.isMetaDataOnly(), ps.isUsing100()); + (new File(di.getSpool())).mkdirs(); + div.add(di); + subinfo.put(sid, di); + String egr = egrtab.get(sid); + if (egr != null) { + sid = pf.getPath(egr) + sid; + } + StringBuffer sb = ttab.get(fid); + if (sb == null) { + sb = new StringBuffer(); + ttab.put(fid, sb); + } + sb.append(' ').append(sid); + } + alldests = div.toArray(new DestInfo[div.size()]); + for (ProvFeed pfx: pd.getFeeds()) { + String fid = pfx.getId(); + Feed f = feeds.get(fid); + if (f != null) { + continue; + } + f = new Feed(); + feeds.put(fid, f); + f.loginfo = pfx.getLogData(); + f.status = pfx.getStatus(); + Vector v1 = pfstab.get(fid); + if (v1 == null) { + f.subnets = new SubnetMatcher[0]; + } else { + f.subnets = v1.toArray(new SubnetMatcher[v1.size()]); + } + Hashtable h1 = pfutab.get(fid); + if (h1 == null) { + h1 = new Hashtable(); + } + f.authusers = h1; + Vector v2 = rdtab.get(fid); + if (v2 == null) { + f.redirections = new Redirection[0]; + } else { + f.redirections = v2.toArray(new Redirection[v2.size()]); + } + StringBuffer sb = ttab.get(fid); + if (sb == null) { + f.targets = new Target[0]; + } else { + f.targets = parseRouting(sb.toString()); + } + } + } + /** + * Parse a target string into an array of targets + * @param routing Target string + * @return Array of targets. + */ + public Target[] parseRouting(String routing) { + routing = routing.trim(); + if ("".equals(routing)) { + return(new Target[0]); + } + String[] xx = routing.split("\\s+"); + Hashtable tmap = new Hashtable(); + HashSet subset = new HashSet(); + Vector tv = new Vector(); + Target[] ret = new Target[xx.length]; + for (int i = 0; i < xx.length; i++) { + String t = xx[i]; + int j = t.indexOf('/'); + if (j == -1) { + DestInfo di = subinfo.get(t); + if (di == null) { + tv.add(new Target(null, t)); + } else { + if (!subset.contains(t)) { + subset.add(t); + tv.add(new Target(di, null)); + } + } + } else { + String node = t.substring(0, j); + String rtg = t.substring(j + 1); + DestInfo di = nodeinfo.get(node); + if (di == null) { + tv.add(new Target(null, t)); + } else { + Target tt = tmap.get(node); + if (tt == null) { + tt = new Target(di, rtg); + tmap.put(node, tt); + tv.add(tt); + } else { + tt.addRouting(rtg); + } + } + } + } + return(tv.toArray(new Target[tv.size()])); + } + /** + * Check whether this is a valid node-to-node transfer + * @param credentials Credentials offered by the supposed node + * @param ip IP address the request came from + */ + public boolean isAnotherNode(String credentials, String ip) { + IsFrom n = nodes.get(credentials); + return (n != null && n.isFrom(ip)); + } + /** + * Check whether publication is allowed. + * @param feedid The ID of the feed being requested. + * @param credentials The offered credentials + * @param ip The requesting IP address + */ + public String isPublishPermitted(String feedid, String credentials, String ip) { + Feed f = feeds.get(feedid); + String nf = "Feed does not exist"; + if (f != null) { + nf = f.status; + } + if (nf != null) { + return(nf); + } + String user = f.authusers.get(credentials); + if (user == null) { + return("Publisher not permitted for this feed"); + } + if (f.subnets.length == 0) { + return(null); + } + byte[] addr = NodeUtils.getInetAddress(ip); + for (SubnetMatcher snm: f.subnets) { + if (snm.matches(addr)) { + return(null); + } + } + return("Publisher not permitted for this feed"); + } + /** + * Get authenticated user + */ + public String getAuthUser(String feedid, String credentials) { + return(feeds.get(feedid).authusers.get(credentials)); + } + /** + * Check if the request should be redirected to a different ingress node + */ + public String getIngressNode(String feedid, String user, String ip) { + Feed f = feeds.get(feedid); + if (f.redirections.length == 0) { + return(null); + } + byte[] addr = NodeUtils.getInetAddress(ip); + for (Redirection r: f.redirections) { + if (r.user != null && !user.equals(r.user)) { + continue; + } + if (r.snm != null && !r.snm.matches(addr)) { + continue; + } + for (String n: r.nodes) { + if (myname.equals(n)) { + return(null); + } + } + if (r.nodes.length == 0) { + return(null); + } + return(r.nodes[rrcntr++ % r.nodes.length]); + } + return(null); + } + /** + * Get a provisioned configuration parameter + */ + public String getProvParam(String name) { + return(params.get(name)); + } + /** + * Get all the DestInfos + */ + public DestInfo[] getAllDests() { + return(alldests); + } + /** + * Get the targets for a feed + * @param feedid The feed ID + * @return The targets this feed should be delivered to + */ + public Target[] getTargets(String feedid) { + if (feedid == null) { + return(new Target[0]); + } + Feed f = feeds.get(feedid); + if (f == null) { + return(new Target[0]); + } + return(f.targets); + } + /** + * Get the feed ID for a subscription + * @param subid The subscription ID + * @return The feed ID + */ + public String getFeedId(String subid) { + DestInfo di = subinfo.get(subid); + if (di == null) { + return(null); + } + return(di.getLogData()); + } + /** + * Get the spool directory for a subscription + * @param subid The subscription ID + * @return The spool directory + */ + public String getSpoolDir(String subid) { + DestInfo di = subinfo.get(subid); + if (di == null) { + return(null); + } + return(di.getSpool()); + } + /** + * Get the Authorization value this node uses + * @return The Authorization header value for this node + */ + public String getMyAuth() { + return(myauth); + } + +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java new file mode 100644 index 00000000..b80babd6 --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java @@ -0,0 +1,599 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +import java.net.*; +import java.util.*; +import java.io.*; +import org.apache.log4j.Logger; +import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + + +/** + * Maintain the configuration of a Data Router node + *

+ * The NodeConfigManager is the single point of contact for servlet, delivery, event logging, and log retention subsystems to access configuration information. (Log4J has its own configuration mechanism). + *

+ * There are two basic sets of configuration data. The + * static local configuration data, stored in a local configuration file (created + * as part of installation by SWM), and the dynamic global + * configuration data fetched from the data router provisioning server. + */ +public class NodeConfigManager implements DeliveryQueueHelper { + private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("org.onap.dmaap.datarouter.node.NodeConfigManager"); + private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.NodeConfigManager"); + private static NodeConfigManager base = new NodeConfigManager(); + + private Timer timer = new Timer("Node Configuration Timer", true); + private long maxfailuretimer; + private long initfailuretimer; + private long expirationtimer; + private double failurebackoff; + private long fairtimelimit; + private int fairfilelimit; + private double fdpstart; + private double fdpstop; + private int deliverythreads; + private String provurl; + private String provhost; + private IsFrom provcheck; + private int gfport; + private int svcport; + private int port; + private String spooldir; + private String logdir; + private long logretention; + private String redirfile; + private String kstype; + private String ksfile; + private String kspass; + private String kpass; + private String tstype; + private String tsfile; + private String tspass; + private String myname; + private RedirManager rdmgr; + private RateLimitedOperation pfetcher; + private NodeConfig config; + private File quiesce; + private PublishId pid; + private String nak; + private TaskList configtasks = new TaskList(); + private String eventlogurl; + private String eventlogprefix; + private String eventlogsuffix; + private String eventloginterval; + private boolean followredirects; + + + /** + * Get the default node configuration manager + */ + public static NodeConfigManager getInstance() { + return(base); + } + /** + * Initialize the configuration of a Data Router node + */ + private NodeConfigManager() { + Properties p = new Properties(); + try { + p.load(new FileInputStream(System.getProperty("org.onap.dmaap.datarouter.node.ConfigFile", "/opt/app/datartr/etc/node.properties"))); + } catch (Exception e) { + + NodeUtils.setIpAndFqdnForEelf("NodeConfigManager"); + eelflogger.error(EelfMsgs.MESSAGE_PROPERTIES_LOAD_ERROR); + logger.error("NODE0301 Unable to load local configuration file " + System.getProperty("org.onap.dmaap.datarouter.node.ConfigFile", "/opt/app/datartr/etc/node.properties"), e); + } + provurl = p.getProperty("ProvisioningURL", "https://feeds-drtr.web.att.com/internal/prov"); + try { + provhost = (new URL(provurl)).getHost(); + } catch (Exception e) { + NodeUtils.setIpAndFqdnForEelf("NodeConfigManager"); + eelflogger.error(EelfMsgs.MESSAGE_BAD_PROV_URL, provurl); + logger.error("NODE0302 Bad provisioning server URL " + provurl); + System.exit(1); + } + logger.info("NODE0303 Provisioning server is " + provhost); + eventlogurl = p.getProperty("LogUploadURL", "https://feeds-drtr.web.att.com/internal/logs"); + provcheck = new IsFrom(provhost); + gfport = Integer.parseInt(p.getProperty("IntHttpPort", "8080")); + svcport = Integer.parseInt(p.getProperty("IntHttpsPort", "8443")); + port = Integer.parseInt(p.getProperty("ExtHttpsPort", "443")); + long minpfinterval = Long.parseLong(p.getProperty("MinProvFetchInterval", "10000")); + long minrsinterval = Long.parseLong(p.getProperty("MinRedirSaveInterval", "10000")); + spooldir = p.getProperty("SpoolDir", "spool"); + File fdir = new File(spooldir + "/f"); + fdir.mkdirs(); + for (File junk: fdir.listFiles()) { + if (junk.isFile()) { + junk.delete(); + } + } + logdir = p.getProperty("LogDir", "logs"); + (new File(logdir)).mkdirs(); + logretention = Long.parseLong(p.getProperty("LogRetention", "30")) * 86400000L; + eventlogprefix = logdir + "/events"; + eventlogsuffix = ".log"; + String redirfile = p.getProperty("RedirectionFile", "etc/redirections.dat"); + kstype = p.getProperty("KeyStoreType", "jks"); + ksfile = p.getProperty("KeyStoreFile", "etc/keystore"); + kspass = p.getProperty("KeyStorePassword", "changeme"); + kpass = p.getProperty("KeyPassword", "changeme"); + tstype = p.getProperty("TrustStoreType", "jks"); + tsfile = p.getProperty("TrustStoreFile"); + tspass = p.getProperty("TrustStorePassword", "changeme"); + if (tsfile != null && tsfile.length() > 0) { + System.setProperty("javax.net.ssl.trustStoreType", tstype); + System.setProperty("javax.net.ssl.trustStore", tsfile); + System.setProperty("javax.net.ssl.trustStorePassword", tspass); + } + nak = p.getProperty("NodeAuthKey", "Node123!"); + quiesce = new File(p.getProperty("QuiesceFile", "etc/SHUTDOWN")); + myname = NodeUtils.getCanonicalName(kstype, ksfile, kspass); + if (myname == null) { + NodeUtils.setIpAndFqdnForEelf("NodeConfigManager"); + eelflogger.error(EelfMsgs.MESSAGE_KEYSTORE_FETCH_ERROR, ksfile); + logger.error("NODE0309 Unable to fetch canonical name from keystore file " + ksfile); + System.exit(1); + } + logger.info("NODE0304 My certificate says my name is " + myname); + pid = new PublishId(myname); + rdmgr = new RedirManager(redirfile, minrsinterval, timer); + pfetcher = new RateLimitedOperation(minpfinterval, timer) { + public void run() { + fetchconfig(); + } + }; + logger.info("NODE0305 Attempting to fetch configuration at " + provurl); + pfetcher.request(); + } + private void localconfig() { + followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false")); + eventloginterval = getProvParam("LOGROLL_INTERVAL", "5m"); + initfailuretimer = 10000; + maxfailuretimer = 3600000; + expirationtimer = 86400000; + failurebackoff = 2.0; + deliverythreads = 40; + fairfilelimit = 100; + fairtimelimit = 60000; + fdpstart = 0.05; + fdpstop = 0.2; + try { initfailuretimer = (long)(Double.parseDouble(getProvParam("DELIVERY_INIT_RETRY_INTERVAL")) * 1000); } catch (Exception e) {} + try { maxfailuretimer = (long)(Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000); } catch (Exception e) {} + try { expirationtimer = (long)(Double.parseDouble(getProvParam("DELIVERY_MAX_AGE")) * 1000); } catch (Exception e) {} + try { failurebackoff = Double.parseDouble(getProvParam("DELIVERY_RETRY_RATIO")); } catch (Exception e) {} + try { deliverythreads = Integer.parseInt(getProvParam("DELIVERY_THREADS")); } catch (Exception e) {} + try { fairfilelimit = Integer.parseInt(getProvParam("FAIR_FILE_LIMIT")); } catch (Exception e) {} + try { fairtimelimit = (long)(Double.parseDouble(getProvParam("FAIR_TIME_LIMIT")) * 1000); } catch (Exception e) {} + try { fdpstart = Double.parseDouble(getProvParam("FREE_DISK_RED_PERCENT")) / 100.0; } catch (Exception e) {} + try { fdpstop = Double.parseDouble(getProvParam("FREE_DISK_YELLOW_PERCENT")) / 100.0; } catch (Exception e) {} + if (fdpstart < 0.01) { + fdpstart = 0.01; + } + if (fdpstart > 0.5) { + fdpstart = 0.5; + } + if (fdpstop < fdpstart) { + fdpstop = fdpstart; + } + if (fdpstop > 0.5) { + fdpstop = 0.5; + } + } + private void fetchconfig() { + try { + System.out.println("provurl:: "+provurl); + Reader r = new InputStreamReader((new URL(provurl)).openStream()); + config = new NodeConfig(new ProvData(r), myname, spooldir, port, nak); + localconfig(); + configtasks.startRun(); + Runnable rr; + while ((rr = configtasks.next()) != null) { + try { + rr.run(); + } catch (Exception e) { + } + } + } catch (Exception e) { + e.printStackTrace(); + NodeUtils.setIpAndFqdnForEelf("fetchconfigs"); + eelflogger.error(EelfMsgs.MESSAGE_CONF_FAILED, e.toString()); + logger.error("NODE0306 Configuration failed " + e.toString() + " - try again later", e); + pfetcher.request(); + } + } + /** + * Process a gofetch request from a particular IP address. If the + * IP address is not an IP address we would go to to fetch the + * provisioning data, ignore the request. If the data has been + * fetched very recently (default 10 seconds), wait a while before fetching again. + */ + public synchronized void gofetch(String remoteaddr) { + if (provcheck.isFrom(remoteaddr)) { + logger.info("NODE0307 Received configuration fetch request from provisioning server " + remoteaddr); + pfetcher.request(); + } else { + logger.info("NODE0308 Received configuration fetch request from unexpected server " + remoteaddr); + } + } + /** + * Am I configured? + */ + public boolean isConfigured() { + return(config != null); + } + /** + * Am I shut down? + */ + public boolean isShutdown() { + return(quiesce.exists()); + } + /** + * Given a routing string, get the targets. + * @param routing Target string + * @return array of targets + */ + public Target[] parseRouting(String routing) { + return(config.parseRouting(routing)); + } + /** + * Given a set of credentials and an IP address, is this request from another node? + * @param credentials Credentials offered by the supposed node + * @param ip IP address the request came from + * @return If the credentials and IP address are recognized, true, otherwise false. + */ + public boolean isAnotherNode(String credentials, String ip) { + return(config.isAnotherNode(credentials, ip)); + } + /** + * Check whether publication is allowed. + * @param feedid The ID of the feed being requested + * @param credentials The offered credentials + * @param ip The requesting IP address + * @return True if the IP and credentials are valid for the specified feed. + */ + public String isPublishPermitted(String feedid, String credentials, String ip) { + return(config.isPublishPermitted(feedid, credentials, ip)); + } + /** + * Check who the user is given the feed ID and the offered credentials. + * @param feedid The ID of the feed specified + * @param credentials The offered credentials + * @return Null if the credentials are invalid or the user if they are valid. + */ + public String getAuthUser(String feedid, String credentials) { + return(config.getAuthUser(feedid, credentials)); + } + /** + * Check if the publish request should be sent to another node based on the feedid, user, and source IP address. + * @param feedid The ID of the feed specified + * @param user The publishing user + * @param ip The IP address of the publish endpoint + * @return Null if the request should be accepted or the correct hostname if it should be sent to another node. + */ + public String getIngressNode(String feedid, String user, String ip) { + return(config.getIngressNode(feedid, user, ip)); + } + /** + * Get a provisioned configuration parameter (from the provisioning server configuration) + * @param name The name of the parameter + * @return The value of the parameter or null if it is not defined. + */ + public String getProvParam(String name) { + return(config.getProvParam(name)); + } + /** + * Get a provisioned configuration parameter (from the provisioning server configuration) + * @param name The name of the parameter + * @param deflt The value to use if the parameter is not defined + * @return The value of the parameter or deflt if it is not defined. + */ + public String getProvParam(String name, String deflt) { + name = config.getProvParam(name); + if (name == null) { + name = deflt; + } + return(name); + } + /** + * Generate a publish ID + */ + public String getPublishId() { + return(pid.next()); + } + /** + * Get all the outbound spooling destinations. + * This will include both subscriptions and nodes. + */ + public DestInfo[] getAllDests() { + return(config.getAllDests()); + } + /** + * Register a task to run whenever the configuration changes + */ + public void registerConfigTask(Runnable task) { + configtasks.addTask(task); + } + /** + * Deregister a task to run whenever the configuration changes + */ + public void deregisterConfigTask(Runnable task) { + configtasks.removeTask(task); + } + /** + * Get the URL to deliver a message to. + * @param destinfo The destination information + * @param fileid The file ID + * @return The URL to deliver to + */ + public String getDestURL(DestInfo destinfo, String fileid) { + String subid = destinfo.getSubId(); + String purl = destinfo.getURL(); + if (followredirects && subid != null) { + purl = rdmgr.lookup(subid, purl); + } + return(purl + "/" + fileid); + } + /** + * Is a destination redirected? + */ + public boolean isDestRedirected(DestInfo destinfo) { + return(followredirects && rdmgr.isRedirected(destinfo.getSubId())); + } + /** + * Set up redirection on receipt of a 3XX from a target URL + */ + public boolean handleRedirection(DestInfo destinfo, String redirto, String fileid) { + fileid = "/" + fileid; + String subid = destinfo.getSubId(); + String purl = destinfo.getURL(); + if (followredirects && subid != null && redirto.endsWith(fileid)) { + redirto = redirto.substring(0, redirto.length() - fileid.length()); + if (!redirto.equals(purl)) { + rdmgr.redirect(subid, purl, redirto); + return(true); + } + } + return(false); + } + /** + * Handle unreachable target URL + */ + public void handleUnreachable(DestInfo destinfo) { + String subid = destinfo.getSubId(); + if (followredirects && subid != null) { + rdmgr.forget(subid); + } + } + /** + * Get the timeout before retrying after an initial delivery failure + */ + public long getInitFailureTimer() { + return(initfailuretimer); + } + /** + * Get the maximum timeout between delivery attempts + */ + public long getMaxFailureTimer() { + return(maxfailuretimer); + } + /** + * Get the ratio between consecutive delivery attempts + */ + public double getFailureBackoff() { + return(failurebackoff); + } + /** + * Get the expiration timer for deliveries + */ + public long getExpirationTimer() { + return(expirationtimer); + } + /** + * Get the maximum number of file delivery attempts before checking + * if another queue has work to be performed. + */ + public int getFairFileLimit() { + return(fairfilelimit); + } + /** + * Get the maximum amount of time spent delivering files before + * checking if another queue has work to be performed. + */ + public long getFairTimeLimit() { + return(fairtimelimit); + } + /** + * Get the targets for a feed + * @param feedid The feed ID + * @return The targets this feed should be delivered to + */ + public Target[] getTargets(String feedid) { + return(config.getTargets(feedid)); + } + /** + * Get the spool directory for temporary files + */ + public String getSpoolDir() { + return(spooldir + "/f"); + } + /** + * Get the base directory for spool directories + */ + public String getSpoolBase() { + return(spooldir); + } + /** + * Get the key store type + */ + public String getKSType() { + return(kstype); + } + /** + * Get the key store file + */ + public String getKSFile() { + return(ksfile); + } + /** + * Get the key store password + */ + public String getKSPass() { + return(kspass); + } + /** + * Get the key password + */ + public String getKPass() { + return(kpass); + } + /** + * Get the http port + */ + public int getHttpPort() { + return(gfport); + } + /** + * Get the https port + */ + public int getHttpsPort() { + return(svcport); + } + /** + * Get the externally visible https port + */ + public int getExtHttpsPort() { + return(port); + } + /** + * Get the external name of this machine + */ + public String getMyName() { + return(myname); + } + /** + * Get the number of threads to use for delivery + */ + public int getDeliveryThreads() { + return(deliverythreads); + } + /** + * Get the URL for uploading the event log data + */ + public String getEventLogUrl() { + return(eventlogurl); + } + /** + * Get the prefix for the names of event log files + */ + public String getEventLogPrefix() { + return(eventlogprefix); + } + /** + * Get the suffix for the names of the event log files + */ + public String getEventLogSuffix() { + return(eventlogsuffix); + } + /** + * Get the interval between event log file rollovers + */ + public String getEventLogInterval() { + return(eventloginterval); + } + /** + * Should I follow redirects from subscribers? + */ + public boolean isFollowRedirects() { + return(followredirects); + } + /** + * Get the directory where the event and node log files live + */ + public String getLogDir() { + return(logdir); + } + /** + * How long do I keep log files (in milliseconds) + */ + public long getLogRetention() { + return(logretention); + } + /** + * Get the timer + */ + public Timer getTimer() { + return(timer); + } + /** + * Get the feed ID for a subscription + * @param subid The subscription ID + * @return The feed ID + */ + public String getFeedId(String subid) { + return(config.getFeedId(subid)); + } + /** + * Get the authorization string this node uses + * @return The Authorization string for this node + */ + public String getMyAuth() { + return(config.getMyAuth()); + } + /** + * Get the fraction of free spool disk space where we start throwing away undelivered files. This is FREE_DISK_RED_PERCENT / 100.0. Default is 0.05. Limited by 0.01 <= FreeDiskStart <= 0.5. + */ + public double getFreeDiskStart() { + return(fdpstart); + } + /** + * Get the fraction of free spool disk space where we stop throwing away undelivered files. This is FREE_DISK_YELLOW_PERCENT / 100.0. Default is 0.2. Limited by FreeDiskStart <= FreeDiskStop <= 0.5. + */ + public double getFreeDiskStop() { + return(fdpstop); + } + /** + * Get the spool directory for a subscription + */ + public String getSpoolDir(String subid, String remoteaddr) { + if (provcheck.isFrom(remoteaddr)) { + String sdir = config.getSpoolDir(subid); + if (sdir != null) { + logger.info("NODE0310 Received subscription reset request for subscription " + subid + " from provisioning server " + remoteaddr); + } else { + logger.info("NODE0311 Received subscription reset request for unknown subscription " + subid + " from provisioning server " + remoteaddr); + } + return(sdir); + } else { + logger.info("NODE0312 Received subscription reset request from unexpected server " + remoteaddr); + return(null); + } + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java new file mode 100644 index 00000000..a34bacd0 --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java @@ -0,0 +1,113 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +import org.eclipse.jetty.servlet.*; +import org.eclipse.jetty.util.ssl.*; +import org.eclipse.jetty.server.*; +import org.eclipse.jetty.server.nio.*; +import org.eclipse.jetty.server.ssl.*; +import org.apache.log4j.Logger; + +/** + * The main starting point for the Data Router node + */ +public class NodeMain { + private NodeMain() {} + private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.NodeMain"); + private static class wfconfig implements Runnable { + private NodeConfigManager ncm; + public wfconfig(NodeConfigManager ncm) { + this.ncm = ncm; + } + public synchronized void run() { + notify(); + } + public synchronized void waitforconfig() { + ncm.registerConfigTask(this); + while (!ncm.isConfigured()) { + logger.info("NODE0003 Waiting for Node Configuration"); + try { + wait(); + } catch (Exception e) { + } + } + ncm.deregisterConfigTask(this); + logger.info("NODE0004 Node Configuration Data Received"); + } + } + private static Delivery d; + private static NodeConfigManager ncm; + /** + * Reset the retry timer for a subscription + */ + public static void resetQueue(String subid, String ip) { + d.resetQueue(ncm.getSpoolDir(subid, ip)); + } + /** + * Start the data router. + *

+ * The location of the node configuration file can be set using the + * org.onap.dmaap.datarouter.node.ConfigFile system property. By + * default, it is "etc/node.properties". + */ + public static void main(String[] args) throws Exception { + logger.info("NODE0001 Data Router Node Starting"); + IsFrom.setDNSCache(); + ncm = NodeConfigManager.getInstance(); + logger.info("NODE0002 I am " + ncm.getMyName()); + (new wfconfig(ncm)).waitforconfig(); + d = new Delivery(ncm); + LogManager lm = new LogManager(ncm); + Server server = new Server(); + SelectChannelConnector http = new SelectChannelConnector(); + http.setPort(ncm.getHttpPort()); + http.setMaxIdleTime(2000); + http.setRequestHeaderSize(2048); + SslSelectChannelConnector https = new SslSelectChannelConnector(); + https.setPort(ncm.getHttpsPort()); + https.setMaxIdleTime(30000); + https.setRequestHeaderSize(8192); + SslContextFactory cf = https.getSslContextFactory(); + + /**Skip SSLv3 Fixes*/ + cf.addExcludeProtocols("SSLv3"); + logger.info("Excluded protocols node-"+cf.getExcludeProtocols()); + /**End of SSLv3 Fixes*/ + + cf.setKeyStoreType(ncm.getKSType()); + cf.setKeyStorePath(ncm.getKSFile()); + cf.setKeyStorePassword(ncm.getKSPass()); + cf.setKeyManagerPassword(ncm.getKPass()); + server.setConnectors(new Connector[] { http, https }); + ServletContextHandler ctxt = new ServletContextHandler(0); + ctxt.setContextPath("/"); + server.setHandler(ctxt); + ctxt.addServlet(new ServletHolder(new NodeServlet()), "/*"); + logger.info("NODE0005 Data Router Node Activating Service"); + server.start(); + server.join(); + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java new file mode 100644 index 00000000..eae7ca06 --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java @@ -0,0 +1,380 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +import javax.servlet.*; +import javax.servlet.http.*; +import java.util.*; +import java.util.regex.*; +import java.io.*; +import java.nio.file.*; +import org.apache.log4j.Logger; +import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import java.net.*; + +/** + * Servlet for handling all http and https requests to the data router node + *

+ * Handled requests are: + *
+ * GET http://node/internal/fetchProv - fetch the provisioning data + *
+ * PUT/DELETE https://node/internal/publish/fileid - n2n transfer + *
+ * PUT/DELETE https://node/publish/feedid/fileid - publsh request + */ +public class NodeServlet extends HttpServlet { + private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.NodeServlet"); + private static NodeConfigManager config; + private static Pattern MetaDataPattern; + private static SubnetMatcher internalsubnet = new SubnetMatcher("135.207.136.128/25"); + //Adding EELF Logger Rally:US664892 + private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("org.onap.dmaap.datarouter.node.NodeServlet"); + + static { + try { + String ws = "\\s*"; + // assume that \\ and \" have been replaced by X + String string = "\"[^\"]*\""; + //String string = "\"(?:[^\"\\\\]|\\\\.)*\""; + String number = "[+-]?(?:\\.\\d+|(?:0|[1-9]\\d*)(?:\\.\\d*)?)(?:[eE][+-]?\\d+)?"; + String value = "(?:" + string + "|" + number + "|null|true|false)"; + String item = string + ws + ":" + ws + value + ws; + String object = ws + "\\{" + ws + "(?:" + item + "(?:" + "," + ws + item + ")*)?\\}" + ws; + MetaDataPattern = Pattern.compile(object, Pattern.DOTALL); + } catch (Exception e) { + } + } + /** + * Get the NodeConfigurationManager + */ + public void init() { + config = NodeConfigManager.getInstance(); + logger.info("NODE0101 Node Servlet Configured"); + } + private boolean down(HttpServletResponse resp) throws IOException { + if (config.isShutdown() || !config.isConfigured()) { + resp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + logger.info("NODE0102 Rejecting request: Service is being quiesced"); + return(true); + } + return(false); + } + /** + * Handle a GET for /internal/fetchProv + */ + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + NodeUtils.setIpAndFqdnForEelf("doGet"); + eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+""); + if (down(resp)) { + return; + } + String path = req.getPathInfo(); + String qs = req.getQueryString(); + String ip = req.getRemoteAddr(); + if (qs != null) { + path = path + "?" + qs; + } + if ("/internal/fetchProv".equals(path)) { + config.gofetch(ip); + resp.setStatus(HttpServletResponse.SC_NO_CONTENT); + return; + } else if (path.startsWith("/internal/resetSubscription/")) { + String subid = path.substring(28); + if (subid.length() != 0 && subid.indexOf('/') == -1) { + NodeMain.resetQueue(subid, ip); + resp.setStatus(HttpServletResponse.SC_NO_CONTENT); + return; + } + } + if (internalsubnet.matches(NodeUtils.getInetAddress(ip))) { + if (path.startsWith("/internal/logs/")) { + String f = path.substring(15); + File fn = new File(config.getLogDir() + "/" + f); + if (f.indexOf('/') != -1 || !fn.isFile()) { + logger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip); + resp.sendError(HttpServletResponse.SC_NOT_FOUND); + return; + } + byte[] buf = new byte[65536]; + resp.setContentType("text/plain"); + resp.setContentLength((int)fn.length()); + resp.setStatus(200); + InputStream is = new FileInputStream(fn); + OutputStream os = resp.getOutputStream(); + int i; + while ((i = is.read(buf)) > 0) { + os.write(buf, 0, i); + } + is.close(); + return; + } + if (path.startsWith("/internal/rtt/")) { + String xip = path.substring(14); + long st = System.currentTimeMillis(); + String status = " unknown"; + try { + Socket s = new Socket(xip, 443); + s.close(); + status = " connected"; + } catch (Exception e) { + status = " error " + e.toString(); + } + long dur = System.currentTimeMillis() - st; + resp.setContentType("text/plain"); + resp.setStatus(200); + byte[] buf = (dur + status + "\n").getBytes(); + resp.setContentLength(buf.length); + resp.getOutputStream().write(buf); + return; + } + } + logger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip); + resp.sendError(HttpServletResponse.SC_NOT_FOUND); + return; + } + /** + * Handle all PUT requests + */ + protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + NodeUtils.setIpAndFqdnForEelf("doPut"); + eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+""); + common(req, resp, true); + } + /** + * Handle all DELETE requests + */ + protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + NodeUtils.setIpAndFqdnForEelf("doDelete"); + eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+""); + common(req, resp, false); + } + private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) throws ServletException, IOException { + if (down(resp)) { + return; + } + if (!req.isSecure()) { + logger.info("NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests"); + return; + } + String fileid = req.getPathInfo(); + if (fileid == null) { + logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting /."); + return; + } + String feedid = null; + String user = null; + String credentials = req.getHeader("Authorization"); + if (credentials == null) { + logger.info("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_FORBIDDEN, "Authorization header required"); + return; + } + String ip = req.getRemoteAddr(); + String lip = req.getLocalAddr(); + String pubid = null; + String xpubid = null; + String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip; + Target[] targets = null; + if (fileid.startsWith("/publish/")) { + fileid = fileid.substring(9); + int i = fileid.indexOf('/'); + if (i == -1 || i == fileid.length() - 1) { + logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting /. Possible missing fileid."); + return; + } + feedid = fileid.substring(0, i); + fileid = fileid.substring(i + 1); + pubid = config.getPublishId(); + xpubid = req.getHeader("X-ATT-DR-PUBLISH-ID"); + targets = config.getTargets(feedid); + } else if (fileid.startsWith("/internal/publish/")) { + if (!config.isAnotherNode(credentials, ip)) { + logger.info("NODE0107 Rejecting unauthorized node-to-node transfer attempt from " + ip); + resp.sendError(HttpServletResponse.SC_FORBIDDEN); + return; + } + fileid = fileid.substring(18); + pubid = req.getHeader("X-ATT-DR-PUBLISH-ID"); + targets = config.parseRouting(req.getHeader("X-ATT-DR-ROUTING")); + } else { + logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting /."); + return; + } + if (fileid.indexOf('/') != -1) { + logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting /."); + return; + } + String qs = req.getQueryString(); + if (qs != null) { + fileid = fileid + "?" + qs; + } + String hp = config.getMyName(); + int xp = config.getExtHttpsPort(); + if (xp != 443) { + hp = hp + ":" + xp; + } + String logurl = "https://" + hp + "/internal/publish/" + fileid; + if (feedid != null) { + logurl = "https://" + hp + "/publish/" + feedid + "/" + fileid; + String reason = config.isPublishPermitted(feedid, credentials, ip); + if (reason != null) { + logger.info("NODE0111 Rejecting unauthorized publish attempt to feed " + feedid + " fileid " + fileid + " from " + ip + " reason " + reason); + resp.sendError(HttpServletResponse.SC_FORBIDDEN,reason); + return; + } + user = config.getAuthUser(feedid, credentials); + String newnode = config.getIngressNode(feedid, user, ip); + if (newnode != null) { + String port = ""; + int iport = config.getExtHttpsPort(); + if (iport != 443) { + port = ":" + iport; + } + String redirto = "https://" + newnode + port + "/publish/" + feedid + "/" + fileid; + logger.info("NODE0108 Redirecting publish attempt for feed " + feedid + " user " + user + " ip " + ip + " to " + redirto); + resp.sendRedirect(redirto); + return; + } + resp.setHeader("X-ATT-DR-PUBLISH-ID", pubid); + } + String fbase = config.getSpoolDir() + "/" + pubid; + File data = new File(fbase); + File meta = new File(fbase + ".M"); + OutputStream dos = null; + Writer mw = null; + InputStream is = null; + try { + StringBuffer mx = new StringBuffer(); + mx.append(req.getMethod()).append('\t').append(fileid).append('\n'); + Enumeration hnames = req.getHeaderNames(); + String ctype = null; + while (hnames.hasMoreElements()) { + String hn = (String)hnames.nextElement(); + String hnlc = hn.toLowerCase(); + if ((isput && ("content-type".equals(hnlc) || + "content-language".equals(hnlc) || + "content-md5".equals(hnlc) || + "content-range".equals(hnlc))) || + "x-att-dr-meta".equals(hnlc) || + (feedid == null && "x-att-dr-received".equals(hnlc)) || + (hnlc.startsWith("x-") && !hnlc.startsWith("x-att-dr-"))) { + Enumeration hvals = req.getHeaders(hn); + while (hvals.hasMoreElements()) { + String hv = (String)hvals.nextElement(); + if ("content-type".equals(hnlc)) { + ctype = hv; + } + if ("x-att-dr-meta".equals(hnlc)) { + if (hv.length() > 4096) { + logger.info("NODE0109 Rejecting publish attempt with metadata too long for feed " + feedid + " user " + user + " ip " + ip); + resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Metadata too long"); + return; + } + if (!MetaDataPattern.matcher(hv.replaceAll("\\\\.", "X")).matches()) { + logger.info("NODE0109 Rejecting publish attempt with malformed metadata for feed " + feedid + " user " + user + " ip " + ip); + resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Malformed metadata"); + return; + } + } + mx.append(hn).append('\t').append(hv).append('\n'); + } + } + } + mx.append("X-ATT-DR-RECEIVED\t").append(rcvd).append('\n'); + String metadata = mx.toString(); + byte[] buf = new byte[1024 * 1024]; + int i; + try { + is = req.getInputStream(); + dos = new FileOutputStream(data); + while ((i = is.read(buf)) > 0) { + dos.write(buf, 0, i); + } + is.close(); + is = null; + dos.close(); + dos = null; + } catch (IOException ioe) { + long exlen = -1; + try { + exlen = Long.parseLong(req.getHeader("Content-Length")); + } catch (Exception e) { + } + StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user, ioe.getMessage()); + throw ioe; + } + Path dpath = Paths.get(fbase); + for (Target t: targets) { + DestInfo di = t.getDestInfo(); + if (di == null) { + // TODO: unknown destination + continue; + } + String dbase = di.getSpool() + "/" + pubid; + Files.createLink(Paths.get(dbase), dpath); + mw = new FileWriter(meta); + mw.write(metadata); + if (di.getSubId() == null) { + mw.write("X-ATT-DR-ROUTING\t" + t.getRouting() + "\n"); + } + mw.close(); + meta.renameTo(new File(dbase + ".M")); + } + resp.setStatus(HttpServletResponse.SC_NO_CONTENT); + resp.getOutputStream().close(); + StatusLog.logPub(pubid, feedid, logurl, req.getMethod(), ctype, data.length(), ip, user, HttpServletResponse.SC_NO_CONTENT); + } catch (IOException ioe) { + logger.info("NODE0110 IO Exception receiving publish attempt for feed " + feedid + " user " + user + " ip " + ip + " " + ioe.toString(), ioe); + throw ioe; + } finally { + if (is != null) { try { is.close(); } catch (Exception e) {}} + if (dos != null) { try { dos.close(); } catch (Exception e) {}} + if (mw != null) { try { mw.close(); } catch (Exception e) {}} + try { data.delete(); } catch (Exception e) {} + try { meta.delete(); } catch (Exception e) {} + } + } + + private int getIdFromPath(HttpServletRequest req) { + String path = req.getPathInfo(); + if (path == null || path.length() < 2) + return -1; + try { + return Integer.parseInt(path.substring(1)); + } catch (NumberFormatException e) { + return -1; + } + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java new file mode 100644 index 00000000..303d7ee7 --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java @@ -0,0 +1,226 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +import static com.att.eelf.configuration.Configuration.MDC_SERVER_FQDN; +import static com.att.eelf.configuration.Configuration.MDC_SERVER_IP_ADDRESS; +import static com.att.eelf.configuration.Configuration.MDC_SERVICE_NAME; + +import java.security.*; +import java.io.*; +import java.util.*; +import java.security.cert.*; +import java.net.*; +import java.text.*; +import org.apache.commons.codec.binary.Base64; +import org.apache.log4j.Logger; +import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; +import org.slf4j.MDC; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +/** + * Utility functions for the data router node + */ +public class NodeUtils { + private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("org.onap.dmaap.datarouter.node.NodeUtils"); + private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.NodeUtils"); + private static SimpleDateFormat logdate; + static { + logdate = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); + logdate.setTimeZone(TimeZone.getTimeZone("GMT")); + } + private NodeUtils() {} + /** + * Base64 encode a byte array + * @param raw The bytes to be encoded + * @return The encoded string + */ + public static String base64Encode(byte[] raw) { + return(Base64.encodeBase64String(raw)); + } + /** + * Given a user and password, generate the credentials + * @param user User name + * @param password User password + * @return Authorization header value + */ + public static String getAuthHdr(String user, String password) { + if (user == null || password == null) { + return(null); + } + return("Basic " + base64Encode((user + ":" + password).getBytes())); + } + /** + * Given a node name, generate the credentials + * @param node Node name + */ + public static String getNodeAuthHdr(String node, String key) { + try { + MessageDigest md = MessageDigest.getInstance("SHA"); + md.update(key.getBytes()); + md.update(node.getBytes()); + md.update(key.getBytes()); + return(getAuthHdr(node, base64Encode(md.digest()))); + } catch (Exception e) { + return(null); + } + } + /** + * Given a keystore file and its password, return the value of the CN of the first private key entry with a certificate. + * @param kstype The type of keystore + * @param ksfile The file name of the keystore + * @param kspass The password of the keystore + * @return CN of the certificate subject or null + */ + public static String getCanonicalName(String kstype, String ksfile, String kspass) { + try { + KeyStore ks = KeyStore.getInstance(kstype); + ks.load(new FileInputStream(ksfile), kspass.toCharArray()); + return(getCanonicalName(ks)); + } catch (Exception e) { + setIpAndFqdnForEelf("getCanonicalName"); + eelflogger.error(EelfMsgs.MESSAGE_KEYSTORE_LOAD_ERROR, ksfile, e.toString()); + logger.error("NODE0401 Error loading my keystore file + " + ksfile + " " + e.toString(), e); + return(null); + } + } + /** + * Given a keystore, return the value of the CN of the first private key entry with a certificate. + * @param ks The KeyStore + * @return CN of the certificate subject or null + */ + public static String getCanonicalName(KeyStore ks) { + try { + Enumeration aliases = ks.aliases(); + while (aliases.hasMoreElements()) { + String s = aliases.nextElement(); + if (ks.entryInstanceOf(s, KeyStore.PrivateKeyEntry.class)) { + X509Certificate c = (X509Certificate)ks.getCertificate(s); + if (c != null) { + String subject = c.getSubjectX500Principal().getName(); + String[] parts = subject.split(","); + if (parts.length < 1) { + return(null); + } + subject = parts[0].trim(); + if (!subject.startsWith("CN=")) { + return(null); + + } + return(subject.substring(3)); + } + } + } + } catch (Exception e) { + logger.error("NODE0402 Error extracting my name from my keystore file " + e.toString(), e); + } + return(null); + } + /** + * Given a string representation of an IP address, get the corresponding byte array + * @param ip The IP address as a string + * @return The IP address as a byte array or null if the address is invalid + */ + public static byte[] getInetAddress(String ip) { + try { + return(InetAddress.getByName(ip).getAddress()); + } catch (Exception e) { + } + return(null); + } + /** + * Given a uri with parameters, split out the feed ID and file ID + */ + public static String[] getFeedAndFileID(String uriandparams) { + int end = uriandparams.length(); + int i = uriandparams.indexOf('#'); + if (i != -1 && i < end) { + end = i; + } + i = uriandparams.indexOf('?'); + if (i != -1 && i < end) { + end = i; + } + end = uriandparams.lastIndexOf('/', end); + if (end < 2) { + return(null); + } + i = uriandparams.lastIndexOf('/', end - 1); + if (i == -1) { + return(null); + } + return(new String[] { uriandparams.substring(i + 1, end - 1), uriandparams.substring(end + 1) }); + } + /** + * Escape fields that might contain vertical bar, backslash, or newline by replacing them with backslash p, backslash e and backslash n. + */ + public static String loge(String s) { + if (s == null) { + return(s); + } + return(s.replaceAll("\\\\", "\\\\e").replaceAll("\\|", "\\\\p").replaceAll("\n", "\\\\n")); + } + /** + * Undo what loge does. + */ + public static String unloge(String s) { + if (s == null) { + return(s); + } + return(s.replaceAll("\\\\p", "\\|").replaceAll("\\\\n", "\n").replaceAll("\\\\e", "\\\\")); + } + /** + * Format a logging timestamp as yyyy-mm-ddThh:mm:ss.mmmZ + */ + public static String logts(long when) { + return(logts(new Date(when))); + } + /** + * Format a logging timestamp as yyyy-mm-ddThh:mm:ss.mmmZ + */ + public static synchronized String logts(Date when) { + return(logdate.format(when)); + } + + /* Method prints method name, server FQDN and IP Address of the machine in EELF logs + * @Method - setIpAndFqdnForEelf - Rally:US664892 + * @Params - method, prints method name in EELF log. + */ + public static void setIpAndFqdnForEelf(String method) { + MDC.clear(); + MDC.put(MDC_SERVICE_NAME, method); + try { + MDC.put(MDC_SERVER_FQDN, InetAddress.getLocalHost().getHostName()); + MDC.put(MDC_SERVER_IP_ADDRESS, InetAddress.getLocalHost().getHostAddress()); + } catch (Exception e) { + e.printStackTrace(); + } + + } + + +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java new file mode 100644 index 00000000..b99f1685 --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java @@ -0,0 +1,132 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +import java.util.*; + +/** + * Given a set of node names and next hops, identify and ignore any cycles and figure out the sequence of next hops to get from this node to any other node + */ + +public class PathFinder { + private static class Hop { + public boolean mark; + public boolean bad; + public NodeConfig.ProvHop basis; + } + private Vector errors = new Vector(); + private Hashtable routes = new Hashtable(); + /** + * Get list of errors encountered while finding paths + * @return array of error descriptions + */ + public String[] getErrors() { + return(errors.toArray(new String[errors.size()])); + } + /** + * Get the route from this node to the specified node + * @param destination node + * @return list of node names separated by and ending with "/" + */ + public String getPath(String destination) { + String ret = routes.get(destination); + if (ret == null) { + return(""); + } + return(ret); + } + private String plot(String from, String to, Hashtable info) { + Hop nh = info.get(from); + if (nh == null || nh.bad) { + return(to); + } + if (nh.mark) { + // loop detected; + while (!nh.bad) { + nh.bad = true; + errors.add(nh.basis + " is part of a cycle"); + nh = info.get(nh.basis.getVia()); + } + return(to); + } + nh.mark = true; + String x = plot(nh.basis.getVia(), to, info); + nh.mark = false; + if (nh.bad) { + return(to); + } + return(nh.basis.getVia() + "/" + x); + } + /** + * Find routes from a specified origin to all of the nodes given a set of specified next hops. + * @param origin where we start + * @param nodes where we can go + * @param hops detours along the way + */ + public PathFinder(String origin, String[] nodes, NodeConfig.ProvHop[] hops) { + HashSet known = new HashSet(); + Hashtable> ht = new Hashtable>(); + for (String n: nodes) { + known.add(n); + ht.put(n, new Hashtable()); + } + for (NodeConfig.ProvHop ph: hops) { + if (!known.contains(ph.getFrom())) { + errors.add(ph + " references unknown from node"); + continue; + } + if (!known.contains(ph.getTo())) { + errors.add(ph + " references unknown destination node"); + continue; + } + Hashtable ht2 = ht.get(ph.getTo()); + Hop h = ht2.get(ph.getFrom()); + if (h != null) { + h.bad = true; + errors.add(ph + " gives duplicate next hop - previous via was " + h.basis.getVia()); + continue; + } + h = new Hop(); + h.basis = ph; + ht2.put(ph.getFrom(), h); + if (!known.contains(ph.getVia())) { + errors.add(ph + " references unknown via node"); + h.bad = true; + continue; + } + if (ph.getVia().equals(ph.getTo())) { + errors.add(ph + " gives destination as via"); + h.bad = true; + continue; + } + } + for (String n: known) { + if (n.equals(origin)) { + routes.put(n, ""); + } + routes.put(n, plot(origin, n, ht.get(n)) + "/"); + } + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java new file mode 100644 index 00000000..c0e46eb3 --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java @@ -0,0 +1,302 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +import java.io.*; +import java.util.*; +import org.json.*; +import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; +import org.apache.log4j.Logger; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +/** + * Parser for provisioning data from the provisioning server. + *

+ * The ProvData class uses a Reader for the text configuration from the + * provisioning server to construct arrays of raw configuration entries. + */ +public class ProvData { + private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("org.onap.dmaap.datarouter.node.ProvData"); + private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.ProvData"); + private NodeConfig.ProvNode[] pn; + private NodeConfig.ProvParam[] pp; + private NodeConfig.ProvFeed[] pf; + private NodeConfig.ProvFeedUser[] pfu; + private NodeConfig.ProvFeedSubnet[] pfsn; + private NodeConfig.ProvSubscription[] ps; + private NodeConfig.ProvForceIngress[] pfi; + private NodeConfig.ProvForceEgress[] pfe; + private NodeConfig.ProvHop[] ph; + private static String[] gvasa(JSONArray a, int index) { + return(gvasa(a.get(index))); + } + private static String[] gvasa(JSONObject o, String key) { + return(gvasa(o.opt(key))); + } + private static String[] gvasa(Object o) { + if (o instanceof JSONArray) { + JSONArray a = (JSONArray)o; + Vector v = new Vector(); + for (int i = 0; i < a.length(); i++) { + String s = gvas(a, i); + if (s != null) { + v.add(s); + } + } + return(v.toArray(new String[v.size()])); + } else { + String s = gvas(o); + if (s == null) { + return(new String[0]); + } else { + return(new String[] { s }); + } + } + } + private static String gvas(JSONArray a, int index) { + return(gvas(a.get(index))); + } + private static String gvas(JSONObject o, String key) { + return(gvas(o.opt(key))); + } + private static String gvas(Object o) { + if (o instanceof Boolean || o instanceof Number || o instanceof String) { + return(o.toString()); + } + return(null); + } + /** + * Construct raw provisioing data entries from the text (JSON) + * provisioning document received from the provisioning server + * @param r The reader for the JSON text. + */ + public ProvData(Reader r) throws IOException { + Vector pnv = new Vector(); + Vector ppv = new Vector(); + Vector pfv = new Vector(); + Vector pfuv = new Vector(); + Vector pfsnv = new Vector(); + Vector psv = new Vector(); + Vector pfiv = new Vector(); + Vector pfev = new Vector(); + Vector phv = new Vector(); + try { + JSONTokener jtx = new JSONTokener(r); + JSONObject jcfg = new JSONObject(jtx); + char c = jtx.nextClean(); + if (c != '\0') { + throw new JSONException("Spurious characters following configuration"); + } + r.close(); + JSONArray jfeeds = jcfg.optJSONArray("feeds"); + if (jfeeds != null) { + for (int fx = 0; fx < jfeeds.length(); fx++) { + JSONObject jfeed = jfeeds.getJSONObject(fx); + String stat = null; + if (jfeed.optBoolean("suspend", false)) { + stat = "Feed is suspended"; + } + if (jfeed.optBoolean("deleted", false)) { + stat = "Feed is deleted"; + } + String fid = gvas(jfeed, "feedid"); + String fname = gvas(jfeed, "name"); + String fver = gvas(jfeed, "version"); + pfv.add(new NodeConfig.ProvFeed(fid, fname + "//" + fver, stat)); + JSONObject jauth = jfeed.optJSONObject("authorization"); + if (jauth == null) { + continue; + } + JSONArray jeids = jauth.optJSONArray("endpoint_ids"); + if (jeids != null) { + for (int ux = 0; ux < jeids.length(); ux++) { + JSONObject ju = jeids.getJSONObject(ux); + String login = gvas(ju, "id"); + String password = gvas(ju, "password"); + pfuv.add(new NodeConfig.ProvFeedUser(fid, login, NodeUtils.getAuthHdr(login, password))); + } + } + JSONArray jeips = jauth.optJSONArray("endpoint_addrs"); + if (jeips != null) { + for (int ix = 0; ix < jeips.length(); ix++) { + String sn = gvas(jeips, ix); + pfsnv.add(new NodeConfig.ProvFeedSubnet(fid, sn)); + } + } + } + } + JSONArray jsubs = jcfg.optJSONArray("subscriptions"); + if (jsubs != null) { + for (int sx = 0; sx < jsubs.length(); sx++) { + JSONObject jsub = jsubs.getJSONObject(sx); + if (jsub.optBoolean("suspend", false)) { + continue; + } + String sid = gvas(jsub, "subid"); + String fid = gvas(jsub, "feedid"); + JSONObject jdel = jsub.getJSONObject("delivery"); + String delurl = gvas(jdel, "url"); + String id = gvas(jdel, "user"); + String password = gvas(jdel, "password"); + boolean monly = jsub.getBoolean("metadataOnly"); + boolean use100 = jdel.getBoolean("use100"); + psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100)); + } + } + JSONObject jparams = jcfg.optJSONObject("parameters"); + if (jparams != null) { + for (String pname: JSONObject.getNames(jparams)) { + String pvalue = gvas(jparams, pname); + if (pvalue != null) { + ppv.add(new NodeConfig.ProvParam(pname, pvalue)); + } + } + String sfx = gvas(jparams, "PROV_DOMAIN"); + JSONArray jnodes = jparams.optJSONArray("NODES"); + if (jnodes != null) { + for (int nx = 0; nx < jnodes.length(); nx++) { + String nn = gvas(jnodes, nx); + if (nn.indexOf('.') == -1) { + nn = nn + "." + sfx; + } + pnv.add(new NodeConfig.ProvNode(nn)); + } + } + } + JSONArray jingresses = jcfg.optJSONArray("ingress"); + if (jingresses != null) { + for (int fx = 0; fx < jingresses.length(); fx++) { + JSONObject jingress = jingresses.getJSONObject(fx); + String fid = gvas(jingress, "feedid"); + String subnet = gvas(jingress, "subnet"); + String user = gvas(jingress, "user"); + String[] nodes = gvasa(jingress, "node"); + if (fid == null || "".equals(fid)) { + continue; + } + if ("".equals(subnet)) { + subnet = null; + } + if ("".equals(user)) { + user = null; + } + pfiv.add(new NodeConfig.ProvForceIngress(fid, subnet, user, nodes)); + } + } + JSONObject jegresses = jcfg.optJSONObject("egress"); + if (jegresses != null && JSONObject.getNames(jegresses) != null) { + for (String esid: JSONObject.getNames(jegresses)) { + String enode = gvas(jegresses, esid); + if (esid != null && enode != null && !"".equals(esid) && !"".equals(enode)) { + pfev.add(new NodeConfig.ProvForceEgress(esid, enode)); + } + } + } + JSONArray jhops = jcfg.optJSONArray("routing"); + if (jhops != null) { + for (int fx = 0; fx < jhops.length(); fx++) { + JSONObject jhop = jhops.getJSONObject(fx); + String from = gvas(jhop, "from"); + String to = gvas(jhop, "to"); + String via = gvas(jhop, "via"); + if (from == null || to == null || via == null || "".equals(from) || "".equals(to) || "".equals(via)) { + continue; + } + phv.add(new NodeConfig.ProvHop(from, to, via)); + } + } + } catch (JSONException jse) { + NodeUtils.setIpAndFqdnForEelf("ProvData"); + eelflogger.error(EelfMsgs.MESSAGE_PARSING_ERROR, jse.toString()); + logger.error("NODE0201 Error parsing configuration data from provisioning server " + jse.toString(), jse); + throw new IOException(jse.toString(), jse); + } + pn = pnv.toArray(new NodeConfig.ProvNode[pnv.size()]); + pp = ppv.toArray(new NodeConfig.ProvParam[ppv.size()]); + pf = pfv.toArray(new NodeConfig.ProvFeed[pfv.size()]); + pfu = pfuv.toArray(new NodeConfig.ProvFeedUser[pfuv.size()]); + pfsn = pfsnv.toArray(new NodeConfig.ProvFeedSubnet[pfsnv.size()]); + ps = psv.toArray(new NodeConfig.ProvSubscription[psv.size()]); + pfi = pfiv.toArray(new NodeConfig.ProvForceIngress[pfiv.size()]); + pfe = pfev.toArray(new NodeConfig.ProvForceEgress[pfev.size()]); + ph = phv.toArray(new NodeConfig.ProvHop[phv.size()]); + } + /** + * Get the raw node configuration entries + */ + public NodeConfig.ProvNode[] getNodes() { + return(pn); + } + /** + * Get the raw parameter configuration entries + */ + public NodeConfig.ProvParam[] getParams() { + return(pp); + } + /** + * Ge the raw feed configuration entries + */ + public NodeConfig.ProvFeed[] getFeeds() { + return(pf); + } + /** + * Get the raw feed user configuration entries + */ + public NodeConfig.ProvFeedUser[] getFeedUsers() { + return(pfu); + } + /** + * Get the raw feed subnet configuration entries + */ + public NodeConfig.ProvFeedSubnet[] getFeedSubnets() { + return(pfsn); + } + /** + * Get the raw subscription entries + */ + public NodeConfig.ProvSubscription[] getSubscriptions() { + return(ps); + } + /** + * Get the raw forced ingress entries + */ + public NodeConfig.ProvForceIngress[] getForceIngress() { + return(pfi); + } + /** + * Get the raw forced egress entries + */ + public NodeConfig.ProvForceEgress[] getForceEgress() { + return(pfe); + } + /** + * Get the raw next hop entries + */ + public NodeConfig.ProvHop[] getHops() { + return(ph); + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java new file mode 100644 index 00000000..fa285b27 --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java @@ -0,0 +1,52 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +/** + * Generate publish IDs + */ +public class PublishId { + private long nextuid; + private String myname; + + /** + * Generate publish IDs for the specified name + * @param myname Unique identifier for this publish ID generator (usually fqdn of server) + */ + public PublishId(String myname) { + this.myname = myname; + } + /** + * Generate a Data Router Publish ID that uniquely identifies the particular invocation of the Publish API for log correlation purposes. + */ + public synchronized String next() { + long now = System.currentTimeMillis(); + if (now < nextuid) { + now = nextuid; + } + nextuid = now + 1; + return(now + "." + myname); + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java new file mode 100644 index 00000000..5890e193 --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java @@ -0,0 +1,102 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +import java.util.*; + +/** + * Execute an operation no more frequently than a specified interval + */ + +public abstract class RateLimitedOperation implements Runnable { + private boolean marked; // a timer task exists + private boolean executing; // the operation is currently in progress + private boolean remark; // a request was made while the operation was in progress + private Timer timer; + private long last; // when the last operation started + private long mininterval; + /** + * Create a rate limited operation + * @param mininterval The minimum number of milliseconds after the last execution starts before a new execution can begin + * @param timer The timer used to perform deferred executions + */ + public RateLimitedOperation(long mininterval, Timer timer) { + this.timer = timer; + this.mininterval = mininterval; + } + private class deferred extends TimerTask { + public void run() { + execute(); + } + } + private synchronized void unmark() { + marked = false; + } + private void execute() { + unmark(); + request(); + } + /** + * Request that the operation be performed by this thread or at a later time by the timer + */ + public void request() { + if (premark()) { + return; + } + do { + run(); + } while (demark()); + } + private synchronized boolean premark() { + if (executing) { + // currently executing - wait until it finishes + remark = true; + return(true); + } + if (marked) { + // timer currently running - will run when it expires + return(true); + } + long now = System.currentTimeMillis(); + if (last + mininterval > now) { + // too soon - schedule a timer + marked = true; + timer.schedule(new deferred(), last + mininterval - now); + return(true); + } + last = now; + executing = true; + // start execution + return(false); + } + private synchronized boolean demark() { + executing = false; + if (remark) { + remark = false; + return(!premark()); + } + return(false); + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java new file mode 100644 index 00000000..336eee32 --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java @@ -0,0 +1,118 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +import java.util.*; +import java.io.*; + +/** + * Track redirections of subscriptions + */ +public class RedirManager { + private Hashtable sid2primary = new Hashtable(); + private Hashtable sid2secondary = new Hashtable(); + private String redirfile; + RateLimitedOperation op; + /** + * Create a mechanism for maintaining subscription redirections. + * @param redirfile The file to store the redirection information. + * @param mininterval The minimum number of milliseconds between writes to the redirection information file. + * @param timer The timer thread used to run delayed file writes. + */ + public RedirManager(String redirfile, long mininterval, Timer timer) { + this.redirfile = redirfile; + op = new RateLimitedOperation(mininterval, timer) { + public void run() { + try { + StringBuffer sb = new StringBuffer(); + for (String s: sid2primary.keySet()) { + sb.append(s).append(' ').append(sid2primary.get(s)).append(' ').append(sid2secondary.get(s)).append('\n'); + } + OutputStream os = new FileOutputStream(RedirManager.this.redirfile); + os.write(sb.toString().getBytes()); + os.close(); + } catch (Exception e) { + } + } + }; + try { + String s; + BufferedReader br = new BufferedReader(new FileReader(redirfile)); + while ((s = br.readLine()) != null) { + s = s.trim(); + String[] sx = s.split(" "); + if (s.startsWith("#") || sx.length != 3) { + continue; + } + sid2primary.put(sx[0], sx[1]); + sid2secondary.put(sx[0], sx[2]); + } + br.close(); + } catch (Exception e) { + // missing file is normal + } + } + /** + * Set up redirection. If a request is to be sent to subscription ID sid, and that is configured to go to URL primary, instead, go to secondary. + * @param sid The subscription ID to be redirected + * @param primary The URL associated with that subscription ID + * @param secondary The replacement URL to use instead + */ + public synchronized void redirect(String sid, String primary, String secondary) { + sid2primary.put(sid, primary); + sid2secondary.put(sid, secondary); + op.request(); + } + /** + * Cancel redirection. If a request is to be sent to subscription ID sid, send it to its primary URL. + * @param sid The subscription ID to remove from the table. + */ + public synchronized void forget(String sid) { + sid2primary.remove(sid); + sid2secondary.remove(sid); + op.request(); + } + /** + * Look up where to send a subscription. If the primary has changed or there is no redirection, use the primary. Otherwise, redirect to the secondary URL. + * @param sid The subscription ID to look up. + * @param primary The configured primary URL. + * @return The destination URL to really use. + */ + public synchronized String lookup(String sid, String primary) { + String oprim = sid2primary.get(sid); + if (primary.equals(oprim)) { + return(sid2secondary.get(sid)); + } else if (oprim != null) { + forget(sid); + } + return(primary); + } + /** + * Is a subscription redirected? + */ + public synchronized boolean isRedirected(String sid) { + return(sid != null && sid2secondary.get(sid) != null); + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java new file mode 100644 index 00000000..109eef27 --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java @@ -0,0 +1,229 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + +package org.onap.dmaap.datarouter.node; + +import java.util.regex.*; +import java.util.*; +import java.io.*; +import java.nio.file.*; +import java.text.*; + +/** + * Logging for data router delivery events (PUB/DEL/EXP) + */ +public class StatusLog { + private static StatusLog instance = new StatusLog(); + private HashSet toship = new HashSet(); + private SimpleDateFormat filedate; + private String prefix = "logs/events"; + private String suffix = ".log"; + private String plainfile; + private String curfile; + private long nexttime; + private OutputStream os; + private long intvl; + private NodeConfigManager config = NodeConfigManager.getInstance(); + { + try { filedate = new SimpleDateFormat("-yyyyMMddHHmm"); } catch (Exception e) {} + } + /** + * Parse an interval of the form xxhyymzzs and round it to the nearest whole fraction of 24 hours. If no units are specified, assume seconds. + */ + public static long parseInterval(String interval, int def) { + try { + Matcher m = Pattern.compile("(?:(\\d+)[Hh])?(?:(\\d+)[Mm])?(?:(\\d+)[Ss]?)?").matcher(interval); + if (m.matches()) { + int dur = 0; + String x = m.group(1); + if (x != null) { + dur += 3600 * Integer.parseInt(x); + } + x = m.group(2); + if (x != null) { + dur += 60 * Integer.parseInt(x); + } + x = m.group(3); + if (x != null) { + dur += Integer.parseInt(x); + } + if (dur < 60) { + dur = 60; + } + int best = 86400; + int dist = best - dur; + if (dur > best) { + dist = dur - best; + } + int base = 1; + for (int i = 0; i < 8; i++) { + int base2 = base; + base *= 2; + for (int j = 0; j < 4; j++) { + int base3 = base2; + base2 *= 3; + for (int k = 0; k < 3; k++) { + int cur = base3; + base3 *= 5; + int ndist = cur - dur; + if (dur > cur) { + ndist = dur - cur; + } + if (ndist < dist) { + best = cur; + dist = ndist; + } + } + } + } + def = best * 1000; + } + } catch (Exception e) { + } + return(def); + } + private synchronized void checkRoll(long now) throws IOException { + if (now >= nexttime) { + if (os != null) { + os.close(); + os = null; + } + intvl = parseInterval(config.getEventLogInterval(), 300000); + prefix = config.getEventLogPrefix(); + suffix = config.getEventLogSuffix(); + nexttime = now - now % intvl + intvl; + curfile = prefix + filedate.format(new Date(nexttime - intvl)) + suffix; + plainfile = prefix + suffix; + notify(); + } + } + /** + * Get the name of the current log file + * @return The full path name of the current event log file + */ + public static synchronized String getCurLogFile() { + try { + instance.checkRoll(System.currentTimeMillis()); + } catch (Exception e) { + } + return(instance.curfile); + } + private synchronized void log(String s) { + try { + long now = System.currentTimeMillis(); + checkRoll(now); + if (os == null) { + os = new FileOutputStream(curfile, true); + (new File(plainfile)).delete(); + Files.createLink(Paths.get(plainfile), Paths.get(curfile)); + } + os.write((NodeUtils.logts(new Date(now)) + '|' + s + '\n').getBytes()); + os.flush(); + } catch (IOException ioe) { + } + } + /** + * Log a received publication attempt. + * @param pubid The publish ID assigned by the node + * @param feedid The feed id given by the publisher + * @param requrl The URL of the received request + * @param method The method (DELETE or PUT) in the received request + * @param ctype The content type (if method is PUT and clen > 0) + * @param clen The content length (if method is PUT) + * @param srcip The IP address of the publisher + * @param user The identity of the publisher + * @param status The status returned to the publisher + */ + public static void logPub(String pubid, String feedid, String requrl, String method, String ctype, long clen, String srcip, String user, int status) { + instance.log("PUB|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + srcip + "|" + user + "|" + status); + } + /** + * Log a data transfer error receiving a publication attempt + * @param pubid The publish ID assigned by the node + * @param feedid The feed id given by the publisher + * @param requrl The URL of the received request + * @param method The method (DELETE or PUT) in the received request + * @param ctype The content type (if method is PUT and clen > 0) + * @param clen The expected content length (if method is PUT) + * @param rcvd The content length received + * @param srcip The IP address of the publisher + * @param user The identity of the publisher + * @param error The error message from the IO exception + */ + public static void logPubFail(String pubid, String feedid, String requrl, String method, String ctype, long clen, long rcvd, String srcip, String user, String error) { + instance.log("PBF|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + rcvd + "|" + srcip + "|" + user + "|" + error); + } + /** + * Log a delivery attempt. + * @param pubid The publish ID assigned by the node + * @param feedid The feed ID + * @param subid The (space delimited list of) subscription ID + * @param requrl The URL used in the attempt + * @param method The method (DELETE or PUT) in the attempt + * @param ctype The content type (if method is PUT, not metaonly, and clen > 0) + * @param clen The content length (if PUT and not metaonly) + * @param user The identity given to the subscriber + * @param status The status returned by the subscriber or -1 if an exeception occured trying to connect + * @param xpubid The publish ID returned by the subscriber + */ + public static void logDel(String pubid, String feedid, String subid, String requrl, String method, String ctype, long clen, String user, int status, String xpubid) { + if (feedid == null) { + return; + } + instance.log("DEL|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + user + "|" + status + "|" + xpubid); + } + /** + * Log delivery attempts expired + * @param pubid The publish ID assigned by the node + * @param feedid The feed ID + * @param subid The (space delimited list of) subscription ID + * @param requrl The URL that would be delivered to + * @param method The method (DELETE or PUT) in the request + * @param ctype The content type (if method is PUT, not metaonly, and clen > 0) + * @param clen The content length (if PUT and not metaonly) + * @param reason The reason the attempts were discontinued + * @param attempts The number of attempts made + */ + public static void logExp(String pubid, String feedid, String subid, String requrl, String method, String ctype, long clen, String reason, int attempts) { + if (feedid == null) { + return; + } + instance.log("EXP|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + reason + "|" + attempts); + } + /** + * Log extra statistics about unsuccessful delivery attempts. + * @param pubid The publish ID assigned by the node + * @param feedid The feed ID + * @param subid The (space delimited list of) subscription ID + * @param clen The content length + * @param sent The # of bytes sent or -1 if subscriber returned an error instead of 100 Continue, otherwise, the number of bytes sent before an error occurred. + */ + public static void logDelExtra(String pubid, String feedid, String subid, long clen, long sent) { + if (feedid == null) { + return; + } + instance.log("DLX|" + pubid + "|" + feedid + "|" + subid + "|" + clen + "|" + sent); + } + private StatusLog() { + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java new file mode 100644 index 00000000..f353d513 --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java @@ -0,0 +1,71 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +import java.net.*; + +/** + * Compare IP addresses as byte arrays to a subnet specified as a CIDR + */ +public class SubnetMatcher { + private byte[] sn; + private int len; + private int mask; + /** + * Construct a subnet matcher given a CIDR + * @param subnet The CIDR to match + */ + public SubnetMatcher(String subnet) { + int i = subnet.lastIndexOf('/'); + if (i == -1) { + sn = NodeUtils.getInetAddress(subnet); + len = sn.length; + } else { + len = Integer.parseInt(subnet.substring(i + 1)); + sn = NodeUtils.getInetAddress(subnet.substring(0, i)); + mask = ((0xff00) >> (len % 8)) & 0xff; + len /= 8; + } + } + /** + * Is the IP address in the CIDR? + * @param addr the IP address as bytes in network byte order + * @return true if the IP address matches. + */ + public boolean matches(byte[] addr) { + if (addr.length != sn.length) { + return(false); + } + for (int i = 0; i < len; i++) { + if (addr[i] != sn[i]) { + return(false); + } + } + if (mask != 0 && ((addr[len] ^ sn[len]) & mask) != 0) { + return(false); + } + return(true); + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java new file mode 100644 index 00000000..eeb18408 --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java @@ -0,0 +1,60 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +/** + * A destination to deliver a message + */ +public class Target { + private DestInfo destinfo; + private String routing; + /** + * A destination to deliver a message + * @param destinfo Either info for a subscription ID or info for a node-to-node transfer + * @param routing For a node-to-node transfer, what to do when it gets there. + */ + public Target(DestInfo destinfo, String routing) { + this.destinfo = destinfo; + this.routing = routing; + } + /** + * Add additional routing + */ + public void addRouting(String routing) { + this.routing = this.routing + " " + routing; + } + /** + * Get the destination information for this target + */ + public DestInfo getDestInfo() { + return(destinfo); + } + /** + * Get the next hop information for this target + */ + public String getRouting() { + return(routing); + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java new file mode 100644 index 00000000..c8d58e6e --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java @@ -0,0 +1,113 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +import java.util.*; + +/** + * Manage a list of tasks to be executed when an event occurs. + * This makes the following guarantees: + *

    + *
  • Tasks can be safely added and removed in the middle of a run.
  • + *
  • No task will be returned more than once during a run.
  • + *
  • No task will be returned when it is not, at that moment, in the list of tasks.
  • + *
  • At the moment when next() returns null, all tasks on the list have been returned during the run.
  • + *
  • Initially and once next() returns null during a run, next() will continue to return null until startRun() is called. + *
+ */ +public class TaskList { + private Iterator runlist; + private HashSet tasks = new HashSet(); + private HashSet togo; + private HashSet sofar; + private HashSet added; + private HashSet removed; + /** + * Construct a new TaskList + */ + public TaskList() { + } + /** + * Start executing the sequence of tasks. + */ + public synchronized void startRun() { + sofar = new HashSet(); + added = new HashSet(); + removed = new HashSet(); + togo = new HashSet(tasks); + runlist = togo.iterator(); + } + /** + * Get the next task to execute + */ + public synchronized Runnable next() { + while (runlist != null) { + if (runlist.hasNext()) { + Runnable task = runlist.next(); + if (removed.contains(task)) { + continue; + } + if (sofar.contains(task)) { + continue; + } + sofar.add(task); + return(task); + } + if (added.size() != 0) { + togo = added; + added = new HashSet(); + removed.clear(); + runlist = togo.iterator(); + continue; + } + togo = null; + added = null; + removed = null; + sofar = null; + runlist = null; + } + return(null); + } + /** + * Add a task to the list of tasks to run whenever the event occurs. + */ + public synchronized void addTask(Runnable task) { + if (runlist != null) { + added.add(task); + removed.remove(task); + } + tasks.add(task); + } + /** + * Remove a task from the list of tasks to run whenever the event occurs. + */ + public synchronized void removeTask(Runnable task) { + if (runlist != null) { + removed.add(task); + added.remove(task); + } + tasks.remove(task); + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/EELFFilter.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/EELFFilter.java new file mode 100644 index 00000000..225205ca --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/EELFFilter.java @@ -0,0 +1,43 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ +package org.onap.dmaap.datarouter.node.eelf; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.filter.Filter; +import ch.qos.logback.core.spi.FilterReply; + +/* + * When EELF functionality added it default started logging Jetty logs as well which in turn stopped existing functionality of logging jetty statements in node.log + * added code in logback.xml to add jetty statements in node.log. + * This class removes extran EELF statements from node.log since they are being logged in apicalls.log + */ +public class EELFFilter extends Filter{ + @Override + public FilterReply decide(ILoggingEvent event) { + if (event.getMessage().contains("EELF")) { + return FilterReply.DENY; + } else { + return FilterReply.ACCEPT; + } + } +} diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/EelfMsgs.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/EelfMsgs.java new file mode 100644 index 00000000..509bceea --- /dev/null +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/eelf/EelfMsgs.java @@ -0,0 +1,96 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ +package org.onap.dmaap.datarouter.node.eelf; + +import com.att.eelf.i18n.EELFResolvableErrorEnum; +import com.att.eelf.i18n.EELFResourceManager; + +public enum EelfMsgs implements EELFResolvableErrorEnum { + + /** + * Application message prints user (accepts one argument) + */ + MESSAGE_WITH_BEHALF, + + /** + * Application message prints user and FeedID (accepts two arguments) + */ + + MESSAGE_WITH_BEHALF_AND_FEEDID, + + /** + * Application message prints keystore file error in EELF errors log + */ + + MESSAGE_KEYSTORE_LOAD_ERROR, + + /** + * Application message prints Error extracting my name from my keystore file + */ + + MESSAGE_KEYSORE_NAME_ERROR, + + /** + * Application message prints Error parsing configuration data from provisioning server. + */ + + + MESSAGE_PARSING_ERROR, + + /** + * Application message printsConfiguration failed + */ + + + MESSAGE_CONF_FAILED, + + /** + * Application message prints Bad provisioning server URL + */ + + + MESSAGE_BAD_PROV_URL, + + /** + * Application message prints Unable to fetch canonical name from keystore file + */ + + + MESSAGE_KEYSTORE_FETCH_ERROR, + + /** + * Application message prints Unable to load local configuration file. + */ + + + MESSAGE_PROPERTIES_LOAD_ERROR; + + + /** + * Static initializer to ensure the resource bundles for this class are loaded... + * Here this application loads messages from three bundles + */ + static { + EELFResourceManager.loadMessageBundle("EelfMessages"); + } +} -- cgit 1.2.3-korg