diff options
Diffstat (limited to 'datarouter-node')
9 files changed, 156 insertions, 43 deletions
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java index b2c31691..a3af88fc 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java @@ -27,6 +27,7 @@ package org.onap.dmaap.datarouter.node; import java.io.*; import java.net.*; import java.util.*; +import java.util.zip.GZIPInputStream; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; @@ -35,6 +36,7 @@ import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; import org.slf4j.MDC; import static com.att.eelf.configuration.Configuration.*; +import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip; /** * A file to be delivered to a destination. @@ -69,10 +71,10 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { /** * Create a delivery task for a given delivery queue and pub ID * - * @param deliveryTaskHelper The delivery task helper for the queue this task is in. - * @param pubid The publish ID for this file. This is used as - * the base for the file name in the spool directory and is of - * the form <milliseconds since 1970>.<fqdn of initial data router node> + * @param deliveryTaskHelper The delivery task helper for the queue this task is in. + * @param pubid The publish ID for this file. This is used as + * the base for the file name in the spool directory and is of + * the form <milliseconds since 1970>.<fqdn of initial data router node> */ public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) { this.deliveryTaskHelper = deliveryTaskHelper; @@ -128,6 +130,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { hdrs = hdrv.toArray(new String[hdrv.size()][]); url = deliveryTaskHelper.getDestURL(fileid); } + /** * Is the object a DeliveryTask with the same publication ID? */ @@ -158,6 +161,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { public String toString() { return (pubid); } + /** * Get the publish ID */ @@ -178,6 +182,9 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { if (!"DELETE".equals(method) && !monly) { length = datafile.length(); } + if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){ + fileid = fileid.replace(".gz", ""); + } url = deliveryTaskHelper.getDestURL(fileid); URL u = new URL(url); HttpURLConnection uc = (HttpURLConnection) u.openConnection(); @@ -195,37 +202,16 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { if (expect100) { uc.setRequestProperty("Expect", "100-continue"); } - uc.setFixedLengthStreamingMode(length); uc.setDoOutput(true); - OutputStream os = null; - try { - os = uc.getOutputStream(); - } catch (ProtocolException pe) { - deliveryTaskHelper.reportDeliveryExtra(this, -1L); - // Rcvd error instead of 100-continue - loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe); - } - if (os != null) { - long sofar = 0; - try (InputStream is = new FileInputStream(datafile)) { - byte[] buf = new byte[1024 * 1024]; - while (sofar < length) { - int i = buf.length; - if (sofar + i > length) { - i = (int) (length - sofar); - } - i = is.read(buf, 0, i); - if (i <= 0) { - throw new IOException("Unexpected problem reading data file " + datafile); - } - sofar += i; - os.write(buf, 0, i); - } - os.close(); - } catch (IOException ioe) { - deliveryTaskHelper.reportDeliveryExtra(this, sofar); - throw ioe; + if (destInfo.isDecompress()) { + if (isFiletypeGzip(datafile)) { + sendDecompressedFile(uc); + } else { + uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT"); + sendFile(uc); } + } else { + sendFile(uc); } } int rc = uc.getResponseCode(); @@ -259,12 +245,91 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { } deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg); } catch (Exception e) { - loggerDeliveryTask.error("Exception "+e.getStackTrace(),e); + loggerDeliveryTask.error("Exception " + e.getStackTrace(), e); deliveryTaskHelper.reportException(this, e); } } /** + * To send decompressed gzip to the subscribers + * + * @param httpURLConnection connection used to make request + * @throws IOException + */ + private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException { + byte[] buffer = new byte[8164]; + httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS"); + OutputStream outputStream = getOutputStream(httpURLConnection); + if (outputStream != null) { + int bytesRead = 0; + try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) { + int bufferLength = buffer.length; + while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) { + outputStream.write(buffer, 0, bytesRead); + } + outputStream.close(); + } catch (IOException e) { + httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE"); + loggerDeliveryTask.info("Could not decompress file"); + sendFile(httpURLConnection); + } + + } + } + + /** + * To send any file to the subscriber. + * + * @param httpURLConnection connection used to make request + * @throws IOException + */ + private void sendFile(HttpURLConnection httpURLConnection) throws IOException { + OutputStream os = getOutputStream(httpURLConnection); + if (os != null) { + long sofar = 0; + try (InputStream is = new FileInputStream(datafile)) { + byte[] buf = new byte[1024 * 1024]; + while (sofar < length) { + int i = buf.length; + if (sofar + i > length) { + i = (int) (length - sofar); + } + i = is.read(buf, 0, i); + if (i <= 0) { + throw new IOException("Unexpected problem reading data file " + datafile); + } + sofar += i; + os.write(buf, 0, i); + } + os.close(); + } catch (IOException ioe) { + deliveryTaskHelper.reportDeliveryExtra(this, sofar); + throw ioe; + } + } + } + + /** + * Get the outputstream that will be used to send data + * + * @param httpURLConnection connection used to make request + * @return AN Outpustream that can be used to send your data. + * @throws IOException + */ + private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException { + OutputStream outputStream = null; + + try { + outputStream = httpURLConnection.getOutputStream(); + } catch (ProtocolException pe) { + deliveryTaskHelper.reportDeliveryExtra(this, -1L); + // Rcvd error instead of 100-continue + loggerDeliveryTask.error("Exception " + pe.getStackTrace(), pe); + } + return outputStream; + } + + /** * Remove meta and data files */ public void clean() { diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java index c3e0057c..73753527 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java @@ -38,6 +38,7 @@ public class DestInfo { private boolean metaonly; private boolean use100; private boolean privilegedSubscriber; + private boolean decompress; /** * Create a destination information object. @@ -52,8 +53,9 @@ public class DestInfo { * @param metaonly Is this a metadata only delivery? * @param use100 Should I use expect 100-continue? * @param privilegedSubscriber Can we wait to receive a file processed acknowledgement before deleting file + * @param decompress To see if the they want there information compressed or decompressed */ - public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100, boolean privilegedSubscriber) { + public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100, boolean privilegedSubscriber, boolean decompress) { this.name = name; this.spool = spool; this.subid = subid; @@ -64,6 +66,7 @@ public class DestInfo { this.metaonly = metaonly; this.use100 = use100; this.privilegedSubscriber = privilegedSubscriber; + this.decompress = decompress; } /** @@ -84,6 +87,7 @@ public class DestInfo { this.metaonly = subscription.isMetaDataOnly(); this.use100 = subscription.isUsing100(); this.privilegedSubscriber = subscription.isPrivilegedSubscriber(); + this.decompress = subscription.isDecompress(); } public boolean equals(Object o) { @@ -180,4 +184,15 @@ public class DestInfo { public boolean isPrivilegedSubscriber() { return (privilegedSubscriber); } + + /** + * Should i decompress the file before sending it on + * + * @return True if I should. + */ + public boolean isDecompress() { + return (decompress); + } + + } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java index ff803afc..032c6ced 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java @@ -105,7 +105,7 @@ public class LogManager extends TimerTask { public Uploader() { dq = new DeliveryQueue(this, new DestInfo("LogUpload", uploaddir, null, null, null, config.getMyName(), config.getMyAuth(), false, - false, false)); + false, false, false)); setDaemon(true); setName("Log Uploader"); start(); diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java index d3d3d01b..5577e52e 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java @@ -232,6 +232,7 @@ public class NodeConfig { private boolean metaonly; private boolean use100; private boolean privilegedSubscriber; + private boolean decompress; /** * Construct a subscription configuration entry @@ -245,9 +246,10 @@ public class NodeConfig { * @param metaonly Is this a meta data only subscription? * @param use100 Should we send Expect: 100-continue? * @param privilegedSubscriber Can we wait to receive a delete file call before deleting file + * @param decompress To see if they want their information compressed or decompressed */ public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials, - boolean metaonly, boolean use100, boolean privilegedSubscriber) { + boolean metaonly, boolean use100, boolean privilegedSubscriber, boolean decompress) { this.subid = subid; this.feedid = feedid; this.url = url; @@ -256,6 +258,7 @@ public class NodeConfig { this.metaonly = metaonly; this.use100 = use100; this.privilegedSubscriber = privilegedSubscriber; + this.decompress = decompress; } /** @@ -313,6 +316,13 @@ public class NodeConfig { public boolean isPrivilegedSubscriber() { return (privilegedSubscriber); } + + /** + * Should i decompress the file before sending it on + */ + public boolean isDecompress() { + return (decompress); + } } /** @@ -506,7 +516,7 @@ public class NodeConfig { } String auth = NodeUtils.getNodeAuthHdr(cn, nodeauthkey); DestInfo di = new DestInfo("n:" + cn, spooldir + "/n/" + cn, null, "n2n-" + cn, - "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true, false); + "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true, false, false); (new File(di.getSpool())).mkdirs(); destInfos.add(di); nodeinfo.put(cn, di); diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java index 35b29064..79888795 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java @@ -24,8 +24,6 @@ package org.onap.dmaap.datarouter.node; -import static org.onap.dmaap.datarouter.node.NodeUtils.sendResponseError; - import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import java.io.File; @@ -48,6 +46,8 @@ import org.jetbrains.annotations.Nullable; import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; import org.slf4j.MDC; +import static org.onap.dmaap.datarouter.node.NodeUtils.*; + /** * Servlet for handling all http and https requests to the data router node * <p> @@ -390,6 +390,7 @@ public class NodeServlet extends HttpServlet { } mw.close(); meta.renameTo(new File(dbase + ".M")); + } resp.setStatus(HttpServletResponse.SC_NO_CONTENT); resp.getOutputStream().close(); diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java index 2ba97163..d2ab8034 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java @@ -26,6 +26,8 @@ package org.onap.dmaap.datarouter.node; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; + +import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.net.InetAddress; @@ -37,6 +39,7 @@ import java.util.Date; import java.util.Enumeration; import java.util.TimeZone; import java.util.UUID; +import java.util.zip.GZIPInputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.commons.codec.binary.Base64; @@ -62,7 +65,7 @@ public class NodeUtils { /** * Base64 encode a byte array * - * @param raw The bytes to be encoded + * @param raw The bytes to be encoded * @return The encoded string */ public static String base64Encode(byte[] raw) { @@ -287,5 +290,22 @@ public class NodeUtils { } } + /** + * Method to check to see if file is of type gzip + * + * @param file The name of the file to be checked + * @return True if the file is of type gzip + */ + public static boolean isFiletypeGzip(File file){ + try(FileInputStream fileInputStream = new FileInputStream(file); + GZIPInputStream gzip = new GZIPInputStream(fileInputStream)) { + + return true; + }catch (IOException e){ + nodeUtilsLogger.error("NODE0403 " + file.toString() + " Not in gzip(gz) format: " + e.toString() + e); + return false; + } + } + } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java index 765a4075..77c5e996 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java @@ -174,7 +174,8 @@ public class ProvData { boolean monly = jsub.getBoolean("metadataOnly"); boolean use100 = jdel.getBoolean("use100"); boolean privilegedSubscriber = jsub.getBoolean("privilegedSubscriber"); - psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100, privilegedSubscriber)); + boolean decompress = jsub.getBoolean("decompress"); + psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100, privilegedSubscriber, decompress)); } } JSONObject jparams = jcfg.optJSONObject("parameters"); diff --git a/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/DeliveryTest.java b/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/DeliveryTest.java index ae8cd2cd..4ca907f7 100644 --- a/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/DeliveryTest.java +++ b/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/DeliveryTest.java @@ -97,7 +97,7 @@ public class DeliveryTest { private DestInfo[] createDestInfoObjects() { DestInfo[] destInfos = new DestInfo[1]; - DestInfo destInfo = new DestInfo("node.datarouternew.com", "spool/s/0/1", "1", "logs/", "/subs/1", "user1", "Basic dXNlcjE6cGFzc3dvcmQx", false, true, false); + DestInfo destInfo = new DestInfo("node.datarouternew.com", "spool/s/0/1", "1", "logs/", "/subs/1", "user1", "Basic dXNlcjE6cGFzc3dvcmQx", false, true, false, false); destInfos[0] = destInfo; return destInfos; } diff --git a/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeConfigTest.java b/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeConfigTest.java index 5092141a..4b614d56 100644 --- a/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeConfigTest.java +++ b/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeConfigTest.java @@ -211,6 +211,7 @@ public class NodeConfigTest { delivery.put("use100", true); subscription.put("delivery", delivery); subscription.put("privilegedSubscriber", false); + subscription.put("decompress", false); subscriptions.put(subscription); provData.put("subscriptions", subscriptions); } |