diff options
Diffstat (limited to 'src/main/java')
5 files changed, 410 insertions, 458 deletions
diff --git a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java index e782a1aa..cd23de87 100644 --- a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java +++ b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java @@ -20,7 +20,6 @@ package org.onap.dcae.commonFunction; - import com.att.nsa.apiServer.ApiServer; import com.att.nsa.apiServer.ApiServerConnector; import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint; @@ -71,12 +70,12 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable { public static final String KSETTING_PORT = "collector.service.port"; public static final int KDEFAULT_PORT = 8080; - + public static final String KSETTING_SECUREPORT = "collector.service.secure.port"; - public static final int KDEFAULT_SECUREPORT = -1; - + public static final int KDEFAULT_SECUREPORT = -1; + public static final String KSETTING_KEYSTOREPASSFILE = "collector.keystore.passwordfile"; - public static final String KDEFAULT_KEYSTOREPASSFILE = "../etc/passwordfile"; + public static final String KDEFAULT_KEYSTOREPASSFILE = "../etc/passwordfile"; public static final String KSETTING_KEYSTOREFILE = "collector.keystore.file.location"; public static final String KDEFAULT_KEYSTOREFILE = "../etc/keystore"; public static final String KSETTING_KEYALIAS = "collector.keystore.alias"; @@ -86,253 +85,226 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable { protected static final String[] KDEFAULT_DMAAPCONFIGS = new String[] { "/etc/DmaapConfig.json" }; public static final String KSETTING_MAXQUEUEDEVENTS = "collector.inputQueue.maxPending"; - public static final int KDEFAULT_MAXQUEUEDEVENTS = 1024*4; - + public static final int KDEFAULT_MAXQUEUEDEVENTS = 1024 * 4; + public static final String KSETTING_SCHEMAVALIDATOR = "collector.schema.checkflag"; - public static final int KDEFAULT_SCHEMAVALIDATOR = -1; - + public static final int KDEFAULT_SCHEMAVALIDATOR = -1; + public static final String KSETTING_SCHEMAFILE = "collector.schema.file"; public static final String KDEFAULT_SCHEMAFILE = "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}"; public static final String KSETTING_EXCEPTIONCONFIG = "exceptionConfig"; - + public static final String KSETTING_DMAAPSTREAMID = "collector.dmaap.streamid"; - + public static final String KSETTING_AUTHFLAG = "header.authflag"; public static final int KDEFAULT_AUTHFLAG = 0; - - public static final String kSetting_authid = "header.authid"; - public static final String kSetting_authpwd = "header.authpwd"; - public static final String kSetting_authstore = "header.authstore"; + public static final String kSetting_authlist = "header.authlist"; - + public static final String KSETTING_EVENTTRANSFORMFLAG = "event.transform.flag"; public static final int KDEFAULT_EVENTTRANSFORMFLAG = 1; - public static final Logger inlog = LoggerFactory - .getLogger("org.onap.dcae.commonFunction.input"); - public static final Logger oplog = LoggerFactory - .getLogger("org.onap.dcae.commonFunction.output"); - public static final Logger eplog = LoggerFactory - .getLogger("org.onap.dcae.commonFunction.error"); - public static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics"); - - public static int schema_Validatorflag = -1; - public static int authflag = 1; - public static int eventTransformFlag = 1; - public static String schemaFile; - public static JSONObject schemaFileJson; - public static String exceptionConfig; - public static String cambriaConfigFile; - private boolean listnerstatus; - public static String streamid; - - - private CommonStartup(rrNvReadable settings) - throws loadException, IOException, rrNvReadable.missingReqdSetting, rrNvReadable.invalidSettingValue, ServletException, InterruptedException { - final List<ApiServerConnector> connectors = new LinkedList<ApiServerConnector>(); - - if (settings.getInt(KSETTING_PORT, KDEFAULT_PORT) > 0) { - // http service - connectors.add( - new ApiServerConnector.Builder(settings.getInt(KSETTING_PORT, KDEFAULT_PORT)) - .secure(false) - .build() - ); - } - - // optional https service - final int securePort = settings.getInt(KSETTING_SECUREPORT, KDEFAULT_SECUREPORT); - final String keystoreFile = settings - .getString(KSETTING_KEYSTOREFILE, KDEFAULT_KEYSTOREFILE); - final String keystorePasswordFile = settings - .getString(KSETTING_KEYSTOREPASSFILE, KDEFAULT_KEYSTOREPASSFILE); - final String keyAlias = settings.getString(KSETTING_KEYALIAS, KDEFAULT_KEYALIAS); - - if (securePort > 0) { - final String KSETTING_KEYSTOREPASS = readFile(keystorePasswordFile, - Charset.defaultCharset()); - connectors.add(new ApiServerConnector.Builder(securePort) - .secure(true) - .keystorePassword(KSETTING_KEYSTOREPASS) - .keystoreFile(keystoreFile) - .keyAlias(keyAlias) - .build()); - - } - - //Reading other config properties - - schema_Validatorflag = settings.getInt(KSETTING_SCHEMAVALIDATOR, KDEFAULT_SCHEMAVALIDATOR); - if (schema_Validatorflag > 0) { - schemaFile = settings.getString(KSETTING_SCHEMAFILE, KDEFAULT_SCHEMAFILE); - //System.out.println("SchemaFile:" + schemaFile); - schemaFileJson = new JSONObject(schemaFile); - - } - exceptionConfig = settings.getString(KSETTING_EXCEPTIONCONFIG, null); - authflag = settings - .getInt(CommonStartup.KSETTING_AUTHFLAG, CommonStartup.KDEFAULT_AUTHFLAG); - String[] currentconffile = settings - .getStrings(CommonStartup.KSETTING_DMAAPCONFIGS, CommonStartup.KDEFAULT_DMAAPCONFIGS); - cambriaConfigFile = currentconffile[0]; - streamid = settings.getString(KSETTING_DMAAPSTREAMID, null); - eventTransformFlag = settings - .getInt(KSETTING_EVENTTRANSFORMFLAG, KDEFAULT_EVENTTRANSFORMFLAG); - - fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)) - .encodeSlashes(true) - .name("collector") - .build(); - - //Load override exception map - CustomExceptionLoader.LoadMap(); - setListnerstatus(true); - } - - public static void main(String[] args) { - ExecutorService executor = null; - try { - // process command line arguments - final Map<String, String> argMap = NsaCommandLineUtil.processCmdLine(args, true); - final String config = NsaCommandLineUtil - .getSetting(argMap, KCONFIG, "collector.properties"); - final URL settingStream = DrumlinServlet.findStream(config, CommonStartup.class); - - final nvReadableStack settings = new nvReadableStack(); - settings.push(new nvPropertiesFile(settingStream)); - settings.push(new nvReadableTable(argMap)); - - fProcessingInputQueue = new LinkedBlockingQueue<JSONObject>( - CommonStartup.KDEFAULT_MAXQUEUEDEVENTS); - - VESLogger.setUpEcompLogging(); - - CommonStartup cs = new CommonStartup(settings); - - Thread csmain = new Thread(cs); - csmain.start(); - - EventProcessor ep = new EventProcessor(); - //Thread epThread=new Thread(ep); - //epThread.start(); - executor = Executors.newFixedThreadPool(20); - executor.execute(ep); - - } catch (loadException | missingReqdSetting | IOException | invalidSettingValue | - ServletException | InterruptedException e) { - CommonStartup.eplog.error("FATAL_STARTUP_ERROR" + e.getMessage()); - throw new RuntimeException(e); - } finally { - // This will make the executor accept no new threads - // and finish all existing threads in the queue - if (executor != null) { - executor.shutdown(); - } - - } - } - - public void run() { - try { - fTomcatServer.start(); - } catch (LifecycleException | IOException e) { - - LOG.error("lifecycle or IO: ", e); - } - fTomcatServer.await(); - } - - public boolean isListnerstatus() { - return listnerstatus; - } - - public void setListnerstatus(boolean listnerstatus) { - this.listnerstatus = listnerstatus; - } - - public static Queue<JSONObject> getProcessingInputQueue() { - return fProcessingInputQueue; - } - - public static class QueueFullException extends Exception { - - private static final long serialVersionUID = 1L; - } - - - public static void handleEvents(JSONArray a) - throws QueueFullException, JSONException, IOException { - final Queue<JSONObject> queue = getProcessingInputQueue(); - try { - - CommonStartup.metriclog.info("EVENT_PUBLISH_START"); - for (int i = 0; i < a.length(); i++) { - if (!queue.offer(a.getJSONObject(i))) { - throw new QueueFullException(); - } - - } - LOG.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); - CommonStartup.metriclog.info("EVENT_PUBLISH_END"); - //ecomplogger.debug(secloggerMessageEnum.SEC_COLLECT_AND_PULIBISH_SUCCESS); - - } catch (JSONException e) { - throw e; - - } - } - - - static String readFile(String path, Charset encoding) - throws IOException { - byte[] encoded = Files.readAllBytes(Paths.get(path)); - String pwd = new String(encoded); - return pwd.substring(0, pwd.length() - 1); - } - - - public static String schemavalidate(String jsonData, String jsonSchema) { - ProcessingReport report; - String result = "false"; - - try { - //System.out.println("Applying schema: @<@<"+jsonSchema+">@>@ to data: #<#<"+jsonData+">#>#"); - LOG.trace("Schema validation for event:" + jsonData); - JsonNode schemaNode = JsonLoader.fromString(jsonSchema); - JsonNode data = JsonLoader.fromString(jsonData); - JsonSchemaFactory factory = JsonSchemaFactory.byDefault(); - JsonSchema schema = factory.getJsonSchema(schemaNode); - report = schema.validate(data); - } catch (JsonParseException e) { - LOG.error("schemavalidate:JsonParseException for event:" + jsonData); - return e.getMessage().toString(); - } catch (ProcessingException e) { - LOG.error("schemavalidate:Processing exception for event:" + jsonData); - return e.getMessage().toString(); - } catch (IOException e) { - LOG.error( - "schemavalidate:IO exception; something went wrong trying to read json data for event:" - + jsonData); - return e.getMessage().toString(); - } - if (report != null) { - Iterator<ProcessingMessage> iter = report.iterator(); - while (iter.hasNext()) { - ProcessingMessage pm = iter.next(); - LOG.trace("Processing Message: " + pm.getMessage()); - } - result = String.valueOf(report.isSuccess()); - } - try { - LOG.debug("Validation Result:" + result + " Validation report:" + report); - } catch (NullPointerException e) { - LOG.error("schemavalidate:NullpointerException on report"); - } - return result; - } - - public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue; - private static ApiServer fTomcatServer = null; - private static final Logger LOG = LoggerFactory.getLogger(CommonStartup.class); + public static final Logger inlog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input"); + public static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output"); + public static final Logger eplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error"); + public static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics"); + + public static int schema_Validatorflag = -1; + public static int authflag = 1; + public static int eventTransformFlag = 1; + public static String schemaFile; + public static JSONObject schemaFileJson; + public static String exceptionConfig; + public static String cambriaConfigFile; + private boolean listnerstatus; + public static String streamid; + + private CommonStartup(rrNvReadable settings) throws loadException, IOException, rrNvReadable.missingReqdSetting, + rrNvReadable.invalidSettingValue, ServletException, InterruptedException { + final List<ApiServerConnector> connectors = new LinkedList<ApiServerConnector>(); + + if (settings.getInt(KSETTING_PORT, KDEFAULT_PORT) > 0) { + // http service + connectors.add(new ApiServerConnector.Builder(settings.getInt(KSETTING_PORT, KDEFAULT_PORT)).secure(false) + .build()); + } + + // optional https service + final int securePort = settings.getInt(KSETTING_SECUREPORT, KDEFAULT_SECUREPORT); + final String keystoreFile = settings.getString(KSETTING_KEYSTOREFILE, KDEFAULT_KEYSTOREFILE); + final String keystorePasswordFile = settings.getString(KSETTING_KEYSTOREPASSFILE, KDEFAULT_KEYSTOREPASSFILE); + final String keyAlias = settings.getString(KSETTING_KEYALIAS, KDEFAULT_KEYALIAS); + + if (securePort > 0) { + final String KSETTING_KEYSTOREPASS = readFile(keystorePasswordFile, Charset.defaultCharset()); + connectors.add(new ApiServerConnector.Builder(securePort).secure(true) + .keystorePassword(KSETTING_KEYSTOREPASS).keystoreFile(keystoreFile).keyAlias(keyAlias).build()); + + } + + // Reading other config properties + + schema_Validatorflag = settings.getInt(KSETTING_SCHEMAVALIDATOR, KDEFAULT_SCHEMAVALIDATOR); + if (schema_Validatorflag > 0) { + schemaFile = settings.getString(KSETTING_SCHEMAFILE, KDEFAULT_SCHEMAFILE); + // System.out.println("SchemaFile:" + schemaFile); + schemaFileJson = new JSONObject(schemaFile); + + } + exceptionConfig = settings.getString(KSETTING_EXCEPTIONCONFIG, null); + authflag = settings.getInt(CommonStartup.KSETTING_AUTHFLAG, CommonStartup.KDEFAULT_AUTHFLAG); + String[] currentconffile = settings.getStrings(CommonStartup.KSETTING_DMAAPCONFIGS, + CommonStartup.KDEFAULT_DMAAPCONFIGS); + cambriaConfigFile = currentconffile[0]; + streamid = settings.getString(KSETTING_DMAAPSTREAMID, null); + eventTransformFlag = settings.getInt(KSETTING_EVENTTRANSFORMFLAG, KDEFAULT_EVENTTRANSFORMFLAG); + + fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)).encodeSlashes(true) + .name("collector").build(); + + // Load override exception map + CustomExceptionLoader.LoadMap(); + setListnerstatus(true); + } + + public static void main(String[] args) { + ExecutorService executor = null; + try { + // process command line arguments + final Map<String, String> argMap = NsaCommandLineUtil.processCmdLine(args, true); + final String config = NsaCommandLineUtil.getSetting(argMap, KCONFIG, "collector.properties"); + final URL settingStream = DrumlinServlet.findStream(config, CommonStartup.class); + + final nvReadableStack settings = new nvReadableStack(); + settings.push(new nvPropertiesFile(settingStream)); + settings.push(new nvReadableTable(argMap)); + + fProcessingInputQueue = new LinkedBlockingQueue<JSONObject>(CommonStartup.KDEFAULT_MAXQUEUEDEVENTS); + + VESLogger.setUpEcompLogging(); + + CommonStartup cs = new CommonStartup(settings); + + Thread csmain = new Thread(cs); + csmain.start(); + + EventProcessor ep = new EventProcessor(); + // Thread epThread=new Thread(ep); + // epThread.start(); + executor = Executors.newFixedThreadPool(20); + executor.execute(ep); + + } catch (loadException | missingReqdSetting | IOException | invalidSettingValue | ServletException + | InterruptedException e) { + CommonStartup.eplog.error("FATAL_STARTUP_ERROR" + e.getMessage()); + throw new RuntimeException(e); + } finally { + // This will make the executor accept no new threads + // and finish all existing threads in the queue + if (executor != null) { + executor.shutdown(); + } + + } + } + + public void run() { + try { + fTomcatServer.start(); + } catch (LifecycleException | IOException e) { + + LOG.error("lifecycle or IO: ", e); + } + fTomcatServer.await(); + } + + public boolean isListnerstatus() { + return listnerstatus; + } + + public void setListnerstatus(boolean listnerstatus) { + this.listnerstatus = listnerstatus; + } + + public static Queue<JSONObject> getProcessingInputQueue() { + return fProcessingInputQueue; + } + + public static class QueueFullException extends Exception { + + private static final long serialVersionUID = 1L; + } + + public static void handleEvents(JSONArray a) throws QueueFullException, JSONException, IOException { + final Queue<JSONObject> queue = getProcessingInputQueue(); + try { + + CommonStartup.metriclog.info("EVENT_PUBLISH_START"); + for (int i = 0; i < a.length(); i++) { + if (!queue.offer(a.getJSONObject(i))) { + throw new QueueFullException(); + } + + } + LOG.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); + CommonStartup.metriclog.info("EVENT_PUBLISH_END"); + // ecomplogger.debug(secloggerMessageEnum.SEC_COLLECT_AND_PULIBISH_SUCCESS); + + } catch (JSONException e) { + throw e; + + } + } + + static String readFile(String path, Charset encoding) throws IOException { + byte[] encoded = Files.readAllBytes(Paths.get(path)); + String pwd = new String(encoded); + return pwd.substring(0, pwd.length() - 1); + } + + public static String schemavalidate(String jsonData, String jsonSchema) { + ProcessingReport report; + String result = "false"; + + try { + // System.out.println("Applying schema: @<@<"+jsonSchema+">@>@ to + // data: #<#<"+jsonData+">#>#"); + LOG.trace("Schema validation for event:" + jsonData); + JsonNode schemaNode = JsonLoader.fromString(jsonSchema); + JsonNode data = JsonLoader.fromString(jsonData); + JsonSchemaFactory factory = JsonSchemaFactory.byDefault(); + JsonSchema schema = factory.getJsonSchema(schemaNode); + report = schema.validate(data); + } catch (JsonParseException e) { + LOG.error("schemavalidate:JsonParseException for event:" + jsonData); + return e.getMessage().toString(); + } catch (ProcessingException e) { + LOG.error("schemavalidate:Processing exception for event:" + jsonData); + return e.getMessage().toString(); + } catch (IOException e) { + LOG.error( + "schemavalidate:IO exception; something went wrong trying to read json data for event:" + jsonData); + return e.getMessage().toString(); + } + if (report != null) { + Iterator<ProcessingMessage> iter = report.iterator(); + while (iter.hasNext()) { + ProcessingMessage pm = iter.next(); + LOG.trace("Processing Message: " + pm.getMessage()); + } + result = String.valueOf(report.isSuccess()); + } + try { + LOG.debug("Validation Result:" + result + " Validation report:" + report); + } catch (NullPointerException e) { + LOG.error("schemavalidate:NullpointerException on report"); + } + return result; + } + + public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue; + private static ApiServer fTomcatServer = null; + private static final Logger LOG = LoggerFactory.getLogger(CommonStartup.class); } + diff --git a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java index 351d8ade..9a056226 100644 --- a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java +++ b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java @@ -21,6 +21,7 @@ package org.onap.dcae.commonFunction; + import java.text.DecimalFormat; import org.json.JSONArray; import org.json.JSONObject; @@ -334,7 +335,7 @@ public class ConfigProcessors { } } else //if new array - setEventObjectVal(field + "[0]", new JSONObject(value)); + setEventObjectVal(field + "[0]", new JSONObject(value), "JArray"); } else //oldfield is jsonArray setEventObjectVal(field, new JSONArray(value)); @@ -657,8 +658,11 @@ public class ConfigProcessors { } else if (fieldType.equals("integer") && value instanceof String) ((JSONObject)keySeriesObj).put(keySet[keySet.length -1], Integer.valueOf((String) value)); + else if (fieldType.equals("JArray")) + ((JSONArray)keySeriesObj).put( value); else ((JSONObject)keySeriesObj).put(keySet[keySet.length -1], value); + } private JSONObject event = new JSONObject(); } diff --git a/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java b/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java index c6666d63..10a1db47 100644 --- a/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java +++ b/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java @@ -37,7 +37,6 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; - public class CustomExceptionLoader { protected static HashMap<String, JsonArray> map = null; @@ -78,14 +77,10 @@ public class CustomExceptionLoader { } log.debug("CustomExceptionLoader.LoadMap --> Map loaded - " + map); - } catch (JsonIOException e) { - e.printStackTrace(); - } catch (JsonSyntaxException e) { - e.printStackTrace(); - } catch (FileNotFoundException e) { - e.printStackTrace(); - } catch (Exception e) { - e.printStackTrace(); + } catch (JsonIOException|JsonSyntaxException|FileNotFoundException e) { + log.error("Exception in LoadMap:" + e.getMessage()); + //e.printStackTrace(); + map = null; } finally { if (fr != null) { @@ -93,6 +88,7 @@ public class CustomExceptionLoader { fr.close(); } catch (IOException e) { log.error("Error closing file reader stream : " +e.toString()); + map = null; } } } @@ -130,4 +126,3 @@ public class CustomExceptionLoader { } } - diff --git a/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java b/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java index cb4e3af6..b10f5882 100644 --- a/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java +++ b/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java @@ -20,213 +20,194 @@ package org.onap.dcae.commonFunction; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonIOException; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; +import java.io.FileNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.FileReader; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Set; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonIOException; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; + public class DmaapPropertyReader { - private static DmaapPropertyReader instance = null; - - private static final Logger log = LoggerFactory.getLogger(DmaapPropertyReader.class); - private static final String TOPIC = "TOPIC:"; - private static final String HOST_URL = " HOST-URL:"; - private static final String CAMBRIA_URL = "cambria.url"; - private static final String CAMBRIA_HOSTS = "cambria.hosts"; - private static final String USER = " USER:"; - private static final String PWD = " PWD:"; - private static final String BA_PWD = "basicAuthPassword"; - private static final String BA_UNAME = "basicAuthUsername"; - - public HashMap<String, String> dmaap_hash = new HashMap<String, String>(); - - public DmaapPropertyReader(String cambriaConfigFile) { - - FileReader fr = null; - try { - JsonElement root; - fr = new FileReader(cambriaConfigFile); - root = new JsonParser().parse(fr); - - //Check if dmaap config is handled by legacy controller/service manager - if (root.getAsJsonObject().has("channels")) { - JsonArray jsonObject = (JsonArray) root.getAsJsonObject().get("channels"); - - for (int i = 0; i < jsonObject.size(); i++) { - log.debug(String.format("%s%s%s%s HOSTS:%s%s%s%s%s NAME:%s", TOPIC, - jsonObject.get(i).getAsJsonObject().get("cambria.topic"), HOST_URL, - jsonObject.get(i).getAsJsonObject().get(CAMBRIA_URL), - jsonObject.get(i).getAsJsonObject().get(CAMBRIA_HOSTS), PWD, - jsonObject.get(i).getAsJsonObject().get(BA_PWD), USER, - jsonObject.get(i).getAsJsonObject().get(BA_UNAME), - jsonObject.get(i).getAsJsonObject().get("name"))); - - String convertedname = jsonObject.get(i).getAsJsonObject().get("name") - .toString().replace("\"", ""); - dmaap_hash.put(convertedname + ".cambria.topic", - jsonObject.get(i).getAsJsonObject().get("cambria.topic").toString() - .replace("\"", "")); - - if (jsonObject.get(i).getAsJsonObject().get(CAMBRIA_HOSTS) != null) { - dmaap_hash.put(convertedname + ".cambria.hosts", - jsonObject.get(i).getAsJsonObject().get(CAMBRIA_HOSTS).toString() - .replace("\"", "")); - } - if (jsonObject.get(i).getAsJsonObject().get(CAMBRIA_URL) != null) { - dmaap_hash.put(convertedname + ".cambria.url", - jsonObject.get(i).getAsJsonObject().get(CAMBRIA_URL).toString() - .replace("\"", "")); - } - if (jsonObject.get(i).getAsJsonObject().get(BA_PWD) != null) { - dmaap_hash.put(convertedname + ".basicAuthPassword", - jsonObject.get(i).getAsJsonObject() - .get(BA_PWD).toString().replace("\"", "")); - } - if (jsonObject.get(i).getAsJsonObject().get(BA_UNAME) != null) { - dmaap_hash.put(convertedname + ".basicAuthUsername", - jsonObject.get(i).getAsJsonObject() - .get(BA_UNAME).toString().replace("\"", "")); - } - - } - } else { - - //Handing new format from controllergen2/config_binding_service - JsonObject jsonObject = root.getAsJsonObject(); - Set<Map.Entry<String, JsonElement>> entries = jsonObject.entrySet(); - - for (Map.Entry<String, JsonElement> entry : entries) { - - JsonElement topicurl = entry.getValue().getAsJsonObject().get("dmaap_info") - .getAsJsonObject().get("topic_url"); - String[] urlParts = dmaapUrlSplit(topicurl.toString().replace("\"", "")); - - String mrTopic = null; - String mrUrl = null; - String[] hostport = null; - String username = null; - String userpwd = null; - - try { - - if (null != urlParts) { - mrUrl = urlParts[2]; - - // DCAE internal dmaap topic convention - if (urlParts[3].equals("events")) { - mrTopic = urlParts[4]; - } else { - // ONAP dmaap topic convention - mrTopic = urlParts[3]; - hostport = mrUrl.split(":"); - } - - } - } catch (NullPointerException e) { - log.error("NullPointerException " + e.getLocalizedMessage(), e); - } - - if (entry.getValue().getAsJsonObject().has("aaf_username")) { - username = entry.getValue().getAsJsonObject().get("aaf_username").toString() - .replace("\"", ""); - } - if (entry.getValue().getAsJsonObject().has("aaf_password")) { - userpwd = entry.getValue().getAsJsonObject().get("aaf_password").toString() - .replace("\"", ""); - } - if (hostport == null) { - log.debug( - String.format("%s%s%s%s%s%s%s%s", TOPIC, mrTopic, HOST_URL, mrUrl, PWD, - userpwd, USER, username)); - } else { - log.debug( - String.format("%s%s%s%s HOSTS:%s%s%s%s%s NAME:%s", TOPIC, mrTopic, - HOST_URL, mrUrl, hostport[0], PWD, userpwd, USER, username, - entry.getKey())); - } - - dmaap_hash.put(entry.getKey() + ".cambria.topic", mrTopic); - - if (hostport != null) { - dmaap_hash.put(entry.getKey() + ".cambria.hosts", hostport[0]); - } - - if (mrUrl != null) { - dmaap_hash.put(entry.getKey() + ".cambria.url", mrUrl); - } - - if (username != null) { - dmaap_hash.put(entry.getKey() + ".basicAuthUsername", username); - } - - if (userpwd != null) { - dmaap_hash.put(entry.getKey() + ".basicAuthPassword", userpwd); - } - - } - - } - - fr.close(); - } catch (JsonIOException | JsonSyntaxException | IOException e1) { - log.error("Problem loading Dmaap Channel configuration file: " + - e1.getLocalizedMessage(), e1); - } finally { - if (fr != null) { - try { - fr.close(); - } catch (IOException e) { - log.error("Error closing file reader stream : " + e); - } - } - } - - } - - /*** - * Dmaap url structure pub - https://<dmaaphostname>:<port>/events/ - * <namespace>.<dmaapcluster>.<topic>, sub - https://<dmaaphostname>: - * <port>/events/<namespace>.<dmaapcluster>.<topic>/G1/u1"; - * - * Onap url structure pub - http://<dmaaphostname>:<port>/<unauthenticated>. - * <topic>, - */ - - private String[] dmaapUrlSplit(String dmUrl) { - String[] multUrls = dmUrl.split(","); - - StringBuilder newUrls = new StringBuilder(); - String[] urlParts = null; - for (int i = 0; i < multUrls.length; i++) { - urlParts = multUrls[i].split("/"); - if (i == 0) { - newUrls = newUrls.append(urlParts[2]); - } else { - newUrls = newUrls.append(",").append(urlParts[2]); - } - } - return urlParts; - } - - public static synchronized DmaapPropertyReader getInstance(String channelConfig) { - if (instance == null) { - instance = new DmaapPropertyReader(channelConfig); - } - return instance; - } - - public String getKeyValue(String hashKey) { - return dmaap_hash.get(hashKey); - } + private static DmaapPropertyReader instance = null; + + private static final Logger log = LoggerFactory.getLogger(DmaapPropertyReader.class); + + public HashMap<String, String> dmaap_hash = new HashMap<String, String>(); + + public DmaapPropertyReader(String CambriaConfigFile) { + + FileReader fr = null; + try { + JsonElement root = null; + fr = new FileReader(CambriaConfigFile); + root = new JsonParser().parse(fr); + + //Check if dmaap config is handled by legacy controller/service manager + if (root.getAsJsonObject().has("channels")) { + JsonArray jsonObject = (JsonArray) root.getAsJsonObject().get("channels"); + + for (int i = 0; i < jsonObject.size(); i++) { + log.debug("TOPIC:" + jsonObject.get(i).getAsJsonObject().get("cambria.topic") + " HOST-URL:" + + jsonObject.get(i).getAsJsonObject().get("cambria.url") + " HOSTS:" + + jsonObject.get(i).getAsJsonObject().get("cambria.hosts") + " PWD:" + + jsonObject.get(i).getAsJsonObject().get("basicAuthPassword") + " USER:" + + jsonObject.get(i).getAsJsonObject().get("basicAuthUsername") + " NAME:" + + jsonObject.get(i).getAsJsonObject().get("name")); + + String convertedname = jsonObject.get(i).getAsJsonObject().get("name").toString().replace("\"", ""); + dmaap_hash.put(convertedname + ".cambria.topic", + jsonObject.get(i).getAsJsonObject().get("cambria.topic").toString().replace("\"", "")); + + if (jsonObject.get(i).getAsJsonObject().get("cambria.hosts") != null) { + dmaap_hash.put(convertedname + ".cambria.hosts", + jsonObject.get(i).getAsJsonObject().get("cambria.hosts").toString().replace("\"", "")); + } + if (jsonObject.get(i).getAsJsonObject().get("cambria.url") != null) { + dmaap_hash.put(convertedname + ".cambria.url", + jsonObject.get(i).getAsJsonObject().get("cambria.url").toString().replace("\"", "")); + } + if (jsonObject.get(i).getAsJsonObject().get("basicAuthPassword") != null) { + dmaap_hash.put(convertedname + ".basicAuthPassword", jsonObject.get(i).getAsJsonObject() + .get("basicAuthPassword").toString().replace("\"", "")); + } + if (jsonObject.get(i).getAsJsonObject().get("basicAuthUsername") != null) { + dmaap_hash.put(convertedname + ".basicAuthUsername", jsonObject.get(i).getAsJsonObject() + .get("basicAuthUsername").toString().replace("\"", "")); + } + + } + } else { + + //Handing new format from controllergen2/config_binding_service + JsonObject jsonObject = root.getAsJsonObject(); + Set<Map.Entry<String, JsonElement>> entries = jsonObject.entrySet(); + + for (Map.Entry<String, JsonElement> entry : entries) { + + JsonElement topicurl = entry.getValue().getAsJsonObject().get("dmaap_info").getAsJsonObject().get("topic_url"); + String[] urlParts = dmaapUrlSplit(topicurl.toString().replace("\"", "")); + + String mrTopic = null; + String mrUrl = null; + String[] hostport = null; + String username = null; + String userpwd = null; + + try { + + if (null != urlParts) { + mrUrl = urlParts[2]; + + // DCAE internal dmaap topic convention + if (urlParts[3].equals("events")) { + mrTopic = urlParts[4]; + } else { + // ONAP dmaap topic convention + mrTopic = urlParts[3]; + hostport = mrUrl.split(":"); + } + + } + } catch (NullPointerException e) { + System.out.println("NullPointerException"); + e.getMessage(); + } + + if (entry.getValue().getAsJsonObject().has("aaf_username")) { + username = entry.getValue().getAsJsonObject().get("aaf_username").toString().replace("\"", ""); + } + if (entry.getValue().getAsJsonObject().has("aaf_password")) { + userpwd = entry.getValue().getAsJsonObject().get("aaf_password").toString().replace("\"", ""); + } + if (hostport == null) { + log.debug("TOPIC:" + mrTopic + " HOST-URL:" + mrUrl + " PWD:" + userpwd + " USER:" + username); + } else { + log.debug("TOPIC:" + mrTopic + " HOST-URL:" + mrUrl + " HOSTS:" + hostport[0] + " PWD:" + + userpwd + " USER:" + username + " NAME:" + entry.getKey()); + } + + dmaap_hash.put(entry.getKey() + ".cambria.topic", mrTopic); + + if (!(hostport == null)) { + dmaap_hash.put(entry.getKey() + ".cambria.hosts", hostport[0]); + } + + if (!(mrUrl == null)) { + dmaap_hash.put(entry.getKey() + ".cambria.url", mrUrl); + } + + if (!(username == null)) { + dmaap_hash.put(entry.getKey() + ".basicAuthUsername", username); + } + + if (!(userpwd == null)) { + dmaap_hash.put(entry.getKey() + ".basicAuthPassword", userpwd); + } + + } + + } + + } catch (JsonIOException | JsonSyntaxException | + + FileNotFoundException e1) { + e1.printStackTrace(); + log.error("Problem loading Dmaap Channel configuration file: " + e1.toString()); + } finally { + if (fr != null) { + try { + fr.close(); + } catch (IOException e) { + log.error("Error closing file reader stream : " + e.toString()); + } + } + } + + } + + /*** + * Dmaap url structure pub - https://<dmaaphostname>:<port>/events/ + * <namespace>.<dmaapcluster>.<topic>, sub - https://<dmaaphostname>: + * <port>/events/<namespace>.<dmaapcluster>.<topic>/G1/u1"; + * + * Onap url structure pub - http://<dmaaphostname>:<port>/<unauthenticated>. + * <topic>, + */ + + private String[] dmaapUrlSplit(String dmUrl) { + String[] multUrls = dmUrl.split(","); + + StringBuffer newUrls = new StringBuffer(); + String urlParts[] = null; + for (int i = 0; i < multUrls.length; i++) { + urlParts = multUrls[i].split("/"); + if (i == 0) { + newUrls = newUrls.append(urlParts[2]); + } else { + newUrls = newUrls.append(",").append(urlParts[2]); + } + } + return urlParts; + } + + public static synchronized DmaapPropertyReader getInstance(String ChannelConfig) { + if (instance == null) { + instance = new DmaapPropertyReader(ChannelConfig); + } + return instance; + } + + public String getKeyValue(String HashKey) { + return this.dmaap_hash.get(HashKey); + } } diff --git a/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java b/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java index fee9ce1c..a8bfc24e 100644 --- a/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java +++ b/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java @@ -47,21 +47,22 @@ import com.att.nsa.security.NsaAuthenticator; import com.att.nsa.security.authenticators.SimpleAuthenticator; import com.att.nsa.security.db.simple.NsaSimpleApiKey; + public class RestfulCollectorServlet extends CommonServlet { - String authid; - String authpwd; + String authid=null; + String authpwd=null; public String authlist; public RestfulCollectorServlet ( rrNvReadable settings ) throws loadException, missingReqdSetting { super ( settings, "collector", false ); - authid = settings.getString(CommonStartup.kSetting_authid,null); - if (authid != null) + //authid = settings.getString(CommonStartup.kSetting_authid,null); + /*if (authid != null) { String authpwdtemp = settings.getString(CommonStartup.kSetting_authpwd,null); authpwd = new String(Base64.decodeBase64(authpwdtemp)); - } + }*/ authlist = settings.getString(CommonStartup.kSetting_authlist,null); } @@ -158,4 +159,3 @@ public class RestfulCollectorServlet extends CommonServlet private static final long serialVersionUID = 1L; private static final Logger log = LoggerFactory.getLogger ( RestfulCollectorServlet.class ); } - |