diff options
Diffstat (limited to 'src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java')
-rw-r--r-- | src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java | 114 |
1 files changed, 70 insertions, 44 deletions
diff --git a/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java b/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java index 5f27217..796bc97 100644 --- a/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java +++ b/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java @@ -32,87 +32,113 @@ import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.TimeZone; -import java.util.UUID; -import org.json.JSONArray; +import org.json.JSONException; import org.json.JSONObject; public class EventProcessor implements Runnable { private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); private static HashMap<String, String[]> streamid_hash = new HashMap<String, String[]>(); - private JSONObject event = null; public EventProcessor() { log.debug("EventProcessor: Default Constructor"); - + String list[] = CommonStartup.streamid.split("\\|"); for (int i = 0; i < list.length; i++) { String domain = list[i].split("=")[0]; - //String streamIdList[] = list[i].split("=")[1].split(","); - String streamIdList[] = list[i].substring(list[i].indexOf("=") +1).split(","); - + // String streamIdList[] = list[i].split("=")[1].split(","); + String streamIdList[] = list[i].substring(list[i].indexOf("=") + 1).split(","); + log.debug("Domain: " + domain + " streamIdList:" + Arrays.toString(streamIdList)); streamid_hash.put(domain, streamIdList); } - + } @Override public void run() { - + JSONObject event = null; try { - + event = CommonStartup.fProcessingInputQueue.take(); log.info("EventProcessor\tRemoving element: " + event); - - //EventPublisher Ep=new EventPublisher(); + while (event != null) { - // As long as the producer is running we remove elements from the queue. - String uuid = event.get("VESuniqueId").toString(); - LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString()); - localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () ); - - log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain")); - String streamIdList[]=streamid_hash.get(event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain")); - log.debug("streamIdList:" + streamIdList); - - if (streamIdList.length == 0) { - log.error("No StreamID defined for publish - Message dropped" + event.toString()); - } - - else { - for (int i=0; i < streamIdList.length; i++) - { - log.info("Invoking publisher for streamId:" + streamIdList[i]); - this.overrideEvent(); - EventPublisher.getInstance(streamIdList[i]).sendEvent(event); - + + try { + + // As long as the producer is running we remove elements + // from the queue. + + String uuid = event.get("VESuniqueId").toString(); + LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString()); + localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); + + log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + + event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain")); + String streamIdList[] = streamid_hash + .get(event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain")); + log.debug("streamIdList:" + streamIdList); + + if (streamIdList.length == 0) { + log.error("No StreamID defined for publish - Message dropped" + event.toString()); + } + + else { + + event = this.overrideEvent(event); + for (int i = 0; i < streamIdList.length; i++) { + + if (!event.has("VESuniqueId")) { + event.put("VESuniqueId", uuid); + } + + log.info("Invoking publisher for streamId:" + streamIdList[i]); + + EventPublisher ep = new EventPublisher(streamIdList[i]); + ep.sendEvent(event); + ep.closePublisher(); + + } } + log.debug("Message published" + event.toString()); + + } catch (JSONException e) { + log.error("EventProcessor Json parse exception" + e.getMessage() + event.toString()); + e.printStackTrace(); + } catch (Exception e) { + log.error("EventProcessor exception" + e.getMessage() + event.toString()); + e.printStackTrace(); } - log.debug("Message published" + event.toString()); event = CommonStartup.fProcessingInputQueue.take(); - // log.info("EventProcessor\tRemoving element: " + this.queue.remove()); } } catch (InterruptedException e) { - log.error("EventProcessor InterruptedException" + e.getMessage()); + log.error("EventProcessor InterruptedException" + e.getMessage() + event.toString()); + e.printStackTrace(); } } - - public void overrideEvent() - { - //Set collector timestamp in event payload before publish + public JSONObject overrideEvent(JSONObject event) { + // Set collector timestamp in event payload before publish final Date currentTime = new Date(); - final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); + final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); sdf.setTimeZone(TimeZone.getTimeZone("GMT")); - - JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp",sdf.format(currentTime) ); + + /* + * "event": { "commonEventHeader": { "internalHeaderFields": { + * "collectorTimeStamp": "Fri, 04 21 2017 04:11:52 GMT" }, + */ + + JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", sdf.format(currentTime)); JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader"); commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); - event.getJSONObject("event").put("commonEventHeader",commonEventHeaderkey); + + event.getJSONObject("event").put("commonEventHeader", commonEventHeaderkey); + log.debug("Modified event:" + event); - + return event; + } } |