summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/CommonStartup.java456
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java6
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java15
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java379
-rw-r--r--src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java12
-rw-r--r--src/test/java/org/onap/dcae/vestest/EventTransformTest.java37
-rw-r--r--src/test/java/org/onap/dcae/vestest/TestCommonStartup.java3
-rw-r--r--src/test/resources/event4xjson.txt7
8 files changed, 452 insertions, 463 deletions
diff --git a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
index e782a1aa..cd23de87 100644
--- a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
+++ b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
@@ -20,7 +20,6 @@
package org.onap.dcae.commonFunction;
-
import com.att.nsa.apiServer.ApiServer;
import com.att.nsa.apiServer.ApiServerConnector;
import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
@@ -71,12 +70,12 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable {
public static final String KSETTING_PORT = "collector.service.port";
public static final int KDEFAULT_PORT = 8080;
-
+
public static final String KSETTING_SECUREPORT = "collector.service.secure.port";
- public static final int KDEFAULT_SECUREPORT = -1;
-
+ public static final int KDEFAULT_SECUREPORT = -1;
+
public static final String KSETTING_KEYSTOREPASSFILE = "collector.keystore.passwordfile";
- public static final String KDEFAULT_KEYSTOREPASSFILE = "../etc/passwordfile";
+ public static final String KDEFAULT_KEYSTOREPASSFILE = "../etc/passwordfile";
public static final String KSETTING_KEYSTOREFILE = "collector.keystore.file.location";
public static final String KDEFAULT_KEYSTOREFILE = "../etc/keystore";
public static final String KSETTING_KEYALIAS = "collector.keystore.alias";
@@ -86,253 +85,226 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable {
protected static final String[] KDEFAULT_DMAAPCONFIGS = new String[] { "/etc/DmaapConfig.json" };
public static final String KSETTING_MAXQUEUEDEVENTS = "collector.inputQueue.maxPending";
- public static final int KDEFAULT_MAXQUEUEDEVENTS = 1024*4;
-
+ public static final int KDEFAULT_MAXQUEUEDEVENTS = 1024 * 4;
+
public static final String KSETTING_SCHEMAVALIDATOR = "collector.schema.checkflag";
- public static final int KDEFAULT_SCHEMAVALIDATOR = -1;
-
+ public static final int KDEFAULT_SCHEMAVALIDATOR = -1;
+
public static final String KSETTING_SCHEMAFILE = "collector.schema.file";
public static final String KDEFAULT_SCHEMAFILE = "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}";
public static final String KSETTING_EXCEPTIONCONFIG = "exceptionConfig";
-
+
public static final String KSETTING_DMAAPSTREAMID = "collector.dmaap.streamid";
-
+
public static final String KSETTING_AUTHFLAG = "header.authflag";
public static final int KDEFAULT_AUTHFLAG = 0;
-
- public static final String kSetting_authid = "header.authid";
- public static final String kSetting_authpwd = "header.authpwd";
- public static final String kSetting_authstore = "header.authstore";
+
public static final String kSetting_authlist = "header.authlist";
-
+
public static final String KSETTING_EVENTTRANSFORMFLAG = "event.transform.flag";
public static final int KDEFAULT_EVENTTRANSFORMFLAG = 1;
- public static final Logger inlog = LoggerFactory
- .getLogger("org.onap.dcae.commonFunction.input");
- public static final Logger oplog = LoggerFactory
- .getLogger("org.onap.dcae.commonFunction.output");
- public static final Logger eplog = LoggerFactory
- .getLogger("org.onap.dcae.commonFunction.error");
- public static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
-
- public static int schema_Validatorflag = -1;
- public static int authflag = 1;
- public static int eventTransformFlag = 1;
- public static String schemaFile;
- public static JSONObject schemaFileJson;
- public static String exceptionConfig;
- public static String cambriaConfigFile;
- private boolean listnerstatus;
- public static String streamid;
-
-
- private CommonStartup(rrNvReadable settings)
- throws loadException, IOException, rrNvReadable.missingReqdSetting, rrNvReadable.invalidSettingValue, ServletException, InterruptedException {
- final List<ApiServerConnector> connectors = new LinkedList<ApiServerConnector>();
-
- if (settings.getInt(KSETTING_PORT, KDEFAULT_PORT) > 0) {
- // http service
- connectors.add(
- new ApiServerConnector.Builder(settings.getInt(KSETTING_PORT, KDEFAULT_PORT))
- .secure(false)
- .build()
- );
- }
-
- // optional https service
- final int securePort = settings.getInt(KSETTING_SECUREPORT, KDEFAULT_SECUREPORT);
- final String keystoreFile = settings
- .getString(KSETTING_KEYSTOREFILE, KDEFAULT_KEYSTOREFILE);
- final String keystorePasswordFile = settings
- .getString(KSETTING_KEYSTOREPASSFILE, KDEFAULT_KEYSTOREPASSFILE);
- final String keyAlias = settings.getString(KSETTING_KEYALIAS, KDEFAULT_KEYALIAS);
-
- if (securePort > 0) {
- final String KSETTING_KEYSTOREPASS = readFile(keystorePasswordFile,
- Charset.defaultCharset());
- connectors.add(new ApiServerConnector.Builder(securePort)
- .secure(true)
- .keystorePassword(KSETTING_KEYSTOREPASS)
- .keystoreFile(keystoreFile)
- .keyAlias(keyAlias)
- .build());
-
- }
-
- //Reading other config properties
-
- schema_Validatorflag = settings.getInt(KSETTING_SCHEMAVALIDATOR, KDEFAULT_SCHEMAVALIDATOR);
- if (schema_Validatorflag > 0) {
- schemaFile = settings.getString(KSETTING_SCHEMAFILE, KDEFAULT_SCHEMAFILE);
- //System.out.println("SchemaFile:" + schemaFile);
- schemaFileJson = new JSONObject(schemaFile);
-
- }
- exceptionConfig = settings.getString(KSETTING_EXCEPTIONCONFIG, null);
- authflag = settings
- .getInt(CommonStartup.KSETTING_AUTHFLAG, CommonStartup.KDEFAULT_AUTHFLAG);
- String[] currentconffile = settings
- .getStrings(CommonStartup.KSETTING_DMAAPCONFIGS, CommonStartup.KDEFAULT_DMAAPCONFIGS);
- cambriaConfigFile = currentconffile[0];
- streamid = settings.getString(KSETTING_DMAAPSTREAMID, null);
- eventTransformFlag = settings
- .getInt(KSETTING_EVENTTRANSFORMFLAG, KDEFAULT_EVENTTRANSFORMFLAG);
-
- fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings))
- .encodeSlashes(true)
- .name("collector")
- .build();
-
- //Load override exception map
- CustomExceptionLoader.LoadMap();
- setListnerstatus(true);
- }
-
- public static void main(String[] args) {
- ExecutorService executor = null;
- try {
- // process command line arguments
- final Map<String, String> argMap = NsaCommandLineUtil.processCmdLine(args, true);
- final String config = NsaCommandLineUtil
- .getSetting(argMap, KCONFIG, "collector.properties");
- final URL settingStream = DrumlinServlet.findStream(config, CommonStartup.class);
-
- final nvReadableStack settings = new nvReadableStack();
- settings.push(new nvPropertiesFile(settingStream));
- settings.push(new nvReadableTable(argMap));
-
- fProcessingInputQueue = new LinkedBlockingQueue<JSONObject>(
- CommonStartup.KDEFAULT_MAXQUEUEDEVENTS);
-
- VESLogger.setUpEcompLogging();
-
- CommonStartup cs = new CommonStartup(settings);
-
- Thread csmain = new Thread(cs);
- csmain.start();
-
- EventProcessor ep = new EventProcessor();
- //Thread epThread=new Thread(ep);
- //epThread.start();
- executor = Executors.newFixedThreadPool(20);
- executor.execute(ep);
-
- } catch (loadException | missingReqdSetting | IOException | invalidSettingValue |
- ServletException | InterruptedException e) {
- CommonStartup.eplog.error("FATAL_STARTUP_ERROR" + e.getMessage());
- throw new RuntimeException(e);
- } finally {
- // This will make the executor accept no new threads
- // and finish all existing threads in the queue
- if (executor != null) {
- executor.shutdown();
- }
-
- }
- }
-
- public void run() {
- try {
- fTomcatServer.start();
- } catch (LifecycleException | IOException e) {
-
- LOG.error("lifecycle or IO: ", e);
- }
- fTomcatServer.await();
- }
-
- public boolean isListnerstatus() {
- return listnerstatus;
- }
-
- public void setListnerstatus(boolean listnerstatus) {
- this.listnerstatus = listnerstatus;
- }
-
- public static Queue<JSONObject> getProcessingInputQueue() {
- return fProcessingInputQueue;
- }
-
- public static class QueueFullException extends Exception {
-
- private static final long serialVersionUID = 1L;
- }
-
-
- public static void handleEvents(JSONArray a)
- throws QueueFullException, JSONException, IOException {
- final Queue<JSONObject> queue = getProcessingInputQueue();
- try {
-
- CommonStartup.metriclog.info("EVENT_PUBLISH_START");
- for (int i = 0; i < a.length(); i++) {
- if (!queue.offer(a.getJSONObject(i))) {
- throw new QueueFullException();
- }
-
- }
- LOG.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
- CommonStartup.metriclog.info("EVENT_PUBLISH_END");
- //ecomplogger.debug(secloggerMessageEnum.SEC_COLLECT_AND_PULIBISH_SUCCESS);
-
- } catch (JSONException e) {
- throw e;
-
- }
- }
-
-
- static String readFile(String path, Charset encoding)
- throws IOException {
- byte[] encoded = Files.readAllBytes(Paths.get(path));
- String pwd = new String(encoded);
- return pwd.substring(0, pwd.length() - 1);
- }
-
-
- public static String schemavalidate(String jsonData, String jsonSchema) {
- ProcessingReport report;
- String result = "false";
-
- try {
- //System.out.println("Applying schema: @<@<"+jsonSchema+">@>@ to data: #<#<"+jsonData+">#>#");
- LOG.trace("Schema validation for event:" + jsonData);
- JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
- JsonNode data = JsonLoader.fromString(jsonData);
- JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
- JsonSchema schema = factory.getJsonSchema(schemaNode);
- report = schema.validate(data);
- } catch (JsonParseException e) {
- LOG.error("schemavalidate:JsonParseException for event:" + jsonData);
- return e.getMessage().toString();
- } catch (ProcessingException e) {
- LOG.error("schemavalidate:Processing exception for event:" + jsonData);
- return e.getMessage().toString();
- } catch (IOException e) {
- LOG.error(
- "schemavalidate:IO exception; something went wrong trying to read json data for event:"
- + jsonData);
- return e.getMessage().toString();
- }
- if (report != null) {
- Iterator<ProcessingMessage> iter = report.iterator();
- while (iter.hasNext()) {
- ProcessingMessage pm = iter.next();
- LOG.trace("Processing Message: " + pm.getMessage());
- }
- result = String.valueOf(report.isSuccess());
- }
- try {
- LOG.debug("Validation Result:" + result + " Validation report:" + report);
- } catch (NullPointerException e) {
- LOG.error("schemavalidate:NullpointerException on report");
- }
- return result;
- }
-
- public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
- private static ApiServer fTomcatServer = null;
- private static final Logger LOG = LoggerFactory.getLogger(CommonStartup.class);
+ public static final Logger inlog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input");
+ public static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output");
+ public static final Logger eplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error");
+ public static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
+
+ public static int schema_Validatorflag = -1;
+ public static int authflag = 1;
+ public static int eventTransformFlag = 1;
+ public static String schemaFile;
+ public static JSONObject schemaFileJson;
+ public static String exceptionConfig;
+ public static String cambriaConfigFile;
+ private boolean listnerstatus;
+ public static String streamid;
+
+ private CommonStartup(rrNvReadable settings) throws loadException, IOException, rrNvReadable.missingReqdSetting,
+ rrNvReadable.invalidSettingValue, ServletException, InterruptedException {
+ final List<ApiServerConnector> connectors = new LinkedList<ApiServerConnector>();
+
+ if (settings.getInt(KSETTING_PORT, KDEFAULT_PORT) > 0) {
+ // http service
+ connectors.add(new ApiServerConnector.Builder(settings.getInt(KSETTING_PORT, KDEFAULT_PORT)).secure(false)
+ .build());
+ }
+
+ // optional https service
+ final int securePort = settings.getInt(KSETTING_SECUREPORT, KDEFAULT_SECUREPORT);
+ final String keystoreFile = settings.getString(KSETTING_KEYSTOREFILE, KDEFAULT_KEYSTOREFILE);
+ final String keystorePasswordFile = settings.getString(KSETTING_KEYSTOREPASSFILE, KDEFAULT_KEYSTOREPASSFILE);
+ final String keyAlias = settings.getString(KSETTING_KEYALIAS, KDEFAULT_KEYALIAS);
+
+ if (securePort > 0) {
+ final String KSETTING_KEYSTOREPASS = readFile(keystorePasswordFile, Charset.defaultCharset());
+ connectors.add(new ApiServerConnector.Builder(securePort).secure(true)
+ .keystorePassword(KSETTING_KEYSTOREPASS).keystoreFile(keystoreFile).keyAlias(keyAlias).build());
+
+ }
+
+ // Reading other config properties
+
+ schema_Validatorflag = settings.getInt(KSETTING_SCHEMAVALIDATOR, KDEFAULT_SCHEMAVALIDATOR);
+ if (schema_Validatorflag > 0) {
+ schemaFile = settings.getString(KSETTING_SCHEMAFILE, KDEFAULT_SCHEMAFILE);
+ // System.out.println("SchemaFile:" + schemaFile);
+ schemaFileJson = new JSONObject(schemaFile);
+
+ }
+ exceptionConfig = settings.getString(KSETTING_EXCEPTIONCONFIG, null);
+ authflag = settings.getInt(CommonStartup.KSETTING_AUTHFLAG, CommonStartup.KDEFAULT_AUTHFLAG);
+ String[] currentconffile = settings.getStrings(CommonStartup.KSETTING_DMAAPCONFIGS,
+ CommonStartup.KDEFAULT_DMAAPCONFIGS);
+ cambriaConfigFile = currentconffile[0];
+ streamid = settings.getString(KSETTING_DMAAPSTREAMID, null);
+ eventTransformFlag = settings.getInt(KSETTING_EVENTTRANSFORMFLAG, KDEFAULT_EVENTTRANSFORMFLAG);
+
+ fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)).encodeSlashes(true)
+ .name("collector").build();
+
+ // Load override exception map
+ CustomExceptionLoader.LoadMap();
+ setListnerstatus(true);
+ }
+
+ public static void main(String[] args) {
+ ExecutorService executor = null;
+ try {
+ // process command line arguments
+ final Map<String, String> argMap = NsaCommandLineUtil.processCmdLine(args, true);
+ final String config = NsaCommandLineUtil.getSetting(argMap, KCONFIG, "collector.properties");
+ final URL settingStream = DrumlinServlet.findStream(config, CommonStartup.class);
+
+ final nvReadableStack settings = new nvReadableStack();
+ settings.push(new nvPropertiesFile(settingStream));
+ settings.push(new nvReadableTable(argMap));
+
+ fProcessingInputQueue = new LinkedBlockingQueue<JSONObject>(CommonStartup.KDEFAULT_MAXQUEUEDEVENTS);
+
+ VESLogger.setUpEcompLogging();
+
+ CommonStartup cs = new CommonStartup(settings);
+
+ Thread csmain = new Thread(cs);
+ csmain.start();
+
+ EventProcessor ep = new EventProcessor();
+ // Thread epThread=new Thread(ep);
+ // epThread.start();
+ executor = Executors.newFixedThreadPool(20);
+ executor.execute(ep);
+
+ } catch (loadException | missingReqdSetting | IOException | invalidSettingValue | ServletException
+ | InterruptedException e) {
+ CommonStartup.eplog.error("FATAL_STARTUP_ERROR" + e.getMessage());
+ throw new RuntimeException(e);
+ } finally {
+ // This will make the executor accept no new threads
+ // and finish all existing threads in the queue
+ if (executor != null) {
+ executor.shutdown();
+ }
+
+ }
+ }
+
+ public void run() {
+ try {
+ fTomcatServer.start();
+ } catch (LifecycleException | IOException e) {
+
+ LOG.error("lifecycle or IO: ", e);
+ }
+ fTomcatServer.await();
+ }
+
+ public boolean isListnerstatus() {
+ return listnerstatus;
+ }
+
+ public void setListnerstatus(boolean listnerstatus) {
+ this.listnerstatus = listnerstatus;
+ }
+
+ public static Queue<JSONObject> getProcessingInputQueue() {
+ return fProcessingInputQueue;
+ }
+
+ public static class QueueFullException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+ }
+
+ public static void handleEvents(JSONArray a) throws QueueFullException, JSONException, IOException {
+ final Queue<JSONObject> queue = getProcessingInputQueue();
+ try {
+
+ CommonStartup.metriclog.info("EVENT_PUBLISH_START");
+ for (int i = 0; i < a.length(); i++) {
+ if (!queue.offer(a.getJSONObject(i))) {
+ throw new QueueFullException();
+ }
+
+ }
+ LOG.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
+ CommonStartup.metriclog.info("EVENT_PUBLISH_END");
+ // ecomplogger.debug(secloggerMessageEnum.SEC_COLLECT_AND_PULIBISH_SUCCESS);
+
+ } catch (JSONException e) {
+ throw e;
+
+ }
+ }
+
+ static String readFile(String path, Charset encoding) throws IOException {
+ byte[] encoded = Files.readAllBytes(Paths.get(path));
+ String pwd = new String(encoded);
+ return pwd.substring(0, pwd.length() - 1);
+ }
+
+ public static String schemavalidate(String jsonData, String jsonSchema) {
+ ProcessingReport report;
+ String result = "false";
+
+ try {
+ // System.out.println("Applying schema: @<@<"+jsonSchema+">@>@ to
+ // data: #<#<"+jsonData+">#>#");
+ LOG.trace("Schema validation for event:" + jsonData);
+ JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
+ JsonNode data = JsonLoader.fromString(jsonData);
+ JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
+ JsonSchema schema = factory.getJsonSchema(schemaNode);
+ report = schema.validate(data);
+ } catch (JsonParseException e) {
+ LOG.error("schemavalidate:JsonParseException for event:" + jsonData);
+ return e.getMessage().toString();
+ } catch (ProcessingException e) {
+ LOG.error("schemavalidate:Processing exception for event:" + jsonData);
+ return e.getMessage().toString();
+ } catch (IOException e) {
+ LOG.error(
+ "schemavalidate:IO exception; something went wrong trying to read json data for event:" + jsonData);
+ return e.getMessage().toString();
+ }
+ if (report != null) {
+ Iterator<ProcessingMessage> iter = report.iterator();
+ while (iter.hasNext()) {
+ ProcessingMessage pm = iter.next();
+ LOG.trace("Processing Message: " + pm.getMessage());
+ }
+ result = String.valueOf(report.isSuccess());
+ }
+ try {
+ LOG.debug("Validation Result:" + result + " Validation report:" + report);
+ } catch (NullPointerException e) {
+ LOG.error("schemavalidate:NullpointerException on report");
+ }
+ return result;
+ }
+
+ public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
+ private static ApiServer fTomcatServer = null;
+ private static final Logger LOG = LoggerFactory.getLogger(CommonStartup.class);
}
+
diff --git a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java
index 351d8ade..9a056226 100644
--- a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java
+++ b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java
@@ -21,6 +21,7 @@
package org.onap.dcae.commonFunction;
+
import java.text.DecimalFormat;
import org.json.JSONArray;
import org.json.JSONObject;
@@ -334,7 +335,7 @@ public class ConfigProcessors {
}
}
else //if new array
- setEventObjectVal(field + "[0]", new JSONObject(value));
+ setEventObjectVal(field + "[0]", new JSONObject(value), "JArray");
}
else //oldfield is jsonArray
setEventObjectVal(field, new JSONArray(value));
@@ -657,8 +658,11 @@ public class ConfigProcessors {
}
else if (fieldType.equals("integer") && value instanceof String)
((JSONObject)keySeriesObj).put(keySet[keySet.length -1], Integer.valueOf((String) value));
+ else if (fieldType.equals("JArray"))
+ ((JSONArray)keySeriesObj).put( value);
else
((JSONObject)keySeriesObj).put(keySet[keySet.length -1], value);
+
}
private JSONObject event = new JSONObject();
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java b/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java
index c6666d63..10a1db47 100644
--- a/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java
+++ b/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java
@@ -37,7 +37,6 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
-
public class CustomExceptionLoader {
protected static HashMap<String, JsonArray> map = null;
@@ -78,14 +77,10 @@ public class CustomExceptionLoader {
}
log.debug("CustomExceptionLoader.LoadMap --> Map loaded - " + map);
- } catch (JsonIOException e) {
- e.printStackTrace();
- } catch (JsonSyntaxException e) {
- e.printStackTrace();
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- } catch (Exception e) {
- e.printStackTrace();
+ } catch (JsonIOException|JsonSyntaxException|FileNotFoundException e) {
+ log.error("Exception in LoadMap:" + e.getMessage());
+ //e.printStackTrace();
+ map = null;
}
finally {
if (fr != null) {
@@ -93,6 +88,7 @@ public class CustomExceptionLoader {
fr.close();
} catch (IOException e) {
log.error("Error closing file reader stream : " +e.toString());
+ map = null;
}
}
}
@@ -130,4 +126,3 @@ public class CustomExceptionLoader {
}
}
-
diff --git a/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java b/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java
index cb4e3af6..b10f5882 100644
--- a/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java
+++ b/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java
@@ -20,213 +20,194 @@
package org.onap.dcae.commonFunction;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonIOException;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
+import java.io.FileNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonIOException;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+
public class DmaapPropertyReader {
- private static DmaapPropertyReader instance = null;
-
- private static final Logger log = LoggerFactory.getLogger(DmaapPropertyReader.class);
- private static final String TOPIC = "TOPIC:";
- private static final String HOST_URL = " HOST-URL:";
- private static final String CAMBRIA_URL = "cambria.url";
- private static final String CAMBRIA_HOSTS = "cambria.hosts";
- private static final String USER = " USER:";
- private static final String PWD = " PWD:";
- private static final String BA_PWD = "basicAuthPassword";
- private static final String BA_UNAME = "basicAuthUsername";
-
- public HashMap<String, String> dmaap_hash = new HashMap<String, String>();
-
- public DmaapPropertyReader(String cambriaConfigFile) {
-
- FileReader fr = null;
- try {
- JsonElement root;
- fr = new FileReader(cambriaConfigFile);
- root = new JsonParser().parse(fr);
-
- //Check if dmaap config is handled by legacy controller/service manager
- if (root.getAsJsonObject().has("channels")) {
- JsonArray jsonObject = (JsonArray) root.getAsJsonObject().get("channels");
-
- for (int i = 0; i < jsonObject.size(); i++) {
- log.debug(String.format("%s%s%s%s HOSTS:%s%s%s%s%s NAME:%s", TOPIC,
- jsonObject.get(i).getAsJsonObject().get("cambria.topic"), HOST_URL,
- jsonObject.get(i).getAsJsonObject().get(CAMBRIA_URL),
- jsonObject.get(i).getAsJsonObject().get(CAMBRIA_HOSTS), PWD,
- jsonObject.get(i).getAsJsonObject().get(BA_PWD), USER,
- jsonObject.get(i).getAsJsonObject().get(BA_UNAME),
- jsonObject.get(i).getAsJsonObject().get("name")));
-
- String convertedname = jsonObject.get(i).getAsJsonObject().get("name")
- .toString().replace("\"", "");
- dmaap_hash.put(convertedname + ".cambria.topic",
- jsonObject.get(i).getAsJsonObject().get("cambria.topic").toString()
- .replace("\"", ""));
-
- if (jsonObject.get(i).getAsJsonObject().get(CAMBRIA_HOSTS) != null) {
- dmaap_hash.put(convertedname + ".cambria.hosts",
- jsonObject.get(i).getAsJsonObject().get(CAMBRIA_HOSTS).toString()
- .replace("\"", ""));
- }
- if (jsonObject.get(i).getAsJsonObject().get(CAMBRIA_URL) != null) {
- dmaap_hash.put(convertedname + ".cambria.url",
- jsonObject.get(i).getAsJsonObject().get(CAMBRIA_URL).toString()
- .replace("\"", ""));
- }
- if (jsonObject.get(i).getAsJsonObject().get(BA_PWD) != null) {
- dmaap_hash.put(convertedname + ".basicAuthPassword",
- jsonObject.get(i).getAsJsonObject()
- .get(BA_PWD).toString().replace("\"", ""));
- }
- if (jsonObject.get(i).getAsJsonObject().get(BA_UNAME) != null) {
- dmaap_hash.put(convertedname + ".basicAuthUsername",
- jsonObject.get(i).getAsJsonObject()
- .get(BA_UNAME).toString().replace("\"", ""));
- }
-
- }
- } else {
-
- //Handing new format from controllergen2/config_binding_service
- JsonObject jsonObject = root.getAsJsonObject();
- Set<Map.Entry<String, JsonElement>> entries = jsonObject.entrySet();
-
- for (Map.Entry<String, JsonElement> entry : entries) {
-
- JsonElement topicurl = entry.getValue().getAsJsonObject().get("dmaap_info")
- .getAsJsonObject().get("topic_url");
- String[] urlParts = dmaapUrlSplit(topicurl.toString().replace("\"", ""));
-
- String mrTopic = null;
- String mrUrl = null;
- String[] hostport = null;
- String username = null;
- String userpwd = null;
-
- try {
-
- if (null != urlParts) {
- mrUrl = urlParts[2];
-
- // DCAE internal dmaap topic convention
- if (urlParts[3].equals("events")) {
- mrTopic = urlParts[4];
- } else {
- // ONAP dmaap topic convention
- mrTopic = urlParts[3];
- hostport = mrUrl.split(":");
- }
-
- }
- } catch (NullPointerException e) {
- log.error("NullPointerException " + e.getLocalizedMessage(), e);
- }
-
- if (entry.getValue().getAsJsonObject().has("aaf_username")) {
- username = entry.getValue().getAsJsonObject().get("aaf_username").toString()
- .replace("\"", "");
- }
- if (entry.getValue().getAsJsonObject().has("aaf_password")) {
- userpwd = entry.getValue().getAsJsonObject().get("aaf_password").toString()
- .replace("\"", "");
- }
- if (hostport == null) {
- log.debug(
- String.format("%s%s%s%s%s%s%s%s", TOPIC, mrTopic, HOST_URL, mrUrl, PWD,
- userpwd, USER, username));
- } else {
- log.debug(
- String.format("%s%s%s%s HOSTS:%s%s%s%s%s NAME:%s", TOPIC, mrTopic,
- HOST_URL, mrUrl, hostport[0], PWD, userpwd, USER, username,
- entry.getKey()));
- }
-
- dmaap_hash.put(entry.getKey() + ".cambria.topic", mrTopic);
-
- if (hostport != null) {
- dmaap_hash.put(entry.getKey() + ".cambria.hosts", hostport[0]);
- }
-
- if (mrUrl != null) {
- dmaap_hash.put(entry.getKey() + ".cambria.url", mrUrl);
- }
-
- if (username != null) {
- dmaap_hash.put(entry.getKey() + ".basicAuthUsername", username);
- }
-
- if (userpwd != null) {
- dmaap_hash.put(entry.getKey() + ".basicAuthPassword", userpwd);
- }
-
- }
-
- }
-
- fr.close();
- } catch (JsonIOException | JsonSyntaxException | IOException e1) {
- log.error("Problem loading Dmaap Channel configuration file: " +
- e1.getLocalizedMessage(), e1);
- } finally {
- if (fr != null) {
- try {
- fr.close();
- } catch (IOException e) {
- log.error("Error closing file reader stream : " + e);
- }
- }
- }
-
- }
-
- /***
- * Dmaap url structure pub - https://<dmaaphostname>:<port>/events/
- * <namespace>.<dmaapcluster>.<topic>, sub - https://<dmaaphostname>:
- * <port>/events/<namespace>.<dmaapcluster>.<topic>/G1/u1";
- *
- * Onap url structure pub - http://<dmaaphostname>:<port>/<unauthenticated>.
- * <topic>,
- */
-
- private String[] dmaapUrlSplit(String dmUrl) {
- String[] multUrls = dmUrl.split(",");
-
- StringBuilder newUrls = new StringBuilder();
- String[] urlParts = null;
- for (int i = 0; i < multUrls.length; i++) {
- urlParts = multUrls[i].split("/");
- if (i == 0) {
- newUrls = newUrls.append(urlParts[2]);
- } else {
- newUrls = newUrls.append(",").append(urlParts[2]);
- }
- }
- return urlParts;
- }
-
- public static synchronized DmaapPropertyReader getInstance(String channelConfig) {
- if (instance == null) {
- instance = new DmaapPropertyReader(channelConfig);
- }
- return instance;
- }
-
- public String getKeyValue(String hashKey) {
- return dmaap_hash.get(hashKey);
- }
+ private static DmaapPropertyReader instance = null;
+
+ private static final Logger log = LoggerFactory.getLogger(DmaapPropertyReader.class);
+
+ public HashMap<String, String> dmaap_hash = new HashMap<String, String>();
+
+ public DmaapPropertyReader(String CambriaConfigFile) {
+
+ FileReader fr = null;
+ try {
+ JsonElement root = null;
+ fr = new FileReader(CambriaConfigFile);
+ root = new JsonParser().parse(fr);
+
+ //Check if dmaap config is handled by legacy controller/service manager
+ if (root.getAsJsonObject().has("channels")) {
+ JsonArray jsonObject = (JsonArray) root.getAsJsonObject().get("channels");
+
+ for (int i = 0; i < jsonObject.size(); i++) {
+ log.debug("TOPIC:" + jsonObject.get(i).getAsJsonObject().get("cambria.topic") + " HOST-URL:"
+ + jsonObject.get(i).getAsJsonObject().get("cambria.url") + " HOSTS:"
+ + jsonObject.get(i).getAsJsonObject().get("cambria.hosts") + " PWD:"
+ + jsonObject.get(i).getAsJsonObject().get("basicAuthPassword") + " USER:"
+ + jsonObject.get(i).getAsJsonObject().get("basicAuthUsername") + " NAME:"
+ + jsonObject.get(i).getAsJsonObject().get("name"));
+
+ String convertedname = jsonObject.get(i).getAsJsonObject().get("name").toString().replace("\"", "");
+ dmaap_hash.put(convertedname + ".cambria.topic",
+ jsonObject.get(i).getAsJsonObject().get("cambria.topic").toString().replace("\"", ""));
+
+ if (jsonObject.get(i).getAsJsonObject().get("cambria.hosts") != null) {
+ dmaap_hash.put(convertedname + ".cambria.hosts",
+ jsonObject.get(i).getAsJsonObject().get("cambria.hosts").toString().replace("\"", ""));
+ }
+ if (jsonObject.get(i).getAsJsonObject().get("cambria.url") != null) {
+ dmaap_hash.put(convertedname + ".cambria.url",
+ jsonObject.get(i).getAsJsonObject().get("cambria.url").toString().replace("\"", ""));
+ }
+ if (jsonObject.get(i).getAsJsonObject().get("basicAuthPassword") != null) {
+ dmaap_hash.put(convertedname + ".basicAuthPassword", jsonObject.get(i).getAsJsonObject()
+ .get("basicAuthPassword").toString().replace("\"", ""));
+ }
+ if (jsonObject.get(i).getAsJsonObject().get("basicAuthUsername") != null) {
+ dmaap_hash.put(convertedname + ".basicAuthUsername", jsonObject.get(i).getAsJsonObject()
+ .get("basicAuthUsername").toString().replace("\"", ""));
+ }
+
+ }
+ } else {
+
+ //Handing new format from controllergen2/config_binding_service
+ JsonObject jsonObject = root.getAsJsonObject();
+ Set<Map.Entry<String, JsonElement>> entries = jsonObject.entrySet();
+
+ for (Map.Entry<String, JsonElement> entry : entries) {
+
+ JsonElement topicurl = entry.getValue().getAsJsonObject().get("dmaap_info").getAsJsonObject().get("topic_url");
+ String[] urlParts = dmaapUrlSplit(topicurl.toString().replace("\"", ""));
+
+ String mrTopic = null;
+ String mrUrl = null;
+ String[] hostport = null;
+ String username = null;
+ String userpwd = null;
+
+ try {
+
+ if (null != urlParts) {
+ mrUrl = urlParts[2];
+
+ // DCAE internal dmaap topic convention
+ if (urlParts[3].equals("events")) {
+ mrTopic = urlParts[4];
+ } else {
+ // ONAP dmaap topic convention
+ mrTopic = urlParts[3];
+ hostport = mrUrl.split(":");
+ }
+
+ }
+ } catch (NullPointerException e) {
+ System.out.println("NullPointerException");
+ e.getMessage();
+ }
+
+ if (entry.getValue().getAsJsonObject().has("aaf_username")) {
+ username = entry.getValue().getAsJsonObject().get("aaf_username").toString().replace("\"", "");
+ }
+ if (entry.getValue().getAsJsonObject().has("aaf_password")) {
+ userpwd = entry.getValue().getAsJsonObject().get("aaf_password").toString().replace("\"", "");
+ }
+ if (hostport == null) {
+ log.debug("TOPIC:" + mrTopic + " HOST-URL:" + mrUrl + " PWD:" + userpwd + " USER:" + username);
+ } else {
+ log.debug("TOPIC:" + mrTopic + " HOST-URL:" + mrUrl + " HOSTS:" + hostport[0] + " PWD:"
+ + userpwd + " USER:" + username + " NAME:" + entry.getKey());
+ }
+
+ dmaap_hash.put(entry.getKey() + ".cambria.topic", mrTopic);
+
+ if (!(hostport == null)) {
+ dmaap_hash.put(entry.getKey() + ".cambria.hosts", hostport[0]);
+ }
+
+ if (!(mrUrl == null)) {
+ dmaap_hash.put(entry.getKey() + ".cambria.url", mrUrl);
+ }
+
+ if (!(username == null)) {
+ dmaap_hash.put(entry.getKey() + ".basicAuthUsername", username);
+ }
+
+ if (!(userpwd == null)) {
+ dmaap_hash.put(entry.getKey() + ".basicAuthPassword", userpwd);
+ }
+
+ }
+
+ }
+
+ } catch (JsonIOException | JsonSyntaxException |
+
+ FileNotFoundException e1) {
+ e1.printStackTrace();
+ log.error("Problem loading Dmaap Channel configuration file: " + e1.toString());
+ } finally {
+ if (fr != null) {
+ try {
+ fr.close();
+ } catch (IOException e) {
+ log.error("Error closing file reader stream : " + e.toString());
+ }
+ }
+ }
+
+ }
+
+ /***
+ * Dmaap url structure pub - https://<dmaaphostname>:<port>/events/
+ * <namespace>.<dmaapcluster>.<topic>, sub - https://<dmaaphostname>:
+ * <port>/events/<namespace>.<dmaapcluster>.<topic>/G1/u1";
+ *
+ * Onap url structure pub - http://<dmaaphostname>:<port>/<unauthenticated>.
+ * <topic>,
+ */
+
+ private String[] dmaapUrlSplit(String dmUrl) {
+ String[] multUrls = dmUrl.split(",");
+
+ StringBuffer newUrls = new StringBuffer();
+ String urlParts[] = null;
+ for (int i = 0; i < multUrls.length; i++) {
+ urlParts = multUrls[i].split("/");
+ if (i == 0) {
+ newUrls = newUrls.append(urlParts[2]);
+ } else {
+ newUrls = newUrls.append(",").append(urlParts[2]);
+ }
+ }
+ return urlParts;
+ }
+
+ public static synchronized DmaapPropertyReader getInstance(String ChannelConfig) {
+ if (instance == null) {
+ instance = new DmaapPropertyReader(ChannelConfig);
+ }
+ return instance;
+ }
+
+ public String getKeyValue(String HashKey) {
+ return this.dmaap_hash.get(HashKey);
+ }
}
diff --git a/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java b/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java
index fee9ce1c..a8bfc24e 100644
--- a/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java
+++ b/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java
@@ -47,21 +47,22 @@ import com.att.nsa.security.NsaAuthenticator;
import com.att.nsa.security.authenticators.SimpleAuthenticator;
import com.att.nsa.security.db.simple.NsaSimpleApiKey;
+
public class RestfulCollectorServlet extends CommonServlet
{
- String authid;
- String authpwd;
+ String authid=null;
+ String authpwd=null;
public String authlist;
public RestfulCollectorServlet ( rrNvReadable settings ) throws loadException, missingReqdSetting
{
super ( settings, "collector", false );
- authid = settings.getString(CommonStartup.kSetting_authid,null);
- if (authid != null)
+ //authid = settings.getString(CommonStartup.kSetting_authid,null);
+ /*if (authid != null)
{
String authpwdtemp = settings.getString(CommonStartup.kSetting_authpwd,null);
authpwd = new String(Base64.decodeBase64(authpwdtemp));
- }
+ }*/
authlist = settings.getString(CommonStartup.kSetting_authlist,null);
}
@@ -158,4 +159,3 @@ public class RestfulCollectorServlet extends CommonServlet
private static final long serialVersionUID = 1L;
private static final Logger log = LoggerFactory.getLogger ( RestfulCollectorServlet.class );
}
-
diff --git a/src/test/java/org/onap/dcae/vestest/EventTransformTest.java b/src/test/java/org/onap/dcae/vestest/EventTransformTest.java
index 8a85117b..96e965cf 100644
--- a/src/test/java/org/onap/dcae/vestest/EventTransformTest.java
+++ b/src/test/java/org/onap/dcae/vestest/EventTransformTest.java
@@ -23,10 +23,8 @@ import static org.junit.Assert.assertEquals;
import java.io.FileReader;
import java.io.IOException;
-
import org.json.JSONObject;
import com.google.gson.JsonParser;
-
import com.google.gson.JsonObject;
import org.junit.Test;
@@ -63,7 +61,6 @@ public class EventTransformTest {
}
return jsonObject;
}
-
@Test
public void testAttrMap(){
@@ -95,7 +92,23 @@ public class EventTransformTest {
System.out.println("responseData==" + responseData);
assertEquals (alarmAdditionalInformation, responseData);
}
-
+ @Test
+ public void testJobjMaptoArray(){
+
+ final JSONObject jsonObject = getFileAsJsonObject();
+ //final String receiveDiscards = (((jsonObject.getJSONObject("event")).getJSONObject("faultFields")).get("errors")).get("receiveDiscards").toString();
+ System.out.println("event==" + jsonObject.toString());
+ //System.out.println("alarmAdditionalInformation==" + alarmAdditionalInformation);
+ final JSONObject jsonArgs = new JSONObject ( "{\"field\": \"event.faultFields.vNicPerformanceArray[]\",\"oldField\": \"event.faultFields.errors\",\"attrMap\":{\"receiveDiscards\":\"receivedDiscardedPacketsAccumulated\"}}" );
+ ConfigProcessors cpEvent = new ConfigProcessors(jsonObject);
+ final String receiveDiscards = cpEvent.getEventObjectVal("event.faultFields.errors.receiveDiscards").toString();
+ System.out.println("receiveDiscards==" + receiveDiscards);
+ cpEvent.map(jsonArgs);
+ final String responseData = cpEvent.getEventObjectVal("event.faultFields.vNicPerformanceArray[0].receivedDiscardedPacketsAccumulated").toString();
+ System.out.println("modified event==" + jsonObject.toString());
+ System.out.println("responseData==" + responseData);
+ assertEquals (receiveDiscards, responseData);
+ }
@Test
public void testAttrAdd(){
@@ -113,6 +126,22 @@ public class EventTransformTest {
}
@Test
+ public void testAttrUpdate(){
+
+ final JSONObject jsonObject = getFileAsJsonObject();
+ //final String functionRole = (jsonObject.getJSONObject("event")).getJSONObject("commonEventHeader").get("functionalRole").toString();
+ System.out.println("event==" + jsonObject.toString());
+ //System.out.println("functionRole==" + functionRole);
+ final JSONObject jsonArgs = new JSONObject ( "{\"field\": \"event.faultFields.version\",\"value\": \"2.0\",\"fieldType\": \"number\"}" );
+ ConfigProcessors cpEvent = new ConfigProcessors(jsonObject);
+ cpEvent.updateAttribute(jsonArgs);
+ final String responseData = cpEvent.getEventObjectVal("event.faultFields.version").toString();
+ System.out.println("modified event==" + jsonObject.toString());
+ System.out.println("responseData==" + responseData);
+ assertEquals ("2.0", responseData);
+ }
+
+ @Test
public void testAttrConcatenate(){
final JSONObject jsonObject = getFileAsJsonObject();
diff --git a/src/test/java/org/onap/dcae/vestest/TestCommonStartup.java b/src/test/java/org/onap/dcae/vestest/TestCommonStartup.java
index 06f70183..0cf90759 100644
--- a/src/test/java/org/onap/dcae/vestest/TestCommonStartup.java
+++ b/src/test/java/org/onap/dcae/vestest/TestCommonStartup.java
@@ -49,6 +49,7 @@ import com.att.nsa.drumlin.till.nv.impl.nvPropertiesFile;
import com.att.nsa.drumlin.till.nv.impl.nvReadableStack;
import com.att.nsa.drumlin.till.nv.impl.nvReadableTable;
+
public class TestCommonStartup {
String payload = null;
@@ -57,7 +58,7 @@ public class TestCommonStartup {
// process command line arguments
payload = new JsonParser().parse(new FileReader("src/test/resources/VES_valid.txt")).toString();
- CommonStartup.fProcessingInputQueue = new LinkedBlockingQueue<JSONObject> (CommonStartup.kDefault_MaxQueuedEvents);
+ CommonStartup.fProcessingInputQueue = new LinkedBlockingQueue<JSONObject> (CommonStartup.KDEFAULT_MAXQUEUEDEVENTS);
}
@After
diff --git a/src/test/resources/event4xjson.txt b/src/test/resources/event4xjson.txt
index 46c5fb51..318d5aed 100644
--- a/src/test/resources/event4xjson.txt
+++ b/src/test/resources/event4xjson.txt
@@ -1,4 +1,5 @@
{
+
"event": {
"commonEventHeader": {
"sourceId": "owb-rdm-003",
@@ -16,6 +17,12 @@
"sourceName": "owb-rdm-003"
},
"faultFields": {
+"errors": {
+ "receiveDiscards": 87665,
+ "transmitDiscards": 99999999,
+ "receiveErrors": 888888888,
+ "transmitErrors": 9000000001111
+},
"eventSeverity": "CRITICAL",
"alarmCondition": "lossOfSignal",
"faultFieldsVersion": 1.0,