diff options
Diffstat (limited to 'src/main/java/com/att/dmf')
-rw-r--r-- | src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java b/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java index 86a6ef8..9ef2139 100644 --- a/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java +++ b/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java @@ -171,7 +171,7 @@ public class CambriaOutboundEventStream implements StreamWriter { } // public Builder atOffset ( int pos ) - // { + // fOffset = pos; // return this; // } @@ -197,7 +197,7 @@ public class CambriaOutboundEventStream implements StreamWriter { fConsumer = builder.fConsumer; fLimit = builder.fLimit; fTimeoutMs = builder.fTimeoutMs; - // fSettings = builder.fSettings; + fSent = 0; fPretty = builder.fPretty; fWithMeta = builder.fWithMeta; @@ -244,7 +244,7 @@ public class CambriaOutboundEventStream implements StreamWriter { * @param msg * @throws IOException */ - // void onMessage(int count, Message msg) throws IOException; + void onMessage(int count, String msg, String transId, long offSet) throws IOException, JSONException; } @@ -263,7 +263,7 @@ public class CambriaOutboundEventStream implements StreamWriter { * throws IOException */ public void write(final OutputStream os) throws IOException { - // final boolean transactionEnabled = topic.isTransactionEnabled(); + // final boolean transactionEnabled = isTransEnabled(); // final boolean transactionEnabled = istransEnable; // synchronized(this){ @@ -282,7 +282,7 @@ public class CambriaOutboundEventStream implements StreamWriter { entry.put("message", msg); os.write(entry.toString().getBytes()); } else { - // os.write(message.getBytes()); + String jsonString = JSONObject.valueToString(msg); os.write(jsonString.getBytes()); } @@ -299,7 +299,7 @@ public class CambriaOutboundEventStream implements StreamWriter { try { if (istransEnable && istransType) { // final String transactionId = - // jsonMessage.getString("transactionId"); + // responseTransactionId = transId; StringBuilder consumerInfo = new StringBuilder(); if (null != dmaapContext && null != dmaapContext.getRequest()) { @@ -338,7 +338,7 @@ public class CambriaOutboundEventStream implements StreamWriter { } }); - // if (null != dmaapContext && isTransactionEnabled()) { + if (null != dmaapContext && istransEnable && istransType) { dmaapContext.getResponse().setHeader("transactionId", @@ -353,10 +353,10 @@ public class CambriaOutboundEventStream implements StreamWriter { if (null != strclose_out_stream) close_out_stream = Boolean.parseBoolean(strclose_out_stream); - // if (fSettings.getBoolean("close.output.stream", true)) { + if (close_out_stream) { os.close(); - // } + } } |