From c3b5df3c174d7953ff5c24c57b4d5e567b144cc5 Mon Sep 17 00:00:00 2001 From: Bharat saraswal Date: Sun, 17 Sep 2017 12:55:37 +0530 Subject: Fixing sonar and javadoc issues. minor code refactoring. Issue-Id:DCAEGEN2-92 Change-Id: I260c16ac8131a2fd3e31221b124a939c757de2d7 Signed-off-by: Bharat saraswal --- .../onap/dcae/commonFunction/EventProcessor.java | 294 ++++++++++---------- .../onap/dcae/commonFunction/EventPublisher.java | 296 +++++++++++---------- .../org/onap/dcae/commonFunction/VESLogger.java | 262 +++++++++--------- 3 files changed, 424 insertions(+), 428 deletions(-) (limited to 'src/main/java/org') diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java index 1b912cf4..64af1cb8 100644 --- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java +++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java @@ -20,107 +20,108 @@ package org.onap.dcae.commonFunction; -import java.io.FileReader; -import java.lang.reflect.Constructor; -import java.lang.reflect.Method; -import java.text.SimpleDateFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.att.nsa.clock.SaClock; import com.att.nsa.logging.LoggingContext; import com.att.nsa.logging.log4j.EcompFields; import com.google.gson.JsonArray; import com.google.gson.JsonParser; +import org.json.JSONArray; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.FileReader; +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.TimeZone; -import java.util.UUID; - -import org.json.JSONArray; -import org.json.JSONObject; public class EventProcessor implements Runnable { - private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); - - private static HashMap streamid_hash = new HashMap(); - private JSONObject event = null; - - public EventProcessor() { - log.debug("EventProcessor: Default Constructor"); - - String list[] = CommonStartup.streamid.split("\\|"); - for (int i = 0; i < list.length; i++) { - String domain = list[i].split("=")[0]; - //String streamIdList[] = list[i].split("=")[1].split(","); - String streamIdList[] = list[i].substring(list[i].indexOf("=") +1).split(","); - - log.debug("Domain: " + domain + " streamIdList:" + Arrays.toString(streamIdList)); - streamid_hash.put(domain, streamIdList); - } - - } - - @Override - public void run() { - - try { - - event = CommonStartup.fProcessingInputQueue.take(); - log.info("EventProcessor\tRemoving element: " + event); - - //EventPublisher Ep=new EventPublisher(); - while (event != null) { - // As long as the producer is running we remove elements from the queue. - - //UUID uuid = UUID.fromString(event.get("VESuniqueId").toString()); - String uuid = event.get("VESuniqueId").toString(); - LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString()); - localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () ); - - log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain")); - String streamIdList[]=streamid_hash.get(event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain")); - log.debug("streamIdList:" + streamIdList); - - if (streamIdList.length == 0) { - log.error("No StreamID defined for publish - Message dropped" + event.toString()); - } - - else { - for (int i=0; i < streamIdList.length; i++) - { - log.info("Invoking publisher for streamId:" + streamIdList[i]); - this.overrideEvent(); - EventPublisher.getInstance(streamIdList[i]).sendEvent(event); - - } - } - log.debug("Message published" + event.toString()); - event = CommonStartup.fProcessingInputQueue.take(); - // log.info("EventProcessor\tRemoving element: " + this.queue.remove()); - } - } catch (InterruptedException e) { - log.error("EventProcessor InterruptedException" + e.getMessage()); - } - - } - - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public void overrideEvent() - { - //Set collector timestamp in event payload before publish - final Date currentTime = new Date(); - final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); - sdf.setTimeZone(TimeZone.getTimeZone("GMT")); - + + private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); + private static final String EVENT_LITERAL = "event"; + private static final String COMMON_EVENT_HEADER = "commonEventHeader"; + + private static HashMap streamid_hash = new HashMap(); + private JSONObject event; + + public EventProcessor() { + log.debug("EventProcessor: Default Constructor"); + + String[] list = CommonStartup.streamid.split("\\|"); + for (String aList : list) { + String domain = aList.split("=")[0]; + //String streamIdList[] = list[i].split("=")[1].split(","); + String[] streamIdList = aList.substring(aList.indexOf('=') + 1).split(","); + + log.debug(String.format("Domain: %s streamIdList:%s", domain, + Arrays.toString(streamIdList))); + streamid_hash.put(domain, streamIdList); + } + + } + + @Override + public void run() { + + try { + + event = CommonStartup.fProcessingInputQueue.take(); + log.info("EventProcessor\tRemoving element: " + event); + + //EventPublisher Ep=new EventPublisher(); + while (event != null) { + // As long as the producer is running we remove elements from the queue. + + //UUID uuid = UUID.fromString(event.get("VESuniqueId").toString()); + String uuid = event.get("VESuniqueId").toString(); + LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); + localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); + + log.debug("event.VESuniqueId" + event.get("VESuniqueId") + + "event.commonEventHeader.domain:" + event.getJSONObject(EVENT_LITERAL) + .getJSONObject(COMMON_EVENT_HEADER).getString("domain")); + String[] streamIdList = streamid_hash.get( + event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER) + .getString("domain")); + log.debug("streamIdList:" + streamIdList); + + if (streamIdList.length == 0) { + log.error("No StreamID defined for publish - Message dropped" + event); + } else { + for (String aStreamIdList : streamIdList) { + log.info("Invoking publisher for streamId:" + aStreamIdList); + this.overrideEvent(); + EventPublisher.getInstance(aStreamIdList).sendEvent(event); + + } + } + log.debug("Message published" + event); + event = CommonStartup.fProcessingInputQueue.take(); + // log.info("EventProcessor\tRemoving element: " + this.queue.remove()); + } + } catch (InterruptedException e) { + log.error("EventProcessor InterruptedException" + e.getMessage()); + } + + } + + + @SuppressWarnings({"unchecked", "rawtypes"}) + public void overrideEvent() { + //Set collector timestamp in event payload before publish + final Date currentTime = new Date(); + final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); + sdf.setTimeZone(TimeZone.getTimeZone("GMT")); + /*JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime))); - JSONObject additionalParameter = new JSONObject().put("additionalParameters",additionalParametersarray ); + JSONObject additionalParameter = new JSONObject().put("additionalParameters",additionalParametersarray ); JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader"); commonEventHeaderkey.put("internalHeaderFields", additionalParameter);*/ - + /* "event": { "commonEventHeader": { @@ -128,64 +129,65 @@ public class EventProcessor implements Runnable { "collectorTimeStamp": "Fri, 04 21 2017 04:11:52 GMT" }, */ - - //JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime))); - JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp",sdf.format(currentTime) ); - JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader"); - commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); - event.getJSONObject("event").put("commonEventHeader",commonEventHeaderkey); - - if (CommonStartup.eventTransformFlag == 1) - { - // read the mapping json file - final JsonParser parser = new JsonParser(); - try { - final JsonArray jo = (JsonArray) parser.parse ( new FileReader ( "./etc/eventTransform.json" ) ); - log.info("parse eventTransform.json"); - // now convert to org.json - final String jsonText = jo.toString (); - final JSONArray topLevel = new JSONArray ( jsonText ); - //log.info("topLevel == " + topLevel); - - Class[] paramJSONObject = new Class[1]; - paramJSONObject[0] = JSONObject.class; - //load VESProcessors class at runtime - Class cls = Class.forName("org.onap.dcae.commonFunction.ConfigProcessors"); - Constructor constr = cls.getConstructor(paramJSONObject); - Object obj = constr.newInstance(event); - - for (int j=0; j 100) { - log.info("Pending Message Count="+pendingMsgs); - } - - //closePublisher(); - log.info("pub.send invoked - no error"); - CommonStartup.oplog.info ("URL:" + ueburl + "TOPIC:" + topic + "Event Published:" + event); - - } catch(IOException e) { - log.error("IOException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString()); - } catch (GeneralSecurityException e) { - // TODO Auto-generated catch block - log.error("GeneralSecurityException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString()); - } - catch (IllegalArgumentException e) - { - log.error("IllegalArgumentException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString()); - } - - } + if (ueburl == null) { + ueburl = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash + .get(streamid + ".cambria.hosts"); + } + topic = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile) + .getKeyValue(streamid + ".cambria.topic"); + authuser = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile) + .getKeyValue(streamid + ".basicAuthUsername"); + + if (authuser != null) { + authpwd = DmaapPropertyReader + .getInstance(CommonStartup.cambriaConfigFile).dmaap_hash + .get(streamid + ".basicAuthPassword"); + } + } catch (Exception e) { + log.error("CambriaClientBuilders connection reader exception : " + e.getMessage()); + + } + + } + + + /** + * Returns event publisher + * + * @param streamid stream id + * @return event publisher + */ + public static synchronized EventPublisher getInstance(String streamid) { + if (instance == null) { + instance = new EventPublisher(streamid); + } + if (!instance.streamid.equals(streamid)) { + instance.closePublisher(); + instance = new EventPublisher(streamid); + } + return instance; + + } + + + /** + * + * @param event json object for event + */ + public synchronized void sendEvent(JSONObject event) { + + log.debug("EventPublisher.sendEvent: instance for publish is ready"); + + if (event.has(VES_UNIQUE_ID)) { + String uuid = event.get(VES_UNIQUE_ID).toString(); + LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); + localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); + log.debug("Removing VESuniqueid object from event"); + event.remove(VES_UNIQUE_ID); + } + + try { + + if (authuser != null) { + log.debug(String.format("URL:%sTOPIC:%sAuthUser:%sAuthpwd:%s", ueburl, topic, + authuser, authpwd)); + pub = new CambriaClientBuilders.PublisherBuilder() + .usingHosts(ueburl) + .onTopic(topic) + .usingHttps() + .authenticatedByHttp(authuser, authpwd) + .logSendFailuresAfter(5) + // .logTo(log) + // .limitBatch(100, 10) + .build(); + } else { + + log.debug(String.format("URL:%sTOPIC:%s", ueburl, topic)); + pub = new CambriaClientBuilders.PublisherBuilder() + .usingHosts(ueburl) + .onTopic(topic) + // .logTo(log) + .logSendFailuresAfter(5) + // .limitBatch(100, 10) + .build(); + + } + + int pendingMsgs = pub.send("MyPartitionKey", event.toString()); + //this.wait(2000); + + if (pendingMsgs > 100) { + log.info("Pending Message Count=" + pendingMsgs); + } + + //closePublisher(); + log.info("pub.send invoked - no error"); + CommonStartup.oplog.info(String.format("URL:%sTOPIC:%sEvent Published:%s", + ueburl, topic, event)); + + } catch (IOException e) { + log.error("IOException:Unable to publish event:" + event + " streamid:" + streamid + + " Exception:" + e); + } catch (GeneralSecurityException e) { + // TODO Auto-generated catch block + log.error("GeneralSecurityException:Unable to publish event:" + event + " streamid:" + + streamid + " Exception:" + e); + } catch (IllegalArgumentException e) { + log.error("IllegalArgumentException:Unable to publish event:" + event + " streamid:" + + streamid + " Exception:" + e); + } + + } public synchronized void closePublisher() { - - try { - if (pub!= null) - { - final List stuck = pub.close(20, TimeUnit.SECONDS); - if ( stuck.size () > 0 ) { - log.error(stuck.size() + " messages unsent" ); - } - } - } - catch(InterruptedException ie) { - log.error("Caught an Interrupted Exception on Close event"); - }catch(IOException ioe) { - log.error("Caught IO Exception: " + ioe.toString()); - } - - } + + try { + if (pub != null) { + final List stuck = pub.close(20, TimeUnit.SECONDS); + if (!stuck.isEmpty()) { + log.error(stuck.size() + " messages unsent"); + } + } + } catch (InterruptedException ie) { + log.error("Caught an Interrupted Exception on Close event"); + } catch (IOException ioe) { + log.error("Caught IO Exception: " + ioe); + } + + } } diff --git a/src/main/java/org/onap/dcae/commonFunction/VESLogger.java b/src/main/java/org/onap/dcae/commonFunction/VESLogger.java index 5d60a016..79108443 100644 --- a/src/main/java/org/onap/dcae/commonFunction/VESLogger.java +++ b/src/main/java/org/onap/dcae/commonFunction/VESLogger.java @@ -21,150 +21,138 @@ package org.onap.dcae.commonFunction; -import java.net.InetAddress; -import java.net.UnknownHostException; - - -import java.util.UUID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.att.nsa.clock.SaClock; - import com.att.nsa.logging.LoggingContext; import com.att.nsa.logging.LoggingContextFactory; import com.att.nsa.logging.log4j.EcompFields; - import jline.internal.Log; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.UUID; public class VESLogger { - public static final String VES_AGENT = "VES_AGENT"; - - public static Logger auditLog; - public static Logger metricsLog; - public static Logger errorLog; - public static Logger debugLog; - - // Common LoggingContext - private static LoggingContext commonLC = null; - // Thread-specific LoggingContext - private static LoggingContext threadLC = null; - public LoggingContext lc ; - - - - /** - * Returns the common LoggingContext instance that is the base context - * for all subsequent instances. - * - * @return the common LoggingContext - */ - public static LoggingContext getCommonLoggingContext() - { - if (commonLC == null) - { - commonLC = new LoggingContextFactory.Builder().build(); - final UUID uuid = java.util.UUID.randomUUID(); - - commonLC.put("requestId", uuid.toString()); - } - return commonLC; - } - - /** - * Get a logging context for the current thread that's based on the common logging context. - * Populate the context with context-specific values. - * - * @return a LoggingContext for the current thread - */ - public static LoggingContext getLoggingContextForThread (UUID aUuid) - { - // note that this operation requires everything from the common context - // to be (re)copied into the target context. That seems slow, but it actually - // helps prevent the thread from overwriting supposedly common data. It also - // should be fairly quick compared with the overhead of handling the actual - // service call. - - threadLC = new LoggingContextFactory.Builder(). - withBaseContext ( getCommonLoggingContext () ). - build(); - // Establish the request-specific UUID, as long as we are here... - threadLC.put("requestId", aUuid.toString()); - threadLC.put ( EcompFields.kEndTimestamp, SaClock.now () ); - - return threadLC; - } - - /** - * Get a logging context for the current thread that's based on the common logging context. - * Populate the context with context-specific values. - * - * @return a LoggingContext for the current thread - */ - public static LoggingContext getLoggingContextForThread (String aUuid) - { - // note that this operation requires everything from the common context - // to be (re)copied into the target context. That seems slow, but it actually - // helps prevent the thread from overwriting supposedly common data. It also - // should be fairly quick compared with the overhead of handling the actual - // service call. - - threadLC = new LoggingContextFactory.Builder(). - withBaseContext ( getCommonLoggingContext () ). - build(); - // Establish the request-specific UUID, as long as we are here... - threadLC.put("requestId", aUuid); - threadLC.put ( "statusCode", "COMPLETE" ); - threadLC.put ( EcompFields.kEndTimestamp, SaClock.now () ); - return threadLC; - } - public static void setUpEcompLogging() - { - - - // Create ECOMP Logger instances - auditLog = LoggerFactory.getLogger("com.att.ecomp.audit"); - metricsLog = LoggerFactory.getLogger("com.att.ecomp.metrics"); - debugLog = LoggerFactory.getLogger("com.att.ecomp.debug"); - errorLog = LoggerFactory.getLogger("com.att.ecomp.error"); - - - final LoggingContext lc = getCommonLoggingContext(); - - String ipAddr = "127.0.0.1"; - String hostname = "localhost"; - try - { - final InetAddress ip = InetAddress.getLocalHost (); - hostname = ip.getCanonicalHostName (); - ipAddr = ip.getHostAddress(); - } - catch ( UnknownHostException x ) - { - Log.debug(x.getMessage()); - } - - lc.put ( "serverName", hostname ); - lc.put ( "serviceName", "VESCollecor" ); - lc.put ( "statusCode", "RUNNING" ); - lc.put ( "targetEntity", "NULL"); - lc.put ( "targetServiceName", "NULL"); - lc.put ( "server", hostname ); - lc.put ( "serverIpAddress", ipAddr.toString () ); - - // instance UUID is meaningless here, so we just create a new one each time the - // server starts. One could argue each new instantiation of the service should - // have a new instance ID. - lc.put ( "instanceUuid", "" ); - lc.put ( "severity", "" ); - lc.put ( EcompFields.kEndTimestamp, SaClock.now () ); - lc.put("EndTimestamp", SaClock.now ()); - lc.put("partnerName", "NA"); - - - } - - + public static final String VES_AGENT = "VES_AGENT"; + private static final String REQUEST_ID = "requestId"; + private static final String IP_ADDRESS ="127.0.0.1"; + private static final String HOST_NAME="localhost"; + + public static Logger auditLog; + public static Logger metricsLog; + public static Logger errorLog; + public static Logger debugLog; + + // Common LoggingContext + private static LoggingContext commonLC; + // Thread-specific LoggingContext + private static LoggingContext threadLC; + public LoggingContext lc; + + + /** + * Returns the common LoggingContext instance that is the base context + * for all subsequent instances. + * + * @return the common LoggingContext + */ + public static LoggingContext getCommonLoggingContext() { + if (commonLC == null) { + commonLC = new LoggingContextFactory.Builder().build(); + final UUID uuid = UUID.randomUUID(); + + commonLC.put(REQUEST_ID, uuid.toString()); + } + return commonLC; + } + + /** + * Get a logging context for the current thread that's based on the common logging context. + * Populate the context with context-specific values. + * + * @param aUuid uuid for request id + * @return a LoggingContext for the current thread + */ + public static LoggingContext getLoggingContextForThread(UUID aUuid) { + // note that this operation requires everything from the common context + // to be (re)copied into the target context. That seems slow, but it actually + // helps prevent the thread from overwriting supposedly common data. It also + // should be fairly quick compared with the overhead of handling the actual + // service call. + + threadLC = new LoggingContextFactory.Builder(). + withBaseContext(getCommonLoggingContext()). + build(); + // Establish the request-specific UUID, as long as we are here... + threadLC.put(REQUEST_ID, aUuid.toString()); + threadLC.put(EcompFields.kEndTimestamp, SaClock.now()); + + return threadLC; + } + + /** + * Get a logging context for the current thread that's based on the common logging context. + * Populate the context with context-specific values. + * + * @param aUuid uuid for request id + * @return a LoggingContext for the current thread + */ + public static LoggingContext getLoggingContextForThread(String aUuid) { + // note that this operation requires everything from the common context + // to be (re)copied into the target context. That seems slow, but it actually + // helps prevent the thread from overwriting supposedly common data. It also + // should be fairly quick compared with the overhead of handling the actual + // service call. + + threadLC = new LoggingContextFactory.Builder(). + withBaseContext(getCommonLoggingContext()). + build(); + // Establish the request-specific UUID, as long as we are here... + threadLC.put(REQUEST_ID, aUuid); + threadLC.put("statusCode", "COMPLETE"); + threadLC.put(EcompFields.kEndTimestamp, SaClock.now()); + return threadLC; + } + + public static void setUpEcompLogging() { + + // Create ECOMP Logger instances + auditLog = LoggerFactory.getLogger("com.att.ecomp.audit"); + metricsLog = LoggerFactory.getLogger("com.att.ecomp.metrics"); + debugLog = LoggerFactory.getLogger("com.att.ecomp.debug"); + errorLog = LoggerFactory.getLogger("com.att.ecomp.error"); + + final LoggingContext lc = getCommonLoggingContext(); + + String ipAddr = IP_ADDRESS; + String hostname = HOST_NAME; + try { + final InetAddress ip = InetAddress.getLocalHost(); + hostname = ip.getCanonicalHostName(); + ipAddr = ip.getHostAddress(); + } catch (UnknownHostException x) { + Log.debug(x.getMessage()); + } + + lc.put("serverName", hostname); + lc.put("serviceName", "VESCollecor"); + lc.put("statusCode", "RUNNING"); + lc.put("targetEntity", "NULL"); + lc.put("targetServiceName", "NULL"); + lc.put("server", hostname); + lc.put("serverIpAddress", ipAddr); + + // instance UUID is meaningless here, so we just create a new one each time the + // server starts. One could argue each new instantiation of the service should + // have a new instance ID. + lc.put("instanceUuid", ""); + lc.put("severity", ""); + lc.put(EcompFields.kEndTimestamp, SaClock.now()); + lc.put("EndTimestamp", SaClock.now()); + lc.put("partnerName", "NA"); + } + } -- cgit 1.2.3-korg