aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java')
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java288
1 files changed, 103 insertions, 185 deletions
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java
index 0b06f77..6c67313 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java
@@ -28,19 +28,13 @@ import org.onap.dmaap.mr.client.MRClientFactory;
import org.onap.dmaap.mr.client.MRConsumer;
import org.onap.dmaap.mr.client.response.MRConsumerResponse;
import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
+
+import java.io.*;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpException;
import org.apache.http.HttpStatus;
@@ -55,15 +49,27 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
private Logger log = LoggerFactory.getLogger(this.getClass().getName());
- public static final String routerFilePath = null;
+ public static final String ROUTER_FILE_PATH = null;
public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
public String consumerFilePath;
+ private static final String JSON_RESULT = "result";
+ private static final String PROPS_PROTOCOL = "Protocol";
+
+ private static final String EXECPTION_MESSAGE = "exception: ";
private static final String SUCCESS_MESSAGE = "Success";
private static final long DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS = 10000L;
private static final long DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS = 10000L;
+ private static final String URL_PARAM_ROUTE_OFFER = "routeoffer";
+ private static final String URL_PARAM_PARTNER = "partner";
+ private static final String URL_PARAM_ENV_CONTEXT = "envContext";
+ private static final String URL_PARAM_VERSION = "version";
+ private static final String URL_PARAM_FILTER = "filter";
+ private static final String URL_PARAM_LIMIT = "limit";
+ private static final String URL_PARAM_TIMEOUT = "timeout";
+
private final String fTopic;
private final String fGroup;
private final String fId;
@@ -186,124 +192,67 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
public Iterable<String> fetch(int timeoutMs, int limit) throws Exception {
final LinkedList<String> msgs = new LinkedList<>();
- try {
- if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
- dmeConfigure(timeoutMs, limit);
- try {
- String reply = sender.sendAndWait(timeoutMs + 10000L);
- final JSONObject o = getResponseDataInJson(reply);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- log.error("exception: ", e);
- } catch (HttpException e) {
- throw new IOException(e);
- }
- }
-
- if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
-
- try {
- final JSONObject o = get(urlPath, username, password, protocolFlag);
-
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- log.error("exception: ", e);
- } catch (HttpException e) {
- throw new IOException(e);
- }
- }
-
- if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- final String urlPath = createUrlPath(
- MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
- timeoutMs, limit);
-
- try {
- final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- log.error("exception: ", e);
- } catch (HttpException e) {
- throw new IOException(e);
- }
+ ProtocolTypeConstants protocolFlagEnum = null;
+ for(ProtocolTypeConstants type : ProtocolTypeConstants.values()) {
+ if (type.getValue().equalsIgnoreCase(protocolFlag)) {
+ protocolFlagEnum = type;
}
+ }
+ if (protocolFlagEnum == null) {
+ return msgs;
+ }
- if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
-
- try {
- final JSONObject o = getNoAuth(urlPath);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- log.error("exception: ", e);
- } catch (HttpException e) {
- throw new IOException(e);
- }
+ try {
+ switch (protocolFlagEnum) {
+ case DME2:
+ dmeConfigure(timeoutMs, limit);
+ String reply = sender.sendAndWait(timeoutMs + 10000L);
+ readJsonData(msgs, getResponseDataInJson(reply));
+ break;
+ case AAF_AUTH:
+ String urlAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+ fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
+ final JSONObject o = get(urlAuthPath, username, password, protocolFlag);
+ readJsonData(msgs, o);
+ break;
+ case AUTH_KEY:
+ final String urlKeyPath = createUrlPath(
+ MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROPS_PROTOCOL)),
+ timeoutMs, limit);
+ final JSONObject authObject = getAuth(urlKeyPath, authKey, authDate, username, password, protocolFlag);
+ readJsonData(msgs, authObject);
+ break;
+ case HTTPNOAUTH:
+ final String urlNoAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+ fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
+ readJsonData(msgs, getNoAuth(urlNoAuthPath));
+ break;
}
} catch (JSONException e) {
// unexpected response
reportProblemWithResponse();
- log.error("exception: ", e);
+ log.error(EXECPTION_MESSAGE, e);
} catch (HttpException e) {
throw new IOException(e);
- } catch (Exception e) {
- throw e;
}
return msgs;
}
+ private void readJsonData(LinkedList<String> msgs, JSONObject o) {
+ if (o != null) {
+ final JSONArray a = o.getJSONArray(JSON_RESULT);
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ if (a.get(i) instanceof String)
+ msgs.add(a.getString(i));
+ else
+ msgs.add(a.getJSONObject(i).toString());
+ }
+ }
+ }
+ }
+
@Override
public MRConsumerResponse fetchWithReturnConsumerResponse() {
// fetch with the timeout and limit set in constructor
@@ -324,82 +273,38 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
-
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
+ readJsonData(msgs, o);
createMRConsumerResponse(reply, mrConsumerResponse);
}
if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+ fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
String response = getResponse(urlPath, username, password, protocolFlag);
final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
-
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
+ readJsonData(msgs, o);
createMRConsumerResponse(response, mrConsumerResponse);
}
if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
final String urlPath = createUrlPath(
- MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
+ MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROPS_PROTOCOL)),
timeoutMs, limit);
String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
-
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
+ readJsonData(msgs, o);
createMRConsumerResponse(response, mrConsumerResponse);
}
if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+ fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
-
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
- msgs.add(a.getString(i));
- else
- msgs.add(a.getJSONObject(i).toString());
- }
- }
- }
+ readJsonData(msgs, o);
createMRConsumerResponse(response, mrConsumerResponse);
}
@@ -418,7 +323,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
} catch (Exception e) {
mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
mrConsumerResponse.setResponseMessage(e.getMessage());
- log.error("exception: ", e);
+ log.error(EXECPTION_MESSAGE, e);
}
mrConsumerResponse.setActualMessages(msgs);
return mrConsumerResponse;
@@ -460,7 +365,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
if ('[' == firstChar) {
JSONArray jsonArray = new JSONArray(jsonTokener);
jsonObject = new JSONObject();
- jsonObject.put("result", jsonArray);
+ jsonObject.put(JSON_RESULT, jsonArray);
} else {
jsonObject = new JSONObject(jsonTokener);
}
@@ -484,7 +389,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
if ('[' == firstChar) {
JSONArray jsonArray = new JSONArray(jsonTokener);
jsonObject = new JSONObject();
- jsonObject.put("result", jsonArray);
+ jsonObject.put(JSON_RESULT, jsonArray);
} else if ('{' == firstChar) {
return null;
} else if ('<' == firstChar) {
@@ -506,7 +411,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
String partner = props.getProperty("Partner");
String routeOffer = props.getProperty("routeOffer");
String subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
- String protocol = props.getProperty("Protocol");
+ String protocol = props.getProperty(PROPS_PROTOCOL);
String methodType = props.getProperty("MethodType");
String dmeuser = props.getProperty("username");
String dmepassword = props.getProperty("password");
@@ -519,25 +424,38 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
*/
String preferredRouteKey = readRoute("preferredRouteKey");
-
+ StringBuilder contextUrl = new StringBuilder();
if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
- url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner
- + "&routeoffer=" + preferredRouteKey;
+ contextUrl.append(protocol).append("://").append(serviceName).append("?")
+ .append(URL_PARAM_VERSION).append("=").append(version).append("&")
+ .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
+ .append(URL_PARAM_PARTNER).append("=").append(partner).append("&")
+ .append(URL_PARAM_ROUTE_OFFER).append("=").append(preferredRouteKey);
} else if (partner != null && !partner.isEmpty()) {
- url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner;
+ contextUrl.append(protocol).append("://").append(serviceName).append("?")
+ .append(URL_PARAM_VERSION).append("=").append(version).append("&")
+ .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
+ .append(URL_PARAM_PARTNER).append("=").append(partner);
} else if (routeOffer != null && !routeOffer.isEmpty()) {
- url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
- + routeOffer;
+ contextUrl.append(protocol).append("://").append(serviceName).append("?")
+ .append(URL_PARAM_VERSION).append("=").append(version).append("&")
+ .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
+ .append(URL_PARAM_ROUTE_OFFER).append("=").append(routeOffer);
}
- if (timeoutMs != -1)
- url = url + "&timeout=" + timeoutMs;
- if (limit != -1)
- url = url + "&limit=" + limit;
+ if (timeoutMs != -1) {
+ contextUrl.append("&").append(URL_PARAM_TIMEOUT).append("=").append(timeoutMs);
+ }
+ if (limit != -1) {
+ contextUrl.append("&").append(URL_PARAM_LIMIT).append("=").append(limit);
+ }
// Add filter to DME2 Url
- if (fFilter != null && fFilter.length() > 0)
- url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8");
+ if (fFilter != null && fFilter.length() > 0) {
+ contextUrl.append("&").append(URL_PARAM_FILTER).append("=").append(URLEncoder.encode(fFilter, "UTF-8"));
+ }
+
+ url = contextUrl.toString();
DMETimeOuts = new HashMap<>();
DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
@@ -642,8 +560,8 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
}
private String readRoute(String routeKey) {
- try {
- MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath)));
+ try(InputStream input = new FileInputStream(MRClientFactory.routeFilePath)) {
+ MRClientFactory.prop.load(input);
} catch (Exception ex) {
log.error("Reply Router Error " + ex);
}
@@ -665,7 +583,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
}
public static String getRouterFilePath() {
- return routerFilePath;
+ return ROUTER_FILE_PATH;
}
public static void setRouterFilePath(String routerFilePath) {