summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java18
1 files changed, 9 insertions, 9 deletions
diff --git a/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java b/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java
index 86a6ef8..9ef2139 100644
--- a/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java
+++ b/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java
@@ -171,7 +171,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
}
// public Builder atOffset ( int pos )
- // {
+
// fOffset = pos;
// return this;
// }
@@ -197,7 +197,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
fConsumer = builder.fConsumer;
fLimit = builder.fLimit;
fTimeoutMs = builder.fTimeoutMs;
- // fSettings = builder.fSettings;
+
fSent = 0;
fPretty = builder.fPretty;
fWithMeta = builder.fWithMeta;
@@ -244,7 +244,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
* @param msg
* @throws IOException
*/
- // void onMessage(int count, Message msg) throws IOException;
+
void onMessage(int count, String msg, String transId, long offSet) throws IOException, JSONException;
}
@@ -263,7 +263,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
* throws IOException
*/
public void write(final OutputStream os) throws IOException {
- // final boolean transactionEnabled = topic.isTransactionEnabled();
+
// final boolean transactionEnabled = isTransEnabled();
// final boolean transactionEnabled = istransEnable;
// synchronized(this){
@@ -282,7 +282,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
entry.put("message", msg);
os.write(entry.toString().getBytes());
} else {
- // os.write(message.getBytes());
+
String jsonString = JSONObject.valueToString(msg);
os.write(jsonString.getBytes());
}
@@ -299,7 +299,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
try {
if (istransEnable && istransType) {
// final String transactionId =
- // jsonMessage.getString("transactionId");
+
// responseTransactionId = transId;
StringBuilder consumerInfo = new StringBuilder();
if (null != dmaapContext && null != dmaapContext.getRequest()) {
@@ -338,7 +338,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
}
});
- // if (null != dmaapContext && isTransactionEnabled()) {
+
if (null != dmaapContext && istransEnable && istransType) {
dmaapContext.getResponse().setHeader("transactionId",
@@ -353,10 +353,10 @@ public class CambriaOutboundEventStream implements StreamWriter {
if (null != strclose_out_stream)
close_out_stream = Boolean.parseBoolean(strclose_out_stream);
- // if (fSettings.getBoolean("close.output.stream", true)) {
+
if (close_out_stream) {
os.close();
- // }
+
}
}