aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java')
-rw-r--r--src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java305
1 files changed, 175 insertions, 130 deletions
diff --git a/src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java b/src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java
index 4aa6da4..b40fb24 100644
--- a/src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java
+++ b/src/main/java/org/openecomp/dcae/commonFunction/EventPublisher.java
@@ -1,136 +1,181 @@
-/*-
+/*-
* ============LICENSE_START=======================================================
* PROJECT
* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.dcae.commonFunction;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.security.GeneralSecurityException;
-import java.net.MalformedURLException;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-
-
-public class EventPublisher {
-
- private static EventPublisher instance = null;
- private static CambriaBatchingPublisher pub = null;
-
- private String streamid = "";
- private static Logger log = LoggerFactory.getLogger(EventPublisher.class.getName());
-
-
-
- private EventPublisher(String CambriaConfigFile, String newstreamid) {
-
- this.streamid = newstreamid;
- try {
- String basicAuthUsername = DmaapPropertyReader.getInstance(CambriaConfigFile).getKeyValue(streamid+".basicAuthUsername");
- if (basicAuthUsername != null)
- {
- //log.debug(streamid+".cambria.url" + streamid+".cambria.topic");
- log.debug("URL:" + DmaapPropertyReader.getInstance(CambriaConfigFile).getKeyValue(streamid+".cambria.url") + "TOPIC:" + DmaapPropertyReader.getInstance(CambriaConfigFile).getKeyValue(streamid+".cambria.topic") + "AuthUser:" + DmaapPropertyReader.getInstance(CambriaConfigFile).getKeyValue(streamid+".basicAuthUsername") + "Authpwd:" + DmaapPropertyReader.getInstance(CambriaConfigFile).getKeyValue(streamid+".basicAuthPassword"));
-
- pub = new CambriaClientBuilders.PublisherBuilder ()
- .usingHosts (DmaapPropertyReader.getInstance(CambriaConfigFile).dmaap_hash.get(streamid+".cambria.url"))
- .onTopic (DmaapPropertyReader.getInstance(CambriaConfigFile).dmaap_hash.get(streamid+".cambria.topic"))
- .usingHttps()
- .authenticatedByHttp ( DmaapPropertyReader.getInstance(CambriaConfigFile).dmaap_hash.get(streamid+".basicAuthUsername"), DmaapPropertyReader.getInstance(CambriaConfigFile).dmaap_hash.get(streamid+".basicAuthPassword") )
- .build ();
- }
- else
- {
- //log.debug(streamid+".cambria.url" + streamid+".cambria.topic");
- log.debug("URL:" + DmaapPropertyReader.getInstance(CambriaConfigFile).getKeyValue(streamid+".cambria.url") + "TOPIC:" + DmaapPropertyReader.getInstance(CambriaConfigFile).getKeyValue(streamid+".cambria.topic"));
-
-
- pub = new CambriaClientBuilders.PublisherBuilder ()
- .usingHosts (DmaapPropertyReader.getInstance(CambriaConfigFile).dmaap_hash.get(streamid+".cambria.hosts"))
- .onTopic (DmaapPropertyReader.getInstance(CambriaConfigFile).dmaap_hash.get(streamid+".cambria.topic"))
- .build ();
-
- }
- }
- catch(GeneralSecurityException | MalformedURLException e ) {
- log.error("CambriaClientBuilders connection exception : " + e.getMessage());
- }
- catch(Exception e) {
- log.error("CambriaClientBuilders connection exception : " + e.getMessage());
- }
-
- }
-
- public static synchronized EventPublisher getInstance( String CambriaConfigFile, String streamid){
- if (instance == null) {
- instance = new EventPublisher(CambriaConfigFile, streamid);
- }
- return instance;
-
- }
-
- public synchronized void sendEvent(String event, String newstreamid ) {
-
- //Check if streamid changed
- if(! newstreamid.equals(this.streamid)) {
- closePublisher();
- instance = new EventPublisher (CommonStartup.cambriaConfigFile, newstreamid);
- }
-
-
- try {
- int pendingMsgs = pub.send("MyPartitionKey", event.toString());
-
- if(pendingMsgs > 100) {
- log.info("Pending Message Count="+pendingMsgs);
- }
-
- CommonStartup.oplog.info ("Event Published:" + event);
- } catch(IOException ioe) {
- log.error("Unable to publish event:" + event + " Exception:" + ioe.toString());
- }
-
-
-
-
- }
-
-
- public synchronized void closePublisher() {
-
- try {
- final List<?> stuck = pub.close(20, TimeUnit.SECONDS);
- if ( stuck.size () > 0 ) {
- log.error(stuck.size() + " messages unsent" );
- }
- }
- catch(InterruptedException ie) {
- log.error("Caught an Interrupted Exception on Close event");
- }catch(IOException ioe) {
- log.error("Caught IO Exception: " + ioe.toString());
- }
-
- }
-}
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.dcae.commonFunction;
+
+import java.io.IOException;
+
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import java.security.GeneralSecurityException;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.clock.SaClock;
+import com.att.nsa.logging.LoggingContext;
+import com.att.nsa.logging.log4j.EcompFields;
+
+
+public class EventPublisher {
+
+ private static EventPublisher instance = null;
+ private static CambriaBatchingPublisher pub = null;
+
+ private String streamid = "";
+ private String ueburl="";
+ private String topic="";
+ private String authuser="";
+ private String authpwd="";
+
+ private static Logger log = LoggerFactory.getLogger(EventPublisher.class);
+
+
+ private EventPublisher( String newstreamid) {
+
+ this.streamid = newstreamid;
+ try {
+ ueburl=DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash.get(streamid+".cambria.url");
+
+ if (ueburl==null){
+ ueburl= DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash.get(streamid+".cambria.hosts");
+ }
+ topic= DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).getKeyValue(streamid+".cambria.topic");
+ authuser = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).getKeyValue(streamid+".basicAuthUsername");
+
+
+ if (authuser != null) {
+ authpwd= DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash.get(streamid+".basicAuthPassword");
+ }
+ }
+ catch(Exception e) {
+ log.error("CambriaClientBuilders connection reader exception : " + e.getMessage());
+
+ }
+
+ }
+
+
+ public static synchronized EventPublisher getInstance( String streamid){
+ if (instance == null) {
+ instance = new EventPublisher(streamid);
+ }
+ if (!instance.streamid.equals(streamid)){
+ instance.closePublisher();
+ instance = new EventPublisher(streamid);
+ }
+ return instance;
+
+ }
+
+
+ public synchronized void sendEvent(JSONObject event) {
+
+ log.debug("EventPublisher.sendEvent: instance for publish is ready");
+
+
+ if (event.has("VESuniqueId"))
+ {
+ String uuid = event.get("VESuniqueId").toString();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString());
+ localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () );
+ log.debug("Removing VESuniqueid object from event");
+ event.remove("VESuniqueId");
+ }
+
+
+
+
+ try {
+
+ if (authuser != null)
+ {
+ log.debug("URL:" + ueburl + "TOPIC:" + topic + "AuthUser:" + authuser + "Authpwd:" + authpwd);
+ pub = new CambriaClientBuilders.PublisherBuilder ()
+ .usingHosts (ueburl)
+ .onTopic (topic)
+ .usingHttps()
+ .authenticatedByHttp (authuser, authpwd )
+ .logSendFailuresAfter(5)
+ // .logTo(log)
+ // .limitBatch(100, 10)
+ .build ();
+ }
+ else
+ {
+
+ log.debug("URL:" + ueburl + "TOPIC:" + topic );
+ pub = new CambriaClientBuilders.PublisherBuilder ()
+ .usingHosts (ueburl)
+ .onTopic (topic)
+ // .logTo(log)
+ .logSendFailuresAfter(5)
+ // .limitBatch(100, 10)
+ .build ();
+
+ }
+
+ int pendingMsgs = pub.send("MyPartitionKey", event.toString());
+ //this.wait(2000);
+
+ if(pendingMsgs > 100) {
+ log.info("Pending Message Count="+pendingMsgs);
+ }
+
+ //closePublisher();
+ log.info("pub.send invoked - no error");
+ CommonStartup.oplog.info ("URL:" + ueburl + "TOPIC:" + topic + "Event Published:" + event);
+
+ } catch(IOException e) {
+ log.error("IOException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
+ } catch (GeneralSecurityException e) {
+ // TODO Auto-generated catch block
+ log.error("GeneralSecurityException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
+ }
+ catch (IllegalArgumentException e)
+ {
+ log.error("IllegalArgumentException:Unable to publish event:" + event + " streamid:" + this.streamid + " Exception:" + e.toString());
+ }
+
+ }
+
+
+ public synchronized void closePublisher() {
+
+ try {
+ if (pub!= null)
+ {
+ final List<?> stuck = pub.close(20, TimeUnit.SECONDS);
+ if ( stuck.size () > 0 ) {
+ log.error(stuck.size() + " messages unsent" );
+ }
+ }
+ }
+ catch(InterruptedException ie) {
+ log.error("Caught an Interrupted Exception on Close event");
+ }catch(IOException ioe) {
+ log.error("Caught IO Exception: " + ioe.toString());
+ }
+
+ }
+}