summaryrefslogtreecommitdiffstats
path: root/datarouter-node/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'datarouter-node/src/main')
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java133
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java17
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java2
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java14
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java5
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java22
-rw-r--r--datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java3
7 files changed, 154 insertions, 42 deletions
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
index b2c31691..a3af88fc 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
@@ -27,6 +27,7 @@ package org.onap.dmaap.datarouter.node;
import java.io.*;
import java.net.*;
import java.util.*;
+import java.util.zip.GZIPInputStream;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
@@ -35,6 +36,7 @@ import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
import org.slf4j.MDC;
import static com.att.eelf.configuration.Configuration.*;
+import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
/**
* A file to be delivered to a destination.
@@ -69,10 +71,10 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
/**
* Create a delivery task for a given delivery queue and pub ID
*
- * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
- * @param pubid The publish ID for this file. This is used as
- * the base for the file name in the spool directory and is of
- * the form <milliseconds since 1970>.<fqdn of initial data router node>
+ * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
+ * @param pubid The publish ID for this file. This is used as
+ * the base for the file name in the spool directory and is of
+ * the form <milliseconds since 1970>.<fqdn of initial data router node>
*/
public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
this.deliveryTaskHelper = deliveryTaskHelper;
@@ -128,6 +130,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
hdrs = hdrv.toArray(new String[hdrv.size()][]);
url = deliveryTaskHelper.getDestURL(fileid);
}
+
/**
* Is the object a DeliveryTask with the same publication ID?
*/
@@ -158,6 +161,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
public String toString() {
return (pubid);
}
+
/**
* Get the publish ID
*/
@@ -178,6 +182,9 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
if (!"DELETE".equals(method) && !monly) {
length = datafile.length();
}
+ if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){
+ fileid = fileid.replace(".gz", "");
+ }
url = deliveryTaskHelper.getDestURL(fileid);
URL u = new URL(url);
HttpURLConnection uc = (HttpURLConnection) u.openConnection();
@@ -195,37 +202,16 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
if (expect100) {
uc.setRequestProperty("Expect", "100-continue");
}
- uc.setFixedLengthStreamingMode(length);
uc.setDoOutput(true);
- OutputStream os = null;
- try {
- os = uc.getOutputStream();
- } catch (ProtocolException pe) {
- deliveryTaskHelper.reportDeliveryExtra(this, -1L);
- // Rcvd error instead of 100-continue
- loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe);
- }
- if (os != null) {
- long sofar = 0;
- try (InputStream is = new FileInputStream(datafile)) {
- byte[] buf = new byte[1024 * 1024];
- while (sofar < length) {
- int i = buf.length;
- if (sofar + i > length) {
- i = (int) (length - sofar);
- }
- i = is.read(buf, 0, i);
- if (i <= 0) {
- throw new IOException("Unexpected problem reading data file " + datafile);
- }
- sofar += i;
- os.write(buf, 0, i);
- }
- os.close();
- } catch (IOException ioe) {
- deliveryTaskHelper.reportDeliveryExtra(this, sofar);
- throw ioe;
+ if (destInfo.isDecompress()) {
+ if (isFiletypeGzip(datafile)) {
+ sendDecompressedFile(uc);
+ } else {
+ uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT");
+ sendFile(uc);
}
+ } else {
+ sendFile(uc);
}
}
int rc = uc.getResponseCode();
@@ -259,12 +245,91 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
}
deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
} catch (Exception e) {
- loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
+ loggerDeliveryTask.error("Exception " + e.getStackTrace(), e);
deliveryTaskHelper.reportException(this, e);
}
}
/**
+ * To send decompressed gzip to the subscribers
+ *
+ * @param httpURLConnection connection used to make request
+ * @throws IOException
+ */
+ private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
+ byte[] buffer = new byte[8164];
+ httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS");
+ OutputStream outputStream = getOutputStream(httpURLConnection);
+ if (outputStream != null) {
+ int bytesRead = 0;
+ try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
+ int bufferLength = buffer.length;
+ while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
+ outputStream.write(buffer, 0, bytesRead);
+ }
+ outputStream.close();
+ } catch (IOException e) {
+ httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE");
+ loggerDeliveryTask.info("Could not decompress file");
+ sendFile(httpURLConnection);
+ }
+
+ }
+ }
+
+ /**
+ * To send any file to the subscriber.
+ *
+ * @param httpURLConnection connection used to make request
+ * @throws IOException
+ */
+ private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
+ OutputStream os = getOutputStream(httpURLConnection);
+ if (os != null) {
+ long sofar = 0;
+ try (InputStream is = new FileInputStream(datafile)) {
+ byte[] buf = new byte[1024 * 1024];
+ while (sofar < length) {
+ int i = buf.length;
+ if (sofar + i > length) {
+ i = (int) (length - sofar);
+ }
+ i = is.read(buf, 0, i);
+ if (i <= 0) {
+ throw new IOException("Unexpected problem reading data file " + datafile);
+ }
+ sofar += i;
+ os.write(buf, 0, i);
+ }
+ os.close();
+ } catch (IOException ioe) {
+ deliveryTaskHelper.reportDeliveryExtra(this, sofar);
+ throw ioe;
+ }
+ }
+ }
+
+ /**
+ * Get the outputstream that will be used to send data
+ *
+ * @param httpURLConnection connection used to make request
+ * @return AN Outpustream that can be used to send your data.
+ * @throws IOException
+ */
+ private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
+ OutputStream outputStream = null;
+
+ try {
+ outputStream = httpURLConnection.getOutputStream();
+ } catch (ProtocolException pe) {
+ deliveryTaskHelper.reportDeliveryExtra(this, -1L);
+ // Rcvd error instead of 100-continue
+ loggerDeliveryTask.error("Exception " + pe.getStackTrace(), pe);
+ }
+ return outputStream;
+ }
+
+ /**
* Remove meta and data files
*/
public void clean() {
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java
index c3e0057c..73753527 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java
@@ -38,6 +38,7 @@ public class DestInfo {
private boolean metaonly;
private boolean use100;
private boolean privilegedSubscriber;
+ private boolean decompress;
/**
* Create a destination information object.
@@ -52,8 +53,9 @@ public class DestInfo {
* @param metaonly Is this a metadata only delivery?
* @param use100 Should I use expect 100-continue?
* @param privilegedSubscriber Can we wait to receive a file processed acknowledgement before deleting file
+ * @param decompress To see if the they want there information compressed or decompressed
*/
- public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100, boolean privilegedSubscriber) {
+ public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100, boolean privilegedSubscriber, boolean decompress) {
this.name = name;
this.spool = spool;
this.subid = subid;
@@ -64,6 +66,7 @@ public class DestInfo {
this.metaonly = metaonly;
this.use100 = use100;
this.privilegedSubscriber = privilegedSubscriber;
+ this.decompress = decompress;
}
/**
@@ -84,6 +87,7 @@ public class DestInfo {
this.metaonly = subscription.isMetaDataOnly();
this.use100 = subscription.isUsing100();
this.privilegedSubscriber = subscription.isPrivilegedSubscriber();
+ this.decompress = subscription.isDecompress();
}
public boolean equals(Object o) {
@@ -180,4 +184,15 @@ public class DestInfo {
public boolean isPrivilegedSubscriber() {
return (privilegedSubscriber);
}
+
+ /**
+ * Should i decompress the file before sending it on
+ *
+ * @return True if I should.
+ */
+ public boolean isDecompress() {
+ return (decompress);
+ }
+
+
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java
index ff803afc..032c6ced 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java
@@ -105,7 +105,7 @@ public class LogManager extends TimerTask {
public Uploader() {
dq = new DeliveryQueue(this,
new DestInfo("LogUpload", uploaddir, null, null, null, config.getMyName(), config.getMyAuth(), false,
- false, false));
+ false, false, false));
setDaemon(true);
setName("Log Uploader");
start();
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java
index d3d3d01b..5577e52e 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java
@@ -232,6 +232,7 @@ public class NodeConfig {
private boolean metaonly;
private boolean use100;
private boolean privilegedSubscriber;
+ private boolean decompress;
/**
* Construct a subscription configuration entry
@@ -245,9 +246,10 @@ public class NodeConfig {
* @param metaonly Is this a meta data only subscription?
* @param use100 Should we send Expect: 100-continue?
* @param privilegedSubscriber Can we wait to receive a delete file call before deleting file
+ * @param decompress To see if they want their information compressed or decompressed
*/
public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials,
- boolean metaonly, boolean use100, boolean privilegedSubscriber) {
+ boolean metaonly, boolean use100, boolean privilegedSubscriber, boolean decompress) {
this.subid = subid;
this.feedid = feedid;
this.url = url;
@@ -256,6 +258,7 @@ public class NodeConfig {
this.metaonly = metaonly;
this.use100 = use100;
this.privilegedSubscriber = privilegedSubscriber;
+ this.decompress = decompress;
}
/**
@@ -313,6 +316,13 @@ public class NodeConfig {
public boolean isPrivilegedSubscriber() {
return (privilegedSubscriber);
}
+
+ /**
+ * Should i decompress the file before sending it on
+ */
+ public boolean isDecompress() {
+ return (decompress);
+ }
}
/**
@@ -506,7 +516,7 @@ public class NodeConfig {
}
String auth = NodeUtils.getNodeAuthHdr(cn, nodeauthkey);
DestInfo di = new DestInfo("n:" + cn, spooldir + "/n/" + cn, null, "n2n-" + cn,
- "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true, false);
+ "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true, false, false);
(new File(di.getSpool())).mkdirs();
destInfos.add(di);
nodeinfo.put(cn, di);
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java
index 35b29064..79888795 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java
@@ -24,8 +24,6 @@
package org.onap.dmaap.datarouter.node;
-import static org.onap.dmaap.datarouter.node.NodeUtils.sendResponseError;
-
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import java.io.File;
@@ -48,6 +46,8 @@ import org.jetbrains.annotations.Nullable;
import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
import org.slf4j.MDC;
+import static org.onap.dmaap.datarouter.node.NodeUtils.*;
+
/**
* Servlet for handling all http and https requests to the data router node
* <p>
@@ -390,6 +390,7 @@ public class NodeServlet extends HttpServlet {
}
mw.close();
meta.renameTo(new File(dbase + ".M"));
+
}
resp.setStatus(HttpServletResponse.SC_NO_CONTENT);
resp.getOutputStream().close();
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java
index 2ba97163..d2ab8034 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java
@@ -26,6 +26,8 @@ package org.onap.dmaap.datarouter.node;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
+
+import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
@@ -37,6 +39,7 @@ import java.util.Date;
import java.util.Enumeration;
import java.util.TimeZone;
import java.util.UUID;
+import java.util.zip.GZIPInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.codec.binary.Base64;
@@ -62,7 +65,7 @@ public class NodeUtils {
/**
* Base64 encode a byte array
*
- * @param raw The bytes to be encoded
+ * @param raw The bytes to be encoded
* @return The encoded string
*/
public static String base64Encode(byte[] raw) {
@@ -287,5 +290,22 @@ public class NodeUtils {
}
}
+ /**
+ * Method to check to see if file is of type gzip
+ *
+ * @param file The name of the file to be checked
+ * @return True if the file is of type gzip
+ */
+ public static boolean isFiletypeGzip(File file){
+ try(FileInputStream fileInputStream = new FileInputStream(file);
+ GZIPInputStream gzip = new GZIPInputStream(fileInputStream)) {
+
+ return true;
+ }catch (IOException e){
+ nodeUtilsLogger.error("NODE0403 " + file.toString() + " Not in gzip(gz) format: " + e.toString() + e);
+ return false;
+ }
+ }
+
}
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java
index 765a4075..77c5e996 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java
@@ -174,7 +174,8 @@ public class ProvData {
boolean monly = jsub.getBoolean("metadataOnly");
boolean use100 = jdel.getBoolean("use100");
boolean privilegedSubscriber = jsub.getBoolean("privilegedSubscriber");
- psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100, privilegedSubscriber));
+ boolean decompress = jsub.getBoolean("decompress");
+ psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100, privilegedSubscriber, decompress));
}
}
JSONObject jparams = jcfg.optJSONObject("parameters");