aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java')
-rw-r--r--src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java114
1 files changed, 70 insertions, 44 deletions
diff --git a/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java b/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java
index 5f27217..796bc97 100644
--- a/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java
+++ b/src/main/java/org/openecomp/dcae/commonFunction/EventProcessor.java
@@ -32,87 +32,113 @@ import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.TimeZone;
-import java.util.UUID;
-import org.json.JSONArray;
+import org.json.JSONException;
import org.json.JSONObject;
public class EventProcessor implements Runnable {
private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
private static HashMap<String, String[]> streamid_hash = new HashMap<String, String[]>();
- private JSONObject event = null;
public EventProcessor() {
log.debug("EventProcessor: Default Constructor");
-
+
String list[] = CommonStartup.streamid.split("\\|");
for (int i = 0; i < list.length; i++) {
String domain = list[i].split("=")[0];
- //String streamIdList[] = list[i].split("=")[1].split(",");
- String streamIdList[] = list[i].substring(list[i].indexOf("=") +1).split(",");
-
+ // String streamIdList[] = list[i].split("=")[1].split(",");
+ String streamIdList[] = list[i].substring(list[i].indexOf("=") + 1).split(",");
+
log.debug("Domain: " + domain + " streamIdList:" + Arrays.toString(streamIdList));
streamid_hash.put(domain, streamIdList);
}
-
+
}
@Override
public void run() {
-
+ JSONObject event = null;
try {
-
+
event = CommonStartup.fProcessingInputQueue.take();
log.info("EventProcessor\tRemoving element: " + event);
-
- //EventPublisher Ep=new EventPublisher();
+
while (event != null) {
- // As long as the producer is running we remove elements from the queue.
- String uuid = event.get("VESuniqueId").toString();
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString());
- localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () );
-
- log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain"));
- String streamIdList[]=streamid_hash.get(event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain"));
- log.debug("streamIdList:" + streamIdList);
-
- if (streamIdList.length == 0) {
- log.error("No StreamID defined for publish - Message dropped" + event.toString());
- }
-
- else {
- for (int i=0; i < streamIdList.length; i++)
- {
- log.info("Invoking publisher for streamId:" + streamIdList[i]);
- this.overrideEvent();
- EventPublisher.getInstance(streamIdList[i]).sendEvent(event);
-
+
+ try {
+
+ // As long as the producer is running we remove elements
+ // from the queue.
+
+ String uuid = event.get("VESuniqueId").toString();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString());
+ localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+
+ log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:"
+ + event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain"));
+ String streamIdList[] = streamid_hash
+ .get(event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain"));
+ log.debug("streamIdList:" + streamIdList);
+
+ if (streamIdList.length == 0) {
+ log.error("No StreamID defined for publish - Message dropped" + event.toString());
+ }
+
+ else {
+
+ event = this.overrideEvent(event);
+ for (int i = 0; i < streamIdList.length; i++) {
+
+ if (!event.has("VESuniqueId")) {
+ event.put("VESuniqueId", uuid);
+ }
+
+ log.info("Invoking publisher for streamId:" + streamIdList[i]);
+
+ EventPublisher ep = new EventPublisher(streamIdList[i]);
+ ep.sendEvent(event);
+ ep.closePublisher();
+
+ }
}
+ log.debug("Message published" + event.toString());
+
+ } catch (JSONException e) {
+ log.error("EventProcessor Json parse exception" + e.getMessage() + event.toString());
+ e.printStackTrace();
+ } catch (Exception e) {
+ log.error("EventProcessor exception" + e.getMessage() + event.toString());
+ e.printStackTrace();
}
- log.debug("Message published" + event.toString());
event = CommonStartup.fProcessingInputQueue.take();
- // log.info("EventProcessor\tRemoving element: " + this.queue.remove());
}
} catch (InterruptedException e) {
- log.error("EventProcessor InterruptedException" + e.getMessage());
+ log.error("EventProcessor InterruptedException" + e.getMessage() + event.toString());
+ e.printStackTrace();
}
}
-
- public void overrideEvent()
- {
- //Set collector timestamp in event payload before publish
+ public JSONObject overrideEvent(JSONObject event) {
+ // Set collector timestamp in event payload before publish
final Date currentTime = new Date();
- final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
+ final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-
- JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp",sdf.format(currentTime) );
+
+ /*
+ * "event": { "commonEventHeader": { "internalHeaderFields": {
+ * "collectorTimeStamp": "Fri, 04 21 2017 04:11:52 GMT" },
+ */
+
+ JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", sdf.format(currentTime));
JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader");
commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp);
- event.getJSONObject("event").put("commonEventHeader",commonEventHeaderkey);
+
+ event.getJSONObject("event").put("commonEventHeader", commonEventHeaderkey);
+
log.debug("Modified event:" + event);
-
+ return event;
+
}
}