aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcae/commonFunction
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcae/commonFunction')
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/CommonStartup.java351
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java576
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java132
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java214
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/EventProcessor.java192
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/EventPublisher.java181
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/VESLogger.java170
7 files changed, 1816 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
new file mode 100644
index 00000000..b743f134
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
@@ -0,0 +1,351 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.commonFunction;
+
+
+
+import java.io.IOException;
+
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.servlet.ServletException;
+
+import org.apache.catalina.LifecycleException;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.onap.dcae.restapi.RestfulCollectorServlet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.att.nsa.apiServer.ApiServer;
+import com.att.nsa.apiServer.ApiServerConnector;
+import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
+import com.att.nsa.cmdLine.NsaCommandLineUtil;
+import com.att.nsa.drumlin.service.framework.DrumlinServlet;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import com.att.nsa.drumlin.till.nv.impl.nvPropertiesFile;
+import com.att.nsa.drumlin.till.nv.impl.nvReadableStack;
+import com.att.nsa.drumlin.till.nv.impl.nvReadableTable;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.github.fge.jsonschema.exceptions.ProcessingException;
+import com.github.fge.jsonschema.main.JsonSchema;
+import com.github.fge.jsonschema.main.JsonSchemaFactory;
+import com.github.fge.jsonschema.report.ProcessingMessage;
+import com.github.fge.jsonschema.report.ProcessingReport;
+import com.github.fge.jsonschema.util.JsonLoader;
+
+
+public class CommonStartup extends NsaBaseEndpoint implements Runnable
+{
+ public static final String kConfig = "c";
+
+ public static final String kSetting_Port = "collector.service.port";
+ public static final int kDefault_Port = 8080;
+
+ public static final String kSetting_SecurePort = "collector.service.secure.port";
+ public static final int kDefault_SecurePort = -1;
+
+ public static final String kSetting_KeystorePassfile = "collector.keystore.passwordfile";
+ public static final String kDefault_KeystorePassfile = "../etc/passwordfile";
+ public static final String kSetting_KeystoreFile = "collector.keystore.file.location";
+ public static final String kDefault_KeystoreFile = "../etc/keystore";
+ public static final String kSetting_KeyAlias = "collector.keystore.alias";
+ public static final String kDefault_KeyAlias = "tomcat";
+
+ public static final String kSetting_DmaapConfigs = "collector.dmaapfile";
+ protected static final String[] kDefault_DmaapConfigs = new String[] { "/etc/DmaapConfig.json" };
+
+ public static final String kSetting_MaxQueuedEvents = "collector.inputQueue.maxPending";
+ public static final int kDefault_MaxQueuedEvents = 1024*4;
+
+ public static final String kSetting_schemaValidator = "collector.schema.checkflag";
+ public static final int kDefault_schemaValidator = -1;
+
+ public static final String kSetting_schemaFile = "collector.schema.file";
+ public static final String kDefault_schemaFile = "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}";
+ public static final String kSetting_ExceptionConfig = "exceptionConfig";
+
+ public static final String kSetting_dmaapStreamid = "collector.dmaap.streamid";
+
+ public static final String kSetting_authflag = "header.authflag";
+ public static final int kDefault_authflag = 0;
+
+ public static final String kSetting_authid = "header.authid";
+ public static final String kSetting_authpwd = "header.authpwd";
+ public static final String kSetting_authstore = "header.authstore";
+ public static final String kSetting_authlist = "header.authlist";
+
+ public static final String kSetting_eventTransformFlag = "event.transform.flag";
+ public static final int kDefault_eventTransformFlag = 1;
+
+
+ public static final Logger inlog = LoggerFactory.getLogger ("org.onap.dcae.commonFunction.input" );
+ public static final Logger oplog = LoggerFactory.getLogger ("org.onap.dcae.commonFunction.output");
+ public static final Logger eplog = LoggerFactory.getLogger ("org.onap.dcae.commonFunction.error");
+ public static final Logger metriclog = LoggerFactory.getLogger ("com.att.ecomp.metrics" );
+
+ public static int schema_Validatorflag = -1;
+ public static int authflag = 1;
+ public static int eventTransformFlag = 1;
+ public static String schemaFile = null;
+ public static JSONObject schemaFileJson = null;
+ public static String exceptionConfig = null;
+ public static String cambriaConfigFile = null;
+ private boolean listnerstatus = false;
+ static String streamid = null;
+
+
+ private CommonStartup(rrNvReadable settings) throws loadException, missingReqdSetting, IOException, rrNvReadable.missingReqdSetting, rrNvReadable.invalidSettingValue, ServletException, InterruptedException
+ {
+ final List<ApiServerConnector> connectors = new LinkedList<ApiServerConnector> ();
+
+ if (settings.getInt ( kSetting_Port, kDefault_Port ) > 0)
+ {
+ // http service
+ connectors.add (
+ new ApiServerConnector.Builder ( settings.getInt ( kSetting_Port, kDefault_Port ) )
+ .secure ( false )
+ .build ()
+ );
+ }
+
+ // optional https service
+ final int securePort = settings.getInt(kSetting_SecurePort, kDefault_SecurePort);
+ final String keystoreFile = settings.getString(kSetting_KeystoreFile, kDefault_KeystoreFile);
+ final String keystorePasswordFile = settings.getString(kSetting_KeystorePassfile, kDefault_KeystorePassfile);
+ final String keyAlias = settings.getString (kSetting_KeyAlias, kDefault_KeyAlias);
+
+
+ if (securePort > 0)
+ {
+ final String kSetting_KeystorePass = readFile(keystorePasswordFile, Charset.defaultCharset());
+ connectors.add(new ApiServerConnector.Builder(securePort)
+ .secure(true)
+ .keystorePassword(kSetting_KeystorePass)
+ .keystoreFile(keystoreFile)
+ .keyAlias(keyAlias)
+ .build());
+
+ }
+
+ //Reading other config properties
+
+ schema_Validatorflag = settings.getInt(kSetting_schemaValidator, kDefault_schemaValidator );
+ if (schema_Validatorflag > 0){
+ schemaFile = settings.getString(kSetting_schemaFile,kDefault_schemaFile);
+ //System.out.println("SchemaFile:" + schemaFile);
+ schemaFileJson = new JSONObject(schemaFile);
+
+ }
+ exceptionConfig = settings.getString(kSetting_ExceptionConfig, null);
+ authflag = settings.getInt(CommonStartup.kSetting_authflag, CommonStartup.kDefault_authflag );
+ String [] currentconffile = settings.getStrings (CommonStartup.kSetting_DmaapConfigs, CommonStartup.kDefault_DmaapConfigs ) ;
+ cambriaConfigFile= currentconffile[0] ;
+ streamid = settings.getString(kSetting_dmaapStreamid,null);
+ eventTransformFlag = settings.getInt(kSetting_eventTransformFlag, kDefault_eventTransformFlag);
+
+ fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings))
+ .encodeSlashes(true)
+ .name("collector")
+ .build();
+
+
+ //Load override exception map
+ CustomExceptionLoader.LoadMap();
+ setListnerstatus(true);
+ }
+
+ public static void main ( String[] args )
+ {
+ ExecutorService executor = null;
+ try
+ {
+ // process command line arguments
+ final Map<String, String> argMap = NsaCommandLineUtil.processCmdLine ( args, true );
+ final String config = NsaCommandLineUtil.getSetting ( argMap, kConfig, "collector.properties" );
+ final URL settingStream = DrumlinServlet.findStream ( config, CommonStartup.class );
+
+ final nvReadableStack settings = new nvReadableStack ();
+ settings.push ( new nvPropertiesFile ( settingStream ) );
+ settings.push ( new nvReadableTable ( argMap ) );
+
+ fProcessingInputQueue = new LinkedBlockingQueue<JSONObject> (CommonStartup.kDefault_MaxQueuedEvents);
+
+ VESLogger.setUpEcompLogging();
+
+ CommonStartup cs= new CommonStartup ( settings );
+
+ Thread csmain = new Thread(cs);
+ csmain.start();
+
+
+ EventProcessor ep = new EventProcessor ();
+ //Thread epThread=new Thread(ep);
+ //epThread.start();
+ executor = Executors.newFixedThreadPool(20);
+ executor.execute(ep);
+
+ }
+ catch ( loadException | missingReqdSetting | IOException | invalidSettingValue | ServletException | InterruptedException e )
+ {
+ CommonStartup.eplog.error("FATAL_STARTUP_ERROR" + e.getMessage() );
+ throw new RuntimeException ( e );
+ }
+ finally
+ {
+ // This will make the executor accept no new threads
+ // and finish all existing threads in the queue
+ if (executor != null){
+ executor.shutdown();
+ }
+
+ }
+ }
+
+ public void run() {
+ try {
+ fTomcatServer.start ();
+ } catch (LifecycleException | IOException e) {
+
+ e.printStackTrace();
+ }
+ fTomcatServer.await ();
+ }
+
+ public boolean isListnerstatus() {
+ return listnerstatus;
+ }
+
+ public void setListnerstatus(boolean listnerstatus) {
+ this.listnerstatus = listnerstatus;
+ }
+ public static Queue<JSONObject> getProcessingInputQueue ()
+ {
+ return fProcessingInputQueue;
+ }
+
+ public static class QueueFullException extends Exception
+ {
+ private static final long serialVersionUID = 1L;
+ }
+
+
+ public static void handleEvents ( JSONArray a ) throws QueueFullException, JSONException, IOException
+ {
+ final Queue<JSONObject> queue = getProcessingInputQueue ();
+ try
+ {
+
+ CommonStartup.metriclog.info("EVENT_PUBLISH_START" );
+ for (int i = 0; i < a.length(); i++) {
+ if ( !queue.offer ( a.getJSONObject(i) ) ) {
+ throw new QueueFullException ();
+ }
+
+ }
+ log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
+ CommonStartup.metriclog.info("EVENT_PUBLISH_END");
+ //ecomplogger.debug(secloggerMessageEnum.SEC_COLLECT_AND_PULIBISH_SUCCESS);
+
+ }
+ catch ( JSONException e ){
+ throw e;
+
+ }
+ }
+
+
+ static String readFile(String path, Charset encoding)
+ throws IOException
+ {
+ byte[] encoded = Files.readAllBytes(Paths.get(path));
+ String pwd = new String(encoded);
+ return pwd.substring(0,pwd.length()-1);
+ }
+
+
+ public static String schemavalidate( String jsonData, String jsonSchema) {
+ ProcessingReport report = null;
+ String result = "false";
+
+ try {
+ //System.out.println("Applying schema: @<@<"+jsonSchema+">@>@ to data: #<#<"+jsonData+">#>#");
+ log.trace("Schema validation for event:" + jsonData);
+ JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
+ JsonNode data = JsonLoader.fromString(jsonData);
+ JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
+ JsonSchema schema = factory.getJsonSchema(schemaNode);
+ report = schema.validate(data);
+ } catch (JsonParseException e) {
+ log.error("schemavalidate:JsonParseException for event:" + jsonData );
+ System.out.println(e.getMessage());
+ return e.getMessage().toString();
+ } catch (ProcessingException e) {
+ log.error("schemavalidate:Processing exception for event:" + jsonData );
+ System.out.println(e.getMessage());
+ return e.getMessage().toString();
+ } catch (IOException e) {
+ log.error("schemavalidate:IO exception; something went wrong trying to read json data for event:" + jsonData);
+ System.out.println(e.getMessage());
+ return e.getMessage().toString();
+ }
+ if (report != null) {
+ Iterator<ProcessingMessage> iter = report.iterator();
+ while (iter.hasNext()) {
+ ProcessingMessage pm = iter.next();
+ log.trace("Processing Message: "+pm.getMessage());
+ }
+ result = String.valueOf(report.isSuccess());
+ }
+ try {
+ log.debug("Validation Result:" +result + " Validation report:" + report);
+ }
+ catch (NullPointerException e){
+ log.error("schemavalidate:NullpointerException on report");
+ }
+ return result;
+ }
+
+
+
+ static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
+ private static ApiServer fTomcatServer = null;
+ private static final Logger log = LoggerFactory.getLogger ( CommonStartup.class );
+
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java
new file mode 100644
index 00000000..ff6c328c
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java
@@ -0,0 +1,576 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.commonFunction;
+
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConfigProcessors {
+
+ private static Logger log = LoggerFactory.getLogger(ConfigProcessors.class);
+
+ public ConfigProcessors(JSONObject eventJson)
+ {
+ event = eventJson;
+ }
+
+ /**
+ *
+ */
+ public void getValue(JSONObject J)
+ {
+ //log.info("addAttribute");
+ final String field = J.getString("field");
+ //final String value = J.getString("value");
+ final JSONObject filter = J.optJSONObject("filter");
+ if (filter == null || isFilterMet(filter))
+ {
+ //log.info("field ==" + field);
+ //log.info("value ==" + value);
+ getEventObjectVal(field);
+ }
+ else
+ log.info("Filter not met");
+ }
+
+ /**
+ *
+ */
+ public void setValue(JSONObject J)
+ {
+ //log.info("addAttribute");
+ final String field = J.getString("field");
+ final String value = J.getString("value");
+ final JSONObject filter = J.optJSONObject("filter");
+ if (filter == null || isFilterMet(filter))
+ {
+ //log.info("field ==" + field);
+ //log.info("value ==" + value);
+ setEventObjectVal(field, value);
+ }
+ else
+ log.info("Filter not met");
+ }
+
+ /**
+ *
+ */
+ public void addAttribute(JSONObject J)
+ {
+ //log.info("addAttribute");
+ final String field = J.getString("field");
+ final String value = J.getString("value");
+ final JSONObject filter = J.optJSONObject("filter");
+ if (filter == null || isFilterMet(filter))
+ {
+ //log.info("field ==" + field);
+ //log.info("value ==" + value);
+ setEventObjectVal(field, value);
+ }
+ else
+ log.info("Filter not met");
+ }
+
+ /**
+ *
+ */
+ public void updateAttribute(JSONObject J)
+ {
+ //log.info("updateAttribute");
+ final String field = J.getString("field");
+ final String value = J.getString("value");
+ final JSONObject filter = J.optJSONObject("filter");
+ if (filter == null || isFilterMet(filter))
+ {
+ //log.info("field ==" + field);
+ //log.info("value ==" + value);
+ setEventObjectVal(field, value);
+ }
+ else
+ log.info("Filter not met");
+ }
+
+ /**
+ *
+ */
+ public void removeAttribute(JSONObject J)
+ {
+ //log.info("removeAttribute");
+ final String field = J.getString("field");
+ final JSONObject filter = J.optJSONObject("filter");
+
+ if (filter == null || isFilterMet(filter))
+ {
+ removeEventKey(field);
+ }
+ else
+ log.info("Filter not met");
+ }
+
+ /**
+ *
+ */
+ public void renameArrayInArray(JSONObject J) //map
+ {
+ log.info("renameArrayInArray");
+ final String field = J.getString("field");
+ final String oldField = J.getString("oldField");
+ final JSONObject filter = J.optJSONObject("filter");
+ //String value = "";
+ if (filter == null || isFilterMet(filter))
+ {
+ //log.info("field ==" + field);
+ final String[] fsplit = field.split("\\[\\]", field.length());
+ final String[] oldfsplit = oldField.split("\\[\\]", oldField.length());
+ /*for (int i=0; i< oldfsplit.length; i++ )
+ {
+ log.info( "renameArrayInArray " + i + " ==" + oldfsplit[i]);
+ }*/
+
+ final String oldValue = getEventObjectVal(oldfsplit[0]).toString();
+ if (!oldValue.equals("ObjectNotFound")){
+ final String oldArrayName = oldfsplit[1].substring(1);
+ final String newArrayName = fsplit[1].substring(1);
+ final String value = oldValue.replaceAll(oldArrayName, newArrayName);
+ //log.info("oldArrayName ==" + oldArrayName);
+ //log.info("newArrayName ==" + newArrayName);
+ log.info("oldValue ==" + oldValue);
+ log.info("value ==" + value);
+ JSONArray ja = new JSONArray(value);
+ removeEventKey(oldfsplit[0]);
+ setEventObjectVal(fsplit[0], ja);
+ }
+ }
+ else
+ log.info("Filter not met");
+ }
+
+ /**
+ *
+ */
+ public void map(JSONObject J)
+ {
+ //log.info("mapAttribute");
+ final String field = J.getString("field");
+ if (field.contains("[]"))
+ {
+ if (field.matches(".*\\[\\]\\..*\\[\\]"))
+ renameArrayInArray(J);
+ else
+ mapToJArray(J);
+ }
+ else
+ mapAttribute(J);
+ }
+
+ /**
+ *
+ */
+ public String performOperation(String operation, String value)
+ {
+ log.info("performOperation");
+ if (operation != null)
+ {
+ if (operation.equals("convertMBtoKB"))
+ {
+ float kbValue = Float.parseFloat(value) * 1024;
+ value = String.valueOf(kbValue);
+ }
+ }
+ return value;
+ }
+
+ /**
+ *
+ */
+ //public void mapAttributeToArrayAttribute(JSONObject J)
+ public void mapAttribute(JSONObject J)
+ {
+ //log.info("mapAttribute");
+ final String field = J.getString("field");
+ final String oldField = J.getString("oldField");
+ final JSONObject filter = J.optJSONObject("filter");
+ final String operation = J.optString("operation");
+ String value = "";
+ if (filter == null || isFilterMet(filter))
+ {
+ //log.info("field ==" + field);
+
+ value = getEventObjectVal(oldField).toString();
+ if (!value.equals("ObjectNotFound"))
+ {
+ if (operation != null && !operation.equals(""))
+ value = performOperation(operation, value);
+ //log.info("value ==" + value);
+ setEventObjectVal(field, value);
+
+ removeEventKey(oldField);
+ }
+ }
+ else
+ log.info("Filter not met");
+ }
+
+ /**
+ *
+ */
+ public void mapToJArray(JSONObject J)
+ {
+ log.info("mapToJArray");
+ String field = J.getString("field");
+ String oldField = J.getString("oldField");
+ final JSONObject filter = J.optJSONObject("filter");
+ final JSONObject attrMap = J.optJSONObject("attrMap");
+ oldField = oldField.replaceAll("\\[\\]", "");
+ field = field.replaceAll("\\[\\]", "");
+
+ //log.info("oldField ==" + field);
+ if (filter == null || isFilterMet(filter))
+ {
+ //log.info("oldField ==" + field);
+ String value = getEventObjectVal(oldField).toString();
+ if (!value.equals("ObjectNotFound"))
+ {
+ log.info("old value ==" + value.toString());
+ //update old value based on attrMap
+ if (attrMap != null)
+ {
+ //loop thru attrMap and update attribute name to new name
+ for (String key : attrMap.keySet())
+ {
+ //log.info("attr key==" + key + " value==" + attrMap.getString(key));
+ value = value.replaceAll(key, attrMap.getString(key));
+ }
+ }
+
+ log.info("new value ==" + value);
+ char c = value.charAt(0);
+ if (c != '[')
+ {
+ //oldfield is JsonObject
+ JSONObject valueJO = new JSONObject(value);
+ // if the array already exists
+
+ String existingValue = getEventObjectVal(field).toString();
+ if (!existingValue.equals("ObjectNotFound"))
+ {
+ JSONArray ja = new JSONArray(existingValue);
+ JSONObject jo = ja.optJSONObject(0);
+ if (jo != null)
+ {
+ for (String key : valueJO.keySet())
+ {
+ jo.put(key, valueJO.get(key));
+
+ }
+ ja.put(0, jo);
+ //log.info("jarray== " + ja.toString());
+ setEventObjectVal(field,ja);
+ }
+ }
+ else //if new array
+ setEventObjectVal(field + "[0]", new JSONObject(value));
+ }
+ else //oldfield is jsonArray
+ setEventObjectVal(field, new JSONArray(value));
+
+ removeEventKey(oldField);
+ }
+ }
+ else
+ log.info("Filter not met");
+ }
+
+ /**
+ * example -
+ {
+ "functionName": "concatenateValue",
+ "args":{
+ "filter": {"event.commonEventHeader.event":"heartbeat"},
+ "field":"event.commonEventHeader.eventName",
+ "concatenate": ["event.commonEventHeader.domain","event.commonEventHeader.eventType","event.commonEventHeader.alarmCondition"],
+ "delimiter":"_"
+ }
+ }
+ **/
+ public void concatenateValue(JSONObject J)
+ {
+ //log.info("concatenateValue");
+ final String field = J.getString("field");
+ final String delimiter = J.getString("delimiter");
+ final JSONArray values = J.getJSONArray("concatenate");
+ final JSONObject filter = J.optJSONObject("filter");
+ if (filter == null || isFilterMet(filter))
+ {
+ String value = "";
+ for (int i=0; i < values.length(); i++)
+ {
+ //log.info(values.getString(i));
+ String tempVal = getEventObjectVal(values.getString(i)).toString();
+ if (!tempVal.equals("ObjectNotFound"))
+ {
+ if (i ==0)
+ value = value + getEventObjectVal(values.getString(i));
+ else
+ value = value + delimiter + getEventObjectVal(values.getString(i));
+ }
+ }
+ //log.info("value ==" + value);
+ setEventObjectVal(field, value);
+ }
+ else
+ log.info("Filter not met");
+ }
+
+ /**
+ *
+ */
+ private void removeEventKey(String field)
+ {
+ String[] keySet = field.split("\\.",field.length());
+ JSONObject keySeries = event;
+ for (int i=0; i<(keySet.length -1); i++ )
+ {
+ //log.info( i + " ==" + keySet[i]);
+ keySeries = keySeries.getJSONObject(keySet[i]);
+ }
+ //log.info(keySet[keySet.length -1]);
+
+ keySeries.remove(keySet[keySet.length -1]);
+
+ }
+
+ /**
+ *
+ */
+ private boolean checkFilter(JSONObject jo, String key, String logicKey)
+ {
+ String filterValue = jo.getString(key);
+ boolean retVal = true;
+
+ if(filterValue.contains(":"))
+ {
+ String[] splitVal = filterValue.split(":");
+ //log.info(splitVal[0] + " " + splitVal[1]);
+ if (splitVal[0].equals("matches"))
+ {
+ if (logicKey.equals("not"))
+ {
+ //log.info("not");
+ //log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "split1==" + splitVal[1]);
+ if (getEventObjectVal(key).toString().matches(splitVal[1]))
+ {
+ log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false");
+ return false;
+ }
+ }
+ else
+ {
+ if (!(getEventObjectVal(key).toString().matches(splitVal[1])))
+ {
+ log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false");
+ return false;
+ }
+ }
+
+ }
+ if (splitVal[0].equals("contains"))
+ {
+ if (logicKey.equals("not"))
+ {
+ //log.info("not");
+ //log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "split1==" + splitVal[1]);
+ if (getEventObjectVal(key).toString().contains(splitVal[1]))
+ {
+ log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false");
+ return false;
+ }
+ }
+ else
+ {
+ if (!(getEventObjectVal(key).toString().contains(splitVal[1])))
+ {
+ log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false");
+ return false;
+ }
+ }
+
+ }
+ }
+ else
+ {
+ if (logicKey.equals("not"))
+ {
+ if(getEventObjectVal(key).toString().equals(filterValue))
+ {
+ log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false");
+ return false;
+ }
+ }
+ else
+ {
+ if(!(getEventObjectVal(key).toString().equals(filterValue)))
+ {
+ log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false");
+ return false;
+ }
+ }
+ }
+ return retVal;
+ }
+ /**
+ *
+ */
+ public boolean isFilterMet(JSONObject jo)
+ {
+ boolean retval = true;
+ //log.info("Filter==" + jo.toString());
+ for (String key : jo.keySet())
+ {
+ if (key.equals("not"))
+ {
+ JSONObject njo = jo.getJSONObject(key);
+ for (String njoKey : njo.keySet())
+ {
+ //log.info(njoKey);
+ retval = checkFilter(njo, njoKey, key);
+ if (retval == false)
+ return retval;
+ }
+ }
+ else
+ {
+ //log.info(key);
+ //final String filterKey = key;
+ retval = checkFilter(jo, key, key);
+ if (retval == false)
+ return retval;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * returns a string or JSONObject or JSONArray
+ **/
+ public Object getEventObjectVal(String keySeriesStr)
+ {
+ keySeriesStr = keySeriesStr.replaceAll("\\[", ".");
+ keySeriesStr = keySeriesStr.replaceAll("\\]", ".");
+ if (keySeriesStr.contains(".."))
+ {
+ keySeriesStr = keySeriesStr.replaceAll("\\.\\.", ".");
+ }
+ //log.info(Integer.toString(keySeriesStr.lastIndexOf(".")));
+ //log.info(Integer.toString(keySeriesStr.length() -1));
+ if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() -1 )
+ keySeriesStr = keySeriesStr.substring(0,keySeriesStr.length()-1 );
+ String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length());
+ Object keySeriesObj = event;
+ for (int i=0; i<(keySet.length); i++ )
+ {
+ //log.info( "getEventObject " + i + " ==" + keySet[i]);
+ if (keySeriesObj != null)
+ {
+ if (keySeriesObj instanceof String)
+ {
+ //keySeriesObj = keySeriesObj.get(keySet[i]);
+ log.info("STRING==" + keySeriesObj);
+ }
+ else if (keySeriesObj instanceof JSONArray) {
+ keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i]));
+ //log.info("ARRAY==" + keySeriesObj);
+ }
+ else if (keySeriesObj instanceof JSONObject) {
+ keySeriesObj = ( (JSONObject) keySeriesObj).opt(keySet[i]);
+ //log.info("JSONObject==" + keySeriesObj);
+ }
+ else
+ {
+ log.info("unknown object==" + keySeriesObj);
+ }
+ }
+ }
+
+ if (keySeriesObj == null)
+ return "ObjectNotFound";
+ return keySeriesObj;
+ }
+
+ /**
+ * returns a string or JSONObject or JSONArray
+ **/
+ public void setEventObjectVal(String keySeriesStr, Object value)
+ {
+ keySeriesStr = keySeriesStr.replaceAll("\\[", ".");
+ keySeriesStr = keySeriesStr.replaceAll("\\]", ".");
+ if (keySeriesStr.contains(".."))
+ {
+ keySeriesStr = keySeriesStr.replaceAll("\\.\\.", ".");
+ }
+ //log.info(Integer.toString(keySeriesStr.lastIndexOf(".")));
+ //log.info(Integer.toString(keySeriesStr.length() -1));
+ if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() -1 )
+ keySeriesStr = keySeriesStr.substring(0,keySeriesStr.length()-1 );
+ String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length());
+ Object keySeriesObj = event;
+ for (int i=0; i<(keySet.length -1); i++ )
+ {
+ //log.info( "setEventObject " + i + " ==" + keySet[i]);
+ if (keySeriesObj instanceof JSONArray) {
+ //keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i]));
+ if (((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])) == null) //if the object is not there then add it
+ {
+ log.info("Object is null, must add it");
+ if (keySet[i+1].matches("[0-9]*")) // if index then array
+ ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONArray());
+ else
+ ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONObject());
+ }
+ keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i]));
+ //log.info("ARRAY==" + keySeriesObj);
+ }
+ else if (keySeriesObj instanceof JSONObject) {
+ if (( (JSONObject) keySeriesObj).opt(keySet[i]) == null) //if the object is not there then add it
+ {
+ if (keySet[i+1].matches("[0-9]*")) // if index then array
+ ((JSONObject) keySeriesObj).put(keySet[i], new JSONArray());
+ else
+ ((JSONObject) keySeriesObj).put(keySet[i], new JSONObject());
+ log.info("Object is null, must add it");
+ }
+ keySeriesObj = ( (JSONObject) keySeriesObj).opt(keySet[i]);
+ //log.info("JSONObject==" + keySeriesObj);
+ }
+ else
+ {
+ log.info("unknown object==" + keySeriesObj);
+ }
+ }
+
+ ((JSONObject)keySeriesObj).put(keySet[keySet.length -1], value);
+ }
+
+ private JSONObject event = new JSONObject();
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java b/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java
new file mode 100644
index 00000000..175ad443
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java
@@ -0,0 +1,132 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.commonFunction;
+
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashMap;
+
+import java.util.Map.Entry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonIOException;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+
+
+public class CustomExceptionLoader {
+
+ public static HashMap<String, JsonArray> map = null;
+ private static final Logger log = LoggerFactory.getLogger ( CustomExceptionLoader.class );
+
+ //For standalone test
+ //LoadMap Invoked from servletSetup
+ /*
+ public static void main(String[] args) {
+
+ System.out.println("CustomExceptionLoader.main --> Arguments -- ExceptionConfig file: " + args[0] + "StatusCode:" + args[1]+ " Error Msg:" + args[2]);
+ CommonStartup.exceptionConfig = args[0];
+
+ //Read the Custom exception JSON file into map
+ LoadMap();
+ System.out.println("CustomExceptionLoader.main --> Map info post LoadMap:" + map);
+
+ String[] str= LookupMap(args[1],args[2]);
+ if (! (str==null)) {
+ System.out.println("CustomExceptionLoader.main --> Return from lookup function" + str[0] + "value:" + str[1]);
+ }
+
+ }
+ */
+
+ public static void LoadMap () {
+
+ map = new HashMap<String, JsonArray>();
+ FileReader fr = null;
+ try {
+ JsonElement root = null;
+ fr = new FileReader(CommonStartup.exceptionConfig);
+ root = new JsonParser().parse(fr);
+ JsonObject jsonObject = root.getAsJsonObject().get("code").getAsJsonObject();
+
+ for (Entry<String, JsonElement> entry : jsonObject.entrySet()) {
+ map.put(entry.getKey(), (JsonArray) entry.getValue());
+ }
+
+ log.debug("CustomExceptionLoader.LoadMap --> Map loaded - " + map);
+ } catch (JsonIOException e) {
+ e.printStackTrace();
+ } catch (JsonSyntaxException e) {
+ e.printStackTrace();
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ finally {
+ if (fr != null) {
+ try {
+ fr.close();
+ } catch (IOException e) {
+ log.error("Error closing file reader stream : " +e.toString());
+ }
+ }
+ }
+ }
+
+ public static String[] LookupMap (String error, String errormsg) {
+
+ String[] retarray = null;
+
+ log.debug("CustomExceptionLoader.LookupMap -->" + " HTTP StatusCode:" + error + " Msg:" + errormsg);
+ try{
+
+ JsonArray jarray = map.get(error);
+ for (int i = 0; i < jarray.size(); i++) {
+
+ JsonElement val = jarray.get(i).getAsJsonObject().get("Reason");
+ JsonArray ec = (JsonArray) jarray.get(i).getAsJsonObject().get("ErrorCode");
+ log.trace("CustomExceptionLoader.LookupMap Parameter -> Error msg : " + errormsg + " Reason text being matched:" + val);
+ if (errormsg.contains(val.toString().replace("\"", ""))){
+ log.trace("CustomExceptionLoader.LookupMap Successful! Exception matched to error message StatusCode:" + ec.get(0).toString() + "ErrorMessage:" + ec.get(1).toString());
+ retarray = new String[2];
+ retarray[0]=ec.get(0).toString();
+ retarray[1]=ec.get(1).toString();
+ return retarray;
+ }
+ }
+
+ }
+ catch (Exception e)
+ {
+ System.out.println(e.getMessage());
+ }
+
+ return retarray;
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java b/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java
new file mode 100644
index 00000000..62383f5e
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java
@@ -0,0 +1,214 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.commonFunction;
+
+import java.io.FileNotFoundException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonIOException;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+
+public class DmaapPropertyReader {
+
+ private static DmaapPropertyReader instance = null;
+
+ private static final Logger log = LoggerFactory.getLogger(DmaapPropertyReader.class);
+
+ public HashMap<String, String> dmaap_hash = new HashMap<String, String>();
+
+ private DmaapPropertyReader(String CambriaConfigFile) {
+
+ FileReader fr = null;
+ try {
+ JsonElement root = null;
+ fr = new FileReader(CambriaConfigFile);
+ root = new JsonParser().parse(fr);
+
+ //Check if dmaap config is handled by legacy controller/service manager
+ if (root.getAsJsonObject().has("channels")) {
+ JsonArray jsonObject = (JsonArray) root.getAsJsonObject().get("channels");
+
+ for (int i = 0; i < jsonObject.size(); i++) {
+ log.debug("TOPIC:" + jsonObject.get(i).getAsJsonObject().get("cambria.topic") + " HOST-URL:"
+ + jsonObject.get(i).getAsJsonObject().get("cambria.url") + " HOSTS:"
+ + jsonObject.get(i).getAsJsonObject().get("cambria.hosts") + " PWD:"
+ + jsonObject.get(i).getAsJsonObject().get("basicAuthPassword") + " USER:"
+ + jsonObject.get(i).getAsJsonObject().get("basicAuthUsername") + " NAME:"
+ + jsonObject.get(i).getAsJsonObject().get("name"));
+
+ String convertedname = jsonObject.get(i).getAsJsonObject().get("name").toString().replace("\"", "");
+ dmaap_hash.put(convertedname + ".cambria.topic",
+ jsonObject.get(i).getAsJsonObject().get("cambria.topic").toString().replace("\"", ""));
+
+ if (jsonObject.get(i).getAsJsonObject().get("cambria.hosts") != null) {
+ dmaap_hash.put(convertedname + ".cambria.hosts",
+ jsonObject.get(i).getAsJsonObject().get("cambria.hosts").toString().replace("\"", ""));
+ }
+ if (jsonObject.get(i).getAsJsonObject().get("cambria.url") != null) {
+ dmaap_hash.put(convertedname + ".cambria.url",
+ jsonObject.get(i).getAsJsonObject().get("cambria.url").toString().replace("\"", ""));
+ }
+ if (jsonObject.get(i).getAsJsonObject().get("basicAuthPassword") != null) {
+ dmaap_hash.put(convertedname + ".basicAuthPassword", jsonObject.get(i).getAsJsonObject()
+ .get("basicAuthPassword").toString().replace("\"", ""));
+ }
+ if (jsonObject.get(i).getAsJsonObject().get("basicAuthUsername") != null) {
+ dmaap_hash.put(convertedname + ".basicAuthUsername", jsonObject.get(i).getAsJsonObject()
+ .get("basicAuthUsername").toString().replace("\"", ""));
+ }
+
+ }
+ } else {
+
+ //Handing new format from controllergen2/config_binding_service
+ JsonObject jsonObject = root.getAsJsonObject();
+ Set<Map.Entry<String, JsonElement>> entries = jsonObject.entrySet();
+
+ for (Map.Entry<String, JsonElement> entry : entries) {
+
+ JsonElement topicurl = entry.getValue().getAsJsonObject().get("dmaap_info").getAsJsonObject().get("topic_url");
+ String[] urlParts = dmaapUrlSplit(topicurl.toString().replace("\"", ""));
+
+ String mrTopic = null;
+ String mrUrl = null;
+ String[] hostport = null;
+ String username = null;
+ String userpwd = null;
+
+ try {
+
+ if (null != urlParts) {
+ mrUrl = urlParts[2];
+
+ // DCAE internal dmaap topic convention
+ if (urlParts[3].equals("events")) {
+ mrTopic = urlParts[4];
+ } else {
+ // ONAP dmaap topic convention
+ mrTopic = urlParts[3];
+ hostport = mrUrl.split(":");
+ }
+
+ }
+ } catch (NullPointerException e) {
+ System.out.println("NullPointerException");
+ e.getMessage();
+ }
+
+ if (entry.getValue().getAsJsonObject().has("aaf_username")) {
+ username = entry.getValue().getAsJsonObject().get("aaf_username").toString().replace("\"", "");
+ }
+ if (entry.getValue().getAsJsonObject().has("aaf_password")) {
+ userpwd = entry.getValue().getAsJsonObject().get("aaf_password").toString().replace("\"", "");
+ }
+ if (hostport == null) {
+ log.debug("TOPIC:" + mrTopic + " HOST-URL:" + mrUrl + " PWD:" + userpwd + " USER:" + username);
+ } else {
+ log.debug("TOPIC:" + mrTopic + " HOST-URL:" + mrUrl + " HOSTS:" + hostport[0] + " PWD:"
+ + userpwd + " USER:" + username + " NAME:" + entry.getKey());
+ }
+
+ dmaap_hash.put(entry.getKey() + ".cambria.topic", mrTopic);
+
+ if (!(hostport == null)) {
+ dmaap_hash.put(entry.getKey() + ".cambria.hosts", hostport[0]);
+ }
+
+ if (!(mrUrl == null)) {
+ dmaap_hash.put(entry.getKey() + ".cambria.url", mrUrl);
+ }
+
+ if (!(username == null)) {
+ dmaap_hash.put(entry.getKey() + ".basicAuthUsername", username);
+ }
+
+ if (!(userpwd == null)) {
+ dmaap_hash.put(entry.getKey() + ".basicAuthPassword", userpwd);
+ }
+
+ }
+
+ }
+
+ } catch (JsonIOException | JsonSyntaxException |
+
+ FileNotFoundException e1) {
+ e1.printStackTrace();
+ log.error("Problem loading Dmaap Channel configuration file: " + e1.toString());
+ } finally {
+ if (fr != null) {
+ try {
+ fr.close();
+ } catch (IOException e) {
+ log.error("Error closing file reader stream : " + e.toString());
+ }
+ }
+ }
+
+ }
+
+ /***
+ * Dmaap url structure pub - https://<dmaaphostname>:<port>/events/
+ * <namespace>.<dmaapcluster>.<topic>, sub - https://<dmaaphostname>:
+ * <port>/events/<namespace>.<dmaapcluster>.<topic>/G1/u1";
+ *
+ * Onap url structure pub - http://<dmaaphostname>:<port>/<unauthenticated>.
+ * <topic>,
+ */
+
+ private String[] dmaapUrlSplit(String dmUrl) {
+ String[] multUrls = dmUrl.split(",");
+
+ StringBuffer newUrls = new StringBuffer();
+ String urlParts[] = null;
+ for (int i = 0; i < multUrls.length; i++) {
+ urlParts = multUrls[i].split("/");
+ if (i == 0) {
+ newUrls = newUrls.append(urlParts[2]);
+ } else {
+ newUrls = newUrls.append(",").append(urlParts[2]);
+ }
+ }
+ return urlParts;
+ }
+
+ public static synchronized DmaapPropertyReader getInstance(String ChannelConfig) {
+ if (instance == null) {
+ instance = new DmaapPropertyReader(ChannelConfig);
+ }
+ return instance;
+ }
+
+ public String getKeyValue(String HashKey) {
+ return this.dmaap_hash.get(HashKey);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
new file mode 100644
index 00000000..4cdba066
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
@@ -0,0 +1,192 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.commonFunction;
+
+import java.io.FileReader;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.text.SimpleDateFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.att.nsa.clock.SaClock;
+import com.att.nsa.logging.LoggingContext;
+import com.att.nsa.logging.log4j.EcompFields;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonParser;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.TimeZone;
+import java.util.UUID;
+
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+public class EventProcessor implements Runnable {
+ private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
+
+ private static HashMap<String, String[]> streamid_hash = new HashMap<String, String[]>();
+ private JSONObject event = null;
+
+ public EventProcessor() {
+ log.debug("EventProcessor: Default Constructor");
+
+ String list[] = CommonStartup.streamid.split("\\|");
+ for (int i = 0; i < list.length; i++) {
+ String domain = list[i].split("=")[0];
+ //String streamIdList[] = list[i].split("=")[1].split(",");
+ String streamIdList[] = list[i].substring(list[i].indexOf("=") +1).split(",");
+
+ log.debug("Domain: " + domain + " streamIdList:" + Arrays.toString(streamIdList));
+ streamid_hash.put(domain, streamIdList);
+ }
+
+ }
+
+ @Override
+ public void run() {
+
+ try {
+
+ event = CommonStartup.fProcessingInputQueue.take();
+ log.info("EventProcessor\tRemoving element: " + event);
+
+ //EventPublisher Ep=new EventPublisher();
+ while (event != null) {
+ // As long as the producer is running we remove elements from the queue.
+
+ //UUID uuid = UUID.fromString(event.get("VESuniqueId").toString());
+ String uuid = event.get("VESuniqueId").toString();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString());
+ localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () );
+
+ log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain"));
+ String streamIdList[]=streamid_hash.get(event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain"));
+ log.debug("streamIdList:" + streamIdList);
+
+ if (streamIdList.length == 0) {
+ log.error("No StreamID defined for publish - Message dropped" + event.toString());
+ }
+
+ else {
+ for (int i=0; i < streamIdList.length; i++)
+ {
+ log.info("Invoking publisher for streamId:" + streamIdList[i]);
+ this.overrideEvent();
+ EventPublisher.getInstance(streamIdList[i]).sendEvent(event);
+
+ }
+ }
+ log.debug("Message published" + event.toString());
+ event = CommonStartup.fProcessingInputQueue.take();
+ // log.info("EventProcessor\tRemoving element: " + this.queue.remove());
+ }
+ } catch (InterruptedException e) {
+ log.error("EventProcessor InterruptedException" + e.getMessage());
+ }
+
+ }
+
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public void overrideEvent()
+ {
+ //Set collector timestamp in event payload before publish
+ final Date currentTime = new Date();
+ final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
+ sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+ /*JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime)));
+ JSONObject additionalParameter = new JSONObject().put("additionalParameters",additionalParametersarray );
+ JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader");
+ commonEventHeaderkey.put("internalHeaderFields", additionalParameter);*/
+
+
+/* "event": {
+ "commonEventHeader": {
+ ….
+ "internalHeaderFields": {
+ "collectorTimeStamp": "Fri, 04 21 2017 04:11:52 GMT"
+ },
+*/
+
+ //JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime)));
+ JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp",sdf.format(currentTime) );
+ JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader");
+ commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp);
+ event.getJSONObject("event").put("commonEventHeader",commonEventHeaderkey);
+
+ if (CommonStartup.eventTransformFlag == 1)
+ {
+ // read the mapping json file
+ final JsonParser parser = new JsonParser();
+ try {
+ final JsonArray jo = (JsonArray) parser.parse ( new FileReader ( "./etc/eventTransform.json" ) );
+ log.info("parse eventTransform.json");
+ // now convert to org.json
+ final String jsonText = jo.toString ();
+ final JSONArray topLevel = new JSONArray ( jsonText );
+ //log.info("topLevel == " + topLevel);
+
+ Class[] paramJSONObject = new Class[1];
+ paramJSONObject[0] = JSONObject.class;
+ //load VESProcessors class at runtime
+ Class cls = Class.forName("org.onap.dcae.commonFunction.ConfigProcessors");
+ Constructor constr = cls.getConstructor(paramJSONObject);
+ Object obj = constr.newInstance(event);
+
+ for (int j=0; j<topLevel.length(); j++)
+ {
+ JSONObject filterObj = topLevel.getJSONObject(j).getJSONObject("filter");
+ Method method = cls.getDeclaredMethod("isFilterMet", paramJSONObject);
+ boolean filterMet = (boolean) method.invoke (obj, filterObj );
+ if (filterMet)
+ {
+ final JSONArray processors = (JSONArray)topLevel.getJSONObject(j).getJSONArray("processors");
+
+ //call the processor method
+ for (int i=0; i < processors.length(); i++)
+ {
+ final JSONObject processorList = processors.getJSONObject(i);
+ final String functionName = processorList.getString("functionName");
+ final JSONObject args = processorList.getJSONObject("args");
+ //final JSONObject filter = processorList.getJSONObject("filter");
+
+ log.info("functionName==" + functionName + " | args==" + args);
+ //reflect method call
+ method = cls.getDeclaredMethod(functionName, paramJSONObject);
+ method.invoke(obj, args);
+ }
+ }
+ }
+
+ } catch (Exception e) {
+
+ log.error("EventProcessor Exception" + e.getMessage() + e);
+ log.error("EventProcessor Exception" + e.getCause());
+ }
+ }
+ log.debug("Modified event:" + event);
+
+ }
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/EventPublisher.java
new file mode 100644
index 00000000..a5acb857
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/EventPublisher.java
@@ -0,0 +1,181 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.commonFunction;
+
+import java.io.IOException;
+
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import java.security.GeneralSecurityException;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.clock.SaClock;
+import com.att.nsa.logging.LoggingContext;
+import com.att.nsa.logging.log4j.EcompFields;
+
+
+public class EventPublisher {
+
+ private static EventPublisher instance = null;
+ private static CambriaBatchingPublisher pub = null;
+
+ private String streamid = "";
+ private String ueburl="";
+ private String topic="";
+ private String authuser="";
+ private String authpwd="";
+
+ private static Logger log = LoggerFactory.getLogger(EventPublisher.class);
+
+
+ private EventPublisher( String newstreamid) {
+
+ this.streamid = newstreamid;
+ try {
+ ueburl=DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash.get(streamid+".cambria.url");
+
+ if (ueburl==null){
+ ueburl= DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash.get(streamid+".cambria.hosts");
+ }
+ topic= DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).getKeyValue(streamid+".cambria.topic");
+ authuser = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).getKeyValue(streamid+".basicAuthUsername");
+
+
+ if (authuser != null) {
+ authpwd= DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash.get(streamid+".basicAuthPassword");
+ }
+ }
+ catch(Exception e) {
+ log.error("CambriaClientBuilders connection reader exception : " + e.getMessage());
+
+ }
+
+ }
+
+
+ public static synchronized EventPublisher getInstance( String streamid){
+ if (instance == null) {
+ instance = new EventPublisher(streamid);
+ }
+ if (!instance.streamid.equals(streamid)){
+ instance.closePublisher();
+ instance = new EventPublisher(streamid);
+ }
+ return instance;
+
+ }
+
+
+ public synchronized void sendEvent(JSONObject event) {
+
+ log.debug("EventPublisher.sendEvent: instance for publish is ready");
+
+
+ if (event.has("VESuniqueId"))
+ {
+ String uuid = event.get("VESuniqueId").toString();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString());
+ localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () );
+ log.debug("Removing VESuniqueid object from event");
+ event.remove("VESuniqueId");
+ }
+
+
+
+
+ try {
+
+ if (authuser != null)
+ {
+ log.debug("URL:" + ueburl + "TOPIC:" + topic + "AuthUser:" + authuser + "Authpwd:" + authpwd);
+ pub = new CambriaClientBuilders.PublisherBuilder ()
+ .usingHosts (ueburl)
+ .onTopic (topic)
+ .usingHttps()
+ .authenticatedByHttp (authuser, authpwd )
+ .logSendFailuresAfter(5)
+ // .logTo(log)
+ // .limitBatch(100, 10)
+ .build ();
+ }
+ else
+ {
+
+ log.debug("URL:" + ueburl + "TOPIC:" + topic );
+ pub = new CambriaClientBuilders.PublisherBuilder ()
+ .usingHosts (ueburl)
+ .onTopic (topic)
+ // .logTo(log)
+ .logSendFailuresAfter(5)
+ // .limitBatch(100, 10)
+ .build ();
+
+ }
+
+ int pendingMsgs = pub.send("MyPartitionKey", event.toString());
+ //this.wait(2000);
+
+ if(pendingMsgs > 100) {
+ log.info("Pending Message Count="+pendingMsgs);
+ }
+
+ //closePublisher();
+ log.info("pub.send invoked - no error");
+ CommonStartup.oplog.info ("URL:" + ueburl + "TOPIC:" + topic + "Event Published:" + event);
+
+ } catch(IOException e) {
+ log.error("IOException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
+ } catch (GeneralSecurityException e) {
+ // TODO Auto-generated catch block
+ log.error("GeneralSecurityException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
+ }
+ catch (IllegalArgumentException e)
+ {
+ log.error("IllegalArgumentException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
+ }
+
+ }
+
+
+ public synchronized void closePublisher() {
+
+ try {
+ if (pub!= null)
+ {
+ final List<?> stuck = pub.close(20, TimeUnit.SECONDS);
+ if ( stuck.size () > 0 ) {
+ log.error(stuck.size() + " messages unsent" );
+ }
+ }
+ }
+ catch(InterruptedException ie) {
+ log.error("Caught an Interrupted Exception on Close event");
+ }catch(IOException ioe) {
+ log.error("Caught IO Exception: " + ioe.toString());
+ }
+
+ }
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/VESLogger.java b/src/main/java/org/onap/dcae/commonFunction/VESLogger.java
new file mode 100644
index 00000000..5d60a016
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/VESLogger.java
@@ -0,0 +1,170 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.commonFunction;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+
+import java.util.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.att.nsa.clock.SaClock;
+
+import com.att.nsa.logging.LoggingContext;
+import com.att.nsa.logging.LoggingContextFactory;
+import com.att.nsa.logging.log4j.EcompFields;
+
+import jline.internal.Log;
+
+
+public class VESLogger {
+
+ public static final String VES_AGENT = "VES_AGENT";
+
+ public static Logger auditLog;
+ public static Logger metricsLog;
+ public static Logger errorLog;
+ public static Logger debugLog;
+
+ // Common LoggingContext
+ private static LoggingContext commonLC = null;
+ // Thread-specific LoggingContext
+ private static LoggingContext threadLC = null;
+ public LoggingContext lc ;
+
+
+
+ /**
+ * Returns the common LoggingContext instance that is the base context
+ * for all subsequent instances.
+ *
+ * @return the common LoggingContext
+ */
+ public static LoggingContext getCommonLoggingContext()
+ {
+ if (commonLC == null)
+ {
+ commonLC = new LoggingContextFactory.Builder().build();
+ final UUID uuid = java.util.UUID.randomUUID();
+
+ commonLC.put("requestId", uuid.toString());
+ }
+ return commonLC;
+ }
+
+ /**
+ * Get a logging context for the current thread that's based on the common logging context.
+ * Populate the context with context-specific values.
+ *
+ * @return a LoggingContext for the current thread
+ */
+ public static LoggingContext getLoggingContextForThread (UUID aUuid)
+ {
+ // note that this operation requires everything from the common context
+ // to be (re)copied into the target context. That seems slow, but it actually
+ // helps prevent the thread from overwriting supposedly common data. It also
+ // should be fairly quick compared with the overhead of handling the actual
+ // service call.
+
+ threadLC = new LoggingContextFactory.Builder().
+ withBaseContext ( getCommonLoggingContext () ).
+ build();
+ // Establish the request-specific UUID, as long as we are here...
+ threadLC.put("requestId", aUuid.toString());
+ threadLC.put ( EcompFields.kEndTimestamp, SaClock.now () );
+
+ return threadLC;
+ }
+
+ /**
+ * Get a logging context for the current thread that's based on the common logging context.
+ * Populate the context with context-specific values.
+ *
+ * @return a LoggingContext for the current thread
+ */
+ public static LoggingContext getLoggingContextForThread (String aUuid)
+ {
+ // note that this operation requires everything from the common context
+ // to be (re)copied into the target context. That seems slow, but it actually
+ // helps prevent the thread from overwriting supposedly common data. It also
+ // should be fairly quick compared with the overhead of handling the actual
+ // service call.
+
+ threadLC = new LoggingContextFactory.Builder().
+ withBaseContext ( getCommonLoggingContext () ).
+ build();
+ // Establish the request-specific UUID, as long as we are here...
+ threadLC.put("requestId", aUuid);
+ threadLC.put ( "statusCode", "COMPLETE" );
+ threadLC.put ( EcompFields.kEndTimestamp, SaClock.now () );
+ return threadLC;
+ }
+ public static void setUpEcompLogging()
+ {
+
+
+ // Create ECOMP Logger instances
+ auditLog = LoggerFactory.getLogger("com.att.ecomp.audit");
+ metricsLog = LoggerFactory.getLogger("com.att.ecomp.metrics");
+ debugLog = LoggerFactory.getLogger("com.att.ecomp.debug");
+ errorLog = LoggerFactory.getLogger("com.att.ecomp.error");
+
+
+ final LoggingContext lc = getCommonLoggingContext();
+
+ String ipAddr = "127.0.0.1";
+ String hostname = "localhost";
+ try
+ {
+ final InetAddress ip = InetAddress.getLocalHost ();
+ hostname = ip.getCanonicalHostName ();
+ ipAddr = ip.getHostAddress();
+ }
+ catch ( UnknownHostException x )
+ {
+ Log.debug(x.getMessage());
+ }
+
+ lc.put ( "serverName", hostname );
+ lc.put ( "serviceName", "VESCollecor" );
+ lc.put ( "statusCode", "RUNNING" );
+ lc.put ( "targetEntity", "NULL");
+ lc.put ( "targetServiceName", "NULL");
+ lc.put ( "server", hostname );
+ lc.put ( "serverIpAddress", ipAddr.toString () );
+
+ // instance UUID is meaningless here, so we just create a new one each time the
+ // server starts. One could argue each new instantiation of the service should
+ // have a new instance ID.
+ lc.put ( "instanceUuid", "" );
+ lc.put ( "severity", "" );
+ lc.put ( EcompFields.kEndTimestamp, SaClock.now () );
+ lc.put("EndTimestamp", SaClock.now ());
+ lc.put("partnerName", "NA");
+
+
+ }
+
+
+}