diff options
author | VENKATESH KUMAR <vv770d@att.com> | 2017-12-07 15:30:01 +0000 |
---|---|---|
committer | VENKATESH KUMAR <vv770d@att.com> | 2017-12-07 10:53:26 -0500 |
commit | 9346cb2915128831b4f58854362775f33cbd7415 (patch) | |
tree | 56c1d1e56a19fa549cdcc54d16d343c75aa13811 /src/main/java/org/onap/dcae/commonFunction/EventProcessor.java | |
parent | c4098a3af4794da9a1516760baa2318fb91ae258 (diff) |
code sync-up updates
Issue-ID: DCAEGEN2-212
Change-Id: Id72d2b1851ff4457295088609c355e19d19e3d73
Signed-off-by: VENKATESH KUMAR <vv770d@att.com>
Diffstat (limited to 'src/main/java/org/onap/dcae/commonFunction/EventProcessor.java')
-rw-r--r-- | src/main/java/org/onap/dcae/commonFunction/EventProcessor.java | 290 |
1 files changed, 143 insertions, 147 deletions
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java index 05e5f0ba..79dea790 100644 --- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java +++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java @@ -42,154 +42,150 @@ import java.util.TimeZone; public class EventProcessor implements Runnable { - private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); - private static final String EVENT_LITERAL = "event"; - private static final String COMMON_EVENT_HEADER = "commonEventHeader"; - - private static HashMap<String, String[]> streamid_hash = new HashMap<String, String[]>(); - public JSONObject event; - - public EventProcessor() { - log.debug("EventProcessor: Default Constructor"); - - String[] list = CommonStartup.streamid.split("\\|"); - for (String aList : list) { - String domain = aList.split("=")[0]; - //String streamIdList[] = list[i].split("=")[1].split(","); - String[] streamIdList = aList.substring(aList.indexOf('=') + 1).split(","); - - log.debug(String.format("Domain: %s streamIdList:%s", domain, - Arrays.toString(streamIdList))); - streamid_hash.put(domain, streamIdList); - } - - } - - @Override - public void run() { - - try { - - event = CommonStartup.fProcessingInputQueue.take(); - log.info("EventProcessor\tRemoving element: " + event); - - //EventPublisher Ep=new EventPublisher(); - while (event != null) { - // As long as the producer is running we remove elements from the queue. - - //UUID uuid = UUID.fromString(event.get("VESuniqueId").toString()); - String uuid = event.get("VESuniqueId").toString(); - LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - - log.debug("event.VESuniqueId" + event.get("VESuniqueId") - + "event.commonEventHeader.domain:" + event.getJSONObject(EVENT_LITERAL) - .getJSONObject(COMMON_EVENT_HEADER).getString("domain")); - String[] streamIdList = streamid_hash.get( - event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER) - .getString("domain")); - log.debug("streamIdList:" + streamIdList); - - if (streamIdList.length == 0) { - log.error("No StreamID defined for publish - Message dropped" + event); - } else { - for (String aStreamIdList : streamIdList) { - log.info("Invoking publisher for streamId:" + aStreamIdList); - this.overrideEvent(); - EventPublisher.getInstance(aStreamIdList).sendEvent(event); - - } - } - log.debug("Message published" + event); - event = CommonStartup.fProcessingInputQueue.take(); - // log.info("EventProcessor\tRemoving element: " + this.queue.remove()); - } - } catch (InterruptedException e) { - log.error("EventProcessor InterruptedException" + e.getMessage()); - } - - } - - - @SuppressWarnings({"unchecked", "rawtypes"}) - public void overrideEvent() { - //Set collector timestamp in event payload before publish - final Date currentTime = new Date(); - final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); - sdf.setTimeZone(TimeZone.getTimeZone("GMT")); - - JSONObject collectorTimeStamp = new JSONObject() - .put("collectorTimeStamp", sdf.format(currentTime)); - JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL) - .getJSONObject(COMMON_EVENT_HEADER); - commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); - event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey); - - if (CommonStartup.eventTransformFlag == 1) { - // read the mapping json file - final JsonParser parser = new JsonParser(); - FileReader fr = null; - try { - fr = new FileReader ( "./etc/eventTransform.json" ); - final JsonArray jo = (JsonArray) parser.parse(fr); - log.info("parse eventTransform.json"); - // now convert to org.json - final String jsonText = jo.toString(); - final JSONArray topLevel = new JSONArray(jsonText); - //log.info("topLevel == " + topLevel); - - Class[] paramJSONObject = new Class[1]; - paramJSONObject[0] = JSONObject.class; - //load VESProcessors class at runtime - Class cls = Class.forName("org.onap.dcae.commonFunction.ConfigProcessors"); - Constructor constr = cls.getConstructor(paramJSONObject); - Object obj = constr.newInstance(event); - - for (int j = 0; j < topLevel.length(); j++) { - JSONObject filterObj = topLevel.getJSONObject(j).getJSONObject("filter"); - Method method = cls.getDeclaredMethod("isFilterMet", paramJSONObject); - boolean filterMet = (boolean) method.invoke(obj, filterObj); - if (filterMet) { - final JSONArray processors = topLevel.getJSONObject(j) - .getJSONArray("processors"); - - //call the processor method - for (int i = 0; i < processors.length(); i++) { - final JSONObject processorList = processors.getJSONObject(i); - final String functionName = processorList.getString("functionName"); - final JSONObject args = processorList.getJSONObject("args"); - //final JSONObject filter = processorList.getJSONObject("filter"); - - log.info(String.format("functionName==%s | args==%s", functionName, - args)); - //reflect method call - method = cls.getDeclaredMethod(functionName, paramJSONObject); - method.invoke(obj, args); - } - } - } - - } catch (Exception e) { - - log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause() ); - } - finally { - //close the file - if (fr != null) { - try { - fr.close(); - } catch (IOException e) { - log.error("Error closing file reader stream : " + e.toString()); - } - - } - } + private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); + private static final String EVENT_LITERAL = "event"; + private static final String COMMON_EVENT_HEADER = "commonEventHeader"; + + private static HashMap<String, String[]> streamid_hash = new HashMap<String, String[]>(); + public JSONObject event; + + public EventProcessor() { + log.debug("EventProcessor: Default Constructor"); + + String[] list = CommonStartup.streamid.split("\\|"); + for (String aList : list) { + String domain = aList.split("=")[0]; + // String streamIdList[] = list[i].split("=")[1].split(","); + String[] streamIdList = aList.substring(aList.indexOf('=') + 1).split(","); + + log.debug(String.format("Domain: %s streamIdList:%s", domain, Arrays.toString(streamIdList))); + streamid_hash.put(domain, streamIdList); + } + } - //Remove VESversion from event. This field is for internal use and must be removed after use. - if (event.has("VESversion")) - event.remove("VESversion"); - log.debug("Modified event:" + event); + @Override + public void run() { + + try { + + event = CommonStartup.fProcessingInputQueue.take(); + log.info("EventProcessor\tRemoving element: " + event); + + // EventPublisher Ep=new EventPublisher(); + while (event != null) { + // As long as the producer is running we remove elements from + // the queue. + + // UUID uuid = + // UUID.fromString(event.get("VESuniqueId").toString()); + String uuid = event.get("VESuniqueId").toString(); + LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); + localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); + + log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + + event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain")); + String[] streamIdList = streamid_hash + .get(event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain")); + log.debug("streamIdList:" + streamIdList); + + if (streamIdList.length == 0) { + log.error("No StreamID defined for publish - Message dropped" + event); + } else { + for (String aStreamIdList : streamIdList) { + log.info("Invoking publisher for streamId:" + aStreamIdList); + this.overrideEvent(); + EventPublisher.getInstance(aStreamIdList).sendEvent(event); + + } + } + log.debug("Message published" + event); + event = CommonStartup.fProcessingInputQueue.take(); + // log.info("EventProcessor\tRemoving element: " + + // this.queue.remove()); + } + } catch (InterruptedException e) { + log.error("EventProcessor InterruptedException" + e.getMessage()); + } - } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void overrideEvent() { + // Set collector timestamp in event payload before publish + final Date currentTime = new Date(); + final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); + sdf.setTimeZone(TimeZone.getTimeZone("GMT")); + + JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", sdf.format(currentTime)); + JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER); + commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); + event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey); + + if (CommonStartup.eventTransformFlag == 1) { + // read the mapping json file + final JsonParser parser = new JsonParser(); + FileReader fr = null; + try { + fr = new FileReader("./etc/eventTransform.json"); + final JsonArray jo = (JsonArray) parser.parse(fr); + log.info("parse eventTransform.json"); + // now convert to org.json + final String jsonText = jo.toString(); + final JSONArray topLevel = new JSONArray(jsonText); + // log.info("topLevel == " + topLevel); + + Class[] paramJSONObject = new Class[1]; + paramJSONObject[0] = JSONObject.class; + // load VESProcessors class at runtime + Class cls = Class.forName("org.onap.dcae.commonFunction.ConfigProcessors"); + Constructor constr = cls.getConstructor(paramJSONObject); + Object obj = constr.newInstance(event); + + for (int j = 0; j < topLevel.length(); j++) { + JSONObject filterObj = topLevel.getJSONObject(j).getJSONObject("filter"); + Method method = cls.getDeclaredMethod("isFilterMet", paramJSONObject); + boolean filterMet = (boolean) method.invoke(obj, filterObj); + if (filterMet) { + final JSONArray processors = topLevel.getJSONObject(j).getJSONArray("processors"); + + // call the processor method + for (int i = 0; i < processors.length(); i++) { + final JSONObject processorList = processors.getJSONObject(i); + final String functionName = processorList.getString("functionName"); + final JSONObject args = processorList.getJSONObject("args"); + // final JSONObject filter = + // processorList.getJSONObject("filter"); + + log.info(String.format("functionName==%s | args==%s", functionName, args)); + // reflect method call + method = cls.getDeclaredMethod(functionName, paramJSONObject); + method.invoke(obj, args); + } + } + } + + } catch (Exception e) { + + log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause()); + } finally { + // close the file + if (fr != null) { + try { + fr.close(); + } catch (IOException e) { + log.error("Error closing file reader stream : " + e.toString()); + } + + } + } + } + // Remove VESversion from event. This field is for internal use and must + // be removed after use. + if (event.has("VESversion")) + event.remove("VESversion"); + + log.debug("Modified event:" + event); + + } } |