summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
authorsunil.unnava <su622b@att.com>2019-03-19 19:30:36 -0400
committersunil.unnava <su622b@att.com>2019-03-19 19:30:58 -0400
commit0fe8e6c27fc96764731b5b2be0eb5e373f2028b4 (patch)
treeff1d222550f4915cb28c71d0addbdc6ced3ee2dc /src/main
parent989c4f4bf1e099a866147535a6b2b1f7226511e3 (diff)
update MM agent API
Issue-ID: DMAAP-908 Change-Id: Ie53c9042369fcfac9a4bc08f0e8541c432e792d0 Signed-off-by: sunil.unnava <su622b@att.com>
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/onap/dmaap/service/MMRestService.java763
1 files changed, 475 insertions, 288 deletions
diff --git a/src/main/java/org/onap/dmaap/service/MMRestService.java b/src/main/java/org/onap/dmaap/service/MMRestService.java
index fc3c9a1..47f5062 100644
--- a/src/main/java/org/onap/dmaap/service/MMRestService.java
+++ b/src/main/java/org/onap/dmaap/service/MMRestService.java
@@ -19,7 +19,7 @@
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
*
*******************************************************************************/
- package org.onap.dmaap.service;
+package org.onap.dmaap.service;
import java.io.IOException;
import java.io.InputStream;
@@ -36,6 +36,8 @@ import javax.ws.rs.core.Context;
import org.json.JSONObject;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpStatus;
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import org.springframework.beans.factory.annotation.Autowired;
@@ -63,6 +65,8 @@ import org.json.JSONException;
import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
+import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
+import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
@@ -115,6 +119,10 @@ public class MMRestService {
@Autowired
private DMaaPErrorMessages errorMessages;
+ private ErrorResponse errResJson = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
+ DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), "", null, Utils.getFormattedDate(new Date()), topic,
+ null, null, "mirrorMakerAgent", null);
+
private DMaaPAAFAuthenticator dmaapAAFauthenticator = new DMaaPAAFAuthenticatorImpl();
/**
@@ -139,11 +147,11 @@ public class MMRestService {
@POST
@Produces("application/json")
@Path("/create")
- public void callCreateMirrorMaker(InputStream msg) {
+ public void callCreateMirrorMaker(InputStream msg) throws Exception {
DMaaPContext ctx = getDmaapContext();
if (checkMirrorMakerPermission(ctx,
- AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
+ AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
loadProperty();
String input = null;
@@ -152,9 +160,9 @@ public class MMRestService {
InputStream inStream = null;
Gson gson = new Gson();
CreateMirrorMaker createMirrorMaker = new CreateMirrorMaker();
-
+ LOGGER.info("Starting Create MirrorMaker");
try {
- input = IOUtils.toString(msg, UTF_8);
+ input = IOUtils.toString(msg, "UTF-8");
if (input != null && input.length() > 0) {
input = removeExtraChar(input);
@@ -166,76 +174,92 @@ public class MMRestService {
} catch (JsonSyntaxException ex) {
- sendErrResponse(ctx, errorMessages.getIncorrectJson());
- LOGGER.error("JsonSyntaxException: ", ex);
+ errResJson.setErrorMessage(errorMessages.getIncorrectJson() + ex.getMessage());
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
}
- String name = createMirrorMaker.getCreateMirrorMaker()==null? "":createMirrorMaker.getCreateMirrorMaker().getName();
+
// send error message if it is not a CreateMirrorMaker request.
if (createMirrorMaker.getCreateMirrorMaker() == null) {
- sendErrResponse(ctx, "This is not a CreateMirrorMaker request. Please try again.");
- }
- // MirrorMaker whitelist and status should not be passed
- else if (createMirrorMaker.getCreateMirrorMaker().getWhitelist() != null
- || createMirrorMaker.getCreateMirrorMaker().getStatus() != null) {
- sendErrResponse(ctx, "This is not a CreateMirrorMaker request. Please try again.");
+ errResJson.setErrorMessage("This is not a CreateMirrorMaker request. Please try again.");
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
+ } else {
+ createMirrorMaker.validateJSON();
}
+ String name = createMirrorMaker.getCreateMirrorMaker().getName();
+
// if empty, blank name is entered
- else if (StringUtils.isBlank(name)) {
- sendErrResponse(ctx, "Name can not be empty or blank.");
+ if (StringUtils.isBlank(name)) {
+
+ errResJson.setErrorMessage("Name can not be empty or blank.");
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
}
// Check if the name contains only Alpha Numeric
else if (!isAlphaNumeric(name)) {
- sendErrResponse(ctx, NAME_DOES_NOT_MEET_REQUIREMENT);
- }
+ errResJson.setErrorMessage(NAME_DOES_NOT_MEET_REQUIREMENT);
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
- // Validate the IP and Port
- else if (!StringUtils.isBlank(createMirrorMaker.getCreateMirrorMaker().getConsumer())
- && !StringUtils.isBlank(createMirrorMaker.getCreateMirrorMaker().getProducer())
- && !validateIPPort(createMirrorMaker.getCreateMirrorMaker().getConsumer())
- || !validateIPPort(createMirrorMaker.getCreateMirrorMaker().getProducer())) {
- sendErrResponse(ctx, INVALID_IPPORT);
+ } else {
- }
- // Set a random number as messageID, convert Json Object to
- // InputStream and finally call publisher and subscriber
- else if (isAlphaNumeric(name) && validateIPPort(createMirrorMaker.getCreateMirrorMaker().getConsumer())
- && validateIPPort(createMirrorMaker.getCreateMirrorMaker().getProducer())) {
+ if (null == createMirrorMaker.getMessageID() || createMirrorMaker.getMessageID().isEmpty()) {
+ createMirrorMaker.setMessageID(randomStr);
+ }
+ inStream = IOUtils.toInputStream(gson.toJson(createMirrorMaker), "UTF-8");
+ JSONObject existMirrorMaker = validateMMExists(ctx, name);
+ if (!(boolean) existMirrorMaker.get("exists")) {
+ JSONObject finalJsonObj = callPubSub(createMirrorMaker.getMessageID(), ctx, inStream, name,
+ false);
+ DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
+ } else {
- createMirrorMaker.setMessageID(randomStr);
- inStream = IOUtils.toInputStream(gson.toJson(createMirrorMaker), UTF_8);
- callPubSub(randomStr, ctx, inStream);
+ errResJson.setErrorMessage("MirrorMaker " + name + " already exists");
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
+
+ }
}
} catch (IOException e) {
- LOGGER.error("IOException: ", e);
+ throw e;
}
}
// Send error response if user does not provide Authorization
else {
- sendErrResponse(ctx, NO_ADMIN_PERMISSION);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
+ Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
+ ctx.getRequest().getRemoteHost());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
}
+
}
@POST
@Produces("application/json")
@Path("/listall")
- public void callListAllMirrorMaker(InputStream msg) {
+ public void callListAllMirrorMaker(InputStream msg) throws Exception {
+
DMaaPContext ctx = getDmaapContext();
if (checkMirrorMakerPermission(ctx,
- AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
+ AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
loadProperty();
String input = null;
+ Gson gson = new Gson();
try {
- input = IOUtils.toString(msg, UTF_8);
+ input = IOUtils.toString(msg, "UTF-8");
if (input != null && input.length() > 0) {
input = removeExtraChar(input);
@@ -248,51 +272,67 @@ public class MMRestService {
jsonOb = new JSONObject(input);
} catch (JSONException ex) {
-
- sendErrResponse(ctx, errorMessages.getIncorrectJson());
- LOGGER.error("JSONException: ", ex);
+ errResJson.setErrorMessage(errorMessages.getIncorrectJson());
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
}
// Check if request has listAllMirrorMaker and
// listAllMirrorMaker is empty
- if ((jsonOb != null) && (jsonOb.has("listAllMirrorMaker")
- && jsonOb.getJSONObject("listAllMirrorMaker").length() == 0)) {
- jsonOb.put("messageID", randomStr);
+ if (jsonOb.has("listAllMirrorMaker") && jsonOb.getJSONObject("listAllMirrorMaker").length() == 0) {
+
+ if (!jsonOb.has("messageID") || jsonOb.isNull("messageID")) {
+ jsonOb.put("messageID", randomStr);
+ }
+
InputStream inStream = null;
+ MirrorMaker mirrormaker = gson.fromJson(input, MirrorMaker.class);
try {
- inStream = IOUtils.toInputStream(jsonOb.toString(), UTF_8);
+ inStream = IOUtils.toInputStream(jsonOb.toString(), "UTF-8");
} catch (IOException ioe) {
- LOGGER.error("IOException: ", ioe);
+ throw ioe;
}
- callPubSub(randomStr, ctx, inStream);
+ JSONObject responseJson = callPubSub(jsonOb.getString("messageID"), ctx, inStream, mirrormaker.name,
+ true);
+ DMaaPResponseBuilder.respondOk(ctx, responseJson);
} else {
- sendErrResponse(ctx, "This is not a ListAllMirrorMaker request. Please try again.");
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
+ DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
+ "This is not a ListAllMirrorMaker request. Please try again.", null,
+ Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
+ ctx.getRequest().getRemoteHost());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
}
} catch (IOException ioe) {
- LOGGER.error("IOException: ", ioe);
+ throw ioe;
}
} else {
- sendErrResponse(getDmaapContext(), NO_ADMIN_PERMISSION);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
+ Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
+ ctx.getRequest().getRemoteHost());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
}
}
@POST
@Produces("application/json")
@Path("/update")
- public void callUpdateMirrorMaker(InputStream msg) {
-
+ public void callUpdateMirrorMaker(InputStream msg) throws Exception {
DMaaPContext ctx = getDmaapContext();
if (checkMirrorMakerPermission(ctx,
- AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
+ AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
loadProperty();
String input = null;
@@ -301,9 +341,10 @@ public class MMRestService {
InputStream inStream = null;
Gson gson = new Gson();
UpdateMirrorMaker updateMirrorMaker = new UpdateMirrorMaker();
+ JSONObject jsonOb, jsonObInput = null;
try {
- input = IOUtils.toString(msg, UTF_8);
+ input = IOUtils.toString(msg, "UTF-8");
if (input != null && input.length() > 0) {
input = removeExtraChar(input);
@@ -312,80 +353,110 @@ public class MMRestService {
// Check if the request has UpdateMirrorMaker
try {
updateMirrorMaker = gson.fromJson(input, UpdateMirrorMaker.class);
+ jsonOb = new JSONObject(input);
+ jsonObInput = (JSONObject) jsonOb.get("updateMirrorMaker");
} catch (JsonSyntaxException ex) {
- sendErrResponse(ctx, errorMessages.getIncorrectJson());
- LOGGER.error("JsonSyntaxException: ", ex);
-
+ errResJson.setErrorMessage(errorMessages.getIncorrectJson() + ex.getMessage());
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
}
- String name = updateMirrorMaker.getUpdateMirrorMaker()==null? "":updateMirrorMaker.getUpdateMirrorMaker().getName();
+
// send error message if it is not a UpdateMirrorMaker request.
if (updateMirrorMaker.getUpdateMirrorMaker() == null) {
- sendErrResponse(ctx, "This is not a UpdateMirrorMaker request. Please try again.");
- }
- // MirrorMaker whitelist and status should not be passed
- else if (updateMirrorMaker.getUpdateMirrorMaker().getWhitelist() != null
- || updateMirrorMaker.getUpdateMirrorMaker().getStatus() != null) {
- sendErrResponse(ctx, "This is not a UpdateMirrorMaker request. Please try again.");
+ errResJson.setErrorMessage("This is not a UpdateMirrorMaker request. Please try again.");
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
+ } else {
+ updateMirrorMaker.validateJSON(jsonObInput);
}
+ String name = updateMirrorMaker.getUpdateMirrorMaker().getName();
// if empty, blank name is entered
- else if (StringUtils.isBlank(name)) {
- sendErrResponse(ctx, "Name can not be empty or blank.");
+ if (StringUtils.isBlank(name)) {
+
+ errResJson.setErrorMessage("Name can not be empty or blank.");
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
}
// Check if the name contains only Alpha Numeric
else if (!isAlphaNumeric(name)) {
- sendErrResponse(ctx, NAME_DOES_NOT_MEET_REQUIREMENT);
+ errResJson.setErrorMessage(NAME_DOES_NOT_MEET_REQUIREMENT);
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
}
- // Validate the IP and Port
- else if (!StringUtils.isBlank(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
- && !StringUtils.isBlank(updateMirrorMaker.getUpdateMirrorMaker().getProducer())
- && !validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
- || !validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getProducer())) {
- sendErrResponse(ctx, INVALID_IPPORT);
-
- }
// Set a random number as messageID, convert Json Object to
// InputStream and finally call publisher and subscriber
- else if (isAlphaNumeric(name) && validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
- && validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getProducer())) {
+ else {
+
+ if (null == updateMirrorMaker.getMessageID() || updateMirrorMaker.getMessageID().isEmpty()) {
+ updateMirrorMaker.setMessageID(randomStr);
+ }
+
+ JSONObject existMirrorMaker = validateMMExists(ctx, name);
+
+ if ((boolean) existMirrorMaker.get("exists")) {
+ JSONObject existMM = (JSONObject) existMirrorMaker.get("listMirrorMaker");
+
+ if (!jsonObInput.has("numStreams")) {
+ updateMirrorMaker.getUpdateMirrorMaker().setNumStreams(existMM.getInt("numStreams"));
+ }
+ if (!jsonObInput.has("enablelogCheck")) {
+ updateMirrorMaker.getUpdateMirrorMaker()
+ .setEnablelogCheck(existMM.getBoolean("enablelogCheck"));
+ }
+ inStream = IOUtils.toInputStream(gson.toJson(updateMirrorMaker), "UTF-8");
+ JSONObject finalJsonObj = callPubSub(updateMirrorMaker.getMessageID(), ctx, inStream, name,
+ false);
+ DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
+ } else {
+
+ errResJson.setErrorMessage("MirrorMaker " + name + " does not exist");
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
+
+ }
- updateMirrorMaker.setMessageID(randomStr);
- inStream = IOUtils.toInputStream(gson.toJson(updateMirrorMaker), UTF_8);
- callPubSub(randomStr, ctx, inStream);
}
} catch (IOException e) {
- LOGGER.error("IOException: ", e);
+ e.printStackTrace();
}
}
// Send error response if user does not provide Authorization
else {
- sendErrResponse(ctx, NO_ADMIN_PERMISSION);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
+ Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
+ ctx.getRequest().getRemoteHost());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
}
}
@POST
@Produces("application/json")
@Path("/delete")
- public void callDeleteMirrorMaker(InputStream msg) {
+ public void callDeleteMirrorMaker(InputStream msg) throws JSONException, Exception {
DMaaPContext ctx = getDmaapContext();
if (checkMirrorMakerPermission(ctx,
- AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
+ AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
loadProperty();
String input = null;
+ Gson gson = new Gson();
+ MirrorMaker mirrormaker = new MirrorMaker();
try {
- input = IOUtils.toString(msg, UTF_8);
+ input = IOUtils.toString(msg, "UTF-8");
if (input != null && input.length() > 0) {
input = removeExtraChar(input);
@@ -396,46 +467,74 @@ public class MMRestService {
try {
jsonOb = new JSONObject(input);
+ mirrormaker = gson.fromJson(input, MirrorMaker.class);
} catch (JSONException ex) {
- sendErrResponse(ctx, errorMessages.getIncorrectJson());
- LOGGER.error("JSONException: ", ex);
+ errResJson.setErrorMessage(errorMessages.getIncorrectJson());
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
}
// Check if request has DeleteMirrorMaker and
// DeleteMirrorMaker has MirrorMaker object with name variable
// and check if the name contain only alpha numeric
- if ((jsonOb != null)
- && (jsonOb.has("deleteMirrorMaker") && jsonOb.getJSONObject("deleteMirrorMaker").length() == 1
- && jsonOb.getJSONObject("deleteMirrorMaker").has("name")
- && !StringUtils.isBlank(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))
- && isAlphaNumeric(jsonOb.getJSONObject("deleteMirrorMaker").getString("name")))) {
- jsonOb.put("messageID", randomStr);
+ if (jsonOb.has("deleteMirrorMaker") && jsonOb.getJSONObject("deleteMirrorMaker").length() == 1
+ && jsonOb.getJSONObject("deleteMirrorMaker").has("name")
+ && !StringUtils.isBlank(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))
+ && isAlphaNumeric(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))) {
+
+ if (!jsonOb.has("messageID") || jsonOb.isNull("messageID")) {
+ jsonOb.put("messageID", randomStr);
+ }
+
InputStream inStream = null;
try {
- inStream = IOUtils.toInputStream(jsonOb.toString(), UTF_8);
+ inStream = IOUtils.toInputStream(jsonOb.toString(), "UTF-8");
} catch (IOException ioe) {
- LOGGER.error("IOException: ", ioe);
+ ioe.printStackTrace();
}
+ JSONObject deleteMM = (JSONObject) jsonOb.getJSONObject("deleteMirrorMaker");
- callPubSub(randomStr, ctx, inStream);
+ JSONObject existMirrorMaker = validateMMExists(ctx, deleteMM.getString("name"));
+
+ if ((boolean) existMirrorMaker.get("exists")) {
+
+ JSONObject finalJsonObj = callPubSub(jsonOb.getString("messageID"), ctx, inStream,
+ mirrormaker.name, false);
+ DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
+ } else {
+
+ errResJson.setErrorMessage("MirrorMaker " + deleteMM.getString("name") + " does not exist");
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
+
+ }
} else {
- sendErrResponse(ctx, "This is not a DeleteMirrorMaker request. Please try again.");
+ errResJson.setErrorMessage("This is not a DeleteMirrorMaker request. Please try again.");
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
+
}
} catch (IOException ioe) {
- LOGGER.error("IOException: ", ioe);
+
+ throw ioe;
}
} else {
- sendErrResponse(getDmaapContext(), NO_ADMIN_PERMISSION);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
+ Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
+ ctx.getRequest().getRemoteHost());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
}
}
@@ -503,17 +602,6 @@ public class MMRestService {
return false;
}
- // This method validate IPv4
- private boolean validateIPPort(String ipPort) {
- String pattern = "^([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
- + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5]):"
- + "([1-9][0-9]{0,3}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5])$";
- if (ipPort.matches(pattern)) {
- return true;
- }
- return false;
- }
-
private String checkJsonFormate(String jsonStr) {
String json = jsonStr;
@@ -533,51 +621,67 @@ public class MMRestService {
return hasPermission;
}
- private void callPubSub(String randomstr, DMaaPContext ctx, InputStream inStream) {
+ public JSONObject callPubSub(String randomstr, DMaaPContext ctx, InputStream inStream, String name, boolean listAll)
+ throws Exception {
+ loadProperty();
+ JSONObject jsonObj = new JSONObject();
+ JSONObject finalJsonObj = new JSONObject();
+ JSONArray jsonArray = null;
try {
+ String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
mirrorService.pushEvents(ctx, topic, inStream, null, null);
long startTime = System.currentTimeMillis();
- String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
while (!isListMirrorMaker(msgFrmSubscribe, randomstr)
- && (System.currentTimeMillis() - startTime) < timeout) {
+ && ((System.currentTimeMillis() - startTime) < timeout)) {
msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
- }
- JSONObject jsonObj;
- JSONObject finalJsonObj = new JSONObject();
- JSONArray jsonArray;
+ }
if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
&& isListMirrorMaker(msgFrmSubscribe, randomstr)) {
msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
- jsonArray = new JSONArray(msgFrmSubscribe);
- for (int i = 0; i < jsonArray.length(); i++) {
- jsonObj = jsonArray.getJSONObject(i);
+ jsonArray = new JSONArray(msgFrmSubscribe);
+ jsonObj = jsonArray.getJSONObject(0);
+ if (jsonObj.has("listMirrorMaker")) {
+ jsonArray = (JSONArray) jsonObj.get("listMirrorMaker");
+ if (true == listAll) {
+ return jsonObj;
+ } else {
+ for (int i = 0; i < jsonArray.length(); i++) {
+ jsonObj = jsonArray.getJSONObject(i);
+ if (null != name && !name.isEmpty()) {
+ if (jsonObj.getString("name").equals(name)) {
+ finalJsonObj.put("listMirrorMaker", jsonObj);
+ break;
+ }
+ } else {
+ finalJsonObj.put("listMirrorMaker", jsonObj);
+ }
- JSONObject obj = new JSONObject();
- if (jsonObj.has(MESSAGE)) {
- obj = jsonObj.getJSONObject(MESSAGE);
- }
- if (obj.has("messageID") && obj.get("messageID").equals(randomstr) && obj.has(LISTMIRRORMAKER)) {
- finalJsonObj.put(LISTMIRRORMAKER, obj.get(LISTMIRRORMAKER));
- break;
+ }
}
}
-
- DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
+ return finalJsonObj;
} else {
- JSONObject err = new JSONObject();
- err.append(ERROR, "listMirrorMaker is not available, please make sure MirrorMakerAgent is running");
- DMaaPResponseBuilder.respondOk(ctx, err);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
+ DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
+ "listMirrorMaker is not available, please make sure MirrorMakerAgent is running", null,
+ Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
+ ctx.getRequest().getRemoteHost());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
}
} catch (Exception e) {
- LOGGER.error("Exception: ", e);
+
+ throw e;
}
+
}
private void sendErrResponse(DMaaPContext ctx, String errMsg) {
@@ -597,17 +701,17 @@ public class MMRestService {
@POST
@Produces("application/json")
@Path("/listallwhitelist")
- public void listWhiteList(InputStream msg) {
+ public void listWhiteList(InputStream msg) throws Exception {
DMaaPContext ctx = getDmaapContext();
if (checkMirrorMakerPermission(ctx,
- AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERUSER))) {
+ AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) {
loadProperty();
String input = null;
try {
- input = IOUtils.toString(msg, UTF_8);
+ input = IOUtils.toString(msg, "UTF-8");
if (input != null && input.length() > 0) {
input = removeExtraChar(input);
@@ -621,20 +725,22 @@ public class MMRestService {
} catch (JSONException ex) {
- sendErrResponse(ctx, errorMessages.getIncorrectJson());
- LOGGER.error("JSONException: ", ex);
+ errResJson.setErrorMessage(errorMessages.getIncorrectJson());
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
+
}
// Check if the request has name and name contains only alpha
// numeric
// and check if the request has namespace and namespace contains
// only alpha numeric
- if (jsonOb != null && jsonOb.length() == 2 && jsonOb.has("name")
- && !StringUtils.isBlank(jsonOb.getString("name")) && isAlphaNumeric(jsonOb.getString("name"))
- && jsonOb.has(NAMESPACE) && !StringUtils.isBlank(jsonOb.getString(NAMESPACE))) {
+ if (jsonOb.length() == 2 && jsonOb.has("name") && !StringUtils.isBlank(jsonOb.getString("name"))
+ && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has("namespace")
+ && !StringUtils.isBlank(jsonOb.getString("namespace"))) {
String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
+ "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create";
// Check if the user have create permission for the
// namespace
@@ -649,7 +755,9 @@ public class MMRestService {
} catch (JSONException e) {
- LOGGER.error("JSONException: ", e);
+ errResJson.setErrorMessage(errorMessages.getIncorrectJson());
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
}
// set a random number as messageID
@@ -659,34 +767,22 @@ public class MMRestService {
// convert listAll Json object to InputStream object
try {
- inStream = IOUtils.toInputStream(listAll.toString(), UTF_8);
+ inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
} catch (IOException ioe) {
- LOGGER.error("IOException: ", ioe);
+ ioe.printStackTrace();
}
- // call listAllMirrorMaker
- mirrorService.pushEvents(ctx, topic, inStream, null, null);
-
- // subscribe for listMirrorMaker
- long startTime = System.currentTimeMillis();
- String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
-
- while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
- && (System.currentTimeMillis() - startTime) < timeout) {
- msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
- }
-
- if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
- && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
-
- JSONArray listMirrorMaker;
- listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
+ JSONObject listMirrorMaker = new JSONObject();
+ listMirrorMaker = callPubSub(randomStr, ctx, inStream, null, true);
- String whitelist = null;
- for (int i = 0; i < listMirrorMaker.length(); i++) {
+ String whitelist = null;
+ JSONArray listMMArray = new JSONArray();
+ if (listMirrorMaker.has("listMirrorMaker")) {
+ listMMArray = (JSONArray) listMirrorMaker.get("listMirrorMaker");
+ for (int i = 0; i < listMMArray.length(); i++) {
- JSONObject mm;
- mm = listMirrorMaker.getJSONObject(i);
+ JSONObject mm = new JSONObject();
+ mm = listMMArray.getJSONObject(i);
String name = mm.getString("name");
if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
@@ -697,13 +793,13 @@ public class MMRestService {
if (!StringUtils.isBlank(whitelist)) {
- List<String> topicList = new ArrayList<>();
- List<String> finalTopicList = new ArrayList<>();
+ List<String> topicList = new ArrayList<String>();
+ List<String> finalTopicList = new ArrayList<String>();
topicList = Arrays.asList(whitelist.split(","));
for (String topic : topicList) {
if (topic != null && !topic.equals("null")
- && getNamespace(topic).equals(jsonOb.getString(NAMESPACE))) {
+ && getNamespace(topic).equals(jsonOb.getString("namespace"))) {
finalTopicList.add(topic);
}
@@ -711,7 +807,7 @@ public class MMRestService {
String topicNames = "";
- if (!finalTopicList.isEmpty()) {
+ if (finalTopicList.size() > 0) {
topicNames = StringUtils.join(finalTopicList, ",");
}
@@ -724,28 +820,40 @@ public class MMRestService {
} else {
- JSONObject err = new JSONObject();
- err.append(ERROR,
+ errResJson.setErrorMessage(
"listWhiteList is not available, please make sure MirrorMakerAgent is running");
- DMaaPResponseBuilder.respondOk(ctx, err);
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
+
}
} else {
- sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_CREATE_PERMISSION,
+ null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
+ ctx.getRequest().getRemoteHost());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
}
} else {
- sendErrResponse(ctx, "This is not a ListAllWhitelist request. Please try again.");
+ errResJson.setErrorMessage("This is not a ListAllWhitelist request. Please try again.");
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
}
- } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
- | TopicExistsException | missingReqdSetting | UnavailableException e) {
+ } catch (IOException e) {
- LOGGER.error("IOException: ", e);
+ e.printStackTrace();
}
} else {
- sendErrResponse(ctx, NO_USER_PERMISSION);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_PERMISSION, null,
+ Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
+ ctx.getRequest().getRemoteHost());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
}
}
@@ -753,17 +861,17 @@ public class MMRestService {
@POST
@Produces("application/json")
@Path("/createwhitelist")
- public void createWhiteList(InputStream msg) {
+ public void createWhiteList(InputStream msg) throws Exception {
DMaaPContext ctx = getDmaapContext();
if (checkMirrorMakerPermission(ctx,
- AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERUSER))) {
+ AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) {
loadProperty();
String input = null;
try {
- input = IOUtils.toString(msg, UTF_8);
+ input = IOUtils.toString(msg, "UTF-8");
if (input != null && input.length() > 0) {
input = removeExtraChar(input);
@@ -776,9 +884,9 @@ public class MMRestService {
jsonOb = new JSONObject(input);
} catch (JSONException ex) {
-
- sendErrResponse(ctx, errorMessages.getIncorrectJson());
- LOGGER.error("JSONException: ", ex);
+ errResJson.setErrorMessage(errorMessages.getIncorrectJson());
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
}
// Check if the request has name and name contains only alpha
@@ -786,17 +894,16 @@ public class MMRestService {
// check if the request has namespace and
// check if the request has whitelistTopicName
// check if the topic name contains only alpha numeric
- if (jsonOb != null && jsonOb.length() == 3 && jsonOb.has("name")
- && !StringUtils.isBlank(jsonOb.getString("name")) && isAlphaNumeric(jsonOb.getString("name"))
- && jsonOb.has(NAMESPACE) && !StringUtils.isBlank(jsonOb.getString(NAMESPACE))
- && jsonOb.has("whitelistTopicName")
+ if (jsonOb.length() == 3 && jsonOb.has("name") && !StringUtils.isBlank(jsonOb.getString("name"))
+ && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has("namespace")
+ && !StringUtils.isBlank(jsonOb.getString("namespace")) && jsonOb.has("whitelistTopicName")
&& !StringUtils.isBlank(jsonOb.getString("whitelistTopicName"))
&& isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(
- jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1,
- jsonOb.getString("whitelistTopicName").length()))) {
+ jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1, jsonOb
+ .getString("whitelistTopicName").length()))) {
String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
+ "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create";
// Check if the user have create permission for the
// namespace
@@ -811,7 +918,9 @@ public class MMRestService {
} catch (JSONException e) {
- LOGGER.error("JSONException: ", e);
+ errResJson.setErrorMessage(errorMessages.getIncorrectJson());
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
}
// set a random number as messageID
@@ -821,24 +930,24 @@ public class MMRestService {
// convert listAll Json object to InputStream object
try {
- inStream = IOUtils.toInputStream(listAll.toString(), UTF_8);
+ inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
} catch (IOException ioe) {
- LOGGER.error("IOException: ", ioe);
+ ioe.printStackTrace();
}
+ String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
// call listAllMirrorMaker
mirrorService.pushEvents(ctx, topic, inStream, null, null);
// subscribe for listMirrorMaker
long startTime = System.currentTimeMillis();
- String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
&& (System.currentTimeMillis() - startTime) < timeout) {
msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
}
- JSONArray listMirrorMaker;
+ JSONArray listMirrorMaker = null;
if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
&& isListMirrorMaker(msgFrmSubscribe, randomStr)) {
@@ -847,7 +956,7 @@ public class MMRestService {
String whitelist = null;
for (int i = 0; i < listMirrorMaker.length(); i++) {
- JSONObject mm;
+ JSONObject mm = new JSONObject();
mm = listMirrorMaker.getJSONObject(i);
String name = mm.getString("name");
@@ -857,8 +966,8 @@ public class MMRestService {
}
}
- List<String> topicList = new ArrayList<>();
- List<String> finalTopicList = new ArrayList<>();
+ List<String> topicList = new ArrayList<String>();
+ List<String> finalTopicList = new ArrayList<String>();
if (whitelist != null) {
topicList = Arrays.asList(whitelist.split(","));
@@ -873,7 +982,7 @@ public class MMRestService {
String newTopic = jsonOb.getString("whitelistTopicName");
if (!topicList.contains(newTopic)
- && getNamespace(newTopic).equals(jsonOb.getString(NAMESPACE))) {
+ && getNamespace(newTopic).equals(jsonOb.getString("namespace"))) {
UpdateWhiteList updateWhiteList = new UpdateWhiteList();
MirrorMaker mirrorMaker = new MirrorMaker();
@@ -881,7 +990,7 @@ public class MMRestService {
finalTopicList.add(newTopic);
String newWhitelist = "";
- if (!finalTopicList.isEmpty()) {
+ if (finalTopicList.size() > 0) {
newWhitelist = StringUtils.join(finalTopicList, ",");
}
@@ -893,44 +1002,70 @@ public class MMRestService {
Gson g = new Gson();
g.toJson(updateWhiteList);
- InputStream inputStream;
- inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), UTF_8);
+ InputStream inputStream = null;
+ inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), "UTF-8");
// callPubSub(newRandom, ctx, inputStream);
- callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb.getString(NAMESPACE));
+ callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb);
} else if (topicList.contains(newTopic)) {
- sendErrResponse(ctx, "The topic already exist.");
-
- } else if (!getNamespace(newTopic).equals(jsonOb.getString(NAMESPACE))) {
- sendErrResponse(ctx,
- "The namespace of the topic does not match with the namespace you provided.");
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR,
+ DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), "The topic already exist.",
+ null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
+ ctx.getRequest().getRemoteHost());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ } else if (!getNamespace(newTopic).equals(jsonOb.getString("namespace"))) {
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR,
+ DMaaPResponseCode.INCORRECT_JSON.getResponseCode(),
+ "The namespace of the topic does not match with the namespace you provided.",
+ null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
+ ctx.getRequest().getRemoteHost());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
}
} else {
- JSONObject err = new JSONObject();
- err.append(ERROR,
- "listWhiteList is not available, please make sure MirrorMakerAgent is running");
- DMaaPResponseBuilder.respondOk(ctx, err);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
+ DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
+ "listWhiteList is not available, please make sure MirrorMakerAgent is running",
+ null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
+ ctx.getRequest().getRemoteHost());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
}
} else {
- sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.UNABLE_TO_AUTHORIZE.getResponseCode(), NO_USER_CREATE_PERMISSION,
+ null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
+ ctx.getRequest().getRemoteHost());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
}
} else {
- sendErrResponse(ctx, "This is not a createWhitelist request. Please try again.");
+ errResJson.setErrorMessage("This is not a createWhitelist request. Please try again.");
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
}
} catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
| TopicExistsException | missingReqdSetting | UnavailableException e) {
- LOGGER.error("IOException: ", e);
+ throw e;
}
}
// Send error response if user does not provide Authorization
else {
- sendErrResponse(ctx, NO_USER_PERMISSION);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.UNABLE_TO_AUTHORIZE.getResponseCode(), NO_USER_PERMISSION, null,
+ Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
+ ctx.getRequest().getRemoteHost());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
}
}
@@ -938,17 +1073,17 @@ public class MMRestService {
@POST
@Produces("application/json")
@Path("/deletewhitelist")
- public void deleteWhiteList(InputStream msg) {
+ public void deleteWhiteList(InputStream msg) throws Exception {
DMaaPContext ctx = getDmaapContext();
if (checkMirrorMakerPermission(ctx,
- AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERUSER))) {
+ AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) {
loadProperty();
String input = null;
try {
- input = IOUtils.toString(msg, UTF_8);
+ input = IOUtils.toString(msg, "UTF-8");
if (input != null && input.length() > 0) {
input = removeExtraChar(input);
@@ -962,23 +1097,24 @@ public class MMRestService {
} catch (JSONException ex) {
- sendErrResponse(ctx, errorMessages.getIncorrectJson());
- LOGGER.error("JSONException: ", ex);
+ errResJson.setErrorMessage(errorMessages.getIncorrectJson());
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
+
}
// Check if the request has name and name contains only alpha
// numeric,
// check if the request has namespace and
// check if the request has whitelistTopicName
- if (jsonOb != null && jsonOb.length() == 3 && jsonOb.has("name")
- && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has(NAMESPACE)
- && jsonOb.has("whitelistTopicName")
+ if (jsonOb.length() == 3 && jsonOb.has("name") && isAlphaNumeric(jsonOb.getString("name"))
+ && jsonOb.has("namespace") && jsonOb.has("whitelistTopicName")
&& isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(
jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1,
jsonOb.getString("whitelistTopicName").length()))) {
String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
+ "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create";
// Check if the user have create permission for the
// namespace
@@ -993,7 +1129,9 @@ public class MMRestService {
} catch (JSONException e) {
- LOGGER.error("JSONException: ", e);
+ errResJson.setErrorMessage(errorMessages.getIncorrectJson());
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
}
// set a random number as messageID
@@ -1003,10 +1141,10 @@ public class MMRestService {
// convert listAll Json object to InputStream object
try {
- inStream = IOUtils.toInputStream(listAll.toString(), UTF_8);
+ inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
} catch (IOException ioe) {
- LOGGER.error("IOException: ", ioe);
+ ioe.printStackTrace();
}
// call listAllMirrorMaker
mirrorService.pushEvents(ctx, topic, inStream, null, null);
@@ -1020,8 +1158,8 @@ public class MMRestService {
msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
}
- JSONObject jsonObj;
- JSONArray jsonArray;
+ JSONObject jsonObj = new JSONObject();
+ JSONArray jsonArray = null;
JSONArray listMirrorMaker = null;
if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
@@ -1032,32 +1170,26 @@ public class MMRestService {
for (int i = 0; i < jsonArray.length(); i++) {
jsonObj = jsonArray.getJSONObject(i);
- JSONObject obj = new JSONObject();
- if (jsonObj.has(MESSAGE)) {
- obj = jsonObj.getJSONObject(MESSAGE);
- }
- if (obj.has("messageID") && obj.get("messageID").equals(randomStr)
- && obj.has(LISTMIRRORMAKER)) {
- listMirrorMaker = obj.getJSONArray(LISTMIRRORMAKER);
+ if (jsonObj.has("messageID") && jsonObj.get("messageID").equals(randomStr)
+ && jsonObj.has("listMirrorMaker")) {
+ listMirrorMaker = jsonObj.getJSONArray("listMirrorMaker");
break;
}
}
String whitelist = null;
- if (listMirrorMaker != null) {
- for (int i = 0; i < listMirrorMaker.length(); i++) {
+ for (int i = 0; i < listMirrorMaker.length(); i++) {
- JSONObject mm = new JSONObject();
- mm = listMirrorMaker.getJSONObject(i);
- String name = mm.getString("name");
+ JSONObject mm = new JSONObject();
+ mm = listMirrorMaker.getJSONObject(i);
+ String name = mm.getString("name");
- if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
- whitelist = mm.getString("whitelist");
- break;
- }
+ if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
+ whitelist = mm.getString("whitelist");
+ break;
}
}
- List<String> topicList = new ArrayList<>();
+ List<String> topicList = new ArrayList<String>();
if (whitelist != null) {
topicList = Arrays.asList(whitelist.split(","));
@@ -1068,7 +1200,9 @@ public class MMRestService {
if (topicList.contains(topicToRemove)) {
removeTopic = true;
} else {
- sendErrResponse(ctx, "The topic does not exist.");
+ errResJson.setErrorMessage(errorMessages.getTopicNotExist());
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
}
if (removeTopic) {
@@ -1076,7 +1210,10 @@ public class MMRestService {
MirrorMaker mirrorMaker = new MirrorMaker();
mirrorMaker.setName(jsonOb.getString("name"));
- mirrorMaker.setWhitelist(removeTopic(whitelist, topicToRemove));
+
+ if (StringUtils.isNotBlank((removeTopic(whitelist, topicToRemove)))) {
+ mirrorMaker.setWhitelist(removeTopic(whitelist, topicToRemove));
+ }
String newRandom = getRandomNum();
@@ -1086,37 +1223,55 @@ public class MMRestService {
Gson g = new Gson();
g.toJson(updateWhiteList);
- InputStream inputStream;
- inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), UTF_8);
- callPubSubForWhitelist(newRandom, ctx, inputStream, getNamespace(topicToRemove));
+ InputStream inputStream = null;
+ inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), "UTF-8");
+ callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb);
+ // mmAgentUtil.getNamespace(topicToRemove));
}
} else {
- JSONObject err = new JSONObject();
- err.append(ERROR,
- "listWhiteList is not available, please make sure MirrorMakerAgent is running");
- DMaaPResponseBuilder.respondOk(ctx, err);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
+ DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
+ "listWhiteList is not available, please make sure MirrorMakerAgent is running",
+ null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
+ ctx.getRequest().getRemoteHost());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
}
} else {
- sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_CREATE_PERMISSION,
+ null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
+ ctx.getRequest().getRemoteHost());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
}
} else {
- sendErrResponse(ctx, "This is not a DeleteAllWhitelist request. Please try again.");
+ errResJson.setErrorMessage("This is not a DeleteAllWhitelist request. Please try again.");
+ LOGGER.info(errResJson.toString());
+ throw new CambriaApiException(errResJson);
+
}
} catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
| TopicExistsException | missingReqdSetting | UnavailableException e) {
- LOGGER.error("IOException: ", e);
+ throw e;
}
}
// Send error response if user does not provide Authorization
else {
- sendErrResponse(ctx, NO_USER_PERMISSION);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_PERMISSION, null,
+ Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
+ ctx.getRequest().getRemoteHost());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
}
}
@@ -1146,20 +1301,24 @@ public class MMRestService {
return newWhitelist;
}
- private void callPubSubForWhitelist(String randomStr, DMaaPContext ctx, InputStream inStream, String namespace) {
+ public void callPubSubForWhitelist(String randomStr, DMaaPContext ctx, InputStream inStream, JSONObject jsonOb) {
+ loadProperty();
try {
+ String namespace = jsonOb.getString("namespace");
+ String mmName = jsonOb.getString("name");
+
+ String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
mirrorService.pushEvents(ctx, topic, inStream, null, null);
long startTime = System.currentTimeMillis();
- String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
&& (System.currentTimeMillis() - startTime) < timeout) {
msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
}
- JSONObject jsonObj;
- JSONArray jsonArray;
+ JSONObject jsonObj = new JSONObject();
+ JSONArray jsonArray = null;
JSONArray jsonArrayNamespace = null;
if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
@@ -1170,12 +1329,9 @@ public class MMRestService {
for (int i = 0; i < jsonArray.length(); i++) {
jsonObj = jsonArray.getJSONObject(i);
- JSONObject obj = new JSONObject();
- if (jsonObj.has(MESSAGE)) {
- obj = jsonObj.getJSONObject(MESSAGE);
- }
- if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has(LISTMIRRORMAKER)) {
- jsonArrayNamespace = obj.getJSONArray(LISTMIRRORMAKER);
+ if (jsonObj.has("messageID") && jsonObj.get("messageID").equals(randomStr)
+ && jsonObj.has("listMirrorMaker")) {
+ jsonArrayNamespace = jsonObj.getJSONArray("listMirrorMaker");
}
}
JSONObject finalJasonObj = new JSONObject();
@@ -1183,35 +1339,31 @@ public class MMRestService {
for (int i = 0; i < jsonArrayNamespace.length(); i++) {
- JSONObject mmObj;
+ JSONObject mmObj = new JSONObject();
mmObj = jsonArrayNamespace.getJSONObject(i);
- String whitelist;
-
- if (mmObj.has("whitelist")) {
- whitelist = getWhitelistByNamespace(mmObj.getString("whitelist"), namespace);
+ if (mmObj.has("name") && mmName.equals(mmObj.getString("name"))) {
- if (whitelist != null) {
- mmObj.remove("whitelist");
- mmObj.put("whitelist", whitelist);
- } else {
- mmObj.remove("whitelist");
- }
+ finalJsonArray.put(mmObj);
}
- finalJsonArray.put(mmObj);
+
}
- finalJasonObj.put(LISTMIRRORMAKER, finalJsonArray);
+ finalJasonObj.put("listMirrorMaker", finalJsonArray);
DMaaPResponseBuilder.respondOk(ctx, finalJasonObj);
} else {
- JSONObject err = new JSONObject();
- err.append(ERROR, "listMirrorMaker is not available, please make sure MirrorMakerAgent is running");
- DMaaPResponseBuilder.respondOk(ctx, err);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
+ DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
+ "listMirrorMaker is not available, please make sure MirrorMakerAgent is running", null,
+ Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
+ ctx.getRequest().getRemoteHost());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
}
} catch (Exception e) {
- LOGGER.error("Exception: ", e);
+ e.printStackTrace();
}
}
@@ -1256,4 +1408,39 @@ public class MMRestService {
}
return listMirrorMaker;
}
+
+ public JSONObject validateMMExists(DMaaPContext ctx, String name) throws Exception {
+ // Create a listAllMirrorMaker Json object
+ JSONObject listAll = new JSONObject();
+ try {
+ listAll.put("listAllMirrorMaker", new JSONObject());
+
+ } catch (JSONException e) {
+
+ e.printStackTrace();
+ }
+
+ // set a random number as messageID
+ String randomStr = getRandomNum();
+ listAll.put("messageID", randomStr);
+ InputStream inStream = null;
+
+ // convert listAll Json object to InputStream object
+ try {
+ inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
+
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ JSONObject listMirrorMaker = new JSONObject();
+ listMirrorMaker = callPubSub(randomStr, ctx, inStream, name, false);
+ if (null != listMirrorMaker && listMirrorMaker.length() > 0) {
+ listMirrorMaker.put("exists", true);
+ return listMirrorMaker;
+
+ }
+ listMirrorMaker.put("exists", false);
+ return listMirrorMaker;
+
+ }
}