diff options
Diffstat (limited to 'src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java')
-rw-r--r-- | src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java | 898 |
1 files changed, 420 insertions, 478 deletions
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java index 398558d..db982ec 100644 --- a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java +++ b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java @@ -47,6 +47,7 @@ import org.apache.http.HttpException; import org.apache.http.HttpStatus; import org.json.JSONArray; import org.json.JSONObject; +import org.json.JSONTokener; import com.att.aft.dme2.api.DME2Client; import com.att.aft.dme2.api.DME2Exception; @@ -55,76 +56,66 @@ import com.att.nsa.mr.client.MRBatchingPublisher; import com.att.nsa.mr.client.response.MRPublisherResponse; import com.att.nsa.mr.test.clients.ProtocolTypeConstants; -public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher -{ +public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher { private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class); - public static class Builder - { - public Builder () - { + public static class Builder { + public Builder() { } - public Builder againstUrls ( Collection<String> baseUrls ) - { + public Builder againstUrls(Collection<String> baseUrls) { fUrls = baseUrls; return this; } - public Builder onTopic ( String topic ) - { + public Builder onTopic(String topic) { fTopic = topic; return this; } - public Builder batchTo ( int maxBatchSize, long maxBatchAgeMs ) - { + public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) { fMaxBatchSize = maxBatchSize; fMaxBatchAgeMs = maxBatchAgeMs; return this; } - public Builder compress ( boolean compress ) - { + public Builder compress(boolean compress) { fCompress = compress; return this; } - - public Builder httpThreadTime ( int threadOccuranceTime ) - { + + public Builder httpThreadTime(int threadOccuranceTime) { this.threadOccuranceTime = threadOccuranceTime; return this; } - - public Builder allowSelfSignedCertificates( boolean allowSelfSignedCerts ) - { + + public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) { fAllowSelfSignedCerts = allowSelfSignedCerts; return this; } - - public Builder withResponse ( boolean withResponse) - { + + public Builder withResponse(boolean withResponse) { fWithResponse = withResponse; return this; } - public MRSimplerBatchPublisher build () - { - if(!fWithResponse) - { + + public MRSimplerBatchPublisher build() { + if (!fWithResponse) { try { - return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts,threadOccuranceTime); + return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, + fAllowSelfSignedCerts, threadOccuranceTime); } catch (MalformedURLException e) { throw new RuntimeException(e); } - } else - { + } else { try { - return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts, fMaxBatchSize); + return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, + fAllowSelfSignedCerts, fMaxBatchSize); } catch (MalformedURLException e) { throw new RuntimeException(e); } } - + } private Collection<String> fUrls; @@ -135,262 +126,250 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP private int threadOccuranceTime = 50; private boolean fAllowSelfSignedCerts = false; private boolean fWithResponse = false; - + }; @Override - public int send ( String partition, String msg ) - { - return send ( new message ( partition, msg ) ); + public int send(String partition, String msg) { + return send(new message(partition, msg)); } + @Override - public int send ( String msg ) - { - return send ( new message ( null, msg ) ); + public int send(String msg) { + return send(new message(null, msg)); } - @Override - public int send ( message msg ) - { - final LinkedList<message> list = new LinkedList<message> (); - list.add ( msg ); - return send ( list ); + public int send(message msg) { + final LinkedList<message> list = new LinkedList<message>(); + list.add(msg); + return send(list); } - - @Override - public synchronized int send ( Collection<message> msgs ) - { - if ( fClosed ) - { - throw new IllegalStateException ( "The publisher was closed." ); + public synchronized int send(Collection<message> msgs) { + if (fClosed) { + throw new IllegalStateException("The publisher was closed."); } - - for ( message userMsg : msgs ) - { - fPending.add ( new TimestampedMessage ( userMsg ) ); + + for (message userMsg : msgs) { + fPending.add(new TimestampedMessage(userMsg)); } - return getPendingMessageCount (); + return getPendingMessageCount(); } @Override - public synchronized int getPendingMessageCount () - { - return fPending.size (); + public synchronized int getPendingMessageCount() { + return fPending.size(); } @Override - public void close () - { - try - { - final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS ); - if ( remains.size() > 0 ) - { - getLog().warn ( "Closing publisher with " + remains.size() + " messages unsent. " - + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close." ); + public void close() { + try { + final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + if (remains.size() > 0) { + getLog().warn("Closing publisher with " + remains.size() + " messages unsent. " + + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close."); } - } - catch ( InterruptedException e ) - { - getLog().warn ( "Possible message loss. " + e.getMessage(), e ); - } - catch ( IOException e ) - { - getLog().warn ( "Possible message loss. " + e.getMessage(), e ); + } catch (InterruptedException e) { + getLog().warn("Possible message loss. " + e.getMessage(), e); + } catch (IOException e) { + getLog().warn("Possible message loss. " + e.getMessage(), e); } } @Override - public List<message> close ( long time, TimeUnit unit ) throws IOException, InterruptedException - { - synchronized ( this ) - { + public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException { + synchronized (this) { fClosed = true; // stop the background sender - fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false ); - fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false ); - fExec.shutdown (); + fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + fExec.shutdown(); } - final long now = Clock.now (); - final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit ); + final long now = Clock.now(); + final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit); final long timeoutAtMs = now + waitInMs; - while ( Clock.now() < timeoutAtMs && getPendingMessageCount() > 0 ) - { - send ( true ); - Thread.sleep ( 250 ); + while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) { + send(true); + Thread.sleep(250); } - synchronized ( this ) - { - final LinkedList<message> result = new LinkedList<message> (); - fPending.drainTo ( result ); + synchronized (this) { + final LinkedList<message> result = new LinkedList<message>(); + fPending.drainTo(result); return result; } } /** - * Possibly send a batch to the MR server. This is called by the background thread - * and the close() method + * Possibly send a batch to the MR server. This is called by the background + * thread and the close() method * * @param force */ - private synchronized void send ( boolean force ) - { - if ( force || shouldSendNow () ) - { - if ( !sendBatch () ) - { - getLog().warn ( "Send failed, " + fPending.size() + " message to send." ); + private synchronized void send(boolean force) { + if (force || shouldSendNow()) { + if (!sendBatch()) { + getLog().warn("Send failed, " + fPending.size() + " message to send."); // note the time for back-off - fDontSendUntilMs = sfWaitAfterError + Clock.now (); + fDontSendUntilMs = sfWaitAfterError + Clock.now(); } } } - private synchronized boolean shouldSendNow () - { + private synchronized boolean shouldSendNow() { boolean shouldSend = false; - if ( fPending.size () > 0 ) - { - final long nowMs = Clock.now (); + if (fPending.size() > 0) { + final long nowMs = Clock.now(); - shouldSend = ( fPending.size() >= fMaxBatchSize ); - if ( !shouldSend ) - { + shouldSend = (fPending.size() >= fMaxBatchSize); + if (!shouldSend) { final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs; shouldSend = sendAtMs <= nowMs; } // however, wait after an error - shouldSend = shouldSend && nowMs >= fDontSendUntilMs; + shouldSend = shouldSend && nowMs >= fDontSendUntilMs; } return shouldSend; } - private synchronized boolean sendBatch () - { - // it's possible for this call to be made with an empty list. in this case, just return. - if ( fPending.size() < 1 ) - { + /** + * Method to parse published JSON Objects and Arrays + * + * @return JSONArray + */ + private JSONArray parseJSON() { + JSONArray jsonArray = new JSONArray(); + for (TimestampedMessage m : fPending) { + JSONTokener jsonTokener = new JSONTokener(m.fMsg); + JSONObject jsonObject = null; + JSONArray tempjsonArray = null; + final char firstChar = jsonTokener.next(); + jsonTokener.back(); + if ('[' == firstChar) { + tempjsonArray = new JSONArray(jsonTokener); + if (null != tempjsonArray) { + for (int i = 0; i < tempjsonArray.length(); i++) { + jsonArray.put(tempjsonArray.getJSONObject(i)); + } + } + } else { + jsonObject = new JSONObject(jsonTokener); + jsonArray.put(jsonObject); + } + + } + return jsonArray; + } + + private synchronized boolean sendBatch() { + // it's possible for this call to be made with an empty list. in this + // case, just return. + if (fPending.size() < 1) { return true; } - final long nowMs = Clock.now (); - + final long nowMs = Clock.now(); + host = this.fHostSelector.selectBaseHost(); - - final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") ); - - - try - { - /*final String contentType = - fCompress ? - MRFormat.CAMBRIA_ZIP.toString () : - MRFormat.CAMBRIA.toString () - ;*/ - - final ByteArrayOutputStream baseStream = new ByteArrayOutputStream (); + + final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"), + props.getProperty("partition")); + + try { + /* + * final String contentType = fCompress ? + * MRFormat.CAMBRIA_ZIP.toString () : MRFormat.CAMBRIA.toString () ; + */ + + final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); OutputStream os = baseStream; final String contentType = props.getProperty("contenttype"); - if(contentType.equalsIgnoreCase("application/json")){ - JSONArray jsonArray = new JSONArray(); - for ( TimestampedMessage m : fPending ) - { - JSONObject jsonObject = new JSONObject(m.fMsg); - - jsonArray.put(jsonObject); - + if (contentType.equalsIgnoreCase("application/json")) { + JSONArray jsonArray = parseJSON(); + os.write(jsonArray.toString().getBytes()); + os.close(); + + } else if (contentType.equalsIgnoreCase("text/plain")) { + for (TimestampedMessage m : fPending) { + os.write(m.fMsg.getBytes()); + os.write('\n'); + } + os.close(); + } else if (contentType.equalsIgnoreCase("application/cambria") + || (contentType.equalsIgnoreCase("application/cambria-zip"))) { + if (contentType.equalsIgnoreCase("application/cambria-zip")) { + os = new GZIPOutputStream(baseStream); + } + for (TimestampedMessage m : fPending) { + + os.write(("" + m.fPartition.length()).getBytes()); + os.write('.'); + os.write(("" + m.fMsg.length()).getBytes()); + os.write('.'); + os.write(m.fPartition.getBytes()); + os.write(m.fMsg.getBytes()); + os.write('\n'); } - os.write (jsonArray.toString().getBytes() ); os.close(); + } else { + for (TimestampedMessage m : fPending) { + os.write(m.fMsg.getBytes()); - }else if (contentType.equalsIgnoreCase("text/plain")){ - for ( TimestampedMessage m : fPending ) - { - os.write ( m.fMsg.getBytes() ); - os.write ( '\n' ); - } - os.close (); - } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){ - if ( contentType.equalsIgnoreCase("application/cambria-zip") ) - { - os = new GZIPOutputStream ( baseStream ); - } - for ( TimestampedMessage m : fPending ) - { - - os.write ( ( "" + m.fPartition.length () ).getBytes() ); - os.write ( '.' ); - os.write ( ( "" + m.fMsg.length () ).getBytes() ); - os.write ( '.' ); - os.write ( m.fPartition.getBytes() ); - os.write ( m.fMsg.getBytes() ); - os.write ( '\n' ); - } - os.close (); - }else{ - for ( TimestampedMessage m : fPending ) - { - os.write ( m.fMsg.getBytes() ); - - } - os.close (); } - - + os.close(); + } - final long startMs = Clock.now (); + final long startMs = Clock.now(); if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { - - + DME2Configue(); - + Thread.sleep(5); - getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); - sender.setPayload(os.toString()); + getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + sender.setPayload(os.toString()); String dmeResponse = sender.sendAndWait(5000L); - - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" - + dmeResponse.toString(); + + final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString(); getLog().info(logLine); fPending.clear(); return true; - } - + } + if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); - final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag); - //System.out.println(result.getInt("status")); - //Here we are checking for error response. If HTTP status - //code is not within the http success response code - //then we consider this as error and return false - if(result.getInt("status") < 200 || result.getInt("status") > 299) { + getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, + username, password, protocolFlag); + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + if (result.getInt("status") < 200 || result.getInt("status") > 299) { return false; } final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); getLog().info(logLine); fPending.clear(); return true; - } - + } + if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); - final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag); - - - //System.out.println(result.getInt("status")); - //Here we are checking for error response. If HTTP status - //code is not within the http success response code - //then we consider this as error and return false - if(result.getInt("status") < 200 || result.getInt("status") > 299) { + getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, + protocolFlag); + + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + if (result.getInt("status") < 200 || result.getInt("status") > 299) { return false; } final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); @@ -398,118 +377,100 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP fPending.clear(); return true; } - } - catch ( IllegalArgumentException x ) { - getLog().warn ( x.getMessage(), x ); - } catch ( IOException x ) { - getLog().warn ( x.getMessage(), x ); + } catch (IllegalArgumentException x) { + getLog().warn(x.getMessage(), x); + } catch (IOException x) { + getLog().warn(x.getMessage(), x); } catch (HttpException x) { - getLog().warn ( x.getMessage(), x ); + getLog().warn(x.getMessage(), x); } catch (Exception x) { getLog().warn(x.getMessage(), x); } return false; } - public synchronized MRPublisherResponse sendBatchWithResponse () - { - // it's possible for this call to be made with an empty list. in this case, just return. - if ( fPending.size() < 1 ) - { + public synchronized MRPublisherResponse sendBatchWithResponse() { + // it's possible for this call to be made with an empty list. in this + // case, just return. + if (fPending.size() < 1) { pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); pubResponse.setResponseMessage("No Messages to send"); return pubResponse; } - final long nowMs = Clock.now (); - + final long nowMs = Clock.now(); + host = this.fHostSelector.selectBaseHost(); - - final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") ); - OutputStream os=null; - try - { - - final ByteArrayOutputStream baseStream = new ByteArrayOutputStream (); - os = baseStream; + + final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"), + props.getProperty("partition")); + OutputStream os = null; + try { + + final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); + os = baseStream; final String contentType = props.getProperty("contenttype"); - if(contentType.equalsIgnoreCase("application/json")){ - JSONArray jsonArray = new JSONArray(); - for ( TimestampedMessage m : fPending ) - { - JSONObject jsonObject = new JSONObject(m.fMsg); - - jsonArray.put(jsonObject); - + if (contentType.equalsIgnoreCase("application/json")) { + JSONArray jsonArray = parseJSON(); + os.write(jsonArray.toString().getBytes()); + } else if (contentType.equalsIgnoreCase("text/plain")) { + for (TimestampedMessage m : fPending) { + os.write(m.fMsg.getBytes()); + os.write('\n'); } - os.write (jsonArray.toString().getBytes() ); - }else if (contentType.equalsIgnoreCase("text/plain")){ - for ( TimestampedMessage m : fPending ) - { - os.write ( m.fMsg.getBytes() ); - os.write ( '\n' ); - } - } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){ - if ( contentType.equalsIgnoreCase("application/cambria-zip") ) - { - os = new GZIPOutputStream ( baseStream ); - } - for ( TimestampedMessage m : fPending ) - { - - os.write ( ( "" + m.fPartition.length () ).getBytes() ); - os.write ( '.' ); - os.write ( ( "" + m.fMsg.length () ).getBytes() ); - os.write ( '.' ); - os.write ( m.fPartition.getBytes() ); - os.write ( m.fMsg.getBytes() ); - os.write ( '\n' ); - } - os.close (); - }else{ - for ( TimestampedMessage m : fPending ) - { - os.write ( m.fMsg.getBytes() ); - - } + } else if (contentType.equalsIgnoreCase("application/cambria") + || (contentType.equalsIgnoreCase("application/cambria-zip"))) { + if (contentType.equalsIgnoreCase("application/cambria-zip")) { + os = new GZIPOutputStream(baseStream); + } + for (TimestampedMessage m : fPending) { + + os.write(("" + m.fPartition.length()).getBytes()); + os.write('.'); + os.write(("" + m.fMsg.length()).getBytes()); + os.write('.'); + os.write(m.fPartition.getBytes()); + os.write(m.fMsg.getBytes()); + os.write('\n'); } - - + os.close(); + } else { + for (TimestampedMessage m : fPending) { + os.write(m.fMsg.getBytes()); - final long startMs = Clock.now (); + } + } + + final long startMs = Clock.now(); if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { - - + try { - DME2Configue(); - - Thread.sleep(5); - getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); - sender.setPayload(os.toString()); - - - String dmeResponse = sender.sendAndWait(5000L); - System.out.println("dmeres->"+dmeResponse); - - - pubResponse = createMRPublisherResponse(dmeResponse,pubResponse); - - if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) { - - return pubResponse; - } - final String logLine = String.valueOf((Clock.now() - startMs)) - + dmeResponse.toString(); - getLog().info(logLine); - fPending.clear(); - - } - catch (DME2Exception x) { + DME2Configue(); + + Thread.sleep(5); + getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + sender.setPayload(os.toString()); + + String dmeResponse = sender.sendAndWait(5000L); + + pubResponse = createMRPublisherResponse(dmeResponse, pubResponse); + + if (Integer.valueOf(pubResponse.getResponseCode()) < 200 + || Integer.valueOf(pubResponse.getResponseCode()) > 299) { + + return pubResponse; + } + final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString(); + getLog().info(logLine); + fPending.clear(); + + } catch (DME2Exception x) { getLog().warn(x.getMessage(), x); pubResponse.setResponseCode(x.getErrorCode()); pubResponse.setResponseMessage(x.getErrorMessage()); } catch (URISyntaxException x) { - + getLog().warn(x.getMessage(), x); pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); pubResponse.setResponseMessage(x.getMessage()); @@ -517,135 +478,127 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); pubResponse.setResponseMessage(x.getMessage()); - logger.error("exception: ", x); - + logger.error("exception: ", x); + } - + return pubResponse; - } - + } + if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); - final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag); - //System.out.println(result.getInt("status")); - //Here we are checking for error response. If HTTP status - //code is not within the http success response code - //then we consider this as error and return false - - - pubResponse = createMRPublisherResponse(result,pubResponse); - - if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) { - + getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, + authDate, username, password, protocolFlag); + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + + pubResponse = createMRPublisherResponse(result, pubResponse); + + if (Integer.valueOf(pubResponse.getResponseCode()) < 200 + || Integer.valueOf(pubResponse.getResponseCode()) > 299) { + return pubResponse; } - + final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); getLog().info(logLine); fPending.clear(); return pubResponse; - } - + } + if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" ); - final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag); - - //System.out.println(result.getInt("status")); - //Here we are checking for error response. If HTTP status - //code is not within the http success response code - //then we consider this as error and return false - pubResponse = createMRPublisherResponse(result,pubResponse); - - if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) { - + getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, + password, protocolFlag); + + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + pubResponse = createMRPublisherResponse(result, pubResponse); + + if (Integer.valueOf(pubResponse.getResponseCode()) < 200 + || Integer.valueOf(pubResponse.getResponseCode()) > 299) { + return pubResponse; } - + final String logLine = String.valueOf((Clock.now() - startMs)); getLog().info(logLine); fPending.clear(); return pubResponse; } - } - catch ( IllegalArgumentException x ) { - getLog().warn ( x.getMessage(), x ); + } catch (IllegalArgumentException x) { + getLog().warn(x.getMessage(), x); pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); pubResponse.setResponseMessage(x.getMessage()); - - } catch ( IOException x ) { - getLog().warn ( x.getMessage(), x ); + + } catch (IOException x) { + getLog().warn(x.getMessage(), x); pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); pubResponse.setResponseMessage(x.getMessage()); - + } catch (HttpException x) { - getLog().warn ( x.getMessage(), x ); + getLog().warn(x.getMessage(), x); pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); pubResponse.setResponseMessage(x.getMessage()); - + } catch (Exception x) { getLog().warn(x.getMessage(), x); - + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); pubResponse.setResponseMessage(x.getMessage()); - + } - + finally { - if (fPending.size()>0) { - getLog().warn ( "Send failed, " + fPending.size() + " message to send." ); + if (fPending.size() > 0) { + getLog().warn("Send failed, " + fPending.size() + " message to send."); pubResponse.setPendingMsgs(fPending.size()); } if (os != null) { try { - os.close(); + os.close(); } catch (Exception x) { getLog().warn(x.getMessage(), x); pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); pubResponse.setResponseMessage("Error in closing Output Stream"); } - } + } } - + return pubResponse; } - -private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) { - - if (reply.isEmpty()) - { - - mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); - mrPubResponse.setResponseMessage("Please verify the Producer properties"); - } - else if(reply.startsWith("{")) - { + + private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) { + + if (reply.isEmpty()) { + + mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); + mrPubResponse.setResponseMessage("Please verify the Producer properties"); + } else if (reply.startsWith("{")) { JSONObject jObject = new JSONObject(reply); - if(jObject.has("message") && jObject.has("status")) - { + if (jObject.has("message") && jObject.has("status")) { String message = jObject.getString("message"); - if(null != message) - { - mrPubResponse.setResponseMessage(message); + if (null != message) { + mrPubResponse.setResponseMessage(message); } mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status"))); + } else { + mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK)); + mrPubResponse.setResponseMessage(reply); } - else - { - mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK)); - mrPubResponse.setResponseMessage(reply); - } - } - else if (reply.startsWith("<")) - { - String responseCode = getHTTPErrorResponseCode(reply); - if( responseCode.contains("403")) - { - responseCode = "403"; - } - mrPubResponse.setResponseCode(responseCode); - mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply)); - } - + } else if (reply.startsWith("<")) { + String responseCode = getHTTPErrorResponseCode(reply); + if (responseCode.contains("403")) { + responseCode = "403"; + } + mrPubResponse.setResponseCode(responseCode); + mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply)); + } + return mrPubResponse; } @@ -658,10 +611,10 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR private String username; private String password; private String host; - - //host selector + + // host selector private HostSelector fHostSelector = null; - + private final LinkedBlockingQueue<TimestampedMessage> fPending; private long fDontSendUntilMs; private final ScheduledThreadPoolExecutor fExec; @@ -684,25 +637,24 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR private HashMap<String, String> DMETimeOuts; private DME2Client sender; public String protocolFlag = ProtocolTypeConstants.DME2.getValue(); - public String producerFilePath; private String authKey; private String authDate; private String handlers; private Properties props; public static String routerFilePath; - protected static final Map<String, String> headers=new HashMap<String, String>(); + protected static final Map<String, String> headers = new HashMap<String, String>(); public static MultivaluedMap<String, Object> headersMap; - - + private MRPublisherResponse pubResponse; - + public MRPublisherResponse getPubResponse() { return pubResponse; } + public void setPubResponse(MRPublisherResponse pubResponse) { this.pubResponse = pubResponse; } - + public static String getRouterFilePath() { return routerFilePath; } @@ -719,14 +671,6 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR this.props = props; } - public String getProducerFilePath() { - return producerFilePath; - } - - public void setProducerFilePath(String producerFilePath) { - this.producerFilePath = producerFilePath; - } - public String getProtocolFlag() { return protocolFlag; } @@ -734,14 +678,14 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR public void setProtocolFlag(String protocolFlag) { this.protocolFlag = protocolFlag; } - - + private void DME2Configue() throws Exception { try { - - /* FileReader reader = new FileReader(new File (producerFilePath)); - Properties props = new Properties(); - props.load(reader);*/ + + /* + * FileReader reader = new FileReader(new File (producerFilePath)); + * Properties props = new Properties(); props.load(reader); + */ latitude = props.getProperty("Latitude"); longitude = props.getProperty("Longitude"); version = props.getProperty("Version"); @@ -749,41 +693,43 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR env = props.getProperty("Environment"); partner = props.getProperty("Partner"); routeOffer = props.getProperty("routeOffer"); - subContextPath = props.getProperty("SubContextPath")+fTopic; - /*if(props.getProperty("partition")!=null && !props.getProperty("partition").equalsIgnoreCase("")){ - subContextPath=subContextPath+"?partitionKey="+props.getProperty("partition"); - }*/ + subContextPath = props.getProperty("SubContextPath") + fTopic; + /* + * if(props.getProperty("partition")!=null && + * !props.getProperty("partition").equalsIgnoreCase("")){ + * subContextPath=subContextPath+"?partitionKey="+props.getProperty( + * "partition"); } + */ protocol = props.getProperty("Protocol"); methodType = props.getProperty("MethodType"); dmeuser = props.getProperty("username"); dmepassword = props.getProperty("password"); contentType = props.getProperty("contenttype"); handlers = props.getProperty("sessionstickinessrequired"); - routerFilePath= props.getProperty("DME2preferredRouterFilePath"); - + routerFilePath = props.getProperty("DME2preferredRouterFilePath"); + /** - * Changes to DME2Client url to use Partner for auto failover between data centers - * When Partner value is not provided use the routeOffer value for auto failover within a cluster + * Changes to DME2Client url to use Partner for auto failover + * between data centers When Partner value is not provided use the + * routeOffer value for auto failover within a cluster */ - String partitionKey = props.getProperty("partition"); - - if (partner != null && !partner.isEmpty() ) - { - url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner; - if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){ - url = url + "&partitionKey=" + partitionKey; - } - } - else if (routeOffer!=null && !routeOffer.isEmpty()) - { - url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer; - if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){ - url = url + "&partitionKey=" + partitionKey; - } + + if (partner != null && !partner.isEmpty()) { + url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + + partner; + if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) { + url = url + "&partitionKey=" + partitionKey; + } + } else if (routeOffer != null && !routeOffer.isEmpty()) { + url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer=" + + routeOffer; + if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) { + url = url + "&partitionKey=" + partitionKey; + } } - + DMETimeOuts = new HashMap<String, String>(); DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS")); @@ -791,56 +737,56 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR DMETimeOuts.put("Content-Type", contentType); System.setProperty("AFT_LATITUDE", latitude); System.setProperty("AFT_LONGITUDE", longitude); - System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT")); - //System.setProperty("DME2.DEBUG", "true"); - // System.setProperty("AFT_DME2_HTTP_EXCHANGE_TRACE_ON", "true"); - //System.out.println("XXXXXX"+url); - - //SSL changes - System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", - "SSLv3,TLSv1,TLSv1.1"); + System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT")); + // System.setProperty("DME2.DEBUG", "true"); + + // SSL changes + // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", + // "SSLv3,TLSv1,TLSv1.1"); + System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2"); System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false"); System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit"); - - //SSL changes - + + // SSL changes + sender = new DME2Client(new URI(url), 5000L); - + sender.setAllowAllHttpReturnCodes(true); sender.setMethod(methodType); - sender.setSubContext(subContextPath); + sender.setSubContext(subContextPath); sender.setCredentials(dmeuser, dmepassword); sender.setHeaders(DMETimeOuts); - if(handlers.equalsIgnoreCase("yes")){ - sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); - sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS")); + if (handlers.equalsIgnoreCase("yes")) { + sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", + props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); + sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", + props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS")); sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON")); - }else{ - sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler"); - } + } else { + sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler"); + } } catch (DME2Exception x) { getLog().warn(x.getMessage(), x); - throw new DME2Exception(x.getErrorCode(),x.getErrorMessage()); + throw new DME2Exception(x.getErrorCode(), x.getErrorMessage()); } catch (URISyntaxException x) { - + getLog().warn(x.getMessage(), x); - throw new URISyntaxException(url,x.getMessage()); + throw new URISyntaxException(url, x.getMessage()); } catch (Exception x) { getLog().warn(x.getMessage(), x); throw new Exception(x.getMessage()); } } - - private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress) throws MalformedURLException - { - super ( hosts ); - if ( topic == null || topic.length() < 1 ) - { - throw new IllegalArgumentException ( "A topic must be provided." ); + private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, + boolean compress) throws MalformedURLException { + super(hosts); + + if (topic == null || topic.length() < 1) { + throw new IllegalArgumentException("A topic must be provided."); } - + fHostSelector = new HostSelector(hosts, null); fClosed = false; fTopic = topic; @@ -848,49 +794,45 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR fMaxBatchAgeMs = maxBatchAgeMs; fCompress = compress; - fPending = new LinkedBlockingQueue<TimestampedMessage> (); + fPending = new LinkedBlockingQueue<TimestampedMessage>(); fDontSendUntilMs = 0; - fExec = new ScheduledThreadPoolExecutor ( 1 ); + fExec = new ScheduledThreadPoolExecutor(1); pubResponse = new MRPublisherResponse(); - + } - - private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace ) throws MalformedURLException - { - super ( hosts ); - if ( topic == null || topic.length() < 1 ) - { - throw new IllegalArgumentException ( "A topic must be provided." ); + private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, + boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException { + super(hosts); + + if (topic == null || topic.length() < 1) { + throw new IllegalArgumentException("A topic must be provided."); } - + fHostSelector = new HostSelector(hosts, null); fClosed = false; fTopic = topic; fMaxBatchSize = maxBatchSize; fMaxBatchAgeMs = maxBatchAgeMs; fCompress = compress; - threadOccuranceTime=httpThreadOccurnace; - fPending = new LinkedBlockingQueue<TimestampedMessage> (); + threadOccuranceTime = httpThreadOccurnace; + fPending = new LinkedBlockingQueue<TimestampedMessage>(); fDontSendUntilMs = 0; - fExec = new ScheduledThreadPoolExecutor ( 1 ); - fExec.scheduleAtFixedRate ( new Runnable() - { + fExec = new ScheduledThreadPoolExecutor(1); + fExec.scheduleAtFixedRate(new Runnable() { @Override - public void run () - { - send ( false ); + public void run() { + send(false); } - }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS ); + }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS); } - private static class TimestampedMessage extends message - { - public TimestampedMessage ( message m ) - { - super ( m ); + private static class TimestampedMessage extends message { + public TimestampedMessage(message m) { + super(m); timestamp = Clock.now(); } + public final long timestamp; } @@ -941,5 +883,5 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR public void setAuthDate(String authDate) { this.authDate = authDate; } - + } |