From 15a1bd474bdc463d744f6621e3c49761f6bf2927 Mon Sep 17 00:00:00 2001 From: Krysiak Adam Gabriel Date: Wed, 23 May 2018 15:34:37 +0200 Subject: Refactored event processor + sonar Issue-ID: DCAEGEN2-521 Change-Id: I9290f21701945cd1bb5e7a43a671991417f25491 Signed-off-by: Krysiak Adam Gabriel --- .../java/org/onap/dcae/commonFunction/Event.java | 34 +++ .../onap/dcae/commonFunction/EventProcessor.java | 298 +++++++++++---------- .../org/onap/dcae/commonFunction/Processor.java | 33 +++ 3 files changed, 218 insertions(+), 147 deletions(-) create mode 100644 src/main/java/org/onap/dcae/commonFunction/Event.java create mode 100644 src/main/java/org/onap/dcae/commonFunction/Processor.java (limited to 'src/main/java/org/onap/dcae') diff --git a/src/main/java/org/onap/dcae/commonFunction/Event.java b/src/main/java/org/onap/dcae/commonFunction/Event.java new file mode 100644 index 00000000..faae2451 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/Event.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction; + +import com.google.gson.JsonObject; + +import java.util.List; + +class Event { + final JsonObject filter; + final List processors; + + Event(JsonObject filter, List processors) { + this.filter = filter; + this.processors = processors; + } +} diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java index c7c052f9..04687b32 100644 --- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java +++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java @@ -23,165 +23,169 @@ package org.onap.dcae.commonFunction; import com.att.nsa.clock.SaClock; import com.att.nsa.logging.LoggingContext; import com.att.nsa.logging.log4j.EcompFields; -import com.google.gson.JsonArray; -import com.google.gson.JsonParser; -import org.json.JSONArray; +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.io.FileReader; -import java.lang.reflect.Constructor; +import java.io.IOException; import java.lang.reflect.Method; +import java.lang.reflect.Type; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.TimeZone; + public class EventProcessor implements Runnable { - private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); - private static final String EVENT_LITERAL = "event"; - private static final String COMMON_EVENT_HEADER = "commonEventHeader"; - - private static HashMap streamidHash = new HashMap<>(); - public JSONObject event; - - public EventProcessor() { - log.debug("EventProcessor: Default Constructor"); - - String[] list = CommonStartup.streamid.split("\\|"); - for (String aList : list) { - String domain = aList.split("=")[0]; - - String[] streamIdList = aList.substring(aList.indexOf('=') + 1).split(","); - - log.debug(String.format("Domain: %s streamIdList:%s", domain, Arrays.toString(streamIdList))); - streamidHash.put(domain, streamIdList); - } - - } - - @Override - public void run() { - - try { - - event = CommonStartup.fProcessingInputQueue.take(); - - while (event != null) { - // As long as the producer is running we remove elements from - // the queue. - log.info("QueueSize:" + CommonStartup.fProcessingInputQueue.size()+ "\tEventProcessor\tRemoving element: " + event ); - - String uuid = event.get("VESuniqueId").toString(); - LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - - log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" - + event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain")); - String[] streamIdList = streamidHash - .get(event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain")); - log.debug("streamIdList:" + streamIdList); - - if (streamIdList.length == 0) { - log.error("No StreamID defined for publish - Message dropped" + event); - } else { - for (String aStreamIdList : streamIdList) { - log.info("Invoking publisher for streamId:" + aStreamIdList); - this.overrideEvent(); - - EventPublisherHash.getInstance().sendEvent(event, aStreamIdList); - - } - } - log.debug("Message published" + event); - event = CommonStartup.fProcessingInputQueue.take(); - - } - } catch (InterruptedException e) { - log.error("EventProcessor InterruptedException" + e.getMessage()); - Thread.currentThread().interrupt(); - } - - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public void overrideEvent() { - // Set collector timestamp in event payload before publish - final Date currentTime = new Date(); - final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); - sdf.setTimeZone(TimeZone.getTimeZone("GMT")); - - JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", sdf.format(currentTime)); - JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER); - commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); - event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey); - - if (CommonStartup.eventTransformFlag == 1) { - // read the mapping json file - final JsonParser parser = new JsonParser(); - FileReader fr = null; - try { - fr = new FileReader("./etc/eventTransform.json"); - final JsonArray jo = (JsonArray) parser.parse(fr); - log.info("parse eventTransform.json"); - // now convert to org.json - final String jsonText = jo.toString(); - final JSONArray topLevel = new JSONArray(jsonText); - - Class[] paramJSONObject = new Class[1]; - paramJSONObject[0] = JSONObject.class; - // load VESProcessors class at runtime - Class cls = Class.forName("org.onap.dcae.commonFunction.ConfigProcessors"); - Constructor constr = cls.getConstructor(paramJSONObject); - Object obj = constr.newInstance(event); - - for (int j = 0; j < topLevel.length(); j++) { - JSONObject filterObj = topLevel.getJSONObject(j).getJSONObject("filter"); - Method method = cls.getDeclaredMethod("isFilterMet", paramJSONObject); - boolean filterMet = (boolean) method.invoke(obj, filterObj); - if (filterMet) { - final JSONArray processors = topLevel.getJSONObject(j).getJSONArray("processors"); - - // call the processor method - for (int i = 0; i < processors.length(); i++) { - final JSONObject processorList = processors.getJSONObject(i); - final String functionName = processorList.getString("functionName"); - final JSONObject args = processorList.getJSONObject("args"); - - - log.info(String.format("functionName==%s | args==%s", functionName, args)); - // reflect method call - method = cls.getDeclaredMethod(functionName, paramJSONObject); - method.invoke(obj, args); - } - } - } - - } catch (Exception e) { - - log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause()); - } finally { - // close the file - if (fr != null) { - try { - fr.close(); - } catch (IOException e) { - log.error("Error closing file reader stream : " + e.toString()); - } - - } - } - } - // Remove VESversion from event. This field is for internal use and must - // be removed after use. - if (event.has("VESversion")) - event.remove("VESversion"); - - log.debug("Modified event:" + event); - - } + private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); + private static final String EVENT_LITERAL = "event"; + private static final String COMMON_EVENT_HEADER = "commonEventHeader"; + static final Type EVENT_LIST_TYPE = new TypeToken>() { + }.getType(); + private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); + + private static HashMap streamidHash = new HashMap<>(); + JSONObject event; + + public EventProcessor() { + dateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); + String[] list = CommonStartup.streamid.split("\\|"); + for (String aList : list) { + String domain = aList.split("=")[0]; + + String[] streamIdList = aList.substring(aList.indexOf('=') + 1).split(","); + + log.debug(String.format("Domain: %s streamIdList:%s", domain, Arrays.toString(streamIdList))); + streamidHash.put(domain, streamIdList); + } + + } + + @Override + public void run() { + + try { + + event = CommonStartup.fProcessingInputQueue.take(); + + while (event != null) { + // As long as the producer is running we remove elements from + // the queue. + log.info("QueueSize:" + CommonStartup.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event); + + String uuid = event.get("VESuniqueId").toString(); + LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); + localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); + + String domain = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain"); + log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + domain); + String[] streamIdList = streamidHash.get(domain); + log.debug("streamIdList:" + Arrays.toString(streamIdList)); + + if (streamIdList.length == 0) { + log.error("No StreamID defined for publish - Message dropped" + event); + } else { + sendEventsToStreams(streamIdList); + } + log.debug("Message published" + event); + event = CommonStartup.fProcessingInputQueue.take(); + + } + } catch (InterruptedException e) { + log.error("EventProcessor InterruptedException" + e.getMessage()); + Thread.currentThread().interrupt(); + } + + } + + public void overrideEvent() { + // Set collector timestamp in event payload before publish + addCurrentTimeToEvent(event); + + if (CommonStartup.eventTransformFlag == 1) { + // read the mapping json file + try (FileReader fr = new FileReader("./etc/eventTransform.json")) { + log.info("parse eventTransform.json"); + List events = new Gson().fromJson(fr, EVENT_LIST_TYPE); + parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(event))); + } catch (IOException e) { + log.error("Couldn't find file ./etc/eventTransform.json" + e.toString()); + } + } + // Remove VESversion from event. This field is for internal use and must + // be removed after use. + if (event.has("VESversion")) + event.remove("VESversion"); + + log.debug("Modified event:" + event); + + } + + private void sendEventsToStreams(String[] streamIdList) { + for (String aStreamIdList : streamIdList) { + log.info("Invoking publisher for streamId:" + aStreamIdList); + this.overrideEvent(); + EventPublisherHash.getInstance().sendEvent(event, aStreamIdList); + + } + } + + private void addCurrentTimeToEvent(JSONObject event) { + final Date currentTime = new Date(); + JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime)); + JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER); + commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); + event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey); + } + + void parseEventsJson(List eventsTransform, ConfigProcessorAdapter configProcessorAdapter) { + + // load VESProcessors class at runtime + for (Event eventTransform : eventsTransform) { + JSONObject filterObj = new JSONObject(eventTransform.filter.toString()); + if (configProcessorAdapter.isFilterMet(filterObj)) { + callProcessorsMethod(configProcessorAdapter, eventTransform.processors); + } + } + } + + private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List processors) { + // call the processor method + for (Processor processor : processors) { + final String functionName = processor.functionName; + final JSONObject args = new JSONObject(processor.args.toString()); + + log.info(String.format("functionName==%s | args==%s", functionName, args)); + // reflect method call + try { + configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args); + } catch (ReflectiveOperationException e) { + log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause()); + } + } + } + + static class ConfigProcessorAdapter { + private final ConfigProcessors configProcessors; + + ConfigProcessorAdapter(ConfigProcessors configProcessors) { + this.configProcessors = configProcessors; + } + + boolean isFilterMet(JSONObject parameter) { + return configProcessors.isFilterMet(parameter); + } + + void runConfigProcessorFunctionByName(String functionName, JSONObject parameter) throws ReflectiveOperationException { + Method method = configProcessors.getClass().getDeclaredMethod(functionName, parameter.getClass()); + method.invoke(configProcessors, parameter); + } + } } + diff --git a/src/main/java/org/onap/dcae/commonFunction/Processor.java b/src/main/java/org/onap/dcae/commonFunction/Processor.java new file mode 100644 index 00000000..ea79f1d3 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/Processor.java @@ -0,0 +1,33 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.commonFunction; + +import com.google.gson.JsonObject; + +class Processor { + final String functionName; + final JsonObject args; + + Processor(String functionName, JsonObject args) { + this.functionName = functionName; + this.args = args; + } +} -- cgit 1.2.3-korg