summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/att/nsa/dmaap/service/EventsRestService.java')
-rw-r--r--src/main/java/com/att/nsa/dmaap/service/EventsRestService.java116
1 files changed, 53 insertions, 63 deletions
diff --git a/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java b/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java
index cda431c..6fbfd01 100644
--- a/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java
+++ b/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java
@@ -56,9 +56,10 @@ import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
+
/**
- * This class is a CXF REST service which acts
- * as gateway for MR Event Service.
+ * This class is a CXF REST service which acts as gateway for MR Event Service.
+ *
* @author author
*
*/
@@ -69,7 +70,8 @@ public class EventsRestService {
/**
* Logger obj
*/
- //private Logger log = Logger.getLogger(EventsRestService.class.toString());
+ // private Logger log =
+ // Logger.getLogger(EventsRestService.class.toString());
private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class);
/**
* HttpServletRequest obj
@@ -83,7 +85,6 @@ public class EventsRestService {
@Context
private HttpServletResponse response;
-
/**
* Config Reader
*/
@@ -97,6 +98,8 @@ public class EventsRestService {
@Autowired
private DMaaPErrorMessages errorMessages;
+ private DMaaPContext dmaapContext = new DMaaPContext();
+
/**
* This method is used to consume messages.Taking three parameter
* topic,consumerGroup and consumerId .Consumer decide to which topic they
@@ -118,50 +121,44 @@ public class EventsRestService {
*/
@GET
@Path("/{topic}/{consumergroup}/{consumerid}")
- public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup")
- String consumergroup,
+ public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup") String consumergroup,
@PathParam("consumerid") String consumerid) throws CambriaApiException {
// log.info("Consuming message from topic " + topic );
- DMaaPContext dMaaPContext = getDmaapContext();
- dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
+ dmaapContext = getDmaapContext();
+ dmaapContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
try {
- eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid);
- }
- catch (TopicExistsException e) {
+ eventsService.getEvents(dmaapContext, topic, consumergroup, consumerid);
+ } catch (TopicExistsException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
- DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
- + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
- consumerid,
- request.getRemoteHost());
+ DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(),
+ errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
+ topic, null, null, consumerid, request.getRemoteHost());
log.info(errRes.toString());
throw new CambriaApiException(errRes);
- }
- catch (DMaaPAccessDeniedException | AccessDeniedException e) {
+ } catch (DMaaPAccessDeniedException | AccessDeniedException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
- + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
- consumerid,
- request.getRemoteHost());
+ DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(),
+ errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
+ topic, null, null, consumerid, request.getRemoteHost());
log.info(errRes.toString());
throw new CambriaApiException(errRes);
}
-
+
catch (ConfigDbException | UnavailableException | IOException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
-
+
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
- + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
- consumerid,
- request.getRemoteHost());
+ DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(),
+ errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
+ topic, null, null, consumerid, request.getRemoteHost());
log.info(errRes.toString());
throw new CambriaApiException(errRes);
@@ -192,36 +189,33 @@ public class EventsRestService {
try {
eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
- }
- catch ( TopicExistsException e) {
+ } catch (TopicExistsException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
- DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
- + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
- Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
+ errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
+ topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
- }
- catch ( DMaaPAccessDeniedException | AccessDeniedException e) {
+ } catch (DMaaPAccessDeniedException | AccessDeniedException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
- + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
- Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
+ errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
+ topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
}
-
-
- catch (ConfigDbException | IOException | missingReqdSetting e) {
+
+ catch (ConfigDbException | IOException | missingReqdSetting e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
- + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
- Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
+ errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
+ topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
}
@@ -250,40 +244,37 @@ public class EventsRestService {
// );
try {
- eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(),
- partitionKey,
+ eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), partitionKey,
Utils.getFormattedDate(new Date()));
- }
-
- catch ( TopicExistsException e) {
+ }
+
+ catch (TopicExistsException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
- DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
- + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
- Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
+ errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
+ topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
- }
- catch ( DMaaPAccessDeniedException| AccessDeniedException e) {
+ } catch (DMaaPAccessDeniedException | AccessDeniedException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
- + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
- Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
+ errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
+ topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
}
-
- catch (ConfigDbException | IOException | missingReqdSetting e) {
+
+ catch (ConfigDbException | IOException | missingReqdSetting e) {
log.error("Error while publishing to topic : " + topic, e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), "Transaction-"
- + errorMessages.getPublishMsgError() + e.getMessage(), null,
- Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request),
- request.getRemoteHost(),
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
+ "Transaction-" + errorMessages.getPublishMsgError() + e.getMessage(), null,
+ Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request), request.getRemoteHost(),
null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
@@ -302,7 +293,6 @@ public class EventsRestService {
*/
private DMaaPContext getDmaapContext() {
- DMaaPContext dmaapContext = new DMaaPContext();
dmaapContext.setRequest(request);
dmaapContext.setResponse(response);
dmaapContext.setConfigReader(configReader);