summaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
authorSunil Unnava <su622b@att.com>2018-02-28 14:21:40 -0500
committerSunil Unnava <su622b@att.com>2018-02-28 14:26:14 -0500
commitfbe26a20e749c3177f84cd05c67e757ac107b9a8 (patch)
tree6d452c8b04e67d9d85312b964d19d396f79cbbf0 /src/main/java
parent0d2012df4ac79e41371fba15241c5efcb327a7ad (diff)
changes for code coverage
Issue-ID: DMAAP-271 Change-Id: Ib8fd245fdbe9ef3457e8303c5379977b237ec09a Signed-off-by: Sunil Unnava <su622b@att.com>
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/com/att/nsa/dmaap/service/EventsRestService.java116
-rw-r--r--src/main/java/com/att/nsa/dmaap/service/MMRestService.java96
-rw-r--r--src/main/java/com/att/nsa/dmaap/service/TopicRestService.java662
-rw-r--r--src/main/java/com/att/nsa/dmaap/service/TransactionRestService.java43
4 files changed, 427 insertions, 490 deletions
diff --git a/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java b/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java
index cda431c..6fbfd01 100644
--- a/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java
+++ b/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java
@@ -56,9 +56,10 @@ import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
+
/**
- * This class is a CXF REST service which acts
- * as gateway for MR Event Service.
+ * This class is a CXF REST service which acts as gateway for MR Event Service.
+ *
* @author author
*
*/
@@ -69,7 +70,8 @@ public class EventsRestService {
/**
* Logger obj
*/
- //private Logger log = Logger.getLogger(EventsRestService.class.toString());
+ // private Logger log =
+ // Logger.getLogger(EventsRestService.class.toString());
private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class);
/**
* HttpServletRequest obj
@@ -83,7 +85,6 @@ public class EventsRestService {
@Context
private HttpServletResponse response;
-
/**
* Config Reader
*/
@@ -97,6 +98,8 @@ public class EventsRestService {
@Autowired
private DMaaPErrorMessages errorMessages;
+ private DMaaPContext dmaapContext = new DMaaPContext();
+
/**
* This method is used to consume messages.Taking three parameter
* topic,consumerGroup and consumerId .Consumer decide to which topic they
@@ -118,50 +121,44 @@ public class EventsRestService {
*/
@GET
@Path("/{topic}/{consumergroup}/{consumerid}")
- public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup")
- String consumergroup,
+ public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup") String consumergroup,
@PathParam("consumerid") String consumerid) throws CambriaApiException {
// log.info("Consuming message from topic " + topic );
- DMaaPContext dMaaPContext = getDmaapContext();
- dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
+ dmaapContext = getDmaapContext();
+ dmaapContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
try {
- eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid);
- }
- catch (TopicExistsException e) {
+ eventsService.getEvents(dmaapContext, topic, consumergroup, consumerid);
+ } catch (TopicExistsException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
- DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
- + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
- consumerid,
- request.getRemoteHost());
+ DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(),
+ errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
+ topic, null, null, consumerid, request.getRemoteHost());
log.info(errRes.toString());
throw new CambriaApiException(errRes);
- }
- catch (DMaaPAccessDeniedException | AccessDeniedException e) {
+ } catch (DMaaPAccessDeniedException | AccessDeniedException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
- + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
- consumerid,
- request.getRemoteHost());
+ DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(),
+ errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
+ topic, null, null, consumerid, request.getRemoteHost());
log.info(errRes.toString());
throw new CambriaApiException(errRes);
}
-
+
catch (ConfigDbException | UnavailableException | IOException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
-
+
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
- + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
- consumerid,
- request.getRemoteHost());
+ DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(),
+ errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
+ topic, null, null, consumerid, request.getRemoteHost());
log.info(errRes.toString());
throw new CambriaApiException(errRes);
@@ -192,36 +189,33 @@ public class EventsRestService {
try {
eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
- }
- catch ( TopicExistsException e) {
+ } catch (TopicExistsException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
- DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
- + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
- Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
+ errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
+ topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
- }
- catch ( DMaaPAccessDeniedException | AccessDeniedException e) {
+ } catch (DMaaPAccessDeniedException | AccessDeniedException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
- + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
- Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
+ errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
+ topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
}
-
-
- catch (ConfigDbException | IOException | missingReqdSetting e) {
+
+ catch (ConfigDbException | IOException | missingReqdSetting e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
- + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
- Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
+ errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
+ topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
}
@@ -250,40 +244,37 @@ public class EventsRestService {
// );
try {
- eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(),
- partitionKey,
+ eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), partitionKey,
Utils.getFormattedDate(new Date()));
- }
-
- catch ( TopicExistsException e) {
+ }
+
+ catch (TopicExistsException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
- DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
- + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
- Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
+ errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
+ topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
- }
- catch ( DMaaPAccessDeniedException| AccessDeniedException e) {
+ } catch (DMaaPAccessDeniedException | AccessDeniedException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
- + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
- Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
+ errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
+ topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
}
-
- catch (ConfigDbException | IOException | missingReqdSetting e) {
+
+ catch (ConfigDbException | IOException | missingReqdSetting e) {
log.error("Error while publishing to topic : " + topic, e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), "Transaction-"
- + errorMessages.getPublishMsgError() + e.getMessage(), null,
- Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request),
- request.getRemoteHost(),
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
+ "Transaction-" + errorMessages.getPublishMsgError() + e.getMessage(), null,
+ Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request), request.getRemoteHost(),
null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
@@ -302,7 +293,6 @@ public class EventsRestService {
*/
private DMaaPContext getDmaapContext() {
- DMaaPContext dmaapContext = new DMaaPContext();
dmaapContext.setRequest(request);
dmaapContext.setResponse(response);
dmaapContext.setConfigReader(configReader);
diff --git a/src/main/java/com/att/nsa/dmaap/service/MMRestService.java b/src/main/java/com/att/nsa/dmaap/service/MMRestService.java
index ab457e5..a715e1f 100644
--- a/src/main/java/com/att/nsa/dmaap/service/MMRestService.java
+++ b/src/main/java/com/att/nsa/dmaap/service/MMRestService.java
@@ -27,7 +27,6 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
-
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.POST;
@@ -115,6 +114,8 @@ public class MMRestService {
@Autowired
private DMaaPErrorMessages errorMessages;
+ private DMaaPAAFAuthenticator dmaapAAFauthenticator = new DMaaPAAFAuthenticatorImpl();
+
/**
* This method is used for taking Configuration Object,HttpServletRequest
* Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
@@ -178,7 +179,7 @@ public class MMRestService {
|| createMirrorMaker.getCreateMirrorMaker().getStatus() != null) {
sendErrResponse(ctx, "This is not a CreateMirrorMaker request. Please try again.");
}
-
+
// if empty, blank name is entered
else if (StringUtils.isBlank(name)) {
sendErrResponse(ctx, "Name can not be empty or blank.");
@@ -253,8 +254,8 @@ public class MMRestService {
// Check if request has listAllMirrorMaker and
// listAllMirrorMaker is empty
- if ((jsonOb != null) && (jsonOb.has("listAllMirrorMaker") &&
- jsonOb.getJSONObject("listAllMirrorMaker").length() == 0)) {
+ if ((jsonOb != null) && (jsonOb.has("listAllMirrorMaker")
+ && jsonOb.getJSONObject("listAllMirrorMaker").length() == 0)) {
jsonOb.put("messageID", randomStr);
InputStream inStream = null;
@@ -329,7 +330,7 @@ public class MMRestService {
|| updateMirrorMaker.getUpdateMirrorMaker().getStatus() != null) {
sendErrResponse(ctx, "This is not a UpdateMirrorMaker request. Please try again.");
}
-
+
// if empty, blank name is entered
else if (StringUtils.isBlank(name)) {
sendErrResponse(ctx, "Name can not be empty or blank.");
@@ -405,11 +406,11 @@ public class MMRestService {
// Check if request has DeleteMirrorMaker and
// DeleteMirrorMaker has MirrorMaker object with name variable
// and check if the name contain only alpha numeric
- if ((jsonOb != null) && (jsonOb.has("deleteMirrorMaker")
- && jsonOb.getJSONObject("deleteMirrorMaker").length() == 1
- && jsonOb.getJSONObject("deleteMirrorMaker").has("name")
- && !StringUtils.isBlank(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))
- && isAlphaNumeric(jsonOb.getJSONObject("deleteMirrorMaker").getString("name")))) {
+ if ((jsonOb != null)
+ && (jsonOb.has("deleteMirrorMaker") && jsonOb.getJSONObject("deleteMirrorMaker").length() == 1
+ && jsonOb.getJSONObject("deleteMirrorMaker").has("name")
+ && !StringUtils.isBlank(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))
+ && isAlphaNumeric(jsonOb.getJSONObject("deleteMirrorMaker").getString("name")))) {
jsonOb.put("messageID", randomStr);
InputStream inStream = null;
@@ -451,7 +452,7 @@ public class MMRestService {
for (int i = 0; i < jArray.length(); i++) {
jObj = jArray.getJSONObject(i);
-
+
JSONObject obj = new JSONObject();
if (jObj.has(MESSAGE)) {
obj = jObj.getJSONObject(MESSAGE);
@@ -526,9 +527,7 @@ public class MMRestService {
boolean hasPermission = false;
- DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
-
- if (aaf.aafAuthentication(ctx.getRequest(), permission)) {
+ if (dmaapAAFauthenticator.aafAuthentication(ctx.getRequest(), permission)) {
hasPermission = true;
}
return hasPermission;
@@ -556,7 +555,7 @@ public class MMRestService {
for (int i = 0; i < jsonArray.length(); i++) {
jsonObj = jsonArray.getJSONObject(i);
-
+
JSONObject obj = new JSONObject();
if (jsonObj.has(MESSAGE)) {
obj = jsonObj.getJSONObject(MESSAGE);
@@ -577,7 +576,7 @@ public class MMRestService {
}
} catch (Exception e) {
- LOGGER.error("Exception: ", e);
+ LOGGER.error("Exception: ", e);
}
}
@@ -630,11 +629,9 @@ public class MMRestService {
// numeric
// and check if the request has namespace and namespace contains
// only alpha numeric
- if (jsonOb != null
- && jsonOb.length() == 2 && jsonOb.has("name")
- && !StringUtils.isBlank(jsonOb.getString("name"))
- && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has(NAMESPACE)
- && !StringUtils.isBlank(jsonOb.getString(NAMESPACE))) {
+ if (jsonOb != null && jsonOb.length() == 2 && jsonOb.has("name")
+ && !StringUtils.isBlank(jsonOb.getString("name")) && isAlphaNumeric(jsonOb.getString("name"))
+ && jsonOb.has(NAMESPACE) && !StringUtils.isBlank(jsonOb.getString(NAMESPACE))) {
String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
"msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
@@ -681,7 +678,7 @@ public class MMRestService {
if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
&& isListMirrorMaker(msgFrmSubscribe, randomStr)) {
-
+
JSONArray listMirrorMaker;
listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
@@ -784,16 +781,18 @@ public class MMRestService {
LOGGER.error("JSONException: ", ex);
}
- // Check if the request has name and name contains only alpha numeric,
+ // Check if the request has name and name contains only alpha
+ // numeric,
// check if the request has namespace and
// check if the request has whitelistTopicName
// check if the topic name contains only alpha numeric
if (jsonOb != null && jsonOb.length() == 3 && jsonOb.has("name")
- && !StringUtils.isBlank(jsonOb.getString("name"))
- && isAlphaNumeric(jsonOb.getString("name"))
+ && !StringUtils.isBlank(jsonOb.getString("name")) && isAlphaNumeric(jsonOb.getString("name"))
&& jsonOb.has(NAMESPACE) && !StringUtils.isBlank(jsonOb.getString(NAMESPACE))
- && jsonOb.has("whitelistTopicName") && !StringUtils.isBlank(jsonOb.getString("whitelistTopicName"))
- && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(jsonOb.getString("whitelistTopicName").lastIndexOf(".")+1,
+ && jsonOb.has("whitelistTopicName")
+ && !StringUtils.isBlank(jsonOb.getString("whitelistTopicName"))
+ && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(
+ jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1,
jsonOb.getString("whitelistTopicName").length()))) {
String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
@@ -843,7 +842,7 @@ public class MMRestService {
if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
&& isListMirrorMaker(msgFrmSubscribe, randomStr)) {
-
+
listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
String whitelist = null;
@@ -967,12 +966,15 @@ public class MMRestService {
LOGGER.error("JSONException: ", ex);
}
- // Check if the request has name and name contains only alpha numeric,
+ // Check if the request has name and name contains only alpha
+ // numeric,
// check if the request has namespace and
// check if the request has whitelistTopicName
- if (jsonOb != null && jsonOb.length() == 3 && jsonOb.has("name") && isAlphaNumeric(jsonOb.getString("name"))
- && jsonOb.has(NAMESPACE) && jsonOb.has("whitelistTopicName")
- && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(jsonOb.getString("whitelistTopicName").lastIndexOf(".")+1,
+ if (jsonOb != null && jsonOb.length() == 3 && jsonOb.has("name")
+ && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has(NAMESPACE)
+ && jsonOb.has("whitelistTopicName")
+ && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(
+ jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1,
jsonOb.getString("whitelistTopicName").length()))) {
String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
@@ -1029,12 +1031,13 @@ public class MMRestService {
for (int i = 0; i < jsonArray.length(); i++) {
jsonObj = jsonArray.getJSONObject(i);
-
+
JSONObject obj = new JSONObject();
if (jsonObj.has(MESSAGE)) {
obj = jsonObj.getJSONObject(MESSAGE);
}
- if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has(LISTMIRRORMAKER)) {
+ if (obj.has("messageID") && obj.get("messageID").equals(randomStr)
+ && obj.has(LISTMIRRORMAKER)) {
listMirrorMaker = obj.getJSONArray(LISTMIRRORMAKER);
break;
}
@@ -1068,22 +1071,21 @@ public class MMRestService {
sendErrResponse(ctx, "The topic does not exist.");
}
-
if (removeTopic) {
UpdateWhiteList updateWhiteList = new UpdateWhiteList();
MirrorMaker mirrorMaker = new MirrorMaker();
-
+
mirrorMaker.setName(jsonOb.getString("name"));
mirrorMaker.setWhitelist(removeTopic(whitelist, topicToRemove));
-
+
String newRandom = getRandomNum();
-
+
updateWhiteList.setMessageID(newRandom);
updateWhiteList.setUpdateWhiteList(mirrorMaker);
-
+
Gson g = new Gson();
g.toJson(updateWhiteList);
-
+
InputStream inputStream;
inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), UTF_8);
callPubSubForWhitelist(newRandom, ctx, inputStream, getNamespace(topicToRemove));
@@ -1132,7 +1134,7 @@ public class MMRestService {
}
if (topicList.contains(topicToRemove)) {
- for (String topic: topicList) {
+ for (String topic : topicList) {
if (!topic.equals(topicToRemove)) {
newTopicList.add(topic);
}
@@ -1145,7 +1147,7 @@ public class MMRestService {
}
private void callPubSubForWhitelist(String randomStr, DMaaPContext ctx, InputStream inStream, String namespace) {
-
+
try {
mirrorService.pushEvents(ctx, topic, inStream, null, null);
long startTime = System.currentTimeMillis();
@@ -1167,7 +1169,7 @@ public class MMRestService {
for (int i = 0; i < jsonArray.length(); i++) {
jsonObj = jsonArray.getJSONObject(i);
-
+
JSONObject obj = new JSONObject();
if (jsonObj.has(MESSAGE)) {
obj = jsonObj.getJSONObject(MESSAGE);
@@ -1231,18 +1233,18 @@ public class MMRestService {
return whitelist;
}
-
+
private JSONArray getListMirrorMaker(String msgFrmSubscribe, String randomStr) {
JSONObject jsonObj;
JSONArray jsonArray;
JSONArray listMirrorMaker = new JSONArray();
-
+
msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
jsonArray = new JSONArray(msgFrmSubscribe);
for (int i = 0; i < jsonArray.length(); i++) {
jsonObj = jsonArray.getJSONObject(i);
-
+
JSONObject obj = new JSONObject();
if (jsonObj.has(MESSAGE)) {
obj = jsonObj.getJSONObject(MESSAGE);
@@ -1252,6 +1254,6 @@ public class MMRestService {
break;
}
}
- return listMirrorMaker;
+ return listMirrorMaker;
}
}
diff --git a/src/main/java/com/att/nsa/dmaap/service/TopicRestService.java b/src/main/java/com/att/nsa/dmaap/service/TopicRestService.java
index 6742cd5..50bd069 100644
--- a/src/main/java/com/att/nsa/dmaap/service/TopicRestService.java
+++ b/src/main/java/com/att/nsa/dmaap/service/TopicRestService.java
@@ -62,8 +62,8 @@ import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
/**
- * This class is a CXF REST service which acts
- * as gateway for MR Topic Service.
+ * This class is a CXF REST service which acts as gateway for MR Topic Service.
+ *
* @author author
*
*/
@@ -75,7 +75,8 @@ public class TopicRestService {
/**
* Logger obj
*/
- //private static final Logger LOGGER = Logger .getLogger(TopicRestService.class);
+ // private static final Logger LOGGER = Logger
+ // .getLogger(TopicRestService.class);
private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicRestService.class);
/**
* Config Reader
@@ -100,76 +101,72 @@ public class TopicRestService {
* TopicService obj
*/
@Autowired
- private TopicService topicService;
-
+ private TopicService tService;
+
/**
* DMaaPErrorMessages obj
*/
@Autowired
private DMaaPErrorMessages errorMessages;
-
+
+ private DMaaPContext dmaapContext = new DMaaPContext();
+
/**
* mrNamespace
*/
- //@Value("${msgRtr.namespace.aaf}")
-// private String mrNamespace;
-
+ // @Value("${msgRtr.namespace.aaf}")
+ // private String mrNamespace;
/**
* Fetches a list of topics from the current kafka instance and converted
* into json object.
*
* @return list of the topics in json format
- * @throws AccessDeniedException
- * @throws CambriaApiException
+ * @throws AccessDeniedException
+ * @throws CambriaApiException
* @throws IOException
* @throws JSONException
- * */
+ */
@GET
- //@Produces(MediaType.TEXT_PLAIN)
+ // @Produces(MediaType.TEXT_PLAIN)
public void getTopics() throws CambriaApiException {
try {
-
+
LOGGER.info("Authenticating the user before fetching the topics");
- //String permission = "com.att.dmaap.mr.topic|*|view";
- String mrNameS= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
- String permission =mrNameS+"|"+"*"+"|"+"view";
+ // String permission = "com.att.dmaap.mr.topic|*|view";
+ String mrNameS = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
+ "msgRtr.namespace.aaf");
+ String permission = mrNameS + "|" + "*" + "|" + "view";
DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
- //Check if client is using AAF CADI Basic Authorization
- //If yes then check for AAF role authentication else display all topics
- if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
- {
- if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
- {
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
+ // Check if client is using AAF CADI Basic Authorization
+ // If yes then check for AAF role authentication else display all
+ // topics
+ if (null != getDmaapContext().getRequest().getHeader("Authorization")) {
+ if (!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2());
LOGGER.info(errRes.toString());
throw new DMaaPAccessDeniedException(errRes);
-
-
+
}
- }
-
- LOGGER.info("Fetching all Topics");
-
- topicService.getTopics(getDmaapContext());
-
- LOGGER.info("Returning List of all Topics");
-
-
+ }
+
+ LOGGER.info("Fetching all Topics");
+
+ tService.getTopics(getDmaapContext());
+
+ LOGGER.info("Returning List of all Topics");
+
} catch (JSONException | ConfigDbException | IOException excp) {
- LOGGER.error(
- "Failed to retrieve list of all topics: "
- + excp.getMessage(), excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
- errorMessages.getTopicsfailure()+ excp.getMessage());
+ LOGGER.error("Failed to retrieve list of all topics: " + excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
+ errorMessages.getTopicsfailure() + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
}
}
@@ -179,62 +176,56 @@ public class TopicRestService {
* into json object.
*
* @return list of the topics in json format
- * @throws AccessDeniedException
- * @throws CambriaApiException
+ * @throws AccessDeniedException
+ * @throws CambriaApiException
* @throws IOException
* @throws JSONException
- * */
+ */
@GET
@Path("/listAll")
- //@Produces(MediaType.TEXT_PLAIN)
+ // @Produces(MediaType.TEXT_PLAIN)
public void getAllTopics() throws CambriaApiException {
try {
-
+
LOGGER.info("Authenticating the user before fetching the topics");
- //String permission = "com.att.dmaap.mr.topic|*|view";
- String mrNameS= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
- String permission =mrNameS+"|"+"*"+"|"+"view";
+ // String permission = "com.att.dmaap.mr.topic|*|view";
+ String mrNameS = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
+ "msgRtr.namespace.aaf");
+ String permission = mrNameS + "|" + "*" + "|" + "view";
DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
- //Check if client is using AAF CADI Basic Authorization
- //If yes then check for AAF role authentication else display all topics
- if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
- {
- if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
- {
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
+ // Check if client is using AAF CADI Basic Authorization
+ // If yes then check for AAF role authentication else display all
+ // topics
+ if (null != getDmaapContext().getRequest().getHeader("Authorization")) {
+ if (!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2());
LOGGER.info(errRes.toString());
throw new DMaaPAccessDeniedException(errRes);
-
-
+
}
- }
-
- LOGGER.info("Fetching all Topics");
-
- topicService.getAllTopics(getDmaapContext());
-
- LOGGER.info("Returning List of all Topics");
-
-
+ }
+
+ LOGGER.info("Fetching all Topics");
+
+ tService.getAllTopics(getDmaapContext());
+
+ LOGGER.info("Returning List of all Topics");
+
} catch (JSONException | ConfigDbException | IOException excp) {
- LOGGER.error(
- "Failed to retrieve list of all topics: "
- + excp.getMessage(), excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
- errorMessages.getTopicsfailure()+ excp.getMessage());
+ LOGGER.error("Failed to retrieve list of all topics: " + excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
+ errorMessages.getTopicsfailure() + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
}
}
-
/**
* Returns details of the topic whose name is passed as a parameter
*
@@ -242,59 +233,55 @@ public class TopicRestService {
* - name of the topic
* @return details of a topic whose name is mentioned in the request in json
* format.
- * @throws AccessDeniedException
- * @throws DMaaPAccessDeniedException
+ * @throws AccessDeniedException
+ * @throws DMaaPAccessDeniedException
* @throws IOException
- * */
+ */
@GET
@Path("/{topicName}")
- //@Produces(MediaType.TEXT_PLAIN)
+ // @Produces(MediaType.TEXT_PLAIN)
public void getTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
try {
-
- LOGGER.info("Authenticating the user before fetching the details about topic = "+ topicName);
+
+ LOGGER.info("Authenticating the user before fetching the details about topic = " + topicName);
DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
-
- //String permission= "com.att.ecomp_test.crm.mr.topic|:topic.com.att.ecomp_test.crm.preDemo|view";
-
- //Check if client is using AAF CADI Basic Authorization
- //If yes then check for AAF role authentication else display all topics
- if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
- {
+
+ // String permission=
+ // "com.att.ecomp_test.crm.mr.topic|:topic.com.att.ecomp_test.crm.preDemo|view";
+
+ // Check if client is using AAF CADI Basic Authorization
+ // If yes then check for AAF role authentication else display all
+ // topics
+ if (null != getDmaapContext().getRequest().getHeader("Authorization")) {
String permission = aaf.aafPermissionString(topicName, "view");
- if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
- {
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
- LOGGER.info(errRes.toString());
- throw new DMaaPAccessDeniedException(errRes);
- }
- }
-
- LOGGER.info("Fetching Topic: " + topicName);
-
- topicService.getTopic(getDmaapContext(), topicName);
-
- LOGGER.info("Fetched details of topic: " + topicName);
-
+ if (!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2());
+ LOGGER.info(errRes.toString());
+ throw new DMaaPAccessDeniedException(errRes);
+ }
+ }
+
+ LOGGER.info("Fetching Topic: " + topicName);
+
+ tService.getTopic(getDmaapContext(), topicName);
+
+ LOGGER.info("Fetched details of topic: " + topicName);
+
} catch (ConfigDbException | IOException | TopicExistsException excp) {
- LOGGER.error("Failed to retrieve details of topic: " + topicName,
- excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.GET_TOPICS_DETAILS_FAIL.getResponseCode(),
- errorMessages.getTopicDetailsFail()+topicName+ excp.getMessage());
+ LOGGER.error("Failed to retrieve details of topic: " + topicName, excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_TOPICS_DETAILS_FAIL.getResponseCode(),
+ errorMessages.getTopicDetailsFail() + topicName + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
-
+
}
}
-
-
/**
* This method is still not working. Need to check on post call and how to
* accept parameters for post call
@@ -303,57 +290,50 @@ public class TopicRestService {
* it will have the bean object
* @throws TopicExistsException
* @throws CambriaApiException
- * @throws JSONException
+ * @throws JSONException
* @throws IOException
* @throws AccessDeniedException
*
- * */
+ */
@POST
@Path("/create")
@Consumes({ MediaType.APPLICATION_JSON })
- //@Produces(MediaType.TEXT_PLAIN)
+ // @Produces(MediaType.TEXT_PLAIN)
public void createTopic(TopicBean topicBean) throws CambriaApiException, JSONException {
- try {
- LOGGER.info("Creating Topic."+topicBean.getTopicName());
-
- topicService.createTopic(getDmaapContext(), topicBean);
+ try {
+ LOGGER.info("Creating Topic." + topicBean.getTopicName());
+
+ tService.createTopic(getDmaapContext(), topicBean);
LOGGER.info("Topic created Successfully.");
- }
- catch (TopicExistsException ex){
-
- LOGGER.error("Error while creating a topic: " + ex.getMessage(),
- ex);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail()+ ex.getMessage());
- LOGGER.info(errRes.toString());
- throw new CambriaApiException(errRes);
-
-
-
-
- }catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
- LOGGER.error("Error while creating a topic: " + excp.getMessage(),
- excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail()+ excp.getMessage());
+ } catch (TopicExistsException ex) {
+
+ LOGGER.error("Error while creating a topic: " + ex.getMessage(), ex);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail() + ex.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
- }catch (CambriaApiException | IOException excp) {
- LOGGER.error("Error while creating a topic: " + excp.getMessage(),
- excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail()+ excp.getMessage());
+
+ } catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail() + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
+
+ } catch (CambriaApiException | IOException excp) {
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail() + excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
}
}
@@ -362,45 +342,42 @@ public class TopicRestService {
*
* @param topicName
* topic
- * @throws CambriaApiException
+ * @throws CambriaApiException
* @throws IOException
- * */
+ */
@DELETE
@Path("/{topicName}")
- //@Produces(MediaType.TEXT_PLAIN)
+ // @Produces(MediaType.TEXT_PLAIN)
public void deleteTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
try {
LOGGER.info("Deleting Topic: " + topicName);
- topicService.deleteTopic(getDmaapContext(), topicName);
+ tService.deleteTopic(getDmaapContext(), topicName);
LOGGER.info("Topic [" + topicName + "] deleted successfully.");
- } catch (DMaaPAccessDeniedException| AccessDeniedException excp) {
- LOGGER.error("Error while creating a topic: " + excp.getMessage(),
- excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail()+ excp.getMessage());
+ } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail() + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
- }catch (IOException | ConfigDbException
- | CambriaApiException | TopicExistsException excp) {
+
+ } catch (IOException | ConfigDbException | CambriaApiException | TopicExistsException excp) {
LOGGER.error("Error while deleting topic: " + topicName, excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.DELETE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getDeleteTopicFail()+ topicName + excp.getMessage());
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.DELETE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getDeleteTopicFail() + topicName + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
+
}
}
private DMaaPContext getDmaapContext() {
- DMaaPContext dmaapContext = new DMaaPContext();
dmaapContext.setRequest(request);
dmaapContext.setResponse(response);
dmaapContext.setConfigReader(configReader);
@@ -413,49 +390,48 @@ public class TopicRestService {
* This method will fetch the details of publisher by giving topic name
*
* @param topicName
- * @throws CambriaApiException
- * @throws AccessDeniedException
+ * @throws CambriaApiException
+ * @throws AccessDeniedException
*/
@GET
@Path("/{topicName}/producers")
- //@Produces(MediaType.TEXT_PLAIN)
- public void getPublishersByTopicName(
- @PathParam("topicName") String topicName) throws CambriaApiException {
+ // @Produces(MediaType.TEXT_PLAIN)
+ public void getPublishersByTopicName(@PathParam("topicName") String topicName) throws CambriaApiException {
try {
-
-// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
-// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
-// String permission = aaf.aafPermissionString(topicName, "view");
-// if(aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
-// {
- LOGGER.info("Fetching list of all the publishers for topic "
- + topicName);
-
- topicService.getPublishersByTopicName(getDmaapContext(), topicName);
-
- LOGGER.info("Returning list of all the publishers for topic "
- + topicName);
-// }else{
-// LOGGER.error("Error while fetching list of publishers for topic "+ topicName);
-//
-// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
-// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
-// errorMessages.getNotPermitted1()+" fetch list of publishers "+errorMessages.getNotPermitted2());
-// LOGGER.info(errRes);
-// throw new DMaaPAccessDeniedException(errRes);
-//
-// }
-
+
+ // String permission =
+ // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
+ // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+ // String permission = aaf.aafPermissionString(topicName, "view");
+ // if(aaf.aafAuthentication(getDmaapContext().getRequest(),
+ // permission))
+ // {
+ LOGGER.info("Fetching list of all the publishers for topic " + topicName);
+
+ tService.getPublishersByTopicName(getDmaapContext(), topicName);
+
+ LOGGER.info("Returning list of all the publishers for topic " + topicName);
+ // }else{
+ // LOGGER.error("Error while fetching list of publishers for topic
+ // "+ topicName);
+ //
+ // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ // errorMessages.getNotPermitted1()+" fetch list of publishers
+ // "+errorMessages.getNotPermitted2());
+ // LOGGER.info(errRes);
+ // throw new DMaaPAccessDeniedException(errRes);
+ //
+ // }
+
} catch (IOException | ConfigDbException | TopicExistsException excp) {
- LOGGER.error("Error while fetching list of publishers for topic "
- + topicName, excp);
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.GET_PUBLISHERS_BY_TOPIC.getResponseCode(),
- "Error while fetching list of publishers for topic: "
- + topicName + excp.getMessage());
+ LOGGER.error("Error while fetching list of publishers for topic " + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_PUBLISHERS_BY_TOPIC.getResponseCode(),
+ "Error while fetching list of publishers for topic: " + topicName + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
+
}
}
@@ -464,44 +440,38 @@ public class TopicRestService {
*
* @param topicName
* @param producerId
- * @throws CambriaApiException
+ * @throws CambriaApiException
*/
@PUT
@Path("/{topicName}/producers/{producerId}")
- public void permitPublisherForTopic(
- @PathParam("topicName") String topicName,
+ public void permitPublisherForTopic(@PathParam("topicName") String topicName,
@PathParam("producerId") String producerId) throws CambriaApiException {
try {
- LOGGER.info("Granting write access to producer [" + producerId
- + "] for topic " + topicName);
+ LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
- topicService.permitPublisherForTopic(getDmaapContext(), topicName,
- producerId);
+ tService.permitPublisherForTopic(getDmaapContext(), topicName, producerId);
- LOGGER.info("Write access has been granted to producer ["
- + producerId + "] for topic " + topicName);
+ LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic " + topicName);
} catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
- LOGGER.error("Error while creating a topic: " + excp.getMessage(),
- excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail()+ excp.getMessage());
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail() + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
- }catch ( ConfigDbException | IOException
- | TopicExistsException excp) {
- LOGGER.error("Error while granting write access to producer ["
- + producerId + "] for topic " + topicName, excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.PERMIT_PUBLISHER_FOR_TOPIC.getResponseCode(),
- "Error while granting write access to producer ["
- + producerId + "] for topic " + topicName + excp.getMessage());
+
+ } catch (ConfigDbException | IOException | TopicExistsException excp) {
+ LOGGER.error("Error while granting write access to producer [" + producerId + "] for topic " + topicName,
+ excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.PERMIT_PUBLISHER_FOR_TOPIC.getResponseCode(),
+ "Error while granting write access to producer [" + producerId + "] for topic " + topicName
+ + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
+
}
}
@@ -510,39 +480,34 @@ public class TopicRestService {
*
* @param topicName
* @param producerId
- * @throws CambriaApiException
+ * @throws CambriaApiException
*/
@DELETE
@Path("/{topicName}/producers/{producerId}")
public void denyPublisherForTopic(@PathParam("topicName") String topicName,
@PathParam("producerId") String producerId) throws CambriaApiException {
try {
- LOGGER.info("Revoking write access to producer [" + producerId
- + "] for topic " + topicName);
+ LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
- topicService.denyPublisherForTopic(getDmaapContext(), topicName,
- producerId);
+ tService.denyPublisherForTopic(getDmaapContext(), topicName, producerId);
- LOGGER.info("Write access revoked for producer [" + producerId
- + "] for topic " + topicName);
+ LOGGER.info("Write access revoked for producer [" + producerId + "] for topic " + topicName);
} catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
- LOGGER.error("Error while creating a topic: " + excp.getMessage(),
- excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail()+ excp.getMessage());
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail() + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
- }catch ( ConfigDbException | IOException
- | TopicExistsException excp) {
- LOGGER.error("Error while revoking write access for producer ["
- + producerId + "] for topic " + topicName, excp);
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.REVOKE_PUBLISHER_FOR_TOPIC.getResponseCode(),
- "Error while revoking write access to producer ["
- + producerId + "] for topic " + topicName + excp.getMessage());
+
+ } catch (ConfigDbException | IOException | TopicExistsException excp) {
+ LOGGER.error("Error while revoking write access for producer [" + producerId + "] for topic " + topicName,
+ excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.REVOKE_PUBLISHER_FOR_TOPIC.getResponseCode(),
+ "Error while revoking write access to producer [" + producerId + "] for topic " + topicName
+ + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
}
@@ -552,55 +517,51 @@ public class TopicRestService {
* Get the consumer details by the topic name
*
* @param topicName
- * @throws AccessDeniedException
- * @throws CambriaApiException
+ * @throws AccessDeniedException
+ * @throws CambriaApiException
*/
@GET
@Path("/{topicName}/consumers")
- //@Produces(MediaType.TEXT_PLAIN)
- public void getConsumersByTopicName(@PathParam("topicName") String topicName) throws AccessDeniedException,
- CambriaApiException {
+ // @Produces(MediaType.TEXT_PLAIN)
+ public void getConsumersByTopicName(@PathParam("topicName") String topicName)
+ throws AccessDeniedException, CambriaApiException {
try {
-
-
-// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"view";
-// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
-// String permission = aaf.aafPermissionString(topicName, "view");
-// if(aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
-// {
- LOGGER.info("Fetching list of all consumers for topic " + topicName);
-
- topicService.getConsumersByTopicName(getDmaapContext(), topicName);
-
- LOGGER.info("Returning list of all consumers for topic "
- + topicName);
-
-// }else{
-// LOGGER.error(
-// "Error while fetching list of all consumers for topic "
-// + topicName);
-// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
-// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
-// errorMessages.getNotPermitted1()+" fetch list of consumers "+errorMessages.getNotPermitted2());
-// LOGGER.info(errRes);
-// throw new DMaaPAccessDeniedException(errRes);
-//
-//
-// }
-
-
-
+
+ // String permission =
+ // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"view";
+ // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+ // String permission = aaf.aafPermissionString(topicName, "view");
+ // if(aaf.aafAuthentication(getDmaapContext().getRequest(),
+ // permission))
+ // {
+ LOGGER.info("Fetching list of all consumers for topic " + topicName);
+
+ tService.getConsumersByTopicName(getDmaapContext(), topicName);
+
+ LOGGER.info("Returning list of all consumers for topic " + topicName);
+
+ // }else{
+ // LOGGER.error(
+ // "Error while fetching list of all consumers for topic "
+ // + topicName);
+ // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ // errorMessages.getNotPermitted1()+" fetch list of consumers
+ // "+errorMessages.getNotPermitted2());
+ // LOGGER.info(errRes);
+ // throw new DMaaPAccessDeniedException(errRes);
+ //
+ //
+ // }
+
} catch (IOException | ConfigDbException | TopicExistsException excp) {
- LOGGER.error(
- "Error while fetching list of all consumers for topic "
- + topicName, excp);
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.GET_CONSUMERS_BY_TOPIC.getResponseCode(),
- "Error while fetching list of all consumers for topic: "
- + topicName+ excp.getMessage());
+ LOGGER.error("Error while fetching list of all consumers for topic " + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.GET_CONSUMERS_BY_TOPIC.getResponseCode(),
+ "Error while fetching list of all consumers for topic: " + topicName + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
+
}
}
@@ -609,33 +570,28 @@ public class TopicRestService {
*
* @param topicName
* @param consumerId
- * @throws CambriaApiException
+ * @throws CambriaApiException
*/
@PUT
@Path("/{topicName}/consumers/{consumerId}")
- public void permitConsumerForTopic(
- @PathParam("topicName") String topicName,
+ public void permitConsumerForTopic(@PathParam("topicName") String topicName,
@PathParam("consumerId") String consumerId) throws CambriaApiException {
try {
- LOGGER.info("Granting read access to consumer [" + consumerId
- + "] for topic " + topicName);
-
- topicService.permitConsumerForTopic(getDmaapContext(), topicName,
- consumerId);
-
- LOGGER.info("Read access granted to consumer [" + consumerId
- + "] for topic " + topicName);
- } catch (AccessDeniedException | ConfigDbException | IOException
- | TopicExistsException excp) {
- LOGGER.error("Error while granting read access to consumer ["
- + consumerId + "] for topic " + topicName, excp);
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.PERMIT_CONSUMER_FOR_TOPIC.getResponseCode(),
- "Error while granting read access to consumer ["
- + consumerId + "] for topic " + topicName+ excp.getMessage());
+ LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
+
+ tService.permitConsumerForTopic(getDmaapContext(), topicName, consumerId);
+
+ LOGGER.info("Read access granted to consumer [" + consumerId + "] for topic " + topicName);
+ } catch (AccessDeniedException | ConfigDbException | IOException | TopicExistsException excp) {
+ LOGGER.error("Error while granting read access to consumer [" + consumerId + "] for topic " + topicName,
+ excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.PERMIT_CONSUMER_FOR_TOPIC.getResponseCode(),
+ "Error while granting read access to consumer [" + consumerId + "] for topic " + topicName
+ + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
+
}
}
@@ -644,45 +600,37 @@ public class TopicRestService {
*
* @param topicName
* @param consumerId
- * @throws CambriaApiException
+ * @throws CambriaApiException
*/
@DELETE
@Path("/{topicName}/consumers/{consumerId}")
public void denyConsumerForTopic(@PathParam("topicName") String topicName,
@PathParam("consumerId") String consumerId) throws CambriaApiException {
try {
- LOGGER.info("Revoking read access to consumer [" + consumerId
- + "] for topic " + topicName);
-
- topicService.denyConsumerForTopic(getDmaapContext(), topicName,
- consumerId);
-
- LOGGER.info("Read access revoked to consumer [" + consumerId
- + "] for topic " + topicName);
- } catch ( ConfigDbException | IOException
- | TopicExistsException excp) {
- LOGGER.error("Error while revoking read access to consumer ["
- + consumerId + "] for topic " + topicName, excp);
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.REVOKE_CONSUMER_FOR_TOPIC.getResponseCode(),
- "Error while revoking read access to consumer ["
- + consumerId + "] for topic " + topicName+ excp.getMessage());
- LOGGER.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
- LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+ LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
+
+ tService.denyConsumerForTopic(getDmaapContext(), topicName, consumerId);
+
+ LOGGER.info("Read access revoked to consumer [" + consumerId + "] for topic " + topicName);
+ } catch (ConfigDbException | IOException | TopicExistsException excp) {
+ LOGGER.error("Error while revoking read access to consumer [" + consumerId + "] for topic " + topicName,
excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail()+ excp.getMessage());
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.REVOKE_CONSUMER_FOR_TOPIC.getResponseCode(),
+ "Error while revoking read access to consumer [" + consumerId + "] for topic " + topicName
+ + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
- }
- }
+ } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail() + excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
-
+ }
+ }
}
diff --git a/src/main/java/com/att/nsa/dmaap/service/TransactionRestService.java b/src/main/java/com/att/nsa/dmaap/service/TransactionRestService.java
index a44c2ad..1a870a1 100644
--- a/src/main/java/com/att/nsa/dmaap/service/TransactionRestService.java
+++ b/src/main/java/com/att/nsa/dmaap/service/TransactionRestService.java
@@ -47,9 +47,9 @@ import com.att.nsa.cambria.utils.ConfigurationReader;
import com.att.nsa.configs.ConfigDbException;
/**
- * This class is a CXF REST service
- * which acts as gateway for DMaaP
- * Transaction Ids.
+ * This class is a CXF REST service which acts as gateway for DMaaP Transaction
+ * Ids.
+ *
* @author author
*
*/
@@ -84,10 +84,13 @@ public class TransactionRestService {
@Autowired
private TransactionService transactionService;
+ private DMaaPContext dmaapContext = new DMaaPContext();
+
/**
*
* Returns a list of all the existing Transaction Ids
- * @throws CambriaApiException
+ *
+ * @throws CambriaApiException
*
* @throws IOException
* @exception ConfigDbException
@@ -104,11 +107,10 @@ public class TransactionRestService {
LOGGER.info("Returning list of all transactions.");
} catch (ConfigDbException | IOException e) {
- LOGGER.error("Error while retrieving list of all transactions: "
- + e.getMessage(), e);
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_EXPECTATION_FAILED,
- DMaaPResponseCode.RETRIEVE_TRANSACTIONS.getResponseCode(),
- "Error while retrieving list of all transactions:"+e.getMessage());
+ LOGGER.error("Error while retrieving list of all transactions: " + e.getMessage(), e);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_EXPECTATION_FAILED,
+ DMaaPResponseCode.RETRIEVE_TRANSACTIONS.getResponseCode(),
+ "Error while retrieving list of all transactions:" + e.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
}
@@ -121,7 +123,7 @@ public class TransactionRestService {
*
* @param transactionId
* - id of transaction
- * @throws CambriaApiException
+ * @throws CambriaApiException
* @throws IOException
* @exception ConfigDbException
* @exception IOException
@@ -131,22 +133,18 @@ public class TransactionRestService {
*/
@GET
@Path("/{transactionId}")
- public void getTransactionObj(
- @PathParam("transactionId") String transactionId) throws CambriaApiException {
+ public void getTransactionObj(@PathParam("transactionId") String transactionId) throws CambriaApiException {
LOGGER.info("Fetching details of Transaction ID : " + transactionId);
try {
- transactionService.getTransactionObj(getDmaapContext(),
- transactionId);
+ transactionService.getTransactionObj(getDmaapContext(), transactionId);
} catch (ConfigDbException | JSONException | IOException e) {
- LOGGER.error("Error while retrieving transaction details for id: "
- + transactionId, e);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_EXPECTATION_FAILED,
- DMaaPResponseCode.RETRIEVE_TRANSACTIONS_DETAILS.getResponseCode(),
- "Error while retrieving transaction details for id: ["
- + transactionId + "]: " + e.getMessage());
+ LOGGER.error("Error while retrieving transaction details for id: " + transactionId, e);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_EXPECTATION_FAILED,
+ DMaaPResponseCode.RETRIEVE_TRANSACTIONS_DETAILS.getResponseCode(),
+ "Error while retrieving transaction details for id: [" + transactionId + "]: " + e.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
@@ -165,8 +163,7 @@ public class TransactionRestService {
* Object,HttpServlet Object
*
*/
- private DMaaPContext getDmaapContext() {
- DMaaPContext dmaapContext = new DMaaPContext();
+ public DMaaPContext getDmaapContext() {
dmaapContext.setConfigReader(configReader);
dmaapContext.setRequest(request);
dmaapContext.setResponse(response);