aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java')
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java898
1 files changed, 420 insertions, 478 deletions
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
index 398558d..db982ec 100644
--- a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
+++ b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
@@ -47,6 +47,7 @@ import org.apache.http.HttpException;
import org.apache.http.HttpStatus;
import org.json.JSONArray;
import org.json.JSONObject;
+import org.json.JSONTokener;
import com.att.aft.dme2.api.DME2Client;
import com.att.aft.dme2.api.DME2Exception;
@@ -55,76 +56,66 @@ import com.att.nsa.mr.client.MRBatchingPublisher;
import com.att.nsa.mr.client.response.MRPublisherResponse;
import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher
-{
+public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
- public static class Builder
- {
- public Builder ()
- {
+ public static class Builder {
+ public Builder() {
}
- public Builder againstUrls ( Collection<String> baseUrls )
- {
+ public Builder againstUrls(Collection<String> baseUrls) {
fUrls = baseUrls;
return this;
}
- public Builder onTopic ( String topic )
- {
+ public Builder onTopic(String topic) {
fTopic = topic;
return this;
}
- public Builder batchTo ( int maxBatchSize, long maxBatchAgeMs )
- {
+ public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
fMaxBatchSize = maxBatchSize;
fMaxBatchAgeMs = maxBatchAgeMs;
return this;
}
- public Builder compress ( boolean compress )
- {
+ public Builder compress(boolean compress) {
fCompress = compress;
return this;
}
-
- public Builder httpThreadTime ( int threadOccuranceTime )
- {
+
+ public Builder httpThreadTime(int threadOccuranceTime) {
this.threadOccuranceTime = threadOccuranceTime;
return this;
}
-
- public Builder allowSelfSignedCertificates( boolean allowSelfSignedCerts )
- {
+
+ public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
fAllowSelfSignedCerts = allowSelfSignedCerts;
return this;
}
-
- public Builder withResponse ( boolean withResponse)
- {
+
+ public Builder withResponse(boolean withResponse) {
fWithResponse = withResponse;
return this;
}
- public MRSimplerBatchPublisher build ()
- {
- if(!fWithResponse)
- {
+
+ public MRSimplerBatchPublisher build() {
+ if (!fWithResponse) {
try {
- return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts,threadOccuranceTime);
+ return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
+ fAllowSelfSignedCerts, threadOccuranceTime);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
- } else
- {
+ } else {
try {
- return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts, fMaxBatchSize);
+ return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
+ fAllowSelfSignedCerts, fMaxBatchSize);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
}
-
+
}
private Collection<String> fUrls;
@@ -135,262 +126,250 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
private int threadOccuranceTime = 50;
private boolean fAllowSelfSignedCerts = false;
private boolean fWithResponse = false;
-
+
};
@Override
- public int send ( String partition, String msg )
- {
- return send ( new message ( partition, msg ) );
+ public int send(String partition, String msg) {
+ return send(new message(partition, msg));
}
+
@Override
- public int send ( String msg )
- {
- return send ( new message ( null, msg ) );
+ public int send(String msg) {
+ return send(new message(null, msg));
}
-
@Override
- public int send ( message msg )
- {
- final LinkedList<message> list = new LinkedList<message> ();
- list.add ( msg );
- return send ( list );
+ public int send(message msg) {
+ final LinkedList<message> list = new LinkedList<message>();
+ list.add(msg);
+ return send(list);
}
-
-
@Override
- public synchronized int send ( Collection<message> msgs )
- {
- if ( fClosed )
- {
- throw new IllegalStateException ( "The publisher was closed." );
+ public synchronized int send(Collection<message> msgs) {
+ if (fClosed) {
+ throw new IllegalStateException("The publisher was closed.");
}
-
- for ( message userMsg : msgs )
- {
- fPending.add ( new TimestampedMessage ( userMsg ) );
+
+ for (message userMsg : msgs) {
+ fPending.add(new TimestampedMessage(userMsg));
}
- return getPendingMessageCount ();
+ return getPendingMessageCount();
}
@Override
- public synchronized int getPendingMessageCount ()
- {
- return fPending.size ();
+ public synchronized int getPendingMessageCount() {
+ return fPending.size();
}
@Override
- public void close ()
- {
- try
- {
- final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
- if ( remains.size() > 0 )
- {
- getLog().warn ( "Closing publisher with " + remains.size() + " messages unsent. "
- + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close." );
+ public void close() {
+ try {
+ final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ if (remains.size() > 0) {
+ getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
+ + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
}
- }
- catch ( InterruptedException e )
- {
- getLog().warn ( "Possible message loss. " + e.getMessage(), e );
- }
- catch ( IOException e )
- {
- getLog().warn ( "Possible message loss. " + e.getMessage(), e );
+ } catch (InterruptedException e) {
+ getLog().warn("Possible message loss. " + e.getMessage(), e);
+ } catch (IOException e) {
+ getLog().warn("Possible message loss. " + e.getMessage(), e);
}
}
@Override
- public List<message> close ( long time, TimeUnit unit ) throws IOException, InterruptedException
- {
- synchronized ( this )
- {
+ public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
+ synchronized (this) {
fClosed = true;
// stop the background sender
- fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
- fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
- fExec.shutdown ();
+ fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ fExec.shutdown();
}
- final long now = Clock.now ();
- final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
+ final long now = Clock.now();
+ final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
final long timeoutAtMs = now + waitInMs;
- while ( Clock.now() < timeoutAtMs && getPendingMessageCount() > 0 )
- {
- send ( true );
- Thread.sleep ( 250 );
+ while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
+ send(true);
+ Thread.sleep(250);
}
- synchronized ( this )
- {
- final LinkedList<message> result = new LinkedList<message> ();
- fPending.drainTo ( result );
+ synchronized (this) {
+ final LinkedList<message> result = new LinkedList<message>();
+ fPending.drainTo(result);
return result;
}
}
/**
- * Possibly send a batch to the MR server. This is called by the background thread
- * and the close() method
+ * Possibly send a batch to the MR server. This is called by the background
+ * thread and the close() method
*
* @param force
*/
- private synchronized void send ( boolean force )
- {
- if ( force || shouldSendNow () )
- {
- if ( !sendBatch () )
- {
- getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
+ private synchronized void send(boolean force) {
+ if (force || shouldSendNow()) {
+ if (!sendBatch()) {
+ getLog().warn("Send failed, " + fPending.size() + " message to send.");
// note the time for back-off
- fDontSendUntilMs = sfWaitAfterError + Clock.now ();
+ fDontSendUntilMs = sfWaitAfterError + Clock.now();
}
}
}
- private synchronized boolean shouldSendNow ()
- {
+ private synchronized boolean shouldSendNow() {
boolean shouldSend = false;
- if ( fPending.size () > 0 )
- {
- final long nowMs = Clock.now ();
+ if (fPending.size() > 0) {
+ final long nowMs = Clock.now();
- shouldSend = ( fPending.size() >= fMaxBatchSize );
- if ( !shouldSend )
- {
+ shouldSend = (fPending.size() >= fMaxBatchSize);
+ if (!shouldSend) {
final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
shouldSend = sendAtMs <= nowMs;
}
// however, wait after an error
- shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
+ shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
}
return shouldSend;
}
- private synchronized boolean sendBatch ()
- {
- // it's possible for this call to be made with an empty list. in this case, just return.
- if ( fPending.size() < 1 )
- {
+ /**
+ * Method to parse published JSON Objects and Arrays
+ *
+ * @return JSONArray
+ */
+ private JSONArray parseJSON() {
+ JSONArray jsonArray = new JSONArray();
+ for (TimestampedMessage m : fPending) {
+ JSONTokener jsonTokener = new JSONTokener(m.fMsg);
+ JSONObject jsonObject = null;
+ JSONArray tempjsonArray = null;
+ final char firstChar = jsonTokener.next();
+ jsonTokener.back();
+ if ('[' == firstChar) {
+ tempjsonArray = new JSONArray(jsonTokener);
+ if (null != tempjsonArray) {
+ for (int i = 0; i < tempjsonArray.length(); i++) {
+ jsonArray.put(tempjsonArray.getJSONObject(i));
+ }
+ }
+ } else {
+ jsonObject = new JSONObject(jsonTokener);
+ jsonArray.put(jsonObject);
+ }
+
+ }
+ return jsonArray;
+ }
+
+ private synchronized boolean sendBatch() {
+ // it's possible for this call to be made with an empty list. in this
+ // case, just return.
+ if (fPending.size() < 1) {
return true;
}
- final long nowMs = Clock.now ();
-
+ final long nowMs = Clock.now();
+
host = this.fHostSelector.selectBaseHost();
-
- final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
-
-
- try
- {
- /*final String contentType =
- fCompress ?
- MRFormat.CAMBRIA_ZIP.toString () :
- MRFormat.CAMBRIA.toString ()
- ;*/
-
- final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
+
+ final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
+ props.getProperty("partition"));
+
+ try {
+ /*
+ * final String contentType = fCompress ?
+ * MRFormat.CAMBRIA_ZIP.toString () : MRFormat.CAMBRIA.toString () ;
+ */
+
+ final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
OutputStream os = baseStream;
final String contentType = props.getProperty("contenttype");
- if(contentType.equalsIgnoreCase("application/json")){
- JSONArray jsonArray = new JSONArray();
- for ( TimestampedMessage m : fPending )
- {
- JSONObject jsonObject = new JSONObject(m.fMsg);
-
- jsonArray.put(jsonObject);
-
+ if (contentType.equalsIgnoreCase("application/json")) {
+ JSONArray jsonArray = parseJSON();
+ os.write(jsonArray.toString().getBytes());
+ os.close();
+
+ } else if (contentType.equalsIgnoreCase("text/plain")) {
+ for (TimestampedMessage m : fPending) {
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
+ }
+ os.close();
+ } else if (contentType.equalsIgnoreCase("application/cambria")
+ || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
+ if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+ os = new GZIPOutputStream(baseStream);
+ }
+ for (TimestampedMessage m : fPending) {
+
+ os.write(("" + m.fPartition.length()).getBytes());
+ os.write('.');
+ os.write(("" + m.fMsg.length()).getBytes());
+ os.write('.');
+ os.write(m.fPartition.getBytes());
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
}
- os.write (jsonArray.toString().getBytes() );
os.close();
+ } else {
+ for (TimestampedMessage m : fPending) {
+ os.write(m.fMsg.getBytes());
- }else if (contentType.equalsIgnoreCase("text/plain")){
- for ( TimestampedMessage m : fPending )
- {
- os.write ( m.fMsg.getBytes() );
- os.write ( '\n' );
- }
- os.close ();
- } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){
- if ( contentType.equalsIgnoreCase("application/cambria-zip") )
- {
- os = new GZIPOutputStream ( baseStream );
- }
- for ( TimestampedMessage m : fPending )
- {
-
- os.write ( ( "" + m.fPartition.length () ).getBytes() );
- os.write ( '.' );
- os.write ( ( "" + m.fMsg.length () ).getBytes() );
- os.write ( '.' );
- os.write ( m.fPartition.getBytes() );
- os.write ( m.fMsg.getBytes() );
- os.write ( '\n' );
- }
- os.close ();
- }else{
- for ( TimestampedMessage m : fPending )
- {
- os.write ( m.fMsg.getBytes() );
-
- }
- os.close ();
}
-
-
+ os.close();
+ }
- final long startMs = Clock.now ();
+ final long startMs = Clock.now();
if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
-
-
+
DME2Configue();
-
+
Thread.sleep(5);
- getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
- sender.setPayload(os.toString());
+ getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ sender.setPayload(os.toString());
String dmeResponse = sender.sendAndWait(5000L);
-
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):"
- + dmeResponse.toString();
+
+ final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString();
getLog().info(logLine);
fPending.clear();
return true;
- }
-
+ }
+
if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
- final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
- //System.out.println(result.getInt("status"));
- //Here we are checking for error response. If HTTP status
- //code is not within the http success response code
- //then we consider this as error and return false
- if(result.getInt("status") < 200 || result.getInt("status") > 299) {
+ getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate,
+ username, password, protocolFlag);
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+ if (result.getInt("status") < 200 || result.getInt("status") > 299) {
return false;
}
final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
getLog().info(logLine);
fPending.clear();
return true;
- }
-
+ }
+
if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
- final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
-
-
- //System.out.println(result.getInt("status"));
- //Here we are checking for error response. If HTTP status
- //code is not within the http success response code
- //then we consider this as error and return false
- if(result.getInt("status") < 200 || result.getInt("status") > 299) {
+ getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
+ protocolFlag);
+
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+ if (result.getInt("status") < 200 || result.getInt("status") > 299) {
return false;
}
final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
@@ -398,118 +377,100 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
fPending.clear();
return true;
}
- }
- catch ( IllegalArgumentException x ) {
- getLog().warn ( x.getMessage(), x );
- } catch ( IOException x ) {
- getLog().warn ( x.getMessage(), x );
+ } catch (IllegalArgumentException x) {
+ getLog().warn(x.getMessage(), x);
+ } catch (IOException x) {
+ getLog().warn(x.getMessage(), x);
} catch (HttpException x) {
- getLog().warn ( x.getMessage(), x );
+ getLog().warn(x.getMessage(), x);
} catch (Exception x) {
getLog().warn(x.getMessage(), x);
}
return false;
}
- public synchronized MRPublisherResponse sendBatchWithResponse ()
- {
- // it's possible for this call to be made with an empty list. in this case, just return.
- if ( fPending.size() < 1 )
- {
+ public synchronized MRPublisherResponse sendBatchWithResponse() {
+ // it's possible for this call to be made with an empty list. in this
+ // case, just return.
+ if (fPending.size() < 1) {
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
pubResponse.setResponseMessage("No Messages to send");
return pubResponse;
}
- final long nowMs = Clock.now ();
-
+ final long nowMs = Clock.now();
+
host = this.fHostSelector.selectBaseHost();
-
- final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
- OutputStream os=null;
- try
- {
-
- final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
- os = baseStream;
+
+ final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
+ props.getProperty("partition"));
+ OutputStream os = null;
+ try {
+
+ final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
+ os = baseStream;
final String contentType = props.getProperty("contenttype");
- if(contentType.equalsIgnoreCase("application/json")){
- JSONArray jsonArray = new JSONArray();
- for ( TimestampedMessage m : fPending )
- {
- JSONObject jsonObject = new JSONObject(m.fMsg);
-
- jsonArray.put(jsonObject);
-
+ if (contentType.equalsIgnoreCase("application/json")) {
+ JSONArray jsonArray = parseJSON();
+ os.write(jsonArray.toString().getBytes());
+ } else if (contentType.equalsIgnoreCase("text/plain")) {
+ for (TimestampedMessage m : fPending) {
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
}
- os.write (jsonArray.toString().getBytes() );
- }else if (contentType.equalsIgnoreCase("text/plain")){
- for ( TimestampedMessage m : fPending )
- {
- os.write ( m.fMsg.getBytes() );
- os.write ( '\n' );
- }
- } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){
- if ( contentType.equalsIgnoreCase("application/cambria-zip") )
- {
- os = new GZIPOutputStream ( baseStream );
- }
- for ( TimestampedMessage m : fPending )
- {
-
- os.write ( ( "" + m.fPartition.length () ).getBytes() );
- os.write ( '.' );
- os.write ( ( "" + m.fMsg.length () ).getBytes() );
- os.write ( '.' );
- os.write ( m.fPartition.getBytes() );
- os.write ( m.fMsg.getBytes() );
- os.write ( '\n' );
- }
- os.close ();
- }else{
- for ( TimestampedMessage m : fPending )
- {
- os.write ( m.fMsg.getBytes() );
-
- }
+ } else if (contentType.equalsIgnoreCase("application/cambria")
+ || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
+ if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+ os = new GZIPOutputStream(baseStream);
+ }
+ for (TimestampedMessage m : fPending) {
+
+ os.write(("" + m.fPartition.length()).getBytes());
+ os.write('.');
+ os.write(("" + m.fMsg.length()).getBytes());
+ os.write('.');
+ os.write(m.fPartition.getBytes());
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
}
-
-
+ os.close();
+ } else {
+ for (TimestampedMessage m : fPending) {
+ os.write(m.fMsg.getBytes());
- final long startMs = Clock.now ();
+ }
+ }
+
+ final long startMs = Clock.now();
if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
-
-
+
try {
- DME2Configue();
-
- Thread.sleep(5);
- getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
- sender.setPayload(os.toString());
-
-
- String dmeResponse = sender.sendAndWait(5000L);
- System.out.println("dmeres->"+dmeResponse);
-
-
- pubResponse = createMRPublisherResponse(dmeResponse,pubResponse);
-
- if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-
- return pubResponse;
- }
- final String logLine = String.valueOf((Clock.now() - startMs))
- + dmeResponse.toString();
- getLog().info(logLine);
- fPending.clear();
-
- }
- catch (DME2Exception x) {
+ DME2Configue();
+
+ Thread.sleep(5);
+ getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ sender.setPayload(os.toString());
+
+ String dmeResponse = sender.sendAndWait(5000L);
+
+ pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
+
+ if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+ || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
+ return pubResponse;
+ }
+ final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
+ getLog().info(logLine);
+ fPending.clear();
+
+ } catch (DME2Exception x) {
getLog().warn(x.getMessage(), x);
pubResponse.setResponseCode(x.getErrorCode());
pubResponse.setResponseMessage(x.getErrorMessage());
} catch (URISyntaxException x) {
-
+
getLog().warn(x.getMessage(), x);
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
pubResponse.setResponseMessage(x.getMessage());
@@ -517,135 +478,127 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
pubResponse.setResponseMessage(x.getMessage());
- logger.error("exception: ", x);
-
+ logger.error("exception: ", x);
+
}
-
+
return pubResponse;
- }
-
+ }
+
if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
- final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
- //System.out.println(result.getInt("status"));
- //Here we are checking for error response. If HTTP status
- //code is not within the http success response code
- //then we consider this as error and return false
-
-
- pubResponse = createMRPublisherResponse(result,pubResponse);
-
- if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-
+ getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
+ authDate, username, password, protocolFlag);
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+
+ pubResponse = createMRPublisherResponse(result, pubResponse);
+
+ if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+ || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
return pubResponse;
}
-
+
final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
getLog().info(logLine);
fPending.clear();
return pubResponse;
- }
-
+ }
+
if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
- final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
-
- //System.out.println(result.getInt("status"));
- //Here we are checking for error response. If HTTP status
- //code is not within the http success response code
- //then we consider this as error and return false
- pubResponse = createMRPublisherResponse(result,pubResponse);
-
- if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-
+ getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
+ password, protocolFlag);
+
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+ pubResponse = createMRPublisherResponse(result, pubResponse);
+
+ if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+ || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
return pubResponse;
}
-
+
final String logLine = String.valueOf((Clock.now() - startMs));
getLog().info(logLine);
fPending.clear();
return pubResponse;
}
- }
- catch ( IllegalArgumentException x ) {
- getLog().warn ( x.getMessage(), x );
+ } catch (IllegalArgumentException x) {
+ getLog().warn(x.getMessage(), x);
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
pubResponse.setResponseMessage(x.getMessage());
-
- } catch ( IOException x ) {
- getLog().warn ( x.getMessage(), x );
+
+ } catch (IOException x) {
+ getLog().warn(x.getMessage(), x);
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
pubResponse.setResponseMessage(x.getMessage());
-
+
} catch (HttpException x) {
- getLog().warn ( x.getMessage(), x );
+ getLog().warn(x.getMessage(), x);
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
pubResponse.setResponseMessage(x.getMessage());
-
+
} catch (Exception x) {
getLog().warn(x.getMessage(), x);
-
+
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
pubResponse.setResponseMessage(x.getMessage());
-
+
}
-
+
finally {
- if (fPending.size()>0) {
- getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
+ if (fPending.size() > 0) {
+ getLog().warn("Send failed, " + fPending.size() + " message to send.");
pubResponse.setPendingMsgs(fPending.size());
}
if (os != null) {
try {
- os.close();
+ os.close();
} catch (Exception x) {
getLog().warn(x.getMessage(), x);
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
pubResponse.setResponseMessage("Error in closing Output Stream");
}
- }
+ }
}
-
+
return pubResponse;
}
-
-private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
-
- if (reply.isEmpty())
- {
-
- mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
- mrPubResponse.setResponseMessage("Please verify the Producer properties");
- }
- else if(reply.startsWith("{"))
- {
+
+ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
+
+ if (reply.isEmpty()) {
+
+ mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+ mrPubResponse.setResponseMessage("Please verify the Producer properties");
+ } else if (reply.startsWith("{")) {
JSONObject jObject = new JSONObject(reply);
- if(jObject.has("message") && jObject.has("status"))
- {
+ if (jObject.has("message") && jObject.has("status")) {
String message = jObject.getString("message");
- if(null != message)
- {
- mrPubResponse.setResponseMessage(message);
+ if (null != message) {
+ mrPubResponse.setResponseMessage(message);
}
mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
+ } else {
+ mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
+ mrPubResponse.setResponseMessage(reply);
}
- else
- {
- mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
- mrPubResponse.setResponseMessage(reply);
- }
- }
- else if (reply.startsWith("<"))
- {
- String responseCode = getHTTPErrorResponseCode(reply);
- if( responseCode.contains("403"))
- {
- responseCode = "403";
- }
- mrPubResponse.setResponseCode(responseCode);
- mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
- }
-
+ } else if (reply.startsWith("<")) {
+ String responseCode = getHTTPErrorResponseCode(reply);
+ if (responseCode.contains("403")) {
+ responseCode = "403";
+ }
+ mrPubResponse.setResponseCode(responseCode);
+ mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
+ }
+
return mrPubResponse;
}
@@ -658,10 +611,10 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
private String username;
private String password;
private String host;
-
- //host selector
+
+ // host selector
private HostSelector fHostSelector = null;
-
+
private final LinkedBlockingQueue<TimestampedMessage> fPending;
private long fDontSendUntilMs;
private final ScheduledThreadPoolExecutor fExec;
@@ -684,25 +637,24 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
private HashMap<String, String> DMETimeOuts;
private DME2Client sender;
public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
- public String producerFilePath;
private String authKey;
private String authDate;
private String handlers;
private Properties props;
public static String routerFilePath;
- protected static final Map<String, String> headers=new HashMap<String, String>();
+ protected static final Map<String, String> headers = new HashMap<String, String>();
public static MultivaluedMap<String, Object> headersMap;
-
-
+
private MRPublisherResponse pubResponse;
-
+
public MRPublisherResponse getPubResponse() {
return pubResponse;
}
+
public void setPubResponse(MRPublisherResponse pubResponse) {
this.pubResponse = pubResponse;
}
-
+
public static String getRouterFilePath() {
return routerFilePath;
}
@@ -719,14 +671,6 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
this.props = props;
}
- public String getProducerFilePath() {
- return producerFilePath;
- }
-
- public void setProducerFilePath(String producerFilePath) {
- this.producerFilePath = producerFilePath;
- }
-
public String getProtocolFlag() {
return protocolFlag;
}
@@ -734,14 +678,14 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
public void setProtocolFlag(String protocolFlag) {
this.protocolFlag = protocolFlag;
}
-
-
+
private void DME2Configue() throws Exception {
try {
-
- /* FileReader reader = new FileReader(new File (producerFilePath));
- Properties props = new Properties();
- props.load(reader);*/
+
+ /*
+ * FileReader reader = new FileReader(new File (producerFilePath));
+ * Properties props = new Properties(); props.load(reader);
+ */
latitude = props.getProperty("Latitude");
longitude = props.getProperty("Longitude");
version = props.getProperty("Version");
@@ -749,41 +693,43 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
env = props.getProperty("Environment");
partner = props.getProperty("Partner");
routeOffer = props.getProperty("routeOffer");
- subContextPath = props.getProperty("SubContextPath")+fTopic;
- /*if(props.getProperty("partition")!=null && !props.getProperty("partition").equalsIgnoreCase("")){
- subContextPath=subContextPath+"?partitionKey="+props.getProperty("partition");
- }*/
+ subContextPath = props.getProperty("SubContextPath") + fTopic;
+ /*
+ * if(props.getProperty("partition")!=null &&
+ * !props.getProperty("partition").equalsIgnoreCase("")){
+ * subContextPath=subContextPath+"?partitionKey="+props.getProperty(
+ * "partition"); }
+ */
protocol = props.getProperty("Protocol");
methodType = props.getProperty("MethodType");
dmeuser = props.getProperty("username");
dmepassword = props.getProperty("password");
contentType = props.getProperty("contenttype");
handlers = props.getProperty("sessionstickinessrequired");
- routerFilePath= props.getProperty("DME2preferredRouterFilePath");
-
+ routerFilePath = props.getProperty("DME2preferredRouterFilePath");
+
/**
- * Changes to DME2Client url to use Partner for auto failover between data centers
- * When Partner value is not provided use the routeOffer value for auto failover within a cluster
+ * Changes to DME2Client url to use Partner for auto failover
+ * between data centers When Partner value is not provided use the
+ * routeOffer value for auto failover within a cluster
*/
-
String partitionKey = props.getProperty("partition");
-
- if (partner != null && !partner.isEmpty() )
- {
- url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner;
- if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
- url = url + "&partitionKey=" + partitionKey;
- }
- }
- else if (routeOffer!=null && !routeOffer.isEmpty())
- {
- url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer;
- if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
- url = url + "&partitionKey=" + partitionKey;
- }
+
+ if (partner != null && !partner.isEmpty()) {
+ url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
+ + partner;
+ if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
+ url = url + "&partitionKey=" + partitionKey;
+ }
+ } else if (routeOffer != null && !routeOffer.isEmpty()) {
+ url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
+ + routeOffer;
+ if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
+ url = url + "&partitionKey=" + partitionKey;
+ }
}
-
+
DMETimeOuts = new HashMap<String, String>();
DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
@@ -791,56 +737,56 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
DMETimeOuts.put("Content-Type", contentType);
System.setProperty("AFT_LATITUDE", latitude);
System.setProperty("AFT_LONGITUDE", longitude);
- System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT"));
- //System.setProperty("DME2.DEBUG", "true");
- // System.setProperty("AFT_DME2_HTTP_EXCHANGE_TRACE_ON", "true");
- //System.out.println("XXXXXX"+url);
-
- //SSL changes
- System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
- "SSLv3,TLSv1,TLSv1.1");
+ System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
+ // System.setProperty("DME2.DEBUG", "true");
+
+ // SSL changes
+ // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
+ // "SSLv3,TLSv1,TLSv1.1");
+ System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
-
- //SSL changes
-
+
+ // SSL changes
+
sender = new DME2Client(new URI(url), 5000L);
-
+
sender.setAllowAllHttpReturnCodes(true);
sender.setMethod(methodType);
- sender.setSubContext(subContextPath);
+ sender.setSubContext(subContextPath);
sender.setCredentials(dmeuser, dmepassword);
sender.setHeaders(DMETimeOuts);
- if(handlers.equalsIgnoreCase("yes")){
- sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
- sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
+ if (handlers.equalsIgnoreCase("yes")) {
+ sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
+ props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
+ props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
- }else{
- sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
- }
+ } else {
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
+ }
} catch (DME2Exception x) {
getLog().warn(x.getMessage(), x);
- throw new DME2Exception(x.getErrorCode(),x.getErrorMessage());
+ throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
} catch (URISyntaxException x) {
-
+
getLog().warn(x.getMessage(), x);
- throw new URISyntaxException(url,x.getMessage());
+ throw new URISyntaxException(url, x.getMessage());
} catch (Exception x) {
getLog().warn(x.getMessage(), x);
throw new Exception(x.getMessage());
}
}
-
- private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress) throws MalformedURLException
- {
- super ( hosts );
- if ( topic == null || topic.length() < 1 )
- {
- throw new IllegalArgumentException ( "A topic must be provided." );
+ private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
+ boolean compress) throws MalformedURLException {
+ super(hosts);
+
+ if (topic == null || topic.length() < 1) {
+ throw new IllegalArgumentException("A topic must be provided.");
}
-
+
fHostSelector = new HostSelector(hosts, null);
fClosed = false;
fTopic = topic;
@@ -848,49 +794,45 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
fMaxBatchAgeMs = maxBatchAgeMs;
fCompress = compress;
- fPending = new LinkedBlockingQueue<TimestampedMessage> ();
+ fPending = new LinkedBlockingQueue<TimestampedMessage>();
fDontSendUntilMs = 0;
- fExec = new ScheduledThreadPoolExecutor ( 1 );
+ fExec = new ScheduledThreadPoolExecutor(1);
pubResponse = new MRPublisherResponse();
-
+
}
-
- private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace ) throws MalformedURLException
- {
- super ( hosts );
- if ( topic == null || topic.length() < 1 )
- {
- throw new IllegalArgumentException ( "A topic must be provided." );
+ private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
+ boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
+ super(hosts);
+
+ if (topic == null || topic.length() < 1) {
+ throw new IllegalArgumentException("A topic must be provided.");
}
-
+
fHostSelector = new HostSelector(hosts, null);
fClosed = false;
fTopic = topic;
fMaxBatchSize = maxBatchSize;
fMaxBatchAgeMs = maxBatchAgeMs;
fCompress = compress;
- threadOccuranceTime=httpThreadOccurnace;
- fPending = new LinkedBlockingQueue<TimestampedMessage> ();
+ threadOccuranceTime = httpThreadOccurnace;
+ fPending = new LinkedBlockingQueue<TimestampedMessage>();
fDontSendUntilMs = 0;
- fExec = new ScheduledThreadPoolExecutor ( 1 );
- fExec.scheduleAtFixedRate ( new Runnable()
- {
+ fExec = new ScheduledThreadPoolExecutor(1);
+ fExec.scheduleAtFixedRate(new Runnable() {
@Override
- public void run ()
- {
- send ( false );
+ public void run() {
+ send(false);
}
- }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS );
+ }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
}
- private static class TimestampedMessage extends message
- {
- public TimestampedMessage ( message m )
- {
- super ( m );
+ private static class TimestampedMessage extends message {
+ public TimestampedMessage(message m) {
+ super(m);
timestamp = Clock.now();
}
+
public final long timestamp;
}
@@ -941,5 +883,5 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
public void setAuthDate(String authDate) {
this.authDate = authDate;
}
-
+
}