aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/CommonStartup.java630
-rw-r--r--src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java54
-rw-r--r--src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java73
3 files changed, 397 insertions, 360 deletions
diff --git a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
index d1dbca92..70da7744 100644
--- a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
+++ b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
@@ -1,316 +1,314 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.commonFunction;
-
-import com.att.nsa.apiServer.ApiServer;
-import com.att.nsa.apiServer.ApiServerConnector;
-import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
-import com.att.nsa.cmdLine.NsaCommandLineUtil;
-import com.att.nsa.drumlin.service.framework.DrumlinServlet;
-import com.att.nsa.drumlin.till.nv.impl.nvPropertiesFile;
-import com.att.nsa.drumlin.till.nv.impl.nvReadableStack;
-import com.att.nsa.drumlin.till.nv.impl.nvReadableTable;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.github.fge.jsonschema.exceptions.ProcessingException;
-import com.github.fge.jsonschema.main.JsonSchema;
-import com.github.fge.jsonschema.main.JsonSchemaFactory;
-import com.github.fge.jsonschema.report.ProcessingMessage;
-import com.github.fge.jsonschema.report.ProcessingReport;
-import com.github.fge.jsonschema.util.JsonLoader;
-import org.apache.catalina.LifecycleException;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.onap.dcae.restapi.RestfulCollectorServlet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URL;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import javax.servlet.ServletException;
-
-public class CommonStartup extends NsaBaseEndpoint implements Runnable {
-
- public static final String KCONFIG = "c";
-
- public static final String KSETTING_PORT = "collector.service.port";
- public static final int KDEFAULT_PORT = 8080;
-
- public static final String KSETTING_SECUREPORT = "collector.service.secure.port";
- public static final int KDEFAULT_SECUREPORT = -1;
-
- public static final String KSETTING_KEYSTOREPASSFILE = "collector.keystore.passwordfile";
- public static final String KDEFAULT_KEYSTOREPASSFILE = "../etc/passwordfile";
- public static final String KSETTING_KEYSTOREFILE = "collector.keystore.file.location";
- public static final String KDEFAULT_KEYSTOREFILE = "../etc/keystore";
- public static final String KSETTING_KEYALIAS = "collector.keystore.alias";
- public static final String KDEFAULT_KEYALIAS = "tomcat";
-
- public static final String KSETTING_DMAAPCONFIGS = "collector.dmaapfile";
- private static final String[] KDEFAULT_DMAAPCONFIGS = new String[] { "/etc/DmaapConfig.json" };
-
- public static final String KSETTING_MAXQUEUEDEVENTS = "collector.inputQueue.maxPending";
- public static final int KDEFAULT_MAXQUEUEDEVENTS = 1024 * 4;
-
- public static final String KSETTING_SCHEMAVALIDATOR = "collector.schema.checkflag";
- public static final int KDEFAULT_SCHEMAVALIDATOR = -1;
-
- public static final String KSETTING_SCHEMAFILE = "collector.schema.file";
- public static final String KDEFAULT_SCHEMAFILE = "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}";
- public static final String KSETTING_EXCEPTIONCONFIG = "exceptionConfig";
-
- public static final String KSETTING_DMAAPSTREAMID = "collector.dmaap.streamid";
-
- public static final String KSETTING_AUTHFLAG = "header.authflag";
- public static final int KDEFAULT_AUTHFLAG = 0;
-
- public static final String KSETTING_AUTHLIST = "header.authlist";
-
- public static final String KSETTING_EVENTTRANSFORMFLAG = "event.transform.flag";
- public static final int KDEFAULT_EVENTTRANSFORMFLAG = 1;
-
- public static final Logger inlog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input");
- public static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output");
- public static final Logger eplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error");
- public static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
-
- public static int schemaValidatorflag = -1;
- public static int authflag = 1;
- public static int eventTransformFlag = 1;
- public static String schemaFile;
- public static JSONObject schemaFileJson;
- public static String exceptionConfig;
- public static String cambriaConfigFile;
- private boolean listnerstatus;
- public static String streamid;
-
- public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
- private static ApiServer fTomcatServer = null;
- private static final Logger log = LoggerFactory.getLogger(CommonStartup.class);
-
- private CommonStartup(rrNvReadable settings) throws loadException, IOException, rrNvReadable.missingReqdSetting,
- rrNvReadable.invalidSettingValue, ServletException, InterruptedException {
- final List<ApiServerConnector> connectors = new LinkedList<ApiServerConnector>();
-
- if (settings.getInt(KSETTING_PORT, KDEFAULT_PORT) > 0) {
- // http service
- connectors.add(new ApiServerConnector.Builder(settings.getInt(KSETTING_PORT, KDEFAULT_PORT)).secure(false)
- .build());
- }
-
- // optional https service
- final int securePort = settings.getInt(KSETTING_SECUREPORT, KDEFAULT_SECUREPORT);
- final String keystoreFile = settings.getString(KSETTING_KEYSTOREFILE, KDEFAULT_KEYSTOREFILE);
- final String keystorePasswordFile = settings.getString(KSETTING_KEYSTOREPASSFILE, KDEFAULT_KEYSTOREPASSFILE);
- final String keyAlias = settings.getString(KSETTING_KEYALIAS, KDEFAULT_KEYALIAS);
-
- if (securePort > 0) {
- final String KSETTING_KEYSTOREPASS = readFile(keystorePasswordFile, Charset.defaultCharset());
- connectors.add(new ApiServerConnector.Builder(securePort).secure(true)
- .keystorePassword(KSETTING_KEYSTOREPASS).keystoreFile(keystoreFile).keyAlias(keyAlias).build());
-
- }
-
- // Reading other config properties
-
- schemaValidatorflag = settings.getInt(KSETTING_SCHEMAVALIDATOR, KDEFAULT_SCHEMAVALIDATOR);
- if (schemaValidatorflag > 0) {
- schemaFile = settings.getString(KSETTING_SCHEMAFILE, KDEFAULT_SCHEMAFILE);
- // System.out.println("SchemaFile:" + schemaFile);
- schemaFileJson = new JSONObject(schemaFile);
-
- }
- exceptionConfig = settings.getString(KSETTING_EXCEPTIONCONFIG, null);
- authflag = settings.getInt(CommonStartup.KSETTING_AUTHFLAG, CommonStartup.KDEFAULT_AUTHFLAG);
- String[] currentconffile = settings.getStrings(CommonStartup.KSETTING_DMAAPCONFIGS,
- CommonStartup.KDEFAULT_DMAAPCONFIGS);
- cambriaConfigFile = currentconffile[0];
- streamid = settings.getString(KSETTING_DMAAPSTREAMID, null);
- eventTransformFlag = settings.getInt(KSETTING_EVENTTRANSFORMFLAG, KDEFAULT_EVENTTRANSFORMFLAG);
-
- fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)).encodeSlashes(true)
- .name("collector").build();
-
- // Load override exception map
- CustomExceptionLoader.LoadMap();
- setListnerstatus(true);
- }
-
- public static void main(String[] args) {
- ExecutorService executor = null;
- try {
- // process command line arguments
- final Map<String, String> argMap = NsaCommandLineUtil.processCmdLine(args, true);
- final String config = NsaCommandLineUtil.getSetting(argMap, KCONFIG, "collector.properties");
- final URL settingStream = DrumlinServlet.findStream(config, CommonStartup.class);
-
- final nvReadableStack settings = new nvReadableStack();
- settings.push(new nvPropertiesFile(settingStream));
- settings.push(new nvReadableTable(argMap));
-
- fProcessingInputQueue = new LinkedBlockingQueue<JSONObject>(CommonStartup.KDEFAULT_MAXQUEUEDEVENTS);
-
- VESLogger.setUpEcompLogging();
-
- CommonStartup cs = new CommonStartup(settings);
-
- Thread csmain = new Thread(cs);
- csmain.start();
-
- EventProcessor ep = new EventProcessor();
- // Thread epThread=new Thread(ep);
- // epThread.start();
- executor = Executors.newFixedThreadPool(20);
- //executor.execute(ep);
- for (int i = 0; i < 20; ++i) {
- executor.execute(ep);
- }
-
- } catch (loadException | missingReqdSetting | IOException | invalidSettingValue | ServletException
- | InterruptedException e) {
- CommonStartup.eplog.error("FATAL_STARTUP_ERROR" + e.getMessage());
- throw new RuntimeException(e);
- } catch (Throwable e) {
- System.err.println("Uncaught exception - " + e.getMessage());
- CommonStartup.eplog.error("FATAL_ERROR" + e.getMessage());
- e.printStackTrace(System.err);
- } finally {
- // This will make the executor accept no new threads
- // and finish all existing threads in the queue
- if (executor != null) {
- executor.shutdown();
- }
-
- }
- }
-
- public void run() {
- try {
- fTomcatServer.start();
- } catch (LifecycleException | IOException e) {
-
- log.error("lifecycle or IO: ", e);
- }
- fTomcatServer.await();
- }
-
- public boolean isListnerstatus() {
- return listnerstatus;
- }
-
- public void setListnerstatus(boolean listnerstatus) {
- this.listnerstatus = listnerstatus;
- }
-
- public static Queue<JSONObject> getProcessingInputQueue() {
- return fProcessingInputQueue;
- }
-
- public static class QueueFullException extends Exception {
-
- private static final long serialVersionUID = 1L;
- }
-
- public static void handleEvents(JSONArray a) throws QueueFullException, JSONException, IOException {
- final Queue<JSONObject> queue = getProcessingInputQueue();
- try {
-
- CommonStartup.metriclog.info("EVENT_PUBLISH_START");
- for (int i = 0; i < a.length(); i++) {
- if (!queue.offer(a.getJSONObject(i))) {
- throw new QueueFullException();
- }
-
- }
- log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
- CommonStartup.metriclog.info("EVENT_PUBLISH_END");
- // ecomplogger.debug(secloggerMessageEnum.SEC_COLLECT_AND_PULIBISH_SUCCESS);
-
- } catch (JSONException e) {
- throw e;
-
- }
- }
-
- static String readFile(String path, Charset encoding) throws IOException {
- byte[] encoded = Files.readAllBytes(Paths.get(path));
- String pwd = new String(encoded);
- return pwd.substring(0, pwd.length() - 1);
- }
-
- public static String schemavalidate(String jsonData, String jsonSchema) {
- ProcessingReport report;
- String result = "false";
-
- try {
- // System.out.println("Applying schema: @<@<"+jsonSchema+">@>@ to
- // data: #<#<"+jsonData+">#>#");
- log.trace("Schema validation for event:" + jsonData);
- JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
- JsonNode data = JsonLoader.fromString(jsonData);
- JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
- JsonSchema schema = factory.getJsonSchema(schemaNode);
- report = schema.validate(data);
- } catch (JsonParseException e) {
- log.error("schemavalidate:JsonParseException for event:" + jsonData);
- return e.getMessage();
- } catch (ProcessingException e) {
- log.error("schemavalidate:Processing exception for event:" + jsonData);
- return e.getMessage();
- } catch (IOException e) {
- log.error(
- "schemavalidate:IO exception; something went wrong trying to read json data for event:" + jsonData);
- return e.getMessage();
- }
- if (report != null) {
- Iterator<ProcessingMessage> iter = report.iterator();
- while (iter.hasNext()) {
- ProcessingMessage pm = iter.next();
- log.trace("Processing Message: " + pm.getMessage());
- }
- result = String.valueOf(report.isSuccess());
- }
- try {
- log.debug("Validation Result:" + result + " Validation report:" + report);
- } catch (NullPointerException e) {
- log.error("schemavalidate:NullpointerException on report");
- }
- return result;
- }
-
-
-}
+/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.commonFunction;
+
+import com.att.nsa.apiServer.ApiServer;
+import com.att.nsa.apiServer.ApiServerConnector;
+import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
+import com.att.nsa.cmdLine.NsaCommandLineUtil;
+import com.att.nsa.drumlin.service.framework.DrumlinServlet;
+import com.att.nsa.drumlin.till.nv.impl.nvPropertiesFile;
+import com.att.nsa.drumlin.till.nv.impl.nvReadableStack;
+import com.att.nsa.drumlin.till.nv.impl.nvReadableTable;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.github.fge.jsonschema.exceptions.ProcessingException;
+import com.github.fge.jsonschema.main.JsonSchema;
+import com.github.fge.jsonschema.main.JsonSchemaFactory;
+import com.github.fge.jsonschema.report.ProcessingMessage;
+import com.github.fge.jsonschema.report.ProcessingReport;
+import com.github.fge.jsonschema.util.JsonLoader;
+import org.apache.catalina.LifecycleException;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.onap.dcae.restapi.RestfulCollectorServlet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import javax.servlet.ServletException;
+
+public class CommonStartup extends NsaBaseEndpoint implements Runnable {
+
+ public static final String KCONFIG = "c";
+
+ public static final String KSETTING_PORT = "collector.service.port";
+ public static final int KDEFAULT_PORT = 8080;
+
+ public static final String KSETTING_SECUREPORT = "collector.service.secure.port";
+ public static final int KDEFAULT_SECUREPORT = -1;
+
+ public static final String KSETTING_KEYSTOREPASSFILE = "collector.keystore.passwordfile";
+ public static final String KDEFAULT_KEYSTOREPASSFILE = "../etc/passwordfile";
+ public static final String KSETTING_KEYSTOREFILE = "collector.keystore.file.location";
+ public static final String KDEFAULT_KEYSTOREFILE = "../etc/keystore";
+ public static final String KSETTING_KEYALIAS = "collector.keystore.alias";
+ public static final String KDEFAULT_KEYALIAS = "tomcat";
+
+ public static final String KSETTING_DMAAPCONFIGS = "collector.dmaapfile";
+ protected static final String[] KDEFAULT_DMAAPCONFIGS = new String[] { "/etc/DmaapConfig.json" };
+
+ public static final String KSETTING_MAXQUEUEDEVENTS = "collector.inputQueue.maxPending";
+ public static final int KDEFAULT_MAXQUEUEDEVENTS = 1024 * 4;
+
+ public static final String KSETTING_SCHEMAVALIDATOR = "collector.schema.checkflag";
+ public static final int KDEFAULT_SCHEMAVALIDATOR = -1;
+
+ public static final String KSETTING_SCHEMAFILE = "collector.schema.file";
+ public static final String KDEFAULT_SCHEMAFILE = "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}";
+ public static final String KSETTING_EXCEPTIONCONFIG = "exceptionConfig";
+
+ public static final String KSETTING_DMAAPSTREAMID = "collector.dmaap.streamid";
+
+ public static final String KSETTING_AUTHFLAG = "header.authflag";
+ public static final int KDEFAULT_AUTHFLAG = 0;
+
+ public static final String KSETTING_AUTHLIST = "header.authlist";
+
+ public static final String KSETTING_EVENTTRANSFORMFLAG = "event.transform.flag";
+ public static final int KDEFAULT_EVENTTRANSFORMFLAG = 1;
+
+ public static final Logger inlog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input");
+ public static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output");
+ public static final Logger eplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error");
+ public static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
+
+ public static int schemaValidatorflag = -1;
+ public static int authflag = 1;
+ public static int eventTransformFlag = 1;
+ public static String schemaFile;
+ public static JSONObject schemaFileJson;
+ public static String exceptionConfig;
+ public static String cambriaConfigFile;
+ private boolean listnerstatus;
+ public static String streamid;
+
+ public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
+ private static ApiServer fTomcatServer = null;
+ private static final Logger log = LoggerFactory.getLogger(CommonStartup.class);
+
+ private CommonStartup(rrNvReadable settings) throws loadException, IOException, rrNvReadable.missingReqdSetting,
+ rrNvReadable.invalidSettingValue, ServletException, InterruptedException {
+ final List<ApiServerConnector> connectors = new LinkedList<ApiServerConnector>();
+
+ if (settings.getInt(KSETTING_PORT, KDEFAULT_PORT) > 0) {
+ // http service
+ connectors.add(new ApiServerConnector.Builder(settings.getInt(KSETTING_PORT, KDEFAULT_PORT)).secure(false)
+ .build());
+ }
+
+ // optional https service
+ final int securePort = settings.getInt(KSETTING_SECUREPORT, KDEFAULT_SECUREPORT);
+ final String keystoreFile = settings.getString(KSETTING_KEYSTOREFILE, KDEFAULT_KEYSTOREFILE);
+ final String keystorePasswordFile = settings.getString(KSETTING_KEYSTOREPASSFILE, KDEFAULT_KEYSTOREPASSFILE);
+ final String keyAlias = settings.getString(KSETTING_KEYALIAS, KDEFAULT_KEYALIAS);
+
+ if (securePort > 0) {
+ final String KSETTING_KEYSTOREPASS = readFile(keystorePasswordFile, Charset.defaultCharset());
+ connectors.add(new ApiServerConnector.Builder(securePort).secure(true)
+ .keystorePassword(KSETTING_KEYSTOREPASS).keystoreFile(keystoreFile).keyAlias(keyAlias).build());
+
+ }
+
+ // Reading other config properties
+
+ schemaValidatorflag = settings.getInt(KSETTING_SCHEMAVALIDATOR, KDEFAULT_SCHEMAVALIDATOR);
+ if (schemaValidatorflag > 0) {
+ schemaFile = settings.getString(KSETTING_SCHEMAFILE, KDEFAULT_SCHEMAFILE);
+ // System.out.println("SchemaFile:" + schemaFile);
+ schemaFileJson = new JSONObject(schemaFile);
+
+ }
+ exceptionConfig = settings.getString(KSETTING_EXCEPTIONCONFIG, null);
+ authflag = settings.getInt(CommonStartup.KSETTING_AUTHFLAG, CommonStartup.KDEFAULT_AUTHFLAG);
+ String[] currentconffile = settings.getStrings(CommonStartup.KSETTING_DMAAPCONFIGS,
+ CommonStartup.KDEFAULT_DMAAPCONFIGS);
+ cambriaConfigFile = currentconffile[0];
+ streamid = settings.getString(KSETTING_DMAAPSTREAMID, null);
+ eventTransformFlag = settings.getInt(KSETTING_EVENTTRANSFORMFLAG, KDEFAULT_EVENTTRANSFORMFLAG);
+
+ fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)).encodeSlashes(true)
+ .name("collector").build();
+
+ // Load override exception map
+ CustomExceptionLoader.LoadMap();
+ setListnerstatus(true);
+ }
+
+ public static void main(String[] args) {
+ ExecutorService executor = null;
+ try {
+ // process command line arguments
+ final Map<String, String> argMap = NsaCommandLineUtil.processCmdLine(args, true);
+ final String config = NsaCommandLineUtil.getSetting(argMap, KCONFIG, "collector.properties");
+ final URL settingStream = DrumlinServlet.findStream(config, CommonStartup.class);
+
+ final nvReadableStack settings = new nvReadableStack();
+ settings.push(new nvPropertiesFile(settingStream));
+ settings.push(new nvReadableTable(argMap));
+
+ fProcessingInputQueue = new LinkedBlockingQueue<JSONObject>(CommonStartup.KDEFAULT_MAXQUEUEDEVENTS);
+
+ VESLogger.setUpEcompLogging();
+
+ CommonStartup cs = new CommonStartup(settings);
+
+ Thread csmain = new Thread(cs);
+ csmain.start();
+
+ EventProcessor ep = new EventProcessor();
+ executor = Executors.newFixedThreadPool(20);
+ //executor.execute(ep);
+ for (int i = 0; i < 20; ++i) {
+ executor.execute(ep);
+ }
+
+ } catch (loadException | missingReqdSetting | IOException | invalidSettingValue | ServletException
+ | InterruptedException e) {
+ CommonStartup.eplog.error("FATAL_STARTUP_ERROR" + e.getMessage());
+ throw new RuntimeException(e);
+ } catch (Throwable e) {
+ System.err.println("Uncaught exception - " + e.getMessage());
+ CommonStartup.eplog.error("FATAL_ERROR" + e.getMessage());
+ e.printStackTrace(System.err);
+ } finally {
+ // This will make the executor accept no new threads
+ // and finish all existing threads in the queue
+ /*if (executor != null) {
+ executor.shutdown();
+ }*/
+
+ }
+ }
+
+ public void run() {
+ try {
+ fTomcatServer.start();
+ } catch (LifecycleException | IOException e) {
+
+ log.error("lifecycle or IO: ", e);
+ }
+ fTomcatServer.await();
+ }
+
+ public boolean isListnerstatus() {
+ return listnerstatus;
+ }
+
+ public void setListnerstatus(boolean listnerstatus) {
+ this.listnerstatus = listnerstatus;
+ }
+
+ public static Queue<JSONObject> getProcessingInputQueue() {
+ return fProcessingInputQueue;
+ }
+
+ public static class QueueFullException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+ }
+
+ public static void handleEvents(JSONArray a) throws QueueFullException, JSONException, IOException {
+ final Queue<JSONObject> queue = getProcessingInputQueue();
+ try {
+
+ CommonStartup.metriclog.info("EVENT_PUBLISH_START");
+ for (int i = 0; i < a.length(); i++) {
+ if (!queue.offer(a.getJSONObject(i))) {
+ throw new QueueFullException();
+ }
+
+ }
+ log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
+ CommonStartup.metriclog.info("EVENT_PUBLISH_END");
+ // ecomplogger.debug(secloggerMessageEnum.SEC_COLLECT_AND_PULIBISH_SUCCESS);
+
+ } catch (JSONException e) {
+ throw e;
+
+ }
+ }
+
+ static String readFile(String path, Charset encoding) throws IOException {
+ byte[] encoded = Files.readAllBytes(Paths.get(path));
+ String pwd = new String(encoded);
+ return pwd.substring(0, pwd.length() - 1);
+ }
+
+ public static String schemavalidate(String jsonData, String jsonSchema) {
+ ProcessingReport report;
+ String result = "false";
+
+ try {
+ // System.out.println("Applying schema: @<@<"+jsonSchema+">@>@ to
+ // data: #<#<"+jsonData+">#>#");
+ log.trace("Schema validation for event:" + jsonData);
+ JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
+ JsonNode data = JsonLoader.fromString(jsonData);
+ JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
+ JsonSchema schema = factory.getJsonSchema(schemaNode);
+ report = schema.validate(data);
+ } catch (JsonParseException e) {
+ log.error("schemavalidate:JsonParseException for event:" + jsonData);
+ return e.getMessage();
+ } catch (ProcessingException e) {
+ log.error("schemavalidate:Processing exception for event:" + jsonData);
+ return e.getMessage();
+ } catch (IOException e) {
+ log.error(
+ "schemavalidate:IO exception; something went wrong trying to read json data for event:" + jsonData);
+ return e.getMessage();
+ }
+ if (report != null) {
+ Iterator<ProcessingMessage> iter = report.iterator();
+ while (iter.hasNext()) {
+ ProcessingMessage pm = iter.next();
+ log.trace("Processing Message: " + pm.getMessage());
+ }
+ result = String.valueOf(report.isSuccess());
+ }
+ try {
+ log.debug("Validation Result:" + result + " Validation report:" + report);
+ } catch (NullPointerException e) {
+ log.error("schemavalidate:NullpointerException on report");
+ }
+ return result;
+ }
+
+
+}
diff --git a/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java b/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java
index 82d4bba3..95e95747 100644
--- a/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java
+++ b/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* PROJECT
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -36,7 +36,7 @@ public class FetchDynamicConfig {
private static final Logger log = LoggerFactory.getLogger(FetchDynamicConfig.class);
- static String configFile = "/opt/app/KV-Configuration.json";
+ public static String configFile = "/opt/app/KV-Configuration.json";
static String url;
static String retString;
@@ -49,8 +49,8 @@ public class FetchDynamicConfig {
log.info("%s=%s%n", entry.getKey(), entry.getValue());
}
- if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE")
- && env.containsKey("HOSTNAME")) {
+ if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE")) {
+// && env.containsKey("HOSTNAME")) {
log.info(">>>Dynamic configuration to be fetched from ConfigBindingService");
url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/" + env.get("CONFIG_BINDING_SERVICE");
@@ -65,23 +65,47 @@ public class FetchDynamicConfig {
}
log.info("CONFIG_BINDING_SERVICE DNS RESOLVED:" + urlPart1);
- url = urlPart1 + "/service_component/" + env.get("HOSTNAME");
- retString = executecurl(url);
-
- JSONObject jsonObject = new JSONObject(new JSONTokener(retString));
- try (FileWriter file = new FileWriter(configFile)) {
- file.write(jsonObject.toString());
-
- log.info("Successfully Copied JSON Object to file /opt/app/KV-Configuration.json");
- } catch (IOException e) {
- log.error("Error in writing configuration into file /opt/app/KV-Configuration.json " + jsonObject, e);
- e.printStackTrace();
+ FetchDynamicConfig fc= new FetchDynamicConfig();
+ if (env.containsKey("HOSTNAME"))
+ {
+ url = urlPart1 + "/service_component/" + env.get("HOSTNAME");
+ retString = executecurl(url);
+ }
+ else if (env.containsKey("SERVICE_NAME"))
+ {
+ url = urlPart1 + "/service_component/" + env.get("SERVICE_NAME");
+ retString = executecurl(url);
+ }
+ else
+ {
+ log.error("Service name environment variable - HOSTNAME/SERVICE_NAME not found within container ");
}
+ fc.writefile(retString);
+
+
} else {
log.info(">>>Static configuration to be used");
}
}
+
+ public void writefile (String retString)
+ {
+ log.info("URL to fetch configuration:" + url + " Return String:" + retString);
+
+
+ String indentedretstring=(new JSONObject(retString)).toString(4);
+
+ try (FileWriter file = new FileWriter(FetchDynamicConfig.configFile)) {
+ file.write(indentedretstring);
+
+ log.info("Successfully Copied JSON Object to file /opt/app/KV-Configuration.json");
+ } catch (IOException e) {
+ log.error("Error in writing configuration into file /opt/app/KV-Configuration.json " + retString, e);
+ e.printStackTrace();
+ }
+
+ }
public static String executecurl(String url) {
diff --git a/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java b/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java
index 2db4ff42..9184c3e7 100644
--- a/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java
+++ b/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* PROJECT
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -42,6 +42,7 @@ public class LoadDynamicConfig {
public String configFile = "/opt/app/KV-Configuration.json";
static String url;
static String retString;
+ public String dmaapoutputfile = "./etc/DmaapConfig.json";
public LoadDynamicConfig() {
@@ -59,35 +60,10 @@ public class LoadDynamicConfig {
LoadDynamicConfig lc = new LoadDynamicConfig();
String jsonData = readFile(lc.configFile);
JSONObject jsonObject = new JSONObject(jsonData);
+ lc.writeconfig(jsonObject);
+
- PropertiesConfiguration conf;
- conf = new PropertiesConfiguration(lc.propFile);
- conf.setEncoding(null);
-
- // update properties based on consul dynamic configuration
- Iterator<?> keys = jsonObject.keys();
-
- while (keys.hasNext()) {
- String key = (String) keys.next();
- // check if any configuration is related to dmaap
- // and write into dmaapconfig.json
- if (key.startsWith("streams_publishes")) {
- // VESCollector only have publish streams
- try (FileWriter file = new FileWriter("./etc/DmaapConfig.json")) {
- file.write(jsonObject.get(key).toString());
- log.info("Successfully written JSON Object to DmaapConfig.json");
- file.close();
- } catch (IOException e) {
- log.info("Error in writing dmaap configuration into DmaapConfig.json", e);
- }
- } else {
- conf.setProperty(key, jsonObject.get(key).toString());
- }
-
- }
- conf.save();
-
- } catch (ConfigurationException e) {
+ } catch (Exception e) {
log.error(e.getLocalizedMessage(), e);
e.printStackTrace();
@@ -98,6 +74,45 @@ public class LoadDynamicConfig {
}
}
+
+ public void writeconfig (JSONObject jsonObject)
+ {
+
+ PropertiesConfiguration conf;
+ try {
+ conf = new PropertiesConfiguration(propFile);
+
+ conf.setEncoding(null);
+
+ // update properties based on consul dynamic configuration
+ Iterator<?> keys = jsonObject.keys();
+
+ while (keys.hasNext()) {
+ String key = (String) keys.next();
+ // check if any configuration is related to dmaap
+ // and write into dmaapconfig.json
+ if (key.startsWith("streams_publishes")) {
+ // VESCollector only have publish streams
+ try (FileWriter file = new FileWriter(dmaapoutputfile)) {
+
+ String indentedretstring=(new JSONObject(jsonObject.get(key).toString())).toString(4);
+ file.write(indentedretstring);
+ log.info("Successfully written JSON Object to DmaapConfig.json");
+ file.close();
+ } catch (IOException e) {
+ log.info("Error in writing dmaap configuration into DmaapConfig.json", e);
+ }
+ } else {
+ conf.setProperty(key, jsonObject.get(key).toString());
+ }
+
+ }
+ conf.save();
+ } catch (ConfigurationException e) {
+ log.error(e.getLocalizedMessage(), e);
+ e.printStackTrace();
+ }
+ }
public static String readFile(String filename) {
String result = "";