aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java1356
1 files changed, 621 insertions, 735 deletions
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
index faa81ce..4f5907f 100644
--- a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
+++ b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
@@ -8,7 +8,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,10 +17,17 @@
* ============LICENSE_END=========================================================
*
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
package com.att.nsa.mr.client.impl;
+import com.att.aft.dme2.api.DME2Client;
+import com.att.aft.dme2.api.DME2Exception;
+import com.att.nsa.mr.client.HostSelector;
+import com.att.nsa.mr.client.MRClientFactory;
+import com.att.nsa.mr.client.MRConsumer;
+import com.att.nsa.mr.client.response.MRConsumerResponse;
+import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
@@ -34,7 +41,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
-
import org.apache.http.HttpException;
import org.apache.http.HttpStatus;
import org.json.JSONArray;
@@ -44,738 +50,618 @@ import org.json.JSONTokener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.att.aft.dme2.api.DME2Client;
-import com.att.aft.dme2.api.DME2Exception;
-import com.att.nsa.mr.client.HostSelector;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRConsumer;
-import com.att.nsa.mr.client.response.MRConsumerResponse;
-import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-
public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
- private static final String SUCCESS_MESSAGE = "Success";
-
- private Logger log = LoggerFactory.getLogger(this.getClass().getName());
-
- public static List<String> stringToList(String str) {
- final LinkedList<String> set = new LinkedList<String>();
- if (str != null) {
- final String[] parts = str.trim().split(",");
- for (String part : parts) {
- final String trimmed = part.trim();
- if (trimmed.length() > 0) {
- set.add(trimmed);
- }
- }
- }
- return set;
- }
-
- public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username,
- String apiSecret_password) throws MalformedURLException {
- this(hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password,
- false);
- }
-
- public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret,
- boolean allowSelfSignedCerts) throws MalformedURLException {
- super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);
-
- fTopic = topic;
- fGroup = consumerGroup;
- fId = consumerId;
- fTimeoutMs = timeoutMs;
- fLimit = limit;
- fFilter = filter;
-
- fHostSelector = new HostSelector(hostPart);
- }
-
- @Override
- public Iterable<String> fetch() throws IOException, Exception {
- // fetch with the timeout and limit set in constructor
- return fetch(fTimeoutMs, fLimit);
- }
-
- @Override
- public Iterable<String> fetch(int timeoutMs, int limit) throws IOException, Exception {
- final LinkedList<String> msgs = new LinkedList<String>();
-
- // FIXME: the timeout on the socket needs to be at least as long as the
- // long poll
- // // sanity check for long poll timeout vs. socket read timeout
- // final int maxReasonableTimeoutMs =
- // CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10;
- // if ( timeoutMs > maxReasonableTimeoutMs )
- // {
- // log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t.
- // socket read timeout (" +
- // CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll
- // timeout to " + maxReasonableTimeoutMs + "." );
- // timeoutMs = maxReasonableTimeoutMs;
- // }
-
- // final String urlPath = createUrlPath ( timeoutMs, limit );
-
- // getLog().info ( "UEB GET " + urlPath );
- try {
- if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
- DMEConfigure(timeoutMs, limit);
- try {
- // getLog().info ( "Receiving msgs from: " +
- // url+subContextPath );
- long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs : (timeoutMs+ defaultDme2ReplyHandlerTimeoutMs);
- //String reply = sender.sendAndWait(timeout);
- String reply = sender.sendAndWait(timeoutMs + 10000L);
- final JSONObject o = getResponseDataInJson(reply);
- // msgs.add(reply);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
- // final int b = o.getInt("status" );
- // if ( a != null && a.length()>0 )
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- // msgs.add("DMAAP response status:
- // "+Integer.toString(b));
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
-
- }
- }
- // else if(a != null && a.length()<1){
- // msgs.add ("[]");
- // }
- }
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- log.error("exception: ", e);
- } catch (HttpException e) {
- throw new IOException(e);
- }
- }
-
- if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- // final String urlPath = createUrlPath
- // (MRConstants.makeConsumerUrl ( host, fTopic, fGroup,
- // fId,props.getProperty("Protocol")), timeoutMs, limit );
- final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
-
- try {
- final JSONObject o = get(urlPath, username, password, protocolFlag);
-
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
- final int b = o.getInt("status");
- // if ( a != null && a.length()>0 )
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- // msgs.add("DMAAP response status:
- // "+Integer.toString(b));
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
-
- }
- }
- // else if(a != null && a.length()<1)
- // {
- // msgs.add ("[]");
- // }
- }
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- log.error("exception: ", e);
- } catch (HttpException e) {
- throw new IOException(e);
- }
- }
-
- if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- final String urlPath = createUrlPath(
- MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
- timeoutMs, limit);
-
- try {
- final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
- final int b = o.getInt("status");
- // if ( a != null && a.length()>0)
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- // msgs.add("DMAAP response status:
- // "+Integer.toString(b));
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
-
- }
- }
- // else if(a != null && a.length()<1){
- // msgs.add ("[]");
- // }
- }
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- log.error("exception: ", e);
- } catch (HttpException e) {
- throw new IOException(e);
- }
-
- }
- if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
-
- try {
- final JSONObject o = getNoAuth(urlPath, username, password, protocolFlag);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
- final int b = o.getInt("status");
- // if ( a != null && a.length()>0)
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- // msgs.add("DMAAP response status:
- // "+Integer.toString(b));
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
-
- }
- }
-
- }
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- log.error("exception: ", e);
- } catch (HttpException e) {
- throw new IOException(e);
- }
-
- }
-
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- log.error("exception: ", e);
- } catch (HttpException e) {
- throw new IOException(e);
- } catch (Exception e) {
- throw e;
- }
-
- return msgs;
- }
-
- private JSONObject getResponseDataInJson(String response) {
- try {
-
- // log.info("DMAAP response status: " + response.getStatus());
-
- // final String responseData = response.readEntity(String.class);
- JSONTokener jsonTokener = new JSONTokener(response);
- JSONObject jsonObject = null;
- final char firstChar = jsonTokener.next();
- jsonTokener.back();
- if ('[' == firstChar) {
- JSONArray jsonArray = new JSONArray(jsonTokener);
- jsonObject = new JSONObject();
- jsonObject.put("result", jsonArray);
- } else {
- jsonObject = new JSONObject(jsonTokener);
- }
-
- return jsonObject;
- } catch (JSONException excp) {
- log.error("DMAAP - Error reading response data.", excp);
- return null;
- }
-
- }
-
- private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
- JSONTokener jsonTokener = new JSONTokener(response);
- JSONObject jsonObject = null;
- final char firstChar = jsonTokener.next();
- jsonTokener.back();
- if (null != response && response.length() == 0) {
- return null;
- }
-
- if ('[' == firstChar) {
- JSONArray jsonArray = new JSONArray(jsonTokener);
- jsonObject = new JSONObject();
- jsonObject.put("result", jsonArray);
- } else if ('{' == firstChar) {
- return null;
- } else if ('<' == firstChar) {
- return null;
- } else {
- jsonObject = new JSONObject(jsonTokener);
- }
-
- return jsonObject;
-
- }
-
- private final String fTopic;
- private final String fGroup;
- private final String fId;
- private final int fTimeoutMs;
- private final int fLimit;
- private String fFilter;
- private String username;
- private String password;
- private String host;
- HostSelector fHostSelector = null;
- private String latitude;
- private String longitude;
- private String version;
- private String serviceName;
- private String env;
- private String partner;
- private String routeOffer;
- private String subContextPath;
- private String protocol;
- private String methodType;
- private String url;
- private String dmeuser;
- private String dmepassword;
- private String contenttype;
- private DME2Client sender;
- public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
- public String consumerFilePath;
- private String authKey;
- private String authDate;
- private Properties props;
- private HashMap<String, String> DMETimeOuts;
- private String handlers;
- public static final String routerFilePath = null;
- private long dme2ReplyHandlerTimeoutMs;
- private long longPollingMs;
- private static final long defaultDme2PerEndPointTimeoutMs = 10000L;
- private static final long defaultDme2ReplyHandlerTimeoutMs = 10000L;
-
-
- public static String getRouterFilePath() {
- return routerFilePath;
- }
-
- public static void setRouterFilePath(String routerFilePath) {
- MRSimplerBatchPublisher.routerFilePath = routerFilePath;
- }
-
- public String getConsumerFilePath() {
- return consumerFilePath;
- }
-
- public void setConsumerFilePath(String consumerFilePath) {
- this.consumerFilePath = consumerFilePath;
- }
-
- public String getProtocolFlag() {
- return protocolFlag;
- }
-
- public void setProtocolFlag(String protocolFlag) {
- this.protocolFlag = protocolFlag;
- }
-
- private void DMEConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
- this.longPollingMs = timeoutMs;
- latitude = props.getProperty("Latitude");
- longitude = props.getProperty("Longitude");
- version = props.getProperty("Version");
- serviceName = props.getProperty("ServiceName");
- env = props.getProperty("Environment");
- partner = props.getProperty("Partner");
- routeOffer = props.getProperty("routeOffer");
-
- subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
- // subContextPath=createUrlPath (subContextPath, timeoutMs, limit);
- // if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath,
- // timeoutMs);
-
- protocol = props.getProperty("Protocol");
- methodType = props.getProperty("MethodType");
- dmeuser = props.getProperty("username");
- dmepassword = props.getProperty("password");
- contenttype = props.getProperty("contenttype");
- handlers = props.getProperty("sessionstickinessrequired");
- // url =protocol+"://DME2SEARCH/"+
- // "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner;
- // url = protocol +
- // "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner;
-
- /**
- * Changes to DME2Client url to use Partner for auto failover between
- * data centers When Partner value is not provided use the routeOffer
- * value for auto failover within a cluster
- */
-
- String preferredRouteKey = readRoute("preferredRouteKey");
-
- if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
- url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner
- + "&routeoffer=" + preferredRouteKey;
- } else if (partner != null && !partner.isEmpty()) {
- url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner;
- } else if (routeOffer != null && !routeOffer.isEmpty()) {
- url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
- + routeOffer;
- }
-
- // log.info("url :"+url);
-
- if (timeoutMs != -1)
- url = url + "&timeout=" + timeoutMs;
- if (limit != -1)
- url = url + "&limit=" + limit;
-
- // Add filter to DME2 Url
- if (fFilter != null && fFilter.length() > 0)
- url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8");
-
- DMETimeOuts = new HashMap<String, String>();
- DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
- DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
- DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
- DMETimeOuts.put("Content-Type", contenttype);
- System.setProperty("AFT_LATITUDE", latitude);
- System.setProperty("AFT_LONGITUDE", longitude);
- System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
- // System.setProperty("DME2.DEBUG", "true");
-
- // SSL changes
- // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
- // "SSLv3,TLSv1,TLSv1.1");
- System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
- System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
- System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
- // SSL changes
-
- long dme2PerEndPointTimeoutMs;
- try {
- dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty("DME2_PER_HANDLER_TIMEOUT_MS"));
- //backward compatibility
- if ( dme2PerEndPointTimeoutMs <= 0) {
- dme2PerEndPointTimeoutMs = timeoutMs + defaultDme2PerEndPointTimeoutMs;
- }
- } catch (NumberFormatException nfe) {
- //backward compatibility
- dme2PerEndPointTimeoutMs = timeoutMs + defaultDme2PerEndPointTimeoutMs;
- getLog().debug("DME2_PER_HANDLER_TIMEOUT_MS not set and using default " + defaultDme2PerEndPointTimeoutMs);
- }
-
- try {
- dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty("DME2_REPLY_HANDLER_TIMEOUT_MS"));
- } catch (NumberFormatException nfe) {
- try {
- long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
- long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
- dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs;
- getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT " + dme2ReplyHandlerTimeoutMs);
- }catch (NumberFormatException e) {
- //backward compatibility
- dme2ReplyHandlerTimeoutMs = timeoutMs + defaultDme2ReplyHandlerTimeoutMs;
- getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs);
- }
- }
- //backward compatibility
- if ( dme2ReplyHandlerTimeoutMs <= 0) {
- dme2ReplyHandlerTimeoutMs = timeoutMs + defaultDme2ReplyHandlerTimeoutMs;
- }
- sender = new DME2Client(new URI(url),dme2PerEndPointTimeoutMs);
- sender.setAllowAllHttpReturnCodes(true);
- sender.setMethod(methodType);
- sender.setSubContext(subContextPath);
- if (dmeuser != null && dmepassword != null) {
- sender.setCredentials(dmeuser, dmepassword);
- }
- sender.setHeaders(DMETimeOuts);
- sender.setPayload("");
-
- if (handlers!=null&&handlers.equalsIgnoreCase("yes")) {
- sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
- props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
- sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
- sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
- } else {
- sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
- }
- }
-
- public Properties getProps() {
- return props;
- }
-
- public void setProps(Properties props) {
- this.props = props;
- }
-
- protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
- final StringBuffer contexturl = new StringBuffer(url);
- // final StringBuffer url = new StringBuffer (
- // CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) );
- final StringBuffer adds = new StringBuffer();
- if (timeoutMs > -1)
- adds.append("timeout=").append(timeoutMs);
- if (limit > -1) {
- if (adds.length() > 0) {
- adds.append("&");
- }
- adds.append("limit=").append(limit);
- }
- if (fFilter != null && fFilter.length() > 0) {
- try {
- if (adds.length() > 0) {
- adds.append("&");
- }
- adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
- } catch (UnsupportedEncodingException e) {
- log.error("exception at createUrlPath () : ", e);
- }
- }
- if (adds.length() > 0) {
- contexturl.append("?").append(adds.toString());
- }
-
- // sender.setSubContext(url.toString());
- return contexturl.toString();
- }
-
- public String getUsername() {
- return username;
- }
-
- public void setUsername(String username) {
- this.username = username;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public String getAuthKey() {
- return authKey;
- }
-
- public void setAuthKey(String authKey) {
- this.authKey = authKey;
- }
-
- public String getAuthDate() {
- return authDate;
- }
-
- public void setAuthDate(String authDate) {
- this.authDate = authDate;
- }
-
- public String getfFilter() {
- return fFilter;
- }
-
- public void setfFilter(String fFilter) {
- this.fFilter = fFilter;
- }
-
- private String readRoute(String routeKey) {
-
- try {
-
- MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath)));
-
- } catch (Exception ex) {
- log.error("Reply Router Error " + ex);
- }
- String routeOffer = MRClientFactory.prop.getProperty(routeKey);
- return routeOffer;
- }
-
- @Override
- public MRConsumerResponse fetchWithReturnConsumerResponse() {
-
- // fetch with the timeout and limit set in constructor
- return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
- }
-
- @Override
- public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
- final LinkedList<String> msgs = new LinkedList<String>();
- MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
- try {
- if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
- DMEConfigure(timeoutMs, limit);
-
- long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs : (timeoutMs+ defaultDme2ReplyHandlerTimeoutMs);
- String reply = sender.sendAndWait(timeout);
-
- final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
-
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
-
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
-
- }
- }
-
- }
- createMRConsumerResponse(reply, mrConsumerResponse);
- }
-
- if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- /*
- * final String urlPath = createUrlPath(
- * MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
- * props.getProperty("Protocol")), timeoutMs, limit);
- */
-
- final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
- String response = getResponse(urlPath, username, password, protocolFlag);
-
- final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
-
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
-
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
-
- }
- }
-
- }
- createMRConsumerResponse(response, mrConsumerResponse);
- }
-
- if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- final String urlPath = createUrlPath(
- MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
- timeoutMs, limit);
-
- String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
- final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
-
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
-
- }
- }
-
- }
- createMRConsumerResponse(response, mrConsumerResponse);
- }
- if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- // final String urlPath = createUrlPath(
- // MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
- // props.getProperty("Protocol")), timeoutMs,
- // limit);
- final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
-
- String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
- final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
-
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
-
- }
- }
-
- }
- createMRConsumerResponse(response, mrConsumerResponse);
- }
-
- } catch (JSONException e) {
- mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
- mrConsumerResponse.setResponseMessage(e.getMessage());
- log.error("json exception: ", e);
- } catch (HttpException e) {
- mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
- mrConsumerResponse.setResponseMessage(e.getMessage());
- log.error("http exception: ", e);
- } catch (DME2Exception e) {
- mrConsumerResponse.setResponseCode(e.getErrorCode());
- mrConsumerResponse.setResponseMessage(e.getErrorMessage());
- log.error("DME2 exception: ", e);
- } catch (Exception e) {
- mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
- mrConsumerResponse.setResponseMessage(e.getMessage());
- log.error("exception: ", e);
- }
- mrConsumerResponse.setActualMessages(msgs);
- return mrConsumerResponse;
- }
-
- private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
-
- if (reply.startsWith("{")) {
- JSONObject jObject = new JSONObject(reply);
- String message = jObject.getString("message");
- int status = jObject.getInt("status");
-
- mrConsumerResponse.setResponseCode(Integer.toString(status));
-
- if (null != message) {
- mrConsumerResponse.setResponseMessage(message);
- }
- } else if (reply.startsWith("<")) {
- mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
- mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
- } else {
- mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
- mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
- }
-
- }
-
+ private Logger log = LoggerFactory.getLogger(this.getClass().getName());
+
+ public static final String routerFilePath = null;
+
+ public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
+ public String consumerFilePath;
+
+ private static final String SUCCESS_MESSAGE = "Success";
+ private static final long DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS = 10000L;
+ private static final long DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS = 10000L;
+
+ private final String fTopic;
+ private final String fGroup;
+ private final String fId;
+ private final int fTimeoutMs;
+ private final int fLimit;
+ private String fFilter;
+ private String username;
+ private String password;
+ private String host;
+ private HostSelector fHostSelector = null;
+ private String url;
+ private DME2Client sender;
+ private String authKey;
+ private String authDate;
+ private Properties props;
+ private HashMap<String, String> DMETimeOuts;
+ private long dme2ReplyHandlerTimeoutMs;
+ private long longPollingMs;
+
+ public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username,
+ String apiSecret_password) throws MalformedURLException {
+ this(hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password,
+ false);
+ }
+
+ public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret,
+ boolean allowSelfSignedCerts) throws MalformedURLException {
+ super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);
+
+ fTopic = topic;
+ fGroup = consumerGroup;
+ fId = consumerId;
+ fTimeoutMs = timeoutMs;
+ fLimit = limit;
+ fFilter = filter;
+
+ fHostSelector = new HostSelector(hostPart);
+ }
+
+ @Override
+ public Iterable<String> fetch() throws IOException, Exception {
+ // fetch with the timeout and limit set in constructor
+ return fetch(fTimeoutMs, fLimit);
+ }
+
+ @Override
+ public Iterable<String> fetch(int timeoutMs, int limit) throws Exception {
+ final LinkedList<String> msgs = new LinkedList<>();
+
+ try {
+ if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
+ dmeConfigure(timeoutMs, limit);
+ try {
+ String reply = sender.sendAndWait(timeoutMs + 10000L);
+ final JSONObject o = getResponseDataInJson(reply);
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+ }
+ }
+ }
+ } catch (JSONException e) {
+ // unexpected response
+ reportProblemWithResponse();
+ log.error("exception: ", e);
+ } catch (HttpException e) {
+ throw new IOException(e);
+ }
+ }
+
+ if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+ fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+ try {
+ final JSONObject o = get(urlPath, username, password, protocolFlag);
+
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+ }
+ }
+ }
+ } catch (JSONException e) {
+ // unexpected response
+ reportProblemWithResponse();
+ log.error("exception: ", e);
+ } catch (HttpException e) {
+ throw new IOException(e);
+ }
+ }
+
+ if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+ final String urlPath = createUrlPath(
+ MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
+ timeoutMs, limit);
+
+ try {
+ final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag);
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+ }
+ }
+ }
+ } catch (JSONException e) {
+ // unexpected response
+ reportProblemWithResponse();
+ log.error("exception: ", e);
+ } catch (HttpException e) {
+ throw new IOException(e);
+ }
+ }
+
+ if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+ fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+ try {
+ final JSONObject o = getNoAuth(urlPath, username, password, protocolFlag);
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+ }
+ }
+ }
+ } catch (JSONException e) {
+ // unexpected response
+ reportProblemWithResponse();
+ log.error("exception: ", e);
+ } catch (HttpException e) {
+ throw new IOException(e);
+ }
+ }
+ } catch (JSONException e) {
+ // unexpected response
+ reportProblemWithResponse();
+ log.error("exception: ", e);
+ } catch (HttpException e) {
+ throw new IOException(e);
+ } catch (Exception e) {
+ throw e;
+ }
+
+ return msgs;
+ }
+
+ @Override
+ public MRConsumerResponse fetchWithReturnConsumerResponse() {
+ // fetch with the timeout and limit set in constructor
+ return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
+ }
+
+ @Override
+ public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
+ final LinkedList<String> msgs = new LinkedList<String>();
+ MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
+ try {
+ if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
+ dmeConfigure(timeoutMs, limit);
+
+ long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs
+ : (timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS);
+ String reply = sender.sendAndWait(timeout);
+
+ final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
+
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+ }
+ }
+ }
+ createMRConsumerResponse(reply, mrConsumerResponse);
+ }
+
+ if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+ fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+ String response = getResponse(urlPath, username, password, protocolFlag);
+ final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+ }
+ }
+ }
+ createMRConsumerResponse(response, mrConsumerResponse);
+ }
+
+ if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+ final String urlPath = createUrlPath(
+ MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
+ timeoutMs, limit);
+
+ String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
+ final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+ }
+ }
+ }
+ createMRConsumerResponse(response, mrConsumerResponse);
+ }
+
+ if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+ fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+ String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
+ final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+ }
+ }
+ }
+ createMRConsumerResponse(response, mrConsumerResponse);
+ }
+
+ } catch (JSONException e) {
+ mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ mrConsumerResponse.setResponseMessage(e.getMessage());
+ log.error("json exception: ", e);
+ } catch (HttpException e) {
+ mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ mrConsumerResponse.setResponseMessage(e.getMessage());
+ log.error("http exception: ", e);
+ } catch (DME2Exception e) {
+ mrConsumerResponse.setResponseCode(e.getErrorCode());
+ mrConsumerResponse.setResponseMessage(e.getErrorMessage());
+ log.error("DME2 exception: ", e);
+ } catch (Exception e) {
+ mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ mrConsumerResponse.setResponseMessage(e.getMessage());
+ log.error("exception: ", e);
+ }
+ mrConsumerResponse.setActualMessages(msgs);
+ return mrConsumerResponse;
+ }
+
+ private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
+ if (reply.startsWith("{")) {
+ JSONObject jObject = new JSONObject(reply);
+ String message = jObject.getString("message");
+ int status = jObject.getInt("status");
+
+ mrConsumerResponse.setResponseCode(Integer.toString(status));
+
+ if (null != message) {
+ mrConsumerResponse.setResponseMessage(message);
+ }
+ } else if (reply.startsWith("<")) {
+ mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
+ mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
+ } else {
+ mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
+ mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
+ }
+ }
+
+ private JSONObject getResponseDataInJson(String response) {
+ try {
+ JSONTokener jsonTokener = new JSONTokener(response);
+ JSONObject jsonObject = null;
+ final char firstChar = jsonTokener.next();
+ jsonTokener.back();
+ if ('[' == firstChar) {
+ JSONArray jsonArray = new JSONArray(jsonTokener);
+ jsonObject = new JSONObject();
+ jsonObject.put("result", jsonArray);
+ } else {
+ jsonObject = new JSONObject(jsonTokener);
+ }
+
+ return jsonObject;
+ } catch (JSONException excp) {
+ log.error("DMAAP - Error reading response data.", excp);
+ return null;
+ }
+ }
+
+ private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
+ JSONTokener jsonTokener = new JSONTokener(response);
+ JSONObject jsonObject = null;
+ final char firstChar = jsonTokener.next();
+ jsonTokener.back();
+ if (null != response && response.length() == 0) {
+ return null;
+ }
+
+ if ('[' == firstChar) {
+ JSONArray jsonArray = new JSONArray(jsonTokener);
+ jsonObject = new JSONObject();
+ jsonObject.put("result", jsonArray);
+ } else if ('{' == firstChar) {
+ return null;
+ } else if ('<' == firstChar) {
+ return null;
+ } else {
+ jsonObject = new JSONObject(jsonTokener);
+ }
+
+ return jsonObject;
+ }
+
+ private void dmeConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
+ this.longPollingMs = timeoutMs;
+ String latitude = props.getProperty("Latitude");
+ String longitude = props.getProperty("Longitude");
+ String version = props.getProperty("Version");
+ String serviceName = props.getProperty("ServiceName");
+ String env = props.getProperty("Environment");
+ String partner = props.getProperty("Partner");
+ String routeOffer = props.getProperty("routeOffer");
+ String subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
+ String protocol = props.getProperty("Protocol");
+ String methodType = props.getProperty("MethodType");
+ String dmeuser = props.getProperty("username");
+ String dmepassword = props.getProperty("password");
+ String contenttype = props.getProperty("contenttype");
+ String handlers = props.getProperty("sessionstickinessrequired");
+
+ /**
+ * Changes to DME2Client url to use Partner for auto failover between data centers When Partner value is not
+ * provided use the routeOffer value for auto failover within a cluster
+ */
+
+ String preferredRouteKey = readRoute("preferredRouteKey");
+
+ if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
+ url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner
+ + "&routeoffer=" + preferredRouteKey;
+ } else if (partner != null && !partner.isEmpty()) {
+ url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner;
+ } else if (routeOffer != null && !routeOffer.isEmpty()) {
+ url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
+ + routeOffer;
+ }
+
+ if (timeoutMs != -1)
+ url = url + "&timeout=" + timeoutMs;
+ if (limit != -1)
+ url = url + "&limit=" + limit;
+
+ // Add filter to DME2 Url
+ if (fFilter != null && fFilter.length() > 0)
+ url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8");
+
+ DMETimeOuts = new HashMap<>();
+ DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
+ DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
+ DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
+ DMETimeOuts.put("Content-Type", contenttype);
+ System.setProperty("AFT_LATITUDE", latitude);
+ System.setProperty("AFT_LONGITUDE", longitude);
+ System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
+
+ // SSL changes
+ System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
+ System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
+ System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
+ // SSL changes
+
+ long dme2PerEndPointTimeoutMs;
+ try {
+ dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty("DME2_PER_HANDLER_TIMEOUT_MS"));
+ // backward compatibility
+ if (dme2PerEndPointTimeoutMs <= 0) {
+ dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
+ }
+ } catch (NumberFormatException nfe) {
+ // backward compatibility
+ dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
+ getLog().debug(
+ "DME2_PER_HANDLER_TIMEOUT_MS not set and using default " + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS);
+ }
+
+ try {
+ dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty("DME2_REPLY_HANDLER_TIMEOUT_MS"));
+ } catch (NumberFormatException nfe) {
+ try {
+ long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
+ long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
+ dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs;
+ getLog().debug(
+ "DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT "
+ + dme2ReplyHandlerTimeoutMs);
+ } catch (NumberFormatException e) {
+ // backward compatibility
+ dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
+ getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs);
+ }
+ }
+ // backward compatibility
+ if (dme2ReplyHandlerTimeoutMs <= 0) {
+ dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
+ }
+
+ sender = new DME2Client(new URI(url), dme2PerEndPointTimeoutMs);
+ sender.setAllowAllHttpReturnCodes(true);
+ sender.setMethod(methodType);
+ sender.setSubContext(subContextPath);
+ if (dmeuser != null && dmepassword != null) {
+ sender.setCredentials(dmeuser, dmepassword);
+ }
+ sender.setHeaders(DMETimeOuts);
+ sender.setPayload("");
+ if (handlers != null && handlers.equalsIgnoreCase("yes")) {
+ sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
+ props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
+ sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
+ } else {
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
+ }
+ }
+
+ protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
+ final StringBuilder contexturl = new StringBuilder(url);
+ final StringBuilder adds = new StringBuilder();
+
+ if (timeoutMs > -1) {
+ adds.append("timeout=").append(timeoutMs);
+ }
+
+ if (limit > -1) {
+ if (adds.length() > 0) {
+ adds.append("&");
+ }
+ adds.append("limit=").append(limit);
+ }
+
+ if (fFilter != null && fFilter.length() > 0) {
+ try {
+ if (adds.length() > 0) {
+ adds.append("&");
+ }
+ adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ log.error("exception at createUrlPath () : ", e);
+ }
+ }
+
+ if (adds.length() > 0) {
+ contexturl.append("?").append(adds.toString());
+ }
+
+ return contexturl.toString();
+ }
+
+ private String readRoute(String routeKey) {
+ try {
+ MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath)));
+ } catch (Exception ex) {
+ log.error("Reply Router Error " + ex);
+ }
+ return MRClientFactory.prop.getProperty(routeKey);
+ }
+
+ public static List<String> stringToList(String str) {
+ final LinkedList<String> set = new LinkedList<>();
+ if (str != null) {
+ final String[] parts = str.trim().split(",");
+ for (String part : parts) {
+ final String trimmed = part.trim();
+ if (trimmed.length() > 0) {
+ set.add(trimmed);
+ }
+ }
+ }
+ return set;
+ }
+
+ public static String getRouterFilePath() {
+ return routerFilePath;
+ }
+
+ public static void setRouterFilePath(String routerFilePath) {
+ MRSimplerBatchPublisher.routerFilePath = routerFilePath;
+ }
+
+ public String getConsumerFilePath() {
+ return consumerFilePath;
+ }
+
+ public void setConsumerFilePath(String consumerFilePath) {
+ this.consumerFilePath = consumerFilePath;
+ }
+
+ public String getProtocolFlag() {
+ return protocolFlag;
+ }
+
+ public void setProtocolFlag(String protocolFlag) {
+ this.protocolFlag = protocolFlag;
+ }
+
+ public Properties getProps() {
+ return props;
+ }
+
+ public void setProps(Properties props) {
+ this.props = props;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public String getAuthKey() {
+ return authKey;
+ }
+
+ public void setAuthKey(String authKey) {
+ this.authKey = authKey;
+ }
+
+ public String getAuthDate() {
+ return authDate;
+ }
+
+ public void setAuthDate(String authDate) {
+ this.authDate = authDate;
+ }
+
+ public String getfFilter() {
+ return fFilter;
+ }
+
+ public void setfFilter(String fFilter) {
+ this.fFilter = fFilter;
+ }
}