diff options
Diffstat (limited to 'src/main/java/com/att/nsa/dmaap/service/EventsRestService.java')
-rw-r--r-- | src/main/java/com/att/nsa/dmaap/service/EventsRestService.java | 116 |
1 files changed, 53 insertions, 63 deletions
diff --git a/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java b/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java index cda431c..6fbfd01 100644 --- a/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java +++ b/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java @@ -56,9 +56,10 @@ import com.att.nsa.configs.ConfigDbException; import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; import com.att.nsa.cambria.exception.DMaaPAccessDeniedException; + /** - * This class is a CXF REST service which acts - * as gateway for MR Event Service. + * This class is a CXF REST service which acts as gateway for MR Event Service. + * * @author author * */ @@ -69,7 +70,8 @@ public class EventsRestService { /** * Logger obj */ - //private Logger log = Logger.getLogger(EventsRestService.class.toString()); + // private Logger log = + // Logger.getLogger(EventsRestService.class.toString()); private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class); /** * HttpServletRequest obj @@ -83,7 +85,6 @@ public class EventsRestService { @Context private HttpServletResponse response; - /** * Config Reader */ @@ -97,6 +98,8 @@ public class EventsRestService { @Autowired private DMaaPErrorMessages errorMessages; + private DMaaPContext dmaapContext = new DMaaPContext(); + /** * This method is used to consume messages.Taking three parameter * topic,consumerGroup and consumerId .Consumer decide to which topic they @@ -118,50 +121,44 @@ public class EventsRestService { */ @GET @Path("/{topic}/{consumergroup}/{consumerid}") - public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup") - String consumergroup, + public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup") String consumergroup, @PathParam("consumerid") String consumerid) throws CambriaApiException { // log.info("Consuming message from topic " + topic ); - DMaaPContext dMaaPContext = getDmaapContext(); - dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date())); + dmaapContext = getDmaapContext(); + dmaapContext.setConsumerRequestTime(Utils.getFormattedDate(new Date())); try { - eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid); - } - catch (TopicExistsException e) { + eventsService.getEvents(dmaapContext, topic, consumergroup, consumerid); + } catch (TopicExistsException e) { log.error("Error while reading data from topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, - DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError() - + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, - consumerid, - request.getRemoteHost()); + DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), + errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()), + topic, null, null, consumerid, request.getRemoteHost()); log.info(errRes.toString()); throw new CambriaApiException(errRes); - } - catch (DMaaPAccessDeniedException | AccessDeniedException e) { + } catch (DMaaPAccessDeniedException | AccessDeniedException e) { log.error("Error while reading data from topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError() - + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, - consumerid, - request.getRemoteHost()); + DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), + errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()), + topic, null, null, consumerid, request.getRemoteHost()); log.info(errRes.toString()); throw new CambriaApiException(errRes); } - + catch (ConfigDbException | UnavailableException | IOException e) { log.error("Error while reading data from topic [" + topic + "].", e); - + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, - DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError() - + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, - consumerid, - request.getRemoteHost()); + DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), + errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()), + topic, null, null, consumerid, request.getRemoteHost()); log.info(errRes.toString()); throw new CambriaApiException(errRes); @@ -192,36 +189,33 @@ public class EventsRestService { try { eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null); - } - catch ( TopicExistsException e) { + } catch (TopicExistsException e) { log.error("Error while publishing to topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, - DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError() - + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, - Utils.getUserApiKey(request), request.getRemoteHost(), null, null); + DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), + errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()), + topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null); log.info(errRes.toString()); throw new CambriaApiException(errRes); - } - catch ( DMaaPAccessDeniedException | AccessDeniedException e) { + } catch (DMaaPAccessDeniedException | AccessDeniedException e) { log.error("Error while publishing to topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError() - + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, - Utils.getUserApiKey(request), request.getRemoteHost(), null, null); + DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), + errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()), + topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null); log.info(errRes.toString()); throw new CambriaApiException(errRes); } - - - catch (ConfigDbException | IOException | missingReqdSetting e) { + + catch (ConfigDbException | IOException | missingReqdSetting e) { log.error("Error while publishing to topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, - DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError() - + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, - Utils.getUserApiKey(request), request.getRemoteHost(), null, null); + DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), + errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()), + topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null); log.info(errRes.toString()); throw new CambriaApiException(errRes); } @@ -250,40 +244,37 @@ public class EventsRestService { // ); try { - eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), - partitionKey, + eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), partitionKey, Utils.getFormattedDate(new Date())); - } - - catch ( TopicExistsException e) { + } + + catch (TopicExistsException e) { log.error("Error while publishing to topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, - DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError() - + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, - Utils.getUserApiKey(request), request.getRemoteHost(), null, null); + DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), + errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()), + topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null); log.info(errRes.toString()); throw new CambriaApiException(errRes); - } - catch ( DMaaPAccessDeniedException| AccessDeniedException e) { + } catch (DMaaPAccessDeniedException | AccessDeniedException e) { log.error("Error while publishing to topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError() - + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, - Utils.getUserApiKey(request), request.getRemoteHost(), null, null); + DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), + errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()), + topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null); log.info(errRes.toString()); throw new CambriaApiException(errRes); } - - catch (ConfigDbException | IOException | missingReqdSetting e) { + + catch (ConfigDbException | IOException | missingReqdSetting e) { log.error("Error while publishing to topic : " + topic, e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, - DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), "Transaction-" - + errorMessages.getPublishMsgError() + e.getMessage(), null, - Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request), - request.getRemoteHost(), + DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), + "Transaction-" + errorMessages.getPublishMsgError() + e.getMessage(), null, + Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null); log.info(errRes.toString()); throw new CambriaApiException(errRes); @@ -302,7 +293,6 @@ public class EventsRestService { */ private DMaaPContext getDmaapContext() { - DMaaPContext dmaapContext = new DMaaPContext(); dmaapContext.setRequest(request); dmaapContext.setResponse(response); dmaapContext.setConfigReader(configReader); |