diff options
-rw-r--r-- | .project | 2 | ||||
-rw-r--r-- | etc/collector.properties | 6 | ||||
-rw-r--r-- | src/main/java/org/openecomp/dcae/commonFunction/CommonStartup.java | 12 | ||||
-rw-r--r-- | src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java | 114 | ||||
-rw-r--r-- | src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java | 55 | ||||
-rw-r--r-- | src/main/java/org/openecomp/dcae/restapi/endpoints/EventReceipt.java | 98 |
6 files changed, 166 insertions, 121 deletions
@@ -1,6 +1,6 @@ <?xml version="1.0" encoding="UTF-8"?> <projectDescription> - <name>OpenVESCollector</name> + <name>ONAPVESCollector</name> <comment></comment> <projects> </projects> diff --git a/etc/collector.properties b/etc/collector.properties index 672a7fe..dabe1b0 100644 --- a/etc/collector.properties +++ b/etc/collector.properties @@ -21,7 +21,7 @@ collector.service.port=8080 ## The secure port is required if header.authflag is set to 1 (true) ## Authentication is only supported via secure port ## When enabled - require valid keystore defined -##collector.service.secure.port=8443 +## collector.service.secure.port=8443 ## The keystore must be setup per installation when secure port is configured collector.keystore.file.location=../etc/keystore @@ -37,7 +37,7 @@ collector.keystore.alias=tomcat ## buffer beyond a reasonable size limit. With a limit, the server won't crash ## due to being out of memory, and the caller will get a 5xx reply saying the ## server is in trouble. -#collector.inputQueue.maxPending=4096 +collector.inputQueue.maxPending=8096 ## Schema Validation checkflag ## default no validation checkflag (-1) @@ -59,7 +59,7 @@ exceptionConfig=./etc/ExceptionConfig.json header.authflag=0 ## Combination of userid,base64 encoded pwd list to be supported ## userid and pwd comma separated; pipe delimitation between each pair -header.authlist=secureid,IWRjYWVSb2FkbTEyMyEt|sample1,c2FtcGxlMQ==|vdnsagg,dmRuc2FnZw== +header.authlist=ssample1,c2FtcGxlMQ==|vdnsagg,dmRuc2FnZw== ############################################################################### ## diff --git a/src/main/java/org/openecomp/dcae/commonFunction/CommonStartup.java b/src/main/java/org/openecomp/dcae/commonFunction/CommonStartup.java index 869a5c7..f12dd60 100644 --- a/src/main/java/org/openecomp/dcae/commonFunction/CommonStartup.java +++ b/src/main/java/org/openecomp/dcae/commonFunction/CommonStartup.java @@ -207,15 +207,18 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable EventProcessor ep = new EventProcessor (); - //Thread epThread=new Thread(ep); - //epThread.start(); executor = Executors.newFixedThreadPool(20); - executor.execute(ep); + for (int i = 0; i < 20; i++) { + executor.execute(ep); + + } } catch ( loadException | missingReqdSetting | IOException | invalidSettingValue | ServletException | InterruptedException e ) { CommonStartup.eplog.error("FATAL_STARTUP_ERROR" + e.getMessage() ); + + e.printStackTrace(); throw new RuntimeException ( e ); } finally @@ -272,7 +275,7 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable } log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); CommonStartup.metriclog.info("EVENT_PUBLISH_END"); - //ecomplogger.debug(secloggerMessageEnum.SEC_COLLECT_AND_PULIBISH_SUCCESS); + } catch ( JSONException e ){ @@ -338,4 +341,5 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable static LinkedBlockingQueue<JSONObject> fProcessingInputQueue; private static ApiServer fTomcatServer = null; private static final Logger log = LoggerFactory.getLogger ( CommonStartup.class ); + } diff --git a/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java b/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java index 5f27217..796bc97 100644 --- a/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java +++ b/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java @@ -32,87 +32,113 @@ import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.TimeZone; -import java.util.UUID; -import org.json.JSONArray; +import org.json.JSONException; import org.json.JSONObject; public class EventProcessor implements Runnable { private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); private static HashMap<String, String[]> streamid_hash = new HashMap<String, String[]>(); - private JSONObject event = null; public EventProcessor() { log.debug("EventProcessor: Default Constructor"); - + String list[] = CommonStartup.streamid.split("\\|"); for (int i = 0; i < list.length; i++) { String domain = list[i].split("=")[0]; - //String streamIdList[] = list[i].split("=")[1].split(","); - String streamIdList[] = list[i].substring(list[i].indexOf("=") +1).split(","); - + // String streamIdList[] = list[i].split("=")[1].split(","); + String streamIdList[] = list[i].substring(list[i].indexOf("=") + 1).split(","); + log.debug("Domain: " + domain + " streamIdList:" + Arrays.toString(streamIdList)); streamid_hash.put(domain, streamIdList); } - + } @Override public void run() { - + JSONObject event = null; try { - + event = CommonStartup.fProcessingInputQueue.take(); log.info("EventProcessor\tRemoving element: " + event); - - //EventPublisher Ep=new EventPublisher(); + while (event != null) { - // As long as the producer is running we remove elements from the queue. - String uuid = event.get("VESuniqueId").toString(); - LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString()); - localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () ); - - log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain")); - String streamIdList[]=streamid_hash.get(event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain")); - log.debug("streamIdList:" + streamIdList); - - if (streamIdList.length == 0) { - log.error("No StreamID defined for publish - Message dropped" + event.toString()); - } - - else { - for (int i=0; i < streamIdList.length; i++) - { - log.info("Invoking publisher for streamId:" + streamIdList[i]); - this.overrideEvent(); - EventPublisher.getInstance(streamIdList[i]).sendEvent(event); - + + try { + + // As long as the producer is running we remove elements + // from the queue. + + String uuid = event.get("VESuniqueId").toString(); + LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString()); + localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); + + log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + + event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain")); + String streamIdList[] = streamid_hash + .get(event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain")); + log.debug("streamIdList:" + streamIdList); + + if (streamIdList.length == 0) { + log.error("No StreamID defined for publish - Message dropped" + event.toString()); + } + + else { + + event = this.overrideEvent(event); + for (int i = 0; i < streamIdList.length; i++) { + + if (!event.has("VESuniqueId")) { + event.put("VESuniqueId", uuid); + } + + log.info("Invoking publisher for streamId:" + streamIdList[i]); + + EventPublisher ep = new EventPublisher(streamIdList[i]); + ep.sendEvent(event); + ep.closePublisher(); + + } } + log.debug("Message published" + event.toString()); + + } catch (JSONException e) { + log.error("EventProcessor Json parse exception" + e.getMessage() + event.toString()); + e.printStackTrace(); + } catch (Exception e) { + log.error("EventProcessor exception" + e.getMessage() + event.toString()); + e.printStackTrace(); } - log.debug("Message published" + event.toString()); event = CommonStartup.fProcessingInputQueue.take(); - // log.info("EventProcessor\tRemoving element: " + this.queue.remove()); } } catch (InterruptedException e) { - log.error("EventProcessor InterruptedException" + e.getMessage()); + log.error("EventProcessor InterruptedException" + e.getMessage() + event.toString()); + e.printStackTrace(); } } - - public void overrideEvent() - { - //Set collector timestamp in event payload before publish + public JSONObject overrideEvent(JSONObject event) { + // Set collector timestamp in event payload before publish final Date currentTime = new Date(); - final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); + final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); sdf.setTimeZone(TimeZone.getTimeZone("GMT")); - - JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp",sdf.format(currentTime) ); + + /* + * "event": { "commonEventHeader": { "internalHeaderFields": { + * "collectorTimeStamp": "Fri, 04 21 2017 04:11:52 GMT" }, + */ + + JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", sdf.format(currentTime)); JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader"); commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); - event.getJSONObject("event").put("commonEventHeader",commonEventHeaderkey); + + event.getJSONObject("event").put("commonEventHeader", commonEventHeaderkey); + log.debug("Modified event:" + event); - + return event; + } } diff --git a/src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java b/src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java index b40fb24..62a77cd 100644 --- a/src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java +++ b/src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java @@ -39,19 +39,18 @@ import com.att.nsa.logging.log4j.EcompFields; public class EventPublisher { - private static EventPublisher instance = null; - private static CambriaBatchingPublisher pub = null; - + + private CambriaBatchingPublisher pub = null; private String streamid = ""; private String ueburl=""; private String topic=""; private String authuser=""; private String authpwd=""; - private static Logger log = LoggerFactory.getLogger(EventPublisher.class); + private static Logger log = LoggerFactory.getLogger(EventPublisher.class); - private EventPublisher( String newstreamid) { + EventPublisher( String newstreamid) { this.streamid = newstreamid; try { @@ -75,28 +74,18 @@ public class EventPublisher { } - - public static synchronized EventPublisher getInstance( String streamid){ - if (instance == null) { - instance = new EventPublisher(streamid); - } - if (!instance.streamid.equals(streamid)){ - instance.closePublisher(); - instance = new EventPublisher(streamid); - } - return instance; - - } + - public synchronized void sendEvent(JSONObject event) { + public void sendEvent(JSONObject event) { log.debug("EventPublisher.sendEvent: instance for publish is ready"); + String uuid = ""; if (event.has("VESuniqueId")) { - String uuid = event.get("VESuniqueId").toString(); + uuid = event.get("VESuniqueId").toString(); LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString()); localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () ); log.debug("Removing VESuniqueid object from event"); @@ -104,13 +93,12 @@ public class EventPublisher { } - try { if (authuser != null) { - log.debug("URL:" + ueburl + "TOPIC:" + topic + "AuthUser:" + authuser + "Authpwd:" + authpwd); + log.debug("URL:" + ueburl + " TOPIC:" + topic + " AuthUser:" + authuser); pub = new CambriaClientBuilders.PublisherBuilder () .usingHosts (ueburl) .onTopic (topic) @@ -124,7 +112,7 @@ public class EventPublisher { else { - log.debug("URL:" + ueburl + "TOPIC:" + topic ); + log.debug("URL:" + ueburl + " TOPIC:" + topic ); pub = new CambriaClientBuilders.PublisherBuilder () .usingHosts (ueburl) .onTopic (topic) @@ -142,25 +130,26 @@ public class EventPublisher { log.info("Pending Message Count="+pendingMsgs); } - //closePublisher(); log.info("pub.send invoked - no error"); - CommonStartup.oplog.info ("URL:" + ueburl + "TOPIC:" + topic + "Event Published:" + event); + CommonStartup.oplog.info ("URL:" + ueburl + " TOPIC:" + topic + " VESuniqueId:" + uuid + " Event Published:" + event); } catch(IOException e) { - log.error("IOException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString()); + log.error("IOException: Unable to publish VESuniqueId:" + uuid + " event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString()); + e.printStackTrace(); } catch (GeneralSecurityException e) { - // TODO Auto-generated catch block - log.error("GeneralSecurityException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString()); + log.error("GeneralSecurityException: Unable to publish VESuniqueId:" + uuid + " event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString()); + e.printStackTrace(); } catch (IllegalArgumentException e) { - log.error("IllegalArgumentException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString()); + log.error("IllegalArgumentException: Unable to publish VESuniqueId:" + uuid + " event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString()); + e.printStackTrace(); } } - public synchronized void closePublisher() { + public void closePublisher() { try { if (pub!= null) @@ -171,10 +160,12 @@ public class EventPublisher { } } } - catch(InterruptedException ie) { + catch(InterruptedException e) { log.error("Caught an Interrupted Exception on Close event"); - }catch(IOException ioe) { - log.error("Caught IO Exception: " + ioe.toString()); + e.printStackTrace(); + }catch(IOException e) { + log.error("Caught IO Exception: " + e.toString()); + e.printStackTrace(); } } diff --git a/src/main/java/org/openecomp/dcae/restapi/endpoints/EventReceipt.java b/src/main/java/org/openecomp/dcae/restapi/endpoints/EventReceipt.java index 173b4d0..ba03dde 100644 --- a/src/main/java/org/openecomp/dcae/restapi/endpoints/EventReceipt.java +++ b/src/main/java/org/openecomp/dcae/restapi/endpoints/EventReceipt.java @@ -25,12 +25,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; + import java.util.UUID; + +import org.apache.commons.io.IOUtils; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; -import org.json.JSONTokener; + import org.openecomp.dcae.commonFunction.CommonStartup; import org.openecomp.dcae.commonFunction.CustomExceptionLoader; @@ -62,8 +65,8 @@ public class EventReceipt extends NsaBaseEndpoint { JSONObject jsonObject = null; FileReader fr = null; InputStream istr = null; - // String br = new BufferedReader(new - // InputStreamReader(ctx.request().getBodyStream())).readLine(); + String msg = null; + final UUID uuid = java.util.UUID.randomUUID(); @@ -74,14 +77,15 @@ public class EventReceipt extends NsaBaseEndpoint { try { - // JsonElement msg = new JsonParser().parse(new BufferedReader(new - // InputStreamReader(ctx.request().getBodyStream())).readLine()); - - istr = ctx.request().getBodyStream(); - jsonObject = new JSONObject(new JSONTokener(istr)); + // JsonElement msg = new JsonParser().parse(new BufferedReader(new InputStreamReader(ctx.request().getBodyStream())).readLine()); + istr = ctx.request().getBodyStream(); + msg = IOUtils.toString(istr); + jsonObject = new JSONObject(msg); + + //jsonObject = new JSONObject(new JSONTokener(istr)); - CommonStartup.inlog.info(ctx.request().getRemoteAddress() + "Input Messsage: " + jsonObject); - log.info(ctx.request().getRemoteAddress() + "Input Messsage: " + jsonObject); + CommonStartup.inlog.info(ctx.request().getRemoteAddress() + " VESuniqueId:" + uuid + " Input Messsage: " + jsonObject); + log.info(ctx.request().getRemoteAddress() + " VESuniqueId:" + uuid + " Input Messsage: " + jsonObject); try { @@ -101,6 +105,12 @@ public class EventReceipt extends NsaBaseEndpoint { if (CommonStartup.schema_Validatorflag > 0) { + if (jsonObject.has("eventList")) + { + log.info("Validation failed - contains batch eventList block "); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Schema validation failed"); + return; + } fr = new FileReader(CommonStartup.schemaFile); String schema = new JsonParser().parse(fr).toString(); @@ -108,11 +118,11 @@ public class EventReceipt extends NsaBaseEndpoint { if (valresult.equals("true")) { log.info("Validation successful"); } else if (valresult.equals("false")) { - log.info("Validation failed"); + log.info("Validation failed on schema check"); respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Schema validation failed"); return; } else { - log.error("Validation errored" + valresult); + log.error("Validation errored on schema check" + valresult); respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object"); return; } @@ -137,16 +147,18 @@ public class EventReceipt extends NsaBaseEndpoint { return; } - } catch (JSONException | NullPointerException | IOException x) { + } catch (JSONException | NullPointerException | IOException e) { + e.printStackTrace(); log.error("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest" + HttpStatusCodes.k400_badRequest - + " Message:" + x.getMessage()); - CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x.toString()); + + " Message:" + e.getMessage() + istr.toString() + msg); + CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request" + e.toString() + msg) ; + respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object"); return; } catch (QueueFullException e) { e.printStackTrace(); - log.error("Collector internal queue full :" + e.getMessage()); - CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e.toString()); + log.error("Collector internal queue full :" + e.getMessage() + msg); + CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e.toString() + msg); respondWithCustomMsginJson(ctx, HttpStatusCodes.k503_serviceUnavailable, "Queue full"); return; } finally { @@ -174,15 +186,10 @@ public class EventReceipt extends NsaBaseEndpoint { JSONObject jsonObject = null; FileReader fr = null; InputStream istr = null; + String msg = null; try { - // String br = new BufferedReader(new - // InputStreamReader(ctx.request().getBodyStream())).readLine(); - // JsonElement msg = new JsonParser().parse(new BufferedReader(new - // InputStreamReader(ctx.request().getBodyStream())).readLine()); - // jsonArray = new JSONArray ( new JSONTokener ( ctx.request - // ().getBodyStream () ) ); final UUID uuid = java.util.UUID.randomUUID(); LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); @@ -192,13 +199,17 @@ public class EventReceipt extends NsaBaseEndpoint { log.debug ("Request recieved :" + ctx.request().getRemoteAddress()); istr = ctx.request().getBodyStream(); - jsonObject = new JSONObject(new JSONTokener(istr)); - // jsonObject = new JSONObject ( new JSONTokener ( ctx.request - // ().getBodyStream () ) ); + + msg = IOUtils.toString(istr); + jsonObject = new JSONObject(msg); + + //jsonObject = new JSONObject(new JSONTokener(istr)); - CommonStartup.inlog.info(ctx.request().getRemoteAddress() + "Input Messsage: " + jsonObject); - log.info("Input Messsage: " + jsonObject); + CommonStartup.inlog.info(ctx.request().getRemoteAddress() + " VESuniqueId:" + uuid + " Input Batch: " + jsonObject); + log.info(ctx.request().getRemoteAddress() + " VESuniqueId:" + uuid + " Input Batch: " + jsonObject); + + try { if (CommonStartup.authflag == 1) { retkey = NsaBaseEndpoint.getAuthenticatedUser(ctx); @@ -214,31 +225,42 @@ public class EventReceipt extends NsaBaseEndpoint { if (retkey != null || CommonStartup.authflag == 0) { if (CommonStartup.schema_Validatorflag > 0) { + + fr = new FileReader(CommonStartup.schemaFile); String schema = new JsonParser().parse(fr).toString(); valresult = CommonStartup.schemavalidate(jsonObject.toString(), schema); if (valresult.equals("true")) { - log.info("Validation successful"); + log.info("Validation successful on schema check"); } else if (valresult.equals("false")) { - log.info("Validation failed"); + log.info("Validation failed on schema check"); respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Schema validation failed"); return; } else { - log.error("Validation errored" + valresult); + log.error("Validation errored on schema check" + valresult); respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object"); return; } jsonArray = jsonObject.getJSONArray("eventList"); - log.info("Validation successful for all events in batch"); + for (int i = 0; i < jsonArray.length(); i++) { + + if (jsonArray.getJSONObject(i).has("event")) + { + log.info("Validation failed - contains sub event block "); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Schema validation failed"); + return; + } + event = new JSONObject().put("event", jsonArray.getJSONObject(i)); event.put("VESuniqueId", uuid + "-"+i); jsonArrayMod.put(event); } + log.info("Validation successful for all events in batch"); log.info("Modified jsonarray:" + jsonArrayMod.toString()); } @@ -258,16 +280,17 @@ public class EventReceipt extends NsaBaseEndpoint { respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Unauthorized user"); return; } - } catch (JSONException | NullPointerException | IOException x) { + } catch (JSONException | NullPointerException | IOException e) { + e.printStackTrace(); log.error("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest" + HttpStatusCodes.k400_badRequest - + " Message:" + x.getMessage()); - CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x.toString()); + + " Message:" + e.getMessage() + msg); + CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + e.toString() + msg); respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object"); return; } catch (QueueFullException e) { e.printStackTrace(); - log.error("Collector internal queue full :" + e.getMessage()); - CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e.toString()); + log.error("Collector internal queue full :" + e.getMessage() + msg); + CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e.toString() + msg); respondWithCustomMsginJson(ctx, HttpStatusCodes.k503_serviceUnavailable, "Queue full"); return; } finally { @@ -310,6 +333,7 @@ public class EventReceipt extends NsaBaseEndpoint { } } + public static void safeClose(FileReader fr) { if (fr != null) { |