summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/att/nsa/dmaap/service/EventsRestService.java')
-rw-r--r--src/main/java/com/att/nsa/dmaap/service/EventsRestService.java207
1 files changed, 141 insertions, 66 deletions
diff --git a/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java b/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java
index 6fbfd01..40468a3 100644
--- a/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java
+++ b/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java
@@ -8,14 +8,14 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
*
*******************************************************************************/
@@ -42,25 +42,24 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
-import com.att.nsa.cambria.CambriaApiException;
-import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
-import com.att.nsa.cambria.beans.DMaaPContext;
-import com.att.nsa.cambria.exception.DMaaPErrorMessages;
-import com.att.nsa.cambria.exception.DMaaPResponseCode;
-import com.att.nsa.cambria.exception.ErrorResponse;
-import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
-import com.att.nsa.cambria.service.EventsService;
-import com.att.nsa.cambria.utils.ConfigurationReader;
-import com.att.nsa.cambria.utils.Utils;
+import com.att.dmf.mr.CambriaApiException;
+import com.att.dmf.mr.backends.ConsumerFactory.UnavailableException;
+import com.att.dmf.mr.beans.DMaaPContext;
+import com.att.dmf.mr.exception.DMaaPErrorMessages;
+import com.att.dmf.mr.exception.DMaaPResponseCode;
+import com.att.dmf.mr.exception.ErrorResponse;
+import com.att.dmf.mr.metabroker.Broker.TopicExistsException;
+import com.att.dmf.mr.service.EventsService;
+import com.att.dmf.mr.utils.ConfigurationReader;
+import com.att.dmf.mr.utils.Utils;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
-import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
-
+import com.att.dmf.mr.exception.DMaaPAccessDeniedException;
/**
- * This class is a CXF REST service which acts as gateway for MR Event Service.
- *
- * @author author
+ * This class is a CXF REST service which acts
+ * as gateway for MR Event Service.
+ * @author rajashree.khare
*
*/
@Component
@@ -70,8 +69,7 @@ public class EventsRestService {
/**
* Logger obj
*/
- // private Logger log =
- // Logger.getLogger(EventsRestService.class.toString());
+ //private Logger log = Logger.getLogger(EventsRestService.class.toString());
private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class);
/**
* HttpServletRequest obj
@@ -85,6 +83,7 @@ public class EventsRestService {
@Context
private HttpServletResponse response;
+
/**
* Config Reader
*/
@@ -98,8 +97,6 @@ public class EventsRestService {
@Autowired
private DMaaPErrorMessages errorMessages;
- private DMaaPContext dmaapContext = new DMaaPContext();
-
/**
* This method is used to consume messages.Taking three parameter
* topic,consumerGroup and consumerId .Consumer decide to which topic they
@@ -121,49 +118,120 @@ public class EventsRestService {
*/
@GET
@Path("/{topic}/{consumergroup}/{consumerid}")
- public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup") String consumergroup,
+ public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup")
+ String consumergroup,
@PathParam("consumerid") String consumerid) throws CambriaApiException {
// log.info("Consuming message from topic " + topic );
- dmaapContext = getDmaapContext();
- dmaapContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
+ DMaaPContext dMaaPContext = getDmaapContext();
+ dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
try {
- eventsService.getEvents(dmaapContext, topic, consumergroup, consumerid);
- } catch (TopicExistsException e) {
+ eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid);
+ }
+ catch (TopicExistsException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
- DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(),
- errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
- topic, null, null, consumerid, request.getRemoteHost());
+ DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
+ + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
+ consumerid,
+ request.getRemoteHost());
log.info(errRes.toString());
throw new CambriaApiException(errRes);
- } catch (DMaaPAccessDeniedException | AccessDeniedException e) {
+ }
+ catch (DMaaPAccessDeniedException | AccessDeniedException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(),
- errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
- topic, null, null, consumerid, request.getRemoteHost());
+ DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
+ + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
+ consumerid,
+ request.getRemoteHost());
log.info(errRes.toString());
throw new CambriaApiException(errRes);
}
-
+
catch (ConfigDbException | UnavailableException | IOException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
+ + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
+ consumerid,
+ request.getRemoteHost());
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+ }
+
+
+ /**
+ * This method is used to throw an exception back to the client app if CG/CID is not passed
+ * while consuming messages
+ */
+ @GET
+ @Path("/{topic}")
+ public void getEventsToException(@PathParam("topic") String topic) throws CambriaApiException {
+ // log.info("Consuming message from topic " + topic );
+ DMaaPContext dMaaPContext = getDmaapContext();
+ dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
+
+ try {
+
+ throw new TopicExistsException("Incorrect URL");
+ }
+ catch (TopicExistsException e) {
+ log.error("Error while reading data from topic [" + topic + "].", e);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer Group and ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
+ );
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+
+ }
+
+ /**
+ * This method is used to throw an exception back to the client app if CG/CID is not passed
+ * while consuming messages
+ */
+ @GET
+ @Path("/{topic}/{consumergroup}")
+ public void getEventsToException(@PathParam("topic") String topic, @PathParam("consumergroup")
+ String consumergroup
+ ) throws CambriaApiException {
+ // log.info("Consuming message from topic " + topic );
+ DMaaPContext dMaaPContext = getDmaapContext();
+ dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
+
+ try {
+
+ throw new TopicExistsException("Incorrect URL");
+ }
+ catch (TopicExistsException e) {
+ log.error("Error while reading data from topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(),
- errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
- topic, null, null, consumerid, request.getRemoteHost());
+ DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
+ );
log.info(errRes.toString());
throw new CambriaApiException(errRes);
}
+
}
+
+
+
+
+
+
/**
* This method is used to publish messages.Taking two parameter topic and
@@ -189,33 +257,36 @@ public class EventsRestService {
try {
eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
- } catch (TopicExistsException e) {
+ }
+ catch ( TopicExistsException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
- DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
- errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
- topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
+ + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
+ Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
- } catch (DMaaPAccessDeniedException | AccessDeniedException e) {
+ }
+ catch ( DMaaPAccessDeniedException | AccessDeniedException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
- errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
- topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
+ + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
+ Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
}
-
- catch (ConfigDbException | IOException | missingReqdSetting e) {
+
+
+ catch (ConfigDbException | IOException | missingReqdSetting e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
- errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
- topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
+ + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
+ Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
}
@@ -244,37 +315,40 @@ public class EventsRestService {
// );
try {
- eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), partitionKey,
+ eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(),
+ partitionKey,
Utils.getFormattedDate(new Date()));
- }
-
- catch (TopicExistsException e) {
+ }
+
+ catch ( TopicExistsException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
- DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
- errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
- topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
+ + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
+ Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
- } catch (DMaaPAccessDeniedException | AccessDeniedException e) {
+ }
+ catch ( DMaaPAccessDeniedException| AccessDeniedException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
- errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
- topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
+ + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
+ Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
}
-
- catch (ConfigDbException | IOException | missingReqdSetting e) {
+
+ catch (ConfigDbException | IOException | missingReqdSetting e) {
log.error("Error while publishing to topic : " + topic, e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
- "Transaction-" + errorMessages.getPublishMsgError() + e.getMessage(), null,
- Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request), request.getRemoteHost(),
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), "Transaction-"
+ + errorMessages.getPublishMsgError() + e.getMessage(), null,
+ Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request),
+ request.getRemoteHost(),
null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
@@ -293,6 +367,7 @@ public class EventsRestService {
*/
private DMaaPContext getDmaapContext() {
+ DMaaPContext dmaapContext = new DMaaPContext();
dmaapContext.setRequest(request);
dmaapContext.setResponse(response);
dmaapContext.setConfigReader(configReader);