diff options
Diffstat (limited to 'src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java')
-rw-r--r-- | src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java | 79 |
1 files changed, 68 insertions, 11 deletions
diff --git a/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java b/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java index 0e6f7e7..a5e90b9 100644 --- a/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java +++ b/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java @@ -20,40 +20,81 @@ package org.openecomp.dcae.commonFunction; -import org.json.JSONObject; +import java.text.SimpleDateFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.att.nsa.clock.SaClock; +import com.att.nsa.logging.LoggingContext; +import com.att.nsa.logging.log4j.EcompFields; + +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.TimeZone; +import java.util.UUID; + +import org.json.JSONArray; +import org.json.JSONObject; + public class EventProcessor implements Runnable { private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); + + private static HashMap<String, String[]> streamid_hash = new HashMap<String, String[]>(); private JSONObject event = null; public EventProcessor() { log.debug("EventProcessor: Default Constructor"); + + String list[] = CommonStartup.streamid.split("\\|"); + for (int i = 0; i < list.length; i++) { + String domain = list[i].split("=")[0]; + //String streamIdList[] = list[i].split("=")[1].split(","); + String streamIdList[] = list[i].substring(list[i].indexOf("=") +1).split(","); + + log.debug("Domain: " + domain + " streamIdList:" + Arrays.toString(streamIdList)); + streamid_hash.put(domain, streamIdList); + } + } @Override public void run() { try { + event = CommonStartup.fProcessingInputQueue.take(); log.info("EventProcessor\tRemoving element: " + event); - + + //EventPublisher Ep=new EventPublisher(); while (event != null) { - // As long as the producer is running, - // we remove elements from the queue. + // As long as the producer is running we remove elements from the queue. - // log.info("EventProcessor\tRemoving element: " + - // this.queue.remove()); - - if (CommonStartup.streamid == null) { + //UUID uuid = UUID.fromString(event.get("VESuniqueId").toString()); + String uuid = event.get("VESuniqueId").toString(); + LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString()); + localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () ); + + log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain")); + String streamIdList[]=streamid_hash.get(event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain")); + log.debug("streamIdList:" + streamIdList); + + if (streamIdList.length == 0) { log.error("No StreamID defined for publish - Message dropped" + event.toString()); - } else { - EventPublisher.getInstance(CommonStartup.cambriaConfigFile, CommonStartup.streamid) - .sendEvent(event.toString(), CommonStartup.streamid); + } + + else { + for (int i=0; i < streamIdList.length; i++) + { + log.info("Invoking publisher for streamId:" + streamIdList[i]); + this.overrideEvent(); + EventPublisher.getInstance(streamIdList[i]).sendEvent(event); + + } } log.debug("Message published" + event.toString()); event = CommonStartup.fProcessingInputQueue.take(); + // log.info("EventProcessor\tRemoving element: " + this.queue.remove()); } } catch (InterruptedException e) { log.error("EventProcessor InterruptedException" + e.getMessage()); @@ -61,4 +102,20 @@ public class EventProcessor implements Runnable { } + + public void overrideEvent() + { + //Set collector timestamp in event payload before publish + final Date currentTime = new Date(); + final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); + sdf.setTimeZone(TimeZone.getTimeZone("GMT")); + + JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime))); + JSONObject additionalParameter = new JSONObject().put("additionalParameters",additionalParametersarray ); + JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader"); + commonEventHeaderkey.put("internalHeaderFields", additionalParameter); + event.getJSONObject("event").put("commonEventHeader",commonEventHeaderkey); + log.debug("Modified event:" + event); + + } } |