diff options
Diffstat (limited to 'src/main/java/com/att/nsa/cambria/resources')
6 files changed, 0 insertions, 1326 deletions
diff --git a/src/main/java/com/att/nsa/cambria/resources/CambriaEventSet.java b/src/main/java/com/att/nsa/cambria/resources/CambriaEventSet.java deleted file mode 100644 index 85cc902..0000000 --- a/src/main/java/com/att/nsa/cambria/resources/CambriaEventSet.java +++ /dev/null @@ -1,114 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.resources; - -import java.io.IOException; -import java.io.InputStream; -import java.util.zip.GZIPInputStream; - -import javax.servlet.http.HttpServletResponse; - -import com.att.nsa.apiServer.streams.ChunkedInputStream; -import com.att.nsa.cambria.CambriaApiException; -import com.att.nsa.cambria.backends.Publisher.message; -import com.att.nsa.cambria.resources.streamReaders.CambriaJsonStreamReader; -import com.att.nsa.cambria.resources.streamReaders.CambriaRawStreamReader; -import com.att.nsa.cambria.resources.streamReaders.CambriaStreamReader; -import com.att.nsa.cambria.resources.streamReaders.CambriaTextStreamReader; -import com.att.nsa.drumlin.service.standards.HttpStatusCodes; - -/** - * An inbound event set. - * - * @author author - */ -public class CambriaEventSet { - private final reader fReader; - - /** - * constructor initialization - * - * @param mediaType - * @param originalStream - * @param chunked - * @param defPartition - * @throws CambriaApiException - */ - public CambriaEventSet(String mediaType, InputStream originalStream, - boolean chunked, String defPartition) throws CambriaApiException { - InputStream is = originalStream; - if (chunked) { - is = new ChunkedInputStream(originalStream); - } - - if (("application/json").equals(mediaType)) { - if (chunked) { - throw new CambriaApiException( - HttpServletResponse.SC_BAD_REQUEST, - "The JSON stream reader doesn't support chunking."); - } - fReader = new CambriaJsonStreamReader(is, defPartition); - } else if (("application/cambria").equals(mediaType)) { - fReader = new CambriaStreamReader(is); - } else if (("application/cambria-zip").equals(mediaType)) { - try { - is = new GZIPInputStream(is); - } catch (IOException e) { - throw new CambriaApiException(HttpStatusCodes.k400_badRequest, - "Couldn't read compressed format: " + e); - } - fReader = new CambriaStreamReader(is); - } else if (("text/plain").equals(mediaType)) { - fReader = new CambriaTextStreamReader(is, defPartition); - } else { - fReader = new CambriaRawStreamReader(is, defPartition); - } - } - - /** - * Get the next message from this event set. Returns null when the end of - * stream is reached. Will block until a message arrives (or the stream is - * closed/broken). - * - * @return a message, or null - * @throws IOException - * @throws CambriaApiException - */ - public message next() throws IOException, CambriaApiException { - return fReader.next(); - } - - /** - * - * @author author - * - */ - public interface reader { - /** - * - * @return - * @throws IOException - * @throws CambriaApiException - */ - message next() throws IOException, CambriaApiException; - } -} diff --git a/src/main/java/com/att/nsa/cambria/resources/CambriaOutboundEventStream.java b/src/main/java/com/att/nsa/cambria/resources/CambriaOutboundEventStream.java deleted file mode 100644 index 7366dde..0000000 --- a/src/main/java/com/att/nsa/cambria/resources/CambriaOutboundEventStream.java +++ /dev/null @@ -1,519 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.resources; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.Date; - -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import org.json.JSONException; -import org.json.JSONObject; -import org.json.JSONTokener; - -import com.att.ajsc.filemonitor.AJSCPropertiesMap; -import com.att.nsa.cambria.CambriaApiException; -import com.att.nsa.cambria.backends.Consumer; -import com.att.nsa.cambria.backends.Consumer.Message; -import com.att.nsa.cambria.beans.DMaaPContext; -import com.att.nsa.cambria.constants.CambriaConstants; -import com.att.nsa.cambria.metabroker.Topic; -import com.att.nsa.cambria.utils.DMaaPResponseBuilder.StreamWriter; -import com.att.nsa.cambria.utils.Utils; - -import jline.internal.Log; - - -/** - * class used to write the consumed messages - * - * @author author - * - */ -public class CambriaOutboundEventStream implements StreamWriter { - private static final int kTopLimit = 1024 * 4; - - /** - * - * static innerclass it takes all the input parameter for kafka consumer - * like limit, timeout, meta, pretty - * - * @author author - * - */ - public static class Builder { - - // Required - private final Consumer fConsumer; - //private final rrNvReadable fSettings; // used during write to tweak - // format, decide to explicitly - // close stream or not - - // Optional - private int fLimit; - private int fTimeoutMs; - private String fTopicFilter; - private boolean fPretty; - private boolean fWithMeta; - - // private int fOffset; - /** - * constructor it initializes all the consumer parameters - * - * @param c - * @param settings - */ - public Builder(Consumer c) { - this.fConsumer = c; - //this.fSettings = settings; - - fLimit = CambriaConstants.kNoTimeout; - fTimeoutMs = CambriaConstants.kNoLimit; - fTopicFilter = CambriaConstants.kNoFilter; - fPretty = false; - fWithMeta = false; - // fOffset = CambriaEvents.kNextOffset; - } - - /** - * - * constructor initializes with limit - * - * @param l - * only l no of messages will be consumed - * @return - */ - public Builder limit(int l) { - this.fLimit = l; - return this; - } - - /** - * constructor initializes with timeout - * - * @param t - * if there is no message to consume, them DMaaP will wait - * for t time - * @return - */ - public Builder timeout(int t) { - this.fTimeoutMs = t; - return this; - } - - /** - * constructor initializes with filter - * - * @param f - * filter - * @return - */ - public Builder filter(String f) { - this.fTopicFilter = f; - return this; - } - - /** - * constructor initializes with boolean value pretty - * - * @param p - * messages print in new line - * @return - */ - public Builder pretty(boolean p) { - fPretty = p; - return this; - } - - /** - * constructor initializes with boolean value meta - * - * @param withMeta, - * along with messages offset will print - * @return - */ - public Builder withMeta(boolean withMeta) { - fWithMeta = withMeta; - return this; - } - - // public Builder atOffset ( int pos ) - // { - // fOffset = pos; - // return this; - // } - /** - * method returs object of CambriaOutboundEventStream - * - * @return - * @throws CambriaApiException - */ - public CambriaOutboundEventStream build() throws CambriaApiException { - return new CambriaOutboundEventStream(this); - } - } - - @SuppressWarnings("unchecked") - /** - * - * @param builder - * @throws CambriaApiException - * - */ - private CambriaOutboundEventStream(Builder builder) throws CambriaApiException { - fConsumer = builder.fConsumer; - fLimit = builder.fLimit; - fTimeoutMs = builder.fTimeoutMs; - //fSettings = builder.fSettings; - fSent = 0; - fPretty = builder.fPretty; - fWithMeta = builder.fWithMeta; - -// if (CambriaConstants.kNoFilter.equals(builder.fTopicFilter)) { -// fHpAlarmFilter = null; -// fHppe = null; -// } else { -// try { -// final JSONObject filter = new JSONObject(new JSONTokener(builder.fTopicFilter)); -// HpConfigContext<HpEvent> cc = new HpConfigContext<HpEvent>(); -// fHpAlarmFilter = cc.create(HpAlarmFilter.class, filter); -// final EventFactory<HpJsonEvent> ef = new HpJsonEventFactory(); -// fHppe = new HpProcessingEngine<HpJsonEvent>(ef); -// } catch (HpReaderException e) { -// // JSON was okay, but the filter engine says it's bogus -// throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, -// "Couldn't create filter: " + e.getMessage()); -// } catch (JSONException e) { -// // user sent a bogus JSON object -// throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, -// "Couldn't parse JSON: " + e.getMessage()); -// } -// } - } - - /** - * - * interface provides onWait and onMessage methods - * - */ - public interface operation { - /** - * Call thread.sleep - * @throws IOException - */ - void onWait() throws IOException; -/** - * provides the output based in the consumer paramter - * @param count - * @param msg - * @throws IOException - */ - void onMessage(int count, Message msg) throws IOException; - } - - /** - * - * @return - */ - public int getSentCount() { - return fSent; - } - - @Override - /** - * - * @param os - * throws IOException - */ - public void write(final OutputStream os) throws IOException { - //final boolean transactionEnabled = topic.isTransactionEnabled(); - //final boolean transactionEnabled = isTransEnabled(); - final boolean transactionEnabled = istransEnable; - os.write('['); - - fSent = forEachMessage(new operation() { - @Override - public void onMessage(int count, Message msg) throws IOException, JSONException { - - String message = ""; - JSONObject jsonMessage = null; - if (transactionEnabled) { - jsonMessage = new JSONObject(msg.getMessage()); - message = jsonMessage.getString("message"); - } - - if (count > 0) { - os.write(','); - } - - if (fWithMeta) { - final JSONObject entry = new JSONObject(); - entry.put("offset", msg.getOffset()); - entry.put("message", message); - os.write(entry.toString().getBytes()); - } else { - //os.write(message.getBytes()); - String jsonString = ""; - if(transactionEnabled){ - jsonString= JSONObject.valueToString(message); - }else{ - jsonString = JSONObject.valueToString (msg.getMessage()); - } - os.write ( jsonString.getBytes () ); - } - - if (fPretty) { - os.write('\n'); - } - - - String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic"); - if (null==metricTopicname) - metricTopicname="msgrtr.apinode.metrics.dmaap"; - - if (!metricTopicname.equalsIgnoreCase(topic.getName())) { - if (transactionEnabled) { - final String transactionId = jsonMessage.getString("transactionId"); - responseTransactionId = transactionId; - - StringBuilder consumerInfo = new StringBuilder(); - if (null != dmaapContext && null != dmaapContext.getRequest()) { - final HttpServletRequest request = dmaapContext.getRequest(); - consumerInfo.append("consumerIp= \"" + request.getRemoteHost() + "\","); - consumerInfo.append("consServerIp= \"" + request.getLocalAddr() + "\","); - consumerInfo.append("consumerId= \"" + Utils.getUserApiKey(request) + "\","); - consumerInfo.append( - "consumerGroup= \"" + getConsumerGroupFromRequest(request.getRequestURI()) + "\","); - consumerInfo.append("consumeTime= \"" + Utils.getFormattedDate(new Date()) + "\","); - } - - log.info("Consumer [" + consumerInfo.toString() + "transactionId= \"" + transactionId - + "\",messageLength= \"" + message.length() + "\",topic= \"" + topic.getName() + "\"]"); - } - } - - } - - @Override - /** - * - * It makes thread to wait - * @throws IOException - */ - public void onWait() throws IOException { - os.flush(); // likely totally unnecessary for a network socket - try { - // FIXME: would be good to wait/signal - Thread.sleep(100); - } catch (InterruptedException e) { - Log.error(e.toString()); - Thread.currentThread().interrupt(); - } - } - }); - - //if (null != dmaapContext && isTransactionEnabled()) { - if (null != dmaapContext && istransEnable) { - - dmaapContext.getResponse().setHeader("transactionId", - Utils.getResponseTransactionId(responseTransactionId)); - } - - os.write(']'); - os.flush(); - - boolean close_out_stream = true; - String strclose_out_stream = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"close.output.stream"); - if(null!=strclose_out_stream)close_out_stream=Boolean.parseBoolean(strclose_out_stream); - - //if (fSettings.getBoolean("close.output.stream", true)) { - if (close_out_stream) { - os.close(); - } - } - - /** - * - * @param requestURI - * @return - */ - private String getConsumerGroupFromRequest(String requestURI) { - if (null != requestURI && !requestURI.isEmpty()) { - - String consumerDetails = requestURI.substring(requestURI.indexOf("events/") + 7); - - int startIndex = consumerDetails.indexOf("/") + 1; - int endIndex = consumerDetails.lastIndexOf("/"); - return consumerDetails.substring(startIndex, endIndex); - } - return null; - } -/** - * - * @param op - * @return - * @throws IOException - * @throws JSONException - */ - public int forEachMessage(operation op) throws IOException, JSONException { - final int effectiveLimit = (fLimit == 0 ? kTopLimit : fLimit); - - int count = 0; - boolean firstPing = true; - - final long startMs = System.currentTimeMillis(); - final long timeoutMs = fTimeoutMs + startMs; - - while (firstPing || (count == 0 && System.currentTimeMillis() < timeoutMs)) { - if (!firstPing) { - op.onWait(); - } - firstPing = false; - - Consumer.Message msg = null; - while (count < effectiveLimit && (msg = fConsumer.nextMessage()) != null) { - - - String message = ""; - // if (topic.isTransactionEnabled() || true) { - if (istransEnable) { - // As part of DMaaP changes we are wrapping the original - // message into a json object - // and then this json object is further wrapped into message - // object before publishing, - // so extracting the original message from the message - // object for matching with filter. - final JSONObject jsonMessage = new JSONObject(msg.getMessage()); - message = jsonMessage.getString("message"); - } else { - message = msg.getMessage(); - } - - // If filters are enabled/set, message should be in JSON format - // for filters to work for - // otherwise filter will automatically ignore message in - // non-json format. - if (filterMatches(message)) { - op.onMessage(count, msg); - count++; - } - } - } - - return count; - } - - /** - * - * Checks whether filter is initialized - */ -// private boolean isFilterInitialized() { -// return (fHpAlarmFilter != null && fHppe != null); -// } - - /** - * - * @param msg - * @return - */ - private boolean filterMatches(String msg) { - boolean result = true; -// if (isFilterInitialized()) { -// try { -// final HpJsonEvent e = new HpJsonEvent("e", new JSONObject(msg)); -// result = fHpAlarmFilter.matches(fHppe, e); -// } catch (JSONException x) { -// // the msg may not be JSON -// result = false; -// log.error("Failed due to " + x.getMessage()); -// } catch (Exception x) { -// log.error("Error using filter: " + x.getMessage(), x); -// } -// } - - return result; - } - - public DMaaPContext getDmaapContext() { - return dmaapContext; - } - - public void setDmaapContext(DMaaPContext dmaapContext) { - this.dmaapContext = dmaapContext; - } - - public Topic getTopic() { - return topic; - } - - public void setTopic(Topic topic) { - this.topic = topic; - } - - public void setTopicStyle(boolean aaftopic) { - this.isAAFTopic = aaftopic; - } - - public void setTransEnabled ( boolean transEnable) { - this.istransEnable = transEnable; - } - - /*private boolean isTransactionEnabled() { - //return topic.isTransactionEnabled(); - return true; // let metrics creates for all the topics - }*/ - - private boolean isTransEnabled() { - String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd"); - boolean istransidreqd=false; - if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) || isAAFTopic){ - istransidreqd = true; - } - - return istransidreqd; - - } - - private final Consumer fConsumer; - private final int fLimit; - private final int fTimeoutMs; - //private final rrNvReadable fSettings; - private final boolean fPretty; - private final boolean fWithMeta; - private int fSent; -// private final HpAlarmFilter<HpJsonEvent> fHpAlarmFilter; -// private final HpProcessingEngine<HpJsonEvent> fHppe; - private DMaaPContext dmaapContext; - private String responseTransactionId; - private Topic topic; - private boolean isAAFTopic = false; - private boolean istransEnable = false; - - - //private static final Logger log = Logger.getLogger(CambriaOutboundEventStream.class); - - private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class); -}
\ No newline at end of file diff --git a/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaJsonStreamReader.java b/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaJsonStreamReader.java deleted file mode 100644 index c8172a9..0000000 --- a/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaJsonStreamReader.java +++ /dev/null @@ -1,178 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.resources.streamReaders; - -import java.io.InputStream; -import java.util.logging.Logger; - -import javax.servlet.http.HttpServletResponse; - -import org.json.JSONException; -import org.json.JSONObject; -import org.json.JSONTokener; - -import com.att.nsa.cambria.CambriaApiException; -import com.att.nsa.cambria.backends.Publisher.message; -import com.att.nsa.cambria.beans.LogDetails; -import com.att.nsa.cambria.resources.CambriaEventSet.reader; - -import jline.internal.Log; - -/** - * - * @author author - * - */ -public class CambriaJsonStreamReader implements reader { - private static final Logger LOG = Logger.getLogger(CambriaJsonStreamReader.class.toString()); - private final JSONTokener fTokens; - private final boolean fIsList; - private long fCount; - private final String fDefPart; - public static final String kKeyField = "cambria.partition"; - - /** - * - * @param is - * @param defPart - * @throws CambriaApiException - */ - public CambriaJsonStreamReader(InputStream is, String defPart) throws CambriaApiException { - try { - fTokens = new JSONTokener(is); - fCount = 0; - fDefPart = defPart; - - final int c = fTokens.next(); - if (c == '[') { - fIsList = true; - } else if (c == '{') { - fTokens.back(); - fIsList = false; - } else { - throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expecting an array or an object."); - } - } catch (JSONException e) { - Log.error(e); - throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, e.getMessage()); - } - } - - @Override - public message next() throws CambriaApiException { - try { - if (!fTokens.more()) { - return null; - } - - final int c = fTokens.next(); - - /*if (c ==','){ - fCloseCount++; - System.out.println("fCloseCount=" + fCloseCount +" fCount "+fCount); - }*/ - if (fIsList) { - if (c == ']' || (fCount > 0 && c == 10)) - return null; - - - if (fCount > 0 && c != ',' && c!= 10) { - throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, - "Expected ',' or closing ']' after last object."); - } - - if (fCount == 0 && c != '{' && c!= 10 && c!=32) { - throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected { to start an object."); - } - } else if (fCount != 0 || c != '{') { - throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected '{' to start an object."); - } - - if (c == '{') { - fTokens.back(); - } - final JSONObject o = new JSONObject(fTokens); - fCount++; - return new msg(o); - } catch (JSONException e) { - Log.error(e.toString()); - throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, e.getMessage()); - - } - } - - private class msg implements message { - private final String fKey; - private String fMsg; - private LogDetails logDetails; - private boolean transactionEnabled; - - /** - * constructor - * - * @param o - */ - //public msg(JSONObject o){} - - - public msg(JSONObject o) { - String key = o.optString(kKeyField, fDefPart); - if (key == null) { - key = "" + System.currentTimeMillis(); - } - fKey = key; - - fMsg = o.toString().trim(); - - } - - @Override - public String getKey() { - return fKey; - } - - @Override - public String getMessage() { - return fMsg; - } - - @Override - public boolean isTransactionEnabled() { - return transactionEnabled; - } - - @Override - public void setTransactionEnabled(boolean transactionEnabled) { - this.transactionEnabled = transactionEnabled; - } - - @Override - public void setLogDetails(LogDetails logDetails) { - this.logDetails = logDetails; - } - - @Override - public LogDetails getLogDetails() { - return logDetails; - } - } -} diff --git a/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaRawStreamReader.java b/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaRawStreamReader.java deleted file mode 100644 index d562360..0000000 --- a/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaRawStreamReader.java +++ /dev/null @@ -1,141 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.resources.streamReaders; - -import java.io.IOException; -import java.io.InputStream; - -import javax.servlet.http.HttpServletResponse; - -import com.att.nsa.cambria.CambriaApiException; -import com.att.nsa.cambria.backends.Publisher.message; -import com.att.nsa.cambria.beans.LogDetails; -import com.att.nsa.cambria.resources.CambriaEventSet.reader; -import com.att.nsa.util.StreamTools; - -/** - * - * This stream reader reads raw bytes creating a single message. - * @author author - * - */ -public class CambriaRawStreamReader implements reader -{ - /** - * This is the constructor of CambriaRawStreamReader, it will basically the read from Input stream - * @param is - * @param defPart - * @throws CambriaApiException - */ - public CambriaRawStreamReader ( InputStream is, String defPart ) throws CambriaApiException - { - fStream = is; - fDefPart = defPart; - fClosed = false; - } - - @Override - /** - * - * next() method reads the bytes and - * iterates through the messages - * @throws CambriaApiException - * - */ - public message next () throws CambriaApiException - { - if ( fClosed ) return null; - - try - { - final byte[] rawBytes = StreamTools.readBytes ( fStream ); - fClosed = true; - return new message () - { - private LogDetails logDetails; - private boolean transactionEnabled; - - /** - * returns boolean value which - * indicates whether transaction is enabled - */ - public boolean isTransactionEnabled() { - return transactionEnabled; - } - - /** - * sets boolean value which - * indicates whether transaction is enabled - */ - public void setTransactionEnabled(boolean transactionEnabled) { - this.transactionEnabled = transactionEnabled; - } - - @Override - /** - * @returns key - * It ch4ecks whether fDefPart value is Null. - * If yes, it will return ystem.currentTimeMillis () else - * it will return fDefPart variable value - */ - public String getKey () - { - return fDefPart == null ? "" + System.currentTimeMillis () : fDefPart; - } - - @Override - /** - * returns the message in String type object - */ - public String getMessage () - { - return new String ( rawBytes ); - } - - /** - * set log details in logDetails variable - */ - @Override - public void setLogDetails(LogDetails logDetails) { - this.logDetails = logDetails; - } - - @Override - /** - * get the log details - */ - public LogDetails getLogDetails() { - return this.logDetails; - } - }; - } - catch ( IOException e ) - { - throw new CambriaApiException ( HttpServletResponse.SC_BAD_REQUEST, e.toString()); - } - } - - private final InputStream fStream; - private final String fDefPart; - private boolean fClosed; - //private String transactionId; -} diff --git a/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaStreamReader.java b/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaStreamReader.java deleted file mode 100644 index 38359f0..0000000 --- a/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaStreamReader.java +++ /dev/null @@ -1,229 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.resources.streamReaders; - -import java.io.IOException; -import java.io.InputStream; - -import javax.servlet.http.HttpServletResponse; - -import com.att.nsa.cambria.CambriaApiException; -import com.att.nsa.cambria.backends.Publisher.message; -import com.att.nsa.cambria.beans.LogDetails; -import com.att.nsa.cambria.resources.CambriaEventSet.reader; - -/** - * Read an optionally chunked stream in the Cambria app format. This format - * allows for speedier server-side message parsing than pure JSON. It's looks - * like:<br/> - * <br/> - * <keyLength>.<msgLength>.<key><message><br/> - * <br/> - * Whitespace before/after each entry is ignored, so messages can be delivered - * with newlines between them, or not. - * - * @author author - * - */ -public class CambriaStreamReader implements reader { - /** - * constructor initializing InputStream with fStream - * - * @param senderStream - * @throws CambriaApiException - */ - public CambriaStreamReader(InputStream senderStream) throws CambriaApiException { - fStream = senderStream; - } - - @Override - /** - * next method iterates through msg length - * throws IOException - * throws CambriaApiException - * - */ - public message next() throws IOException, CambriaApiException { - final int keyLen = readLength(); - if (keyLen == -1) - return null; - - final int msgLen = readLength(); - final String keyPart = readString(keyLen); - final String msgPart = readString(msgLen); - - return new msg(keyPart, msgPart); - } - - private static class msg implements message { - /** - * constructor initialization - * - * @param key - * @param msg - */ - public msg(String key, String msg) { - // if no key, use the current time. This allows the message to be - // delivered - // in any order without forcing it into a single partition as empty - // string would. - if (key.length() < 1) { - key = "" + System.currentTimeMillis(); - } - - fKey = key; - fMsg = msg; - } - - @Override - /** - * @returns fkey - */ - public String getKey() { - return fKey; - } - - @Override - /** - * returns the message in String type object - */ - public String getMessage() { - return fMsg; - } - - private final String fKey; - private final String fMsg; - private LogDetails logDetails; - private boolean transactionEnabled; - - /** - * returns boolean value which - * indicates whether transaction is enabled - */ - public boolean isTransactionEnabled() { - return transactionEnabled; - } - - /** - * sets boolean value which - * indicates whether transaction is enabled - */ - public void setTransactionEnabled(boolean transactionEnabled) { - this.transactionEnabled = transactionEnabled; - } - - @Override - /** - * set log details in logDetails variable - */ - public void setLogDetails(LogDetails logDetails) { - this.logDetails = logDetails; - } - - @Override - /** - * get the log details - */ - public LogDetails getLogDetails() { - return this.logDetails; - } - - } - - private final InputStream fStream; - - /** - * max cambria length indicates message length - - // This limit is here to prevent the server from spinning on a long string of numbers - // that is delivered with 'application/cambria' as the format. The limit needs to be - // large enough to support the max message length (currently 1MB, the default Kafka - // limit) - * */ - - private static final int kMaxCambriaLength = 4*1000*1024; - - - /** - * - * @return - * @throws IOException - * @throws CambriaApiException - */ - private int readLength() throws IOException, CambriaApiException { - // always ignore leading whitespace - int c = fStream.read(); - while (Character.isWhitespace(c)) { - c = fStream.read(); - } - - if (c == -1) { - return -1; - } - - int result = 0; - while (Character.isDigit(c)) { - result = (result * 10) + (c - '0'); - if (result > kMaxCambriaLength) { - throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected . after length."); - } - c = fStream.read(); - } - - if (c != '.') { - throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected . after length."); - } - - return result; - } - - /** - * - * @param len - * @return - * @throws IOException - * @throws CambriaApiException - */ - private String readString(int len) throws IOException, CambriaApiException { - final byte[] buffer = new byte[len]; - - final long startMs = System.currentTimeMillis(); - final long timeoutMs = startMs + 30000; // FIXME configurable - - int readTotal = 0; - while (readTotal < len) { - final int read = fStream.read(buffer, readTotal, len - readTotal); - if (read == -1 || System.currentTimeMillis() > timeoutMs) { - // EOF - break; - } - readTotal += read; - } - - if (readTotal < len) { - throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, - "End of stream while reading " + len + " bytes"); - } - - return new String(buffer); - } -} diff --git a/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaTextStreamReader.java b/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaTextStreamReader.java deleted file mode 100644 index 41b9275..0000000 --- a/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaTextStreamReader.java +++ /dev/null @@ -1,145 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.resources.streamReaders; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.logging.Logger; - -import javax.servlet.http.HttpServletResponse; - -import com.att.nsa.cambria.CambriaApiException; -import com.att.nsa.cambria.backends.Publisher.message; -import com.att.nsa.cambria.beans.LogDetails; -import com.att.nsa.cambria.resources.CambriaEventSet.reader; - -import jline.internal.Log; - -/** - * This stream reader just pulls single lines. It uses the default partition if provided. If - * not, the key is the current time, which does not guarantee ordering. - * - * @author author - * - */ -public class CambriaTextStreamReader implements reader -{ - private Logger log = Logger.getLogger(CambriaTextStreamReader.class.toString()); - /** - * This is the constructor for Cambria Text Reader format - * @param is - * @param defPart - * @throws CambriaApiException - */ - public CambriaTextStreamReader ( InputStream is, String defPart ) throws CambriaApiException - { - fReader = new BufferedReader ( new InputStreamReader ( is ) ); - fDefPart = defPart; - } - - @Override - /** - * next() method iterates through msg length - * throws IOException - * throws CambriaApiException - * - */ - public message next () throws CambriaApiException - { - try - { - final String line = fReader.readLine (); - if ( line == null ) return null; - - return new message () - { - private LogDetails logDetails; - private boolean transactionEnabled; - - /** - * returns boolean value which - * indicates whether transaction is enabled - * @return - */ - public boolean isTransactionEnabled() { - return transactionEnabled; - } - - /** - * sets boolean value which - * indicates whether transaction is enabled - */ - public void setTransactionEnabled(boolean transactionEnabled) { - this.transactionEnabled = transactionEnabled; - } - - @Override - /** - * @returns key - * It ch4ecks whether fDefPart value is Null. - * If yes, it will return ystem.currentTimeMillis () else - * it will return fDefPart variable value - */ - public String getKey () - { - return fDefPart == null ? "" + System.currentTimeMillis () : fDefPart; - } - - @Override - /** - * returns the message in String type object - * @return - */ - public String getMessage () - { - return line; - } - - @Override - /** - * set log details in logDetails variable - */ - public void setLogDetails(LogDetails logDetails) { - this.logDetails = logDetails; - } - - @Override - /** - * get the log details - */ - public LogDetails getLogDetails() { - return this.logDetails; - } - }; - } - catch ( IOException e ) - { - Log.error(e); - throw new CambriaApiException ( HttpServletResponse.SC_BAD_REQUEST, e.getMessage () ); - } - } - - private final BufferedReader fReader; - private final String fDefPart; -} |