diff options
author | Varun Gudisena <vg411h@att.com> | 2017-08-31 10:52:33 -0500 |
---|---|---|
committer | Varun Gudisena <vg411h@att.com> | 2017-08-31 10:52:50 -0500 |
commit | 3fc19dc9157f4d05bdbd6fd05a52f0592268c4e7 (patch) | |
tree | 69355ec5a2a03a1867862e6b757b51c45763ef1a /src/main/java/com/att/nsa/cambria/resources | |
parent | ca63da6e0cb7fb63e231343d0b52a40036f6b6aa (diff) |
Revert package name changes
Reverted package name changes to avoid any potential issues. Renamed maven
group id only.
Issue-id: DMAAP-74
Change-Id: Ic741b602ade60f108d940c0571a1d94b7be2abc2
Signed-off-by: Varun Gudisena <vg411h@att.com>
Diffstat (limited to 'src/main/java/com/att/nsa/cambria/resources')
6 files changed, 1312 insertions, 0 deletions
diff --git a/src/main/java/com/att/nsa/cambria/resources/CambriaEventSet.java b/src/main/java/com/att/nsa/cambria/resources/CambriaEventSet.java new file mode 100644 index 0000000..85cc902 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/resources/CambriaEventSet.java @@ -0,0 +1,114 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.resources; + +import java.io.IOException; +import java.io.InputStream; +import java.util.zip.GZIPInputStream; + +import javax.servlet.http.HttpServletResponse; + +import com.att.nsa.apiServer.streams.ChunkedInputStream; +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.backends.Publisher.message; +import com.att.nsa.cambria.resources.streamReaders.CambriaJsonStreamReader; +import com.att.nsa.cambria.resources.streamReaders.CambriaRawStreamReader; +import com.att.nsa.cambria.resources.streamReaders.CambriaStreamReader; +import com.att.nsa.cambria.resources.streamReaders.CambriaTextStreamReader; +import com.att.nsa.drumlin.service.standards.HttpStatusCodes; + +/** + * An inbound event set. + * + * @author author + */ +public class CambriaEventSet { + private final reader fReader; + + /** + * constructor initialization + * + * @param mediaType + * @param originalStream + * @param chunked + * @param defPartition + * @throws CambriaApiException + */ + public CambriaEventSet(String mediaType, InputStream originalStream, + boolean chunked, String defPartition) throws CambriaApiException { + InputStream is = originalStream; + if (chunked) { + is = new ChunkedInputStream(originalStream); + } + + if (("application/json").equals(mediaType)) { + if (chunked) { + throw new CambriaApiException( + HttpServletResponse.SC_BAD_REQUEST, + "The JSON stream reader doesn't support chunking."); + } + fReader = new CambriaJsonStreamReader(is, defPartition); + } else if (("application/cambria").equals(mediaType)) { + fReader = new CambriaStreamReader(is); + } else if (("application/cambria-zip").equals(mediaType)) { + try { + is = new GZIPInputStream(is); + } catch (IOException e) { + throw new CambriaApiException(HttpStatusCodes.k400_badRequest, + "Couldn't read compressed format: " + e); + } + fReader = new CambriaStreamReader(is); + } else if (("text/plain").equals(mediaType)) { + fReader = new CambriaTextStreamReader(is, defPartition); + } else { + fReader = new CambriaRawStreamReader(is, defPartition); + } + } + + /** + * Get the next message from this event set. Returns null when the end of + * stream is reached. Will block until a message arrives (or the stream is + * closed/broken). + * + * @return a message, or null + * @throws IOException + * @throws CambriaApiException + */ + public message next() throws IOException, CambriaApiException { + return fReader.next(); + } + + /** + * + * @author author + * + */ + public interface reader { + /** + * + * @return + * @throws IOException + * @throws CambriaApiException + */ + message next() throws IOException, CambriaApiException; + } +} diff --git a/src/main/java/com/att/nsa/cambria/resources/CambriaOutboundEventStream.java b/src/main/java/com/att/nsa/cambria/resources/CambriaOutboundEventStream.java new file mode 100644 index 0000000..e519f71 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/resources/CambriaOutboundEventStream.java @@ -0,0 +1,516 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.resources; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Date; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.json.JSONException; +import org.json.JSONObject; +import org.json.JSONTokener; + +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.backends.Consumer; +import com.att.nsa.cambria.backends.Consumer.Message; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.constants.CambriaConstants; +import com.att.nsa.cambria.metabroker.Topic; +import com.att.nsa.cambria.utils.DMaaPResponseBuilder.StreamWriter; +import com.att.nsa.cambria.utils.Utils; + + +/** + * class used to write the consumed messages + * + * @author author + * + */ +public class CambriaOutboundEventStream implements StreamWriter { + private static final int kTopLimit = 1024 * 4; + + /** + * + * static innerclass it takes all the input parameter for kafka consumer + * like limit, timeout, meta, pretty + * + * @author author + * + */ + public static class Builder { + + // Required + private final Consumer fConsumer; + //private final rrNvReadable fSettings; // used during write to tweak + // format, decide to explicitly + // close stream or not + + // Optional + private int fLimit; + private int fTimeoutMs; + private String fTopicFilter; + private boolean fPretty; + private boolean fWithMeta; + + // private int fOffset; + /** + * constructor it initializes all the consumer parameters + * + * @param c + * @param settings + */ + public Builder(Consumer c) { + this.fConsumer = c; + //this.fSettings = settings; + + fLimit = CambriaConstants.kNoTimeout; + fTimeoutMs = CambriaConstants.kNoLimit; + fTopicFilter = CambriaConstants.kNoFilter; + fPretty = false; + fWithMeta = false; + // fOffset = CambriaEvents.kNextOffset; + } + + /** + * + * constructor initializes with limit + * + * @param l + * only l no of messages will be consumed + * @return + */ + public Builder limit(int l) { + this.fLimit = l; + return this; + } + + /** + * constructor initializes with timeout + * + * @param t + * if there is no message to consume, them DMaaP will wait + * for t time + * @return + */ + public Builder timeout(int t) { + this.fTimeoutMs = t; + return this; + } + + /** + * constructor initializes with filter + * + * @param f + * filter + * @return + */ + public Builder filter(String f) { + this.fTopicFilter = f; + return this; + } + + /** + * constructor initializes with boolean value pretty + * + * @param p + * messages print in new line + * @return + */ + public Builder pretty(boolean p) { + fPretty = p; + return this; + } + + /** + * constructor initializes with boolean value meta + * + * @param withMeta, + * along with messages offset will print + * @return + */ + public Builder withMeta(boolean withMeta) { + fWithMeta = withMeta; + return this; + } + + // public Builder atOffset ( int pos ) + // { + // fOffset = pos; + // return this; + // } + /** + * method returs object of CambriaOutboundEventStream + * + * @return + * @throws CambriaApiException + */ + public CambriaOutboundEventStream build() throws CambriaApiException { + return new CambriaOutboundEventStream(this); + } + } + + @SuppressWarnings("unchecked") + /** + * + * @param builder + * @throws CambriaApiException + * + */ + private CambriaOutboundEventStream(Builder builder) throws CambriaApiException { + fConsumer = builder.fConsumer; + fLimit = builder.fLimit; + fTimeoutMs = builder.fTimeoutMs; + //fSettings = builder.fSettings; + fSent = 0; + fPretty = builder.fPretty; + fWithMeta = builder.fWithMeta; + +// if (CambriaConstants.kNoFilter.equals(builder.fTopicFilter)) { +// fHpAlarmFilter = null; +// fHppe = null; +// } else { +// try { +// final JSONObject filter = new JSONObject(new JSONTokener(builder.fTopicFilter)); +// HpConfigContext<HpEvent> cc = new HpConfigContext<HpEvent>(); +// fHpAlarmFilter = cc.create(HpAlarmFilter.class, filter); +// final EventFactory<HpJsonEvent> ef = new HpJsonEventFactory(); +// fHppe = new HpProcessingEngine<HpJsonEvent>(ef); +// } catch (HpReaderException e) { +// // JSON was okay, but the filter engine says it's bogus +// throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, +// "Couldn't create filter: " + e.getMessage()); +// } catch (JSONException e) { +// // user sent a bogus JSON object +// throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, +// "Couldn't parse JSON: " + e.getMessage()); +// } +// } + } + + /** + * + * interface provides onWait and onMessage methods + * + */ + public interface operation { + /** + * Call thread.sleep + * @throws IOException + */ + void onWait() throws IOException; +/** + * provides the output based in the consumer paramter + * @param count + * @param msg + * @throws IOException + */ + void onMessage(int count, Message msg) throws IOException; + } + + /** + * + * @return + */ + public int getSentCount() { + return fSent; + } + + @Override + /** + * + * @param os + * throws IOException + */ + public void write(final OutputStream os) throws IOException { + //final boolean transactionEnabled = topic.isTransactionEnabled(); + //final boolean transactionEnabled = isTransEnabled(); + final boolean transactionEnabled = istransEnable; + os.write('['); + + fSent = forEachMessage(new operation() { + @Override + public void onMessage(int count, Message msg) throws IOException, JSONException { + + String message = ""; + JSONObject jsonMessage = null; + if (transactionEnabled) { + jsonMessage = new JSONObject(msg.getMessage()); + message = jsonMessage.getString("message"); + } + + if (count > 0) { + os.write(','); + } + + if (fWithMeta) { + final JSONObject entry = new JSONObject(); + entry.put("offset", msg.getOffset()); + entry.put("message", message); + os.write(entry.toString().getBytes()); + } else { + //os.write(message.getBytes()); + String jsonString = ""; + if(transactionEnabled){ + jsonString= JSONObject.valueToString(message); + }else{ + jsonString = JSONObject.valueToString (msg.getMessage()); + } + os.write ( jsonString.getBytes () ); + } + + if (fPretty) { + os.write('\n'); + } + + + String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic"); + if (null==metricTopicname) + metricTopicname="msgrtr.apinode.metrics.dmaap"; + + if (!metricTopicname.equalsIgnoreCase(topic.getName())) { + if (transactionEnabled) { + final String transactionId = jsonMessage.getString("transactionId"); + responseTransactionId = transactionId; + + StringBuilder consumerInfo = new StringBuilder(); + if (null != dmaapContext && null != dmaapContext.getRequest()) { + final HttpServletRequest request = dmaapContext.getRequest(); + consumerInfo.append("consumerIp= \"" + request.getRemoteHost() + "\","); + consumerInfo.append("consServerIp= \"" + request.getLocalAddr() + "\","); + consumerInfo.append("consumerId= \"" + Utils.getUserApiKey(request) + "\","); + consumerInfo.append( + "consumerGroup= \"" + getConsumerGroupFromRequest(request.getRequestURI()) + "\","); + consumerInfo.append("consumeTime= \"" + Utils.getFormattedDate(new Date()) + "\","); + } + + log.info("Consumer [" + consumerInfo.toString() + "transactionId= \"" + transactionId + + "\",messageLength= \"" + message.length() + "\",topic= \"" + topic.getName() + "\"]"); + } + } + + } + + @Override + /** + * + * It makes thread to wait + * @throws IOException + */ + public void onWait() throws IOException { + os.flush(); // likely totally unnecessary for a network socket + try { + // FIXME: would be good to wait/signal + Thread.sleep(100); + } catch (InterruptedException e) { + // ignore + } + } + }); + + //if (null != dmaapContext && isTransactionEnabled()) { + if (null != dmaapContext && istransEnable) { + + dmaapContext.getResponse().setHeader("transactionId", + Utils.getResponseTransactionId(responseTransactionId)); + } + + os.write(']'); + os.flush(); + + boolean close_out_stream = true; + String strclose_out_stream = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"close.output.stream"); + if(null!=strclose_out_stream)close_out_stream=Boolean.parseBoolean(strclose_out_stream); + + //if (fSettings.getBoolean("close.output.stream", true)) { + if (close_out_stream) { + os.close(); + } + } + + /** + * + * @param requestURI + * @return + */ + private String getConsumerGroupFromRequest(String requestURI) { + if (null != requestURI && !requestURI.isEmpty()) { + + String consumerDetails = requestURI.substring(requestURI.indexOf("events/") + 7); + + int startIndex = consumerDetails.indexOf("/") + 1; + int endIndex = consumerDetails.lastIndexOf("/"); + return consumerDetails.substring(startIndex, endIndex); + } + return null; + } +/** + * + * @param op + * @return + * @throws IOException + * @throws JSONException + */ + public int forEachMessage(operation op) throws IOException, JSONException { + final int effectiveLimit = (fLimit == 0 ? kTopLimit : fLimit); + + int count = 0; + boolean firstPing = true; + + final long startMs = System.currentTimeMillis(); + final long timeoutMs = fTimeoutMs + startMs; + + while (firstPing || (count == 0 && System.currentTimeMillis() < timeoutMs)) { + if (!firstPing) { + op.onWait(); + } + firstPing = false; + + Consumer.Message msg = null; + while (count < effectiveLimit && (msg = fConsumer.nextMessage()) != null) { + + + String message = ""; + // if (topic.isTransactionEnabled() || true) { + if (istransEnable) { + // As part of DMaaP changes we are wrapping the original + // message into a json object + // and then this json object is further wrapped into message + // object before publishing, + // so extracting the original message from the message + // object for matching with filter. + final JSONObject jsonMessage = new JSONObject(msg.getMessage()); + message = jsonMessage.getString("message"); + } else { + message = msg.getMessage(); + } + + // If filters are enabled/set, message should be in JSON format + // for filters to work for + // otherwise filter will automatically ignore message in + // non-json format. + if (filterMatches(message)) { + op.onMessage(count, msg); + count++; + } + } + } + + return count; + } + + /** + * + * Checks whether filter is initialized + */ +// private boolean isFilterInitialized() { +// return (fHpAlarmFilter != null && fHppe != null); +// } + + /** + * + * @param msg + * @return + */ + private boolean filterMatches(String msg) { + boolean result = true; +// if (isFilterInitialized()) { +// try { +// final HpJsonEvent e = new HpJsonEvent("e", new JSONObject(msg)); +// result = fHpAlarmFilter.matches(fHppe, e); +// } catch (JSONException x) { +// // the msg may not be JSON +// result = false; +// log.error("Failed due to " + x.getMessage()); +// } catch (Exception x) { +// log.error("Error using filter: " + x.getMessage(), x); +// } +// } + + return result; + } + + public DMaaPContext getDmaapContext() { + return dmaapContext; + } + + public void setDmaapContext(DMaaPContext dmaapContext) { + this.dmaapContext = dmaapContext; + } + + public Topic getTopic() { + return topic; + } + + public void setTopic(Topic topic) { + this.topic = topic; + } + + public void setTopicStyle(boolean aaftopic) { + this.isAAFTopic = aaftopic; + } + + public void setTransEnabled ( boolean transEnable) { + this.istransEnable = transEnable; + } + + /*private boolean isTransactionEnabled() { + //return topic.isTransactionEnabled(); + return true; // let metrics creates for all the topics + }*/ + + private boolean isTransEnabled() { + String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd"); + boolean istransidreqd=false; + if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) || isAAFTopic){ + istransidreqd = true; + } + + return istransidreqd; + + } + + private final Consumer fConsumer; + private final int fLimit; + private final int fTimeoutMs; + //private final rrNvReadable fSettings; + private final boolean fPretty; + private final boolean fWithMeta; + private int fSent; +// private final HpAlarmFilter<HpJsonEvent> fHpAlarmFilter; +// private final HpProcessingEngine<HpJsonEvent> fHppe; + private DMaaPContext dmaapContext; + private String responseTransactionId; + private Topic topic; + private boolean isAAFTopic = false; + private boolean istransEnable = false; + + + //private static final Logger log = Logger.getLogger(CambriaOutboundEventStream.class); + + private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class); +}
\ No newline at end of file diff --git a/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaJsonStreamReader.java b/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaJsonStreamReader.java new file mode 100644 index 0000000..9d727ad --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaJsonStreamReader.java @@ -0,0 +1,172 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.resources.streamReaders; + +import java.io.InputStream; + +import javax.servlet.http.HttpServletResponse; + +import org.json.JSONException; +import org.json.JSONObject; +import org.json.JSONTokener; + +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.backends.Publisher.message; +import com.att.nsa.cambria.beans.LogDetails; +import com.att.nsa.cambria.resources.CambriaEventSet.reader; + +/** + * + * @author author + * + */ +public class CambriaJsonStreamReader implements reader { + private final JSONTokener fTokens; + private final boolean fIsList; + private long fCount; + private final String fDefPart; + public static final String kKeyField = "cambria.partition"; + + /** + * + * @param is + * @param defPart + * @throws CambriaApiException + */ + public CambriaJsonStreamReader(InputStream is, String defPart) throws CambriaApiException { + try { + fTokens = new JSONTokener(is); + fCount = 0; + fDefPart = defPart; + + final int c = fTokens.next(); + if (c == '[') { + fIsList = true; + } else if (c == '{') { + fTokens.back(); + fIsList = false; + } else { + throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expecting an array or an object."); + } + } catch (JSONException e) { + throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, e.getMessage()); + } + } + + @Override + public message next() throws CambriaApiException { + try { + if (!fTokens.more()) { + return null; + } + + final int c = fTokens.next(); + + /*if (c ==','){ + fCloseCount++; + System.out.println("fCloseCount=" + fCloseCount +" fCount "+fCount); + }*/ + if (fIsList) { + if (c == ']' || (fCount > 0 && c == 10)) + return null; + + + if (fCount > 0 && c != ',' && c!= 10) { + throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, + "Expected ',' or closing ']' after last object."); + } + + if (fCount == 0 && c != '{' && c!= 10 && c!=32) { + throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected { to start an object."); + } + } else if (fCount != 0 || c != '{') { + throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected '{' to start an object."); + } + + if (c == '{') { + fTokens.back(); + } + final JSONObject o = new JSONObject(fTokens); + fCount++; + return new msg(o); + } catch (JSONException e) { + throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, e.getMessage()); + + } + } + + private class msg implements message { + private final String fKey; + private String fMsg; + private LogDetails logDetails; + private boolean transactionEnabled; + + /** + * constructor + * + * @param o + */ + //public msg(JSONObject o){} + + + public msg(JSONObject o) { + String key = o.optString(kKeyField, fDefPart); + if (key == null) { + key = "" + System.currentTimeMillis(); + } + fKey = key; + + fMsg = o.toString().trim(); + + } + + @Override + public String getKey() { + return fKey; + } + + @Override + public String getMessage() { + return fMsg; + } + + @Override + public boolean isTransactionEnabled() { + return transactionEnabled; + } + + @Override + public void setTransactionEnabled(boolean transactionEnabled) { + this.transactionEnabled = transactionEnabled; + } + + @Override + public void setLogDetails(LogDetails logDetails) { + this.logDetails = logDetails; + } + + @Override + public LogDetails getLogDetails() { + return logDetails; + } + } +} diff --git a/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaRawStreamReader.java b/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaRawStreamReader.java new file mode 100644 index 0000000..16f6785 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaRawStreamReader.java @@ -0,0 +1,141 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.resources.streamReaders; + +import java.io.IOException; +import java.io.InputStream; + +import javax.servlet.http.HttpServletResponse; + +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.backends.Publisher.message; +import com.att.nsa.cambria.beans.LogDetails; +import com.att.nsa.cambria.resources.CambriaEventSet.reader; +import com.att.nsa.util.StreamTools; + +/** + * + * This stream reader reads raw bytes creating a single message. + * @author author + * + */ +public class CambriaRawStreamReader implements reader +{ + /** + * This is the constructor of CambriaRawStreamReader, it will basically the read from Input stream + * @param is + * @param defPart + * @throws CambriaApiException + */ + public CambriaRawStreamReader ( InputStream is, String defPart ) throws CambriaApiException + { + fStream = is; + fDefPart = defPart; + fClosed = false; + } + + @Override + /** + * + * next() method reads the bytes and + * iterates through the messages + * @throws CambriaApiException + * + */ + public message next () throws CambriaApiException + { + if ( fClosed ) return null; + + try + { + final byte[] rawBytes = StreamTools.readBytes ( fStream ); + fClosed = true; + return new message () + { + private LogDetails logDetails; + private boolean transactionEnabled; + + /** + * returns boolean value which + * indicates whether transaction is enabled + */ + public boolean isTransactionEnabled() { + return transactionEnabled; + } + + /** + * sets boolean value which + * indicates whether transaction is enabled + */ + public void setTransactionEnabled(boolean transactionEnabled) { + this.transactionEnabled = transactionEnabled; + } + + @Override + /** + * @returns key + * It ch4ecks whether fDefPart value is Null. + * If yes, it will return ystem.currentTimeMillis () else + * it will return fDefPart variable value + */ + public String getKey () + { + return fDefPart == null ? "" + System.currentTimeMillis () : fDefPart; + } + + @Override + /** + * returns the message in String type object + */ + public String getMessage () + { + return new String ( rawBytes ); + } + + /** + * set log details in logDetails variable + */ + @Override + public void setLogDetails(LogDetails logDetails) { + this.logDetails = logDetails; + } + + @Override + /** + * get the log details + */ + public LogDetails getLogDetails() { + return this.logDetails; + } + }; + } + catch ( IOException e ) + { + throw new CambriaApiException ( HttpServletResponse.SC_BAD_REQUEST, e.getMessage () ); + } + } + + private final InputStream fStream; + private final String fDefPart; + private boolean fClosed; + //private String transactionId; +} diff --git a/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaStreamReader.java b/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaStreamReader.java new file mode 100644 index 0000000..38359f0 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaStreamReader.java @@ -0,0 +1,229 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.resources.streamReaders; + +import java.io.IOException; +import java.io.InputStream; + +import javax.servlet.http.HttpServletResponse; + +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.backends.Publisher.message; +import com.att.nsa.cambria.beans.LogDetails; +import com.att.nsa.cambria.resources.CambriaEventSet.reader; + +/** + * Read an optionally chunked stream in the Cambria app format. This format + * allows for speedier server-side message parsing than pure JSON. It's looks + * like:<br/> + * <br/> + * <keyLength>.<msgLength>.<key><message><br/> + * <br/> + * Whitespace before/after each entry is ignored, so messages can be delivered + * with newlines between them, or not. + * + * @author author + * + */ +public class CambriaStreamReader implements reader { + /** + * constructor initializing InputStream with fStream + * + * @param senderStream + * @throws CambriaApiException + */ + public CambriaStreamReader(InputStream senderStream) throws CambriaApiException { + fStream = senderStream; + } + + @Override + /** + * next method iterates through msg length + * throws IOException + * throws CambriaApiException + * + */ + public message next() throws IOException, CambriaApiException { + final int keyLen = readLength(); + if (keyLen == -1) + return null; + + final int msgLen = readLength(); + final String keyPart = readString(keyLen); + final String msgPart = readString(msgLen); + + return new msg(keyPart, msgPart); + } + + private static class msg implements message { + /** + * constructor initialization + * + * @param key + * @param msg + */ + public msg(String key, String msg) { + // if no key, use the current time. This allows the message to be + // delivered + // in any order without forcing it into a single partition as empty + // string would. + if (key.length() < 1) { + key = "" + System.currentTimeMillis(); + } + + fKey = key; + fMsg = msg; + } + + @Override + /** + * @returns fkey + */ + public String getKey() { + return fKey; + } + + @Override + /** + * returns the message in String type object + */ + public String getMessage() { + return fMsg; + } + + private final String fKey; + private final String fMsg; + private LogDetails logDetails; + private boolean transactionEnabled; + + /** + * returns boolean value which + * indicates whether transaction is enabled + */ + public boolean isTransactionEnabled() { + return transactionEnabled; + } + + /** + * sets boolean value which + * indicates whether transaction is enabled + */ + public void setTransactionEnabled(boolean transactionEnabled) { + this.transactionEnabled = transactionEnabled; + } + + @Override + /** + * set log details in logDetails variable + */ + public void setLogDetails(LogDetails logDetails) { + this.logDetails = logDetails; + } + + @Override + /** + * get the log details + */ + public LogDetails getLogDetails() { + return this.logDetails; + } + + } + + private final InputStream fStream; + + /** + * max cambria length indicates message length + + // This limit is here to prevent the server from spinning on a long string of numbers + // that is delivered with 'application/cambria' as the format. The limit needs to be + // large enough to support the max message length (currently 1MB, the default Kafka + // limit) + * */ + + private static final int kMaxCambriaLength = 4*1000*1024; + + + /** + * + * @return + * @throws IOException + * @throws CambriaApiException + */ + private int readLength() throws IOException, CambriaApiException { + // always ignore leading whitespace + int c = fStream.read(); + while (Character.isWhitespace(c)) { + c = fStream.read(); + } + + if (c == -1) { + return -1; + } + + int result = 0; + while (Character.isDigit(c)) { + result = (result * 10) + (c - '0'); + if (result > kMaxCambriaLength) { + throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected . after length."); + } + c = fStream.read(); + } + + if (c != '.') { + throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected . after length."); + } + + return result; + } + + /** + * + * @param len + * @return + * @throws IOException + * @throws CambriaApiException + */ + private String readString(int len) throws IOException, CambriaApiException { + final byte[] buffer = new byte[len]; + + final long startMs = System.currentTimeMillis(); + final long timeoutMs = startMs + 30000; // FIXME configurable + + int readTotal = 0; + while (readTotal < len) { + final int read = fStream.read(buffer, readTotal, len - readTotal); + if (read == -1 || System.currentTimeMillis() > timeoutMs) { + // EOF + break; + } + readTotal += read; + } + + if (readTotal < len) { + throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, + "End of stream while reading " + len + " bytes"); + } + + return new String(buffer); + } +} diff --git a/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaTextStreamReader.java b/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaTextStreamReader.java new file mode 100644 index 0000000..2b76a61 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/resources/streamReaders/CambriaTextStreamReader.java @@ -0,0 +1,140 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.resources.streamReaders; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import javax.servlet.http.HttpServletResponse; + +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.backends.Publisher.message; +import com.att.nsa.cambria.beans.LogDetails; +import com.att.nsa.cambria.resources.CambriaEventSet.reader; + +/** + * This stream reader just pulls single lines. It uses the default partition if provided. If + * not, the key is the current time, which does not guarantee ordering. + * + * @author author + * + */ +public class CambriaTextStreamReader implements reader +{ + /** + * This is the constructor for Cambria Text Reader format + * @param is + * @param defPart + * @throws CambriaApiException + */ + public CambriaTextStreamReader ( InputStream is, String defPart ) throws CambriaApiException + { + fReader = new BufferedReader ( new InputStreamReader ( is ) ); + fDefPart = defPart; + } + + @Override + /** + * next() method iterates through msg length + * throws IOException + * throws CambriaApiException + * + */ + public message next () throws CambriaApiException + { + try + { + final String line = fReader.readLine (); + if ( line == null ) return null; + + return new message () + { + private LogDetails logDetails; + private boolean transactionEnabled; + + /** + * returns boolean value which + * indicates whether transaction is enabled + * @return + */ + public boolean isTransactionEnabled() { + return transactionEnabled; + } + + /** + * sets boolean value which + * indicates whether transaction is enabled + */ + public void setTransactionEnabled(boolean transactionEnabled) { + this.transactionEnabled = transactionEnabled; + } + + @Override + /** + * @returns key + * It ch4ecks whether fDefPart value is Null. + * If yes, it will return ystem.currentTimeMillis () else + * it will return fDefPart variable value + */ + public String getKey () + { + return fDefPart == null ? "" + System.currentTimeMillis () : fDefPart; + } + + @Override + /** + * returns the message in String type object + * @return + */ + public String getMessage () + { + return line; + } + + @Override + /** + * set log details in logDetails variable + */ + public void setLogDetails(LogDetails logDetails) { + this.logDetails = logDetails; + } + + @Override + /** + * get the log details + */ + public LogDetails getLogDetails() { + return this.logDetails; + } + }; + } + catch ( IOException e ) + { + throw new CambriaApiException ( HttpServletResponse.SC_BAD_REQUEST, e.getMessage () ); + } + } + + private final BufferedReader fReader; + private final String fDefPart; +} |