diff options
Diffstat (limited to 'src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java')
-rw-r--r-- | src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java | 637 |
1 files changed, 637 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java new file mode 100644 index 0000000..2235098 --- /dev/null +++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java @@ -0,0 +1,637 @@ +/* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2019 Nokia Intellectual Property. All rights reserved. + * ================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + */ +package org.onap.dmaap.dmf.mr.service.impl; + +import com.att.ajsc.beans.PropertiesMapBean; +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.security.NsaAcl; +import com.att.nsa.security.NsaApiKey; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; +import joptsimple.internal.Strings; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.math.NumberUtils; +import org.apache.http.HttpStatus; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.onap.dmaap.dmf.mr.CambriaApiException; +import org.onap.dmaap.dmf.mr.beans.DMaaPContext; +import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker; +import org.onap.dmaap.dmf.mr.beans.TopicBean; +import org.onap.dmaap.dmf.mr.constants.CambriaConstants; +import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException; +import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages; +import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode; +import org.onap.dmaap.dmf.mr.exception.ErrorResponse; +import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException; +import org.onap.dmaap.dmf.mr.metabroker.Broker1; +import org.onap.dmaap.dmf.mr.metabroker.Topic; +import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator; +import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl; +import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl; +import org.onap.dmaap.dmf.mr.service.TopicService; +import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder; +import org.onap.dmaap.dmf.mr.utils.Utils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.servlet.http.HttpServletRequest; +import java.io.IOException; +import java.security.Principal; + +/** + * @author muzainulhaque.qazi + * + */ +@Service +public class TopicServiceImpl implements TopicService { + + private static final String TOPIC_CREATE_OP = "create"; + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class); + @Autowired + private DMaaPErrorMessages errorMessages; + + public DMaaPErrorMessages getErrorMessages() { + return errorMessages; + } + + public void setErrorMessages(DMaaPErrorMessages errorMessages) { + this.errorMessages = errorMessages; + } + + + String getPropertyFromAJSCbean(String propertyKey) { + return PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, propertyKey); + } + + String getPropertyFromAJSCmap(String propertyKey) { + return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey); + } + + NsaApiKey getDmaapAuthenticatedUser(DMaaPContext dmaapContext) { + return DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); + } + + void respondOk(DMaaPContext context, String msg) { + DMaaPResponseBuilder.respondOkWithHtml(context, msg); + } + + void respondOk(DMaaPContext context, JSONObject json) throws IOException { + DMaaPResponseBuilder.respondOk(context, json); + } + + boolean isCadiEnabled() { + return Utils.isCadiEnabled(); + } + /** + * @param dmaapContext + * @throws JSONException + * @throws ConfigDbException + * @throws IOException + * + */ + @Override + public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException { + LOGGER.info("Fetching list of all the topics."); + JSONObject json = new JSONObject(); + + JSONArray topicsList = new JSONArray(); + + for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) { + topicsList.put(topic.getName()); + } + + json.put("topics", topicsList); + + LOGGER.info("Returning list of all the topics."); + respondOk(dmaapContext, json); + + } + + /** + * @param dmaapContext + * @throws JSONException + * @throws ConfigDbException + * @throws IOException + * + */ + public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException { + + LOGGER.info("Fetching list of all the topics."); + JSONObject json = new JSONObject(); + + JSONArray topicsList = new JSONArray(); + + for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) { + JSONObject obj = new JSONObject(); + obj.put("topicName", topic.getName()); + + obj.put("owner", topic.getOwner()); + obj.put("txenabled", topic.isTransactionEnabled()); + topicsList.put(obj); + } + + json.put("topics", topicsList); + + LOGGER.info("Returning list of all the topics."); + respondOk(dmaapContext, json); + + } + + /** + * @param dmaapContext + * @param topicName + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + */ + @Override + public void getTopic(DMaaPContext dmaapContext, String topicName) + throws ConfigDbException, IOException, TopicExistsException { + + LOGGER.info("Fetching details of topic " + topicName); + Topic t = getMetaBroker(dmaapContext).getTopic(topicName); + + if (null == t) { + LOGGER.error("Topic [" + topicName + "] does not exist."); + throw new TopicExistsException("Topic [" + topicName + "] does not exist."); + } + + JSONObject o = new JSONObject(); + o.put("name", t.getName()); + o.put("description", t.getDescription()); + + if (null != t.getOwners()) + o.put("owner", t.getOwners().iterator().next()); + if (null != t.getReaderAcl()) + o.put("readerAcl", aclToJson(t.getReaderAcl())); + if (null != t.getWriterAcl()) + o.put("writerAcl", aclToJson(t.getWriterAcl())); + + LOGGER.info("Returning details of topic " + topicName); + respondOk(dmaapContext, o); + + } + + /** + * @param dmaapContext + * @param topicBean + * @throws CambriaApiException + * @throws AccessDeniedException + * @throws IOException + * @throws TopicExistsException + * @throws JSONException + * + * + * + */ + @Override + public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean) throws CambriaApiException, IOException { + String topicName = topicBean.getTopicName(); + LOGGER.info("Creating topic {}",topicName); + String key = authorizeClient(dmaapContext, topicName, TOPIC_CREATE_OP); + + try { + final int partitions = getValueOrDefault(topicBean.getPartitionCount(), "default.partitions"); + final int replicas = getValueOrDefault(topicBean.getReplicationCount(), "default.replicas"); + + final Topic t = getMetaBroker(dmaapContext).createTopic(topicName, topicBean.getTopicDescription(), + key, partitions, replicas, topicBean.isTransactionEnabled()); + + LOGGER.info("Topic {} created successfully. Sending response", topicName); + respondOk(dmaapContext, topicToJson(t)); + } catch (JSONException ex) { + + LOGGER.error("Failed to create topic "+ topicName +". Couldn't parse JSON data.", ex); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST, + DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } catch (ConfigDbException ex) { + + LOGGER.error("Failed to create topic "+ topicName +". Config DB Exception", ex); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST, + DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + } catch (Broker1.TopicExistsException ex) { + LOGGER.error( "Failed to create topic "+ topicName +". Topic already exists.",ex); + } + } + + private String authorizeClient(DMaaPContext dmaapContext, String topicName, String operation) throws DMaaPAccessDeniedException { + String clientId = Strings.EMPTY; + if(isCadiEnabled() && isTopicWithEnforcedAuthorization(topicName)) { + LOGGER.info("Performing AAF authorization for topic {} creation.", topicName); + String permission = buildPermission(topicName, operation); + DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + clientId = getAAFclientId(dmaapContext.getRequest()); + + if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) { + LOGGER.error("Failed to {} topic {}. Authorization failed for client {} and permission {}", + operation, topicName, clientId, permission); + throw new DMaaPAccessDeniedException(new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + "Failed to "+ operation +" topic: Access Denied. User does not have permission to create topic with perm " + permission)); + } + } else if(operation.equals(TOPIC_CREATE_OP)){ + final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext); + clientId = (user != null) ? user.getKey() : Strings.EMPTY; + } + return clientId; + } + + private String getAAFclientId(HttpServletRequest request) { + Principal principal = request.getUserPrincipal(); + if (principal !=null) { + return principal.getName(); + } else { + LOGGER.warn("Performing AAF authorization but user has not been provided in request."); + return null; + } + } + + private boolean isTopicWithEnforcedAuthorization(String topicName) { + String enfTopicNamespace = getPropertyFromAJSCbean("enforced.topic.name.AAF"); + return enfTopicNamespace != null && topicName.startsWith(enfTopicNamespace); + } + + int getValueOrDefault(int value, String defaultProperty) { + int returnValue = value; + if (returnValue <= 0) { + String defaultValue = getPropertyFromAJSCmap(defaultProperty); + returnValue = StringUtils.isNotEmpty(defaultValue) ? NumberUtils.toInt(defaultValue) : 1; + returnValue = (returnValue <= 0) ? 1 : returnValue; + } + return returnValue; + } + + private String buildPermission(String topicName, String operation) { + String nameSpace = (topicName.indexOf('.') > 1) ? + topicName.substring(0, topicName.lastIndexOf('.')) : ""; + + String mrFactoryValue = getPropertyFromAJSCmap("msgRtr.topicfactory.aaf"); + return mrFactoryValue + nameSpace + "|" + operation; + } + + /** + * @param dmaapContext + * @param topicName + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + * @throws CambriaApiException + * @throws AccessDeniedException + */ + @Override + public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException, + CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException { + + LOGGER.info(" Deleting topic " + topicName); + authorizeClient(dmaapContext, topicName, "destroy"); + + final Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + if (topic == null) { + LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist."); + throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist."); + } + + // metabroker.deleteTopic(topicName); + + LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response."); + respondOk(dmaapContext, "Topic [" + topicName + "] deleted successfully"); + } + + /** + * + * @param dmaapContext + * @return + */ + DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) { + return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker(); + } + + /** + * @param dmaapContext + * @param topicName + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + * + */ + @Override + public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName) + throws ConfigDbException, IOException, TopicExistsException { + LOGGER.info("Retrieving list of all the publishers for topic " + topicName); + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (topic == null) { + LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist."); + throw new TopicExistsException( + "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist."); + } + + final NsaAcl acl = topic.getWriterAcl(); + + LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response."); + respondOk(dmaapContext, aclToJson(acl)); + + } + + /** + * + * @param acl + * @return + */ + private static JSONObject aclToJson(NsaAcl acl) { + final JSONObject o = new JSONObject(); + if (acl == null) { + o.put("enabled", false); + o.put("users", new JSONArray()); + } else { + o.put("enabled", acl.isActive()); + + final JSONArray a = new JSONArray(); + for (String user : acl.getUsers()) { + a.put(user); + } + o.put("users", a); + } + return o; + } + + /** + * @param dmaapContext + * @param topicName + */ + @Override + public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName) + throws IOException, ConfigDbException, TopicExistsException { + LOGGER.info("Retrieving list of all the consumers for topic " + topicName); + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (topic == null) { + LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist."); + throw new TopicExistsException( + "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist."); + } + + final NsaAcl acl = topic.getReaderAcl(); + + LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response."); + respondOk(dmaapContext, aclToJson(acl)); + + } + + /** + * + * @param t + * @return + */ + static JSONObject topicToJson(Topic t) { + final JSONObject o = new JSONObject(); + + o.put("name", t.getName()); + o.put("description", t.getDescription()); + o.put("owner", t.getOwner()); + o.put("readerAcl", aclToJson(t.getReaderAcl())); + o.put("writerAcl", aclToJson(t.getWriterAcl())); + + return o; + } + + /** + * @param dmaapContext + * @param topicName @param producerId @throws + * ConfigDbException @throws IOException @throws + * TopicExistsException @throws AccessDeniedException @throws + * + */ + @Override + public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId) + throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException { + + LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName); + final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext); + + + // + // LOGGER.info("Authenticating the user, as ACL authentication is not + + //// String permission = + + // + + + + // { + // LOGGER.error("Failed to permit write access to producer [" + + // producerId + "] for topic " + topicName + + // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + // errorMessages.getNotPermitted1()+" <Grant publish permissions> + + + + // } + // } + + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (null == topic) { + LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName + + "] does not exist."); + throw new TopicExistsException("Failed to permit write access to producer [" + producerId + + "] for topic. Topic [" + topicName + "] does not exist."); + } + + topic.permitWritesFromUser(producerId, user); + + LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName + + "]. Sending response."); + respondOk(dmaapContext, "Write access has been granted to publisher."); + + } + + /** + * @param dmaapContext + * @param topicName + * @param producerId + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + * @throws AccessDeniedException + * @throws DMaaPAccessDeniedException + * + */ + @Override + public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId) + throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, + DMaaPAccessDeniedException { + + LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName); + final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext); + + // + //// String permission = + + // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + // String permission = aaf.aafPermissionString(topicName, "manage"); + // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) + // { + // LOGGER.error("Failed to revoke write access to producer [" + + // producerId + "] for topic " + topicName + + // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + // errorMessages.getNotPermitted1()+" <Revoke publish permissions> + + + // throw new DMaaPAccessDeniedException(errRes); + // + + // } + + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (null == topic) { + LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName + + "] does not exist."); + throw new TopicExistsException("Failed to revoke write access to producer [" + producerId + + "] for topic. Topic [" + topicName + "] does not exist."); + } + + topic.denyWritesFromUser(producerId, user); + + LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName + + "]. Sending response."); + respondOk(dmaapContext, "Write access has been revoked for publisher."); + + } + + /** + * @param dmaapContext + * @param topicName + * @param consumerId + * @throws DMaaPAccessDeniedException + */ + @Override + public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId) + throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, + DMaaPAccessDeniedException { + + LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName); + final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext); + + // + //// String permission = + + + // String permission = aaf.aafPermissionString(topicName, "manage"); + // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) + // { + // LOGGER.error("Failed to permit read access to consumer [" + + // consumerId + "] for topic " + topicName + + // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + // errorMessages.getNotPermitted1()+" <Grant consume permissions> + + + + // } + // } + + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (null == topic) { + LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName + + "] does not exist."); + throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId + + "] for topic. Topic [" + topicName + "] does not exist."); + } + + topic.permitReadsByUser(consumerId, user); + + LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName + + "]. Sending response."); + respondOk(dmaapContext, + "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "]."); + } + + /** + * @param dmaapContext + * @param topicName + * @param consumerId + * @throws DMaaPAccessDeniedException + */ + @Override + public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId) + throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, + DMaaPAccessDeniedException { + + LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName); + final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext); + + //// String permission = + + + // String permission = aaf.aafPermissionString(topicName, "manage"); + // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) + // { + // LOGGER.error("Failed to revoke read access to consumer [" + + // consumerId + "] for topic " + topicName + + // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + // errorMessages.getNotPermitted1()+" <Grant consume permissions> + + + // throw new DMaaPAccessDeniedException(errRes); + // } + // + // + + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (null == topic) { + LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName + + "] does not exist."); + throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId + + "] for topic. Topic [" + topicName + "] does not exist."); + } + + topic.denyReadsByUser(consumerId, user); + + LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName + + "]. Sending response."); + respondOk(dmaapContext, + "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "]."); + + } + +} |