summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
authorBharat saraswal <bharat.saraswal@huawei.com>2017-09-17 12:55:37 +0530
committerBharat saraswal <bharat.saraswal@huawei.com>2017-09-17 09:40:26 +0000
commitc3b5df3c174d7953ff5c24c57b4d5e567b144cc5 (patch)
tree3e328d412fe1e96b530a39395ae076c83ff2612c /src/main
parent31edebb539c80dafe9fbdce91813ef267695e95b (diff)
Fixing sonar and javadoc issues.
minor code refactoring. Issue-Id:DCAEGEN2-92 Change-Id: I260c16ac8131a2fd3e31221b124a939c757de2d7 Signed-off-by: Bharat saraswal <bharat.saraswal@huawei.com>
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/EventProcessor.java294
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/EventPublisher.java296
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/VESLogger.java262
3 files changed, 424 insertions, 428 deletions
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
index 1b912cf4..64af1cb8 100644
--- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
+++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
@@ -20,107 +20,108 @@
package org.onap.dcae.commonFunction;
-import java.io.FileReader;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.text.SimpleDateFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.att.nsa.clock.SaClock;
import com.att.nsa.logging.LoggingContext;
import com.att.nsa.logging.log4j.EcompFields;
import com.google.gson.JsonArray;
import com.google.gson.JsonParser;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.FileReader;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.TimeZone;
-import java.util.UUID;
-
-import org.json.JSONArray;
-import org.json.JSONObject;
public class EventProcessor implements Runnable {
- private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
-
- private static HashMap<String, String[]> streamid_hash = new HashMap<String, String[]>();
- private JSONObject event = null;
-
- public EventProcessor() {
- log.debug("EventProcessor: Default Constructor");
-
- String list[] = CommonStartup.streamid.split("\\|");
- for (int i = 0; i < list.length; i++) {
- String domain = list[i].split("=")[0];
- //String streamIdList[] = list[i].split("=")[1].split(",");
- String streamIdList[] = list[i].substring(list[i].indexOf("=") +1).split(",");
-
- log.debug("Domain: " + domain + " streamIdList:" + Arrays.toString(streamIdList));
- streamid_hash.put(domain, streamIdList);
- }
-
- }
-
- @Override
- public void run() {
-
- try {
-
- event = CommonStartup.fProcessingInputQueue.take();
- log.info("EventProcessor\tRemoving element: " + event);
-
- //EventPublisher Ep=new EventPublisher();
- while (event != null) {
- // As long as the producer is running we remove elements from the queue.
-
- //UUID uuid = UUID.fromString(event.get("VESuniqueId").toString());
- String uuid = event.get("VESuniqueId").toString();
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString());
- localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () );
-
- log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain"));
- String streamIdList[]=streamid_hash.get(event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain"));
- log.debug("streamIdList:" + streamIdList);
-
- if (streamIdList.length == 0) {
- log.error("No StreamID defined for publish - Message dropped" + event.toString());
- }
-
- else {
- for (int i=0; i < streamIdList.length; i++)
- {
- log.info("Invoking publisher for streamId:" + streamIdList[i]);
- this.overrideEvent();
- EventPublisher.getInstance(streamIdList[i]).sendEvent(event);
-
- }
- }
- log.debug("Message published" + event.toString());
- event = CommonStartup.fProcessingInputQueue.take();
- // log.info("EventProcessor\tRemoving element: " + this.queue.remove());
- }
- } catch (InterruptedException e) {
- log.error("EventProcessor InterruptedException" + e.getMessage());
- }
-
- }
-
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public void overrideEvent()
- {
- //Set collector timestamp in event payload before publish
- final Date currentTime = new Date();
- final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
- sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-
+
+ private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
+ private static final String EVENT_LITERAL = "event";
+ private static final String COMMON_EVENT_HEADER = "commonEventHeader";
+
+ private static HashMap<String, String[]> streamid_hash = new HashMap<String, String[]>();
+ private JSONObject event;
+
+ public EventProcessor() {
+ log.debug("EventProcessor: Default Constructor");
+
+ String[] list = CommonStartup.streamid.split("\\|");
+ for (String aList : list) {
+ String domain = aList.split("=")[0];
+ //String streamIdList[] = list[i].split("=")[1].split(",");
+ String[] streamIdList = aList.substring(aList.indexOf('=') + 1).split(",");
+
+ log.debug(String.format("Domain: %s streamIdList:%s", domain,
+ Arrays.toString(streamIdList)));
+ streamid_hash.put(domain, streamIdList);
+ }
+
+ }
+
+ @Override
+ public void run() {
+
+ try {
+
+ event = CommonStartup.fProcessingInputQueue.take();
+ log.info("EventProcessor\tRemoving element: " + event);
+
+ //EventPublisher Ep=new EventPublisher();
+ while (event != null) {
+ // As long as the producer is running we remove elements from the queue.
+
+ //UUID uuid = UUID.fromString(event.get("VESuniqueId").toString());
+ String uuid = event.get("VESuniqueId").toString();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
+ localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+
+ log.debug("event.VESuniqueId" + event.get("VESuniqueId")
+ + "event.commonEventHeader.domain:" + event.getJSONObject(EVENT_LITERAL)
+ .getJSONObject(COMMON_EVENT_HEADER).getString("domain"));
+ String[] streamIdList = streamid_hash.get(
+ event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER)
+ .getString("domain"));
+ log.debug("streamIdList:" + streamIdList);
+
+ if (streamIdList.length == 0) {
+ log.error("No StreamID defined for publish - Message dropped" + event);
+ } else {
+ for (String aStreamIdList : streamIdList) {
+ log.info("Invoking publisher for streamId:" + aStreamIdList);
+ this.overrideEvent();
+ EventPublisher.getInstance(aStreamIdList).sendEvent(event);
+
+ }
+ }
+ log.debug("Message published" + event);
+ event = CommonStartup.fProcessingInputQueue.take();
+ // log.info("EventProcessor\tRemoving element: " + this.queue.remove());
+ }
+ } catch (InterruptedException e) {
+ log.error("EventProcessor InterruptedException" + e.getMessage());
+ }
+
+ }
+
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void overrideEvent() {
+ //Set collector timestamp in event payload before publish
+ final Date currentTime = new Date();
+ final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
+ sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+
/*JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime)));
- JSONObject additionalParameter = new JSONObject().put("additionalParameters",additionalParametersarray );
+ JSONObject additionalParameter = new JSONObject().put("additionalParameters",additionalParametersarray );
JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader");
commonEventHeaderkey.put("internalHeaderFields", additionalParameter);*/
-
+
/* "event": {
"commonEventHeader": {
@@ -128,64 +129,65 @@ public class EventProcessor implements Runnable {
"collectorTimeStamp": "Fri, 04 21 2017 04:11:52 GMT"
},
*/
-
- //JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime)));
- JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp",sdf.format(currentTime) );
- JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader");
- commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp);
- event.getJSONObject("event").put("commonEventHeader",commonEventHeaderkey);
-
- if (CommonStartup.eventTransformFlag == 1)
- {
- // read the mapping json file
- final JsonParser parser = new JsonParser();
- try {
- final JsonArray jo = (JsonArray) parser.parse ( new FileReader ( "./etc/eventTransform.json" ) );
- log.info("parse eventTransform.json");
- // now convert to org.json
- final String jsonText = jo.toString ();
- final JSONArray topLevel = new JSONArray ( jsonText );
- //log.info("topLevel == " + topLevel);
-
- Class[] paramJSONObject = new Class[1];
- paramJSONObject[0] = JSONObject.class;
- //load VESProcessors class at runtime
- Class cls = Class.forName("org.onap.dcae.commonFunction.ConfigProcessors");
- Constructor constr = cls.getConstructor(paramJSONObject);
- Object obj = constr.newInstance(event);
-
- for (int j=0; j<topLevel.length(); j++)
- {
- JSONObject filterObj = topLevel.getJSONObject(j).getJSONObject("filter");
- Method method = cls.getDeclaredMethod("isFilterMet", paramJSONObject);
- boolean filterMet = (boolean) method.invoke (obj, filterObj );
- if (filterMet)
- {
- final JSONArray processors = (JSONArray)topLevel.getJSONObject(j).getJSONArray("processors");
-
- //call the processor method
- for (int i=0; i < processors.length(); i++)
- {
- final JSONObject processorList = processors.getJSONObject(i);
- final String functionName = processorList.getString("functionName");
- final JSONObject args = processorList.getJSONObject("args");
- //final JSONObject filter = processorList.getJSONObject("filter");
-
- log.info("functionName==" + functionName + " | args==" + args);
- //reflect method call
- method = cls.getDeclaredMethod(functionName, paramJSONObject);
- method.invoke(obj, args);
- }
- }
- }
-
- } catch (Exception e) {
-
- log.error("EventProcessor Exception" + e.getMessage() + e);
- log.error("EventProcessor Exception" + e.getCause());
- }
- }
- log.debug("Modified event:" + event);
-
- }
+
+ //JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime)));
+ JSONObject collectorTimeStamp = new JSONObject()
+ .put("collectorTimeStamp", sdf.format(currentTime));
+ JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL)
+ .getJSONObject(COMMON_EVENT_HEADER);
+ commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp);
+ event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey);
+
+ if (CommonStartup.eventTransformFlag == 1) {
+ // read the mapping json file
+ final JsonParser parser = new JsonParser();
+ try {
+ final JsonArray jo = (JsonArray) parser
+ .parse(new FileReader("./etc/eventTransform.json"));
+ log.info("parse eventTransform.json");
+ // now convert to org.json
+ final String jsonText = jo.toString();
+ final JSONArray topLevel = new JSONArray(jsonText);
+ //log.info("topLevel == " + topLevel);
+
+ Class[] paramJSONObject = new Class[1];
+ paramJSONObject[0] = JSONObject.class;
+ //load VESProcessors class at runtime
+ Class cls = Class.forName("org.onap.dcae.commonFunction.ConfigProcessors");
+ Constructor constr = cls.getConstructor(paramJSONObject);
+ Object obj = constr.newInstance(event);
+
+ for (int j = 0; j < topLevel.length(); j++) {
+ JSONObject filterObj = topLevel.getJSONObject(j).getJSONObject("filter");
+ Method method = cls.getDeclaredMethod("isFilterMet", paramJSONObject);
+ boolean filterMet = (boolean) method.invoke(obj, filterObj);
+ if (filterMet) {
+ final JSONArray processors = topLevel.getJSONObject(j)
+ .getJSONArray("processors");
+
+ //call the processor method
+ for (int i = 0; i < processors.length(); i++) {
+ final JSONObject processorList = processors.getJSONObject(i);
+ final String functionName = processorList.getString("functionName");
+ final JSONObject args = processorList.getJSONObject("args");
+ //final JSONObject filter = processorList.getJSONObject("filter");
+
+ log.info(String.format("functionName==%s | args==%s", functionName,
+ args));
+ //reflect method call
+ method = cls.getDeclaredMethod(functionName, paramJSONObject);
+ method.invoke(obj, args);
+ }
+ }
+ }
+
+ } catch (Exception e) {
+
+ log.error("EventProcessor Exception" + e.getMessage() + e);
+ log.error("EventProcessor Exception" + e.getCause());
+ }
+ }
+ log.debug("Modified event:" + event);
+
+ }
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/EventPublisher.java
index a5acb857..f870ffac 100644
--- a/src/main/java/org/onap/dcae/commonFunction/EventPublisher.java
+++ b/src/main/java/org/onap/dcae/commonFunction/EventPublisher.java
@@ -20,162 +20,168 @@
package org.onap.dcae.commonFunction;
-import java.io.IOException;
-
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.clock.SaClock;
+import com.att.nsa.logging.LoggingContext;
+import com.att.nsa.logging.log4j.EcompFields;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.security.GeneralSecurityException;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
+public class EventPublisher {
+ private static final String VES_UNIQUE_ID = "VESuniqueId";
+ private static EventPublisher instance;
+ private static CambriaBatchingPublisher pub;
-public class EventPublisher {
+ private String streamid = "";
+ private String ueburl = "";
+ private String topic = "";
+ private String authuser = "";
+ private String authpwd = "";
+
+ private static Logger log = LoggerFactory.getLogger(EventPublisher.class);
+
+
+ private EventPublisher(String newstreamid) {
+
+ streamid = newstreamid;
+ try {
+ ueburl = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash
+ .get(streamid + ".cambria.url");
- private static EventPublisher instance = null;
- private static CambriaBatchingPublisher pub = null;
-
- private String streamid = "";
- private String ueburl="";
- private String topic="";
- private String authuser="";
- private String authpwd="";
-
- private static Logger log = LoggerFactory.getLogger(EventPublisher.class);
-
-
- private EventPublisher( String newstreamid) {
-
- this.streamid = newstreamid;
- try {
- ueburl=DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash.get(streamid+".cambria.url");
-
- if (ueburl==null){
- ueburl= DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash.get(streamid+".cambria.hosts");
- }
- topic= DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).getKeyValue(streamid+".cambria.topic");
- authuser = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).getKeyValue(streamid+".basicAuthUsername");
-
-
- if (authuser != null) {
- authpwd= DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash.get(streamid+".basicAuthPassword");
- }
- }
- catch(Exception e) {
- log.error("CambriaClientBuilders connection reader exception : " + e.getMessage());
-
- }
-
- }
-
-
- public static synchronized EventPublisher getInstance( String streamid){
- if (instance == null) {
- instance = new EventPublisher(streamid);
- }
- if (!instance.streamid.equals(streamid)){
- instance.closePublisher();
- instance = new EventPublisher(streamid);
- }
- return instance;
-
- }
-
-
- public synchronized void sendEvent(JSONObject event) {
-
- log.debug("EventPublisher.sendEvent: instance for publish is ready");
-
-
- if (event.has("VESuniqueId"))
- {
- String uuid = event.get("VESuniqueId").toString();
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString());
- localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () );
- log.debug("Removing VESuniqueid object from event");
- event.remove("VESuniqueId");
- }
-
-
-
-
- try {
-
- if (authuser != null)
- {
- log.debug("URL:" + ueburl + "TOPIC:" + topic + "AuthUser:" + authuser + "Authpwd:" + authpwd);
- pub = new CambriaClientBuilders.PublisherBuilder ()
- .usingHosts (ueburl)
- .onTopic (topic)
- .usingHttps()
- .authenticatedByHttp (authuser, authpwd )
- .logSendFailuresAfter(5)
- // .logTo(log)
- // .limitBatch(100, 10)
- .build ();
- }
- else
- {
-
- log.debug("URL:" + ueburl + "TOPIC:" + topic );
- pub = new CambriaClientBuilders.PublisherBuilder ()
- .usingHosts (ueburl)
- .onTopic (topic)
- // .logTo(log)
- .logSendFailuresAfter(5)
- // .limitBatch(100, 10)
- .build ();
-
- }
-
- int pendingMsgs = pub.send("MyPartitionKey", event.toString());
- //this.wait(2000);
-
- if(pendingMsgs > 100) {
- log.info("Pending Message Count="+pendingMsgs);
- }
-
- //closePublisher();
- log.info("pub.send invoked - no error");
- CommonStartup.oplog.info ("URL:" + ueburl + "TOPIC:" + topic + "Event Published:" + event);
-
- } catch(IOException e) {
- log.error("IOException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
- } catch (GeneralSecurityException e) {
- // TODO Auto-generated catch block
- log.error("GeneralSecurityException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
- }
- catch (IllegalArgumentException e)
- {
- log.error("IllegalArgumentException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
- }
-
- }
+ if (ueburl == null) {
+ ueburl = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash
+ .get(streamid + ".cambria.hosts");
+ }
+ topic = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile)
+ .getKeyValue(streamid + ".cambria.topic");
+ authuser = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile)
+ .getKeyValue(streamid + ".basicAuthUsername");
+
+ if (authuser != null) {
+ authpwd = DmaapPropertyReader
+ .getInstance(CommonStartup.cambriaConfigFile).dmaap_hash
+ .get(streamid + ".basicAuthPassword");
+ }
+ } catch (Exception e) {
+ log.error("CambriaClientBuilders connection reader exception : " + e.getMessage());
+
+ }
+
+ }
+
+
+ /**
+ * Returns event publisher
+ *
+ * @param streamid stream id
+ * @return event publisher
+ */
+ public static synchronized EventPublisher getInstance(String streamid) {
+ if (instance == null) {
+ instance = new EventPublisher(streamid);
+ }
+ if (!instance.streamid.equals(streamid)) {
+ instance.closePublisher();
+ instance = new EventPublisher(streamid);
+ }
+ return instance;
+
+ }
+
+
+ /**
+ *
+ * @param event json object for event
+ */
+ public synchronized void sendEvent(JSONObject event) {
+
+ log.debug("EventPublisher.sendEvent: instance for publish is ready");
+
+ if (event.has(VES_UNIQUE_ID)) {
+ String uuid = event.get(VES_UNIQUE_ID).toString();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
+ localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+ log.debug("Removing VESuniqueid object from event");
+ event.remove(VES_UNIQUE_ID);
+ }
+
+ try {
+
+ if (authuser != null) {
+ log.debug(String.format("URL:%sTOPIC:%sAuthUser:%sAuthpwd:%s", ueburl, topic,
+ authuser, authpwd));
+ pub = new CambriaClientBuilders.PublisherBuilder()
+ .usingHosts(ueburl)
+ .onTopic(topic)
+ .usingHttps()
+ .authenticatedByHttp(authuser, authpwd)
+ .logSendFailuresAfter(5)
+ // .logTo(log)
+ // .limitBatch(100, 10)
+ .build();
+ } else {
+
+ log.debug(String.format("URL:%sTOPIC:%s", ueburl, topic));
+ pub = new CambriaClientBuilders.PublisherBuilder()
+ .usingHosts(ueburl)
+ .onTopic(topic)
+ // .logTo(log)
+ .logSendFailuresAfter(5)
+ // .limitBatch(100, 10)
+ .build();
+
+ }
+
+ int pendingMsgs = pub.send("MyPartitionKey", event.toString());
+ //this.wait(2000);
+
+ if (pendingMsgs > 100) {
+ log.info("Pending Message Count=" + pendingMsgs);
+ }
+
+ //closePublisher();
+ log.info("pub.send invoked - no error");
+ CommonStartup.oplog.info(String.format("URL:%sTOPIC:%sEvent Published:%s",
+ ueburl, topic, event));
+
+ } catch (IOException e) {
+ log.error("IOException:Unable to publish event:" + event + " streamid:" + streamid
+ + " Exception:" + e);
+ } catch (GeneralSecurityException e) {
+ // TODO Auto-generated catch block
+ log.error("GeneralSecurityException:Unable to publish event:" + event + " streamid:"
+ + streamid + " Exception:" + e);
+ } catch (IllegalArgumentException e) {
+ log.error("IllegalArgumentException:Unable to publish event:" + event + " streamid:"
+ + streamid + " Exception:" + e);
+ }
+
+ }
public synchronized void closePublisher() {
-
- try {
- if (pub!= null)
- {
- final List<?> stuck = pub.close(20, TimeUnit.SECONDS);
- if ( stuck.size () > 0 ) {
- log.error(stuck.size() + " messages unsent" );
- }
- }
- }
- catch(InterruptedException ie) {
- log.error("Caught an Interrupted Exception on Close event");
- }catch(IOException ioe) {
- log.error("Caught IO Exception: " + ioe.toString());
- }
-
- }
+
+ try {
+ if (pub != null) {
+ final List<?> stuck = pub.close(20, TimeUnit.SECONDS);
+ if (!stuck.isEmpty()) {
+ log.error(stuck.size() + " messages unsent");
+ }
+ }
+ } catch (InterruptedException ie) {
+ log.error("Caught an Interrupted Exception on Close event");
+ } catch (IOException ioe) {
+ log.error("Caught IO Exception: " + ioe);
+ }
+
+ }
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/VESLogger.java b/src/main/java/org/onap/dcae/commonFunction/VESLogger.java
index 5d60a016..79108443 100644
--- a/src/main/java/org/onap/dcae/commonFunction/VESLogger.java
+++ b/src/main/java/org/onap/dcae/commonFunction/VESLogger.java
@@ -21,150 +21,138 @@
package org.onap.dcae.commonFunction;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-
-import java.util.UUID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.att.nsa.clock.SaClock;
-
import com.att.nsa.logging.LoggingContext;
import com.att.nsa.logging.LoggingContextFactory;
import com.att.nsa.logging.log4j.EcompFields;
-
import jline.internal.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.UUID;
public class VESLogger {
- public static final String VES_AGENT = "VES_AGENT";
-
- public static Logger auditLog;
- public static Logger metricsLog;
- public static Logger errorLog;
- public static Logger debugLog;
-
- // Common LoggingContext
- private static LoggingContext commonLC = null;
- // Thread-specific LoggingContext
- private static LoggingContext threadLC = null;
- public LoggingContext lc ;
-
-
-
- /**
- * Returns the common LoggingContext instance that is the base context
- * for all subsequent instances.
- *
- * @return the common LoggingContext
- */
- public static LoggingContext getCommonLoggingContext()
- {
- if (commonLC == null)
- {
- commonLC = new LoggingContextFactory.Builder().build();
- final UUID uuid = java.util.UUID.randomUUID();
-
- commonLC.put("requestId", uuid.toString());
- }
- return commonLC;
- }
-
- /**
- * Get a logging context for the current thread that's based on the common logging context.
- * Populate the context with context-specific values.
- *
- * @return a LoggingContext for the current thread
- */
- public static LoggingContext getLoggingContextForThread (UUID aUuid)
- {
- // note that this operation requires everything from the common context
- // to be (re)copied into the target context. That seems slow, but it actually
- // helps prevent the thread from overwriting supposedly common data. It also
- // should be fairly quick compared with the overhead of handling the actual
- // service call.
-
- threadLC = new LoggingContextFactory.Builder().
- withBaseContext ( getCommonLoggingContext () ).
- build();
- // Establish the request-specific UUID, as long as we are here...
- threadLC.put("requestId", aUuid.toString());
- threadLC.put ( EcompFields.kEndTimestamp, SaClock.now () );
-
- return threadLC;
- }
-
- /**
- * Get a logging context for the current thread that's based on the common logging context.
- * Populate the context with context-specific values.
- *
- * @return a LoggingContext for the current thread
- */
- public static LoggingContext getLoggingContextForThread (String aUuid)
- {
- // note that this operation requires everything from the common context
- // to be (re)copied into the target context. That seems slow, but it actually
- // helps prevent the thread from overwriting supposedly common data. It also
- // should be fairly quick compared with the overhead of handling the actual
- // service call.
-
- threadLC = new LoggingContextFactory.Builder().
- withBaseContext ( getCommonLoggingContext () ).
- build();
- // Establish the request-specific UUID, as long as we are here...
- threadLC.put("requestId", aUuid);
- threadLC.put ( "statusCode", "COMPLETE" );
- threadLC.put ( EcompFields.kEndTimestamp, SaClock.now () );
- return threadLC;
- }
- public static void setUpEcompLogging()
- {
-
-
- // Create ECOMP Logger instances
- auditLog = LoggerFactory.getLogger("com.att.ecomp.audit");
- metricsLog = LoggerFactory.getLogger("com.att.ecomp.metrics");
- debugLog = LoggerFactory.getLogger("com.att.ecomp.debug");
- errorLog = LoggerFactory.getLogger("com.att.ecomp.error");
-
-
- final LoggingContext lc = getCommonLoggingContext();
-
- String ipAddr = "127.0.0.1";
- String hostname = "localhost";
- try
- {
- final InetAddress ip = InetAddress.getLocalHost ();
- hostname = ip.getCanonicalHostName ();
- ipAddr = ip.getHostAddress();
- }
- catch ( UnknownHostException x )
- {
- Log.debug(x.getMessage());
- }
-
- lc.put ( "serverName", hostname );
- lc.put ( "serviceName", "VESCollecor" );
- lc.put ( "statusCode", "RUNNING" );
- lc.put ( "targetEntity", "NULL");
- lc.put ( "targetServiceName", "NULL");
- lc.put ( "server", hostname );
- lc.put ( "serverIpAddress", ipAddr.toString () );
-
- // instance UUID is meaningless here, so we just create a new one each time the
- // server starts. One could argue each new instantiation of the service should
- // have a new instance ID.
- lc.put ( "instanceUuid", "" );
- lc.put ( "severity", "" );
- lc.put ( EcompFields.kEndTimestamp, SaClock.now () );
- lc.put("EndTimestamp", SaClock.now ());
- lc.put("partnerName", "NA");
-
-
- }
-
-
+ public static final String VES_AGENT = "VES_AGENT";
+ private static final String REQUEST_ID = "requestId";
+ private static final String IP_ADDRESS ="127.0.0.1";
+ private static final String HOST_NAME="localhost";
+
+ public static Logger auditLog;
+ public static Logger metricsLog;
+ public static Logger errorLog;
+ public static Logger debugLog;
+
+ // Common LoggingContext
+ private static LoggingContext commonLC;
+ // Thread-specific LoggingContext
+ private static LoggingContext threadLC;
+ public LoggingContext lc;
+
+
+ /**
+ * Returns the common LoggingContext instance that is the base context
+ * for all subsequent instances.
+ *
+ * @return the common LoggingContext
+ */
+ public static LoggingContext getCommonLoggingContext() {
+ if (commonLC == null) {
+ commonLC = new LoggingContextFactory.Builder().build();
+ final UUID uuid = UUID.randomUUID();
+
+ commonLC.put(REQUEST_ID, uuid.toString());
+ }
+ return commonLC;
+ }
+
+ /**
+ * Get a logging context for the current thread that's based on the common logging context.
+ * Populate the context with context-specific values.
+ *
+ * @param aUuid uuid for request id
+ * @return a LoggingContext for the current thread
+ */
+ public static LoggingContext getLoggingContextForThread(UUID aUuid) {
+ // note that this operation requires everything from the common context
+ // to be (re)copied into the target context. That seems slow, but it actually
+ // helps prevent the thread from overwriting supposedly common data. It also
+ // should be fairly quick compared with the overhead of handling the actual
+ // service call.
+
+ threadLC = new LoggingContextFactory.Builder().
+ withBaseContext(getCommonLoggingContext()).
+ build();
+ // Establish the request-specific UUID, as long as we are here...
+ threadLC.put(REQUEST_ID, aUuid.toString());
+ threadLC.put(EcompFields.kEndTimestamp, SaClock.now());
+
+ return threadLC;
+ }
+
+ /**
+ * Get a logging context for the current thread that's based on the common logging context.
+ * Populate the context with context-specific values.
+ *
+ * @param aUuid uuid for request id
+ * @return a LoggingContext for the current thread
+ */
+ public static LoggingContext getLoggingContextForThread(String aUuid) {
+ // note that this operation requires everything from the common context
+ // to be (re)copied into the target context. That seems slow, but it actually
+ // helps prevent the thread from overwriting supposedly common data. It also
+ // should be fairly quick compared with the overhead of handling the actual
+ // service call.
+
+ threadLC = new LoggingContextFactory.Builder().
+ withBaseContext(getCommonLoggingContext()).
+ build();
+ // Establish the request-specific UUID, as long as we are here...
+ threadLC.put(REQUEST_ID, aUuid);
+ threadLC.put("statusCode", "COMPLETE");
+ threadLC.put(EcompFields.kEndTimestamp, SaClock.now());
+ return threadLC;
+ }
+
+ public static void setUpEcompLogging() {
+
+ // Create ECOMP Logger instances
+ auditLog = LoggerFactory.getLogger("com.att.ecomp.audit");
+ metricsLog = LoggerFactory.getLogger("com.att.ecomp.metrics");
+ debugLog = LoggerFactory.getLogger("com.att.ecomp.debug");
+ errorLog = LoggerFactory.getLogger("com.att.ecomp.error");
+
+ final LoggingContext lc = getCommonLoggingContext();
+
+ String ipAddr = IP_ADDRESS;
+ String hostname = HOST_NAME;
+ try {
+ final InetAddress ip = InetAddress.getLocalHost();
+ hostname = ip.getCanonicalHostName();
+ ipAddr = ip.getHostAddress();
+ } catch (UnknownHostException x) {
+ Log.debug(x.getMessage());
+ }
+
+ lc.put("serverName", hostname);
+ lc.put("serviceName", "VESCollecor");
+ lc.put("statusCode", "RUNNING");
+ lc.put("targetEntity", "NULL");
+ lc.put("targetServiceName", "NULL");
+ lc.put("server", hostname);
+ lc.put("serverIpAddress", ipAddr);
+
+ // instance UUID is meaningless here, so we just create a new one each time the
+ // server starts. One could argue each new instantiation of the service should
+ // have a new instance ID.
+ lc.put("instanceUuid", "");
+ lc.put("severity", "");
+ lc.put(EcompFields.kEndTimestamp, SaClock.now());
+ lc.put("EndTimestamp", SaClock.now());
+ lc.put("partnerName", "NA");
+ }
+
}