aboutsummaryrefslogtreecommitdiffstats
path: root/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications
diff options
context:
space:
mode:
authorITSERVICES\rb7147 <rb7147@att.com>2017-04-25 11:46:00 -0400
committerITSERVICES\rb7147 <rb7147@att.com>2017-05-03 09:58:17 -0400
commite0addf5b588a1244f9679becd90999dfcb4c3a94 (patch)
tree1212772d6366730266ff0e093c874b07aa716c29 /ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications
parent39fb0f30472777e4b60d6a7ac8aa4eb9773961ff (diff)
Policy 1707 commit to LF
Change-Id: Ibe6f01d92f9a434c040abb05d5386e89d675ae65 Signed-off-by: ITSERVICES\rb7147 <rb7147@att.com>
Diffstat (limited to 'ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications')
-rw-r--r--ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java184
-rw-r--r--ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Notification.java12
-rw-r--r--ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationController.java116
-rw-r--r--ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java136
-rw-r--r--ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Removed.java6
-rw-r--r--ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Updated.java17
6 files changed, 349 insertions, 122 deletions
diff --git a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java
index 6b8857273..5f62be3ac 100644
--- a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java
+++ b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java
@@ -24,28 +24,36 @@ import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.security.GeneralSecurityException;
+import java.util.ArrayList;
import java.util.LinkedList;
+import java.util.List;
import java.util.UUID;
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.flexlogger.Logger;
import org.openecomp.policy.rest.XACMLRestProperties;
+import org.openecomp.policy.utils.BusConsumer;
+import org.openecomp.policy.utils.BusPublisher;
+import org.openecomp.policy.xacml.api.XACMLErrorConstants;
import com.att.nsa.cambria.client.CambriaClientFactory;
import com.att.nsa.cambria.client.CambriaConsumer;
import com.att.nsa.cambria.client.CambriaPublisher;
-import org.openecomp.policy.xacml.api.XACMLErrorConstants;
import com.att.research.xacml.util.XACMLProperties;
-import org.openecomp.policy.common.logging.flexlogger.*;
-
public class ManualNotificationUpdateThread implements Runnable {
- private static final Logger logger = FlexLogger.getLogger(ManualNotificationUpdateThread.class);
-// private static List<String> uebURLList = null;
+
+ private static final Logger LOGGER = FlexLogger.getLogger(ManualNotificationUpdateThread.class);
+
private static String topic = null;
private static CambriaConsumer CConsumer = null;
-// private static Collection<String> clusterList = null;
private static String clusterList = null;
-// private Collection<String> urlList = null;
private static String update = null;
+ private static BusConsumer dmaapConsumer = null;
+ private static List<String> dmaapList = null;
+ private static String propNotificationType = null;
+ private static String aafLogin = null;
+ private static String aafPassword = null;
public volatile boolean isRunning = false;
@@ -67,77 +75,145 @@ public class ManualNotificationUpdateThread implements Runnable {
synchronized(this) {
this.isRunning = true;
}
+
URL aURL = null;
String group = UUID.randomUUID ().toString ();
String id = "0";
String returnTopic = null;
- try {
- ManualNotificationUpdateThread.clusterList = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_UEB_CLUSTER);
- String url = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID);
- aURL = new URL(url);
- topic = aURL.getHost() + aURL.getPort();
- } catch (NumberFormatException e) {
- logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e);
- this.isRunning = false;
- } catch (MalformedURLException e) {
- logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e);
- }
- String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
- SendMessage(consumerTopic, "Starting-Topic");
- final LinkedList<String> urlList = new LinkedList<String> ();
- for ( String u : clusterList.split ( "," ) ){
- urlList.add ( u );
- }
-
- try {
- CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 );
- } catch (MalformedURLException | GeneralSecurityException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- }
-
-
- while (this.isRunning()) {
- logger.debug("While loop test _ take out ");
+ propNotificationType = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TYPE);
+ if ("ueb".equals(propNotificationType)){
+ try {
+ clusterList = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS).trim();
+ String url = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID);
+ aURL = new URL(url);
+ topic = aURL.getHost() + aURL.getPort();
+ } catch (NumberFormatException e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e);
+ this.isRunning = false;
+ } catch (MalformedURLException e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e);
+ }
+
+ String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
+ SendMessage(consumerTopic, "Starting-Topic");
+ final LinkedList<String> urlList = new LinkedList<String> ();
+ for ( String u : clusterList.split ( "," ) ){
+ urlList.add ( u );
+ }
+
try {
- for ( String msg : CConsumer.fetch () ){
- logger.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
- returnTopic = processMessage(msg);
- if(returnTopic != null){
- SendMessage(returnTopic, update);
+ CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 );
+ } catch (MalformedURLException | GeneralSecurityException e1) {
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1);
+ }
+
+ while (this.isRunning()) {
+ LOGGER.debug("While loop test _ take out ");
+ try {
+ for ( String msg : CConsumer.fetch () ){
+ LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
+ returnTopic = processMessage(msg);
+ if(returnTopic != null){
+ SendMessage(returnTopic, update);
+ }
}
+ } catch (IOException e) {
+ LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message");
}
- } catch (IOException e) {
- logger.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message");
}
- }
- logger.debug("Stopping UEB Consuer loop will not logger fetch messages from the cluser");
+ LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster");
+ } else if ("dmaap".equals(propNotificationType)) {
+ String dmaapServers = null;
+ try {
+ dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
+ topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC);
+ aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
+ aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
+ } catch (Exception e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get DMaaP servers list:", e);
+ this.isRunning = false;
+ }
+
+ if(dmaapServers==null || topic==null){
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
+ try {
+ throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ dmaapServers.trim();
+ topic.trim();
+ aafLogin.trim();
+ aafPassword.trim();
+
+ String consumerTopic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC).trim();
+ SendMessage(consumerTopic, "Starting-Topic");
+ dmaapList = new ArrayList<String>();
+ for ( String u : dmaapServers.split ( "," ) ){
+ dmaapList.add ( u );
+ }
+
+ try {
+
+ dmaapConsumer = new BusConsumer.DmaapConsumerWrapper(dmaapList, consumerTopic, aafLogin, aafPassword, group, id, 20*1000, 1000);
+ } catch (Exception e1) {
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e1);
+ }
+ while (this.isRunning()) {
+ LOGGER.debug("While loop test _ take out ");
+ try {
+ for ( String msg : dmaapConsumer.fetch () ){
+ LOGGER.debug("Manual Notification Recieved Message " + msg + " from DMaaP server : ");
+ returnTopic = processMessage(msg);
+ if(returnTopic != null){
+ SendMessage(returnTopic, update);
+ }
+ }
+ }catch (Exception e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing DMaaP message: ", e); }
+ }
+ LOGGER.debug("Stopping DMaaP Consumer loop will no longer fetch messages from the servers");
}
+ }
private void SendMessage( String topic, String message) {
CambriaPublisher pub = null;
+ BusPublisher publisher = null;
try {
- pub = CambriaClientFactory.createSimplePublisher (null, clusterList, topic );
- } catch (MalformedURLException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- } catch (GeneralSecurityException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
+ if ("ueb".equals(propNotificationType)) {
+ pub = CambriaClientFactory.createSimplePublisher (null, clusterList, topic );
+ pub.send( "pdpReturnMessage", message );
+ LOGGER.debug("Sending Message to UEB topic: " + topic);
+ pub.close();
+
+ } else if ("dmaap".equals(propNotificationType)){
+ publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList,topic,aafLogin,aafPassword);
+ publisher.send( "pdpReturnMessage", message );
+ LOGGER.debug("Sending to Message to DMaaP topic: " + topic);
+ publisher.close();
+ }
+
+ } catch (Exception e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update: ", e);
}
+
try {
pub.send( "pdpReturnMessage", message );
- logger.debug("Sending to Message to tpoic" + topic);
+ LOGGER.debug("Sending to Message to tpoic" + topic);
} catch (IOException e) {
- logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update");
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update");
}
pub.close();
}
private String processMessage(String msg) {
- logger.debug("notification message: " + msg);
+ LOGGER.debug("notification message: " + msg);
String[] UID = msg.split("=")[1].split("\"");
+
String returnTopic = topic + UID[0];
if(msg.contains("Starting-Topic")){
return null;
diff --git a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Notification.java b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Notification.java
index 5ab165b0a..c9b510ec3 100644
--- a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Notification.java
+++ b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Notification.java
@@ -22,16 +22,19 @@ package org.openecomp.policy.pdp.rest.notifications;
import java.util.Collection;
+import org.openecomp.policy.api.NotificationType;
+
/**
* Notification is the POJO which will be used to send the Notifications to the Server.
* Notification must contain the Removal and Updated policies.
*
- * @version 0.1
+ * @version 0.2
*
*/
public class Notification {
private Collection<Removed> removedPolicies = null;
private Collection<Updated> loadedPolicies = null;
+ private NotificationType notificationType= null;
public Collection<Removed> getRemovedPolicies() {
return removedPolicies;
@@ -49,4 +52,11 @@ public class Notification {
this.loadedPolicies = loadedPolicies;
}
+ public NotificationType getNotificationType() {
+ return notificationType;
+ }
+
+ public void setNotificationType(NotificationType notificationType){
+ this.notificationType= notificationType;
+ }
}
diff --git a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationController.java b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationController.java
index 1dfd07422..14d7aa165 100644
--- a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationController.java
+++ b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationController.java
@@ -39,12 +39,15 @@ import java.util.Iterator;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.filefilter.WildcardFileFilter;
-import org.openecomp.policy.pdp.rest.PapUrlResolver;
-import org.openecomp.policy.rest.XACMLRestProperties;
+import org.openecomp.policy.api.NotificationType;
+import org.openecomp.policy.api.RemovedPolicy;
+import org.openecomp.policy.api.UpdateType;
import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
import org.openecomp.policy.common.logging.flexlogger.Logger;
-
+import org.openecomp.policy.pdp.rest.PapUrlResolver;
+import org.openecomp.policy.rest.XACMLRestProperties;
import org.openecomp.policy.xacml.api.XACMLErrorConstants;
+
import com.att.research.xacml.api.pap.PDPPolicy;
import com.att.research.xacml.api.pap.PDPStatus;
import com.att.research.xacml.util.XACMLProperties;
@@ -64,7 +67,7 @@ import com.fasterxml.jackson.databind.ObjectWriter;
*
*/
public class NotificationController {
- private static final Logger logger = FlexLogger.getLogger(NotificationController.class);
+ private static final Logger LOGGER = FlexLogger.getLogger(NotificationController.class);
private static Notification record = new Notification();
private PDPStatus oldStatus = null;
private Removed removed = null;
@@ -89,14 +92,14 @@ public class NotificationController {
oldStatus = newStatus;
}
// Debugging purpose only.
- logger.debug("old config Status :" + oldStatus.getStatus());
- logger.debug("new config Status :" + newStatus.getStatus());
+ LOGGER.debug("old config Status :" + oldStatus.getStatus());
+ LOGGER.debug("new config Status :" + newStatus.getStatus());
// Depending on the above condition taking the Change as an Update.
if (oldStatus.getStatus().toString() != newStatus.getStatus().toString()) {
- logger.info("There is an Update to the PDP");
- logger.debug(oldStatus.getLoadedPolicies());
- logger.debug(newStatus.getLoadedPolicies());
+ LOGGER.info("There is an Update to the PDP");
+ LOGGER.debug(oldStatus.getLoadedPolicies());
+ LOGGER.debug(newStatus.getLoadedPolicies());
// Check if there is an Update/additions in the policy.
for (PDPPolicy newPolicy : newStatus.getLoadedPolicies()) {
boolean change = true;
@@ -143,15 +146,16 @@ public class NotificationController {
// Call the Notification Server..
notification.setRemovedPolicies(removedPolicies);
notification.setLoadedPolicies(updatedPolicies);
+ notification = setUpdateTypes(updated, removed, notification);
ObjectWriter om = new ObjectMapper().writer();
try {
notificationJSON = om.writeValueAsString(notification);
- logger.info(notificationJSON);
+ LOGGER.info(notificationJSON);
// NotificationServer Method here.
propNotificationType = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TYPE);
pdpURL = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID);
- if (propNotificationType!=null && propNotificationType.equals("ueb") && !manualThreadStarted) {
- logger.debug("Starting Thread to accept UEB notfications.");
+ if (("ueb".equals(propNotificationType)||"dmaap".equals(propNotificationType)) && !manualThreadStarted) {
+ LOGGER.debug("Starting Thread to accept UEB or DMAAP notfications.");
this.registerMaunualNotificationRunnable = new ManualNotificationUpdateThread();
this.manualNotificationThread = new Thread(this.registerMaunualNotificationRunnable);
this.manualNotificationThread.start();
@@ -162,15 +166,15 @@ public class NotificationController {
try{
notificationJSON= record(notification);
}catch(Exception e){
- logger.error(e);
- // TODO:EELF Cleanup - Remove logger
+ LOGGER.error(e);
+ // TODO:EELF Cleanup - Remove LOGGER
//PolicyLogger.error(MessageCodes.ERROR_DATA_ISSUE, e, "");
}
NotificationServer.setUpdate(notificationJSON);
ManualNotificationUpdateThread.setUpdate(notificationJSON);
} catch (JsonProcessingException e) {
- logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e.getMessage());
- // TODO:EELF Cleanup - Remove logger
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e.getMessage());
+ // TODO:EELF Cleanup - Remove LOGGER
//PolicyLogger.error(MessageCodes.ERROR_DATA_ISSUE, e, "");
}
}
@@ -178,7 +182,12 @@ public class NotificationController {
public static void sendNotification(){
if(notificationFlag){
- NotificationServer.sendNotification(notificationJSON, propNotificationType, pdpURL);
+ try {
+ NotificationServer.sendNotification(notificationJSON, propNotificationType, pdpURL);
+ } catch (Exception e) {
+ LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
+ e.printStackTrace();
+ }
notificationFlag = false;
}
}
@@ -186,9 +195,9 @@ public class NotificationController {
private void sendremove(PDPPolicy oldPolicy) {
removed = new Removed();
// Want to know what is removed ?
- // logger.info("The Policy removed is: " + oldPolicy.getId());
- // logger.info("The version no. is: " + oldPolicy.getVersion());
- logger.info("Policy removed: " + oldPolicy.getId()+ " with version number: " + oldPolicy.getVersion());
+ // LOGGER.info("The Policy removed is: " + oldPolicy.getId());
+ // LOGGER.info("The version no. is: " + oldPolicy.getVersion());
+ LOGGER.info("Policy removed: " + oldPolicy.getId()+ " with version number: " + oldPolicy.getVersion());
removed.setPolicyName(oldPolicy.getId());
removed.setVersionNo(oldPolicy.getVersion());
removeFile(oldPolicy);
@@ -197,12 +206,13 @@ public class NotificationController {
private void sendUpdate(PDPPolicy newPolicy,HashMap<String, PolicyDef> policyContainer) {
updated = new Updated();
// Want to know what is new ?
- logger.info("The new Policy is: " + newPolicy.getId());
- logger.info("The version no. is: " + newPolicy.getVersion());
+ LOGGER.info("The new Policy is: " + newPolicy.getId());
+ LOGGER.info("The version no. is: " + newPolicy.getVersion());
updated.setPolicyName(newPolicy.getId());
updated.setVersionNo(newPolicy.getVersion());
+ updated.setUpdateType(UpdateType.NEW);
// If the policy is of Config type then retrieve its matches.
- if (newPolicy.getName().startsWith("Config")) {
+ if (newPolicy.getName().contains(".Config_")) {
// Take a Configuration copy to PDP webapps.
final String urlStart = "attributeId=URLID,expression";
final String urlEnd = "}}},{";
@@ -223,19 +233,19 @@ public class NotificationController {
HashMap<String, String> matchValues = new HashMap<String, String>();
while (matches.hasNext()) {
Match match = matches.next();
- logger.info("Attribute Value is: "+ match.getAttributeValue().getValue().toString());
+ LOGGER.info("Attribute Value is: "+ match.getAttributeValue().getValue().toString());
String[] result = match.getAttributeRetrievalBase().toString().split("attributeId=");
result[1] = result[1].replaceAll("}", "");
if (!result[1].equals("urn:oasis:names:tc:xacml:1.0:subject:subject-id")) {
- logger.info("Attribute id is: " + result[1]);
+ LOGGER.info("Attribute id is: " + result[1]);
}
matchValues.put(result[1], match.getAttributeValue().getValue().toString());
- logger.info("Match is : "+ result[1]+ " , " + match.getAttributeValue().getValue().toString());
+ LOGGER.info("Match is : "+ result[1]+ " , " + match.getAttributeValue().getValue().toString());
}
updated.setMatches(matchValues);
}
}
- }else if(newPolicy.getName().startsWith("Action")){
+ }else if(newPolicy.getName().contains(".Action_")){
// Take Configuration copy to PDP Webapps.
// Action policies have .json as extension.
String urlString = "$URL/Action/" + newPolicy.getId().substring(0, newPolicy.getId().lastIndexOf(".")) + ".json";
@@ -244,7 +254,7 @@ public class NotificationController {
}
// Adding this for Recording the changes to serve Polling requests..
- public static String record(Notification notification) throws Exception {
+ private static String record(Notification notification) throws Exception {
// Initialization with updates.
if (record.getRemovedPolicies() == null || record.getLoadedPolicies() == null) {
record.setRemovedPolicies(notification.getRemovedPolicies());
@@ -317,14 +327,44 @@ public class NotificationController {
try {
json = om.writeValueAsString(record);
} catch (JsonProcessingException e) {
- logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e.getMessage());
- // TODO:EELF Cleanup - Remove logger
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e.getMessage());
+ // TODO:EELF Cleanup - Remove LOGGER
//PolicyLogger.error(MessageCodes.ERROR_DATA_ISSUE, e, "");
}
- logger.info(json);
+ LOGGER.info(json);
return json;
}
+ private static Notification setUpdateTypes(boolean updated, boolean removed, Notification notification) {
+ if(notification!=null){
+ if(updated && removed){
+ notification.setNotificationType(NotificationType.BOTH);
+ if(notification.getLoadedPolicies()!=null){
+ HashSet<Updated> updatedPolicies = new HashSet<Updated>();
+ for(Updated oldUpdatedPolicy: notification.getLoadedPolicies()){
+ Updated updatePolicy = oldUpdatedPolicy;
+ if(notification.getRemovedPolicies()!=null){
+ for(RemovedPolicy removedPolicy: notification.getRemovedPolicies()){
+ String regex = ".(\\d)*.xml";
+ if(removedPolicy.getPolicyName().replaceAll(regex, "").equals(oldUpdatedPolicy.getPolicyName().replaceAll(regex, ""))){
+ updatePolicy.setUpdateType(UpdateType.UPDATE);
+ break;
+ }
+ }
+ }
+ updatedPolicies.add(updatePolicy);
+ }
+ notification.setLoadedPolicies(updatedPolicies);
+ }
+ }else if(updated){
+ notification.setNotificationType(NotificationType.UPDATE);
+ }else if(removed){
+ notification.setNotificationType(NotificationType.REMOVE);
+ }
+ }
+ return notification;
+ }
+
private void removeFile(PDPPolicy oldPolicy) {
try{
Path removedPolicyFile = Paths.get(XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_CONFIG)+File.separator+oldPolicy.getId());
@@ -346,8 +386,8 @@ public class NotificationController {
}
}
}catch(Exception e){
- logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Couldn't remove the policy/config file " + oldPolicy.getName());
- // TODO:EELF Cleanup - Remove logger
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Couldn't remove the policy/config file " + oldPolicy.getName());
+ // TODO:EELF Cleanup - Remove LOGGER
//PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e, "Couldn't remove the policy file " + oldPolicy.getName());
}
}
@@ -358,7 +398,7 @@ public class NotificationController {
try {
Files.createDirectories(configLocation);
} catch (IOException e) {
- logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW +"Failed to create config directory: " + configLocation.toAbsolutePath().toString(), e);
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW +"Failed to create config directory: " + configLocation.toAbsolutePath().toString(), e);
}
}
PapUrlResolver papUrls = PapUrlResolver.getInstance();
@@ -370,7 +410,7 @@ public class NotificationController {
String fileLocation = configLocation.toString() + File.separator + fileName;
try {
URL papURL = new URL(papAddress);
- logger.info("Calling " +papAddress + " for Configuration Copy.");
+ LOGGER.info("Calling " +papAddress + " for Configuration Copy.");
URLConnection urlConnection = papURL.openConnection();
File file= new File(fileLocation);
try (InputStream is = urlConnection.getInputStream();
@@ -379,11 +419,11 @@ public class NotificationController {
break;
}
} catch (MalformedURLException e) {
- logger.error(e + e.getMessage());
+ LOGGER.error(e + e.getMessage());
} catch(FileNotFoundException e){
- logger.error(e + e.getMessage());
+ LOGGER.error(e + e.getMessage());
} catch (IOException e) {
- logger.error(e + e.getMessage());
+ LOGGER.error(e + e.getMessage());
}
papUrls.getNext();
}
diff --git a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java
index d6cda7491..fe295ebb8 100644
--- a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java
+++ b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java
@@ -24,8 +24,12 @@ import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
import javax.websocket.OnClose;
import javax.websocket.OnError;
@@ -34,28 +38,31 @@ import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
-import org.openecomp.policy.rest.XACMLRestProperties;
import org.openecomp.policy.common.logging.eelf.MessageCodes;
import org.openecomp.policy.common.logging.eelf.PolicyLogger;
-import com.att.nsa.cambria.client.CambriaClientFactory;
-import com.att.nsa.cambria.client.CambriaPublisher;
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.rest.XACMLRestProperties;
+import org.openecomp.policy.utils.BusPublisher;
import org.openecomp.policy.xacml.api.XACMLErrorConstants;
-import com.att.research.xacml.util.XACMLProperties;
-import org.openecomp.policy.common.logging.flexlogger.*;
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.research.xacml.util.XACMLProperties;
/**
* The NotificationServer sends the Server Notifications to the Clients once there is any Event.
* WebSockets is being used as a medium for sending Notifications.
- * UEB is being used as a medium for sending Notifications.
+ * UEB is being used as a medium for sending Notifications.
+ * DMAAP is being used as a medium for sending Notifications.
*
- * @version 0.1
+ * @version 0.2
*
**/
@ServerEndpoint(value = "/notifications")
public class NotificationServer {
- private static final Logger logger = FlexLogger.getLogger(NotificationServer.class);
+ private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class);
private static Queue<Session> queue = new ConcurrentLinkedQueue<Session>();
private static String update = null;
private static String hosts = null;
@@ -63,7 +70,7 @@ public class NotificationServer {
@OnOpen
public void openConnection(Session session) {
- logger.info("Session Connected: " + session.getId());
+ LOGGER.info("Session Connected: " + session.getId());
queue.add(session);
}
@@ -75,7 +82,7 @@ public class NotificationServer {
@OnError
public void error(Session session, Throwable t) {
queue.remove(session);
- logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage());
+ LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage());
}
@@ -87,16 +94,17 @@ public class NotificationServer {
session.getBasicRemote().sendText(update);
session.close();
} catch (IOException e) {
- logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
- logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
+ LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
}
}
}
- public static void sendNotification(String notification, String propNotificationType, String pdpURL){
+ public static void sendNotification(String notification, String propNotificationType, String pdpURL) throws Exception {
- logger.debug("Notification set to " + propNotificationType);
+ LOGGER.debug("Notification set to " + propNotificationType);
if (propNotificationType.equals("ueb")){
+
String topic = null;
try {
aURL = new URL(pdpURL);
@@ -104,33 +112,111 @@ public class NotificationServer {
} catch (MalformedURLException e1) {
pdpURL = pdpURL.replace("/", "");
topic = pdpURL.replace(":", "");
- logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication ");
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication ");
PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1, "Error in parsing out pdpURL for UEB notfication ");
}
- hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_UEB_CLUSTER);
- logger.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
- CambriaPublisher pub = null;
+ hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
+ String apiKey = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_KEY);
+ String apiSecret = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_SECRET);
+
+ LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
+ CambriaBatchingPublisher pub = null;
try {
- pub = CambriaClientFactory.createSimplePublisher (null, hosts, topic );
+ if(hosts==null || topic==null || apiKey==null || apiSecret==null){
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
+ throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
+ }
+
+ hosts.trim();
+ topic.trim();
+ apiKey.trim();
+ apiSecret.trim();
+ pub = new CambriaClientBuilders.PublisherBuilder ()
+ .usingHosts ( hosts )
+ .onTopic ( topic )
+ .authenticatedBy ( apiKey, apiSecret )
+ .build ()
+ ;
+
} catch (MalformedURLException e1) {
- // TODO Auto-generated catch block
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
e1.printStackTrace();
} catch (GeneralSecurityException e1) {
- // TODO Auto-generated catch block
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
e1.printStackTrace();
}
+
try {
pub.send( "MyPartitionKey", notification );
} catch (IOException e) {
- logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update");
- }
- pub.close();
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage());
+ }
+
+ // close the publisher. The batching publisher does not send events
+ // immediately, so you MUST use close to send any remaining messages.
+ // You provide the amount of time you're willing to wait for the sends
+ // to succeed before giving up. If any messages are unsent after that time,
+ // they're returned to your app. You could, for example, persist to disk
+ // and try again later.
+ final List stuck = pub.close ( 20, TimeUnit.SECONDS );
+
+ if ( stuck.size () > 0 )
+ {
+ System.err.println ( stuck.size() + " messages unsent" );
+ }
+ else
+ {
+ System.out.println ( "Clean exit; all messages sent: " + notification );
+ }
+
+ } else if (propNotificationType.equals("dmaap")) {
+
+ // Setting up the Publisher for DMaaP MR
+ String dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
+ String topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC);
+ String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
+ String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
+
+ try {
+ if(dmaapServers==null || topic==null){
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
+ throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
+ }
+
+ dmaapServers.trim();
+ topic.trim();
+ aafLogin.trim();
+ aafPassword.trim();
+
+ List<String> dmaapList = null;
+ if(dmaapServers.contains(",")) {
+ dmaapList = new ArrayList<String>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
+ } else {
+ dmaapList = new ArrayList<String>();
+ dmaapList.add(dmaapServers);
+ }
+
+ BusPublisher publisher =
+ new BusPublisher.DmaapPublisherWrapper(dmaapList,
+ topic,
+ aafLogin,
+ aafPassword);
+
+ // Sending notification through DMaaP Message Router
+ publisher.send( "MyPartitionKey", notification);
+ LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
+ publisher.close();
+
+ } catch (Exception e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage());
+ }
}
+
for(Session session: queue) {
try {
session.getBasicRemote().sendText(notification);
} catch (IOException e) {
- logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
+ LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
}
}
}
diff --git a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Removed.java b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Removed.java
index 6ba073815..151b40125 100644
--- a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Removed.java
+++ b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Removed.java
@@ -20,15 +20,17 @@
package org.openecomp.policy.pdp.rest.notifications;
+import org.openecomp.policy.api.RemovedPolicy;
+
/**
* Removal is the POJO for removal updates of the Policy.
* It must have the Policy removed and its Version number.
*
- * @version 0.1
+ * @version 0.2
*
*/
-public class Removed {
+public class Removed implements RemovedPolicy{
private String policyName = null;
private String versionNo = null;
diff --git a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Updated.java b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Updated.java
index 39236bada..f0d9aa38d 100644
--- a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Updated.java
+++ b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Updated.java
@@ -22,17 +22,21 @@ package org.openecomp.policy.pdp.rest.notifications;
import java.util.HashMap;
+import org.openecomp.policy.api.LoadedPolicy;
+import org.openecomp.policy.api.UpdateType;
+
/**
* Updated is the POJO which consists of any new or Updated Policy information.
* It must hold the Policy Name, version Number, Matches.
*
- * @version 0.1
+ * @version 0.2
*
*/
-public class Updated {
+public class Updated implements LoadedPolicy{
private String policyName = null;
private String versionNo = null;
private HashMap<String,String> matches = null;
+ private UpdateType updateType = null;
public String getPolicyName() {
return policyName;
@@ -57,5 +61,14 @@ public class Updated {
public void setMatches(HashMap<String,String> matches) {
this.matches = matches;
}
+
+ @Override
+ public UpdateType getUpdateType() {
+ return this.updateType;
+ }
+
+ public void setUpdateType(UpdateType updateType){
+ this.updateType = updateType;
+ }
}