summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bundleconfig-local/etc/appprops/MsgRtrApi.properties1
-rw-r--r--bundleconfig-local/etc/sysprops/sys-props.properties1
-rw-r--r--csit/scripts/dmaap-message-router/docker-compose/mr/MsgRtrApi.properties1
-rw-r--r--pom.xml2
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java162
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/utils/DMaaPResponseBuilder.java2
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java2
-rw-r--r--src/main/java/org/onap/dmaap/service/EventsRestService.java70
-rw-r--r--src/main/java/org/onap/dmaap/service/TopicRestService.java582
-rw-r--r--src/main/java/org/onap/dmaap/util/DMaaPAuthFilter.java7
-rw-r--r--src/test/resources/MsgRtrApi.properties1
-rw-r--r--version.properties2
12 files changed, 239 insertions, 594 deletions
diff --git a/bundleconfig-local/etc/appprops/MsgRtrApi.properties b/bundleconfig-local/etc/appprops/MsgRtrApi.properties
index 3aef922..b36ac0f 100644
--- a/bundleconfig-local/etc/appprops/MsgRtrApi.properties
+++ b/bundleconfig-local/etc/appprops/MsgRtrApi.properties
@@ -151,6 +151,7 @@ msgRtr.namespace.aaf=org.onap.dmaap.mr.topic
msgRtr.topicfactory.aaf=org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:
enforced.topic.name.AAF=org.onap.dmaap.mr
forceAAF=false
+useCustomAcls=false
transidUEBtopicreqd=false
defaultNSforUEB=org.onap.dmaap.mr
##############################################################################
diff --git a/bundleconfig-local/etc/sysprops/sys-props.properties b/bundleconfig-local/etc/sysprops/sys-props.properties
index cfba8d0..eccd1ee 100644
--- a/bundleconfig-local/etc/sysprops/sys-props.properties
+++ b/bundleconfig-local/etc/sysprops/sys-props.properties
@@ -161,3 +161,4 @@ RESTLET_COMPONENT_REUSE_ADDRESS=true
maxcontentlength=10000
msg_size_exceeds=Message size exceeds the default size.
forceAAF=false
+useCustomAcls=false
diff --git a/csit/scripts/dmaap-message-router/docker-compose/mr/MsgRtrApi.properties b/csit/scripts/dmaap-message-router/docker-compose/mr/MsgRtrApi.properties
index 4764321..9341144 100644
--- a/csit/scripts/dmaap-message-router/docker-compose/mr/MsgRtrApi.properties
+++ b/csit/scripts/dmaap-message-router/docker-compose/mr/MsgRtrApi.properties
@@ -147,6 +147,7 @@ msgRtr.namespace.aaf=org.onap.dmaap.mr.topic
msgRtr.topicfactory.aaf=org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:
enforced.topic.name.AAF=org.onap.dmaap.mr
forceAAF=false
+useCustomAcls=false
transidUEBtopicreqd=false
defaultNSforUEB=org.onap.dmaap.mr
##############################################################################
diff --git a/pom.xml b/pom.xml
index 9a3f661..91b5fee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
<groupId>org.onap.dmaap.messagerouter.messageservice</groupId>
<artifactId>dmaapMR1</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.4.1-SNAPSHOT</version>
<name>dmaap-messagerouter-messageservice</name>
<description>Message Router - Restful interface built for kafka</description>
<licenses>
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java
index 2235098..92aea97 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java
@@ -21,6 +21,8 @@
*/
package org.onap.dmaap.dmf.mr.service.impl;
+import static org.onap.dmaap.util.DMaaPAuthFilter.isUseCustomAcls;
+
import com.att.ajsc.beans.PropertiesMapBean;
import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import com.att.eelf.configuration.EELFLogger;
@@ -69,7 +71,7 @@ import java.security.Principal;
public class TopicServiceImpl implements TopicService {
private static final String TOPIC_CREATE_OP = "create";
- private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
+ private static final EELFLogger LOGGER = EELFManager.getLogger(TopicServiceImpl.class);
@Autowired
private DMaaPErrorMessages errorMessages;
@@ -116,18 +118,13 @@ public class TopicServiceImpl implements TopicService {
public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
LOGGER.info("Fetching list of all the topics.");
JSONObject json = new JSONObject();
-
JSONArray topicsList = new JSONArray();
-
for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
topicsList.put(topic.getName());
}
-
json.put("topics", topicsList);
-
LOGGER.info("Returning list of all the topics.");
respondOk(dmaapContext, json);
-
}
/**
@@ -138,12 +135,9 @@ public class TopicServiceImpl implements TopicService {
*
*/
public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
-
LOGGER.info("Fetching list of all the topics.");
JSONObject json = new JSONObject();
-
JSONArray topicsList = new JSONArray();
-
for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
JSONObject obj = new JSONObject();
obj.put("topicName", topic.getName());
@@ -152,12 +146,9 @@ public class TopicServiceImpl implements TopicService {
obj.put("txenabled", topic.isTransactionEnabled());
topicsList.put(obj);
}
-
json.put("topics", topicsList);
-
LOGGER.info("Returning list of all the topics.");
respondOk(dmaapContext, json);
-
}
/**
@@ -170,29 +161,23 @@ public class TopicServiceImpl implements TopicService {
@Override
public void getTopic(DMaaPContext dmaapContext, String topicName)
throws ConfigDbException, IOException, TopicExistsException {
-
LOGGER.info("Fetching details of topic " + topicName);
Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
-
if (null == t) {
LOGGER.error("Topic [" + topicName + "] does not exist.");
throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
}
-
JSONObject o = new JSONObject();
o.put("name", t.getName());
o.put("description", t.getDescription());
-
if (null != t.getOwners())
o.put("owner", t.getOwners().iterator().next());
if (null != t.getReaderAcl())
o.put("readerAcl", aclToJson(t.getReaderAcl()));
if (null != t.getWriterAcl())
o.put("writerAcl", aclToJson(t.getWriterAcl()));
-
LOGGER.info("Returning details of topic " + topicName);
respondOk(dmaapContext, o);
-
}
/**
@@ -212,26 +197,20 @@ public class TopicServiceImpl implements TopicService {
String topicName = topicBean.getTopicName();
LOGGER.info("Creating topic {}",topicName);
String key = authorizeClient(dmaapContext, topicName, TOPIC_CREATE_OP);
-
try {
final int partitions = getValueOrDefault(topicBean.getPartitionCount(), "default.partitions");
final int replicas = getValueOrDefault(topicBean.getReplicationCount(), "default.replicas");
-
final Topic t = getMetaBroker(dmaapContext).createTopic(topicName, topicBean.getTopicDescription(),
key, partitions, replicas, topicBean.isTransactionEnabled());
-
LOGGER.info("Topic {} created successfully. Sending response", topicName);
respondOk(dmaapContext, topicToJson(t));
} catch (JSONException ex) {
-
LOGGER.error("Failed to create topic "+ topicName +". Couldn't parse JSON data.", ex);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
} catch (ConfigDbException ex) {
-
LOGGER.error("Failed to create topic "+ topicName +". Config DB Exception", ex);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
@@ -257,7 +236,7 @@ public class TopicServiceImpl implements TopicService {
DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
"Failed to "+ operation +" topic: Access Denied. User does not have permission to create topic with perm " + permission));
}
- } else if(operation.equals(TOPIC_CREATE_OP)){
+ } else if (operation.equals(TOPIC_CREATE_OP)){
final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
clientId = (user != null) ? user.getKey() : Strings.EMPTY;
}
@@ -309,18 +288,14 @@ public class TopicServiceImpl implements TopicService {
@Override
public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException,
CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
-
LOGGER.info(" Deleting topic " + topicName);
authorizeClient(dmaapContext, topicName, "destroy");
-
final Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
if (topic == null) {
LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
}
-
// metabroker.deleteTopic(topicName);
-
LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
respondOk(dmaapContext, "Topic [" + topicName + "] deleted successfully");
}
@@ -347,18 +322,14 @@ public class TopicServiceImpl implements TopicService {
throws ConfigDbException, IOException, TopicExistsException {
LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
if (topic == null) {
LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
throw new TopicExistsException(
"Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
}
-
final NsaAcl acl = topic.getWriterAcl();
-
LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
respondOk(dmaapContext, aclToJson(acl));
-
}
/**
@@ -392,15 +363,12 @@ public class TopicServiceImpl implements TopicService {
throws IOException, ConfigDbException, TopicExistsException {
LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
if (topic == null) {
LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
throw new TopicExistsException(
"Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
}
-
final NsaAcl acl = topic.getReaderAcl();
-
LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
respondOk(dmaapContext, aclToJson(acl));
@@ -432,49 +400,24 @@ public class TopicServiceImpl implements TopicService {
*/
@Override
public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
- throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException {
-
+ throws AccessDeniedException, ConfigDbException, TopicExistsException {
LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
-
-
- //
- // LOGGER.info("Authenticating the user, as ACL authentication is not
-
- //// String permission =
-
- //
-
-
-
- // {
- // LOGGER.error("Failed to permit write access to producer [" +
- // producerId + "] for topic " + topicName
-
- // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- // errorMessages.getNotPermitted1()+" <Grant publish permissions>
-
-
-
- // }
- // }
-
Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
if (null == topic) {
LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
+ "] does not exist.");
throw new TopicExistsException("Failed to permit write access to producer [" + producerId
+ "] for topic. Topic [" + topicName + "] does not exist.");
}
-
- topic.permitWritesFromUser(producerId, user);
-
- LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
+ if (isUseCustomAcls()) {
+ topic.permitWritesFromUser(producerId, user);
+ LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
+ "]. Sending response.");
+ } else {
+ LOGGER.info("Ignoring acl update");
+ }
respondOk(dmaapContext, "Write access has been granted to publisher.");
-
}
/**
@@ -492,45 +435,19 @@ public class TopicServiceImpl implements TopicService {
public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
DMaaPAccessDeniedException {
-
LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
-
- //
- //// String permission =
-
- // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
- // String permission = aaf.aafPermissionString(topicName, "manage");
- // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
- // {
- // LOGGER.error("Failed to revoke write access to producer [" +
- // producerId + "] for topic " + topicName
-
- // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- // errorMessages.getNotPermitted1()+" <Revoke publish permissions>
-
-
- // throw new DMaaPAccessDeniedException(errRes);
- //
-
- // }
-
Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
if (null == topic) {
LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
+ "] does not exist.");
throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
+ "] for topic. Topic [" + topicName + "] does not exist.");
}
-
topic.denyWritesFromUser(producerId, user);
-
LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
+ "]. Sending response.");
respondOk(dmaapContext, "Write access has been revoked for publisher.");
-
}
/**
@@ -541,44 +458,24 @@ public class TopicServiceImpl implements TopicService {
*/
@Override
public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
- throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
- DMaaPAccessDeniedException {
+ throws AccessDeniedException, ConfigDbException, TopicExistsException {
LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
-
- //
- //// String permission =
-
-
- // String permission = aaf.aafPermissionString(topicName, "manage");
- // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
- // {
- // LOGGER.error("Failed to permit read access to consumer [" +
- // consumerId + "] for topic " + topicName
-
- // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- // errorMessages.getNotPermitted1()+" <Grant consume permissions>
-
-
-
- // }
- // }
-
Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
if (null == topic) {
LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
+ "] does not exist.");
throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
+ "] for topic. Topic [" + topicName + "] does not exist.");
}
-
- topic.permitReadsByUser(consumerId, user);
-
- LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
+ if (isUseCustomAcls()) {
+ topic.permitReadsByUser(consumerId, user);
+ LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
+ "]. Sending response.");
+ } else {
+ LOGGER.info("Ignoring acl update");
+ }
respondOk(dmaapContext,
"Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
}
@@ -596,37 +493,14 @@ public class TopicServiceImpl implements TopicService {
LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
-
- //// String permission =
-
-
- // String permission = aaf.aafPermissionString(topicName, "manage");
- // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
- // {
- // LOGGER.error("Failed to revoke read access to consumer [" +
- // consumerId + "] for topic " + topicName
-
- // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- // errorMessages.getNotPermitted1()+" <Grant consume permissions>
-
-
- // throw new DMaaPAccessDeniedException(errRes);
- // }
- //
- //
-
Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
if (null == topic) {
LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
+ "] does not exist.");
throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
+ "] for topic. Topic [" + topicName + "] does not exist.");
}
-
topic.denyReadsByUser(consumerId, user);
-
LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
+ "]. Sending response.");
respondOk(dmaapContext,
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/utils/DMaaPResponseBuilder.java b/src/main/java/org/onap/dmaap/dmf/mr/utils/DMaaPResponseBuilder.java
index 3ca60b0..419c652 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/utils/DMaaPResponseBuilder.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/utils/DMaaPResponseBuilder.java
@@ -86,7 +86,7 @@ public class DMaaPResponseBuilder {
*/
public static void respondOkWithHtml(DMaaPContext ctx, String html) {
try {
- respondOkWithStream(ctx, "text/html", new ByteArrayInputStream(html.toString().getBytes()));
+ respondOkWithStream(ctx, "text/html", new ByteArrayInputStream(html.getBytes()));
} catch (Exception excp) {
log.error(excp.getMessage(), excp);
}
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java
index c420072..758d8c5 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java
@@ -42,7 +42,7 @@ public class Utils {
private static final String DATE_FORMAT = "dd-MM-yyyy::hh:mm:ss:SSS";
public static final String CAMBRIA_AUTH_HEADER = "X-CambriaAuth";
- private static final String AUTH_HEADER = "Authorization";
+ public static final String AUTH_HEADER = "Authorization";
private static final String BATCH_ID_FORMAT = "000000";
private static final String X509_ATTR = "javax.servlet.request.X509Certificate";
private static final EELFLogger log = EELFManager.getInstance().getLogger(Utils.class);
diff --git a/src/main/java/org/onap/dmaap/service/EventsRestService.java b/src/main/java/org/onap/dmaap/service/EventsRestService.java
index d3abd6b..b644ae4 100644
--- a/src/main/java/org/onap/dmaap/service/EventsRestService.java
+++ b/src/main/java/org/onap/dmaap/service/EventsRestService.java
@@ -70,8 +70,7 @@ public class EventsRestService {
/**
* Logger obj
*/
- //private Logger log = Logger.getLogger(EventsRestService.class.toString());
- private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class);
+ private static final EELFLogger log = EELFManager.getLogger(EventsRestService.class);
/**
* HttpServletRequest obj
*/
@@ -123,15 +122,13 @@ public class EventsRestService {
public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup")
String consumergroup,
@PathParam("consumerid") String consumerid) throws CambriaApiException {
- // log.info("Consuming message from topic " + topic );
+ log.info("Consuming message from topic " + topic );
DMaaPContext dMaaPContext = getDmaapContext();
dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
try {
-
eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid);
- }
- catch (TopicExistsException e) {
+ } catch (TopicExistsException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
@@ -141,11 +138,8 @@ public class EventsRestService {
request.getRemoteHost());
log.info(errRes.toString());
throw new CambriaApiException(errRes);
-
- }
- catch (DMaaPAccessDeniedException | AccessDeniedException e) {
+ } catch (DMaaPAccessDeniedException | AccessDeniedException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
-
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
+ e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
@@ -153,10 +147,7 @@ public class EventsRestService {
request.getRemoteHost());
log.info(errRes.toString());
throw new CambriaApiException(errRes);
-
- }
-
- catch (ConfigDbException | UnavailableException | IOException e) {
+ } catch (ConfigDbException | UnavailableException | IOException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
@@ -166,7 +157,6 @@ public class EventsRestService {
request.getRemoteHost());
log.info(errRes.toString());
throw new CambriaApiException(errRes);
-
}
}
@@ -185,8 +175,7 @@ public class EventsRestService {
try {
throw new TopicExistsException("Incorrect URL");
- }
- catch (TopicExistsException e) {
+ } catch (TopicExistsException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
@@ -194,9 +183,7 @@ public class EventsRestService {
);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
-
}
-
}
/**
@@ -206,17 +193,13 @@ public class EventsRestService {
@GET
@Path("/{topic}/{consumergroup}")
public void getEventsToException(@PathParam("topic") String topic, @PathParam("consumergroup")
- String consumergroup
- ) throws CambriaApiException {
+ String consumergroup) throws CambriaApiException {
// log.info("Consuming message from topic " + topic );
DMaaPContext dMaaPContext = getDmaapContext();
dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
-
try {
-
throw new TopicExistsException("Incorrect URL");
- }
- catch (TopicExistsException e) {
+ } catch (TopicExistsException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
@@ -224,16 +207,8 @@ public class EventsRestService {
);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
-
}
-
}
-
-
-
-
-
-
/**
* This method is used to publish messages.Taking two parameter topic and
@@ -256,11 +231,9 @@ public class EventsRestService {
public void pushEvents(@PathParam("topic") String topic, InputStream msg,
@QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
log.info("Publishing message to topic " + topic);
-
try {
eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
- }
- catch ( TopicExistsException e) {
+ } catch ( TopicExistsException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
@@ -269,8 +242,7 @@ public class EventsRestService {
Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
- }
- catch ( DMaaPAccessDeniedException | AccessDeniedException e) {
+ } catch ( DMaaPAccessDeniedException | AccessDeniedException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
@@ -279,10 +251,7 @@ public class EventsRestService {
Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
- }
-
-
- catch (ConfigDbException | IOException | missingReqdSetting e) {
+ } catch (ConfigDbException | IOException | missingReqdSetting e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
@@ -313,18 +282,11 @@ public class EventsRestService {
@Path("/transaction/{topic}")
public void pushEventsWithTransaction(@PathParam("topic") String topic,
@QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
- // log.info("Publishing message with transaction id for topic " + topic
- // );
-
-
try {
-
eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(),
partitionKey,
Utils.getFormattedDate(new Date()));
- }
-
- catch ( TopicExistsException e) {
+ } catch ( TopicExistsException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
@@ -333,8 +295,7 @@ public class EventsRestService {
Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
- }
- catch ( DMaaPAccessDeniedException| AccessDeniedException e) {
+ } catch ( DMaaPAccessDeniedException| AccessDeniedException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
@@ -343,9 +304,7 @@ public class EventsRestService {
Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
- }
-
- catch (ConfigDbException | IOException | missingReqdSetting e) {
+ } catch (ConfigDbException | IOException | missingReqdSetting e) {
log.error("Error while publishing to topic : " + topic, e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
@@ -356,7 +315,6 @@ public class EventsRestService {
null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
-
}
}
diff --git a/src/main/java/org/onap/dmaap/service/TopicRestService.java b/src/main/java/org/onap/dmaap/service/TopicRestService.java
index bbf2708..411c393 100644
--- a/src/main/java/org/onap/dmaap/service/TopicRestService.java
+++ b/src/main/java/org/onap/dmaap/service/TopicRestService.java
@@ -8,19 +8,23 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
-*
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
- package org.onap.dmaap.service;
+package org.onap.dmaap.service;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -31,20 +35,10 @@ import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
-
import org.apache.http.HttpStatus;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
import org.json.JSONException;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
import org.onap.dmaap.dmf.mr.beans.TopicBean;
@@ -58,8 +52,10 @@ import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
import org.onap.dmaap.dmf.mr.service.TopicService;
import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+import org.onap.dmaap.dmf.mr.utils.Utils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
/**
* This class is a CXF REST service which acts
@@ -75,8 +71,8 @@ public class TopicRestService {
/**
* Logger obj
*/
- //private static final Logger LOGGER = Logger .getLogger(TopicRestService.class);
- private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicRestService.class);
+ private static final EELFLogger LOGGER = EELFManager.getLogger(TopicRestService.class);
+ private static final String READ = " read ";
/**
* Config Reader
*/
@@ -101,448 +97,308 @@ public class TopicRestService {
*/
@Autowired
private TopicService topicService;
-
+
/**
* DMaaPErrorMessages obj
*/
@Autowired
private DMaaPErrorMessages errorMessages;
-
- /**
- * mrNamespace
- */
- //@Value("${msgRtr.namespace.aaf}")
-// private String mrNamespace;
-
+
+ private DMaaPContext getDmaapContext() {
+ DMaaPContext dmaapContext = new DMaaPContext();
+ dmaapContext.setRequest(request);
+ dmaapContext.setResponse(response);
+ dmaapContext.setConfigReader(configReader);
+ return dmaapContext;
+ }
/**
* Fetches a list of topics from the current kafka instance and converted
* into json object.
- *
- * @return list of the topics in json format
- * @throws AccessDeniedException
- * @throws CambriaApiException
+ *
+ * @throws AccessDeniedException
+ * @throws CambriaApiException
* @throws IOException
* @throws JSONException
* */
@GET
- //@Produces(MediaType.TEXT_PLAIN)
public void getTopics() throws CambriaApiException {
try {
-
LOGGER.info("Authenticating the user before fetching the topics");
- //String permission = "com.att.dmaap.mr.topic|*|view";
- String mrNameS= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
- String permission =mrNameS+"|"+"*"+"|"+"view";
+ String mrNameS = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
+ String permission = mrNameS+"|"+"*"+"|"+"view";
DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
//Check if client is using AAF CADI Basic Authorization
//If yes then check for AAF role authentication else display all topics
- if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
- {
- if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
- {
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
+ if(null != getDmaapContext().getRequest().getHeader(Utils.AUTH_HEADER) && !aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getNotPermitted1() + READ + errorMessages.getNotPermitted2());
LOGGER.info(errRes.toString());
throw new DMaaPAccessDeniedException(errRes);
-
-
- }
- }
-
- LOGGER.info("Fetching all Topics");
- //topicService = new com.att.dmf.mr.service.impl.TopicServiceImpl();
- topicService.getTopics(getDmaapContext());
-
- LOGGER.info("Returning List of all Topics");
-
-
+ }
+ LOGGER.info("Fetching all Topics");
+ topicService.getTopics(getDmaapContext());
+ LOGGER.info("Returning List of all Topics");
} catch (JSONException | ConfigDbException | IOException excp) {
- LOGGER.error(
- "Failed to retrieve list of all topics: "
- + excp.getMessage(), excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
- errorMessages.getTopicsfailure()+ excp.getMessage());
+ LOGGER.error("Failed to retrieve list of all topics: " + excp.getMessage(), excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
+ errorMessages.getTopicsfailure() + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
-
}
}
/**
* Fetches a list of topics from the current kafka instance and converted
* into json object.
- *
+ *
* @return list of the topics in json format
- * @throws AccessDeniedException
- * @throws CambriaApiException
+ * @throws AccessDeniedException
+ * @throws CambriaApiException
* @throws IOException
* @throws JSONException
* */
@GET
@Path("/listAll")
- //@Produces(MediaType.TEXT_PLAIN)
public void getAllTopics() throws CambriaApiException {
try {
-
LOGGER.info("Authenticating the user before fetching the topics");
- //String permission = "com.att.dmaap.mr.topic|*|view";
- String mrNameS= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
- String permission =mrNameS+"|"+"*"+"|"+"view";
+ String mrNameS = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
+ String permission = mrNameS+"|"+"*"+"|"+"view";
DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
//Check if client is using AAF CADI Basic Authorization
//If yes then check for AAF role authentication else display all topics
- if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
- {
- if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
- {
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
+ if(null != getDmaapContext().getRequest().getHeader(Utils.AUTH_HEADER)) {
+ if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getNotPermitted1() + READ + errorMessages.getNotPermitted2());
LOGGER.info(errRes.toString());
throw new DMaaPAccessDeniedException(errRes);
-
-
}
- }
-
- LOGGER.info("Fetching all Topics");
-
- topicService.getAllTopics(getDmaapContext());
-
- LOGGER.info("Returning List of all Topics");
-
-
+ }
+ LOGGER.info("Fetching all Topics");
+ topicService.getAllTopics(getDmaapContext());
+ LOGGER.info("Returning List of all Topics");
} catch (JSONException | ConfigDbException | IOException excp) {
- LOGGER.error(
- "Failed to retrieve list of all topics: "
- + excp.getMessage(), excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
- errorMessages.getTopicsfailure()+ excp.getMessage());
+ LOGGER.error("Failed to retrieve list of all topics: " + excp.getMessage(), excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
+ errorMessages.getTopicsfailure()+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
-
}
}
-
/**
* Returns details of the topic whose name is passed as a parameter
- *
+ *
* @param topicName
* - name of the topic
* @return details of a topic whose name is mentioned in the request in json
* format.
- * @throws AccessDeniedException
- * @throws DMaaPAccessDeniedException
+ * @throws AccessDeniedException
+ * @throws DMaaPAccessDeniedException
* @throws IOException
* */
@GET
@Path("/{topicName}")
- //@Produces(MediaType.TEXT_PLAIN)
public void getTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
try {
-
- LOGGER.info("Authenticating the user before fetching the details about topic = "+ topicName);
+ LOGGER.info("Authenticating the user before fetching the details about topic = {}", topicName);
DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
-
- //String permission= "com.att.ecomp_test.crm.mr.topic|:topic.com.att.ecomp_test.crm.preDemo|view";
-
//Check if client is using AAF CADI Basic Authorization
//If yes then check for AAF role authentication else display all topics
- if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
- {
+ if(null != getDmaapContext().getRequest().getHeader(Utils.AUTH_HEADER)) {
String permission = aaf.aafPermissionString(topicName, "view");
- if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
- {
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
- LOGGER.info(errRes.toString());
- throw new DMaaPAccessDeniedException(errRes);
- }
- }
-
- LOGGER.info("Fetching Topic: " + topicName);
-
- topicService.getTopic(getDmaapContext(), topicName);
-
- LOGGER.info("Fetched details of topic: " + topicName);
-
+ if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getNotPermitted1() + READ + errorMessages.getNotPermitted2());
+ LOGGER.info(errRes.toString());
+ throw new DMaaPAccessDeniedException(errRes);
+ }
+ }
+ LOGGER.info("Fetching Topic: {}", topicName);
+ topicService.getTopic(getDmaapContext(), topicName);
+ LOGGER.info("Fetched details of topic: {}", topicName);
} catch (ConfigDbException | IOException | TopicExistsException excp) {
- LOGGER.error("Failed to retrieve details of topic: " + topicName,
- excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.GET_TOPICS_DETAILS_FAIL.getResponseCode(),
- errorMessages.getTopicDetailsFail()+topicName+ excp.getMessage());
+ LOGGER.error("Failed to retrieve details of topic: " + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_TOPICS_DETAILS_FAIL.getResponseCode(),
+ errorMessages.getTopicDetailsFail()+topicName+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
-
}
}
-
-
/**
* This method is still not working. Need to check on post call and how to
* accept parameters for post call
- *
+ *
* @param topicBean
* it will have the bean object
* @throws TopicExistsException
* @throws CambriaApiException
- * @throws JSONException
+ * @throws JSONException
* @throws IOException
* @throws AccessDeniedException
- *
+ *
* */
@POST
@Path("/create")
@Consumes({ MediaType.APPLICATION_JSON })
- //@Produces(MediaType.TEXT_PLAIN)
public void createTopic(TopicBean topicBean) throws CambriaApiException, JSONException {
- try {
+ try {
LOGGER.info("Creating Topic."+topicBean.getTopicName());
-
topicService.createTopic(getDmaapContext(), topicBean);
-
LOGGER.info("Topic created Successfully.");
- }
- catch (TopicExistsException ex){
-
- LOGGER.error("Error while creating a topic: " + ex.getMessage(),
- ex);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ } catch (TopicExistsException ex){
+ LOGGER.error("Error while creating a topic: " + ex.getMessage(), ex);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
errorMessages.getCreateTopicFail()+ ex.getMessage());
- LOGGER.info(errRes.toString());
- throw new CambriaApiException(errRes);
-
-
-
-
- }catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
- LOGGER.error("Error while creating a topic: " + excp.getMessage(),
- excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail()+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
- }catch (CambriaApiException | IOException excp) {
- LOGGER.error("Error while creating a topic: " + excp.getMessage(),
- excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail()+ excp.getMessage());
+ } catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ } catch (CambriaApiException | IOException excp) {
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
}
}
/**
* Deletes existing topic whose name is passed as a parameter
- *
+ *
* @param topicName
* topic
- * @throws CambriaApiException
+ * @throws CambriaApiException
* @throws IOException
* */
@DELETE
@Path("/{topicName}")
- //@Produces(MediaType.TEXT_PLAIN)
public void deleteTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
try {
LOGGER.info("Deleting Topic: " + topicName);
-
topicService.deleteTopic(getDmaapContext(), topicName);
-
LOGGER.info("Topic [" + topicName + "] deleted successfully.");
} catch (DMaaPAccessDeniedException| AccessDeniedException excp) {
- LOGGER.error("Error while creating a topic: " + excp.getMessage(),
- excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail()+ excp.getMessage());
+ LOGGER.error("Error while deleting a topic: " + excp.getMessage(), excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
- }catch (IOException | ConfigDbException
- | CambriaApiException | TopicExistsException excp) {
+ } catch (IOException | ConfigDbException | CambriaApiException | TopicExistsException excp) {
LOGGER.error("Error while deleting topic: " + topicName, excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.DELETE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getDeleteTopicFail()+ topicName + excp.getMessage());
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.DELETE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getDeleteTopicFail()+ topicName + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
}
}
- private DMaaPContext getDmaapContext() {
-
- DMaaPContext dmaapContext = new DMaaPContext();
- dmaapContext.setRequest(request);
- dmaapContext.setResponse(response);
- dmaapContext.setConfigReader(configReader);
-
- return dmaapContext;
-
- }
-
/**
* This method will fetch the details of publisher by giving topic name
- *
+ *
* @param topicName
- * @throws CambriaApiException
- * @throws AccessDeniedException
+ * @throws CambriaApiException
+ * @throws AccessDeniedException
*/
@GET
@Path("/{topicName}/producers")
- //@Produces(MediaType.TEXT_PLAIN)
public void getPublishersByTopicName(
- @PathParam("topicName") String topicName) throws CambriaApiException {
+ @PathParam("topicName") String topicName) throws CambriaApiException {
try {
-
-// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
-// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
-// String permission = aaf.aafPermissionString(topicName, "view");
-// if(aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
-// {
- LOGGER.info("Fetching list of all the publishers for topic "
- + topicName);
-
- topicService.getPublishersByTopicName(getDmaapContext(), topicName);
-
- LOGGER.info("Returning list of all the publishers for topic "
- + topicName);
-// }else{
-// LOGGER.error("Error while fetching list of publishers for topic "+ topicName);
-//
-// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
-// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
-// errorMessages.getNotPermitted1()+" fetch list of publishers "+errorMessages.getNotPermitted2());
-// LOGGER.info(errRes);
-// throw new DMaaPAccessDeniedException(errRes);
-//
-// }
-
+ LOGGER.info("Fetching list of all the publishers for topic " + topicName);
+ topicService.getPublishersByTopicName(getDmaapContext(), topicName);
+ LOGGER.info("Returning list of all the publishers for topic " + topicName);
} catch (IOException | ConfigDbException | TopicExistsException excp) {
- LOGGER.error("Error while fetching list of publishers for topic "
- + topicName, excp);
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.GET_PUBLISHERS_BY_TOPIC.getResponseCode(),
- "Error while fetching list of publishers for topic: "
- + topicName + excp.getMessage());
+ LOGGER.error("Error while fetching list of publishers for topic " + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_PUBLISHERS_BY_TOPIC.getResponseCode(),
+ "Error while fetching list of publishers for topic: " + topicName + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
}
}
/**
* proving permission for the topic for a particular publisher id
- *
+ *
* @param topicName
* @param producerId
- * @throws CambriaApiException
+ * @throws CambriaApiException
*/
@PUT
@Path("/{topicName}/producers/{producerId}")
public void permitPublisherForTopic(
- @PathParam("topicName") String topicName,
- @PathParam("producerId") String producerId) throws CambriaApiException {
+ @PathParam("topicName") String topicName,
+ @PathParam("producerId") String producerId) throws CambriaApiException {
try {
- LOGGER.info("Granting write access to producer [" + producerId
- + "] for topic " + topicName);
-
- topicService.permitPublisherForTopic(getDmaapContext(), topicName,
- producerId);
-
- LOGGER.info("Write access has been granted to producer ["
- + producerId + "] for topic " + topicName);
+ LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
+ topicService.permitPublisherForTopic(getDmaapContext(), topicName, producerId);
+ LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic " + topicName);
} catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
- LOGGER.error("Error while creating a topic: " + excp.getMessage(),
- excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail()+ excp.getMessage());
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
- }catch ( ConfigDbException | IOException
- | TopicExistsException excp) {
+ } catch ( ConfigDbException | IOException | TopicExistsException excp) {
LOGGER.error("Error while granting write access to producer ["
- + producerId + "] for topic " + topicName, excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.PERMIT_PUBLISHER_FOR_TOPIC.getResponseCode(),
- "Error while granting write access to producer ["
- + producerId + "] for topic " + topicName + excp.getMessage());
+ + producerId + "] for topic " + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.PERMIT_PUBLISHER_FOR_TOPIC.getResponseCode(),
+ "Error while granting write access to producer ["
+ + producerId + "] for topic " + topicName + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
}
}
/**
* Removing access for a publisher id for any particular topic
- *
+ *
* @param topicName
* @param producerId
- * @throws CambriaApiException
+ * @throws CambriaApiException
*/
@DELETE
@Path("/{topicName}/producers/{producerId}")
public void denyPublisherForTopic(@PathParam("topicName") String topicName,
- @PathParam("producerId") String producerId) throws CambriaApiException {
+ @PathParam("producerId") String producerId) throws CambriaApiException {
try {
- LOGGER.info("Revoking write access to producer [" + producerId
- + "] for topic " + topicName);
-
- topicService.denyPublisherForTopic(getDmaapContext(), topicName,
- producerId);
-
- LOGGER.info("Write access revoked for producer [" + producerId
- + "] for topic " + topicName);
+ LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
+ topicService.denyPublisherForTopic(getDmaapContext(), topicName, producerId);
+ LOGGER.info("Write access revoked for producer [" + producerId + "] for topic " + topicName);
} catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
- LOGGER.error("Error while creating a topic: " + excp.getMessage(),
- excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail()+ excp.getMessage());
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
- }catch ( ConfigDbException | IOException
- | TopicExistsException excp) {
- LOGGER.error("Error while revoking write access for producer ["
- + producerId + "] for topic " + topicName, excp);
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.REVOKE_PUBLISHER_FOR_TOPIC.getResponseCode(),
- "Error while revoking write access to producer ["
- + producerId + "] for topic " + topicName + excp.getMessage());
+ } catch ( ConfigDbException | IOException | TopicExistsException excp) {
+ LOGGER.error("Error while revoking write access for producer [" + producerId + "] for topic " + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.REVOKE_PUBLISHER_FOR_TOPIC.getResponseCode(),
+ "Error while revoking write access to producer ["
+ + producerId + "] for topic " + topicName + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
}
@@ -550,136 +406,87 @@ public class TopicRestService {
/**
* Get the consumer details by the topic name
- *
+ *
* @param topicName
- * @throws AccessDeniedException
- * @throws CambriaApiException
+ * @throws CambriaApiException
*/
@GET
@Path("/{topicName}/consumers")
- //@Produces(MediaType.TEXT_PLAIN)
- public void getConsumersByTopicName(@PathParam("topicName") String topicName) throws AccessDeniedException,
- CambriaApiException {
+ public void getConsumersByTopicName(@PathParam("topicName") String topicName) throws CambriaApiException {
try {
-
-
-// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"view";
-// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
-// String permission = aaf.aafPermissionString(topicName, "view");
-// if(aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
-// {
- LOGGER.info("Fetching list of all consumers for topic " + topicName);
-
- topicService.getConsumersByTopicName(getDmaapContext(), topicName);
-
- LOGGER.info("Returning list of all consumers for topic "
- + topicName);
-
-// }else{
-// LOGGER.error(
-// "Error while fetching list of all consumers for topic "
-// + topicName);
-// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
-// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
-// errorMessages.getNotPermitted1()+" fetch list of consumers "+errorMessages.getNotPermitted2());
-// LOGGER.info(errRes);
-// throw new DMaaPAccessDeniedException(errRes);
-//
-//
-// }
-
-
-
+ LOGGER.info("Fetching list of all consumers for topic " + topicName);
+ topicService.getConsumersByTopicName(getDmaapContext(), topicName);
+ LOGGER.info("Returning list of all consumers for topic " + topicName);
} catch (IOException | ConfigDbException | TopicExistsException excp) {
- LOGGER.error(
- "Error while fetching list of all consumers for topic "
- + topicName, excp);
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.GET_CONSUMERS_BY_TOPIC.getResponseCode(),
- "Error while fetching list of all consumers for topic: "
- + topicName+ excp.getMessage());
+ LOGGER.error("Error while fetching list of all consumers for topic " + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.GET_CONSUMERS_BY_TOPIC.getResponseCode(),
+ "Error while fetching list of all consumers for topic: " + topicName+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
}
}
/**
* providing access for consumer for any particular topic
- *
+ *
* @param topicName
* @param consumerId
- * @throws CambriaApiException
+ * @throws CambriaApiException
*/
@PUT
@Path("/{topicName}/consumers/{consumerId}")
public void permitConsumerForTopic(
- @PathParam("topicName") String topicName,
- @PathParam("consumerId") String consumerId) throws CambriaApiException {
+ @PathParam("topicName") String topicName,
+ @PathParam("consumerId") String consumerId) throws CambriaApiException {
try {
- LOGGER.info("Granting read access to consumer [" + consumerId
- + "] for topic " + topicName);
-
- topicService.permitConsumerForTopic(getDmaapContext(), topicName,
- consumerId);
-
- LOGGER.info("Read access granted to consumer [" + consumerId
- + "] for topic " + topicName);
+ LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
+ topicService.permitConsumerForTopic(getDmaapContext(), topicName, consumerId);
+ LOGGER.info("Read access granted to consumer [{0}] for topic {1}", consumerId, topicName);
} catch (AccessDeniedException | ConfigDbException | IOException
- | TopicExistsException excp) {
- LOGGER.error("Error while granting read access to consumer ["
- + consumerId + "] for topic " + topicName, excp);
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.PERMIT_CONSUMER_FOR_TOPIC.getResponseCode(),
- "Error while granting read access to consumer ["
- + consumerId + "] for topic " + topicName+ excp.getMessage());
+ | TopicExistsException excp) {
+ LOGGER.error("Error while granting read access to consumer [" + consumerId + "] for topic " + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.PERMIT_CONSUMER_FOR_TOPIC.getResponseCode(),
+ "Error while granting read access to consumer ["
+ + consumerId + "] for topic " + topicName+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
}
}
/**
* Removing access for consumer for any particular topic
- *
+ *
* @param topicName
* @param consumerId
- * @throws CambriaApiException
+ * @throws CambriaApiException
*/
@DELETE
@Path("/{topicName}/consumers/{consumerId}")
public void denyConsumerForTopic(@PathParam("topicName") String topicName,
- @PathParam("consumerId") String consumerId) throws CambriaApiException {
+ @PathParam("consumerId") String consumerId) throws CambriaApiException {
try {
- LOGGER.info("Revoking read access to consumer [" + consumerId
- + "] for topic " + topicName);
-
- topicService.denyConsumerForTopic(getDmaapContext(), topicName,
- consumerId);
-
- LOGGER.info("Read access revoked to consumer [" + consumerId
- + "] for topic " + topicName);
+ LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
+ topicService.denyConsumerForTopic(getDmaapContext(), topicName, consumerId);
+ LOGGER.info("Read access revoked to consumer [" + consumerId + "] for topic " + topicName);
} catch ( ConfigDbException | IOException
- | TopicExistsException excp) {
- LOGGER.error("Error while revoking read access to consumer ["
- + consumerId + "] for topic " + topicName, excp);
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.REVOKE_CONSUMER_FOR_TOPIC.getResponseCode(),
- "Error while revoking read access to consumer ["
- + consumerId + "] for topic " + topicName+ excp.getMessage());
+ | TopicExistsException excp) {
+ LOGGER.error("Error while revoking read access to consumer [" + consumerId + "] for topic " + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.REVOKE_CONSUMER_FOR_TOPIC.getResponseCode(),
+ "Error while revoking read access to consumer ["
+ + consumerId + "] for topic " + topicName+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
- }catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
- LOGGER.error("Error while creating a topic: " + excp.getMessage(),
- excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail()+ excp.getMessage());
+ } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
- }
+ }
}
public TopicService getTopicService() {
@@ -690,7 +497,4 @@ public class TopicRestService {
this.topicService = topicService;
}
-
-
-
}
diff --git a/src/main/java/org/onap/dmaap/util/DMaaPAuthFilter.java b/src/main/java/org/onap/dmaap/util/DMaaPAuthFilter.java
index 5c7170b..e8c69e4 100644
--- a/src/main/java/org/onap/dmaap/util/DMaaPAuthFilter.java
+++ b/src/main/java/org/onap/dmaap/util/DMaaPAuthFilter.java
@@ -45,6 +45,7 @@ import org.onap.aaf.cadi.filter.CadiFilter;
public class DMaaPAuthFilter extends CadiFilter {
private static final String FORCE_AAF_FLAG = "forceAAF";
+ private static final String USE_CUSTOM_ACLS = "useCustomAcls";
static final String X509_ATTR = "javax.servlet.request.X509Certificate";
static final String AUTH_HEADER = "Authorization";
static final String APP_HEADER = "AppName";
@@ -93,7 +94,11 @@ public class DMaaPAuthFilter extends CadiFilter {
}
boolean isAAFforced() {
- return Boolean.valueOf(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, FORCE_AAF_FLAG));
+ return Boolean.parseBoolean(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, FORCE_AAF_FLAG));
+ }
+
+ public static boolean isUseCustomAcls() {
+ return Boolean.parseBoolean(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, USE_CUSTOM_ACLS));
}
}
diff --git a/src/test/resources/MsgRtrApi.properties b/src/test/resources/MsgRtrApi.properties
index 3c2e346..ae97d9d 100644
--- a/src/test/resources/MsgRtrApi.properties
+++ b/src/test/resources/MsgRtrApi.properties
@@ -151,6 +151,7 @@ msgRtr.namespace.aaf=org.onap.dmaap.mr.topic
msgRtr.topicfactory.aaf=org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:
enforced.topic.name.AAF=org.onap.dmaap.mr
forceAAF=false
+useCustomAcls=false
transidUEBtopicreqd=false
defaultNSforUEB=org.onap.dmaap.mr
##############################################################################
diff --git a/version.properties b/version.properties
index 4b00b94..b7a70d5 100644
--- a/version.properties
+++ b/version.properties
@@ -27,7 +27,7 @@
major=1
minor=4
-patch=0
+patch=1
base_version=${major}.${minor}.${patch}