aboutsummaryrefslogtreecommitdiffstats
path: root/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java
diff options
context:
space:
mode:
authorITSERVICES\rb7147 <rb7147@att.com>2017-04-25 11:46:00 -0400
committerITSERVICES\rb7147 <rb7147@att.com>2017-05-03 09:58:17 -0400
commite0addf5b588a1244f9679becd90999dfcb4c3a94 (patch)
tree1212772d6366730266ff0e093c874b07aa716c29 /ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java
parent39fb0f30472777e4b60d6a7ac8aa4eb9773961ff (diff)
Policy 1707 commit to LF
Change-Id: Ibe6f01d92f9a434c040abb05d5386e89d675ae65 Signed-off-by: ITSERVICES\rb7147 <rb7147@att.com>
Diffstat (limited to 'ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java')
-rw-r--r--ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java184
1 files changed, 130 insertions, 54 deletions
diff --git a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java
index 6b8857273..5f62be3ac 100644
--- a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java
+++ b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java
@@ -24,28 +24,36 @@ import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.security.GeneralSecurityException;
+import java.util.ArrayList;
import java.util.LinkedList;
+import java.util.List;
import java.util.UUID;
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.flexlogger.Logger;
import org.openecomp.policy.rest.XACMLRestProperties;
+import org.openecomp.policy.utils.BusConsumer;
+import org.openecomp.policy.utils.BusPublisher;
+import org.openecomp.policy.xacml.api.XACMLErrorConstants;
import com.att.nsa.cambria.client.CambriaClientFactory;
import com.att.nsa.cambria.client.CambriaConsumer;
import com.att.nsa.cambria.client.CambriaPublisher;
-import org.openecomp.policy.xacml.api.XACMLErrorConstants;
import com.att.research.xacml.util.XACMLProperties;
-import org.openecomp.policy.common.logging.flexlogger.*;
-
public class ManualNotificationUpdateThread implements Runnable {
- private static final Logger logger = FlexLogger.getLogger(ManualNotificationUpdateThread.class);
-// private static List<String> uebURLList = null;
+
+ private static final Logger LOGGER = FlexLogger.getLogger(ManualNotificationUpdateThread.class);
+
private static String topic = null;
private static CambriaConsumer CConsumer = null;
-// private static Collection<String> clusterList = null;
private static String clusterList = null;
-// private Collection<String> urlList = null;
private static String update = null;
+ private static BusConsumer dmaapConsumer = null;
+ private static List<String> dmaapList = null;
+ private static String propNotificationType = null;
+ private static String aafLogin = null;
+ private static String aafPassword = null;
public volatile boolean isRunning = false;
@@ -67,77 +75,145 @@ public class ManualNotificationUpdateThread implements Runnable {
synchronized(this) {
this.isRunning = true;
}
+
URL aURL = null;
String group = UUID.randomUUID ().toString ();
String id = "0";
String returnTopic = null;
- try {
- ManualNotificationUpdateThread.clusterList = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_UEB_CLUSTER);
- String url = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID);
- aURL = new URL(url);
- topic = aURL.getHost() + aURL.getPort();
- } catch (NumberFormatException e) {
- logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e);
- this.isRunning = false;
- } catch (MalformedURLException e) {
- logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e);
- }
- String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
- SendMessage(consumerTopic, "Starting-Topic");
- final LinkedList<String> urlList = new LinkedList<String> ();
- for ( String u : clusterList.split ( "," ) ){
- urlList.add ( u );
- }
-
- try {
- CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 );
- } catch (MalformedURLException | GeneralSecurityException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- }
-
-
- while (this.isRunning()) {
- logger.debug("While loop test _ take out ");
+ propNotificationType = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TYPE);
+ if ("ueb".equals(propNotificationType)){
+ try {
+ clusterList = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS).trim();
+ String url = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID);
+ aURL = new URL(url);
+ topic = aURL.getHost() + aURL.getPort();
+ } catch (NumberFormatException e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e);
+ this.isRunning = false;
+ } catch (MalformedURLException e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e);
+ }
+
+ String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
+ SendMessage(consumerTopic, "Starting-Topic");
+ final LinkedList<String> urlList = new LinkedList<String> ();
+ for ( String u : clusterList.split ( "," ) ){
+ urlList.add ( u );
+ }
+
try {
- for ( String msg : CConsumer.fetch () ){
- logger.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
- returnTopic = processMessage(msg);
- if(returnTopic != null){
- SendMessage(returnTopic, update);
+ CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 );
+ } catch (MalformedURLException | GeneralSecurityException e1) {
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1);
+ }
+
+ while (this.isRunning()) {
+ LOGGER.debug("While loop test _ take out ");
+ try {
+ for ( String msg : CConsumer.fetch () ){
+ LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
+ returnTopic = processMessage(msg);
+ if(returnTopic != null){
+ SendMessage(returnTopic, update);
+ }
}
+ } catch (IOException e) {
+ LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message");
}
- } catch (IOException e) {
- logger.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message");
}
- }
- logger.debug("Stopping UEB Consuer loop will not logger fetch messages from the cluser");
+ LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster");
+ } else if ("dmaap".equals(propNotificationType)) {
+ String dmaapServers = null;
+ try {
+ dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
+ topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC);
+ aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
+ aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
+ } catch (Exception e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get DMaaP servers list:", e);
+ this.isRunning = false;
+ }
+
+ if(dmaapServers==null || topic==null){
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
+ try {
+ throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ dmaapServers.trim();
+ topic.trim();
+ aafLogin.trim();
+ aafPassword.trim();
+
+ String consumerTopic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC).trim();
+ SendMessage(consumerTopic, "Starting-Topic");
+ dmaapList = new ArrayList<String>();
+ for ( String u : dmaapServers.split ( "," ) ){
+ dmaapList.add ( u );
+ }
+
+ try {
+
+ dmaapConsumer = new BusConsumer.DmaapConsumerWrapper(dmaapList, consumerTopic, aafLogin, aafPassword, group, id, 20*1000, 1000);
+ } catch (Exception e1) {
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e1);
+ }
+ while (this.isRunning()) {
+ LOGGER.debug("While loop test _ take out ");
+ try {
+ for ( String msg : dmaapConsumer.fetch () ){
+ LOGGER.debug("Manual Notification Recieved Message " + msg + " from DMaaP server : ");
+ returnTopic = processMessage(msg);
+ if(returnTopic != null){
+ SendMessage(returnTopic, update);
+ }
+ }
+ }catch (Exception e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing DMaaP message: ", e); }
+ }
+ LOGGER.debug("Stopping DMaaP Consumer loop will no longer fetch messages from the servers");
}
+ }
private void SendMessage( String topic, String message) {
CambriaPublisher pub = null;
+ BusPublisher publisher = null;
try {
- pub = CambriaClientFactory.createSimplePublisher (null, clusterList, topic );
- } catch (MalformedURLException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- } catch (GeneralSecurityException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
+ if ("ueb".equals(propNotificationType)) {
+ pub = CambriaClientFactory.createSimplePublisher (null, clusterList, topic );
+ pub.send( "pdpReturnMessage", message );
+ LOGGER.debug("Sending Message to UEB topic: " + topic);
+ pub.close();
+
+ } else if ("dmaap".equals(propNotificationType)){
+ publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList,topic,aafLogin,aafPassword);
+ publisher.send( "pdpReturnMessage", message );
+ LOGGER.debug("Sending to Message to DMaaP topic: " + topic);
+ publisher.close();
+ }
+
+ } catch (Exception e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update: ", e);
}
+
try {
pub.send( "pdpReturnMessage", message );
- logger.debug("Sending to Message to tpoic" + topic);
+ LOGGER.debug("Sending to Message to tpoic" + topic);
} catch (IOException e) {
- logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update");
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update");
}
pub.close();
}
private String processMessage(String msg) {
- logger.debug("notification message: " + msg);
+ LOGGER.debug("notification message: " + msg);
String[] UID = msg.split("=")[1].split("\"");
+
String returnTopic = topic + UID[0];
if(msg.contains("Starting-Topic")){
return null;