diff options
author | Lee, Tian (tl5884) <TianL@amdocs.com> | 2018-07-10 14:15:52 +0100 |
---|---|---|
committer | Lee, Tian (tl5884) <TianL@amdocs.com> | 2018-07-10 14:48:25 +0100 |
commit | 1f70b26884ba61f466eac21635eae125acd29a1f (patch) | |
tree | d6f31ff2c650b96c1ddd3af57c2b5eb07ac33054 /src | |
parent | 6953e6169bb7b92a41a902e3ad932c88aab38bb5 (diff) |
Fix various Sonar issues in MRConsumerImpl
Amendment 1: Restructure order of methods and fields according to Java convention.
Amendment 2: Remove bug fix that was accidentally included. This will be committed in a separate commit.
These changes do not affect the original functionality of the class.
Change-Id: I974a5e3fa94ca184704c4e26636bcbbfb5ce3320
Issue-ID: DMAAP-540
Signed-off-by: Lee, Tian (tl5884) <TianL@amdocs.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java | 1356 |
1 files changed, 621 insertions, 735 deletions
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java index faa81ce..4f5907f 100644 --- a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java +++ b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java @@ -8,7 +8,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,10 +17,17 @@ * ============LICENSE_END========================================================= * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ package com.att.nsa.mr.client.impl; +import com.att.aft.dme2.api.DME2Client; +import com.att.aft.dme2.api.DME2Exception; +import com.att.nsa.mr.client.HostSelector; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; +import com.att.nsa.mr.client.response.MRConsumerResponse; +import com.att.nsa.mr.test.clients.ProtocolTypeConstants; import java.io.File; import java.io.FileReader; import java.io.IOException; @@ -34,7 +41,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Properties; - import org.apache.http.HttpException; import org.apache.http.HttpStatus; import org.json.JSONArray; @@ -44,738 +50,618 @@ import org.json.JSONTokener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.att.aft.dme2.api.DME2Client; -import com.att.aft.dme2.api.DME2Exception; -import com.att.nsa.mr.client.HostSelector; -import com.att.nsa.mr.client.MRClientFactory; -import com.att.nsa.mr.client.MRConsumer; -import com.att.nsa.mr.client.response.MRConsumerResponse; -import com.att.nsa.mr.test.clients.ProtocolTypeConstants; - public class MRConsumerImpl extends MRBaseClient implements MRConsumer { - private static final String SUCCESS_MESSAGE = "Success"; - - private Logger log = LoggerFactory.getLogger(this.getClass().getName()); - - public static List<String> stringToList(String str) { - final LinkedList<String> set = new LinkedList<String>(); - if (str != null) { - final String[] parts = str.trim().split(","); - for (String part : parts) { - final String trimmed = part.trim(); - if (trimmed.length() > 0) { - set.add(trimmed); - } - } - } - return set; - } - - public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username, - String apiSecret_password) throws MalformedURLException { - this(hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password, - false); - } - - public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret, - boolean allowSelfSignedCerts) throws MalformedURLException { - super(hostPart, topic + "::" + consumerGroup + "::" + consumerId); - - fTopic = topic; - fGroup = consumerGroup; - fId = consumerId; - fTimeoutMs = timeoutMs; - fLimit = limit; - fFilter = filter; - - fHostSelector = new HostSelector(hostPart); - } - - @Override - public Iterable<String> fetch() throws IOException, Exception { - // fetch with the timeout and limit set in constructor - return fetch(fTimeoutMs, fLimit); - } - - @Override - public Iterable<String> fetch(int timeoutMs, int limit) throws IOException, Exception { - final LinkedList<String> msgs = new LinkedList<String>(); - - // FIXME: the timeout on the socket needs to be at least as long as the - // long poll - // // sanity check for long poll timeout vs. socket read timeout - // final int maxReasonableTimeoutMs = - // CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10; - // if ( timeoutMs > maxReasonableTimeoutMs ) - // { - // log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t. - // socket read timeout (" + - // CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll - // timeout to " + maxReasonableTimeoutMs + "." ); - // timeoutMs = maxReasonableTimeoutMs; - // } - - // final String urlPath = createUrlPath ( timeoutMs, limit ); - - // getLog().info ( "UEB GET " + urlPath ); - try { - if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { - DMEConfigure(timeoutMs, limit); - try { - // getLog().info ( "Receiving msgs from: " + - // url+subContextPath ); - long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs : (timeoutMs+ defaultDme2ReplyHandlerTimeoutMs); - //String reply = sender.sendAndWait(timeout); - String reply = sender.sendAndWait(timeoutMs + 10000L); - final JSONObject o = getResponseDataInJson(reply); - // msgs.add(reply); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - // final int b = o.getInt("status" ); - // if ( a != null && a.length()>0 ) - if (a != null) { - for (int i = 0; i < a.length(); i++) { - // msgs.add("DMAAP response status: - // "+Integer.toString(b)); - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - - } - } - // else if(a != null && a.length()<1){ - // msgs.add ("[]"); - // } - } - } catch (JSONException e) { - // unexpected response - reportProblemWithResponse(); - log.error("exception: ", e); - } catch (HttpException e) { - throw new IOException(e); - } - } - - if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { - // final String urlPath = createUrlPath - // (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, - // fId,props.getProperty("Protocol")), timeoutMs, limit ); - final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, - fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); - - try { - final JSONObject o = get(urlPath, username, password, protocolFlag); - - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - final int b = o.getInt("status"); - // if ( a != null && a.length()>0 ) - if (a != null) { - for (int i = 0; i < a.length(); i++) { - // msgs.add("DMAAP response status: - // "+Integer.toString(b)); - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - - } - } - // else if(a != null && a.length()<1) - // { - // msgs.add ("[]"); - // } - } - } catch (JSONException e) { - // unexpected response - reportProblemWithResponse(); - log.error("exception: ", e); - } catch (HttpException e) { - throw new IOException(e); - } - } - - if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - final String urlPath = createUrlPath( - MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")), - timeoutMs, limit); - - try { - final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - final int b = o.getInt("status"); - // if ( a != null && a.length()>0) - if (a != null) { - for (int i = 0; i < a.length(); i++) { - // msgs.add("DMAAP response status: - // "+Integer.toString(b)); - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - - } - } - // else if(a != null && a.length()<1){ - // msgs.add ("[]"); - // } - } - } catch (JSONException e) { - // unexpected response - reportProblemWithResponse(); - log.error("exception: ", e); - } catch (HttpException e) { - throw new IOException(e); - } - - } - if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { - final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, - fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); - - try { - final JSONObject o = getNoAuth(urlPath, username, password, protocolFlag); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - final int b = o.getInt("status"); - // if ( a != null && a.length()>0) - if (a != null) { - for (int i = 0; i < a.length(); i++) { - // msgs.add("DMAAP response status: - // "+Integer.toString(b)); - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - - } - } - - } - } catch (JSONException e) { - // unexpected response - reportProblemWithResponse(); - log.error("exception: ", e); - } catch (HttpException e) { - throw new IOException(e); - } - - } - - } catch (JSONException e) { - // unexpected response - reportProblemWithResponse(); - log.error("exception: ", e); - } catch (HttpException e) { - throw new IOException(e); - } catch (Exception e) { - throw e; - } - - return msgs; - } - - private JSONObject getResponseDataInJson(String response) { - try { - - // log.info("DMAAP response status: " + response.getStatus()); - - // final String responseData = response.readEntity(String.class); - JSONTokener jsonTokener = new JSONTokener(response); - JSONObject jsonObject = null; - final char firstChar = jsonTokener.next(); - jsonTokener.back(); - if ('[' == firstChar) { - JSONArray jsonArray = new JSONArray(jsonTokener); - jsonObject = new JSONObject(); - jsonObject.put("result", jsonArray); - } else { - jsonObject = new JSONObject(jsonTokener); - } - - return jsonObject; - } catch (JSONException excp) { - log.error("DMAAP - Error reading response data.", excp); - return null; - } - - } - - private JSONObject getResponseDataInJsonWithResponseReturned(String response) { - JSONTokener jsonTokener = new JSONTokener(response); - JSONObject jsonObject = null; - final char firstChar = jsonTokener.next(); - jsonTokener.back(); - if (null != response && response.length() == 0) { - return null; - } - - if ('[' == firstChar) { - JSONArray jsonArray = new JSONArray(jsonTokener); - jsonObject = new JSONObject(); - jsonObject.put("result", jsonArray); - } else if ('{' == firstChar) { - return null; - } else if ('<' == firstChar) { - return null; - } else { - jsonObject = new JSONObject(jsonTokener); - } - - return jsonObject; - - } - - private final String fTopic; - private final String fGroup; - private final String fId; - private final int fTimeoutMs; - private final int fLimit; - private String fFilter; - private String username; - private String password; - private String host; - HostSelector fHostSelector = null; - private String latitude; - private String longitude; - private String version; - private String serviceName; - private String env; - private String partner; - private String routeOffer; - private String subContextPath; - private String protocol; - private String methodType; - private String url; - private String dmeuser; - private String dmepassword; - private String contenttype; - private DME2Client sender; - public String protocolFlag = ProtocolTypeConstants.DME2.getValue(); - public String consumerFilePath; - private String authKey; - private String authDate; - private Properties props; - private HashMap<String, String> DMETimeOuts; - private String handlers; - public static final String routerFilePath = null; - private long dme2ReplyHandlerTimeoutMs; - private long longPollingMs; - private static final long defaultDme2PerEndPointTimeoutMs = 10000L; - private static final long defaultDme2ReplyHandlerTimeoutMs = 10000L; - - - public static String getRouterFilePath() { - return routerFilePath; - } - - public static void setRouterFilePath(String routerFilePath) { - MRSimplerBatchPublisher.routerFilePath = routerFilePath; - } - - public String getConsumerFilePath() { - return consumerFilePath; - } - - public void setConsumerFilePath(String consumerFilePath) { - this.consumerFilePath = consumerFilePath; - } - - public String getProtocolFlag() { - return protocolFlag; - } - - public void setProtocolFlag(String protocolFlag) { - this.protocolFlag = protocolFlag; - } - - private void DMEConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException { - this.longPollingMs = timeoutMs; - latitude = props.getProperty("Latitude"); - longitude = props.getProperty("Longitude"); - version = props.getProperty("Version"); - serviceName = props.getProperty("ServiceName"); - env = props.getProperty("Environment"); - partner = props.getProperty("Partner"); - routeOffer = props.getProperty("routeOffer"); - - subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId; - // subContextPath=createUrlPath (subContextPath, timeoutMs, limit); - // if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath, - // timeoutMs); - - protocol = props.getProperty("Protocol"); - methodType = props.getProperty("MethodType"); - dmeuser = props.getProperty("username"); - dmepassword = props.getProperty("password"); - contenttype = props.getProperty("contenttype"); - handlers = props.getProperty("sessionstickinessrequired"); - // url =protocol+"://DME2SEARCH/"+ - // "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner; - // url = protocol + - // "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner; - - /** - * Changes to DME2Client url to use Partner for auto failover between - * data centers When Partner value is not provided use the routeOffer - * value for auto failover within a cluster - */ - - String preferredRouteKey = readRoute("preferredRouteKey"); - - if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) { - url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner - + "&routeoffer=" + preferredRouteKey; - } else if (partner != null && !partner.isEmpty()) { - url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner; - } else if (routeOffer != null && !routeOffer.isEmpty()) { - url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer=" - + routeOffer; - } - - // log.info("url :"+url); - - if (timeoutMs != -1) - url = url + "&timeout=" + timeoutMs; - if (limit != -1) - url = url + "&limit=" + limit; - - // Add filter to DME2 Url - if (fFilter != null && fFilter.length() > 0) - url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8"); - - DMETimeOuts = new HashMap<String, String>(); - DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); - DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS")); - DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT")); - DMETimeOuts.put("Content-Type", contenttype); - System.setProperty("AFT_LATITUDE", latitude); - System.setProperty("AFT_LONGITUDE", longitude); - System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT")); - // System.setProperty("DME2.DEBUG", "true"); - - // SSL changes - // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", - // "SSLv3,TLSv1,TLSv1.1"); - System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2"); - System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false"); - System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit"); - // SSL changes - - long dme2PerEndPointTimeoutMs; - try { - dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty("DME2_PER_HANDLER_TIMEOUT_MS")); - //backward compatibility - if ( dme2PerEndPointTimeoutMs <= 0) { - dme2PerEndPointTimeoutMs = timeoutMs + defaultDme2PerEndPointTimeoutMs; - } - } catch (NumberFormatException nfe) { - //backward compatibility - dme2PerEndPointTimeoutMs = timeoutMs + defaultDme2PerEndPointTimeoutMs; - getLog().debug("DME2_PER_HANDLER_TIMEOUT_MS not set and using default " + defaultDme2PerEndPointTimeoutMs); - } - - try { - dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty("DME2_REPLY_HANDLER_TIMEOUT_MS")); - } catch (NumberFormatException nfe) { - try { - long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); - long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_CONN_TIMEOUT")); - dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs; - getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT " + dme2ReplyHandlerTimeoutMs); - }catch (NumberFormatException e) { - //backward compatibility - dme2ReplyHandlerTimeoutMs = timeoutMs + defaultDme2ReplyHandlerTimeoutMs; - getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs); - } - } - //backward compatibility - if ( dme2ReplyHandlerTimeoutMs <= 0) { - dme2ReplyHandlerTimeoutMs = timeoutMs + defaultDme2ReplyHandlerTimeoutMs; - } - sender = new DME2Client(new URI(url),dme2PerEndPointTimeoutMs); - sender.setAllowAllHttpReturnCodes(true); - sender.setMethod(methodType); - sender.setSubContext(subContextPath); - if (dmeuser != null && dmepassword != null) { - sender.setCredentials(dmeuser, dmepassword); - } - sender.setHeaders(DMETimeOuts); - sender.setPayload(""); - - if (handlers!=null&&handlers.equalsIgnoreCase("yes")) { - sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", - props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); - sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS")); - sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON")); - } else { - sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler"); - } - } - - public Properties getProps() { - return props; - } - - public void setProps(Properties props) { - this.props = props; - } - - protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException { - final StringBuffer contexturl = new StringBuffer(url); - // final StringBuffer url = new StringBuffer ( - // CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) ); - final StringBuffer adds = new StringBuffer(); - if (timeoutMs > -1) - adds.append("timeout=").append(timeoutMs); - if (limit > -1) { - if (adds.length() > 0) { - adds.append("&"); - } - adds.append("limit=").append(limit); - } - if (fFilter != null && fFilter.length() > 0) { - try { - if (adds.length() > 0) { - adds.append("&"); - } - adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8")); - } catch (UnsupportedEncodingException e) { - log.error("exception at createUrlPath () : ", e); - } - } - if (adds.length() > 0) { - contexturl.append("?").append(adds.toString()); - } - - // sender.setSubContext(url.toString()); - return contexturl.toString(); - } - - public String getUsername() { - return username; - } - - public void setUsername(String username) { - this.username = username; - } - - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public String getAuthKey() { - return authKey; - } - - public void setAuthKey(String authKey) { - this.authKey = authKey; - } - - public String getAuthDate() { - return authDate; - } - - public void setAuthDate(String authDate) { - this.authDate = authDate; - } - - public String getfFilter() { - return fFilter; - } - - public void setfFilter(String fFilter) { - this.fFilter = fFilter; - } - - private String readRoute(String routeKey) { - - try { - - MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath))); - - } catch (Exception ex) { - log.error("Reply Router Error " + ex); - } - String routeOffer = MRClientFactory.prop.getProperty(routeKey); - return routeOffer; - } - - @Override - public MRConsumerResponse fetchWithReturnConsumerResponse() { - - // fetch with the timeout and limit set in constructor - return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit); - } - - @Override - public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) { - final LinkedList<String> msgs = new LinkedList<String>(); - MRConsumerResponse mrConsumerResponse = new MRConsumerResponse(); - try { - if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { - DMEConfigure(timeoutMs, limit); - - long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs : (timeoutMs+ defaultDme2ReplyHandlerTimeoutMs); - String reply = sender.sendAndWait(timeout); - - final JSONObject o = getResponseDataInJsonWithResponseReturned(reply); - - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - - } - } - - } - createMRConsumerResponse(reply, mrConsumerResponse); - } - - if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { - /* - * final String urlPath = createUrlPath( - * MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, - * props.getProperty("Protocol")), timeoutMs, limit); - */ - - final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, - fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); - String response = getResponse(urlPath, username, password, protocolFlag); - - final JSONObject o = getResponseDataInJsonWithResponseReturned(response); - - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - - } - } - - } - createMRConsumerResponse(response, mrConsumerResponse); - } - - if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - final String urlPath = createUrlPath( - MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")), - timeoutMs, limit); - - String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag); - final JSONObject o = getResponseDataInJsonWithResponseReturned(response); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - - } - } - - } - createMRConsumerResponse(response, mrConsumerResponse); - } - if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { - // final String urlPath = createUrlPath( - // MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, - // props.getProperty("Protocol")), timeoutMs, - // limit); - final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, - fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); - - String response = getNoAuthResponse(urlPath, username, password, protocolFlag); - final JSONObject o = getResponseDataInJsonWithResponseReturned(response); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - - } - } - - } - createMRConsumerResponse(response, mrConsumerResponse); - } - - } catch (JSONException e) { - mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); - mrConsumerResponse.setResponseMessage(e.getMessage()); - log.error("json exception: ", e); - } catch (HttpException e) { - mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); - mrConsumerResponse.setResponseMessage(e.getMessage()); - log.error("http exception: ", e); - } catch (DME2Exception e) { - mrConsumerResponse.setResponseCode(e.getErrorCode()); - mrConsumerResponse.setResponseMessage(e.getErrorMessage()); - log.error("DME2 exception: ", e); - } catch (Exception e) { - mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); - mrConsumerResponse.setResponseMessage(e.getMessage()); - log.error("exception: ", e); - } - mrConsumerResponse.setActualMessages(msgs); - return mrConsumerResponse; - } - - private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) { - - if (reply.startsWith("{")) { - JSONObject jObject = new JSONObject(reply); - String message = jObject.getString("message"); - int status = jObject.getInt("status"); - - mrConsumerResponse.setResponseCode(Integer.toString(status)); - - if (null != message) { - mrConsumerResponse.setResponseMessage(message); - } - } else if (reply.startsWith("<")) { - mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply)); - mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply)); - } else { - mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK)); - mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE); - } - - } - + private Logger log = LoggerFactory.getLogger(this.getClass().getName()); + + public static final String routerFilePath = null; + + public String protocolFlag = ProtocolTypeConstants.DME2.getValue(); + public String consumerFilePath; + + private static final String SUCCESS_MESSAGE = "Success"; + private static final long DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS = 10000L; + private static final long DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS = 10000L; + + private final String fTopic; + private final String fGroup; + private final String fId; + private final int fTimeoutMs; + private final int fLimit; + private String fFilter; + private String username; + private String password; + private String host; + private HostSelector fHostSelector = null; + private String url; + private DME2Client sender; + private String authKey; + private String authDate; + private Properties props; + private HashMap<String, String> DMETimeOuts; + private long dme2ReplyHandlerTimeoutMs; + private long longPollingMs; + + public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username, + String apiSecret_password) throws MalformedURLException { + this(hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password, + false); + } + + public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret, + boolean allowSelfSignedCerts) throws MalformedURLException { + super(hostPart, topic + "::" + consumerGroup + "::" + consumerId); + + fTopic = topic; + fGroup = consumerGroup; + fId = consumerId; + fTimeoutMs = timeoutMs; + fLimit = limit; + fFilter = filter; + + fHostSelector = new HostSelector(hostPart); + } + + @Override + public Iterable<String> fetch() throws IOException, Exception { + // fetch with the timeout and limit set in constructor + return fetch(fTimeoutMs, fLimit); + } + + @Override + public Iterable<String> fetch(int timeoutMs, int limit) throws Exception { + final LinkedList<String> msgs = new LinkedList<>(); + + try { + if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { + dmeConfigure(timeoutMs, limit); + try { + String reply = sender.sendAndWait(timeoutMs + 10000L); + final JSONObject o = getResponseDataInJson(reply); + if (o != null) { + final JSONArray a = o.getJSONArray("result"); + if (a != null) { + for (int i = 0; i < a.length(); i++) { + if (a.get(i) instanceof String) + msgs.add(a.getString(i)); + else + msgs.add(a.getJSONObject(i).toString()); + } + } + } + } catch (JSONException e) { + // unexpected response + reportProblemWithResponse(); + log.error("exception: ", e); + } catch (HttpException e) { + throw new IOException(e); + } + } + + if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { + final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, + fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); + + try { + final JSONObject o = get(urlPath, username, password, protocolFlag); + + if (o != null) { + final JSONArray a = o.getJSONArray("result"); + if (a != null) { + for (int i = 0; i < a.length(); i++) { + if (a.get(i) instanceof String) + msgs.add(a.getString(i)); + else + msgs.add(a.getJSONObject(i).toString()); + } + } + } + } catch (JSONException e) { + // unexpected response + reportProblemWithResponse(); + log.error("exception: ", e); + } catch (HttpException e) { + throw new IOException(e); + } + } + + if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { + final String urlPath = createUrlPath( + MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")), + timeoutMs, limit); + + try { + final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag); + if (o != null) { + final JSONArray a = o.getJSONArray("result"); + if (a != null) { + for (int i = 0; i < a.length(); i++) { + if (a.get(i) instanceof String) + msgs.add(a.getString(i)); + else + msgs.add(a.getJSONObject(i).toString()); + } + } + } + } catch (JSONException e) { + // unexpected response + reportProblemWithResponse(); + log.error("exception: ", e); + } catch (HttpException e) { + throw new IOException(e); + } + } + + if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { + final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, + fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); + + try { + final JSONObject o = getNoAuth(urlPath, username, password, protocolFlag); + if (o != null) { + final JSONArray a = o.getJSONArray("result"); + if (a != null) { + for (int i = 0; i < a.length(); i++) { + if (a.get(i) instanceof String) + msgs.add(a.getString(i)); + else + msgs.add(a.getJSONObject(i).toString()); + } + } + } + } catch (JSONException e) { + // unexpected response + reportProblemWithResponse(); + log.error("exception: ", e); + } catch (HttpException e) { + throw new IOException(e); + } + } + } catch (JSONException e) { + // unexpected response + reportProblemWithResponse(); + log.error("exception: ", e); + } catch (HttpException e) { + throw new IOException(e); + } catch (Exception e) { + throw e; + } + + return msgs; + } + + @Override + public MRConsumerResponse fetchWithReturnConsumerResponse() { + // fetch with the timeout and limit set in constructor + return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit); + } + + @Override + public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) { + final LinkedList<String> msgs = new LinkedList<String>(); + MRConsumerResponse mrConsumerResponse = new MRConsumerResponse(); + try { + if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { + dmeConfigure(timeoutMs, limit); + + long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs + : (timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS); + String reply = sender.sendAndWait(timeout); + + final JSONObject o = getResponseDataInJsonWithResponseReturned(reply); + + if (o != null) { + final JSONArray a = o.getJSONArray("result"); + + if (a != null) { + for (int i = 0; i < a.length(); i++) { + if (a.get(i) instanceof String) + msgs.add(a.getString(i)); + else + msgs.add(a.getJSONObject(i).toString()); + } + } + } + createMRConsumerResponse(reply, mrConsumerResponse); + } + + if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { + final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, + fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); + + String response = getResponse(urlPath, username, password, protocolFlag); + final JSONObject o = getResponseDataInJsonWithResponseReturned(response); + if (o != null) { + final JSONArray a = o.getJSONArray("result"); + + if (a != null) { + for (int i = 0; i < a.length(); i++) { + if (a.get(i) instanceof String) + msgs.add(a.getString(i)); + else + msgs.add(a.getJSONObject(i).toString()); + } + } + } + createMRConsumerResponse(response, mrConsumerResponse); + } + + if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { + final String urlPath = createUrlPath( + MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")), + timeoutMs, limit); + + String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag); + final JSONObject o = getResponseDataInJsonWithResponseReturned(response); + if (o != null) { + final JSONArray a = o.getJSONArray("result"); + + if (a != null) { + for (int i = 0; i < a.length(); i++) { + if (a.get(i) instanceof String) + msgs.add(a.getString(i)); + else + msgs.add(a.getJSONObject(i).toString()); + } + } + } + createMRConsumerResponse(response, mrConsumerResponse); + } + + if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { + final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, + fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); + + String response = getNoAuthResponse(urlPath, username, password, protocolFlag); + final JSONObject o = getResponseDataInJsonWithResponseReturned(response); + if (o != null) { + final JSONArray a = o.getJSONArray("result"); + + if (a != null) { + for (int i = 0; i < a.length(); i++) { + if (a.get(i) instanceof String) + msgs.add(a.getString(i)); + else + msgs.add(a.getJSONObject(i).toString()); + } + } + } + createMRConsumerResponse(response, mrConsumerResponse); + } + + } catch (JSONException e) { + mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + mrConsumerResponse.setResponseMessage(e.getMessage()); + log.error("json exception: ", e); + } catch (HttpException e) { + mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + mrConsumerResponse.setResponseMessage(e.getMessage()); + log.error("http exception: ", e); + } catch (DME2Exception e) { + mrConsumerResponse.setResponseCode(e.getErrorCode()); + mrConsumerResponse.setResponseMessage(e.getErrorMessage()); + log.error("DME2 exception: ", e); + } catch (Exception e) { + mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + mrConsumerResponse.setResponseMessage(e.getMessage()); + log.error("exception: ", e); + } + mrConsumerResponse.setActualMessages(msgs); + return mrConsumerResponse; + } + + private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) { + if (reply.startsWith("{")) { + JSONObject jObject = new JSONObject(reply); + String message = jObject.getString("message"); + int status = jObject.getInt("status"); + + mrConsumerResponse.setResponseCode(Integer.toString(status)); + + if (null != message) { + mrConsumerResponse.setResponseMessage(message); + } + } else if (reply.startsWith("<")) { + mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply)); + mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply)); + } else { + mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK)); + mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE); + } + } + + private JSONObject getResponseDataInJson(String response) { + try { + JSONTokener jsonTokener = new JSONTokener(response); + JSONObject jsonObject = null; + final char firstChar = jsonTokener.next(); + jsonTokener.back(); + if ('[' == firstChar) { + JSONArray jsonArray = new JSONArray(jsonTokener); + jsonObject = new JSONObject(); + jsonObject.put("result", jsonArray); + } else { + jsonObject = new JSONObject(jsonTokener); + } + + return jsonObject; + } catch (JSONException excp) { + log.error("DMAAP - Error reading response data.", excp); + return null; + } + } + + private JSONObject getResponseDataInJsonWithResponseReturned(String response) { + JSONTokener jsonTokener = new JSONTokener(response); + JSONObject jsonObject = null; + final char firstChar = jsonTokener.next(); + jsonTokener.back(); + if (null != response && response.length() == 0) { + return null; + } + + if ('[' == firstChar) { + JSONArray jsonArray = new JSONArray(jsonTokener); + jsonObject = new JSONObject(); + jsonObject.put("result", jsonArray); + } else if ('{' == firstChar) { + return null; + } else if ('<' == firstChar) { + return null; + } else { + jsonObject = new JSONObject(jsonTokener); + } + + return jsonObject; + } + + private void dmeConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException { + this.longPollingMs = timeoutMs; + String latitude = props.getProperty("Latitude"); + String longitude = props.getProperty("Longitude"); + String version = props.getProperty("Version"); + String serviceName = props.getProperty("ServiceName"); + String env = props.getProperty("Environment"); + String partner = props.getProperty("Partner"); + String routeOffer = props.getProperty("routeOffer"); + String subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId; + String protocol = props.getProperty("Protocol"); + String methodType = props.getProperty("MethodType"); + String dmeuser = props.getProperty("username"); + String dmepassword = props.getProperty("password"); + String contenttype = props.getProperty("contenttype"); + String handlers = props.getProperty("sessionstickinessrequired"); + + /** + * Changes to DME2Client url to use Partner for auto failover between data centers When Partner value is not + * provided use the routeOffer value for auto failover within a cluster + */ + + String preferredRouteKey = readRoute("preferredRouteKey"); + + if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) { + url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner + + "&routeoffer=" + preferredRouteKey; + } else if (partner != null && !partner.isEmpty()) { + url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner; + } else if (routeOffer != null && !routeOffer.isEmpty()) { + url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer=" + + routeOffer; + } + + if (timeoutMs != -1) + url = url + "&timeout=" + timeoutMs; + if (limit != -1) + url = url + "&limit=" + limit; + + // Add filter to DME2 Url + if (fFilter != null && fFilter.length() > 0) + url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8"); + + DMETimeOuts = new HashMap<>(); + DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); + DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS")); + DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT")); + DMETimeOuts.put("Content-Type", contenttype); + System.setProperty("AFT_LATITUDE", latitude); + System.setProperty("AFT_LONGITUDE", longitude); + System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT")); + + // SSL changes + System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2"); + System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false"); + System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit"); + // SSL changes + + long dme2PerEndPointTimeoutMs; + try { + dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty("DME2_PER_HANDLER_TIMEOUT_MS")); + // backward compatibility + if (dme2PerEndPointTimeoutMs <= 0) { + dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS; + } + } catch (NumberFormatException nfe) { + // backward compatibility + dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS; + getLog().debug( + "DME2_PER_HANDLER_TIMEOUT_MS not set and using default " + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS); + } + + try { + dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty("DME2_REPLY_HANDLER_TIMEOUT_MS")); + } catch (NumberFormatException nfe) { + try { + long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); + long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_CONN_TIMEOUT")); + dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs; + getLog().debug( + "DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT " + + dme2ReplyHandlerTimeoutMs); + } catch (NumberFormatException e) { + // backward compatibility + dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS; + getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs); + } + } + // backward compatibility + if (dme2ReplyHandlerTimeoutMs <= 0) { + dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS; + } + + sender = new DME2Client(new URI(url), dme2PerEndPointTimeoutMs); + sender.setAllowAllHttpReturnCodes(true); + sender.setMethod(methodType); + sender.setSubContext(subContextPath); + if (dmeuser != null && dmepassword != null) { + sender.setCredentials(dmeuser, dmepassword); + } + sender.setHeaders(DMETimeOuts); + sender.setPayload(""); + if (handlers != null && handlers.equalsIgnoreCase("yes")) { + sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", + props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); + sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS")); + sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON")); + } else { + sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler"); + } + } + + protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException { + final StringBuilder contexturl = new StringBuilder(url); + final StringBuilder adds = new StringBuilder(); + + if (timeoutMs > -1) { + adds.append("timeout=").append(timeoutMs); + } + + if (limit > -1) { + if (adds.length() > 0) { + adds.append("&"); + } + adds.append("limit=").append(limit); + } + + if (fFilter != null && fFilter.length() > 0) { + try { + if (adds.length() > 0) { + adds.append("&"); + } + adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8")); + } catch (UnsupportedEncodingException e) { + log.error("exception at createUrlPath () : ", e); + } + } + + if (adds.length() > 0) { + contexturl.append("?").append(adds.toString()); + } + + return contexturl.toString(); + } + + private String readRoute(String routeKey) { + try { + MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath))); + } catch (Exception ex) { + log.error("Reply Router Error " + ex); + } + return MRClientFactory.prop.getProperty(routeKey); + } + + public static List<String> stringToList(String str) { + final LinkedList<String> set = new LinkedList<>(); + if (str != null) { + final String[] parts = str.trim().split(","); + for (String part : parts) { + final String trimmed = part.trim(); + if (trimmed.length() > 0) { + set.add(trimmed); + } + } + } + return set; + } + + public static String getRouterFilePath() { + return routerFilePath; + } + + public static void setRouterFilePath(String routerFilePath) { + MRSimplerBatchPublisher.routerFilePath = routerFilePath; + } + + public String getConsumerFilePath() { + return consumerFilePath; + } + + public void setConsumerFilePath(String consumerFilePath) { + this.consumerFilePath = consumerFilePath; + } + + public String getProtocolFlag() { + return protocolFlag; + } + + public void setProtocolFlag(String protocolFlag) { + this.protocolFlag = protocolFlag; + } + + public Properties getProps() { + return props; + } + + public void setProps(Properties props) { + this.props = props; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public String getAuthKey() { + return authKey; + } + + public void setAuthKey(String authKey) { + this.authKey = authKey; + } + + public String getAuthDate() { + return authDate; + } + + public void setAuthDate(String authDate) { + this.authDate = authDate; + } + + public String getfFilter() { + return fFilter; + } + + public void setfFilter(String fFilter) { + this.fFilter = fFilter; + } } |