diff options
author | VENKATESH KUMAR <vv770d@att.com> | 2017-08-22 23:36:51 +0100 |
---|---|---|
committer | VENKATESH KUMAR <vv770d@att.com> | 2017-08-23 00:57:24 -0400 |
commit | 64dd2f365ce28e8254ba8fa4407dc5d7f192dacf (patch) | |
tree | f950dd24404a02acfdd0a853cbb9fef3fb4b65ce /src/main/java | |
parent | ef607b769611ddb809a4c13ce421f88ece16017d (diff) |
dcaegen2 vescollector seedcode
Initial seed code delivery for vescollector for support
on the gen2dcae platform
Issue-ID: DCAEGEN2-52
Change-Id: Id2477eb266f05caf64c67dd809b1ad146ff4fb92
Signed-off-by: VENKATESH KUMAR <vv770d@att.com>
Diffstat (limited to 'src/main/java')
12 files changed, 2558 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java new file mode 100644 index 00000000..b743f134 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java @@ -0,0 +1,351 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.commonFunction; + + + +import java.io.IOException; + +import java.net.URL; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.servlet.ServletException; + +import org.apache.catalina.LifecycleException; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.onap.dcae.restapi.RestfulCollectorServlet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import com.att.nsa.apiServer.ApiServer; +import com.att.nsa.apiServer.ApiServerConnector; +import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint; +import com.att.nsa.cmdLine.NsaCommandLineUtil; +import com.att.nsa.drumlin.service.framework.DrumlinServlet; +import com.att.nsa.drumlin.till.nv.rrNvReadable; +import com.att.nsa.drumlin.till.nv.impl.nvPropertiesFile; +import com.att.nsa.drumlin.till.nv.impl.nvReadableStack; +import com.att.nsa.drumlin.till.nv.impl.nvReadableTable; +import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue; +import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonNode; +import com.github.fge.jsonschema.exceptions.ProcessingException; +import com.github.fge.jsonschema.main.JsonSchema; +import com.github.fge.jsonschema.main.JsonSchemaFactory; +import com.github.fge.jsonschema.report.ProcessingMessage; +import com.github.fge.jsonschema.report.ProcessingReport; +import com.github.fge.jsonschema.util.JsonLoader; + + +public class CommonStartup extends NsaBaseEndpoint implements Runnable +{ + public static final String kConfig = "c"; + + public static final String kSetting_Port = "collector.service.port"; + public static final int kDefault_Port = 8080; + + public static final String kSetting_SecurePort = "collector.service.secure.port"; + public static final int kDefault_SecurePort = -1; + + public static final String kSetting_KeystorePassfile = "collector.keystore.passwordfile"; + public static final String kDefault_KeystorePassfile = "../etc/passwordfile"; + public static final String kSetting_KeystoreFile = "collector.keystore.file.location"; + public static final String kDefault_KeystoreFile = "../etc/keystore"; + public static final String kSetting_KeyAlias = "collector.keystore.alias"; + public static final String kDefault_KeyAlias = "tomcat"; + + public static final String kSetting_DmaapConfigs = "collector.dmaapfile"; + protected static final String[] kDefault_DmaapConfigs = new String[] { "/etc/DmaapConfig.json" }; + + public static final String kSetting_MaxQueuedEvents = "collector.inputQueue.maxPending"; + public static final int kDefault_MaxQueuedEvents = 1024*4; + + public static final String kSetting_schemaValidator = "collector.schema.checkflag"; + public static final int kDefault_schemaValidator = -1; + + public static final String kSetting_schemaFile = "collector.schema.file"; + public static final String kDefault_schemaFile = "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}"; + public static final String kSetting_ExceptionConfig = "exceptionConfig"; + + public static final String kSetting_dmaapStreamid = "collector.dmaap.streamid"; + + public static final String kSetting_authflag = "header.authflag"; + public static final int kDefault_authflag = 0; + + public static final String kSetting_authid = "header.authid"; + public static final String kSetting_authpwd = "header.authpwd"; + public static final String kSetting_authstore = "header.authstore"; + public static final String kSetting_authlist = "header.authlist"; + + public static final String kSetting_eventTransformFlag = "event.transform.flag"; + public static final int kDefault_eventTransformFlag = 1; + + + public static final Logger inlog = LoggerFactory.getLogger ("org.onap.dcae.commonFunction.input" ); + public static final Logger oplog = LoggerFactory.getLogger ("org.onap.dcae.commonFunction.output"); + public static final Logger eplog = LoggerFactory.getLogger ("org.onap.dcae.commonFunction.error"); + public static final Logger metriclog = LoggerFactory.getLogger ("com.att.ecomp.metrics" ); + + public static int schema_Validatorflag = -1; + public static int authflag = 1; + public static int eventTransformFlag = 1; + public static String schemaFile = null; + public static JSONObject schemaFileJson = null; + public static String exceptionConfig = null; + public static String cambriaConfigFile = null; + private boolean listnerstatus = false; + static String streamid = null; + + + private CommonStartup(rrNvReadable settings) throws loadException, missingReqdSetting, IOException, rrNvReadable.missingReqdSetting, rrNvReadable.invalidSettingValue, ServletException, InterruptedException + { + final List<ApiServerConnector> connectors = new LinkedList<ApiServerConnector> (); + + if (settings.getInt ( kSetting_Port, kDefault_Port ) > 0) + { + // http service + connectors.add ( + new ApiServerConnector.Builder ( settings.getInt ( kSetting_Port, kDefault_Port ) ) + .secure ( false ) + .build () + ); + } + + // optional https service + final int securePort = settings.getInt(kSetting_SecurePort, kDefault_SecurePort); + final String keystoreFile = settings.getString(kSetting_KeystoreFile, kDefault_KeystoreFile); + final String keystorePasswordFile = settings.getString(kSetting_KeystorePassfile, kDefault_KeystorePassfile); + final String keyAlias = settings.getString (kSetting_KeyAlias, kDefault_KeyAlias); + + + if (securePort > 0) + { + final String kSetting_KeystorePass = readFile(keystorePasswordFile, Charset.defaultCharset()); + connectors.add(new ApiServerConnector.Builder(securePort) + .secure(true) + .keystorePassword(kSetting_KeystorePass) + .keystoreFile(keystoreFile) + .keyAlias(keyAlias) + .build()); + + } + + //Reading other config properties + + schema_Validatorflag = settings.getInt(kSetting_schemaValidator, kDefault_schemaValidator ); + if (schema_Validatorflag > 0){ + schemaFile = settings.getString(kSetting_schemaFile,kDefault_schemaFile); + //System.out.println("SchemaFile:" + schemaFile); + schemaFileJson = new JSONObject(schemaFile); + + } + exceptionConfig = settings.getString(kSetting_ExceptionConfig, null); + authflag = settings.getInt(CommonStartup.kSetting_authflag, CommonStartup.kDefault_authflag ); + String [] currentconffile = settings.getStrings (CommonStartup.kSetting_DmaapConfigs, CommonStartup.kDefault_DmaapConfigs ) ; + cambriaConfigFile= currentconffile[0] ; + streamid = settings.getString(kSetting_dmaapStreamid,null); + eventTransformFlag = settings.getInt(kSetting_eventTransformFlag, kDefault_eventTransformFlag); + + fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)) + .encodeSlashes(true) + .name("collector") + .build(); + + + //Load override exception map + CustomExceptionLoader.LoadMap(); + setListnerstatus(true); + } + + public static void main ( String[] args ) + { + ExecutorService executor = null; + try + { + // process command line arguments + final Map<String, String> argMap = NsaCommandLineUtil.processCmdLine ( args, true ); + final String config = NsaCommandLineUtil.getSetting ( argMap, kConfig, "collector.properties" ); + final URL settingStream = DrumlinServlet.findStream ( config, CommonStartup.class ); + + final nvReadableStack settings = new nvReadableStack (); + settings.push ( new nvPropertiesFile ( settingStream ) ); + settings.push ( new nvReadableTable ( argMap ) ); + + fProcessingInputQueue = new LinkedBlockingQueue<JSONObject> (CommonStartup.kDefault_MaxQueuedEvents); + + VESLogger.setUpEcompLogging(); + + CommonStartup cs= new CommonStartup ( settings ); + + Thread csmain = new Thread(cs); + csmain.start(); + + + EventProcessor ep = new EventProcessor (); + //Thread epThread=new Thread(ep); + //epThread.start(); + executor = Executors.newFixedThreadPool(20); + executor.execute(ep); + + } + catch ( loadException | missingReqdSetting | IOException | invalidSettingValue | ServletException | InterruptedException e ) + { + CommonStartup.eplog.error("FATAL_STARTUP_ERROR" + e.getMessage() ); + throw new RuntimeException ( e ); + } + finally + { + // This will make the executor accept no new threads + // and finish all existing threads in the queue + if (executor != null){ + executor.shutdown(); + } + + } + } + + public void run() { + try { + fTomcatServer.start (); + } catch (LifecycleException | IOException e) { + + e.printStackTrace(); + } + fTomcatServer.await (); + } + + public boolean isListnerstatus() { + return listnerstatus; + } + + public void setListnerstatus(boolean listnerstatus) { + this.listnerstatus = listnerstatus; + } + public static Queue<JSONObject> getProcessingInputQueue () + { + return fProcessingInputQueue; + } + + public static class QueueFullException extends Exception + { + private static final long serialVersionUID = 1L; + } + + + public static void handleEvents ( JSONArray a ) throws QueueFullException, JSONException, IOException + { + final Queue<JSONObject> queue = getProcessingInputQueue (); + try + { + + CommonStartup.metriclog.info("EVENT_PUBLISH_START" ); + for (int i = 0; i < a.length(); i++) { + if ( !queue.offer ( a.getJSONObject(i) ) ) { + throw new QueueFullException (); + } + + } + log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); + CommonStartup.metriclog.info("EVENT_PUBLISH_END"); + //ecomplogger.debug(secloggerMessageEnum.SEC_COLLECT_AND_PULIBISH_SUCCESS); + + } + catch ( JSONException e ){ + throw e; + + } + } + + + static String readFile(String path, Charset encoding) + throws IOException + { + byte[] encoded = Files.readAllBytes(Paths.get(path)); + String pwd = new String(encoded); + return pwd.substring(0,pwd.length()-1); + } + + + public static String schemavalidate( String jsonData, String jsonSchema) { + ProcessingReport report = null; + String result = "false"; + + try { + //System.out.println("Applying schema: @<@<"+jsonSchema+">@>@ to data: #<#<"+jsonData+">#>#"); + log.trace("Schema validation for event:" + jsonData); + JsonNode schemaNode = JsonLoader.fromString(jsonSchema); + JsonNode data = JsonLoader.fromString(jsonData); + JsonSchemaFactory factory = JsonSchemaFactory.byDefault(); + JsonSchema schema = factory.getJsonSchema(schemaNode); + report = schema.validate(data); + } catch (JsonParseException e) { + log.error("schemavalidate:JsonParseException for event:" + jsonData ); + System.out.println(e.getMessage()); + return e.getMessage().toString(); + } catch (ProcessingException e) { + log.error("schemavalidate:Processing exception for event:" + jsonData ); + System.out.println(e.getMessage()); + return e.getMessage().toString(); + } catch (IOException e) { + log.error("schemavalidate:IO exception; something went wrong trying to read json data for event:" + jsonData); + System.out.println(e.getMessage()); + return e.getMessage().toString(); + } + if (report != null) { + Iterator<ProcessingMessage> iter = report.iterator(); + while (iter.hasNext()) { + ProcessingMessage pm = iter.next(); + log.trace("Processing Message: "+pm.getMessage()); + } + result = String.valueOf(report.isSuccess()); + } + try { + log.debug("Validation Result:" +result + " Validation report:" + report); + } + catch (NullPointerException e){ + log.error("schemavalidate:NullpointerException on report"); + } + return result; + } + + + + static LinkedBlockingQueue<JSONObject> fProcessingInputQueue; + private static ApiServer fTomcatServer = null; + private static final Logger log = LoggerFactory.getLogger ( CommonStartup.class ); + +} diff --git a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java new file mode 100644 index 00000000..ff6c328c --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java @@ -0,0 +1,576 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.commonFunction; + +import org.json.JSONArray; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConfigProcessors { + + private static Logger log = LoggerFactory.getLogger(ConfigProcessors.class); + + public ConfigProcessors(JSONObject eventJson) + { + event = eventJson; + } + + /** + * + */ + public void getValue(JSONObject J) + { + //log.info("addAttribute"); + final String field = J.getString("field"); + //final String value = J.getString("value"); + final JSONObject filter = J.optJSONObject("filter"); + if (filter == null || isFilterMet(filter)) + { + //log.info("field ==" + field); + //log.info("value ==" + value); + getEventObjectVal(field); + } + else + log.info("Filter not met"); + } + + /** + * + */ + public void setValue(JSONObject J) + { + //log.info("addAttribute"); + final String field = J.getString("field"); + final String value = J.getString("value"); + final JSONObject filter = J.optJSONObject("filter"); + if (filter == null || isFilterMet(filter)) + { + //log.info("field ==" + field); + //log.info("value ==" + value); + setEventObjectVal(field, value); + } + else + log.info("Filter not met"); + } + + /** + * + */ + public void addAttribute(JSONObject J) + { + //log.info("addAttribute"); + final String field = J.getString("field"); + final String value = J.getString("value"); + final JSONObject filter = J.optJSONObject("filter"); + if (filter == null || isFilterMet(filter)) + { + //log.info("field ==" + field); + //log.info("value ==" + value); + setEventObjectVal(field, value); + } + else + log.info("Filter not met"); + } + + /** + * + */ + public void updateAttribute(JSONObject J) + { + //log.info("updateAttribute"); + final String field = J.getString("field"); + final String value = J.getString("value"); + final JSONObject filter = J.optJSONObject("filter"); + if (filter == null || isFilterMet(filter)) + { + //log.info("field ==" + field); + //log.info("value ==" + value); + setEventObjectVal(field, value); + } + else + log.info("Filter not met"); + } + + /** + * + */ + public void removeAttribute(JSONObject J) + { + //log.info("removeAttribute"); + final String field = J.getString("field"); + final JSONObject filter = J.optJSONObject("filter"); + + if (filter == null || isFilterMet(filter)) + { + removeEventKey(field); + } + else + log.info("Filter not met"); + } + + /** + * + */ + public void renameArrayInArray(JSONObject J) //map + { + log.info("renameArrayInArray"); + final String field = J.getString("field"); + final String oldField = J.getString("oldField"); + final JSONObject filter = J.optJSONObject("filter"); + //String value = ""; + if (filter == null || isFilterMet(filter)) + { + //log.info("field ==" + field); + final String[] fsplit = field.split("\\[\\]", field.length()); + final String[] oldfsplit = oldField.split("\\[\\]", oldField.length()); + /*for (int i=0; i< oldfsplit.length; i++ ) + { + log.info( "renameArrayInArray " + i + " ==" + oldfsplit[i]); + }*/ + + final String oldValue = getEventObjectVal(oldfsplit[0]).toString(); + if (!oldValue.equals("ObjectNotFound")){ + final String oldArrayName = oldfsplit[1].substring(1); + final String newArrayName = fsplit[1].substring(1); + final String value = oldValue.replaceAll(oldArrayName, newArrayName); + //log.info("oldArrayName ==" + oldArrayName); + //log.info("newArrayName ==" + newArrayName); + log.info("oldValue ==" + oldValue); + log.info("value ==" + value); + JSONArray ja = new JSONArray(value); + removeEventKey(oldfsplit[0]); + setEventObjectVal(fsplit[0], ja); + } + } + else + log.info("Filter not met"); + } + + /** + * + */ + public void map(JSONObject J) + { + //log.info("mapAttribute"); + final String field = J.getString("field"); + if (field.contains("[]")) + { + if (field.matches(".*\\[\\]\\..*\\[\\]")) + renameArrayInArray(J); + else + mapToJArray(J); + } + else + mapAttribute(J); + } + + /** + * + */ + public String performOperation(String operation, String value) + { + log.info("performOperation"); + if (operation != null) + { + if (operation.equals("convertMBtoKB")) + { + float kbValue = Float.parseFloat(value) * 1024; + value = String.valueOf(kbValue); + } + } + return value; + } + + /** + * + */ + //public void mapAttributeToArrayAttribute(JSONObject J) + public void mapAttribute(JSONObject J) + { + //log.info("mapAttribute"); + final String field = J.getString("field"); + final String oldField = J.getString("oldField"); + final JSONObject filter = J.optJSONObject("filter"); + final String operation = J.optString("operation"); + String value = ""; + if (filter == null || isFilterMet(filter)) + { + //log.info("field ==" + field); + + value = getEventObjectVal(oldField).toString(); + if (!value.equals("ObjectNotFound")) + { + if (operation != null && !operation.equals("")) + value = performOperation(operation, value); + //log.info("value ==" + value); + setEventObjectVal(field, value); + + removeEventKey(oldField); + } + } + else + log.info("Filter not met"); + } + + /** + * + */ + public void mapToJArray(JSONObject J) + { + log.info("mapToJArray"); + String field = J.getString("field"); + String oldField = J.getString("oldField"); + final JSONObject filter = J.optJSONObject("filter"); + final JSONObject attrMap = J.optJSONObject("attrMap"); + oldField = oldField.replaceAll("\\[\\]", ""); + field = field.replaceAll("\\[\\]", ""); + + //log.info("oldField ==" + field); + if (filter == null || isFilterMet(filter)) + { + //log.info("oldField ==" + field); + String value = getEventObjectVal(oldField).toString(); + if (!value.equals("ObjectNotFound")) + { + log.info("old value ==" + value.toString()); + //update old value based on attrMap + if (attrMap != null) + { + //loop thru attrMap and update attribute name to new name + for (String key : attrMap.keySet()) + { + //log.info("attr key==" + key + " value==" + attrMap.getString(key)); + value = value.replaceAll(key, attrMap.getString(key)); + } + } + + log.info("new value ==" + value); + char c = value.charAt(0); + if (c != '[') + { + //oldfield is JsonObject + JSONObject valueJO = new JSONObject(value); + // if the array already exists + + String existingValue = getEventObjectVal(field).toString(); + if (!existingValue.equals("ObjectNotFound")) + { + JSONArray ja = new JSONArray(existingValue); + JSONObject jo = ja.optJSONObject(0); + if (jo != null) + { + for (String key : valueJO.keySet()) + { + jo.put(key, valueJO.get(key)); + + } + ja.put(0, jo); + //log.info("jarray== " + ja.toString()); + setEventObjectVal(field,ja); + } + } + else //if new array + setEventObjectVal(field + "[0]", new JSONObject(value)); + } + else //oldfield is jsonArray + setEventObjectVal(field, new JSONArray(value)); + + removeEventKey(oldField); + } + } + else + log.info("Filter not met"); + } + + /** + * example - + { + "functionName": "concatenateValue", + "args":{ + "filter": {"event.commonEventHeader.event":"heartbeat"}, + "field":"event.commonEventHeader.eventName", + "concatenate": ["event.commonEventHeader.domain","event.commonEventHeader.eventType","event.commonEventHeader.alarmCondition"], + "delimiter":"_" + } + } + **/ + public void concatenateValue(JSONObject J) + { + //log.info("concatenateValue"); + final String field = J.getString("field"); + final String delimiter = J.getString("delimiter"); + final JSONArray values = J.getJSONArray("concatenate"); + final JSONObject filter = J.optJSONObject("filter"); + if (filter == null || isFilterMet(filter)) + { + String value = ""; + for (int i=0; i < values.length(); i++) + { + //log.info(values.getString(i)); + String tempVal = getEventObjectVal(values.getString(i)).toString(); + if (!tempVal.equals("ObjectNotFound")) + { + if (i ==0) + value = value + getEventObjectVal(values.getString(i)); + else + value = value + delimiter + getEventObjectVal(values.getString(i)); + } + } + //log.info("value ==" + value); + setEventObjectVal(field, value); + } + else + log.info("Filter not met"); + } + + /** + * + */ + private void removeEventKey(String field) + { + String[] keySet = field.split("\\.",field.length()); + JSONObject keySeries = event; + for (int i=0; i<(keySet.length -1); i++ ) + { + //log.info( i + " ==" + keySet[i]); + keySeries = keySeries.getJSONObject(keySet[i]); + } + //log.info(keySet[keySet.length -1]); + + keySeries.remove(keySet[keySet.length -1]); + + } + + /** + * + */ + private boolean checkFilter(JSONObject jo, String key, String logicKey) + { + String filterValue = jo.getString(key); + boolean retVal = true; + + if(filterValue.contains(":")) + { + String[] splitVal = filterValue.split(":"); + //log.info(splitVal[0] + " " + splitVal[1]); + if (splitVal[0].equals("matches")) + { + if (logicKey.equals("not")) + { + //log.info("not"); + //log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "split1==" + splitVal[1]); + if (getEventObjectVal(key).toString().matches(splitVal[1])) + { + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false"); + return false; + } + } + else + { + if (!(getEventObjectVal(key).toString().matches(splitVal[1]))) + { + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false"); + return false; + } + } + + } + if (splitVal[0].equals("contains")) + { + if (logicKey.equals("not")) + { + //log.info("not"); + //log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "split1==" + splitVal[1]); + if (getEventObjectVal(key).toString().contains(splitVal[1])) + { + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false"); + return false; + } + } + else + { + if (!(getEventObjectVal(key).toString().contains(splitVal[1]))) + { + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false"); + return false; + } + } + + } + } + else + { + if (logicKey.equals("not")) + { + if(getEventObjectVal(key).toString().equals(filterValue)) + { + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false"); + return false; + } + } + else + { + if(!(getEventObjectVal(key).toString().equals(filterValue))) + { + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false"); + return false; + } + } + } + return retVal; + } + /** + * + */ + public boolean isFilterMet(JSONObject jo) + { + boolean retval = true; + //log.info("Filter==" + jo.toString()); + for (String key : jo.keySet()) + { + if (key.equals("not")) + { + JSONObject njo = jo.getJSONObject(key); + for (String njoKey : njo.keySet()) + { + //log.info(njoKey); + retval = checkFilter(njo, njoKey, key); + if (retval == false) + return retval; + } + } + else + { + //log.info(key); + //final String filterKey = key; + retval = checkFilter(jo, key, key); + if (retval == false) + return retval; + } + } + return true; + } + + /** + * returns a string or JSONObject or JSONArray + **/ + public Object getEventObjectVal(String keySeriesStr) + { + keySeriesStr = keySeriesStr.replaceAll("\\[", "."); + keySeriesStr = keySeriesStr.replaceAll("\\]", "."); + if (keySeriesStr.contains("..")) + { + keySeriesStr = keySeriesStr.replaceAll("\\.\\.", "."); + } + //log.info(Integer.toString(keySeriesStr.lastIndexOf("."))); + //log.info(Integer.toString(keySeriesStr.length() -1)); + if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() -1 ) + keySeriesStr = keySeriesStr.substring(0,keySeriesStr.length()-1 ); + String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length()); + Object keySeriesObj = event; + for (int i=0; i<(keySet.length); i++ ) + { + //log.info( "getEventObject " + i + " ==" + keySet[i]); + if (keySeriesObj != null) + { + if (keySeriesObj instanceof String) + { + //keySeriesObj = keySeriesObj.get(keySet[i]); + log.info("STRING==" + keySeriesObj); + } + else if (keySeriesObj instanceof JSONArray) { + keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])); + //log.info("ARRAY==" + keySeriesObj); + } + else if (keySeriesObj instanceof JSONObject) { + keySeriesObj = ( (JSONObject) keySeriesObj).opt(keySet[i]); + //log.info("JSONObject==" + keySeriesObj); + } + else + { + log.info("unknown object==" + keySeriesObj); + } + } + } + + if (keySeriesObj == null) + return "ObjectNotFound"; + return keySeriesObj; + } + + /** + * returns a string or JSONObject or JSONArray + **/ + public void setEventObjectVal(String keySeriesStr, Object value) + { + keySeriesStr = keySeriesStr.replaceAll("\\[", "."); + keySeriesStr = keySeriesStr.replaceAll("\\]", "."); + if (keySeriesStr.contains("..")) + { + keySeriesStr = keySeriesStr.replaceAll("\\.\\.", "."); + } + //log.info(Integer.toString(keySeriesStr.lastIndexOf("."))); + //log.info(Integer.toString(keySeriesStr.length() -1)); + if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() -1 ) + keySeriesStr = keySeriesStr.substring(0,keySeriesStr.length()-1 ); + String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length()); + Object keySeriesObj = event; + for (int i=0; i<(keySet.length -1); i++ ) + { + //log.info( "setEventObject " + i + " ==" + keySet[i]); + if (keySeriesObj instanceof JSONArray) { + //keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])); + if (((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])) == null) //if the object is not there then add it + { + log.info("Object is null, must add it"); + if (keySet[i+1].matches("[0-9]*")) // if index then array + ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONArray()); + else + ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONObject()); + } + keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])); + //log.info("ARRAY==" + keySeriesObj); + } + else if (keySeriesObj instanceof JSONObject) { + if (( (JSONObject) keySeriesObj).opt(keySet[i]) == null) //if the object is not there then add it + { + if (keySet[i+1].matches("[0-9]*")) // if index then array + ((JSONObject) keySeriesObj).put(keySet[i], new JSONArray()); + else + ((JSONObject) keySeriesObj).put(keySet[i], new JSONObject()); + log.info("Object is null, must add it"); + } + keySeriesObj = ( (JSONObject) keySeriesObj).opt(keySet[i]); + //log.info("JSONObject==" + keySeriesObj); + } + else + { + log.info("unknown object==" + keySeriesObj); + } + } + + ((JSONObject)keySeriesObj).put(keySet[keySet.length -1], value); + } + + private JSONObject event = new JSONObject(); +} diff --git a/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java b/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java new file mode 100644 index 00000000..175ad443 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java @@ -0,0 +1,132 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.commonFunction; + +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.util.HashMap; + +import java.util.Map.Entry; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonIOException; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; + + +public class CustomExceptionLoader { + + public static HashMap<String, JsonArray> map = null; + private static final Logger log = LoggerFactory.getLogger ( CustomExceptionLoader.class ); + + //For standalone test + //LoadMap Invoked from servletSetup + /* + public static void main(String[] args) { + + System.out.println("CustomExceptionLoader.main --> Arguments -- ExceptionConfig file: " + args[0] + "StatusCode:" + args[1]+ " Error Msg:" + args[2]); + CommonStartup.exceptionConfig = args[0]; + + //Read the Custom exception JSON file into map + LoadMap(); + System.out.println("CustomExceptionLoader.main --> Map info post LoadMap:" + map); + + String[] str= LookupMap(args[1],args[2]); + if (! (str==null)) { + System.out.println("CustomExceptionLoader.main --> Return from lookup function" + str[0] + "value:" + str[1]); + } + + } + */ + + public static void LoadMap () { + + map = new HashMap<String, JsonArray>(); + FileReader fr = null; + try { + JsonElement root = null; + fr = new FileReader(CommonStartup.exceptionConfig); + root = new JsonParser().parse(fr); + JsonObject jsonObject = root.getAsJsonObject().get("code").getAsJsonObject(); + + for (Entry<String, JsonElement> entry : jsonObject.entrySet()) { + map.put(entry.getKey(), (JsonArray) entry.getValue()); + } + + log.debug("CustomExceptionLoader.LoadMap --> Map loaded - " + map); + } catch (JsonIOException e) { + e.printStackTrace(); + } catch (JsonSyntaxException e) { + e.printStackTrace(); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); + } + finally { + if (fr != null) { + try { + fr.close(); + } catch (IOException e) { + log.error("Error closing file reader stream : " +e.toString()); + } + } + } + } + + public static String[] LookupMap (String error, String errormsg) { + + String[] retarray = null; + + log.debug("CustomExceptionLoader.LookupMap -->" + " HTTP StatusCode:" + error + " Msg:" + errormsg); + try{ + + JsonArray jarray = map.get(error); + for (int i = 0; i < jarray.size(); i++) { + + JsonElement val = jarray.get(i).getAsJsonObject().get("Reason"); + JsonArray ec = (JsonArray) jarray.get(i).getAsJsonObject().get("ErrorCode"); + log.trace("CustomExceptionLoader.LookupMap Parameter -> Error msg : " + errormsg + " Reason text being matched:" + val); + if (errormsg.contains(val.toString().replace("\"", ""))){ + log.trace("CustomExceptionLoader.LookupMap Successful! Exception matched to error message StatusCode:" + ec.get(0).toString() + "ErrorMessage:" + ec.get(1).toString()); + retarray = new String[2]; + retarray[0]=ec.get(0).toString(); + retarray[1]=ec.get(1).toString(); + return retarray; + } + } + + } + catch (Exception e) + { + System.out.println(e.getMessage()); + } + + return retarray; + } + +} diff --git a/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java b/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java new file mode 100644 index 00000000..62383f5e --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java @@ -0,0 +1,214 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.commonFunction; + +import java.io.FileNotFoundException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.FileReader; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonIOException; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; + +public class DmaapPropertyReader { + + private static DmaapPropertyReader instance = null; + + private static final Logger log = LoggerFactory.getLogger(DmaapPropertyReader.class); + + public HashMap<String, String> dmaap_hash = new HashMap<String, String>(); + + private DmaapPropertyReader(String CambriaConfigFile) { + + FileReader fr = null; + try { + JsonElement root = null; + fr = new FileReader(CambriaConfigFile); + root = new JsonParser().parse(fr); + + //Check if dmaap config is handled by legacy controller/service manager + if (root.getAsJsonObject().has("channels")) { + JsonArray jsonObject = (JsonArray) root.getAsJsonObject().get("channels"); + + for (int i = 0; i < jsonObject.size(); i++) { + log.debug("TOPIC:" + jsonObject.get(i).getAsJsonObject().get("cambria.topic") + " HOST-URL:" + + jsonObject.get(i).getAsJsonObject().get("cambria.url") + " HOSTS:" + + jsonObject.get(i).getAsJsonObject().get("cambria.hosts") + " PWD:" + + jsonObject.get(i).getAsJsonObject().get("basicAuthPassword") + " USER:" + + jsonObject.get(i).getAsJsonObject().get("basicAuthUsername") + " NAME:" + + jsonObject.get(i).getAsJsonObject().get("name")); + + String convertedname = jsonObject.get(i).getAsJsonObject().get("name").toString().replace("\"", ""); + dmaap_hash.put(convertedname + ".cambria.topic", + jsonObject.get(i).getAsJsonObject().get("cambria.topic").toString().replace("\"", "")); + + if (jsonObject.get(i).getAsJsonObject().get("cambria.hosts") != null) { + dmaap_hash.put(convertedname + ".cambria.hosts", + jsonObject.get(i).getAsJsonObject().get("cambria.hosts").toString().replace("\"", "")); + } + if (jsonObject.get(i).getAsJsonObject().get("cambria.url") != null) { + dmaap_hash.put(convertedname + ".cambria.url", + jsonObject.get(i).getAsJsonObject().get("cambria.url").toString().replace("\"", "")); + } + if (jsonObject.get(i).getAsJsonObject().get("basicAuthPassword") != null) { + dmaap_hash.put(convertedname + ".basicAuthPassword", jsonObject.get(i).getAsJsonObject() + .get("basicAuthPassword").toString().replace("\"", "")); + } + if (jsonObject.get(i).getAsJsonObject().get("basicAuthUsername") != null) { + dmaap_hash.put(convertedname + ".basicAuthUsername", jsonObject.get(i).getAsJsonObject() + .get("basicAuthUsername").toString().replace("\"", "")); + } + + } + } else { + + //Handing new format from controllergen2/config_binding_service + JsonObject jsonObject = root.getAsJsonObject(); + Set<Map.Entry<String, JsonElement>> entries = jsonObject.entrySet(); + + for (Map.Entry<String, JsonElement> entry : entries) { + + JsonElement topicurl = entry.getValue().getAsJsonObject().get("dmaap_info").getAsJsonObject().get("topic_url"); + String[] urlParts = dmaapUrlSplit(topicurl.toString().replace("\"", "")); + + String mrTopic = null; + String mrUrl = null; + String[] hostport = null; + String username = null; + String userpwd = null; + + try { + + if (null != urlParts) { + mrUrl = urlParts[2]; + + // DCAE internal dmaap topic convention + if (urlParts[3].equals("events")) { + mrTopic = urlParts[4]; + } else { + // ONAP dmaap topic convention + mrTopic = urlParts[3]; + hostport = mrUrl.split(":"); + } + + } + } catch (NullPointerException e) { + System.out.println("NullPointerException"); + e.getMessage(); + } + + if (entry.getValue().getAsJsonObject().has("aaf_username")) { + username = entry.getValue().getAsJsonObject().get("aaf_username").toString().replace("\"", ""); + } + if (entry.getValue().getAsJsonObject().has("aaf_password")) { + userpwd = entry.getValue().getAsJsonObject().get("aaf_password").toString().replace("\"", ""); + } + if (hostport == null) { + log.debug("TOPIC:" + mrTopic + " HOST-URL:" + mrUrl + " PWD:" + userpwd + " USER:" + username); + } else { + log.debug("TOPIC:" + mrTopic + " HOST-URL:" + mrUrl + " HOSTS:" + hostport[0] + " PWD:" + + userpwd + " USER:" + username + " NAME:" + entry.getKey()); + } + + dmaap_hash.put(entry.getKey() + ".cambria.topic", mrTopic); + + if (!(hostport == null)) { + dmaap_hash.put(entry.getKey() + ".cambria.hosts", hostport[0]); + } + + if (!(mrUrl == null)) { + dmaap_hash.put(entry.getKey() + ".cambria.url", mrUrl); + } + + if (!(username == null)) { + dmaap_hash.put(entry.getKey() + ".basicAuthUsername", username); + } + + if (!(userpwd == null)) { + dmaap_hash.put(entry.getKey() + ".basicAuthPassword", userpwd); + } + + } + + } + + } catch (JsonIOException | JsonSyntaxException | + + FileNotFoundException e1) { + e1.printStackTrace(); + log.error("Problem loading Dmaap Channel configuration file: " + e1.toString()); + } finally { + if (fr != null) { + try { + fr.close(); + } catch (IOException e) { + log.error("Error closing file reader stream : " + e.toString()); + } + } + } + + } + + /*** + * Dmaap url structure pub - https://<dmaaphostname>:<port>/events/ + * <namespace>.<dmaapcluster>.<topic>, sub - https://<dmaaphostname>: + * <port>/events/<namespace>.<dmaapcluster>.<topic>/G1/u1"; + * + * Onap url structure pub - http://<dmaaphostname>:<port>/<unauthenticated>. + * <topic>, + */ + + private String[] dmaapUrlSplit(String dmUrl) { + String[] multUrls = dmUrl.split(","); + + StringBuffer newUrls = new StringBuffer(); + String urlParts[] = null; + for (int i = 0; i < multUrls.length; i++) { + urlParts = multUrls[i].split("/"); + if (i == 0) { + newUrls = newUrls.append(urlParts[2]); + } else { + newUrls = newUrls.append(",").append(urlParts[2]); + } + } + return urlParts; + } + + public static synchronized DmaapPropertyReader getInstance(String ChannelConfig) { + if (instance == null) { + instance = new DmaapPropertyReader(ChannelConfig); + } + return instance; + } + + public String getKeyValue(String HashKey) { + return this.dmaap_hash.get(HashKey); + } +} diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java new file mode 100644 index 00000000..4cdba066 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java @@ -0,0 +1,192 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.commonFunction; + +import java.io.FileReader; +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.text.SimpleDateFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.att.nsa.clock.SaClock; +import com.att.nsa.logging.LoggingContext; +import com.att.nsa.logging.log4j.EcompFields; +import com.google.gson.JsonArray; +import com.google.gson.JsonParser; + +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.TimeZone; +import java.util.UUID; + +import org.json.JSONArray; +import org.json.JSONObject; + +public class EventProcessor implements Runnable { + private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); + + private static HashMap<String, String[]> streamid_hash = new HashMap<String, String[]>(); + private JSONObject event = null; + + public EventProcessor() { + log.debug("EventProcessor: Default Constructor"); + + String list[] = CommonStartup.streamid.split("\\|"); + for (int i = 0; i < list.length; i++) { + String domain = list[i].split("=")[0]; + //String streamIdList[] = list[i].split("=")[1].split(","); + String streamIdList[] = list[i].substring(list[i].indexOf("=") +1).split(","); + + log.debug("Domain: " + domain + " streamIdList:" + Arrays.toString(streamIdList)); + streamid_hash.put(domain, streamIdList); + } + + } + + @Override + public void run() { + + try { + + event = CommonStartup.fProcessingInputQueue.take(); + log.info("EventProcessor\tRemoving element: " + event); + + //EventPublisher Ep=new EventPublisher(); + while (event != null) { + // As long as the producer is running we remove elements from the queue. + + //UUID uuid = UUID.fromString(event.get("VESuniqueId").toString()); + String uuid = event.get("VESuniqueId").toString(); + LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString()); + localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () ); + + log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain")); + String streamIdList[]=streamid_hash.get(event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain")); + log.debug("streamIdList:" + streamIdList); + + if (streamIdList.length == 0) { + log.error("No StreamID defined for publish - Message dropped" + event.toString()); + } + + else { + for (int i=0; i < streamIdList.length; i++) + { + log.info("Invoking publisher for streamId:" + streamIdList[i]); + this.overrideEvent(); + EventPublisher.getInstance(streamIdList[i]).sendEvent(event); + + } + } + log.debug("Message published" + event.toString()); + event = CommonStartup.fProcessingInputQueue.take(); + // log.info("EventProcessor\tRemoving element: " + this.queue.remove()); + } + } catch (InterruptedException e) { + log.error("EventProcessor InterruptedException" + e.getMessage()); + } + + } + + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void overrideEvent() + { + //Set collector timestamp in event payload before publish + final Date currentTime = new Date(); + final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); + sdf.setTimeZone(TimeZone.getTimeZone("GMT")); + + /*JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime))); + JSONObject additionalParameter = new JSONObject().put("additionalParameters",additionalParametersarray ); + JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader"); + commonEventHeaderkey.put("internalHeaderFields", additionalParameter);*/ + + +/* "event": { + "commonEventHeader": { + …. + "internalHeaderFields": { + "collectorTimeStamp": "Fri, 04 21 2017 04:11:52 GMT" + }, +*/ + + //JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime))); + JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp",sdf.format(currentTime) ); + JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader"); + commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); + event.getJSONObject("event").put("commonEventHeader",commonEventHeaderkey); + + if (CommonStartup.eventTransformFlag == 1) + { + // read the mapping json file + final JsonParser parser = new JsonParser(); + try { + final JsonArray jo = (JsonArray) parser.parse ( new FileReader ( "./etc/eventTransform.json" ) ); + log.info("parse eventTransform.json"); + // now convert to org.json + final String jsonText = jo.toString (); + final JSONArray topLevel = new JSONArray ( jsonText ); + //log.info("topLevel == " + topLevel); + + Class[] paramJSONObject = new Class[1]; + paramJSONObject[0] = JSONObject.class; + //load VESProcessors class at runtime + Class cls = Class.forName("org.onap.dcae.commonFunction.ConfigProcessors"); + Constructor constr = cls.getConstructor(paramJSONObject); + Object obj = constr.newInstance(event); + + for (int j=0; j<topLevel.length(); j++) + { + JSONObject filterObj = topLevel.getJSONObject(j).getJSONObject("filter"); + Method method = cls.getDeclaredMethod("isFilterMet", paramJSONObject); + boolean filterMet = (boolean) method.invoke (obj, filterObj ); + if (filterMet) + { + final JSONArray processors = (JSONArray)topLevel.getJSONObject(j).getJSONArray("processors"); + + //call the processor method + for (int i=0; i < processors.length(); i++) + { + final JSONObject processorList = processors.getJSONObject(i); + final String functionName = processorList.getString("functionName"); + final JSONObject args = processorList.getJSONObject("args"); + //final JSONObject filter = processorList.getJSONObject("filter"); + + log.info("functionName==" + functionName + " | args==" + args); + //reflect method call + method = cls.getDeclaredMethod(functionName, paramJSONObject); + method.invoke(obj, args); + } + } + } + + } catch (Exception e) { + + log.error("EventProcessor Exception" + e.getMessage() + e); + log.error("EventProcessor Exception" + e.getCause()); + } + } + log.debug("Modified event:" + event); + + } +} diff --git a/src/main/java/org/onap/dcae/commonFunction/EventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/EventPublisher.java new file mode 100644 index 00000000..a5acb857 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/EventPublisher.java @@ -0,0 +1,181 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.commonFunction; + +import java.io.IOException; + +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import java.security.GeneralSecurityException; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.clock.SaClock; +import com.att.nsa.logging.LoggingContext; +import com.att.nsa.logging.log4j.EcompFields; + + +public class EventPublisher { + + private static EventPublisher instance = null; + private static CambriaBatchingPublisher pub = null; + + private String streamid = ""; + private String ueburl=""; + private String topic=""; + private String authuser=""; + private String authpwd=""; + + private static Logger log = LoggerFactory.getLogger(EventPublisher.class); + + + private EventPublisher( String newstreamid) { + + this.streamid = newstreamid; + try { + ueburl=DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash.get(streamid+".cambria.url"); + + if (ueburl==null){ + ueburl= DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash.get(streamid+".cambria.hosts"); + } + topic= DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).getKeyValue(streamid+".cambria.topic"); + authuser = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).getKeyValue(streamid+".basicAuthUsername"); + + + if (authuser != null) { + authpwd= DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash.get(streamid+".basicAuthPassword"); + } + } + catch(Exception e) { + log.error("CambriaClientBuilders connection reader exception : " + e.getMessage()); + + } + + } + + + public static synchronized EventPublisher getInstance( String streamid){ + if (instance == null) { + instance = new EventPublisher(streamid); + } + if (!instance.streamid.equals(streamid)){ + instance.closePublisher(); + instance = new EventPublisher(streamid); + } + return instance; + + } + + + public synchronized void sendEvent(JSONObject event) { + + log.debug("EventPublisher.sendEvent: instance for publish is ready"); + + + if (event.has("VESuniqueId")) + { + String uuid = event.get("VESuniqueId").toString(); + LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString()); + localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () ); + log.debug("Removing VESuniqueid object from event"); + event.remove("VESuniqueId"); + } + + + + + try { + + if (authuser != null) + { + log.debug("URL:" + ueburl + "TOPIC:" + topic + "AuthUser:" + authuser + "Authpwd:" + authpwd); + pub = new CambriaClientBuilders.PublisherBuilder () + .usingHosts (ueburl) + .onTopic (topic) + .usingHttps() + .authenticatedByHttp (authuser, authpwd ) + .logSendFailuresAfter(5) + // .logTo(log) + // .limitBatch(100, 10) + .build (); + } + else + { + + log.debug("URL:" + ueburl + "TOPIC:" + topic ); + pub = new CambriaClientBuilders.PublisherBuilder () + .usingHosts (ueburl) + .onTopic (topic) + // .logTo(log) + .logSendFailuresAfter(5) + // .limitBatch(100, 10) + .build (); + + } + + int pendingMsgs = pub.send("MyPartitionKey", event.toString()); + //this.wait(2000); + + if(pendingMsgs > 100) { + log.info("Pending Message Count="+pendingMsgs); + } + + //closePublisher(); + log.info("pub.send invoked - no error"); + CommonStartup.oplog.info ("URL:" + ueburl + "TOPIC:" + topic + "Event Published:" + event); + + } catch(IOException e) { + log.error("IOException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString()); + } catch (GeneralSecurityException e) { + // TODO Auto-generated catch block + log.error("GeneralSecurityException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString()); + } + catch (IllegalArgumentException e) + { + log.error("IllegalArgumentException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString()); + } + + } + + + public synchronized void closePublisher() { + + try { + if (pub!= null) + { + final List<?> stuck = pub.close(20, TimeUnit.SECONDS); + if ( stuck.size () > 0 ) { + log.error(stuck.size() + " messages unsent" ); + } + } + } + catch(InterruptedException ie) { + log.error("Caught an Interrupted Exception on Close event"); + }catch(IOException ioe) { + log.error("Caught IO Exception: " + ioe.toString()); + } + + } +} diff --git a/src/main/java/org/onap/dcae/commonFunction/VESLogger.java b/src/main/java/org/onap/dcae/commonFunction/VESLogger.java new file mode 100644 index 00000000..5d60a016 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/VESLogger.java @@ -0,0 +1,170 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.commonFunction; + +import java.net.InetAddress; +import java.net.UnknownHostException; + + +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.att.nsa.clock.SaClock; + +import com.att.nsa.logging.LoggingContext; +import com.att.nsa.logging.LoggingContextFactory; +import com.att.nsa.logging.log4j.EcompFields; + +import jline.internal.Log; + + +public class VESLogger { + + public static final String VES_AGENT = "VES_AGENT"; + + public static Logger auditLog; + public static Logger metricsLog; + public static Logger errorLog; + public static Logger debugLog; + + // Common LoggingContext + private static LoggingContext commonLC = null; + // Thread-specific LoggingContext + private static LoggingContext threadLC = null; + public LoggingContext lc ; + + + + /** + * Returns the common LoggingContext instance that is the base context + * for all subsequent instances. + * + * @return the common LoggingContext + */ + public static LoggingContext getCommonLoggingContext() + { + if (commonLC == null) + { + commonLC = new LoggingContextFactory.Builder().build(); + final UUID uuid = java.util.UUID.randomUUID(); + + commonLC.put("requestId", uuid.toString()); + } + return commonLC; + } + + /** + * Get a logging context for the current thread that's based on the common logging context. + * Populate the context with context-specific values. + * + * @return a LoggingContext for the current thread + */ + public static LoggingContext getLoggingContextForThread (UUID aUuid) + { + // note that this operation requires everything from the common context + // to be (re)copied into the target context. That seems slow, but it actually + // helps prevent the thread from overwriting supposedly common data. It also + // should be fairly quick compared with the overhead of handling the actual + // service call. + + threadLC = new LoggingContextFactory.Builder(). + withBaseContext ( getCommonLoggingContext () ). + build(); + // Establish the request-specific UUID, as long as we are here... + threadLC.put("requestId", aUuid.toString()); + threadLC.put ( EcompFields.kEndTimestamp, SaClock.now () ); + + return threadLC; + } + + /** + * Get a logging context for the current thread that's based on the common logging context. + * Populate the context with context-specific values. + * + * @return a LoggingContext for the current thread + */ + public static LoggingContext getLoggingContextForThread (String aUuid) + { + // note that this operation requires everything from the common context + // to be (re)copied into the target context. That seems slow, but it actually + // helps prevent the thread from overwriting supposedly common data. It also + // should be fairly quick compared with the overhead of handling the actual + // service call. + + threadLC = new LoggingContextFactory.Builder(). + withBaseContext ( getCommonLoggingContext () ). + build(); + // Establish the request-specific UUID, as long as we are here... + threadLC.put("requestId", aUuid); + threadLC.put ( "statusCode", "COMPLETE" ); + threadLC.put ( EcompFields.kEndTimestamp, SaClock.now () ); + return threadLC; + } + public static void setUpEcompLogging() + { + + + // Create ECOMP Logger instances + auditLog = LoggerFactory.getLogger("com.att.ecomp.audit"); + metricsLog = LoggerFactory.getLogger("com.att.ecomp.metrics"); + debugLog = LoggerFactory.getLogger("com.att.ecomp.debug"); + errorLog = LoggerFactory.getLogger("com.att.ecomp.error"); + + + final LoggingContext lc = getCommonLoggingContext(); + + String ipAddr = "127.0.0.1"; + String hostname = "localhost"; + try + { + final InetAddress ip = InetAddress.getLocalHost (); + hostname = ip.getCanonicalHostName (); + ipAddr = ip.getHostAddress(); + } + catch ( UnknownHostException x ) + { + Log.debug(x.getMessage()); + } + + lc.put ( "serverName", hostname ); + lc.put ( "serviceName", "VESCollecor" ); + lc.put ( "statusCode", "RUNNING" ); + lc.put ( "targetEntity", "NULL"); + lc.put ( "targetServiceName", "NULL"); + lc.put ( "server", hostname ); + lc.put ( "serverIpAddress", ipAddr.toString () ); + + // instance UUID is meaningless here, so we just create a new one each time the + // server starts. One could argue each new instantiation of the service should + // have a new instance ID. + lc.put ( "instanceUuid", "" ); + lc.put ( "severity", "" ); + lc.put ( EcompFields.kEndTimestamp, SaClock.now () ); + lc.put("EndTimestamp", SaClock.now ()); + lc.put("partnerName", "NA"); + + + } + + +} diff --git a/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java b/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java new file mode 100644 index 00000000..caee9c4d --- /dev/null +++ b/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java @@ -0,0 +1,114 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.controller; + +import java.io.BufferedReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Map; + +import org.json.JSONArray; +import org.json.JSONObject; +import org.json.JSONTokener; + +public class FetchDynamicConfig { + + static String configFile = "/opt/app/KV-Configuration.json"; + static String url = null; + static String ret_string = null; + + public FetchDynamicConfig() { + + } + + public static void main(String[] args) { + Map<String, String> env = System.getenv(); + for (String envName : env.keySet()) { + System.out.format("%s=%s%n", envName, env.get(envName)); + } + + if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE") + && env.containsKey("HOSTNAME")) { + System.out.println(">>>Dynamic configuration to be fetched from ConfigBindingService"); + url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/" + env.get("CONFIG_BINDING_SERVICE"); + + ret_string = executecurl(url); + // consul return as array + JSONTokener temp = new JSONTokener(ret_string); + JSONObject cbsjobj = (JSONObject) new JSONArray(temp).get(0); + + String url_part1 = null; + if (cbsjobj.has("ServiceAddress") && cbsjobj.has("ServicePort")) { + url_part1 = cbsjobj.getString("ServiceAddress") + ":" + cbsjobj.getInt("ServicePort"); + } + + System.out.println("CONFIG_BINDING_SERVICE DNS RESOLVED:" + url_part1); + url = url_part1 + "/service_component/" + env.get("HOSTNAME"); + ret_string = executecurl(url); + + JSONObject jsonObject = new JSONObject(new JSONTokener(ret_string)); + try (FileWriter file = new FileWriter(configFile)) { + file.write(jsonObject.toString()); + + System.out.println("Successfully Copied JSON Object to file /opt/app/KV-Configuration.json"); + } catch (IOException e) { + System.out.println( + "Error in writing configuration into file /opt/app/KV-Configuration.json " + jsonObject); + e.printStackTrace(); + } + } + + else { + System.out.println(">>>Static configuration to be used"); + } + + } + + public static String executecurl(String url) { + + String[] command = { "curl", "-v", url }; + ProcessBuilder process = new ProcessBuilder(command); + Process p; + String result = null; + try { + p = process.start(); + + InputStreamReader ipr = new InputStreamReader(p.getInputStream()); + BufferedReader reader = new BufferedReader(ipr); + StringBuilder builder = new StringBuilder(); + String line = null; + + while ((line = reader.readLine()) != null) { + builder.append(line); + } + result = builder.toString(); + System.out.println(result); + + } catch (IOException e) { + System.out.print("error"); + e.printStackTrace(); + } + return result; + + } + +} diff --git a/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java b/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java new file mode 100644 index 00000000..63d5f6f3 --- /dev/null +++ b/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java @@ -0,0 +1,158 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.controller; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Iterator; +import java.util.Map; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; + +import org.json.JSONObject; + + + +public class LoadDynamicConfig { + + static String propFile = "collector.properties"; + static String configFile = "/opt/app/KV-Configuration.json"; + static String url = null; + static String ret_string = null; + + public LoadDynamicConfig() { + + } + + public static void main(String[] args) { + Map<String, String> env = System.getenv(); + /*for (String envName : env.keySet()) { + System.out.format("%s=%s%n", envName, env.get(envName)); + }*/ + + //Check again to ensure new controller deployment related config + if (env.containsKey("CONSUL_HOST") && + env.containsKey("CONFIG_BINDING_SERVICE") && env.containsKey("HOSTNAME")) + { + + + try { + + String jsonData = readFile(configFile); + JSONObject jsonObject = new JSONObject(jsonData); + + PropertiesConfiguration conf; + conf = new PropertiesConfiguration(propFile); + conf.setEncoding(null); + + + if (jsonObject != null) { + // update properties based on consul dynamic configuration + Iterator<?> keys = jsonObject.keys(); + + while (keys.hasNext()) { + String key = (String) keys.next(); + // check if any configuration is related to dmaap + // and write into dmaapconfig.json + if (key.startsWith("streams_publishes")) { + //VESCollector only have publish streams + try (FileWriter file = new FileWriter("./etc/DmaapConfig.json")) { + file.write(jsonObject.get(key).toString()); + System.out.println("Successfully written JSON Object to DmaapConfig.json"); + } catch (IOException e) { + System.out.println("Error in writing dmaap configuration into DmaapConfig.json"); + e.printStackTrace(); + } + } else { + conf.setProperty(key, jsonObject.get(key).toString()); + } + + } + conf.save(); + + + } + } catch (ConfigurationException e) { + e.getMessage(); + e.printStackTrace(); + + } + + } + else + { + System.out.println(">>>Static configuration to be used"); + } + + } + + public static String executecurl(String url) { + + String[] command = { "curl", "-v", url }; + ProcessBuilder process = new ProcessBuilder(command); + Process p; + String result = null; + try { + p = process.start(); + + InputStreamReader ipr= new InputStreamReader(p.getInputStream()); + BufferedReader reader = new BufferedReader(ipr); + StringBuilder builder = new StringBuilder(); + String line = null; + + while ((line = reader.readLine()) != null) { + builder.append(line); + } + result = builder.toString(); + System.out.println(result); + + } catch (IOException e) { + System.out.print("error"); + e.printStackTrace(); + } + return result; + + } + + public static String readFile(String filename) { + String result = ""; + try { + BufferedReader br = new BufferedReader(new FileReader(filename)); + StringBuilder sb = new StringBuilder(); + String line = br.readLine(); + while (line != null) { + sb.append(line); + line = br.readLine(); + } + result = sb.toString(); + br.close(); + } catch(Exception e) { + e.printStackTrace(); + } + return result; + } + + +} diff --git a/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java b/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java new file mode 100644 index 00000000..bd9a223e --- /dev/null +++ b/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java @@ -0,0 +1,150 @@ + +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.restapi; + +import java.io.IOException; +import java.net.URL; + +import javax.servlet.ServletException; + +import org.apache.tomcat.util.codec.binary.Base64; +import org.onap.dcae.commonFunction.CommonStartup; +import org.onap.dcae.commonFunction.VESLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.att.nsa.apiServer.CommonServlet; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.drumlin.service.framework.DrumlinErrorHandler; +import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext; +import com.att.nsa.drumlin.service.framework.routing.DrumlinRequestRouter; +import com.att.nsa.drumlin.service.framework.routing.playish.DrumlinPlayishRoutingFileSource; +import com.att.nsa.drumlin.service.standards.HttpStatusCodes; +import com.att.nsa.drumlin.till.nv.rrNvReadable; +import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; +import com.att.nsa.security.NsaAuthenticator; + +import com.att.nsa.security.authenticators.SimpleAuthenticator; +import com.att.nsa.security.db.simple.NsaSimpleApiKey; + +public class RestfulCollectorServlet extends CommonServlet +{ + String authid = null; + String authpwd = null; + String authlist = null; + + public RestfulCollectorServlet ( rrNvReadable settings ) throws loadException, missingReqdSetting + { + super ( settings, "collector", false ); + authid = settings.getString(CommonStartup.kSetting_authid,null); + if (authid != null) + { + String authpwdtemp = settings.getString(CommonStartup.kSetting_authpwd,null); + authpwd = new String(Base64.decodeBase64(authpwdtemp)); + } + authlist = settings.getString(CommonStartup.kSetting_authlist,null); + } + + + /** + * This is called once at server start. Use it to init any shared objects and setup the route mapping. + */ + @Override + protected void servletSetup () throws rrNvReadable.missingReqdSetting, rrNvReadable.invalidSettingValue, ServletException + { + super.servletSetup (); + + try + { + // the base class provides a bunch of things like API authentication and ECOMP compliant + // logging. The Restful Collector likely doesn't need API authentication, so for now, + // we init the base class services with an in-memory (and empty!) config DB. + commonServletSetup ( ConfigDbType.MEMORY ); + + VESLogger.setUpEcompLogging(); + + // setup the servlet routing and error handling + final DrumlinRequestRouter drr = getRequestRouter (); + + // you can tell the request router what to do when a particular kind of exception is thrown. + drr.setHandlerForException( IllegalArgumentException.class, new DrumlinErrorHandler() + { + @Override + public void handle ( DrumlinRequestContext ctx, Throwable cause ) + { + sendJsonReply ( ctx, HttpStatusCodes.k400_badRequest, cause.getMessage() ); + } + }); + + // load the routes from the config file + final URL routes = findStream ( "routes.conf" ); + if ( routes == null ) throw new rrNvReadable.missingReqdSetting ( "No routing configuration." ); + final DrumlinPlayishRoutingFileSource drs = new DrumlinPlayishRoutingFileSource ( routes ); + drr.addRouteSource ( drs ); + + if (CommonStartup.authflag > 0) { + NsaAuthenticator<NsaSimpleApiKey> NsaAuth = new SimpleAuthenticator (); + if (authlist != null) + { + String authpair[] = authlist.split("\\|"); + for (String pair: authpair) { + String lineid[] = pair.split(","); + String listauthid = lineid[0]; + String listauthpwd = new String(Base64.decodeBase64(lineid[1])); + ((SimpleAuthenticator) NsaAuth).add(listauthid,listauthpwd); + } + + } + else if (authid != null) + { + ((SimpleAuthenticator) NsaAuth).add(authid,authpwd); + } + else + { + //add a default test account + ((SimpleAuthenticator) NsaAuth).add("admin","collectorpasscode"); + } + this.getSecurityManager().addAuthenticator(NsaAuth); + } + + log.info ( "Restful Collector Servlet is up." ); + } + catch ( SecurityException e ) + { + throw new ServletException ( e ); + } + catch ( IOException e ) + { + throw new ServletException ( e ); + } + catch ( ConfigDbException e ) + { + throw new ServletException ( e ); + } + } + + + + private static final long serialVersionUID = 1L; + private static final Logger log = LoggerFactory.getLogger ( RestfulCollectorServlet.class ); +} diff --git a/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java b/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java new file mode 100644 index 00000000..159d483a --- /dev/null +++ b/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java @@ -0,0 +1,286 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.restapi.endpoints; + +import java.io.FileReader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStream; +import java.util.UUID; + +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.onap.dcae.commonFunction.CommonStartup; +import org.onap.dcae.commonFunction.CustomExceptionLoader; +import org.onap.dcae.commonFunction.VESLogger; +import org.onap.dcae.commonFunction.CommonStartup.QueueFullException; + +import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint; +import com.att.nsa.clock.SaClock; +import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext; +import com.att.nsa.drumlin.service.standards.HttpStatusCodes; +import com.att.nsa.drumlin.service.standards.MimeTypes; +import com.att.nsa.logging.LoggingContext; + +import com.att.nsa.logging.log4j.EcompFields; +import com.att.nsa.security.db.simple.NsaSimpleApiKey; + +import com.google.gson.JsonParser; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class EventReceipt extends NsaBaseEndpoint { + static String valresult = null; + static JSONObject customerror = null; + + private static final Logger log = LoggerFactory.getLogger(EventReceipt.class); + + + public static void receiveVESEvent(DrumlinRequestContext ctx) throws IOException { + // the request body carries events. assume for now it's an array + // of json objects that fits in memory. (See cambria's parsing for + // handling large messages) + + NsaSimpleApiKey retkey = null; + + JSONArray jsonArray = null; + JSONArray jsonArrayMod = new JSONArray(); + JSONObject event = null; + JSONObject jsonObject = null; + FileReader fr = null; + InputStream istr = null; + int arrayflag = 0; + String vesversion = null; + + + + try { + //System.out.print("Version string:" + version); + + // String br = new BufferedReader(new InputStreamReader(ctx.request().getBodyStream())).readLine(); + // JsonElement msg = new JsonParser().parse(new BufferedReader(new InputStreamReader(ctx.request().getBodyStream())).readLine()); + // jsonArray = new JSONArray ( new JSONTokener ( ctx.request().getBodyStream () ) ); + + + log.debug ("Request recieved :" + ctx.request().getRemoteAddress()); + istr = ctx.request().getBodyStream(); + jsonObject = new JSONObject(new JSONTokener(istr)); + + log.info("ctx getPathInContext: " + ctx.request().getPathInContext()); + Pattern p = Pattern.compile("(v\\d+)"); + Matcher m = p.matcher(ctx.request().getPathInContext()); + + if (m.find()) { + log.info("VES version:" + m.group()); + vesversion = m.group(); + } + if (ctx.request().getPathInContext().contains("eventBatch")) + { + CommonStartup.inlog.info(ctx.request().getRemoteAddress() + "VES Batch Input Messsage: " + jsonObject); + log.info(ctx.request().getRemoteAddress() + "VES Batch Input Messsage: " + jsonObject); + arrayflag = 1; + } + else + { + CommonStartup.inlog.info(ctx.request().getRemoteAddress() + "Input Messsage: " + jsonObject); + log.info(ctx.request().getRemoteAddress() + "Input Messsage: " + jsonObject); + + } + + + final UUID uuid = java.util.UUID.randomUUID(); + LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); + localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () ); + + + + try { + if (CommonStartup.authflag == 1) { + retkey = NsaBaseEndpoint.getAuthenticatedUser(ctx); + } + } catch (NullPointerException x) { + log.info( + "Invalid user request " + ctx.request().getContentType() + " Message:" + jsonObject.toString()); + CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Unauthorized user" + x.toString()); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Invalid user"); + return; + } + + if (retkey != null || CommonStartup.authflag == 0) { + if (CommonStartup.schema_Validatorflag > 0) { + + //fr = new FileReader(CommonStartup.schemaFile); + fr = new FileReader(schemaFileVersion(vesversion)); + String schema = new JsonParser().parse(fr).toString(); + + valresult = CommonStartup.schemavalidate(jsonObject.toString(), schema); + if (valresult.equals("true")) { + log.info("Validation successful"); + } else if (valresult.equals("false")) { + log.info("Validation failed"); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Schema validation failed"); + + return; + } else { + log.error("Validation errored" + valresult); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object"); + return; + + } + + + if (arrayflag ==1) { + jsonArray = jsonObject.getJSONArray("eventList"); + log.info("Validation successful for all events in batch"); + for (int i = 0; i < jsonArray.length(); i++) { + event = new JSONObject().put("event", jsonArray.getJSONObject(i)); + event.put("VESuniqueId", uuid + "-"+i); + event.put("VESversion", vesversion); + jsonArrayMod.put(event); + } + + log.info("Modified jsonarray:" + jsonArrayMod.toString()); + + } + else + { + + jsonObject.put("VESuniqueId", uuid); + jsonObject.put("VESversion", vesversion); + jsonArrayMod = new JSONArray().put(jsonObject); + } + + } + // reject anything that's not JSON + if (!ctx.request().getContentType().equalsIgnoreCase("application/json")) { + log.info("Rejecting request with content type " + ctx.request().getContentType() + " Message:" + + jsonObject); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, + "Incorrect message content-type; only accepts application/json messages"); + return; + } + + CommonStartup.handleEvents(jsonArrayMod); + } else { + log.info( + "Unauthorized request " + ctx.request().getContentType() + " Message:" + jsonObject.toString()); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Unauthorized user"); + return; + } + } catch (JSONException | NullPointerException | IOException x) { + log.error("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest" + HttpStatusCodes.k400_badRequest + + " Message:" + x.getMessage()); + CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x.toString()); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object"); + return; + } catch (QueueFullException e) { + e.printStackTrace(); + log.error("Collector internal queue full :" + e.getMessage()); + CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e.toString()); + respondWithCustomMsginJson(ctx, HttpStatusCodes.k503_serviceUnavailable, "Queue full"); + return; + } finally { + if (fr != null) { + safeClose(fr); + } + + if (istr != null) { + safeClose(istr); + } + } + log.info("MessageAccepted and k200_ok to be sent"); + ctx.response().sendErrorAndBody(HttpStatusCodes.k200_ok, "Message Accepted", MimeTypes.kAppJson); + } + + + public static void respondWithCustomMsginJson(DrumlinRequestContext ctx, int sc, String msg) { + String[] str = null; + String ExceptionType = "GeneralException"; + + str = CustomExceptionLoader.LookupMap(String.valueOf(sc), msg); + System.out.println("Post CustomExceptionLoader.LookupMap" + str); + + if (str != null) { + + if (str[0].matches("SVC")) { + ExceptionType = "ServiceException"; + } else if (str[1].matches("POL")) { + ExceptionType = "PolicyException"; + } + + JSONObject jb = new JSONObject().put("requestError", + new JSONObject().put(ExceptionType, new JSONObject().put("MessagID", str[0]).put("text", str[1]))); + + log.debug("Constructed json error : " + jb.toString()); + ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson); + } else { + JSONObject jb = new JSONObject().put("requestError", + new JSONObject().put(ExceptionType, new JSONObject().put("Status", sc).put("Error", msg))); + ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson); + } + + } + + public static void safeClose(FileReader fr) { + if (fr != null) { + try { + fr.close(); + } catch (IOException e) { + log.error("Error closing file reader stream : " + e.toString()); + } + } + + } + + public static void safeClose(InputStream is) { + if (is != null) { + try { + is.close(); + } catch (IOException e) { + log.error("Error closing Input stream : " + e.toString()); + } + } + + } + + public static String schemaFileVersion(String version) + { + String filename = null; + + if (CommonStartup.schemaFileJson.has(version)) + { + filename = CommonStartup.schemaFileJson.getString(version); + } + else + { + filename = CommonStartup.schemaFile; + } + log.info("VESversion: " + version + " Schema File:" + filename); + return filename; + + } + + +} diff --git a/src/main/java/org/onap/dcae/restapi/endpoints/Ui.java b/src/main/java/org/onap/dcae/restapi/endpoints/Ui.java new file mode 100644 index 00000000..36747660 --- /dev/null +++ b/src/main/java/org/onap/dcae/restapi/endpoints/Ui.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.restapi.endpoints; + +import java.io.IOException; + +import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint; +import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext; + +public class Ui extends NsaBaseEndpoint +{ + public static void hello ( DrumlinRequestContext ctx ) throws IOException + { + ctx.renderer ().renderTemplate ( "templates/hello.html" ); + } +} |