aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java')
-rw-r--r--src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java55
1 files changed, 23 insertions, 32 deletions
diff --git a/src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java b/src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java
index b40fb24..62a77cd 100644
--- a/src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java
+++ b/src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java
@@ -39,19 +39,18 @@ import com.att.nsa.logging.log4j.EcompFields;
public class EventPublisher {
- private static EventPublisher instance = null;
- private static CambriaBatchingPublisher pub = null;
-
+
+ private CambriaBatchingPublisher pub = null;
private String streamid = "";
private String ueburl="";
private String topic="";
private String authuser="";
private String authpwd="";
- private static Logger log = LoggerFactory.getLogger(EventPublisher.class);
+ private static Logger log = LoggerFactory.getLogger(EventPublisher.class);
- private EventPublisher( String newstreamid) {
+ EventPublisher( String newstreamid) {
this.streamid = newstreamid;
try {
@@ -75,28 +74,18 @@ public class EventPublisher {
}
-
- public static synchronized EventPublisher getInstance( String streamid){
- if (instance == null) {
- instance = new EventPublisher(streamid);
- }
- if (!instance.streamid.equals(streamid)){
- instance.closePublisher();
- instance = new EventPublisher(streamid);
- }
- return instance;
-
- }
+
- public synchronized void sendEvent(JSONObject event) {
+ public void sendEvent(JSONObject event) {
log.debug("EventPublisher.sendEvent: instance for publish is ready");
+ String uuid = "";
if (event.has("VESuniqueId"))
{
- String uuid = event.get("VESuniqueId").toString();
+ uuid = event.get("VESuniqueId").toString();
LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString());
localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () );
log.debug("Removing VESuniqueid object from event");
@@ -104,13 +93,12 @@ public class EventPublisher {
}
-
try {
if (authuser != null)
{
- log.debug("URL:" + ueburl + "TOPIC:" + topic + "AuthUser:" + authuser + "Authpwd:" + authpwd);
+ log.debug("URL:" + ueburl + " TOPIC:" + topic + " AuthUser:" + authuser);
pub = new CambriaClientBuilders.PublisherBuilder ()
.usingHosts (ueburl)
.onTopic (topic)
@@ -124,7 +112,7 @@ public class EventPublisher {
else
{
- log.debug("URL:" + ueburl + "TOPIC:" + topic );
+ log.debug("URL:" + ueburl + " TOPIC:" + topic );
pub = new CambriaClientBuilders.PublisherBuilder ()
.usingHosts (ueburl)
.onTopic (topic)
@@ -142,25 +130,26 @@ public class EventPublisher {
log.info("Pending Message Count="+pendingMsgs);
}
- //closePublisher();
log.info("pub.send invoked - no error");
- CommonStartup.oplog.info ("URL:" + ueburl + "TOPIC:" + topic + "Event Published:" + event);
+ CommonStartup.oplog.info ("URL:" + ueburl + " TOPIC:" + topic + " VESuniqueId:" + uuid + " Event Published:" + event);
} catch(IOException e) {
- log.error("IOException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
+ log.error("IOException: Unable to publish VESuniqueId:" + uuid + " event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
+ e.printStackTrace();
} catch (GeneralSecurityException e) {
- // TODO Auto-generated catch block
- log.error("GeneralSecurityException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
+ log.error("GeneralSecurityException: Unable to publish VESuniqueId:" + uuid + " event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
+ e.printStackTrace();
}
catch (IllegalArgumentException e)
{
- log.error("IllegalArgumentException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
+ log.error("IllegalArgumentException: Unable to publish VESuniqueId:" + uuid + " event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
+ e.printStackTrace();
}
}
- public synchronized void closePublisher() {
+ public void closePublisher() {
try {
if (pub!= null)
@@ -171,10 +160,12 @@ public class EventPublisher {
}
}
}
- catch(InterruptedException ie) {
+ catch(InterruptedException e) {
log.error("Caught an Interrupted Exception on Close event");
- }catch(IOException ioe) {
- log.error("Caught IO Exception: " + ioe.toString());
+ e.printStackTrace();
+ }catch(IOException e) {
+ log.error("Caught IO Exception: " + e.toString());
+ e.printStackTrace();
}
}