aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVENKATESH KUMAR <vv770d@att.com>2017-06-26 19:48:47 -0400
committerVENKATESH KUMAR <vv770d@att.com>2017-06-26 20:02:13 -0400
commit85c567a4214faa17d28934912bb76aea282b5158 (patch)
tree91f1067a76a6f19948940c42171a323b23feae64 /src
parent35ad3be1673be895ec4775f3a8c905dbf17abbf1 (diff)
Update VES for logging and exception handling
Change-Id: I44fd9535ebb3d3be88ddbf5890109bd23cd00bdc Signed-off-by: VENKATESH KUMAR <vv770d@att.com>
Diffstat (limited to 'src')
-rw-r--r--src/main/java/org/openecomp/dcae/commonFunction/CommonStartup.java12
-rw-r--r--src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java114
-rw-r--r--src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java55
-rw-r--r--src/main/java/org/openecomp/dcae/restapi/endpoints/EventReceipt.java98
4 files changed, 162 insertions, 117 deletions
diff --git a/src/main/java/org/openecomp/dcae/commonFunction/CommonStartup.java b/src/main/java/org/openecomp/dcae/commonFunction/CommonStartup.java
index 869a5c7..f12dd60 100644
--- a/src/main/java/org/openecomp/dcae/commonFunction/CommonStartup.java
+++ b/src/main/java/org/openecomp/dcae/commonFunction/CommonStartup.java
@@ -207,15 +207,18 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable
EventProcessor ep = new EventProcessor ();
- //Thread epThread=new Thread(ep);
- //epThread.start();
executor = Executors.newFixedThreadPool(20);
- executor.execute(ep);
+ for (int i = 0; i < 20; i++) {
+ executor.execute(ep);
+
+ }
}
catch ( loadException | missingReqdSetting | IOException | invalidSettingValue | ServletException | InterruptedException e )
{
CommonStartup.eplog.error("FATAL_STARTUP_ERROR" + e.getMessage() );
+
+ e.printStackTrace();
throw new RuntimeException ( e );
}
finally
@@ -272,7 +275,7 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable
}
log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
CommonStartup.metriclog.info("EVENT_PUBLISH_END");
- //ecomplogger.debug(secloggerMessageEnum.SEC_COLLECT_AND_PULIBISH_SUCCESS);
+
}
catch ( JSONException e ){
@@ -338,4 +341,5 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable
static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
private static ApiServer fTomcatServer = null;
private static final Logger log = LoggerFactory.getLogger ( CommonStartup.class );
+
}
diff --git a/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java b/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java
index 5f27217..796bc97 100644
--- a/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java
+++ b/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java
@@ -32,87 +32,113 @@ import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.TimeZone;
-import java.util.UUID;
-import org.json.JSONArray;
+import org.json.JSONException;
import org.json.JSONObject;
public class EventProcessor implements Runnable {
private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
private static HashMap<String, String[]> streamid_hash = new HashMap<String, String[]>();
- private JSONObject event = null;
public EventProcessor() {
log.debug("EventProcessor: Default Constructor");
-
+
String list[] = CommonStartup.streamid.split("\\|");
for (int i = 0; i < list.length; i++) {
String domain = list[i].split("=")[0];
- //String streamIdList[] = list[i].split("=")[1].split(",");
- String streamIdList[] = list[i].substring(list[i].indexOf("=") +1).split(",");
-
+ // String streamIdList[] = list[i].split("=")[1].split(",");
+ String streamIdList[] = list[i].substring(list[i].indexOf("=") + 1).split(",");
+
log.debug("Domain: " + domain + " streamIdList:" + Arrays.toString(streamIdList));
streamid_hash.put(domain, streamIdList);
}
-
+
}
@Override
public void run() {
-
+ JSONObject event = null;
try {
-
+
event = CommonStartup.fProcessingInputQueue.take();
log.info("EventProcessor\tRemoving element: " + event);
-
- //EventPublisher Ep=new EventPublisher();
+
while (event != null) {
- // As long as the producer is running we remove elements from the queue.
- String uuid = event.get("VESuniqueId").toString();
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString());
- localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () );
-
- log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain"));
- String streamIdList[]=streamid_hash.get(event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain"));
- log.debug("streamIdList:" + streamIdList);
-
- if (streamIdList.length == 0) {
- log.error("No StreamID defined for publish - Message dropped" + event.toString());
- }
-
- else {
- for (int i=0; i < streamIdList.length; i++)
- {
- log.info("Invoking publisher for streamId:" + streamIdList[i]);
- this.overrideEvent();
- EventPublisher.getInstance(streamIdList[i]).sendEvent(event);
-
+
+ try {
+
+ // As long as the producer is running we remove elements
+ // from the queue.
+
+ String uuid = event.get("VESuniqueId").toString();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString());
+ localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+
+ log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:"
+ + event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain"));
+ String streamIdList[] = streamid_hash
+ .get(event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain"));
+ log.debug("streamIdList:" + streamIdList);
+
+ if (streamIdList.length == 0) {
+ log.error("No StreamID defined for publish - Message dropped" + event.toString());
+ }
+
+ else {
+
+ event = this.overrideEvent(event);
+ for (int i = 0; i < streamIdList.length; i++) {
+
+ if (!event.has("VESuniqueId")) {
+ event.put("VESuniqueId", uuid);
+ }
+
+ log.info("Invoking publisher for streamId:" + streamIdList[i]);
+
+ EventPublisher ep = new EventPublisher(streamIdList[i]);
+ ep.sendEvent(event);
+ ep.closePublisher();
+
+ }
}
+ log.debug("Message published" + event.toString());
+
+ } catch (JSONException e) {
+ log.error("EventProcessor Json parse exception" + e.getMessage() + event.toString());
+ e.printStackTrace();
+ } catch (Exception e) {
+ log.error("EventProcessor exception" + e.getMessage() + event.toString());
+ e.printStackTrace();
}
- log.debug("Message published" + event.toString());
event = CommonStartup.fProcessingInputQueue.take();
- // log.info("EventProcessor\tRemoving element: " + this.queue.remove());
}
} catch (InterruptedException e) {
- log.error("EventProcessor InterruptedException" + e.getMessage());
+ log.error("EventProcessor InterruptedException" + e.getMessage() + event.toString());
+ e.printStackTrace();
}
}
-
- public void overrideEvent()
- {
- //Set collector timestamp in event payload before publish
+ public JSONObject overrideEvent(JSONObject event) {
+ // Set collector timestamp in event payload before publish
final Date currentTime = new Date();
- final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
+ final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-
- JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp",sdf.format(currentTime) );
+
+ /*
+ * "event": { "commonEventHeader": { "internalHeaderFields": {
+ * "collectorTimeStamp": "Fri, 04 21 2017 04:11:52 GMT" },
+ */
+
+ JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", sdf.format(currentTime));
JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader");
commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp);
- event.getJSONObject("event").put("commonEventHeader",commonEventHeaderkey);
+
+ event.getJSONObject("event").put("commonEventHeader", commonEventHeaderkey);
+
log.debug("Modified event:" + event);
-
+ return event;
+
}
}
diff --git a/src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java b/src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java
index b40fb24..62a77cd 100644
--- a/src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java
+++ b/src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java
@@ -39,19 +39,18 @@ import com.att.nsa.logging.log4j.EcompFields;
public class EventPublisher {
- private static EventPublisher instance = null;
- private static CambriaBatchingPublisher pub = null;
-
+
+ private CambriaBatchingPublisher pub = null;
private String streamid = "";
private String ueburl="";
private String topic="";
private String authuser="";
private String authpwd="";
- private static Logger log = LoggerFactory.getLogger(EventPublisher.class);
+ private static Logger log = LoggerFactory.getLogger(EventPublisher.class);
- private EventPublisher( String newstreamid) {
+ EventPublisher( String newstreamid) {
this.streamid = newstreamid;
try {
@@ -75,28 +74,18 @@ public class EventPublisher {
}
-
- public static synchronized EventPublisher getInstance( String streamid){
- if (instance == null) {
- instance = new EventPublisher(streamid);
- }
- if (!instance.streamid.equals(streamid)){
- instance.closePublisher();
- instance = new EventPublisher(streamid);
- }
- return instance;
-
- }
+
- public synchronized void sendEvent(JSONObject event) {
+ public void sendEvent(JSONObject event) {
log.debug("EventPublisher.sendEvent: instance for publish is ready");
+ String uuid = "";
if (event.has("VESuniqueId"))
{
- String uuid = event.get("VESuniqueId").toString();
+ uuid = event.get("VESuniqueId").toString();
LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString());
localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () );
log.debug("Removing VESuniqueid object from event");
@@ -104,13 +93,12 @@ public class EventPublisher {
}
-
try {
if (authuser != null)
{
- log.debug("URL:" + ueburl + "TOPIC:" + topic + "AuthUser:" + authuser + "Authpwd:" + authpwd);
+ log.debug("URL:" + ueburl + " TOPIC:" + topic + " AuthUser:" + authuser);
pub = new CambriaClientBuilders.PublisherBuilder ()
.usingHosts (ueburl)
.onTopic (topic)
@@ -124,7 +112,7 @@ public class EventPublisher {
else
{
- log.debug("URL:" + ueburl + "TOPIC:" + topic );
+ log.debug("URL:" + ueburl + " TOPIC:" + topic );
pub = new CambriaClientBuilders.PublisherBuilder ()
.usingHosts (ueburl)
.onTopic (topic)
@@ -142,25 +130,26 @@ public class EventPublisher {
log.info("Pending Message Count="+pendingMsgs);
}
- //closePublisher();
log.info("pub.send invoked - no error");
- CommonStartup.oplog.info ("URL:" + ueburl + "TOPIC:" + topic + "Event Published:" + event);
+ CommonStartup.oplog.info ("URL:" + ueburl + " TOPIC:" + topic + " VESuniqueId:" + uuid + " Event Published:" + event);
} catch(IOException e) {
- log.error("IOException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
+ log.error("IOException: Unable to publish VESuniqueId:" + uuid + " event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
+ e.printStackTrace();
} catch (GeneralSecurityException e) {
- // TODO Auto-generated catch block
- log.error("GeneralSecurityException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
+ log.error("GeneralSecurityException: Unable to publish VESuniqueId:" + uuid + " event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
+ e.printStackTrace();
}
catch (IllegalArgumentException e)
{
- log.error("IllegalArgumentException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
+ log.error("IllegalArgumentException: Unable to publish VESuniqueId:" + uuid + " event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
+ e.printStackTrace();
}
}
- public synchronized void closePublisher() {
+ public void closePublisher() {
try {
if (pub!= null)
@@ -171,10 +160,12 @@ public class EventPublisher {
}
}
}
- catch(InterruptedException ie) {
+ catch(InterruptedException e) {
log.error("Caught an Interrupted Exception on Close event");
- }catch(IOException ioe) {
- log.error("Caught IO Exception: " + ioe.toString());
+ e.printStackTrace();
+ }catch(IOException e) {
+ log.error("Caught IO Exception: " + e.toString());
+ e.printStackTrace();
}
}
diff --git a/src/main/java/org/openecomp/dcae/restapi/endpoints/EventReceipt.java b/src/main/java/org/openecomp/dcae/restapi/endpoints/EventReceipt.java
index 173b4d0..ba03dde 100644
--- a/src/main/java/org/openecomp/dcae/restapi/endpoints/EventReceipt.java
+++ b/src/main/java/org/openecomp/dcae/restapi/endpoints/EventReceipt.java
@@ -25,12 +25,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
+
import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
-import org.json.JSONTokener;
+
import org.openecomp.dcae.commonFunction.CommonStartup;
import org.openecomp.dcae.commonFunction.CustomExceptionLoader;
@@ -62,8 +65,8 @@ public class EventReceipt extends NsaBaseEndpoint {
JSONObject jsonObject = null;
FileReader fr = null;
InputStream istr = null;
- // String br = new BufferedReader(new
- // InputStreamReader(ctx.request().getBodyStream())).readLine();
+ String msg = null;
+
final UUID uuid = java.util.UUID.randomUUID();
@@ -74,14 +77,15 @@ public class EventReceipt extends NsaBaseEndpoint {
try {
- // JsonElement msg = new JsonParser().parse(new BufferedReader(new
- // InputStreamReader(ctx.request().getBodyStream())).readLine());
-
- istr = ctx.request().getBodyStream();
- jsonObject = new JSONObject(new JSONTokener(istr));
+ // JsonElement msg = new JsonParser().parse(new BufferedReader(new InputStreamReader(ctx.request().getBodyStream())).readLine());
+ istr = ctx.request().getBodyStream();
+ msg = IOUtils.toString(istr);
+ jsonObject = new JSONObject(msg);
+
+ //jsonObject = new JSONObject(new JSONTokener(istr));
- CommonStartup.inlog.info(ctx.request().getRemoteAddress() + "Input Messsage: " + jsonObject);
- log.info(ctx.request().getRemoteAddress() + "Input Messsage: " + jsonObject);
+ CommonStartup.inlog.info(ctx.request().getRemoteAddress() + " VESuniqueId:" + uuid + " Input Messsage: " + jsonObject);
+ log.info(ctx.request().getRemoteAddress() + " VESuniqueId:" + uuid + " Input Messsage: " + jsonObject);
try {
@@ -101,6 +105,12 @@ public class EventReceipt extends NsaBaseEndpoint {
if (CommonStartup.schema_Validatorflag > 0) {
+ if (jsonObject.has("eventList"))
+ {
+ log.info("Validation failed - contains batch eventList block ");
+ respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Schema validation failed");
+ return;
+ }
fr = new FileReader(CommonStartup.schemaFile);
String schema = new JsonParser().parse(fr).toString();
@@ -108,11 +118,11 @@ public class EventReceipt extends NsaBaseEndpoint {
if (valresult.equals("true")) {
log.info("Validation successful");
} else if (valresult.equals("false")) {
- log.info("Validation failed");
+ log.info("Validation failed on schema check");
respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Schema validation failed");
return;
} else {
- log.error("Validation errored" + valresult);
+ log.error("Validation errored on schema check" + valresult);
respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object");
return;
}
@@ -137,16 +147,18 @@ public class EventReceipt extends NsaBaseEndpoint {
return;
}
- } catch (JSONException | NullPointerException | IOException x) {
+ } catch (JSONException | NullPointerException | IOException e) {
+ e.printStackTrace();
log.error("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest" + HttpStatusCodes.k400_badRequest
- + " Message:" + x.getMessage());
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x.toString());
+ + " Message:" + e.getMessage() + istr.toString() + msg);
+ CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request" + e.toString() + msg) ;
+
respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object");
return;
} catch (QueueFullException e) {
e.printStackTrace();
- log.error("Collector internal queue full :" + e.getMessage());
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e.toString());
+ log.error("Collector internal queue full :" + e.getMessage() + msg);
+ CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e.toString() + msg);
respondWithCustomMsginJson(ctx, HttpStatusCodes.k503_serviceUnavailable, "Queue full");
return;
} finally {
@@ -174,15 +186,10 @@ public class EventReceipt extends NsaBaseEndpoint {
JSONObject jsonObject = null;
FileReader fr = null;
InputStream istr = null;
+ String msg = null;
try {
- // String br = new BufferedReader(new
- // InputStreamReader(ctx.request().getBodyStream())).readLine();
- // JsonElement msg = new JsonParser().parse(new BufferedReader(new
- // InputStreamReader(ctx.request().getBodyStream())).readLine());
- // jsonArray = new JSONArray ( new JSONTokener ( ctx.request
- // ().getBodyStream () ) );
final UUID uuid = java.util.UUID.randomUUID();
LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
@@ -192,13 +199,17 @@ public class EventReceipt extends NsaBaseEndpoint {
log.debug ("Request recieved :" + ctx.request().getRemoteAddress());
istr = ctx.request().getBodyStream();
- jsonObject = new JSONObject(new JSONTokener(istr));
- // jsonObject = new JSONObject ( new JSONTokener ( ctx.request
- // ().getBodyStream () ) );
+
+ msg = IOUtils.toString(istr);
+ jsonObject = new JSONObject(msg);
+
+ //jsonObject = new JSONObject(new JSONTokener(istr));
- CommonStartup.inlog.info(ctx.request().getRemoteAddress() + "Input Messsage: " + jsonObject);
- log.info("Input Messsage: " + jsonObject);
+ CommonStartup.inlog.info(ctx.request().getRemoteAddress() + " VESuniqueId:" + uuid + " Input Batch: " + jsonObject);
+ log.info(ctx.request().getRemoteAddress() + " VESuniqueId:" + uuid + " Input Batch: " + jsonObject);
+
+
try {
if (CommonStartup.authflag == 1) {
retkey = NsaBaseEndpoint.getAuthenticatedUser(ctx);
@@ -214,31 +225,42 @@ public class EventReceipt extends NsaBaseEndpoint {
if (retkey != null || CommonStartup.authflag == 0) {
if (CommonStartup.schema_Validatorflag > 0) {
+
+
fr = new FileReader(CommonStartup.schemaFile);
String schema = new JsonParser().parse(fr).toString();
valresult = CommonStartup.schemavalidate(jsonObject.toString(), schema);
if (valresult.equals("true")) {
- log.info("Validation successful");
+ log.info("Validation successful on schema check");
} else if (valresult.equals("false")) {
- log.info("Validation failed");
+ log.info("Validation failed on schema check");
respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Schema validation failed");
return;
} else {
- log.error("Validation errored" + valresult);
+ log.error("Validation errored on schema check" + valresult);
respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object");
return;
}
jsonArray = jsonObject.getJSONArray("eventList");
- log.info("Validation successful for all events in batch");
+
for (int i = 0; i < jsonArray.length(); i++) {
+
+ if (jsonArray.getJSONObject(i).has("event"))
+ {
+ log.info("Validation failed - contains sub event block ");
+ respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Schema validation failed");
+ return;
+ }
+
event = new JSONObject().put("event", jsonArray.getJSONObject(i));
event.put("VESuniqueId", uuid + "-"+i);
jsonArrayMod.put(event);
}
+ log.info("Validation successful for all events in batch");
log.info("Modified jsonarray:" + jsonArrayMod.toString());
}
@@ -258,16 +280,17 @@ public class EventReceipt extends NsaBaseEndpoint {
respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Unauthorized user");
return;
}
- } catch (JSONException | NullPointerException | IOException x) {
+ } catch (JSONException | NullPointerException | IOException e) {
+ e.printStackTrace();
log.error("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest" + HttpStatusCodes.k400_badRequest
- + " Message:" + x.getMessage());
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x.toString());
+ + " Message:" + e.getMessage() + msg);
+ CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + e.toString() + msg);
respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object");
return;
} catch (QueueFullException e) {
e.printStackTrace();
- log.error("Collector internal queue full :" + e.getMessage());
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e.toString());
+ log.error("Collector internal queue full :" + e.getMessage() + msg);
+ CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e.toString() + msg);
respondWithCustomMsginJson(ctx, HttpStatusCodes.k503_serviceUnavailable, "Queue full");
return;
} finally {
@@ -310,6 +333,7 @@ public class EventReceipt extends NsaBaseEndpoint {
}
}
+
public static void safeClose(FileReader fr) {
if (fr != null) {