diff options
Diffstat (limited to 'src/main/java/com/att/nsa/dmaap/service/EventsRestService.java')
-rw-r--r-- | src/main/java/com/att/nsa/dmaap/service/EventsRestService.java | 207 |
1 files changed, 141 insertions, 66 deletions
diff --git a/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java b/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java index 6fbfd01..40468a3 100644 --- a/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java +++ b/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java @@ -8,14 +8,14 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * +* * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= - * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. * *******************************************************************************/ @@ -42,25 +42,24 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; -import com.att.nsa.cambria.CambriaApiException; -import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException; -import com.att.nsa.cambria.beans.DMaaPContext; -import com.att.nsa.cambria.exception.DMaaPErrorMessages; -import com.att.nsa.cambria.exception.DMaaPResponseCode; -import com.att.nsa.cambria.exception.ErrorResponse; -import com.att.nsa.cambria.metabroker.Broker.TopicExistsException; -import com.att.nsa.cambria.service.EventsService; -import com.att.nsa.cambria.utils.ConfigurationReader; -import com.att.nsa.cambria.utils.Utils; +import com.att.dmf.mr.CambriaApiException; +import com.att.dmf.mr.backends.ConsumerFactory.UnavailableException; +import com.att.dmf.mr.beans.DMaaPContext; +import com.att.dmf.mr.exception.DMaaPErrorMessages; +import com.att.dmf.mr.exception.DMaaPResponseCode; +import com.att.dmf.mr.exception.ErrorResponse; +import com.att.dmf.mr.metabroker.Broker.TopicExistsException; +import com.att.dmf.mr.service.EventsService; +import com.att.dmf.mr.utils.ConfigurationReader; +import com.att.dmf.mr.utils.Utils; import com.att.nsa.configs.ConfigDbException; import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; -import com.att.nsa.cambria.exception.DMaaPAccessDeniedException; - +import com.att.dmf.mr.exception.DMaaPAccessDeniedException; /** - * This class is a CXF REST service which acts as gateway for MR Event Service. - * - * @author author + * This class is a CXF REST service which acts + * as gateway for MR Event Service. + * @author rajashree.khare * */ @Component @@ -70,8 +69,7 @@ public class EventsRestService { /** * Logger obj */ - // private Logger log = - // Logger.getLogger(EventsRestService.class.toString()); + //private Logger log = Logger.getLogger(EventsRestService.class.toString()); private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class); /** * HttpServletRequest obj @@ -85,6 +83,7 @@ public class EventsRestService { @Context private HttpServletResponse response; + /** * Config Reader */ @@ -98,8 +97,6 @@ public class EventsRestService { @Autowired private DMaaPErrorMessages errorMessages; - private DMaaPContext dmaapContext = new DMaaPContext(); - /** * This method is used to consume messages.Taking three parameter * topic,consumerGroup and consumerId .Consumer decide to which topic they @@ -121,49 +118,120 @@ public class EventsRestService { */ @GET @Path("/{topic}/{consumergroup}/{consumerid}") - public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup") String consumergroup, + public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup") + String consumergroup, @PathParam("consumerid") String consumerid) throws CambriaApiException { // log.info("Consuming message from topic " + topic ); - dmaapContext = getDmaapContext(); - dmaapContext.setConsumerRequestTime(Utils.getFormattedDate(new Date())); + DMaaPContext dMaaPContext = getDmaapContext(); + dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date())); try { - eventsService.getEvents(dmaapContext, topic, consumergroup, consumerid); - } catch (TopicExistsException e) { + eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid); + } + catch (TopicExistsException e) { log.error("Error while reading data from topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, - DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), - errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()), - topic, null, null, consumerid, request.getRemoteHost()); + DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError() + + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, + consumerid, + request.getRemoteHost()); log.info(errRes.toString()); throw new CambriaApiException(errRes); - } catch (DMaaPAccessDeniedException | AccessDeniedException e) { + } + catch (DMaaPAccessDeniedException | AccessDeniedException e) { log.error("Error while reading data from topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), - errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()), - topic, null, null, consumerid, request.getRemoteHost()); + DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError() + + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, + consumerid, + request.getRemoteHost()); log.info(errRes.toString()); throw new CambriaApiException(errRes); } - + catch (ConfigDbException | UnavailableException | IOException e) { log.error("Error while reading data from topic [" + topic + "].", e); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError() + + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, + consumerid, + request.getRemoteHost()); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + } + + + /** + * This method is used to throw an exception back to the client app if CG/CID is not passed + * while consuming messages + */ + @GET + @Path("/{topic}") + public void getEventsToException(@PathParam("topic") String topic) throws CambriaApiException { + // log.info("Consuming message from topic " + topic ); + DMaaPContext dMaaPContext = getDmaapContext(); + dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date())); + + try { + + throw new TopicExistsException("Incorrect URL"); + } + catch (TopicExistsException e) { + log.error("Error while reading data from topic [" + topic + "].", e); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer Group and ID in " + request.getRequestURI() + " from "+request.getRemoteHost() + ); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + + } + + /** + * This method is used to throw an exception back to the client app if CG/CID is not passed + * while consuming messages + */ + @GET + @Path("/{topic}/{consumergroup}") + public void getEventsToException(@PathParam("topic") String topic, @PathParam("consumergroup") + String consumergroup + ) throws CambriaApiException { + // log.info("Consuming message from topic " + topic ); + DMaaPContext dMaaPContext = getDmaapContext(); + dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date())); + + try { + + throw new TopicExistsException("Incorrect URL"); + } + catch (TopicExistsException e) { + log.error("Error while reading data from topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, - DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), - errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()), - topic, null, null, consumerid, request.getRemoteHost()); + DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer ID in " + request.getRequestURI() + " from "+request.getRemoteHost() + ); log.info(errRes.toString()); throw new CambriaApiException(errRes); } + } + + + + + + /** * This method is used to publish messages.Taking two parameter topic and @@ -189,33 +257,36 @@ public class EventsRestService { try { eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null); - } catch (TopicExistsException e) { + } + catch ( TopicExistsException e) { log.error("Error while publishing to topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, - DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), - errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()), - topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null); + DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError() + + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, + Utils.getUserApiKey(request), request.getRemoteHost(), null, null); log.info(errRes.toString()); throw new CambriaApiException(errRes); - } catch (DMaaPAccessDeniedException | AccessDeniedException e) { + } + catch ( DMaaPAccessDeniedException | AccessDeniedException e) { log.error("Error while publishing to topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), - errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()), - topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null); + DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError() + + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, + Utils.getUserApiKey(request), request.getRemoteHost(), null, null); log.info(errRes.toString()); throw new CambriaApiException(errRes); } - - catch (ConfigDbException | IOException | missingReqdSetting e) { + + + catch (ConfigDbException | IOException | missingReqdSetting e) { log.error("Error while publishing to topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, - DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), - errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()), - topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null); + DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError() + + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, + Utils.getUserApiKey(request), request.getRemoteHost(), null, null); log.info(errRes.toString()); throw new CambriaApiException(errRes); } @@ -244,37 +315,40 @@ public class EventsRestService { // ); try { - eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), partitionKey, + eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), + partitionKey, Utils.getFormattedDate(new Date())); - } - - catch (TopicExistsException e) { + } + + catch ( TopicExistsException e) { log.error("Error while publishing to topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, - DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), - errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()), - topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null); + DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError() + + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, + Utils.getUserApiKey(request), request.getRemoteHost(), null, null); log.info(errRes.toString()); throw new CambriaApiException(errRes); - } catch (DMaaPAccessDeniedException | AccessDeniedException e) { + } + catch ( DMaaPAccessDeniedException| AccessDeniedException e) { log.error("Error while publishing to topic [" + topic + "].", e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), - errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()), - topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null); + DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError() + + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, + Utils.getUserApiKey(request), request.getRemoteHost(), null, null); log.info(errRes.toString()); throw new CambriaApiException(errRes); } - - catch (ConfigDbException | IOException | missingReqdSetting e) { + + catch (ConfigDbException | IOException | missingReqdSetting e) { log.error("Error while publishing to topic : " + topic, e); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, - DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), - "Transaction-" + errorMessages.getPublishMsgError() + e.getMessage(), null, - Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request), request.getRemoteHost(), + DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), "Transaction-" + + errorMessages.getPublishMsgError() + e.getMessage(), null, + Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request), + request.getRemoteHost(), null, null); log.info(errRes.toString()); throw new CambriaApiException(errRes); @@ -293,6 +367,7 @@ public class EventsRestService { */ private DMaaPContext getDmaapContext() { + DMaaPContext dmaapContext = new DMaaPContext(); dmaapContext.setRequest(request); dmaapContext.setResponse(response); dmaapContext.setConfigReader(configReader); |