From 2721925ea824afb932b09aa9575fdb35311325d7 Mon Sep 17 00:00:00 2001 From: sunil unnava Date: Thu, 14 Jun 2018 18:05:40 -0400 Subject: fixes for null pointer exceptions Issue-ID: DMAAP-519 Change-Id: Ia32d0bd58c5f438b50e361221a96b988b00a1120 Signed-off-by: sunil unnava --- .../com/att/nsa/mr/client/impl/MRConsumerImpl.java | 46 ++++++++++++++++++++-- .../mr/client/impl/MRSimplerBatchPublisher.java | 16 +++++++- 2 files changed, 57 insertions(+), 5 deletions(-) (limited to 'src/main/java/com/att/nsa/mr/client/impl') diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java index b54fedb..faa81ce 100644 --- a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java +++ b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java @@ -127,6 +127,8 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { try { // getLog().info ( "Receiving msgs from: " + // url+subContextPath ); + long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs : (timeoutMs+ defaultDme2ReplyHandlerTimeoutMs); + //String reply = sender.sendAndWait(timeout); String reply = sender.sendAndWait(timeoutMs + 10000L); final JSONObject o = getResponseDataInJson(reply); // msgs.add(reply); @@ -362,6 +364,11 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { private HashMap DMETimeOuts; private String handlers; public static final String routerFilePath = null; + private long dme2ReplyHandlerTimeoutMs; + private long longPollingMs; + private static final long defaultDme2PerEndPointTimeoutMs = 10000L; + private static final long defaultDme2ReplyHandlerTimeoutMs = 10000L; + public static String getRouterFilePath() { return routerFilePath; @@ -388,6 +395,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { } private void DMEConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException { + this.longPollingMs = timeoutMs; latitude = props.getProperty("Latitude"); longitude = props.getProperty("Longitude"); version = props.getProperty("Version"); @@ -459,7 +467,38 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit"); // SSL changes - sender = new DME2Client(new URI(url), timeoutMs + 10000L); + long dme2PerEndPointTimeoutMs; + try { + dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty("DME2_PER_HANDLER_TIMEOUT_MS")); + //backward compatibility + if ( dme2PerEndPointTimeoutMs <= 0) { + dme2PerEndPointTimeoutMs = timeoutMs + defaultDme2PerEndPointTimeoutMs; + } + } catch (NumberFormatException nfe) { + //backward compatibility + dme2PerEndPointTimeoutMs = timeoutMs + defaultDme2PerEndPointTimeoutMs; + getLog().debug("DME2_PER_HANDLER_TIMEOUT_MS not set and using default " + defaultDme2PerEndPointTimeoutMs); + } + + try { + dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty("DME2_REPLY_HANDLER_TIMEOUT_MS")); + } catch (NumberFormatException nfe) { + try { + long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); + long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_CONN_TIMEOUT")); + dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs; + getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT " + dme2ReplyHandlerTimeoutMs); + }catch (NumberFormatException e) { + //backward compatibility + dme2ReplyHandlerTimeoutMs = timeoutMs + defaultDme2ReplyHandlerTimeoutMs; + getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs); + } + } + //backward compatibility + if ( dme2ReplyHandlerTimeoutMs <= 0) { + dme2ReplyHandlerTimeoutMs = timeoutMs + defaultDme2ReplyHandlerTimeoutMs; + } + sender = new DME2Client(new URI(url),dme2PerEndPointTimeoutMs); sender.setAllowAllHttpReturnCodes(true); sender.setMethod(methodType); sender.setSubContext(subContextPath); @@ -469,7 +508,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { sender.setHeaders(DMETimeOuts); sender.setPayload(""); - if (handlers.equalsIgnoreCase("yes")) { + if (handlers!=null&&handlers.equalsIgnoreCase("yes")) { sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS")); @@ -594,7 +633,8 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { DMEConfigure(timeoutMs, limit); - String reply = sender.sendAndWait(timeoutMs + 10000L); + long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs : (timeoutMs+ defaultDme2ReplyHandlerTimeoutMs); + String reply = sender.sendAndWait(timeout); final JSONObject o = getResponseDataInJsonWithResponseReturned(reply); diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java index 2f7680b..5bb5087 100644 --- a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java +++ b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java @@ -67,6 +67,14 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP fUrls = baseUrls; return this; } + + public Builder againstUrlsOrServiceName ( Collection baseUrls, Collection serviceName, String transportype ) + { + fUrls = baseUrls; + fServiceName = serviceName; + fTransportype = transportype; + return this; + } public Builder onTopic(String topic) { fTopic = topic; @@ -119,6 +127,8 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP } private Collection fUrls; + private Collection fServiceName; + private String fTransportype; private String fTopic; private int fMaxBatchSize = 100; private long fMaxBatchAgeMs = 1000; @@ -278,7 +288,9 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP final long nowMs = Clock.now(); - host = this.fHostSelector.selectBaseHost(); + if (this.fHostSelector != null) { + host = this.fHostSelector.selectBaseHost(); + } final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"), props.getProperty("partition")); @@ -796,7 +808,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP sender.setSubContext(subContextPath); sender.setCredentials(dmeuser, dmepassword); sender.setHeaders(DMETimeOuts); - if (handlers.equalsIgnoreCase("yes")) { + if (handlers != null &&handlers.equalsIgnoreCase("yes")) { sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", -- cgit 1.2.3-korg