diff options
Diffstat (limited to 'src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java')
-rw-r--r-- | src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java | 288 |
1 files changed, 103 insertions, 185 deletions
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java index 0b06f77..6c67313 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java @@ -28,19 +28,13 @@ import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.MRConsumer; import org.onap.dmaap.mr.client.response.MRConsumerResponse; import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.io.UnsupportedEncodingException; + +import java.io.*; import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.net.URLEncoder; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; +import java.util.*; import java.util.concurrent.TimeUnit; import org.apache.http.HttpException; import org.apache.http.HttpStatus; @@ -55,15 +49,27 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { private Logger log = LoggerFactory.getLogger(this.getClass().getName()); - public static final String routerFilePath = null; + public static final String ROUTER_FILE_PATH = null; public String protocolFlag = ProtocolTypeConstants.DME2.getValue(); public String consumerFilePath; + private static final String JSON_RESULT = "result"; + private static final String PROPS_PROTOCOL = "Protocol"; + + private static final String EXECPTION_MESSAGE = "exception: "; private static final String SUCCESS_MESSAGE = "Success"; private static final long DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS = 10000L; private static final long DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS = 10000L; + private static final String URL_PARAM_ROUTE_OFFER = "routeoffer"; + private static final String URL_PARAM_PARTNER = "partner"; + private static final String URL_PARAM_ENV_CONTEXT = "envContext"; + private static final String URL_PARAM_VERSION = "version"; + private static final String URL_PARAM_FILTER = "filter"; + private static final String URL_PARAM_LIMIT = "limit"; + private static final String URL_PARAM_TIMEOUT = "timeout"; + private final String fTopic; private final String fGroup; private final String fId; @@ -186,124 +192,67 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { public Iterable<String> fetch(int timeoutMs, int limit) throws Exception { final LinkedList<String> msgs = new LinkedList<>(); - try { - if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { - dmeConfigure(timeoutMs, limit); - try { - String reply = sender.sendAndWait(timeoutMs + 10000L); - final JSONObject o = getResponseDataInJson(reply); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - } - } - } - } catch (JSONException e) { - // unexpected response - reportProblemWithResponse(); - log.error("exception: ", e); - } catch (HttpException e) { - throw new IOException(e); - } - } - - if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { - final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, - fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); - - try { - final JSONObject o = get(urlPath, username, password, protocolFlag); - - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - } - } - } - } catch (JSONException e) { - // unexpected response - reportProblemWithResponse(); - log.error("exception: ", e); - } catch (HttpException e) { - throw new IOException(e); - } - } - - if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - final String urlPath = createUrlPath( - MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")), - timeoutMs, limit); - - try { - final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - } - } - } - } catch (JSONException e) { - // unexpected response - reportProblemWithResponse(); - log.error("exception: ", e); - } catch (HttpException e) { - throw new IOException(e); - } + ProtocolTypeConstants protocolFlagEnum = null; + for(ProtocolTypeConstants type : ProtocolTypeConstants.values()) { + if (type.getValue().equalsIgnoreCase(protocolFlag)) { + protocolFlagEnum = type; } + } + if (protocolFlagEnum == null) { + return msgs; + } - if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { - final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, - fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); - - try { - final JSONObject o = getNoAuth(urlPath); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - } - } - } - } catch (JSONException e) { - // unexpected response - reportProblemWithResponse(); - log.error("exception: ", e); - } catch (HttpException e) { - throw new IOException(e); - } + try { + switch (protocolFlagEnum) { + case DME2: + dmeConfigure(timeoutMs, limit); + String reply = sender.sendAndWait(timeoutMs + 10000L); + readJsonData(msgs, getResponseDataInJson(reply)); + break; + case AAF_AUTH: + String urlAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, + fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit); + final JSONObject o = get(urlAuthPath, username, password, protocolFlag); + readJsonData(msgs, o); + break; + case AUTH_KEY: + final String urlKeyPath = createUrlPath( + MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROPS_PROTOCOL)), + timeoutMs, limit); + final JSONObject authObject = getAuth(urlKeyPath, authKey, authDate, username, password, protocolFlag); + readJsonData(msgs, authObject); + break; + case HTTPNOAUTH: + final String urlNoAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, + fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit); + readJsonData(msgs, getNoAuth(urlNoAuthPath)); + break; } } catch (JSONException e) { // unexpected response reportProblemWithResponse(); - log.error("exception: ", e); + log.error(EXECPTION_MESSAGE, e); } catch (HttpException e) { throw new IOException(e); - } catch (Exception e) { - throw e; } return msgs; } + private void readJsonData(LinkedList<String> msgs, JSONObject o) { + if (o != null) { + final JSONArray a = o.getJSONArray(JSON_RESULT); + if (a != null) { + for (int i = 0; i < a.length(); i++) { + if (a.get(i) instanceof String) + msgs.add(a.getString(i)); + else + msgs.add(a.getJSONObject(i).toString()); + } + } + } + } + @Override public MRConsumerResponse fetchWithReturnConsumerResponse() { // fetch with the timeout and limit set in constructor @@ -324,82 +273,38 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { final JSONObject o = getResponseDataInJsonWithResponseReturned(reply); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - } - } - } + readJsonData(msgs, o); createMRConsumerResponse(reply, mrConsumerResponse); } if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, - fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); + fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit); String response = getResponse(urlPath, username, password, protocolFlag); final JSONObject o = getResponseDataInJsonWithResponseReturned(response); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - } - } - } + readJsonData(msgs, o); createMRConsumerResponse(response, mrConsumerResponse); } if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { final String urlPath = createUrlPath( - MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")), + MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit); String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag); final JSONObject o = getResponseDataInJsonWithResponseReturned(response); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - } - } - } + readJsonData(msgs, o); createMRConsumerResponse(response, mrConsumerResponse); } if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, - fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); + fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit); String response = getNoAuthResponse(urlPath, username, password, protocolFlag); final JSONObject o = getResponseDataInJsonWithResponseReturned(response); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - } - } - } + readJsonData(msgs, o); createMRConsumerResponse(response, mrConsumerResponse); } @@ -418,7 +323,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { } catch (Exception e) { mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); mrConsumerResponse.setResponseMessage(e.getMessage()); - log.error("exception: ", e); + log.error(EXECPTION_MESSAGE, e); } mrConsumerResponse.setActualMessages(msgs); return mrConsumerResponse; @@ -460,7 +365,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { if ('[' == firstChar) { JSONArray jsonArray = new JSONArray(jsonTokener); jsonObject = new JSONObject(); - jsonObject.put("result", jsonArray); + jsonObject.put(JSON_RESULT, jsonArray); } else { jsonObject = new JSONObject(jsonTokener); } @@ -484,7 +389,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { if ('[' == firstChar) { JSONArray jsonArray = new JSONArray(jsonTokener); jsonObject = new JSONObject(); - jsonObject.put("result", jsonArray); + jsonObject.put(JSON_RESULT, jsonArray); } else if ('{' == firstChar) { return null; } else if ('<' == firstChar) { @@ -506,7 +411,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { String partner = props.getProperty("Partner"); String routeOffer = props.getProperty("routeOffer"); String subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId; - String protocol = props.getProperty("Protocol"); + String protocol = props.getProperty(PROPS_PROTOCOL); String methodType = props.getProperty("MethodType"); String dmeuser = props.getProperty("username"); String dmepassword = props.getProperty("password"); @@ -519,25 +424,38 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { */ String preferredRouteKey = readRoute("preferredRouteKey"); - + StringBuilder contextUrl = new StringBuilder(); if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) { - url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner - + "&routeoffer=" + preferredRouteKey; + contextUrl.append(protocol).append("://").append(serviceName).append("?") + .append(URL_PARAM_VERSION).append("=").append(version).append("&") + .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&") + .append(URL_PARAM_PARTNER).append("=").append(partner).append("&") + .append(URL_PARAM_ROUTE_OFFER).append("=").append(preferredRouteKey); } else if (partner != null && !partner.isEmpty()) { - url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner; + contextUrl.append(protocol).append("://").append(serviceName).append("?") + .append(URL_PARAM_VERSION).append("=").append(version).append("&") + .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&") + .append(URL_PARAM_PARTNER).append("=").append(partner); } else if (routeOffer != null && !routeOffer.isEmpty()) { - url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer=" - + routeOffer; + contextUrl.append(protocol).append("://").append(serviceName).append("?") + .append(URL_PARAM_VERSION).append("=").append(version).append("&") + .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&") + .append(URL_PARAM_ROUTE_OFFER).append("=").append(routeOffer); } - if (timeoutMs != -1) - url = url + "&timeout=" + timeoutMs; - if (limit != -1) - url = url + "&limit=" + limit; + if (timeoutMs != -1) { + contextUrl.append("&").append(URL_PARAM_TIMEOUT).append("=").append(timeoutMs); + } + if (limit != -1) { + contextUrl.append("&").append(URL_PARAM_LIMIT).append("=").append(limit); + } // Add filter to DME2 Url - if (fFilter != null && fFilter.length() > 0) - url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8"); + if (fFilter != null && fFilter.length() > 0) { + contextUrl.append("&").append(URL_PARAM_FILTER).append("=").append(URLEncoder.encode(fFilter, "UTF-8")); + } + + url = contextUrl.toString(); DMETimeOuts = new HashMap<>(); DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); @@ -642,8 +560,8 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { } private String readRoute(String routeKey) { - try { - MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath))); + try(InputStream input = new FileInputStream(MRClientFactory.routeFilePath)) { + MRClientFactory.prop.load(input); } catch (Exception ex) { log.error("Reply Router Error " + ex); } @@ -665,7 +583,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { } public static String getRouterFilePath() { - return routerFilePath; + return ROUTER_FILE_PATH; } public static void setRouterFilePath(String routerFilePath) { |