diff options
Diffstat (limited to 'src/main/java/org/onap/dcae/commonFunction/EventProcessor.java')
-rw-r--r-- | src/main/java/org/onap/dcae/commonFunction/EventProcessor.java | 365 |
1 files changed, 172 insertions, 193 deletions
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java index 2bc5e45b..a57ea3f0 100644 --- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java +++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java @@ -1,193 +1,172 @@ -/*- - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.commonFunction; - -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; -import com.google.gson.JsonArray; -import com.google.gson.JsonParser; -import org.json.JSONArray; -import org.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.FileReader; -import java.lang.reflect.Constructor; -import java.lang.reflect.Method; -import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; -import java.util.TimeZone; - -public class EventProcessor implements Runnable { - - private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); - private static final String EVENT_LITERAL = "event"; - private static final String COMMON_EVENT_HEADER = "commonEventHeader"; - - private static HashMap<String, String[]> streamid_hash = new HashMap<String, String[]>(); - public JSONObject event; - - public EventProcessor() { - log.debug("EventProcessor: Default Constructor"); - - String[] list = CommonStartup.streamid.split("\\|"); - for (String aList : list) { - String domain = aList.split("=")[0]; - //String streamIdList[] = list[i].split("=")[1].split(","); - String[] streamIdList = aList.substring(aList.indexOf('=') + 1).split(","); - - log.debug(String.format("Domain: %s streamIdList:%s", domain, - Arrays.toString(streamIdList))); - streamid_hash.put(domain, streamIdList); - } - - } - - @Override - public void run() { - - try { - - event = CommonStartup.fProcessingInputQueue.take(); - log.info("EventProcessor\tRemoving element: " + event); - - //EventPublisher Ep=new EventPublisher(); - while (event != null) { - // As long as the producer is running we remove elements from the queue. - - //UUID uuid = UUID.fromString(event.get("VESuniqueId").toString()); - String uuid = event.get("VESuniqueId").toString(); - LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - - log.debug("event.VESuniqueId" + event.get("VESuniqueId") - + "event.commonEventHeader.domain:" + event.getJSONObject(EVENT_LITERAL) - .getJSONObject(COMMON_EVENT_HEADER).getString("domain")); - String[] streamIdList = streamid_hash.get( - event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER) - .getString("domain")); - log.debug("streamIdList:" + streamIdList); - - if (streamIdList.length == 0) { - log.error("No StreamID defined for publish - Message dropped" + event); - } else { - for (String aStreamIdList : streamIdList) { - log.info("Invoking publisher for streamId:" + aStreamIdList); - this.overrideEvent(); - EventPublisher.getInstance(aStreamIdList).sendEvent(event); - - } - } - log.debug("Message published" + event); - event = CommonStartup.fProcessingInputQueue.take(); - // log.info("EventProcessor\tRemoving element: " + this.queue.remove()); - } - } catch (InterruptedException e) { - log.error("EventProcessor InterruptedException" + e.getMessage()); - } - - } - - - @SuppressWarnings({"unchecked", "rawtypes"}) - public void overrideEvent() { - //Set collector timestamp in event payload before publish - final Date currentTime = new Date(); - final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); - sdf.setTimeZone(TimeZone.getTimeZone("GMT")); - - /*JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime))); - JSONObject additionalParameter = new JSONObject().put("additionalParameters",additionalParametersarray ); - JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader"); - commonEventHeaderkey.put("internalHeaderFields", additionalParameter);*/ - - -/* "event": { - "commonEventHeader": { - "internalHeaderFields": { - "collectorTimeStamp": "Fri, 04 21 2017 04:11:52 GMT" - }, -*/ - - //JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime))); - JSONObject collectorTimeStamp = new JSONObject() - .put("collectorTimeStamp", sdf.format(currentTime)); - JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL) - .getJSONObject(COMMON_EVENT_HEADER); - commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); - event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey); - - if (CommonStartup.eventTransformFlag == 1) { - // read the mapping json file - final JsonParser parser = new JsonParser(); - try { - final JsonArray jo = (JsonArray) parser - .parse(new FileReader("./etc/eventTransform.json")); - log.info("parse eventTransform.json"); - // now convert to org.json - final String jsonText = jo.toString(); - final JSONArray topLevel = new JSONArray(jsonText); - //log.info("topLevel == " + topLevel); - - Class[] paramJSONObject = new Class[1]; - paramJSONObject[0] = JSONObject.class; - //load VESProcessors class at runtime - Class cls = Class.forName("org.onap.dcae.commonFunction.ConfigProcessors"); - Constructor constr = cls.getConstructor(paramJSONObject); - Object obj = constr.newInstance(event); - - for (int j = 0; j < topLevel.length(); j++) { - JSONObject filterObj = topLevel.getJSONObject(j).getJSONObject("filter"); - Method method = cls.getDeclaredMethod("isFilterMet", paramJSONObject); - boolean filterMet = (boolean) method.invoke(obj, filterObj); - if (filterMet) { - final JSONArray processors = topLevel.getJSONObject(j) - .getJSONArray("processors"); - - //call the processor method - for (int i = 0; i < processors.length(); i++) { - final JSONObject processorList = processors.getJSONObject(i); - final String functionName = processorList.getString("functionName"); - final JSONObject args = processorList.getJSONObject("args"); - //final JSONObject filter = processorList.getJSONObject("filter"); - - log.info(String.format("functionName==%s | args==%s", functionName, - args)); - //reflect method call - method = cls.getDeclaredMethod(functionName, paramJSONObject); - method.invoke(obj, args); - } - } - } - - } catch (Exception e) { - - log.error("EventProcessor Exception" + e.getMessage() + e); - log.error("EventProcessor Exception" + e.getCause()); - } - } - log.debug("Modified event:" + event); - - } -} +/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.commonFunction;
+
+import com.att.nsa.clock.SaClock;
+import com.att.nsa.logging.LoggingContext;
+import com.att.nsa.logging.log4j.EcompFields;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import org.json.JSONObject;
+import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileReader;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class EventProcessor implements Runnable {
+
+ private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
+ private static final String EVENT_LITERAL = "event";
+ private static final String COMMON_EVENT_HEADER = "commonEventHeader";
+ static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {}.getType();
+ private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
+
+ static Map<String, String[]> streamidHash = new HashMap<>();
+ public JSONObject event;
+ private EventPublisher eventPublisher;
+
+ public EventProcessor(EventPublisher eventPublisher) {
+ this.eventPublisher = eventPublisher;
+ streamidHash = CommonStartup.streamID.toJavaMap();
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ event = CommonStartup.fProcessingInputQueue.take();
+ // As long as the producer is running we remove elements from
+ // the queue.
+ log.info("QueueSize:" + CommonStartup.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event);
+
+ String uuid = event.get("VESuniqueId").toString();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
+ localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+
+ String domain = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain");
+ log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + domain);
+ String[] streamIdList = streamidHash.get(domain);
+ log.debug("streamIdList:" + Arrays.toString(streamIdList));
+
+ if (streamIdList.length == 0) {
+ log.error("No StreamID defined for publish - Message dropped" + event);
+ } else {
+ sendEventsToStreams(streamIdList);
+ }
+ log.debug("Message published" + event);
+ }
+ } catch (InterruptedException e) {
+ log.error("EventProcessor InterruptedException" + e.getMessage());
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void overrideEvent() {
+ // Set collector timestamp in event payload before publish
+ addCurrentTimeToEvent(event);
+
+ if (CommonStartup.eventTransformFlag) {
+ // read the mapping json file
+ try (FileReader fr = new FileReader("./etc/eventTransform.json")) {
+ log.info("parse eventTransform.json");
+ List<Event> events = new Gson().fromJson(fr, EVENT_LIST_TYPE);
+ parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(event)));
+ } catch (IOException e) {
+ log.error("Couldn't find file ./etc/eventTransform.json" + e.toString());
+ }
+ }
+ // Remove VESversion from event. This field is for internal use and must be removed after use.
+ if (event.has("VESversion"))
+ event.remove("VESversion");
+
+ log.debug("Modified event:" + event);
+ }
+
+ private void sendEventsToStreams(String[] streamIdList) {
+ for (String aStreamIdList : streamIdList) {
+ log.info("Invoking publisher for streamId:" + aStreamIdList);
+ this.overrideEvent();
+ eventPublisher.sendEvent(event, aStreamIdList);
+ }
+ }
+
+ private void addCurrentTimeToEvent(JSONObject event) {
+ final Date currentTime = new Date();
+ JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime));
+ JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER);
+ commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp);
+ event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey);
+ }
+
+ void parseEventsJson(List<Event> eventsTransform, ConfigProcessorAdapter configProcessorAdapter) {
+ // load VESProcessors class at runtime
+ for (Event eventTransform : eventsTransform) {
+ JSONObject filterObj = new JSONObject(eventTransform.filter.toString());
+ if (configProcessorAdapter.isFilterMet(filterObj)) {
+ callProcessorsMethod(configProcessorAdapter, eventTransform.processors);
+ }
+ }
+ }
+
+ private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List<Processor> processors) {
+ // call the processor method
+ for (Processor processor : processors) {
+ final String functionName = processor.functionName;
+ final JSONObject args = new JSONObject(processor.args.toString());
+
+ log.info(String.format("functionName==%s | args==%s", functionName, args));
+ // reflect method call
+ try {
+ configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args);
+ } catch (ReflectiveOperationException e) {
+ log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause());
+ }
+ }
+ }
+
+ static class ConfigProcessorAdapter {
+ private final ConfigProcessors configProcessors;
+
+ ConfigProcessorAdapter(ConfigProcessors configProcessors) {
+ this.configProcessors = configProcessors;
+ }
+
+ boolean isFilterMet(JSONObject parameter) {
+ return configProcessors.isFilterMet(parameter);
+ }
+
+ void runConfigProcessorFunctionByName(String functionName, JSONObject parameter) throws ReflectiveOperationException {
+ Method method = configProcessors.getClass().getDeclaredMethod(functionName, parameter.getClass());
+ method.invoke(configProcessors, parameter);
+ }
+ }
+}
+
|