diff options
Diffstat (limited to 'src/main/java/org/openecomp/dcae/restapi/endpoints/EventReceipt.java')
-rw-r--r-- | src/main/java/org/openecomp/dcae/restapi/endpoints/EventReceipt.java | 569 |
1 files changed, 332 insertions, 237 deletions
diff --git a/src/main/java/org/openecomp/dcae/restapi/endpoints/EventReceipt.java b/src/main/java/org/openecomp/dcae/restapi/endpoints/EventReceipt.java index 54512e7..173b4d0 100644 --- a/src/main/java/org/openecomp/dcae/restapi/endpoints/EventReceipt.java +++ b/src/main/java/org/openecomp/dcae/restapi/endpoints/EventReceipt.java @@ -1,243 +1,338 @@ -/*-
+/*- * ============LICENSE_START======================================================= * PROJECT * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.dcae.restapi.endpoints;
-
-
-import java.io.FileReader;
-import java.io.IOException;
-
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
-import org.openecomp.dcae.commonFunction.CommonStartup;
-import org.openecomp.dcae.commonFunction.CustomExceptionLoader;
-import org.openecomp.dcae.commonFunction.CommonStartup.QueueFullException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
-import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext;
-import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
-import com.att.nsa.drumlin.service.standards.MimeTypes;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-
-import com.google.gson.JsonParser;
-
-
-public class EventReceipt extends NsaBaseEndpoint {
- static String valresult = null;
- static JSONObject customerror = null;
-
- public static void receiveSingleEvent(DrumlinRequestContext ctx) throws IOException {
-
- NsaSimpleApiKey retkey = null;
- JSONObject jsonObject = null;
- //String br = new BufferedReader(new InputStreamReader(ctx.request().getBodyStream())).readLine();
-
- try {
-
-
- //JsonElement msg = new JsonParser().parse(new BufferedReader(new InputStreamReader(ctx.request().getBodyStream())).readLine());
- jsonObject = new JSONObject ( new JSONTokener ( ctx.request ().getBodyStream () ) );
-
- CommonStartup.inlog.info("Input Messsage: " + jsonObject);
- log.info("Input Messsage: " + jsonObject);
-
-
- try {
-
- if (CommonStartup.authflag == 1) {
- retkey = NsaBaseEndpoint.getAuthenticatedUser(ctx);
- }
- } catch (NullPointerException x) {
-
- log.info("Invalid user request " + ctx.request().getContentType() + " Message:" + jsonObject.toString());
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Unauthorized user" + x.toString() );
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Invalid user");
- return;
-
- }
- if (retkey != null || CommonStartup.authflag == 0) {
-
- if (CommonStartup.schema_Validatorflag > 0) {
- String schema = new JsonParser().parse(new FileReader(CommonStartup.schemaFile)).toString();
-
- valresult = CommonStartup.schemavalidate(jsonObject.toString(), schema);
- if (valresult.equals("true")) {
- log.info("Validation successful");
- } else if (valresult.equals("false")) {
- log.info("Validation failed");
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Schema validation failed");
- return;
- } else {
- log.error("Validation errored" + valresult);
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,"Couldn't parse JSON object");
- return;
- }
- }
-
- // reject anything that's not JSON
- if (!ctx.request().getContentType().equalsIgnoreCase("application/json")) {
- log.info("Rejecting request with content type " + ctx.request().getContentType() + " Message:"
- + jsonObject);
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
- "Incorrect message content-type; only accepts application/json messages");
- return;
- }
- final JSONArray jsonArray = new JSONArray().put(jsonObject);
-
- CommonStartup.handleEvents(jsonArray);
- } else {
- log.info("Unauthorized request " + ctx.request().getContentType() + " Message:" + jsonObject.toString());
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Unauthorized user");
- return;
- }
-
- } catch (JSONException | NullPointerException | IOException x) {
- log.error("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest" + HttpStatusCodes.k400_badRequest
- + " Message:" + x.getMessage());
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x.toString() );
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object");
- return;
- } catch (QueueFullException e) {
- e.printStackTrace();
- log.error("Collector internal queue full :" + e.getMessage());
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e.toString() );
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k503_serviceUnavailable, "Queue full");
- return;
- }
- log.info("MessageAccepted and k200_ok to be sent");
- ctx.response().sendErrorAndBody(HttpStatusCodes.k200_ok, "Message Accepted", MimeTypes.kAppJson);
- }
-
- public static void receiveMultipleEvents(DrumlinRequestContext ctx) throws IOException {
- // the request body carries events. assume for now it's an array
- // of json objects that fits in memory. (See cambria's parsing for handling large messages)
-
- NsaSimpleApiKey retkey = null;
-
- JSONArray jsonArray = null;
-
- try {
-
- //String br = new BufferedReader(new InputStreamReader(ctx.request().getBodyStream())).readLine();
- //JsonElement msg = new JsonParser().parse(new BufferedReader(new InputStreamReader(ctx.request().getBodyStream())).readLine());
- jsonArray = new JSONArray ( new JSONTokener ( ctx.request ().getBodyStream () ) );
-
- CommonStartup.inlog.info("Input Messsage: " + jsonArray);
- log.info("Input Messsage: " + jsonArray);
-
- try {
- if (CommonStartup.authflag == 1) {
- retkey = NsaBaseEndpoint.getAuthenticatedUser(ctx);
- }
- } catch (NullPointerException x) {
- log.info("Invalid user request " + ctx.request().getContentType() + " Message:" + jsonArray.toString());
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Unauthorized user" + x.toString() );
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Invalid user");
- return;
- }
-
- if (retkey != null || CommonStartup.authflag == 0) {
- if (CommonStartup.schema_Validatorflag > 0) {
-
- String schema = new JsonParser().parse(new FileReader(CommonStartup.schemaFile)).toString();
-
- for (int i = 0; i < jsonArray.length(); i++) {
- valresult = CommonStartup.schemavalidate(jsonArray.getJSONObject(i).toString(), schema);
- if (valresult.equals("false")) {
- log.info("Validation failed");
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,"Standard schema validation failed");
- return;
- } else if (!valresult.equals("true")) {
- log.error("Validation errored" + valresult);
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,"Couldn't parse JSON object");
- return;
-
- }
- }
- log.info("Validation successful for all events in batch");
-
- }
- // reject anything that's not JSON
- if (!ctx.request().getContentType().equalsIgnoreCase("application/json")) {
- log.info("Rejecting request with content type " + ctx.request().getContentType() + " Message:"
- + jsonArray.toString());
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
- "Incorrect message content-type; only accepts application/json messages");
- return;
- }
-
- CommonStartup.handleEvents(jsonArray);
- } else {
- log.info("Unauthorized request " + ctx.request().getContentType() + " Message:" + jsonArray.toString());
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Unauthorized request");
- return;
- }
- } catch (JSONException | NullPointerException | IOException x) {
- log.error("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest" + HttpStatusCodes.k400_badRequest
- + " Message:" + x.getMessage());
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x.toString() );
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object");
- return;
- } catch (QueueFullException e) {
- e.printStackTrace();
- log.error("Collector internal HP queue full :" + e.getMessage() );
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e.toString() );
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k503_serviceUnavailable, "Queue full");
- return;
- }
-
- ctx.response().sendErrorAndBody(HttpStatusCodes.k200_ok, "Message Accepted", MimeTypes.kAppJson);
- }
-
- public static void respondWithCustomMsginJson(DrumlinRequestContext ctx, int sc, String msg) {
- String[] str = null;
- String ExceptionType = "GeneralException";
-
- str = CustomExceptionLoader.LookupMap(String.valueOf(sc), msg);
- System.out.println("Post CustomExceptionLoader.LookupMap" + str);
-
- if (str != null) {
-
- if (str[0].matches("SVC")) {
- ExceptionType = "ServiceException";
- } else if (str[1].matches("POL")) {
- ExceptionType = "PolicyException";
- }
-
- JSONObject jb = new JSONObject().put("requestError",
- new JSONObject().put(ExceptionType, new JSONObject().put("MessagID", str[0]).put("text", str[1])));
-
- log.debug("Constructed json error : " + jb.toString());
- ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson);
- } else {
- JSONObject jb = new JSONObject().put("requestError",
- new JSONObject().put(ExceptionType, new JSONObject().put("Status", sc).put("Error", msg)));
- ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson);
- }
-
- }
-
- private static final Logger log = LoggerFactory.getLogger(EventReceipt.class);
-
-}
+ * ============LICENSE_END========================================================= + */ + +package org.openecomp.dcae.restapi.endpoints; + +import java.io.FileReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStream; +import java.util.UUID; + +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.openecomp.dcae.commonFunction.CommonStartup; +import org.openecomp.dcae.commonFunction.CustomExceptionLoader; + +import org.openecomp.dcae.commonFunction.VESLogger; +import org.openecomp.dcae.commonFunction.CommonStartup.QueueFullException; + +import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint; +import com.att.nsa.clock.SaClock; +import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext; +import com.att.nsa.drumlin.service.standards.HttpStatusCodes; +import com.att.nsa.drumlin.service.standards.MimeTypes; +import com.att.nsa.logging.LoggingContext; + +import com.att.nsa.logging.log4j.EcompFields; +import com.att.nsa.security.db.simple.NsaSimpleApiKey; + +import com.google.gson.JsonParser; + +public class EventReceipt extends NsaBaseEndpoint { + static String valresult = null; + static JSONObject customerror = null; + + private static final Logger log = LoggerFactory.getLogger(EventReceipt.class); + + public static void receiveSingleEvent(DrumlinRequestContext ctx) throws IOException { + + + NsaSimpleApiKey retkey = null; + JSONObject jsonObject = null; + FileReader fr = null; + InputStream istr = null; + // String br = new BufferedReader(new + // InputStreamReader(ctx.request().getBodyStream())).readLine(); + + + final UUID uuid = java.util.UUID.randomUUID(); + LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); + localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () ); + + log.debug ("Request recieved :" + ctx.request().getRemoteAddress()); + + try { + + // JsonElement msg = new JsonParser().parse(new BufferedReader(new + // InputStreamReader(ctx.request().getBodyStream())).readLine()); + + istr = ctx.request().getBodyStream(); + jsonObject = new JSONObject(new JSONTokener(istr)); + + CommonStartup.inlog.info(ctx.request().getRemoteAddress() + "Input Messsage: " + jsonObject); + log.info(ctx.request().getRemoteAddress() + "Input Messsage: " + jsonObject); + + try { + + if (CommonStartup.authflag == 1) { + retkey = NsaBaseEndpoint.getAuthenticatedUser(ctx); + } + } catch (NullPointerException x) { + + log.info( + "Invalid user request " + ctx.request().getContentType() + " Message:" + jsonObject.toString()); + CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Unauthorized user" + x.toString()); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Invalid user"); + return; + + } + if (retkey != null || CommonStartup.authflag == 0) { + + if (CommonStartup.schema_Validatorflag > 0) { + + fr = new FileReader(CommonStartup.schemaFile); + String schema = new JsonParser().parse(fr).toString(); + + valresult = CommonStartup.schemavalidate(jsonObject.toString(), schema); + if (valresult.equals("true")) { + log.info("Validation successful"); + } else if (valresult.equals("false")) { + log.info("Validation failed"); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Schema validation failed"); + return; + } else { + log.error("Validation errored" + valresult); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object"); + return; + } + } + + // reject anything that's not JSON + if (!ctx.request().getContentType().equalsIgnoreCase("application/json")) { + log.info("Rejecting request with content type " + ctx.request().getContentType() + " Message:" + + jsonObject); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, + "Incorrect message content-type; only accepts application/json messages"); + return; + } + jsonObject.put("VESuniqueId", uuid); + final JSONArray jsonArray = new JSONArray().put(jsonObject); + + CommonStartup.handleEvents(jsonArray); + } else { + log.info( + "Unauthorized request " + ctx.request().getContentType() + " Message:" + jsonObject.toString()); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Unauthorized user"); + return; + } + + } catch (JSONException | NullPointerException | IOException x) { + log.error("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest" + HttpStatusCodes.k400_badRequest + + " Message:" + x.getMessage()); + CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x.toString()); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object"); + return; + } catch (QueueFullException e) { + e.printStackTrace(); + log.error("Collector internal queue full :" + e.getMessage()); + CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e.toString()); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k503_serviceUnavailable, "Queue full"); + return; + } finally { + if (fr != null) { + safeClose(fr); + } + if (istr != null) { + safeClose(istr); + } + } + log.info("MessageAccepted and k200_ok to be sent"); + ctx.response().sendErrorAndBody(HttpStatusCodes.k200_ok, "Message Accepted", MimeTypes.kAppJson); + } + + public static void receiveMultipleEvents(DrumlinRequestContext ctx) throws IOException { + // the request body carries events. assume for now it's an array + // of json objects that fits in memory. (See cambria's parsing for + // handling large messages) + + NsaSimpleApiKey retkey = null; + + JSONArray jsonArray = null; + JSONArray jsonArrayMod = new JSONArray(); + JSONObject event = null; + JSONObject jsonObject = null; + FileReader fr = null; + InputStream istr = null; + + try { + + // String br = new BufferedReader(new + // InputStreamReader(ctx.request().getBodyStream())).readLine(); + // JsonElement msg = new JsonParser().parse(new BufferedReader(new + // InputStreamReader(ctx.request().getBodyStream())).readLine()); + // jsonArray = new JSONArray ( new JSONTokener ( ctx.request + // ().getBodyStream () ) ); + + final UUID uuid = java.util.UUID.randomUUID(); + LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); + localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () ); + + + log.debug ("Request recieved :" + ctx.request().getRemoteAddress()); + + istr = ctx.request().getBodyStream(); + jsonObject = new JSONObject(new JSONTokener(istr)); + // jsonObject = new JSONObject ( new JSONTokener ( ctx.request + // ().getBodyStream () ) ); + + CommonStartup.inlog.info(ctx.request().getRemoteAddress() + "Input Messsage: " + jsonObject); + log.info("Input Messsage: " + jsonObject); + + try { + if (CommonStartup.authflag == 1) { + retkey = NsaBaseEndpoint.getAuthenticatedUser(ctx); + } + } catch (NullPointerException x) { + log.info( + "Invalid user request " + ctx.request().getContentType() + " Message:" + jsonObject.toString()); + CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Unauthorized user" + x.toString()); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Invalid user"); + return; + } + + if (retkey != null || CommonStartup.authflag == 0) { + if (CommonStartup.schema_Validatorflag > 0) { + + fr = new FileReader(CommonStartup.schemaFile); + String schema = new JsonParser().parse(fr).toString(); + + valresult = CommonStartup.schemavalidate(jsonObject.toString(), schema); + if (valresult.equals("true")) { + log.info("Validation successful"); + } else if (valresult.equals("false")) { + log.info("Validation failed"); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Schema validation failed"); + + return; + } else { + log.error("Validation errored" + valresult); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object"); + return; + + } + jsonArray = jsonObject.getJSONArray("eventList"); + log.info("Validation successful for all events in batch"); + for (int i = 0; i < jsonArray.length(); i++) { + event = new JSONObject().put("event", jsonArray.getJSONObject(i)); + event.put("VESuniqueId", uuid + "-"+i); + jsonArrayMod.put(event); + } + + log.info("Modified jsonarray:" + jsonArrayMod.toString()); + + } + // reject anything that's not JSON + if (!ctx.request().getContentType().equalsIgnoreCase("application/json")) { + log.info("Rejecting request with content type " + ctx.request().getContentType() + " Message:" + + jsonObject); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, + "Incorrect message content-type; only accepts application/json messages"); + return; + } + + CommonStartup.handleEvents(jsonArrayMod); + } else { + log.info( + "Unauthorized request " + ctx.request().getContentType() + " Message:" + jsonObject.toString()); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Unauthorized user"); + return; + } + } catch (JSONException | NullPointerException | IOException x) { + log.error("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest" + HttpStatusCodes.k400_badRequest + + " Message:" + x.getMessage()); + CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x.toString()); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object"); + return; + } catch (QueueFullException e) { + e.printStackTrace(); + log.error("Collector internal queue full :" + e.getMessage()); + CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e.toString()); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k503_serviceUnavailable, "Queue full"); + return; + } finally { + if (fr != null) { + safeClose(fr); + } + + if (istr != null) { + safeClose(istr); + } + } + log.info("MessageAccepted and k200_ok to be sent"); + ctx.response().sendErrorAndBody(HttpStatusCodes.k200_ok, "Message Accepted", MimeTypes.kAppJson); + } + + public static void respondWithCustomMsginJson(DrumlinRequestContext ctx, int sc, String msg) { + String[] str = null; + String ExceptionType = "GeneralException"; + + str = CustomExceptionLoader.LookupMap(String.valueOf(sc), msg); + System.out.println("Post CustomExceptionLoader.LookupMap" + str); + + if (str != null) { + + if (str[0].matches("SVC")) { + ExceptionType = "ServiceException"; + } else if (str[1].matches("POL")) { + ExceptionType = "PolicyException"; + } + + JSONObject jb = new JSONObject().put("requestError", + new JSONObject().put(ExceptionType, new JSONObject().put("MessagID", str[0]).put("text", str[1]))); + + log.debug("Constructed json error : " + jb.toString()); + ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson); + } else { + JSONObject jb = new JSONObject().put("requestError", + new JSONObject().put(ExceptionType, new JSONObject().put("Status", sc).put("Error", msg))); + ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson); + } + + } + + public static void safeClose(FileReader fr) { + if (fr != null) { + try { + fr.close(); + } catch (IOException e) { + log.error("Error closing file reader stream : " + e.toString()); + } + } + + } + + public static void safeClose(InputStream is) { + if (is != null) { + try { + is.close(); + } catch (IOException e) { + log.error("Error closing Input stream : " + e.toString()); + } + } + + } + + + +} |