diff options
author | sunil unnava <sunil.unnava@att.com> | 2018-10-23 12:18:59 -0400 |
---|---|---|
committer | sunil unnava <sunil.unnava@att.com> | 2018-10-23 12:22:02 -0400 |
commit | 3504265229c589ecc166e3ad4c33bb198b11e4ce (patch) | |
tree | 022235018aa3ad863eaf24862543bbd509f35a21 /src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaStreamReader.java | |
parent | 8a3dfd3fe521f18ce07c2d24202a51b28d424fa2 (diff) |
update the package name1.1.11
Issue-ID: DMAAP-858
Change-Id: I49ae6eb9c51a261b64b911e607fcbbca46c5423c
Signed-off-by: sunil unnava <sunil.unnava@att.com>
Diffstat (limited to 'src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaStreamReader.java')
-rw-r--r-- | src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaStreamReader.java | 229 |
1 files changed, 0 insertions, 229 deletions
diff --git a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaStreamReader.java b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaStreamReader.java deleted file mode 100644 index 3dbf339..0000000 --- a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaStreamReader.java +++ /dev/null @@ -1,229 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 -* - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.dmf.mr.resources.streamReaders; - -import java.io.IOException; -import java.io.InputStream; - -import javax.servlet.http.HttpServletResponse; - -import com.att.dmf.mr.CambriaApiException; -import com.att.dmf.mr.backends.Publisher.message; -import com.att.dmf.mr.beans.LogDetails; -import com.att.dmf.mr.resources.CambriaEventSet.reader; - -/** - * Read an optionally chunked stream in the Cambria app format. This format - * allows for speedier server-side message parsing than pure JSON. It's looks - * like:<br/> - * <br/> - * <keyLength>.<msgLength>.<key><message><br/> - * <br/> - * Whitespace before/after each entry is ignored, so messages can be delivered - * with newlines between them, or not. - * - * @author peter - * - */ -public class CambriaStreamReader implements reader { - /** - * constructor initializing InputStream with fStream - * - * @param senderStream - * @throws CambriaApiException - */ - public CambriaStreamReader(InputStream senderStream) throws CambriaApiException { - fStream = senderStream; - } - - @Override - /** - * next method iterates through msg length - * throws IOException - * throws CambriaApiException - * - */ - public message next() throws IOException, CambriaApiException { - final int keyLen = readLength(); - if (keyLen == -1) - return null; - - final int msgLen = readLength(); - final String keyPart = readString(keyLen); - final String msgPart = readString(msgLen); - - return new msg(keyPart, msgPart); - } - - private static class msg implements message { - /** - * constructor initialization - * - * @param key - * @param msg - */ - public msg(String key, String msg) { - // if no key, use the current time. This allows the message to be - // delivered - // in any order without forcing it into a single partition as empty - // string would. - if (key.length() < 1) { - key = "" + System.currentTimeMillis(); - } - - fKey = key; - fMsg = msg; - } - - @Override - /** - * @returns fkey - */ - public String getKey() { - return fKey; - } - - @Override - /** - * returns the message in String type object - */ - public String getMessage() { - return fMsg; - } - - private final String fKey; - private final String fMsg; - private LogDetails logDetails; - private boolean transactionEnabled; - - /** - * returns boolean value which - * indicates whether transaction is enabled - */ - public boolean isTransactionEnabled() { - return transactionEnabled; - } - - /** - * sets boolean value which - * indicates whether transaction is enabled - */ - public void setTransactionEnabled(boolean transactionEnabled) { - this.transactionEnabled = transactionEnabled; - } - - @Override - /** - * set log details in logDetails variable - */ - public void setLogDetails(LogDetails logDetails) { - this.logDetails = logDetails; - } - - @Override - /** - * get the log details - */ - public LogDetails getLogDetails() { - return this.logDetails; - } - - } - - private final InputStream fStream; - - /** - * max cambria length indicates message length - - // This limit is here to prevent the server from spinning on a long string of numbers - // that is delivered with 'application/cambria' as the format. The limit needs to be - // large enough to support the max message length (currently 1MB, the default Kafka - // limit) - * */ - - private static final int kMaxCambriaLength = 4*1000*1024; - - - /** - * - * @return - * @throws IOException - * @throws CambriaApiException - */ - private int readLength() throws IOException, CambriaApiException { - // always ignore leading whitespace - int c = fStream.read(); - while (Character.isWhitespace(c)) { - c = fStream.read(); - } - - if (c == -1) { - return -1; - } - - int result = 0; - while (Character.isDigit(c)) { - result = (result * 10) + (c - '0'); - if (result > kMaxCambriaLength) { - throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected . after length."); - } - c = fStream.read(); - } - - if (c != '.') { - throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected . after length."); - } - - return result; - } - - /** - * - * @param len - * @return - * @throws IOException - * @throws CambriaApiException - */ - private String readString(int len) throws IOException, CambriaApiException { - final byte[] buffer = new byte[len]; - - final long startMs = System.currentTimeMillis(); - final long timeoutMs = startMs + 30000; // FIXME configurable - - int readTotal = 0; - while (readTotal < len) { - final int read = fStream.read(buffer, readTotal, len - readTotal); - if (read == -1 || System.currentTimeMillis() > timeoutMs) { - // EOF - break; - } - readTotal += read; - } - - if (readTotal < len) { - throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, - "End of stream while reading " + len + " bytes"); - } - - return new String(buffer); - } -} |