summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/dmf/mr/resources/streamReaders
diff options
context:
space:
mode:
authorsunil unnava <sunil.unnava@att.com>2018-10-23 12:18:59 -0400
committersunil unnava <sunil.unnava@att.com>2018-10-23 12:22:02 -0400
commit3504265229c589ecc166e3ad4c33bb198b11e4ce (patch)
tree022235018aa3ad863eaf24862543bbd509f35a21 /src/main/java/com/att/dmf/mr/resources/streamReaders
parent8a3dfd3fe521f18ce07c2d24202a51b28d424fa2 (diff)
update the package name1.1.11
Issue-ID: DMAAP-858 Change-Id: I49ae6eb9c51a261b64b911e607fcbbca46c5423c Signed-off-by: sunil unnava <sunil.unnava@att.com>
Diffstat (limited to 'src/main/java/com/att/dmf/mr/resources/streamReaders')
-rw-r--r--src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java169
-rw-r--r--src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java141
-rw-r--r--src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaStreamReader.java229
-rw-r--r--src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaTextStreamReader.java140
4 files changed, 0 insertions, 679 deletions
diff --git a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java
deleted file mode 100644
index 7a67c92..0000000
--- a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.resources.streamReaders;
-
-import java.io.InputStream;
-
-import javax.servlet.http.HttpServletResponse;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
-
-import com.att.dmf.mr.CambriaApiException;
-import com.att.dmf.mr.backends.Publisher.message;
-import com.att.dmf.mr.beans.LogDetails;
-import com.att.dmf.mr.resources.CambriaEventSet.reader;
-
-/**
- *
- * @author anowarul.islam
- *
- */
-public class CambriaJsonStreamReader implements reader {
- private final JSONTokener fTokens;
- private final boolean fIsList;
- private long fCount;
- private final String fDefPart;
- public static final String kKeyField = "cambria.partition";
-
- /**
- *
- * @param is
- * @param defPart
- * @throws CambriaApiException
- */
- public CambriaJsonStreamReader(InputStream is, String defPart) throws CambriaApiException {
- try {
- fTokens = new JSONTokener(is);
- fCount = 0;
- fDefPart = defPart;
-
- final int c = fTokens.next();
- if (c == '[') {
- fIsList = true;
- } else if (c == '{') {
- fTokens.back();
- fIsList = false;
- } else {
- throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expecting an array or an object.");
- }
- } catch (JSONException e) {
- throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, e.getMessage());
- }
- }
-
- @Override
- public message next() throws CambriaApiException {
- try {
- if (!fTokens.more()) {
- return null;
- }
-
- final int c = fTokens.next();
-
-
- if (fIsList) {
- if (c == ']' || (fCount > 0 && c == 10))
- return null;
-
-
- if (fCount > 0 && c != ',' && c!= 10) {
- throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
- "Expected ',' or closing ']' after last object.");
- }
-
- if (fCount == 0 && c != '{' && c!= 10 && c!=32) {
- throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected { to start an object.");
- }
- } else if (fCount != 0 || c != '{') {
- throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected '{' to start an object.");
- }
-
- if (c == '{') {
- fTokens.back();
- }
- final JSONObject o = new JSONObject(fTokens);
- fCount++;
- return new msg(o);
- } catch (JSONException e) {
- throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, e.getMessage());
-
- }
- }
-
- private class msg implements message {
- private final String fKey;
- private String fMsg;
- private LogDetails logDetails;
- private boolean transactionEnabled;
-
- /**
- * constructor
- *
- * @param o
- */
-
-
-
- public msg(JSONObject o) {
- String key = o.optString(kKeyField, fDefPart);
- if (key == null) {
- key = "" + System.currentTimeMillis();
- }
- fKey = key;
-
- fMsg = o.toString().trim();
-
- }
-
- @Override
- public String getKey() {
- return fKey;
- }
-
- @Override
- public String getMessage() {
- return fMsg;
- }
-
- @Override
- public boolean isTransactionEnabled() {
- return transactionEnabled;
- }
-
- @Override
- public void setTransactionEnabled(boolean transactionEnabled) {
- this.transactionEnabled = transactionEnabled;
- }
-
- @Override
- public void setLogDetails(LogDetails logDetails) {
- this.logDetails = logDetails;
- }
-
- @Override
- public LogDetails getLogDetails() {
- return logDetails;
- }
- }
-}
diff --git a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java
deleted file mode 100644
index f64c0de..0000000
--- a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.resources.streamReaders;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import javax.servlet.http.HttpServletResponse;
-
-import com.att.dmf.mr.CambriaApiException;
-import com.att.dmf.mr.backends.Publisher.message;
-import com.att.dmf.mr.beans.LogDetails;
-import com.att.dmf.mr.resources.CambriaEventSet.reader;
-import com.att.nsa.util.StreamTools;
-
-/**
- *
- * This stream reader reads raw bytes creating a single message.
- * @author peter
- *
- */
-public class CambriaRawStreamReader implements reader
-{
- /**
- * This is the constructor of CambriaRawStreamReader, it will basically the read from Input stream
- * @param is
- * @param defPart
- * @throws CambriaApiException
- */
- public CambriaRawStreamReader ( InputStream is, String defPart ) throws CambriaApiException
- {
- fStream = is;
- fDefPart = defPart;
- fClosed = false;
- }
-
- @Override
- /**
- *
- * next() method reads the bytes and
- * iterates through the messages
- * @throws CambriaApiException
- *
- */
- public message next () throws CambriaApiException
- {
- if ( fClosed ) return null;
-
- try
- {
- final byte[] rawBytes = StreamTools.readBytes ( fStream );
- fClosed = true;
- return new message ()
- {
- private LogDetails logDetails;
- private boolean transactionEnabled;
-
- /**
- * returns boolean value which
- * indicates whether transaction is enabled
- */
- public boolean isTransactionEnabled() {
- return transactionEnabled;
- }
-
- /**
- * sets boolean value which
- * indicates whether transaction is enabled
- */
- public void setTransactionEnabled(boolean transactionEnabled) {
- this.transactionEnabled = transactionEnabled;
- }
-
- @Override
- /**
- * @returns key
- * It ch4ecks whether fDefPart value is Null.
- * If yes, it will return ystem.currentTimeMillis () else
- * it will return fDefPart variable value
- */
- public String getKey ()
- {
- return fDefPart == null ? "" + System.currentTimeMillis () : fDefPart;
- }
-
- @Override
- /**
- * returns the message in String type object
- */
- public String getMessage ()
- {
- return new String ( rawBytes );
- }
-
- /**
- * set log details in logDetails variable
- */
- @Override
- public void setLogDetails(LogDetails logDetails) {
- this.logDetails = logDetails;
- }
-
- @Override
- /**
- * get the log details
- */
- public LogDetails getLogDetails() {
- return this.logDetails;
- }
- };
- }
- catch ( IOException e )
- {
- throw new CambriaApiException ( HttpServletResponse.SC_BAD_REQUEST, e.getMessage () );
- }
- }
-
- private final InputStream fStream;
- private final String fDefPart;
- private boolean fClosed;
-
-}
diff --git a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaStreamReader.java b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaStreamReader.java
deleted file mode 100644
index 3dbf339..0000000
--- a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaStreamReader.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.resources.streamReaders;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import javax.servlet.http.HttpServletResponse;
-
-import com.att.dmf.mr.CambriaApiException;
-import com.att.dmf.mr.backends.Publisher.message;
-import com.att.dmf.mr.beans.LogDetails;
-import com.att.dmf.mr.resources.CambriaEventSet.reader;
-
-/**
- * Read an optionally chunked stream in the Cambria app format. This format
- * allows for speedier server-side message parsing than pure JSON. It's looks
- * like:<br/>
- * <br/>
- * &lt;keyLength&gt;.&lt;msgLength&gt;.&lt;key&gt;&lt;message&gt;<br/>
- * <br/>
- * Whitespace before/after each entry is ignored, so messages can be delivered
- * with newlines between them, or not.
- *
- * @author peter
- *
- */
-public class CambriaStreamReader implements reader {
- /**
- * constructor initializing InputStream with fStream
- *
- * @param senderStream
- * @throws CambriaApiException
- */
- public CambriaStreamReader(InputStream senderStream) throws CambriaApiException {
- fStream = senderStream;
- }
-
- @Override
- /**
- * next method iterates through msg length
- * throws IOException
- * throws CambriaApiException
- *
- */
- public message next() throws IOException, CambriaApiException {
- final int keyLen = readLength();
- if (keyLen == -1)
- return null;
-
- final int msgLen = readLength();
- final String keyPart = readString(keyLen);
- final String msgPart = readString(msgLen);
-
- return new msg(keyPart, msgPart);
- }
-
- private static class msg implements message {
- /**
- * constructor initialization
- *
- * @param key
- * @param msg
- */
- public msg(String key, String msg) {
- // if no key, use the current time. This allows the message to be
- // delivered
- // in any order without forcing it into a single partition as empty
- // string would.
- if (key.length() < 1) {
- key = "" + System.currentTimeMillis();
- }
-
- fKey = key;
- fMsg = msg;
- }
-
- @Override
- /**
- * @returns fkey
- */
- public String getKey() {
- return fKey;
- }
-
- @Override
- /**
- * returns the message in String type object
- */
- public String getMessage() {
- return fMsg;
- }
-
- private final String fKey;
- private final String fMsg;
- private LogDetails logDetails;
- private boolean transactionEnabled;
-
- /**
- * returns boolean value which
- * indicates whether transaction is enabled
- */
- public boolean isTransactionEnabled() {
- return transactionEnabled;
- }
-
- /**
- * sets boolean value which
- * indicates whether transaction is enabled
- */
- public void setTransactionEnabled(boolean transactionEnabled) {
- this.transactionEnabled = transactionEnabled;
- }
-
- @Override
- /**
- * set log details in logDetails variable
- */
- public void setLogDetails(LogDetails logDetails) {
- this.logDetails = logDetails;
- }
-
- @Override
- /**
- * get the log details
- */
- public LogDetails getLogDetails() {
- return this.logDetails;
- }
-
- }
-
- private final InputStream fStream;
-
- /**
- * max cambria length indicates message length
-
- // This limit is here to prevent the server from spinning on a long string of numbers
- // that is delivered with 'application/cambria' as the format. The limit needs to be
- // large enough to support the max message length (currently 1MB, the default Kafka
- // limit)
- * */
-
- private static final int kMaxCambriaLength = 4*1000*1024;
-
-
- /**
- *
- * @return
- * @throws IOException
- * @throws CambriaApiException
- */
- private int readLength() throws IOException, CambriaApiException {
- // always ignore leading whitespace
- int c = fStream.read();
- while (Character.isWhitespace(c)) {
- c = fStream.read();
- }
-
- if (c == -1) {
- return -1;
- }
-
- int result = 0;
- while (Character.isDigit(c)) {
- result = (result * 10) + (c - '0');
- if (result > kMaxCambriaLength) {
- throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected . after length.");
- }
- c = fStream.read();
- }
-
- if (c != '.') {
- throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected . after length.");
- }
-
- return result;
- }
-
- /**
- *
- * @param len
- * @return
- * @throws IOException
- * @throws CambriaApiException
- */
- private String readString(int len) throws IOException, CambriaApiException {
- final byte[] buffer = new byte[len];
-
- final long startMs = System.currentTimeMillis();
- final long timeoutMs = startMs + 30000; // FIXME configurable
-
- int readTotal = 0;
- while (readTotal < len) {
- final int read = fStream.read(buffer, readTotal, len - readTotal);
- if (read == -1 || System.currentTimeMillis() > timeoutMs) {
- // EOF
- break;
- }
- readTotal += read;
- }
-
- if (readTotal < len) {
- throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
- "End of stream while reading " + len + " bytes");
- }
-
- return new String(buffer);
- }
-}
diff --git a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaTextStreamReader.java b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaTextStreamReader.java
deleted file mode 100644
index b06e17a..0000000
--- a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaTextStreamReader.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.resources.streamReaders;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import javax.servlet.http.HttpServletResponse;
-
-import com.att.dmf.mr.CambriaApiException;
-import com.att.dmf.mr.backends.Publisher.message;
-import com.att.dmf.mr.beans.LogDetails;
-import com.att.dmf.mr.resources.CambriaEventSet.reader;
-
-/**
- * This stream reader just pulls single lines. It uses the default partition if provided. If
- * not, the key is the current time, which does not guarantee ordering.
- *
- * @author peter
- *
- */
-public class CambriaTextStreamReader implements reader
-{
- /**
- * This is the constructor for Cambria Text Reader format
- * @param is
- * @param defPart
- * @throws CambriaApiException
- */
- public CambriaTextStreamReader ( InputStream is, String defPart ) throws CambriaApiException
- {
- fReader = new BufferedReader ( new InputStreamReader ( is ) );
- fDefPart = defPart;
- }
-
- @Override
- /**
- * next() method iterates through msg length
- * throws IOException
- * throws CambriaApiException
- *
- */
- public message next () throws CambriaApiException
- {
- try
- {
- final String line = fReader.readLine ();
- if ( line == null ) return null;
-
- return new message ()
- {
- private LogDetails logDetails;
- private boolean transactionEnabled;
-
- /**
- * returns boolean value which
- * indicates whether transaction is enabled
- * @return
- */
- public boolean isTransactionEnabled() {
- return transactionEnabled;
- }
-
- /**
- * sets boolean value which
- * indicates whether transaction is enabled
- */
- public void setTransactionEnabled(boolean transactionEnabled) {
- this.transactionEnabled = transactionEnabled;
- }
-
- @Override
- /**
- * @returns key
- * It ch4ecks whether fDefPart value is Null.
- * If yes, it will return ystem.currentTimeMillis () else
- * it will return fDefPart variable value
- */
- public String getKey ()
- {
- return fDefPart == null ? "" + System.currentTimeMillis () : fDefPart;
- }
-
- @Override
- /**
- * returns the message in String type object
- * @return
- */
- public String getMessage ()
- {
- return line;
- }
-
- @Override
- /**
- * set log details in logDetails variable
- */
- public void setLogDetails(LogDetails logDetails) {
- this.logDetails = logDetails;
- }
-
- @Override
- /**
- * get the log details
- */
- public LogDetails getLogDetails() {
- return this.logDetails;
- }
- };
- }
- catch ( IOException e )
- {
- throw new CambriaApiException ( HttpServletResponse.SC_BAD_REQUEST, e.getMessage () );
- }
- }
-
- private final BufferedReader fReader;
- private final String fDefPart;
-}