summaryrefslogtreecommitdiffstats
path: root/src/main/java/org
diff options
context:
space:
mode:
authorvagrant <vv770d@att.com>2017-11-28 13:36:38 +0000
committervagrant <vv770d@att.com>2017-11-28 13:37:32 +0000
commitc4098a3af4794da9a1516760baa2318fb91ae258 (patch)
treecf42b53da99678e72fe71b66850c3ed7bad3eea8 /src/main/java/org
parent8a7c0d6da77b78995d37f7e5f8f08f2c9886b086 (diff)
code syncup with ECOMP updates
Issue-ID: DCAEGEN2-212 Change-Id: Id388f26be57d2a12250a6845ed5678d0cebceed6 Signed-off-by: Vijay Venkatesh Kumar<vv770d@att.com>
Diffstat (limited to 'src/main/java/org')
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/CommonStartup.java9
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java3
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/EventProcessor.java44
-rw-r--r--src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java2
-rw-r--r--src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java5
-rw-r--r--src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java6
-rw-r--r--src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java451
7 files changed, 258 insertions, 262 deletions
diff --git a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
index b974ed53..fe3ba325 100644
--- a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
+++ b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
@@ -194,11 +194,14 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable {
executor = Executors.newFixedThreadPool(20);
executor.execute(ep);
- } catch (loadException | missingReqdSetting | IOException | invalidSettingValue | ServletException
- | InterruptedException e) {
+ } catch (loadException | missingReqdSetting | IOException | invalidSettingValue | ServletException | InterruptedException e) {
CommonStartup.eplog.error("FATAL_STARTUP_ERROR" + e.getMessage());
throw new RuntimeException(e);
- } finally {
+ } catch (Throwable e) {
+ System.err.println("Uncaught exception - " + e.getMessage());
+ CommonStartup.eplog.error("FATAL_ERROR" + e.getMessage() );
+ e.printStackTrace(System.err);
+ } finally {
// This will make the executor accept no new threads
// and finish all existing threads in the queue
if (executor != null) {
diff --git a/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java b/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java
index 10a1db47..fbeba2f6 100644
--- a/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java
+++ b/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java
@@ -77,9 +77,8 @@ public class CustomExceptionLoader {
}
log.debug("CustomExceptionLoader.LoadMap --> Map loaded - " + map);
- } catch (JsonIOException|JsonSyntaxException|FileNotFoundException e) {
+ } catch (JsonIOException|JsonSyntaxException|FileNotFoundException e) {
log.error("Exception in LoadMap:" + e.getMessage());
- //e.printStackTrace();
map = null;
}
finally {
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
index 2bc5e45b..05e5f0ba 100644
--- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
+++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
@@ -30,6 +30,7 @@ import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.io.FileReader;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
@@ -117,20 +118,6 @@ public class EventProcessor implements Runnable {
final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
- /*JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime)));
- JSONObject additionalParameter = new JSONObject().put("additionalParameters",additionalParametersarray );
- JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader");
- commonEventHeaderkey.put("internalHeaderFields", additionalParameter);*/
-
-
-/* "event": {
- "commonEventHeader": {
- "internalHeaderFields": {
- "collectorTimeStamp": "Fri, 04 21 2017 04:11:52 GMT"
- },
-*/
-
- //JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime)));
JSONObject collectorTimeStamp = new JSONObject()
.put("collectorTimeStamp", sdf.format(currentTime));
JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL)
@@ -141,9 +128,10 @@ public class EventProcessor implements Runnable {
if (CommonStartup.eventTransformFlag == 1) {
// read the mapping json file
final JsonParser parser = new JsonParser();
+ FileReader fr = null;
try {
- final JsonArray jo = (JsonArray) parser
- .parse(new FileReader("./etc/eventTransform.json"));
+ fr = new FileReader ( "./etc/eventTransform.json" );
+ final JsonArray jo = (JsonArray) parser.parse(fr);
log.info("parse eventTransform.json");
// now convert to org.json
final String jsonText = jo.toString();
@@ -183,11 +171,25 @@ public class EventProcessor implements Runnable {
} catch (Exception e) {
- log.error("EventProcessor Exception" + e.getMessage() + e);
- log.error("EventProcessor Exception" + e.getCause());
+ log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause() );
}
- }
- log.debug("Modified event:" + event);
+ finally {
+ //close the file
+ if (fr != null) {
+ try {
+ fr.close();
+ } catch (IOException e) {
+ log.error("Error closing file reader stream : " + e.toString());
+ }
+
+ }
+ }
+ }
+ //Remove VESversion from event. This field is for internal use and must be removed after use.
+ if (event.has("VESversion"))
+ event.remove("VESversion");
- }
+ log.debug("Modified event:" + event);
+
+ }
}
diff --git a/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java b/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java
index 5ef44f5c..655b9755 100644
--- a/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java
+++ b/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java
@@ -80,6 +80,7 @@ public class FetchDynamicConfig {
log.error(
"Error in writing configuration into file /opt/app/KV-Configuration.json "
+ jsonObject, e);
+ e.printStackTrace();
}
} else {
log.info(">>>Static configuration to be used");
@@ -110,6 +111,7 @@ public class FetchDynamicConfig {
ipr.close();
} catch (IOException e) {
log.error("error", e);
+ e.printStackTrace();
}
return result;
diff --git a/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java b/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java
index a28bca86..1f77751d 100644
--- a/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java
+++ b/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java
@@ -50,9 +50,6 @@ public class LoadDynamicConfig {
public static void main(String[] args) {
Map<String, String> env = System.getenv();
- /*for (String envName : env.keySet()) {
- System.out.format("%s=%s%n", envName, env.get(envName));
- }*/
//Check again to ensure new controller deployment related config
if (env.containsKey("CONSUL_HOST") &&
@@ -95,6 +92,7 @@ public class LoadDynamicConfig {
} catch (ConfigurationException e) {
log.error(e.getLocalizedMessage(), e);
+ e.printStackTrace();
}
@@ -117,6 +115,7 @@ public class LoadDynamicConfig {
br.close();
} catch (Exception e) {
log.error(e.getLocalizedMessage(), e);
+ e.printStackTrace();
}
return result;
}
diff --git a/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java b/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java
index a8bfc24e..c18901dc 100644
--- a/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java
+++ b/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java
@@ -57,12 +57,6 @@ public class RestfulCollectorServlet extends CommonServlet
public RestfulCollectorServlet ( rrNvReadable settings ) throws loadException, missingReqdSetting
{
super ( settings, "collector", false );
- //authid = settings.getString(CommonStartup.kSetting_authid,null);
- /*if (authid != null)
- {
- String authpwdtemp = settings.getString(CommonStartup.kSetting_authpwd,null);
- authpwd = new String(Base64.decodeBase64(authpwdtemp));
- }*/
authlist = settings.getString(CommonStartup.kSetting_authlist,null);
}
diff --git a/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java b/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java
index e6b7d20c..5ee3b792 100644
--- a/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java
+++ b/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -49,230 +49,227 @@ import java.util.regex.Pattern;
public class EventReceipt extends NsaBaseEndpoint {
- private static final Logger log = LoggerFactory.getLogger(EventReceipt.class);
- private static final String MESSAGE = " Message:";
- static String valresult;
- static JSONObject customerror;
-
-
- public static void receiveVESEvent(DrumlinRequestContext ctx) {
- // the request body carries events. assume for now it's an array
- // of json objects that fits in memory. (See cambria's parsing for
- // handling large messages)
-
- NsaSimpleApiKey retkey = null;
-
- JSONArray jsonArray;
- JSONArray jsonArrayMod = new JSONArray();
- JSONObject event;
- JSONObject jsonObject;
- FileReader fr = null;
- InputStream istr = null;
- int arrayFlag = 0;
- String vesVersion = null;
-
- try {
- //System.out.print("Version string:" + version);
-
- // String br = new BufferedReader(new InputStreamReader(ctx.request().getBodyStream())).readLine();
- // JsonElement msg = new JsonParser().parse(new BufferedReader(new InputStreamReader(ctx.request().getBodyStream())).readLine());
- // jsonArray = new JSONArray ( new JSONTokener ( ctx.request().getBodyStream () ) );
-
- log.debug("Request recieved :" + ctx.request().getRemoteAddress());
- istr = ctx.request().getBodyStream();
- jsonObject = new JSONObject(new JSONTokener(istr));
-
- log.info("ctx getPathInContext: " + ctx.request().getPathInContext());
- Pattern p = Pattern.compile("(v\\d+)");
- Matcher m = p.matcher(ctx.request().getPathInContext());
-
- if (m.find()) {
- log.info("VES version:" + m.group());
- vesVersion = m.group();
- }
- if (ctx.request().getPathInContext().contains("eventBatch")) {
- CommonStartup.inlog.info(
- ctx.request().getRemoteAddress() + "VES Batch Input Messsage: " + jsonObject);
- log.info(
- ctx.request().getRemoteAddress() + "VES Batch Input Messsage: " + jsonObject);
- arrayFlag = 1;
- } else {
- CommonStartup.inlog
- .info(ctx.request().getRemoteAddress() + "Input Messsage: " + jsonObject);
- log.info(ctx.request().getRemoteAddress() + "Input Messsage: " + jsonObject);
-
- }
-
- UUID uuid = UUID.randomUUID();
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
-
- try {
- if (CommonStartup.authflag == 1) {
- retkey = NsaBaseEndpoint.getAuthenticatedUser(ctx);
- }
- } catch (NullPointerException x) {
- log.info(
- "Invalid user request " + ctx.request().getContentType() + MESSAGE
- + jsonObject);
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Unauthorized user" + x);
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Invalid user");
- return;
- }
-
- if (retkey != null || CommonStartup.authflag == 0) {
- if (CommonStartup.schema_Validatorflag > 0) {
-
- //fr = new FileReader(CommonStartup.schemaFile);
- fr = new FileReader(schemaFileVersion(vesVersion));
- String schema = new JsonParser().parse(fr).toString();
-
- valresult = CommonStartup.schemavalidate(jsonObject.toString(), schema);
- if ("true".equals(valresult)) {
- log.info("Validation successful");
- } else if ("false".equals(valresult)) {
- log.info("Validation failed");
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
- "Schema validation failed");
-
- return;
- } else {
- log.error("Validation errored" + valresult);
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
- "Couldn't parse JSON object");
- return;
-
- }
-
- if (arrayFlag == 1) {
- jsonArray = jsonObject.getJSONArray("eventList");
- log.info("Validation successful for all events in batch");
- for (int i = 0; i < jsonArray.length(); i++) {
- event = new JSONObject().put("event", jsonArray.getJSONObject(i));
- event.put("VESuniqueId", uuid + "-" + i);
- event.put("VESversion", vesVersion);
- jsonArrayMod.put(event);
- }
-
- log.info("Modified jsonarray:" + jsonArrayMod);
-
- } else {
-
- jsonObject.put("VESuniqueId", uuid);
- jsonObject.put("VESversion", vesVersion);
- jsonArrayMod = new JSONArray().put(jsonObject);
- }
-
- }
- // reject anything that's not JSON
- if (!ctx.request().getContentType().equalsIgnoreCase("application/json")) {
- log.info(String.format("Rejecting request with content type %s Message:%s",
- ctx.request().getContentType(), jsonObject));
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
- "Incorrect message content-type; only accepts application/json messages");
- return;
- }
-
- CommonStartup.handleEvents(jsonArrayMod);
- } else {
- log.info(
- String.format("Unauthorized request %s%s%s", ctx.request().getContentType(),
- MESSAGE, jsonObject));
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized,
- "Unauthorized user");
- return;
- }
- } catch (JSONException | NullPointerException | IOException x) {
- log.error(String
- .format("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest%d%s%s",
- HttpStatusCodes.k400_badRequest, MESSAGE, x.getMessage()));
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x);
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
- "Couldn't parse JSON object");
- return;
- } catch (QueueFullException e) {
- log.error("Collector internal queue full :" + e.getMessage(), e);
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e);
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k503_serviceUnavailable, "Queue full");
- return;
- } finally {
- if (fr != null) {
- safeClose(fr);
- }
-
- if (istr != null) {
- safeClose(istr);
- }
- }
- log.info("MessageAccepted and k200_ok to be sent");
- ctx.response()
- .sendErrorAndBody(HttpStatusCodes.k200_ok, "Message Accepted", MimeTypes.kAppJson);
- }
-
-
- public static void respondWithCustomMsginJson(DrumlinRequestContext ctx, int sc, String msg) {
- String[] str;
- String exceptionType = "GeneralException";
-
- str = CustomExceptionLoader.LookupMap(String.valueOf(sc), msg);
- log.info("Post CustomExceptionLoader.LookupMap" + str);
-
- if (str != null) {
-
- if (str[0].matches("SVC")) {
- exceptionType = "ServiceException";
- } else if (str[1].matches("POL")) {
- exceptionType = "PolicyException";
- }
-
- JSONObject jb = new JSONObject().put("requestError",
- new JSONObject().put(exceptionType,
- new JSONObject().put("MessagID", str[0]).put("text", str[1])));
-
- log.debug("Constructed json error : " + jb);
- ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson);
- } else {
- JSONObject jb = new JSONObject().put("requestError",
- new JSONObject()
- .put(exceptionType, new JSONObject().put("Status", sc).put("Error", msg)));
- ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson);
- }
-
- }
-
- public static void safeClose(FileReader fr) {
- if (fr != null) {
- try {
- fr.close();
- } catch (IOException e) {
- log.error("Error closing file reader stream : " + e);
- }
- }
-
- }
-
- public static void safeClose(InputStream is) {
- if (is != null) {
- try {
- is.close();
- } catch (IOException e) {
- log.error("Error closing Input stream : " + e);
- }
- }
-
- }
-
- public static String schemaFileVersion(String version) {
- String filename = null;
-
- if (CommonStartup.schemaFileJson.has(version)) {
- filename = CommonStartup.schemaFileJson.getString(version);
- } else {
- filename = CommonStartup.schemaFile;
- }
- log.info(String.format("VESversion: %s Schema File:%s", version, filename));
- return filename;
-
- }
+ private static final Logger log = LoggerFactory.getLogger(EventReceipt.class);
+ private static final String MESSAGE = " Message:";
+ static String valresult;
+ static JSONObject customerror;
+
+ public static void receiveVESEvent(DrumlinRequestContext ctx) {
+ // the request body carries events. assume for now it's an array
+ // of json objects that fits in memory. (See cambria's parsing for
+ // handling large messages)
+
+ NsaSimpleApiKey retkey = null;
+
+ JSONArray jsonArray;
+ JSONArray jsonArrayMod = new JSONArray();
+ JSONObject event;
+ JSONObject jsonObject;
+ FileReader fr = null;
+ InputStream istr = null;
+ int arrayFlag = 0;
+ String vesVersion = null;
+
+ try {
+ // System.out.print("Version string:" + version);
+
+ // String br = new BufferedReader(new
+ // InputStreamReader(ctx.request().getBodyStream())).readLine();
+ // JsonElement msg = new JsonParser().parse(new BufferedReader(new
+ // InputStreamReader(ctx.request().getBodyStream())).readLine());
+ // jsonArray = new JSONArray ( new JSONTokener (
+ // ctx.request().getBodyStream () ) );
+
+ log.debug("Request recieved :" + ctx.request().getRemoteAddress());
+ istr = ctx.request().getBodyStream();
+ jsonObject = new JSONObject(new JSONTokener(istr));
+
+ log.info("ctx getPathInContext: " + ctx.request().getPathInContext());
+ Pattern p = Pattern.compile("(v\\d+)");
+ Matcher m = p.matcher(ctx.request().getPathInContext());
+
+ if (m.find()) {
+ log.info("VES version:" + m.group());
+ vesVersion = m.group();
+ m = null;
+ p = null;
+
+ }
+
+ final UUID uuid = UUID.randomUUID();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
+ localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+
+ if (ctx.request().getPathInContext().contains("eventBatch")) {
+ CommonStartup.inlog.info(ctx.request().getRemoteAddress() + "VESUniqueID-Prefix:" + uuid
+ + " VES Batch Input Messsage: " + jsonObject);
+ log.info(ctx.request().getRemoteAddress() + "VESUniqueID-Prefix:" + uuid + " VES Batch Input Messsage: "
+ + jsonObject);
+ arrayFlag = 1;
+ } else {
+ CommonStartup.inlog.info(
+ ctx.request().getRemoteAddress() + "VESUniqueID:" + uuid + " Input Messsage: " + jsonObject);
+ log.info(ctx.request().getRemoteAddress() + "VESUniqueID:" + uuid + " Input Messsage: " + jsonObject);
+
+ }
+
+ try {
+ if (CommonStartup.authflag == 1) {
+ retkey = NsaBaseEndpoint.getAuthenticatedUser(ctx);
+ }
+ } catch (NullPointerException x) {
+ log.info("Invalid user request " + ctx.request().getContentType() + MESSAGE + jsonObject);
+ CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Unauthorized user" + x);
+ respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Invalid user");
+ return;
+ }
+
+ if (retkey != null || CommonStartup.authflag == 0) {
+ if (CommonStartup.schema_Validatorflag > 0) {
+ if ((arrayFlag == 1) && (jsonObject.has("eventList") && (!jsonObject.has("event")))
+ || ((arrayFlag == 0) && (!jsonObject.has("eventList") && (jsonObject.has("event"))))) {
+ fr = new FileReader(schemaFileVersion(vesVersion));
+ String schema = new JsonParser().parse(fr).toString();
+
+ valresult = CommonStartup.schemavalidate(jsonObject.toString(), schema);
+ if (valresult.equals("true")) {
+ log.info("Validation successful");
+ } else if (valresult.equals("false")) {
+ log.info("Validation failed");
+ respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
+ "Schema validation failed");
+ return;
+ } else {
+ log.error("Validation errored" + valresult);
+ respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
+ "Couldn't parse JSON object");
+ return;
+ }
+ } else {
+ log.info("Validation failed");
+ respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Schema validation failed");
+ return;
+ }
+ if (arrayFlag == 1) {
+ jsonArray = jsonObject.getJSONArray("eventList");
+ log.info("Validation successful for all events in batch");
+ for (int i = 0; i < jsonArray.length(); i++) {
+ event = new JSONObject().put("event", jsonArray.getJSONObject(i));
+ event.put("VESuniqueId", uuid + "-" + i);
+ event.put("VESversion", vesVersion);
+ jsonArrayMod.put(event);
+ }
+ log.info("Modified jsonarray:" + jsonArrayMod.toString());
+ } else {
+ jsonObject.put("VESuniqueId", uuid);
+ jsonObject.put("VESversion", vesVersion);
+ jsonArrayMod = new JSONArray().put(jsonObject);
+ }
+ }
+
+ // reject anything that's not JSON
+ if (!ctx.request().getContentType().equalsIgnoreCase("application/json")) {
+ log.info(String.format("Rejecting request with content type %s Message:%s",
+ ctx.request().getContentType(), jsonObject));
+ respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
+ "Incorrect message content-type; only accepts application/json messages");
+ return;
+ }
+
+ CommonStartup.handleEvents(jsonArrayMod);
+ } else {
+ log.info(String.format("Unauthorized request %s%s%s", ctx.request().getContentType(), MESSAGE,
+ jsonObject));
+ respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Unauthorized user");
+ return;
+ }
+ } catch (JSONException | NullPointerException | IOException x) {
+ log.error(String.format("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest%d%s%s",
+ HttpStatusCodes.k400_badRequest, MESSAGE, x.getMessage()));
+ CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x);
+ respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object");
+ return;
+ } catch (QueueFullException e) {
+ log.error("Collector internal queue full :" + e.getMessage(), e);
+ CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e);
+ respondWithCustomMsginJson(ctx, HttpStatusCodes.k503_serviceUnavailable, "Queue full");
+ return;
+ } finally {
+ if (fr != null) {
+ safeClose(fr);
+ }
+
+ if (istr != null) {
+ safeClose(istr);
+ }
+ }
+ log.info("MessageAccepted and k200_ok to be sent");
+ ctx.response().sendErrorAndBody(HttpStatusCodes.k200_ok, "Message Accepted", MimeTypes.kAppJson);
+ }
+
+ public static void respondWithCustomMsginJson(DrumlinRequestContext ctx, int sc, String msg) {
+ String[] str;
+ String exceptionType = "GeneralException";
+
+ str = CustomExceptionLoader.LookupMap(String.valueOf(sc), msg);
+ log.info("Post CustomExceptionLoader.LookupMap" + str);
+
+ if (str != null) {
+
+ if (str[0].matches("SVC")) {
+ exceptionType = "ServiceException";
+ } else if (str[1].matches("POL")) {
+ exceptionType = "PolicyException";
+ }
+
+ JSONObject jb = new JSONObject().put("requestError",
+ new JSONObject().put(exceptionType, new JSONObject().put("MessagID", str[0]).put("text", str[1])));
+
+ log.debug("Constructed json error : " + jb);
+ ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson);
+ } else {
+ JSONObject jb = new JSONObject().put("requestError",
+ new JSONObject().put(exceptionType, new JSONObject().put("Status", sc).put("Error", msg)));
+ ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson);
+ }
+
+ }
+
+ public static void safeClose(FileReader fr) {
+ if (fr != null) {
+ try {
+ fr.close();
+ } catch (IOException e) {
+ log.error("Error closing file reader stream : " + e);
+ }
+ }
+
+ }
+
+ public static void safeClose(InputStream is) {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ log.error("Error closing Input stream : " + e);
+ }
+ }
+
+ }
+
+ public static String schemaFileVersion(String version) {
+ String filename = null;
+
+ if (CommonStartup.schemaFileJson.has(version)) {
+ filename = CommonStartup.schemaFileJson.getString(version);
+ } else {
+ filename = CommonStartup.schemaFileJson.getString("v5");
+
+ }
+ log.info(String.format("VESversion: %s Schema File:%s", version, filename));
+ return filename;
+
+ }
}
+