diff options
author | sunil unnava <su622b@att.com> | 2018-08-14 09:34:46 -0400 |
---|---|---|
committer | sunil unnava <su622b@att.com> | 2018-08-14 09:39:23 -0400 |
commit | b32effcaf5684d5e2f338a4537b71a2375c534e5 (patch) | |
tree | e1b80407f414509ffcc766b987ec6a95f7254b4e /src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java | |
parent | 0823cb186012c8e6b7de3d979dfabb9f838da7c2 (diff) |
update the testcases after the kafka 11 changes
Issue-ID: DMAAP-526
Change-Id: I477a8ee05fb3cdd76af726b6ca0d1a69aa9eef93
Signed-off-by: sunil unnava <su622b@att.com>
Diffstat (limited to 'src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java')
-rw-r--r-- | src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java | 704 |
1 files changed, 704 insertions, 0 deletions
diff --git a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java new file mode 100644 index 0000000..01ed1cc --- /dev/null +++ b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java @@ -0,0 +1,704 @@ +/** + * + */ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.service.impl; + +import java.io.IOException; + +import org.apache.http.HttpStatus; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import com.att.dmf.mr.CambriaApiException; +import com.att.dmf.mr.beans.DMaaPContext; +import com.att.dmf.mr.beans.DMaaPKafkaMetaBroker; +import com.att.dmf.mr.beans.TopicBean; +import com.att.dmf.mr.constants.CambriaConstants; +import com.att.dmf.mr.exception.DMaaPAccessDeniedException; +import com.att.dmf.mr.exception.DMaaPErrorMessages; +import com.att.dmf.mr.exception.DMaaPResponseCode; +import com.att.dmf.mr.exception.ErrorResponse; +import com.att.dmf.mr.metabroker.Broker.TopicExistsException; +import com.att.dmf.mr.metabroker.Broker1; +//import com.att.dmf.mr.metabroker.Broker1; +import com.att.dmf.mr.metabroker.Topic; +import com.att.dmf.mr.security.DMaaPAAFAuthenticator; +import com.att.dmf.mr.security.DMaaPAAFAuthenticatorImpl; +import com.att.dmf.mr.security.DMaaPAuthenticatorImpl; +import com.att.dmf.mr.service.TopicService; +import com.att.dmf.mr.utils.DMaaPResponseBuilder; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.security.NsaAcl; +import com.att.nsa.security.NsaApiKey; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; + +/** + * @author muzainulhaque.qazi + * + */ +@Service +public class TopicServiceImpl implements TopicService { + + // private static final Logger LOGGER = + // Logger.getLogger(TopicServiceImpl.class); + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class); + @Autowired + private DMaaPErrorMessages errorMessages; + + // @Value("${msgRtr.topicfactory.aaf}") + // private String mrFactory; + + public DMaaPErrorMessages getErrorMessages() { + return errorMessages; + } + + public void setErrorMessages(DMaaPErrorMessages errorMessages) { + this.errorMessages = errorMessages; + } + + /** + * @param dmaapContext + * @throws JSONException + * @throws ConfigDbException + * @throws IOException + * + */ + @Override + public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException { + LOGGER.info("Fetching list of all the topics."); + JSONObject json = new JSONObject(); + + JSONArray topicsList = new JSONArray(); + + for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) { + topicsList.put(topic.getName()); + } + + json.put("topics", topicsList); + + LOGGER.info("Returning list of all the topics."); + DMaaPResponseBuilder.respondOk(dmaapContext, json); + + } + + /** + * @param dmaapContext + * @throws JSONException + * @throws ConfigDbException + * @throws IOException + * + */ + public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException { + + LOGGER.info("Fetching list of all the topics."); + JSONObject json = new JSONObject(); + + JSONArray topicsList = new JSONArray(); + + for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) { + JSONObject obj = new JSONObject(); + obj.put("topicName", topic.getName()); + // obj.put("description", topic.getDescription()); + obj.put("owner", topic.getOwner()); + obj.put("txenabled", topic.isTransactionEnabled()); + topicsList.put(obj); + } + + json.put("topics", topicsList); + + LOGGER.info("Returning list of all the topics."); + DMaaPResponseBuilder.respondOk(dmaapContext, json); + + } + + /** + * @param dmaapContext + * @param topicName + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + */ + @Override + public void getTopic(DMaaPContext dmaapContext, String topicName) + throws ConfigDbException, IOException, TopicExistsException { + + LOGGER.info("Fetching details of topic " + topicName); + Topic t = getMetaBroker(dmaapContext).getTopic(topicName); + + if (null == t) { + LOGGER.error("Topic [" + topicName + "] does not exist."); + throw new TopicExistsException("Topic [" + topicName + "] does not exist."); + } + + JSONObject o = new JSONObject(); + o.put("name", t.getName()); + o.put("description", t.getDescription()); + + if (null != t.getOwners()) + o.put("owner", t.getOwners().iterator().next()); + if (null != t.getReaderAcl()) + o.put("readerAcl", aclToJson(t.getReaderAcl())); + if (null != t.getWriterAcl()) + o.put("writerAcl", aclToJson(t.getWriterAcl())); + + LOGGER.info("Returning details of topic " + topicName); + DMaaPResponseBuilder.respondOk(dmaapContext, o); + + } + + /** + * @param dmaapContext + * @param topicBean + * @throws CambriaApiException + * @throws AccessDeniedException + * @throws IOException + * @throws TopicExistsException + * @throws JSONException + * + * + * + */ + @Override + public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean) + throws CambriaApiException, DMaaPAccessDeniedException, IOException, TopicExistsException { + + LOGGER.info("Creating topic " + topicBean.getTopicName()); + + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); + String key = null; + //String appName = dmaapContext.getRequest().getHeader("AppName"); + String enfTopicName = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, + "enforced.topic.name.AAF"); + + if (user != null) { + key = user.getKey(); + + if (enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >= 0) { + + LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed."); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + "Failed to create topic: Access Denied.User does not have permission to perform create topic"); + + LOGGER.info(errRes.toString()); + // throw new DMaaPAccessDeniedException(errRes); + + } + } + // else if (user==null && + // (null==dmaapContext.getRequest().getHeader("Authorization") && null + // == dmaapContext.getRequest().getHeader("cookie")) ) { + /*else if (user == null && null == dmaapContext.getRequest().getHeader("Authorization") + ) { + LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed."); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + "Failed to create topic: Access Denied.User does not have permission to perform create topic"); + + LOGGER.info(errRes.toString()); + // throw new DMaaPAccessDeniedException(errRes); + }*/ + + if (user == null /*&& (null != dmaapContext.getRequest().getHeader("Authorization") + )*/) { + // if (user == null && + // (null!=dmaapContext.getRequest().getHeader("Authorization") || + // null != dmaapContext.getRequest().getHeader("cookie"))) { + // ACL authentication is not provided so we will use the aaf + // authentication + /*LOGGER.info("Authorization the topic"); + + String permission = ""; + String nameSpace = ""; + if (topicBean.getTopicName().indexOf(".") > 1) + nameSpace = topicBean.getTopicName().substring(0, topicBean.getTopicName().lastIndexOf(".")); + + String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + "msgRtr.topicfactory.aaf"); + + // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper); + + permission = mrFactoryVal + nameSpace + "|create"; + DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();*/ + + //if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) { + if (false) { + LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed."); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + "Failed to create topic: Access Denied.User does not have permission to create topic with perm " + //+ permission); + + "permission"); + + + LOGGER.info(errRes.toString()); + throw new DMaaPAccessDeniedException(errRes); + + } else { + // if user is null and aaf authentication is ok then key should + // be "" + // key = ""; + /** + * Added as part of AAF user it should return username + */ + + //key = dmaapContext.getRequest().getUserPrincipal().getName().toString(); + key="admin"; + LOGGER.info("key ==================== " + key); + + } + } + + try { + final String topicName = topicBean.getTopicName(); + final String desc = topicBean.getTopicDescription(); + int partition = topicBean.getPartitionCount(); + // int replica = topicBean.getReplicationCount(); + if (partition == 0) { + partition = 8; + } + final int partitions = partition; + + int replica = topicBean.getReplicationCount(); + if (replica == 0) { + //replica = 3; + replica = 1; + } + final int replicas = replica; + boolean transactionEnabled = topicBean.isTransactionEnabled(); + + final Broker1 metabroker = getMetaBroker(dmaapContext); + final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas, transactionEnabled); + + LOGGER.info("Topic created successfully. Sending response"); + DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t)); + } catch (JSONException excp) { + + LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST, + DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } catch (ConfigDbException excp1) { + + LOGGER.error("Failed to create topic. Config DB Exception", excp1); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST, + DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + } catch (com.att.dmf.mr.metabroker.Broker1.TopicExistsException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + /** + * @param dmaapContext + * @param topicName + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + * @throws CambriaApiException + * @throws AccessDeniedException + */ + @Override + public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException, + CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException { + + LOGGER.info(" Deleting topic " + topicName); + /*if (true) { // { + LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed."); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), errorMessages.getCreateTopicFail() + " " + + errorMessages.getNotPermitted1() + " delete " + errorMessages.getNotPermitted2()); + LOGGER.info(errRes.toString()); + throw new DMaaPAccessDeniedException(errRes); + }*/ + + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); + + /*if (user == null && null != dmaapContext.getRequest().getHeader("Authorization")) { + LOGGER.info("Authenticating the user, as ACL authentication is not provided"); + // String permission = + // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; + String permission = ""; + String nameSpace = topicName.substring(0, topicName.lastIndexOf(".")); + String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + "msgRtr.topicfactory.aaf"); + // String tokens[] = topicName.split(".mr.topic."); + permission = mrFactoryVal + nameSpace + "|destroy"; + DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) { + LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed."); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + errorMessages.getCreateTopicFail() + " " + errorMessages.getNotPermitted1() + " delete " + + errorMessages.getNotPermitted2()); + LOGGER.info(errRes.toString()); + throw new DMaaPAccessDeniedException(errRes); + } + + }*/ + + final Broker1 metabroker = getMetaBroker(dmaapContext); + final Topic topic = metabroker.getTopic(topicName); + + if (topic == null) { + LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist."); + throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist."); + } + + try { + metabroker.deleteTopic(topicName); + } catch (com.att.dmf.mr.metabroker.Broker1.TopicExistsException e) { + // TODO Auto-generated catch block + throw new CambriaApiException(500, "failed to delete the topic"); + } + + LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response."); + DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully"); + + } + + /** + * + * @param dmaapContext + * @return + */ + private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) { + return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker(); + } + + /** + * @param dmaapContext + * @param topicName + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + * + */ + @Override + public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName) + throws ConfigDbException, IOException, TopicExistsException { + LOGGER.info("Retrieving list of all the publishers for topic " + topicName); + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (topic == null) { + LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist."); + throw new TopicExistsException( + "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist."); + } + + final NsaAcl acl = topic.getWriterAcl(); + + LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response."); + DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl)); + + } + + /** + * + * @param acl + * @return + */ + private static JSONObject aclToJson(NsaAcl acl) { + final JSONObject o = new JSONObject(); + if (acl == null) { + o.put("enabled", false); + o.put("users", new JSONArray()); + } else { + o.put("enabled", acl.isActive()); + + final JSONArray a = new JSONArray(); + for (String user : acl.getUsers()) { + a.put(user); + } + o.put("users", a); + } + return o; + } + + /** + * @param dmaapContext + * @param topicName + */ + @Override + public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName) + throws IOException, ConfigDbException, TopicExistsException { + LOGGER.info("Retrieving list of all the consumers for topic " + topicName); + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (topic == null) { + LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist."); + throw new TopicExistsException( + "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist."); + } + + final NsaAcl acl = topic.getReaderAcl(); + + LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response."); + DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl)); + + } + + /** + * + * @param t + * @return + */ + private static JSONObject topicToJson(Topic t) { + final JSONObject o = new JSONObject(); + + o.put("name", t.getName()); + o.put("description", t.getDescription()); + o.put("owner", t.getOwner()); + o.put("readerAcl", aclToJson(t.getReaderAcl())); + o.put("writerAcl", aclToJson(t.getWriterAcl())); + + return o; + } + + /** + * @param dmaapContext + * @param topicName @param producerId @throws + * ConfigDbException @throws IOException @throws + * TopicExistsException @throws AccessDeniedException @throws + * + */ + @Override + public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId) + throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException { + + LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName); + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); + + // if (user == null) { + // + // LOGGER.info("Authenticating the user, as ACL authentication is not + // provided"); + //// String permission = + // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; + // + // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + // String permission = aaf.aafPermissionString(topicName, "manage"); + // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) + // { + // LOGGER.error("Failed to permit write access to producer [" + + // producerId + "] for topic " + topicName + // + ". Authentication failed."); + // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + // errorMessages.getNotPermitted1()+" <Grant publish permissions> + // "+errorMessages.getNotPermitted2()+ topicName); + // LOGGER.info(errRes); + // throw new DMaaPAccessDeniedException(errRes); + // } + // } + + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (null == topic) { + LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName + + "] does not exist."); + throw new TopicExistsException("Failed to permit write access to producer [" + producerId + + "] for topic. Topic [" + topicName + "] does not exist."); + } + + topic.permitWritesFromUser(producerId, user); + + LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName + + "]. Sending response."); + DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher."); + + } + + /** + * @param dmaapContext + * @param topicName + * @param producerId + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + * @throws AccessDeniedException + * @throws DMaaPAccessDeniedException + * + */ + @Override + public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId) + throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, + DMaaPAccessDeniedException { + + LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName); + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); + // if (user == null) { + // + //// String permission = + // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; + // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + // String permission = aaf.aafPermissionString(topicName, "manage"); + // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) + // { + // LOGGER.error("Failed to revoke write access to producer [" + + // producerId + "] for topic " + topicName + // + ". Authentication failed."); + // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + // errorMessages.getNotPermitted1()+" <Revoke publish permissions> + // "+errorMessages.getNotPermitted2()+ topicName); + // LOGGER.info(errRes); + // throw new DMaaPAccessDeniedException(errRes); + // + // } + // } + + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (null == topic) { + LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName + + "] does not exist."); + throw new TopicExistsException("Failed to revoke write access to producer [" + producerId + + "] for topic. Topic [" + topicName + "] does not exist."); + } + + topic.denyWritesFromUser(producerId, user); + + LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName + + "]. Sending response."); + DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher."); + + } + + /** + * @param dmaapContext + * @param topicName + * @param consumerId + * @throws DMaaPAccessDeniedException + */ + @Override + public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId) + throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, + DMaaPAccessDeniedException { + + LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName); + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); + // if (user == null) { + // + //// String permission = + // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; + // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + // String permission = aaf.aafPermissionString(topicName, "manage"); + // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) + // { + // LOGGER.error("Failed to permit read access to consumer [" + + // consumerId + "] for topic " + topicName + // + ". Authentication failed."); + // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + // errorMessages.getNotPermitted1()+" <Grant consume permissions> + // "+errorMessages.getNotPermitted2()+ topicName); + // LOGGER.info(errRes); + // throw new DMaaPAccessDeniedException(errRes); + // } + // } + + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (null == topic) { + LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName + + "] does not exist."); + throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId + + "] for topic. Topic [" + topicName + "] does not exist."); + } + + topic.permitReadsByUser(consumerId, user); + + LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName + + "]. Sending response."); + DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, + "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "]."); + } + + /** + * @param dmaapContext + * @param topicName + * @param consumerId + * @throws DMaaPAccessDeniedException + */ + @Override + public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId) + throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, + DMaaPAccessDeniedException { + + LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName); + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); + // if (user == null) { + //// String permission = + // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; + // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + // String permission = aaf.aafPermissionString(topicName, "manage"); + // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) + // { + // LOGGER.error("Failed to revoke read access to consumer [" + + // consumerId + "] for topic " + topicName + // + ". Authentication failed."); + // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + // errorMessages.getNotPermitted1()+" <Grant consume permissions> + // "+errorMessages.getNotPermitted2()+ topicName); + // LOGGER.info(errRes); + // throw new DMaaPAccessDeniedException(errRes); + // } + // + // + // } + + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (null == topic) { + LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName + + "] does not exist."); + throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId + + "] for topic. Topic [" + topicName + "] does not exist."); + } + + topic.denyReadsByUser(consumerId, user); + + LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName + + "]. Sending response."); + DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, + "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "]."); + + } + +} |