diff options
12 files changed, 239 insertions, 594 deletions
diff --git a/bundleconfig-local/etc/appprops/MsgRtrApi.properties b/bundleconfig-local/etc/appprops/MsgRtrApi.properties index 3aef922..b36ac0f 100644 --- a/bundleconfig-local/etc/appprops/MsgRtrApi.properties +++ b/bundleconfig-local/etc/appprops/MsgRtrApi.properties @@ -151,6 +151,7 @@ msgRtr.namespace.aaf=org.onap.dmaap.mr.topic msgRtr.topicfactory.aaf=org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic: enforced.topic.name.AAF=org.onap.dmaap.mr forceAAF=false +useCustomAcls=false transidUEBtopicreqd=false defaultNSforUEB=org.onap.dmaap.mr ############################################################################## diff --git a/bundleconfig-local/etc/sysprops/sys-props.properties b/bundleconfig-local/etc/sysprops/sys-props.properties index cfba8d0..eccd1ee 100644 --- a/bundleconfig-local/etc/sysprops/sys-props.properties +++ b/bundleconfig-local/etc/sysprops/sys-props.properties @@ -161,3 +161,4 @@ RESTLET_COMPONENT_REUSE_ADDRESS=true maxcontentlength=10000 msg_size_exceeds=Message size exceeds the default size. forceAAF=false +useCustomAcls=false diff --git a/csit/scripts/dmaap-message-router/docker-compose/mr/MsgRtrApi.properties b/csit/scripts/dmaap-message-router/docker-compose/mr/MsgRtrApi.properties index 4764321..9341144 100644 --- a/csit/scripts/dmaap-message-router/docker-compose/mr/MsgRtrApi.properties +++ b/csit/scripts/dmaap-message-router/docker-compose/mr/MsgRtrApi.properties @@ -147,6 +147,7 @@ msgRtr.namespace.aaf=org.onap.dmaap.mr.topic msgRtr.topicfactory.aaf=org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic: enforced.topic.name.AAF=org.onap.dmaap.mr forceAAF=false +useCustomAcls=false transidUEBtopicreqd=false defaultNSforUEB=org.onap.dmaap.mr ############################################################################## @@ -23,7 +23,7 @@ <groupId>org.onap.dmaap.messagerouter.messageservice</groupId> <artifactId>dmaapMR1</artifactId> - <version>1.4.0-SNAPSHOT</version> + <version>1.4.1-SNAPSHOT</version> <name>dmaap-messagerouter-messageservice</name> <description>Message Router - Restful interface built for kafka</description> <licenses> diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java index 2235098..92aea97 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java @@ -21,6 +21,8 @@ */ package org.onap.dmaap.dmf.mr.service.impl; +import static org.onap.dmaap.util.DMaaPAuthFilter.isUseCustomAcls; + import com.att.ajsc.beans.PropertiesMapBean; import com.att.ajsc.filemonitor.AJSCPropertiesMap; import com.att.eelf.configuration.EELFLogger; @@ -69,7 +71,7 @@ import java.security.Principal; public class TopicServiceImpl implements TopicService { private static final String TOPIC_CREATE_OP = "create"; - private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class); + private static final EELFLogger LOGGER = EELFManager.getLogger(TopicServiceImpl.class); @Autowired private DMaaPErrorMessages errorMessages; @@ -116,18 +118,13 @@ public class TopicServiceImpl implements TopicService { public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException { LOGGER.info("Fetching list of all the topics."); JSONObject json = new JSONObject(); - JSONArray topicsList = new JSONArray(); - for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) { topicsList.put(topic.getName()); } - json.put("topics", topicsList); - LOGGER.info("Returning list of all the topics."); respondOk(dmaapContext, json); - } /** @@ -138,12 +135,9 @@ public class TopicServiceImpl implements TopicService { * */ public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException { - LOGGER.info("Fetching list of all the topics."); JSONObject json = new JSONObject(); - JSONArray topicsList = new JSONArray(); - for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) { JSONObject obj = new JSONObject(); obj.put("topicName", topic.getName()); @@ -152,12 +146,9 @@ public class TopicServiceImpl implements TopicService { obj.put("txenabled", topic.isTransactionEnabled()); topicsList.put(obj); } - json.put("topics", topicsList); - LOGGER.info("Returning list of all the topics."); respondOk(dmaapContext, json); - } /** @@ -170,29 +161,23 @@ public class TopicServiceImpl implements TopicService { @Override public void getTopic(DMaaPContext dmaapContext, String topicName) throws ConfigDbException, IOException, TopicExistsException { - LOGGER.info("Fetching details of topic " + topicName); Topic t = getMetaBroker(dmaapContext).getTopic(topicName); - if (null == t) { LOGGER.error("Topic [" + topicName + "] does not exist."); throw new TopicExistsException("Topic [" + topicName + "] does not exist."); } - JSONObject o = new JSONObject(); o.put("name", t.getName()); o.put("description", t.getDescription()); - if (null != t.getOwners()) o.put("owner", t.getOwners().iterator().next()); if (null != t.getReaderAcl()) o.put("readerAcl", aclToJson(t.getReaderAcl())); if (null != t.getWriterAcl()) o.put("writerAcl", aclToJson(t.getWriterAcl())); - LOGGER.info("Returning details of topic " + topicName); respondOk(dmaapContext, o); - } /** @@ -212,26 +197,20 @@ public class TopicServiceImpl implements TopicService { String topicName = topicBean.getTopicName(); LOGGER.info("Creating topic {}",topicName); String key = authorizeClient(dmaapContext, topicName, TOPIC_CREATE_OP); - try { final int partitions = getValueOrDefault(topicBean.getPartitionCount(), "default.partitions"); final int replicas = getValueOrDefault(topicBean.getReplicationCount(), "default.replicas"); - final Topic t = getMetaBroker(dmaapContext).createTopic(topicName, topicBean.getTopicDescription(), key, partitions, replicas, topicBean.isTransactionEnabled()); - LOGGER.info("Topic {} created successfully. Sending response", topicName); respondOk(dmaapContext, topicToJson(t)); } catch (JSONException ex) { - LOGGER.error("Failed to create topic "+ topicName +". Couldn't parse JSON data.", ex); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST, DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); - } catch (ConfigDbException ex) { - LOGGER.error("Failed to create topic "+ topicName +". Config DB Exception", ex); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST, DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson()); @@ -257,7 +236,7 @@ public class TopicServiceImpl implements TopicService { DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), "Failed to "+ operation +" topic: Access Denied. User does not have permission to create topic with perm " + permission)); } - } else if(operation.equals(TOPIC_CREATE_OP)){ + } else if (operation.equals(TOPIC_CREATE_OP)){ final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext); clientId = (user != null) ? user.getKey() : Strings.EMPTY; } @@ -309,18 +288,14 @@ public class TopicServiceImpl implements TopicService { @Override public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException, CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException { - LOGGER.info(" Deleting topic " + topicName); authorizeClient(dmaapContext, topicName, "destroy"); - final Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); if (topic == null) { LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist."); throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist."); } - // metabroker.deleteTopic(topicName); - LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response."); respondOk(dmaapContext, "Topic [" + topicName + "] deleted successfully"); } @@ -347,18 +322,14 @@ public class TopicServiceImpl implements TopicService { throws ConfigDbException, IOException, TopicExistsException { LOGGER.info("Retrieving list of all the publishers for topic " + topicName); Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); - if (topic == null) { LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist."); throw new TopicExistsException( "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist."); } - final NsaAcl acl = topic.getWriterAcl(); - LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response."); respondOk(dmaapContext, aclToJson(acl)); - } /** @@ -392,15 +363,12 @@ public class TopicServiceImpl implements TopicService { throws IOException, ConfigDbException, TopicExistsException { LOGGER.info("Retrieving list of all the consumers for topic " + topicName); Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); - if (topic == null) { LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist."); throw new TopicExistsException( "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist."); } - final NsaAcl acl = topic.getReaderAcl(); - LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response."); respondOk(dmaapContext, aclToJson(acl)); @@ -432,49 +400,24 @@ public class TopicServiceImpl implements TopicService { */ @Override public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId) - throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException { - + throws AccessDeniedException, ConfigDbException, TopicExistsException { LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName); final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext); - - - // - // LOGGER.info("Authenticating the user, as ACL authentication is not - - //// String permission = - - // - - - - // { - // LOGGER.error("Failed to permit write access to producer [" + - // producerId + "] for topic " + topicName - - // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - // errorMessages.getNotPermitted1()+" <Grant publish permissions> - - - - // } - // } - Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); - if (null == topic) { LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName + "] does not exist."); throw new TopicExistsException("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName + "] does not exist."); } - - topic.permitWritesFromUser(producerId, user); - - LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName + if (isUseCustomAcls()) { + topic.permitWritesFromUser(producerId, user); + LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName + "]. Sending response."); + } else { + LOGGER.info("Ignoring acl update"); + } respondOk(dmaapContext, "Write access has been granted to publisher."); - } /** @@ -492,45 +435,19 @@ public class TopicServiceImpl implements TopicService { public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId) throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException { - LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName); final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext); - - // - //// String permission = - - // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); - // String permission = aaf.aafPermissionString(topicName, "manage"); - // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) - // { - // LOGGER.error("Failed to revoke write access to producer [" + - // producerId + "] for topic " + topicName - - // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - // errorMessages.getNotPermitted1()+" <Revoke publish permissions> - - - // throw new DMaaPAccessDeniedException(errRes); - // - - // } - Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); - if (null == topic) { LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName + "] does not exist."); throw new TopicExistsException("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName + "] does not exist."); } - topic.denyWritesFromUser(producerId, user); - LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName + "]. Sending response."); respondOk(dmaapContext, "Write access has been revoked for publisher."); - } /** @@ -541,44 +458,24 @@ public class TopicServiceImpl implements TopicService { */ @Override public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId) - throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, - DMaaPAccessDeniedException { + throws AccessDeniedException, ConfigDbException, TopicExistsException { LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName); final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext); - - // - //// String permission = - - - // String permission = aaf.aafPermissionString(topicName, "manage"); - // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) - // { - // LOGGER.error("Failed to permit read access to consumer [" + - // consumerId + "] for topic " + topicName - - // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - // errorMessages.getNotPermitted1()+" <Grant consume permissions> - - - - // } - // } - Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); - if (null == topic) { LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName + "] does not exist."); throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName + "] does not exist."); } - - topic.permitReadsByUser(consumerId, user); - - LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName + if (isUseCustomAcls()) { + topic.permitReadsByUser(consumerId, user); + LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName + "]. Sending response."); + } else { + LOGGER.info("Ignoring acl update"); + } respondOk(dmaapContext, "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "]."); } @@ -596,37 +493,14 @@ public class TopicServiceImpl implements TopicService { LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName); final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext); - - //// String permission = - - - // String permission = aaf.aafPermissionString(topicName, "manage"); - // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) - // { - // LOGGER.error("Failed to revoke read access to consumer [" + - // consumerId + "] for topic " + topicName - - // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - // errorMessages.getNotPermitted1()+" <Grant consume permissions> - - - // throw new DMaaPAccessDeniedException(errRes); - // } - // - // - Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); - if (null == topic) { LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName + "] does not exist."); throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName + "] does not exist."); } - topic.denyReadsByUser(consumerId, user); - LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName + "]. Sending response."); respondOk(dmaapContext, diff --git a/src/main/java/org/onap/dmaap/dmf/mr/utils/DMaaPResponseBuilder.java b/src/main/java/org/onap/dmaap/dmf/mr/utils/DMaaPResponseBuilder.java index 3ca60b0..419c652 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/utils/DMaaPResponseBuilder.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/utils/DMaaPResponseBuilder.java @@ -86,7 +86,7 @@ public class DMaaPResponseBuilder { */ public static void respondOkWithHtml(DMaaPContext ctx, String html) { try { - respondOkWithStream(ctx, "text/html", new ByteArrayInputStream(html.toString().getBytes())); + respondOkWithStream(ctx, "text/html", new ByteArrayInputStream(html.getBytes())); } catch (Exception excp) { log.error(excp.getMessage(), excp); } diff --git a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java index c420072..758d8c5 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java @@ -42,7 +42,7 @@ public class Utils { private static final String DATE_FORMAT = "dd-MM-yyyy::hh:mm:ss:SSS"; public static final String CAMBRIA_AUTH_HEADER = "X-CambriaAuth"; - private static final String AUTH_HEADER = "Authorization"; + public static final String AUTH_HEADER = "Authorization"; private static final String BATCH_ID_FORMAT = "000000"; private static final String X509_ATTR = "javax.servlet.request.X509Certificate"; private static final EELFLogger log = EELFManager.getInstance().getLogger(Utils.class); diff --git a/src/main/java/org/onap/dmaap/service/EventsRestService.java b/src/main/java/org/onap/dmaap/service/EventsRestService.java index d3abd6b..b644ae4 100644 --- a/src/main/java/org/onap/dmaap/service/EventsRestService.java +++ b/src/main/java/org/onap/dmaap/service/EventsRestService.java @@ -70,8 +70,7 @@ public class EventsRestService { /** * Logger obj */ - //private Logger log = Logger.getLogger(EventsRestService.class.toString()); - private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class); + private static final EELFLogger log = EELFManager.getLogger(EventsRestService.class); /** * HttpServletRequest obj */ @@ -123,15 +122,13 @@ public class EventsRestService { public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup") String consumergroup, @PathParam("consumerid") String consumerid) throws CambriaApiException { - // log.info("Consuming message from topic " + topic ); + log.info("Consuming message from topic " + topic ); DMaaPContext dMaaPContext = getDmaapContext(); dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date())); try { - eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid); - } - catch (TopicExistsException e) { + } catch (TopicExistsException e) { log.error("Error while reading data from topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, @@ -141,11 +138,8 @@ public class EventsRestService { request.getRemoteHost()); log.info(errRes.toString()); throw new CambriaApiException(errRes); - - } - catch (DMaaPAccessDeniedException | AccessDeniedException e) { + } catch (DMaaPAccessDeniedException | AccessDeniedException e) { log.error("Error while reading data from topic [" + topic + "].", e); - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, @@ -153,10 +147,7 @@ public class EventsRestService { request.getRemoteHost()); log.info(errRes.toString()); throw new CambriaApiException(errRes); - - } - - catch (ConfigDbException | UnavailableException | IOException e) { + } catch (ConfigDbException | UnavailableException | IOException e) { log.error("Error while reading data from topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, @@ -166,7 +157,6 @@ public class EventsRestService { request.getRemoteHost()); log.info(errRes.toString()); throw new CambriaApiException(errRes); - } } @@ -185,8 +175,7 @@ public class EventsRestService { try { throw new TopicExistsException("Incorrect URL"); - } - catch (TopicExistsException e) { + } catch (TopicExistsException e) { log.error("Error while reading data from topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, @@ -194,9 +183,7 @@ public class EventsRestService { ); log.info(errRes.toString()); throw new CambriaApiException(errRes); - } - } /** @@ -206,17 +193,13 @@ public class EventsRestService { @GET @Path("/{topic}/{consumergroup}") public void getEventsToException(@PathParam("topic") String topic, @PathParam("consumergroup") - String consumergroup - ) throws CambriaApiException { + String consumergroup) throws CambriaApiException { // log.info("Consuming message from topic " + topic ); DMaaPContext dMaaPContext = getDmaapContext(); dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date())); - try { - throw new TopicExistsException("Incorrect URL"); - } - catch (TopicExistsException e) { + } catch (TopicExistsException e) { log.error("Error while reading data from topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, @@ -224,16 +207,8 @@ public class EventsRestService { ); log.info(errRes.toString()); throw new CambriaApiException(errRes); - } - } - - - - - - /** * This method is used to publish messages.Taking two parameter topic and @@ -256,11 +231,9 @@ public class EventsRestService { public void pushEvents(@PathParam("topic") String topic, InputStream msg, @QueryParam("partitionKey") String partitionKey) throws CambriaApiException { log.info("Publishing message to topic " + topic); - try { eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null); - } - catch ( TopicExistsException e) { + } catch ( TopicExistsException e) { log.error("Error while publishing to topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, @@ -269,8 +242,7 @@ public class EventsRestService { Utils.getUserApiKey(request), request.getRemoteHost(), null, null); log.info(errRes.toString()); throw new CambriaApiException(errRes); - } - catch ( DMaaPAccessDeniedException | AccessDeniedException e) { + } catch ( DMaaPAccessDeniedException | AccessDeniedException e) { log.error("Error while publishing to topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, @@ -279,10 +251,7 @@ public class EventsRestService { Utils.getUserApiKey(request), request.getRemoteHost(), null, null); log.info(errRes.toString()); throw new CambriaApiException(errRes); - } - - - catch (ConfigDbException | IOException | missingReqdSetting e) { + } catch (ConfigDbException | IOException | missingReqdSetting e) { log.error("Error while publishing to topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, @@ -313,18 +282,11 @@ public class EventsRestService { @Path("/transaction/{topic}") public void pushEventsWithTransaction(@PathParam("topic") String topic, @QueryParam("partitionKey") String partitionKey) throws CambriaApiException { - // log.info("Publishing message with transaction id for topic " + topic - // ); - - try { - eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), partitionKey, Utils.getFormattedDate(new Date())); - } - - catch ( TopicExistsException e) { + } catch ( TopicExistsException e) { log.error("Error while publishing to topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, @@ -333,8 +295,7 @@ public class EventsRestService { Utils.getUserApiKey(request), request.getRemoteHost(), null, null); log.info(errRes.toString()); throw new CambriaApiException(errRes); - } - catch ( DMaaPAccessDeniedException| AccessDeniedException e) { + } catch ( DMaaPAccessDeniedException| AccessDeniedException e) { log.error("Error while publishing to topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, @@ -343,9 +304,7 @@ public class EventsRestService { Utils.getUserApiKey(request), request.getRemoteHost(), null, null); log.info(errRes.toString()); throw new CambriaApiException(errRes); - } - - catch (ConfigDbException | IOException | missingReqdSetting e) { + } catch (ConfigDbException | IOException | missingReqdSetting e) { log.error("Error while publishing to topic : " + topic, e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, @@ -356,7 +315,6 @@ public class EventsRestService { null, null); log.info(errRes.toString()); throw new CambriaApiException(errRes); - } } diff --git a/src/main/java/org/onap/dmaap/service/TopicRestService.java b/src/main/java/org/onap/dmaap/service/TopicRestService.java index bbf2708..411c393 100644 --- a/src/main/java/org/onap/dmaap/service/TopicRestService.java +++ b/src/main/java/org/onap/dmaap/service/TopicRestService.java @@ -8,19 +8,23 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 -* + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= - * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ - package org.onap.dmaap.service; +package org.onap.dmaap.service; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; import java.io.IOException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -31,20 +35,10 @@ import javax.ws.rs.POST; import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; - import org.apache.http.HttpStatus; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - import org.json.JSONException; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; import org.onap.dmaap.dmf.mr.CambriaApiException; import org.onap.dmaap.dmf.mr.beans.DMaaPContext; import org.onap.dmaap.dmf.mr.beans.TopicBean; @@ -58,8 +52,10 @@ import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator; import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl; import org.onap.dmaap.dmf.mr.service.TopicService; import org.onap.dmaap.dmf.mr.utils.ConfigurationReader; -import com.att.nsa.configs.ConfigDbException; -import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; +import org.onap.dmaap.dmf.mr.utils.Utils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; /** * This class is a CXF REST service which acts @@ -75,8 +71,8 @@ public class TopicRestService { /** * Logger obj */ - //private static final Logger LOGGER = Logger .getLogger(TopicRestService.class); - private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicRestService.class); + private static final EELFLogger LOGGER = EELFManager.getLogger(TopicRestService.class); + private static final String READ = " read "; /** * Config Reader */ @@ -101,448 +97,308 @@ public class TopicRestService { */ @Autowired private TopicService topicService; - + /** * DMaaPErrorMessages obj */ @Autowired private DMaaPErrorMessages errorMessages; - - /** - * mrNamespace - */ - //@Value("${msgRtr.namespace.aaf}") -// private String mrNamespace; - + + private DMaaPContext getDmaapContext() { + DMaaPContext dmaapContext = new DMaaPContext(); + dmaapContext.setRequest(request); + dmaapContext.setResponse(response); + dmaapContext.setConfigReader(configReader); + return dmaapContext; + } /** * Fetches a list of topics from the current kafka instance and converted * into json object. - * - * @return list of the topics in json format - * @throws AccessDeniedException - * @throws CambriaApiException + * + * @throws AccessDeniedException + * @throws CambriaApiException * @throws IOException * @throws JSONException * */ @GET - //@Produces(MediaType.TEXT_PLAIN) public void getTopics() throws CambriaApiException { try { - LOGGER.info("Authenticating the user before fetching the topics"); - //String permission = "com.att.dmaap.mr.topic|*|view"; - String mrNameS= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf"); - String permission =mrNameS+"|"+"*"+"|"+"view"; + String mrNameS = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf"); + String permission = mrNameS+"|"+"*"+"|"+"view"; DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); //Check if client is using AAF CADI Basic Authorization //If yes then check for AAF role authentication else display all topics - if(null!=getDmaapContext().getRequest().getHeader("Authorization")) - { - if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) - { - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2()); + if(null != getDmaapContext().getRequest().getHeader(Utils.AUTH_HEADER) && !aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) { + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + errorMessages.getNotPermitted1() + READ + errorMessages.getNotPermitted2()); LOGGER.info(errRes.toString()); throw new DMaaPAccessDeniedException(errRes); - - - } - } - - LOGGER.info("Fetching all Topics"); - //topicService = new com.att.dmf.mr.service.impl.TopicServiceImpl(); - topicService.getTopics(getDmaapContext()); - - LOGGER.info("Returning List of all Topics"); - - + } + LOGGER.info("Fetching all Topics"); + topicService.getTopics(getDmaapContext()); + LOGGER.info("Returning List of all Topics"); } catch (JSONException | ConfigDbException | IOException excp) { - LOGGER.error( - "Failed to retrieve list of all topics: " - + excp.getMessage(), excp); - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, - DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(), - errorMessages.getTopicsfailure()+ excp.getMessage()); + LOGGER.error("Failed to retrieve list of all topics: " + excp.getMessage(), excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(), + errorMessages.getTopicsfailure() + excp.getMessage()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); - - } } /** * Fetches a list of topics from the current kafka instance and converted * into json object. - * + * * @return list of the topics in json format - * @throws AccessDeniedException - * @throws CambriaApiException + * @throws AccessDeniedException + * @throws CambriaApiException * @throws IOException * @throws JSONException * */ @GET @Path("/listAll") - //@Produces(MediaType.TEXT_PLAIN) public void getAllTopics() throws CambriaApiException { try { - LOGGER.info("Authenticating the user before fetching the topics"); - //String permission = "com.att.dmaap.mr.topic|*|view"; - String mrNameS= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf"); - String permission =mrNameS+"|"+"*"+"|"+"view"; + String mrNameS = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf"); + String permission = mrNameS+"|"+"*"+"|"+"view"; DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); //Check if client is using AAF CADI Basic Authorization //If yes then check for AAF role authentication else display all topics - if(null!=getDmaapContext().getRequest().getHeader("Authorization")) - { - if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) - { - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2()); + if(null != getDmaapContext().getRequest().getHeader(Utils.AUTH_HEADER)) { + if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) { + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + errorMessages.getNotPermitted1() + READ + errorMessages.getNotPermitted2()); LOGGER.info(errRes.toString()); throw new DMaaPAccessDeniedException(errRes); - - } - } - - LOGGER.info("Fetching all Topics"); - - topicService.getAllTopics(getDmaapContext()); - - LOGGER.info("Returning List of all Topics"); - - + } + LOGGER.info("Fetching all Topics"); + topicService.getAllTopics(getDmaapContext()); + LOGGER.info("Returning List of all Topics"); } catch (JSONException | ConfigDbException | IOException excp) { - LOGGER.error( - "Failed to retrieve list of all topics: " - + excp.getMessage(), excp); - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, - DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(), - errorMessages.getTopicsfailure()+ excp.getMessage()); + LOGGER.error("Failed to retrieve list of all topics: " + excp.getMessage(), excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(), + errorMessages.getTopicsfailure()+ excp.getMessage()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); - - } } - /** * Returns details of the topic whose name is passed as a parameter - * + * * @param topicName * - name of the topic * @return details of a topic whose name is mentioned in the request in json * format. - * @throws AccessDeniedException - * @throws DMaaPAccessDeniedException + * @throws AccessDeniedException + * @throws DMaaPAccessDeniedException * @throws IOException * */ @GET @Path("/{topicName}") - //@Produces(MediaType.TEXT_PLAIN) public void getTopic(@PathParam("topicName") String topicName) throws CambriaApiException { try { - - LOGGER.info("Authenticating the user before fetching the details about topic = "+ topicName); + LOGGER.info("Authenticating the user before fetching the details about topic = {}", topicName); DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); - - //String permission= "com.att.ecomp_test.crm.mr.topic|:topic.com.att.ecomp_test.crm.preDemo|view"; - //Check if client is using AAF CADI Basic Authorization //If yes then check for AAF role authentication else display all topics - if(null!=getDmaapContext().getRequest().getHeader("Authorization")) - { + if(null != getDmaapContext().getRequest().getHeader(Utils.AUTH_HEADER)) { String permission = aaf.aafPermissionString(topicName, "view"); - if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) - { - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2()); - LOGGER.info(errRes.toString()); - throw new DMaaPAccessDeniedException(errRes); - } - } - - LOGGER.info("Fetching Topic: " + topicName); - - topicService.getTopic(getDmaapContext(), topicName); - - LOGGER.info("Fetched details of topic: " + topicName); - + if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) { + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + errorMessages.getNotPermitted1() + READ + errorMessages.getNotPermitted2()); + LOGGER.info(errRes.toString()); + throw new DMaaPAccessDeniedException(errRes); + } + } + LOGGER.info("Fetching Topic: {}", topicName); + topicService.getTopic(getDmaapContext(), topicName); + LOGGER.info("Fetched details of topic: {}", topicName); } catch (ConfigDbException | IOException | TopicExistsException excp) { - LOGGER.error("Failed to retrieve details of topic: " + topicName, - excp); - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, - DMaaPResponseCode.GET_TOPICS_DETAILS_FAIL.getResponseCode(), - errorMessages.getTopicDetailsFail()+topicName+ excp.getMessage()); + LOGGER.error("Failed to retrieve details of topic: " + topicName, excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.GET_TOPICS_DETAILS_FAIL.getResponseCode(), + errorMessages.getTopicDetailsFail()+topicName+ excp.getMessage()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); - - } } - - /** * This method is still not working. Need to check on post call and how to * accept parameters for post call - * + * * @param topicBean * it will have the bean object * @throws TopicExistsException * @throws CambriaApiException - * @throws JSONException + * @throws JSONException * @throws IOException * @throws AccessDeniedException - * + * * */ @POST @Path("/create") @Consumes({ MediaType.APPLICATION_JSON }) - //@Produces(MediaType.TEXT_PLAIN) public void createTopic(TopicBean topicBean) throws CambriaApiException, JSONException { - try { + try { LOGGER.info("Creating Topic."+topicBean.getTopicName()); - topicService.createTopic(getDmaapContext(), topicBean); - LOGGER.info("Topic created Successfully."); - } - catch (TopicExistsException ex){ - - LOGGER.error("Error while creating a topic: " + ex.getMessage(), - ex); - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, - DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), + } catch (TopicExistsException ex){ + LOGGER.error("Error while creating a topic: " + ex.getMessage(), ex); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, + DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), errorMessages.getCreateTopicFail()+ ex.getMessage()); - LOGGER.info(errRes.toString()); - throw new CambriaApiException(errRes); - - - - - }catch (AccessDeniedException | DMaaPAccessDeniedException excp) { - LOGGER.error("Error while creating a topic: " + excp.getMessage(), - excp); - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), - errorMessages.getCreateTopicFail()+ excp.getMessage()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); - - }catch (CambriaApiException | IOException excp) { - LOGGER.error("Error while creating a topic: " + excp.getMessage(), - excp); - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, - DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), - errorMessages.getCreateTopicFail()+ excp.getMessage()); + } catch (AccessDeniedException | DMaaPAccessDeniedException excp) { + LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), + errorMessages.getCreateTopicFail()+ excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + } catch (CambriaApiException | IOException excp) { + LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), + errorMessages.getCreateTopicFail()+ excp.getMessage()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); - } } /** * Deletes existing topic whose name is passed as a parameter - * + * * @param topicName * topic - * @throws CambriaApiException + * @throws CambriaApiException * @throws IOException * */ @DELETE @Path("/{topicName}") - //@Produces(MediaType.TEXT_PLAIN) public void deleteTopic(@PathParam("topicName") String topicName) throws CambriaApiException { try { LOGGER.info("Deleting Topic: " + topicName); - topicService.deleteTopic(getDmaapContext(), topicName); - LOGGER.info("Topic [" + topicName + "] deleted successfully."); } catch (DMaaPAccessDeniedException| AccessDeniedException excp) { - LOGGER.error("Error while creating a topic: " + excp.getMessage(), - excp); - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), - errorMessages.getCreateTopicFail()+ excp.getMessage()); + LOGGER.error("Error while deleting a topic: " + excp.getMessage(), excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), + errorMessages.getCreateTopicFail()+ excp.getMessage()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); - - }catch (IOException | ConfigDbException - | CambriaApiException | TopicExistsException excp) { + } catch (IOException | ConfigDbException | CambriaApiException | TopicExistsException excp) { LOGGER.error("Error while deleting topic: " + topicName, excp); - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, - DMaaPResponseCode.DELETE_TOPIC_FAIL.getResponseCode(), - errorMessages.getDeleteTopicFail()+ topicName + excp.getMessage()); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.DELETE_TOPIC_FAIL.getResponseCode(), + errorMessages.getDeleteTopicFail()+ topicName + excp.getMessage()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); - } } - private DMaaPContext getDmaapContext() { - - DMaaPContext dmaapContext = new DMaaPContext(); - dmaapContext.setRequest(request); - dmaapContext.setResponse(response); - dmaapContext.setConfigReader(configReader); - - return dmaapContext; - - } - /** * This method will fetch the details of publisher by giving topic name - * + * * @param topicName - * @throws CambriaApiException - * @throws AccessDeniedException + * @throws CambriaApiException + * @throws AccessDeniedException */ @GET @Path("/{topicName}/producers") - //@Produces(MediaType.TEXT_PLAIN) public void getPublishersByTopicName( - @PathParam("topicName") String topicName) throws CambriaApiException { + @PathParam("topicName") String topicName) throws CambriaApiException { try { - -// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; -// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); -// String permission = aaf.aafPermissionString(topicName, "view"); -// if(aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) -// { - LOGGER.info("Fetching list of all the publishers for topic " - + topicName); - - topicService.getPublishersByTopicName(getDmaapContext(), topicName); - - LOGGER.info("Returning list of all the publishers for topic " - + topicName); -// }else{ -// LOGGER.error("Error while fetching list of publishers for topic "+ topicName); -// -// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, -// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), -// errorMessages.getNotPermitted1()+" fetch list of publishers "+errorMessages.getNotPermitted2()); -// LOGGER.info(errRes); -// throw new DMaaPAccessDeniedException(errRes); -// -// } - + LOGGER.info("Fetching list of all the publishers for topic " + topicName); + topicService.getPublishersByTopicName(getDmaapContext(), topicName); + LOGGER.info("Returning list of all the publishers for topic " + topicName); } catch (IOException | ConfigDbException | TopicExistsException excp) { - LOGGER.error("Error while fetching list of publishers for topic " - + topicName, excp); - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, - DMaaPResponseCode.GET_PUBLISHERS_BY_TOPIC.getResponseCode(), - "Error while fetching list of publishers for topic: " - + topicName + excp.getMessage()); + LOGGER.error("Error while fetching list of publishers for topic " + topicName, excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.GET_PUBLISHERS_BY_TOPIC.getResponseCode(), + "Error while fetching list of publishers for topic: " + topicName + excp.getMessage()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); - } } /** * proving permission for the topic for a particular publisher id - * + * * @param topicName * @param producerId - * @throws CambriaApiException + * @throws CambriaApiException */ @PUT @Path("/{topicName}/producers/{producerId}") public void permitPublisherForTopic( - @PathParam("topicName") String topicName, - @PathParam("producerId") String producerId) throws CambriaApiException { + @PathParam("topicName") String topicName, + @PathParam("producerId") String producerId) throws CambriaApiException { try { - LOGGER.info("Granting write access to producer [" + producerId - + "] for topic " + topicName); - - topicService.permitPublisherForTopic(getDmaapContext(), topicName, - producerId); - - LOGGER.info("Write access has been granted to producer [" - + producerId + "] for topic " + topicName); + LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName); + topicService.permitPublisherForTopic(getDmaapContext(), topicName, producerId); + LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic " + topicName); } catch (AccessDeniedException | DMaaPAccessDeniedException excp) { - LOGGER.error("Error while creating a topic: " + excp.getMessage(), - excp); - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), - errorMessages.getCreateTopicFail()+ excp.getMessage()); + LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), + errorMessages.getCreateTopicFail()+ excp.getMessage()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); - - }catch ( ConfigDbException | IOException - | TopicExistsException excp) { + } catch ( ConfigDbException | IOException | TopicExistsException excp) { LOGGER.error("Error while granting write access to producer [" - + producerId + "] for topic " + topicName, excp); - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, - DMaaPResponseCode.PERMIT_PUBLISHER_FOR_TOPIC.getResponseCode(), - "Error while granting write access to producer [" - + producerId + "] for topic " + topicName + excp.getMessage()); + + producerId + "] for topic " + topicName, excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.PERMIT_PUBLISHER_FOR_TOPIC.getResponseCode(), + "Error while granting write access to producer [" + + producerId + "] for topic " + topicName + excp.getMessage()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); - } } /** * Removing access for a publisher id for any particular topic - * + * * @param topicName * @param producerId - * @throws CambriaApiException + * @throws CambriaApiException */ @DELETE @Path("/{topicName}/producers/{producerId}") public void denyPublisherForTopic(@PathParam("topicName") String topicName, - @PathParam("producerId") String producerId) throws CambriaApiException { + @PathParam("producerId") String producerId) throws CambriaApiException { try { - LOGGER.info("Revoking write access to producer [" + producerId - + "] for topic " + topicName); - - topicService.denyPublisherForTopic(getDmaapContext(), topicName, - producerId); - - LOGGER.info("Write access revoked for producer [" + producerId - + "] for topic " + topicName); + LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName); + topicService.denyPublisherForTopic(getDmaapContext(), topicName, producerId); + LOGGER.info("Write access revoked for producer [" + producerId + "] for topic " + topicName); } catch (DMaaPAccessDeniedException | AccessDeniedException excp) { - LOGGER.error("Error while creating a topic: " + excp.getMessage(), - excp); - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), - errorMessages.getCreateTopicFail()+ excp.getMessage()); + LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), + errorMessages.getCreateTopicFail()+ excp.getMessage()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); - - }catch ( ConfigDbException | IOException - | TopicExistsException excp) { - LOGGER.error("Error while revoking write access for producer [" - + producerId + "] for topic " + topicName, excp); - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - DMaaPResponseCode.REVOKE_PUBLISHER_FOR_TOPIC.getResponseCode(), - "Error while revoking write access to producer [" - + producerId + "] for topic " + topicName + excp.getMessage()); + } catch ( ConfigDbException | IOException | TopicExistsException excp) { + LOGGER.error("Error while revoking write access for producer [" + producerId + "] for topic " + topicName, excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.REVOKE_PUBLISHER_FOR_TOPIC.getResponseCode(), + "Error while revoking write access to producer [" + + producerId + "] for topic " + topicName + excp.getMessage()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); } @@ -550,136 +406,87 @@ public class TopicRestService { /** * Get the consumer details by the topic name - * + * * @param topicName - * @throws AccessDeniedException - * @throws CambriaApiException + * @throws CambriaApiException */ @GET @Path("/{topicName}/consumers") - //@Produces(MediaType.TEXT_PLAIN) - public void getConsumersByTopicName(@PathParam("topicName") String topicName) throws AccessDeniedException, - CambriaApiException { + public void getConsumersByTopicName(@PathParam("topicName") String topicName) throws CambriaApiException { try { - - -// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"view"; -// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); -// String permission = aaf.aafPermissionString(topicName, "view"); -// if(aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) -// { - LOGGER.info("Fetching list of all consumers for topic " + topicName); - - topicService.getConsumersByTopicName(getDmaapContext(), topicName); - - LOGGER.info("Returning list of all consumers for topic " - + topicName); - -// }else{ -// LOGGER.error( -// "Error while fetching list of all consumers for topic " -// + topicName); -// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, -// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), -// errorMessages.getNotPermitted1()+" fetch list of consumers "+errorMessages.getNotPermitted2()); -// LOGGER.info(errRes); -// throw new DMaaPAccessDeniedException(errRes); -// -// -// } - - - + LOGGER.info("Fetching list of all consumers for topic " + topicName); + topicService.getConsumersByTopicName(getDmaapContext(), topicName); + LOGGER.info("Returning list of all consumers for topic " + topicName); } catch (IOException | ConfigDbException | TopicExistsException excp) { - LOGGER.error( - "Error while fetching list of all consumers for topic " - + topicName, excp); - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - DMaaPResponseCode.GET_CONSUMERS_BY_TOPIC.getResponseCode(), - "Error while fetching list of all consumers for topic: " - + topicName+ excp.getMessage()); + LOGGER.error("Error while fetching list of all consumers for topic " + topicName, excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.GET_CONSUMERS_BY_TOPIC.getResponseCode(), + "Error while fetching list of all consumers for topic: " + topicName+ excp.getMessage()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); - } } /** * providing access for consumer for any particular topic - * + * * @param topicName * @param consumerId - * @throws CambriaApiException + * @throws CambriaApiException */ @PUT @Path("/{topicName}/consumers/{consumerId}") public void permitConsumerForTopic( - @PathParam("topicName") String topicName, - @PathParam("consumerId") String consumerId) throws CambriaApiException { + @PathParam("topicName") String topicName, + @PathParam("consumerId") String consumerId) throws CambriaApiException { try { - LOGGER.info("Granting read access to consumer [" + consumerId - + "] for topic " + topicName); - - topicService.permitConsumerForTopic(getDmaapContext(), topicName, - consumerId); - - LOGGER.info("Read access granted to consumer [" + consumerId - + "] for topic " + topicName); + LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName); + topicService.permitConsumerForTopic(getDmaapContext(), topicName, consumerId); + LOGGER.info("Read access granted to consumer [{0}] for topic {1}", consumerId, topicName); } catch (AccessDeniedException | ConfigDbException | IOException - | TopicExistsException excp) { - LOGGER.error("Error while granting read access to consumer [" - + consumerId + "] for topic " + topicName, excp); - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - DMaaPResponseCode.PERMIT_CONSUMER_FOR_TOPIC.getResponseCode(), - "Error while granting read access to consumer [" - + consumerId + "] for topic " + topicName+ excp.getMessage()); + | TopicExistsException excp) { + LOGGER.error("Error while granting read access to consumer [" + consumerId + "] for topic " + topicName, excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.PERMIT_CONSUMER_FOR_TOPIC.getResponseCode(), + "Error while granting read access to consumer [" + + consumerId + "] for topic " + topicName+ excp.getMessage()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); - } } /** * Removing access for consumer for any particular topic - * + * * @param topicName * @param consumerId - * @throws CambriaApiException + * @throws CambriaApiException */ @DELETE @Path("/{topicName}/consumers/{consumerId}") public void denyConsumerForTopic(@PathParam("topicName") String topicName, - @PathParam("consumerId") String consumerId) throws CambriaApiException { + @PathParam("consumerId") String consumerId) throws CambriaApiException { try { - LOGGER.info("Revoking read access to consumer [" + consumerId - + "] for topic " + topicName); - - topicService.denyConsumerForTopic(getDmaapContext(), topicName, - consumerId); - - LOGGER.info("Read access revoked to consumer [" + consumerId - + "] for topic " + topicName); + LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName); + topicService.denyConsumerForTopic(getDmaapContext(), topicName, consumerId); + LOGGER.info("Read access revoked to consumer [" + consumerId + "] for topic " + topicName); } catch ( ConfigDbException | IOException - | TopicExistsException excp) { - LOGGER.error("Error while revoking read access to consumer [" - + consumerId + "] for topic " + topicName, excp); - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - DMaaPResponseCode.REVOKE_CONSUMER_FOR_TOPIC.getResponseCode(), - "Error while revoking read access to consumer [" - + consumerId + "] for topic " + topicName+ excp.getMessage()); + | TopicExistsException excp) { + LOGGER.error("Error while revoking read access to consumer [" + consumerId + "] for topic " + topicName, excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.REVOKE_CONSUMER_FOR_TOPIC.getResponseCode(), + "Error while revoking read access to consumer [" + + consumerId + "] for topic " + topicName+ excp.getMessage()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); - }catch (DMaaPAccessDeniedException | AccessDeniedException excp) { - LOGGER.error("Error while creating a topic: " + excp.getMessage(), - excp); - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), - errorMessages.getCreateTopicFail()+ excp.getMessage()); + } catch (DMaaPAccessDeniedException | AccessDeniedException excp) { + LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), + errorMessages.getCreateTopicFail()+ excp.getMessage()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); - - } + } } public TopicService getTopicService() { @@ -690,7 +497,4 @@ public class TopicRestService { this.topicService = topicService; } - - - } diff --git a/src/main/java/org/onap/dmaap/util/DMaaPAuthFilter.java b/src/main/java/org/onap/dmaap/util/DMaaPAuthFilter.java index 5c7170b..e8c69e4 100644 --- a/src/main/java/org/onap/dmaap/util/DMaaPAuthFilter.java +++ b/src/main/java/org/onap/dmaap/util/DMaaPAuthFilter.java @@ -45,6 +45,7 @@ import org.onap.aaf.cadi.filter.CadiFilter; public class DMaaPAuthFilter extends CadiFilter { private static final String FORCE_AAF_FLAG = "forceAAF"; + private static final String USE_CUSTOM_ACLS = "useCustomAcls"; static final String X509_ATTR = "javax.servlet.request.X509Certificate"; static final String AUTH_HEADER = "Authorization"; static final String APP_HEADER = "AppName"; @@ -93,7 +94,11 @@ public class DMaaPAuthFilter extends CadiFilter { } boolean isAAFforced() { - return Boolean.valueOf(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, FORCE_AAF_FLAG)); + return Boolean.parseBoolean(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, FORCE_AAF_FLAG)); + } + + public static boolean isUseCustomAcls() { + return Boolean.parseBoolean(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, USE_CUSTOM_ACLS)); } } diff --git a/src/test/resources/MsgRtrApi.properties b/src/test/resources/MsgRtrApi.properties index 3c2e346..ae97d9d 100644 --- a/src/test/resources/MsgRtrApi.properties +++ b/src/test/resources/MsgRtrApi.properties @@ -151,6 +151,7 @@ msgRtr.namespace.aaf=org.onap.dmaap.mr.topic msgRtr.topicfactory.aaf=org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic: enforced.topic.name.AAF=org.onap.dmaap.mr forceAAF=false +useCustomAcls=false transidUEBtopicreqd=false defaultNSforUEB=org.onap.dmaap.mr ############################################################################## diff --git a/version.properties b/version.properties index 4b00b94..b7a70d5 100644 --- a/version.properties +++ b/version.properties @@ -27,7 +27,7 @@ major=1 minor=4 -patch=0 +patch=1 base_version=${major}.${minor}.${patch} |