diff options
Diffstat (limited to 'src/main/java/org/onap/dmaap/dmf/mr/resources/streamReaders/CambriaStreamReader.java')
-rw-r--r-- | src/main/java/org/onap/dmaap/dmf/mr/resources/streamReaders/CambriaStreamReader.java | 228 |
1 files changed, 228 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/resources/streamReaders/CambriaStreamReader.java b/src/main/java/org/onap/dmaap/dmf/mr/resources/streamReaders/CambriaStreamReader.java new file mode 100644 index 0000000..d786804 --- /dev/null +++ b/src/main/java/org/onap/dmaap/dmf/mr/resources/streamReaders/CambriaStreamReader.java @@ -0,0 +1,228 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package org.onap.dmaap.dmf.mr.resources.streamReaders; + +import org.onap.dmaap.dmf.mr.CambriaApiException; +import org.onap.dmaap.dmf.mr.backends.Publisher.message; +import org.onap.dmaap.dmf.mr.beans.LogDetails; +import org.onap.dmaap.dmf.mr.resources.CambriaEventSet.reader; + +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.InputStream; + +/** + * Read an optionally chunked stream in the Cambria app format. This format + * allows for speedier server-side message parsing than pure JSON. It's looks + * like:<br/> + * <br/> + * <keyLength>.<msgLength>.<key><message><br/> + * <br/> + * Whitespace before/after each entry is ignored, so messages can be delivered + * with newlines between them, or not. + * + * @author peter + * + */ +public class CambriaStreamReader implements reader { + /** + * constructor initializing InputStream with fStream + * + * @param senderStream + * @throws CambriaApiException + */ + public CambriaStreamReader(InputStream senderStream) throws CambriaApiException { + fStream = senderStream; + } + + @Override + /** + * next method iterates through msg length + * throws IOException + * throws CambriaApiException + * + */ + public message next() throws IOException, CambriaApiException { + final int keyLen = readLength(); + if (keyLen == -1) + return null; + + final int msgLen = readLength(); + final String keyPart = readString(keyLen); + final String msgPart = readString(msgLen); + + return new msg(keyPart, msgPart); + } + + private static class msg implements message { + /** + * constructor initialization + * + * @param key + * @param msg + */ + public msg(String key, String msg) { + // if no key, use the current time. This allows the message to be + // delivered + // in any order without forcing it into a single partition as empty + // string would. + if (key.length() < 1) { + key = "" + System.currentTimeMillis(); + } + + fKey = key; + fMsg = msg; + } + + @Override + /** + * @returns fkey + */ + public String getKey() { + return fKey; + } + + @Override + /** + * returns the message in String type object + */ + public String getMessage() { + return fMsg; + } + + private final String fKey; + private final String fMsg; + private LogDetails logDetails; + private boolean transactionEnabled; + + /** + * returns boolean value which + * indicates whether transaction is enabled + */ + public boolean isTransactionEnabled() { + return transactionEnabled; + } + + /** + * sets boolean value which + * indicates whether transaction is enabled + */ + public void setTransactionEnabled(boolean transactionEnabled) { + this.transactionEnabled = transactionEnabled; + } + + @Override + /** + * set log details in logDetails variable + */ + public void setLogDetails(LogDetails logDetails) { + this.logDetails = logDetails; + } + + @Override + /** + * get the log details + */ + public LogDetails getLogDetails() { + return this.logDetails; + } + + } + + private final InputStream fStream; + + /** + * max cambria length indicates message length + + // This limit is here to prevent the server from spinning on a long string of numbers + // that is delivered with 'application/cambria' as the format. The limit needs to be + // large enough to support the max message length (currently 1MB, the default Kafka + // limit) + * */ + + private static final int kMaxCambriaLength = 4*1000*1024; + + + /** + * + * @return + * @throws IOException + * @throws CambriaApiException + */ + private int readLength() throws IOException, CambriaApiException { + // always ignore leading whitespace + int c = fStream.read(); + while (Character.isWhitespace(c)) { + c = fStream.read(); + } + + if (c == -1) { + return -1; + } + + int result = 0; + while (Character.isDigit(c)) { + result = (result * 10) + (c - '0'); + if (result > kMaxCambriaLength) { + throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected . after length."); + } + c = fStream.read(); + } + + if (c != '.') { + throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected . after length."); + } + + return result; + } + + /** + * + * @param len + * @return + * @throws IOException + * @throws CambriaApiException + */ + private String readString(int len) throws IOException, CambriaApiException { + final byte[] buffer = new byte[len]; + + final long startMs = System.currentTimeMillis(); + final long timeoutMs = startMs + 30000; // FIXME configurable + + int readTotal = 0; + while (readTotal < len) { + final int read = fStream.read(buffer, readTotal, len - readTotal); + if (read == -1 || System.currentTimeMillis() > timeoutMs) { + // EOF + break; + } + readTotal += read; + } + + if (readTotal < len) { + throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, + "End of stream while reading " + len + " bytes"); + } + + return new String(buffer); + } +} |