diff options
Diffstat (limited to 'ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications')
3 files changed, 564 insertions, 523 deletions
diff --git a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java index c1306572f..9027e27a5 100644 --- a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java +++ b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java @@ -44,186 +44,205 @@ import com.att.research.xacml.util.XACMLProperties; @SuppressWarnings("deprecation") public class ManualNotificationUpdateThread implements Runnable { - private static final Logger LOGGER = FlexLogger.getLogger(ManualNotificationUpdateThread.class); - - private static String topic = null; - private static CambriaConsumer CConsumer = null; - private static String clusterList = null; - private static String update = null; - private static BusConsumer dmaapConsumer = null; - private static List<String> dmaapList = null; - private static String propNotificationType = null; - private static String aafLogin = null; - private static String aafPassword = null; - - public volatile boolean isRunning = false; - - public synchronized boolean isRunning() { - return this.isRunning; - } - - public synchronized void terminate() { - this.isRunning = false; - } - - /** - * - * This is our thread that runs on startup if the system is configured to UEB to accept manual update requests - * - */ - @Override - public void run() { - synchronized(this) { - this.isRunning = true; - } - - URL aURL = null; - String group = UUID.randomUUID ().toString (); - String id = "0"; - String returnTopic = null; - propNotificationType = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TYPE); - if ("ueb".equals(propNotificationType)){ - try { - clusterList = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS).trim(); - String url = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID); - aURL = new URL(url); - topic = aURL.getHost() + aURL.getPort(); - } catch (NumberFormatException e) { - LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e); - this.isRunning = false; - } catch (MalformedURLException e) { - LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e); - } - if(aURL != null){ - String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest"; - SendMessage(consumerTopic, "Starting-Topic"); - final LinkedList<String> urlList = new LinkedList<> (); - for ( String u : clusterList.split ( "," ) ){ - urlList.add ( u ); - } - - try { - CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 ); - } catch (MalformedURLException | GeneralSecurityException e1) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1); - } - - while (this.isRunning()) { - LOGGER.debug("While loop test _ take out "); - try { - for ( String msg : CConsumer.fetch () ){ - LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : "); - returnTopic = processMessage(msg); - if(returnTopic != null){ - SendMessage(returnTopic, update); - } - } - } catch (IOException e) { - LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message" + e); - } - } - LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster"); - } - } else if ("dmaap".equals(propNotificationType)) { - String dmaapServers = null; - try { - dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS); - topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC); - aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN"); - aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD"); - } catch (Exception e) { - LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get DMaaP servers list:", e); - this.isRunning = false; - } - - if(dmaapServers==null || topic==null){ - LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); - try { - throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); - } catch (Exception e) { - LOGGER.error(e); - } - } - - dmaapServers.trim(); - topic.trim(); - aafLogin.trim(); - aafPassword.trim(); - - String consumerTopic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC).trim(); - SendMessage(consumerTopic, "Starting-Topic"); - dmaapList = new ArrayList<>(); - for ( String u : dmaapServers.split ( "," ) ){ - dmaapList.add ( u ); - } - - try { - - dmaapConsumer = new BusConsumer.DmaapConsumerWrapper(dmaapList, consumerTopic, aafLogin, aafPassword, group, id, 20*1000, 1000); - } catch (Exception e1) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e1); - } - - while (this.isRunning()) { - LOGGER.debug("While loop test _ take out "); - try { - for ( String msg : dmaapConsumer.fetch () ){ - LOGGER.debug("Manual Notification Recieved Message " + msg + " from DMaaP server : "); - returnTopic = processMessage(msg); - if(returnTopic != null){ - SendMessage(returnTopic, update); - } - } - }catch (Exception e) { - LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing DMaaP message: ", e); } - } - LOGGER.debug("Stopping DMaaP Consumer loop will no longer fetch messages from the servers"); - } - } - - private void SendMessage( String topic, String message) { - CambriaPublisher pub = null; - BusPublisher publisher = null; - try { - if ("ueb".equals(propNotificationType)) { - pub = CambriaClientFactory.createSimplePublisher (null, clusterList, topic ); - pub.send( "pdpReturnMessage", message ); - LOGGER.debug("Sending Message to UEB topic: " + topic); - pub.close(); - - } else if ("dmaap".equals(propNotificationType)){ - publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList,topic,aafLogin,aafPassword); - publisher.send( "pdpReturnMessage", message ); - LOGGER.debug("Sending to Message to DMaaP topic: " + topic); - publisher.close(); - } - - } catch (Exception e) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update: ", e); - } - if(pub != null){ - try { - pub.send( "pdpReturnMessage", message ); - LOGGER.debug("Sending to Message to tpoic" + topic); - pub.close(); - } catch (IOException e) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update" +e); - } - } - } - - private String processMessage(String msg) { - LOGGER.debug("notification message: " + msg); - String[] UID = msg.split("=")[1].split("\""); - - String returnTopic = topic + UID[0]; - if(msg.contains("Starting-Topic")){ - return null; - } - return returnTopic; - } - public static void setUpdate(String update) { - ManualNotificationUpdateThread.update = update; - } - + private static final Logger LOGGER = FlexLogger.getLogger(ManualNotificationUpdateThread.class); + + private String topic = null; + private CambriaConsumer cConsumer = null; + private static String clusterList = null; + private static String update = null; + private BusConsumer dmaapConsumer = null; + private List<String> dmaapList = null; + private static String propNotificationType = null; + private static String aafLogin = null; + private static String aafPassword = null; + + public volatile boolean isRunning = false; + + public synchronized boolean isRunning() { + return this.isRunning; + } + + public synchronized void terminate() { + this.isRunning = false; + } + + /** + * + * This is our thread that runs on startup if the system is configured to UEB to accept manual update requests + * + */ + @Override + public void run() { + synchronized (this) { + this.isRunning = true; + } + + URL aURL = null; + String group = UUID.randomUUID().toString(); + String id = "0"; + String returnTopic = null; + setPropNotification(); + if ("ueb".equals(propNotificationType)) { + try { + setCluster(); + String url = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID); + aURL = new URL(url); + topic = aURL.getHost() + aURL.getPort(); + } catch (NumberFormatException e) { + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e); + this.isRunning = false; + } catch (MalformedURLException e) { + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + + "Error in processing URL to create topic for Notification ", e); + } + if (aURL != null) { + String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest"; + sendMessage(consumerTopic, "Starting-Topic"); + final LinkedList<String> urlList = new LinkedList<>(); + for (String u : clusterList.split(",")) { + urlList.add(u); + } + + try { + cConsumer = CambriaClientFactory.createConsumer(null, urlList, consumerTopic, group, id, 20 * 1000, + 1000); + } catch (MalformedURLException | GeneralSecurityException e1) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1); + } + + while (this.isRunning()) { + LOGGER.debug("While loop test _ take out "); + try { + for (String msg : cConsumer.fetch()) { + LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : "); + returnTopic = processMessage(msg); + if (returnTopic != null) { + sendMessage(returnTopic, update); + } + } + } catch (IOException e) { + LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message" + e); + } + } + LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster"); + } + } else if ("dmaap".equals(propNotificationType)) { + String dmaapServers = null; + try { + dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS); + topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC); + setAAFCreds(); + } catch (Exception e) { + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get DMaaP servers list:", e); + this.isRunning = false; + } + + if (dmaapServers == null || topic == null) { + LOGGER.error( + XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); + } + + dmaapServers = dmaapServers.trim(); + topic = topic.trim(); + + String consumerTopic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC).trim(); + sendMessage(consumerTopic, "Starting-Topic"); + dmaapList = new ArrayList<>(); + for (String u : dmaapServers.split(",")) { + dmaapList.add(u); + } + + try { + dmaapConsumer = new BusConsumer.DmaapConsumerWrapper(dmaapList, consumerTopic, aafLogin, aafPassword, + group, id, 20 * 1000, 1000); + } catch (Exception e1) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e1); + } + + while (this.isRunning()) { + LOGGER.debug("While loop test _ take out "); + try { + for (String msg : dmaapConsumer.fetch()) { + LOGGER.debug("Manual Notification Recieved Message " + msg + " from DMaaP server : "); + returnTopic = processMessage(msg); + if (returnTopic != null) { + sendMessage(returnTopic, update); + } + } + } catch (Exception e) { + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing DMaaP message: ", e); + } + } + LOGGER.debug("Stopping DMaaP Consumer loop will no longer fetch messages from the servers"); + } + } + + private static void setAAFCreds() { + aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN"); + aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD"); + if (aafLogin != null) { + aafLogin = aafLogin.trim(); + } + if (aafPassword != null) { + aafPassword = aafPassword.trim(); + } + } + + private static void setCluster() { + clusterList = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS); + if (clusterList != null) { + clusterList = clusterList.trim(); + } + } + + private static void setPropNotification() { + propNotificationType = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TYPE); + } + + private void sendMessage(String topic, String message) { + CambriaPublisher pub = null; + BusPublisher publisher = null; + try { + if ("ueb".equals(propNotificationType)) { + pub = CambriaClientFactory.createSimplePublisher(null, clusterList, topic); + pub.send("pdpReturnMessage", message); + LOGGER.debug("Sending Message to UEB topic: " + topic); + pub.close(); + + } else if ("dmaap".equals(propNotificationType)) { + publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList, topic, aafLogin, aafPassword); + publisher.send("pdpReturnMessage", message); + LOGGER.debug("Sending to Message to DMaaP topic: " + topic); + publisher.close(); + } + + } catch (Exception e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update: ", e); + } + if (pub != null) { + try { + pub.send("pdpReturnMessage", message); + LOGGER.debug("Sending to Message to tpoic" + topic); + pub.close(); + } catch (IOException e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e); + } + } + } + + private String processMessage(String msg) { + LOGGER.debug("notification message: " + msg); + String[] uID = msg.split("=")[1].split("\""); + + String returnTopic = topic + uID[0]; + if (msg.contains("Starting-Topic")) { + return null; + } + return returnTopic; + } + + public static void setUpdate(String update) { + ManualNotificationUpdateThread.update = update; + } + } diff --git a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationController.java b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationController.java index 109d421f8..577d5b347 100644 --- a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationController.java +++ b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationController.java @@ -59,284 +59,301 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; /** - * NotificationController Checks for the Updated and Removed policies. It - * notifies the Server to send Notifications to the Client. + * NotificationController Checks for the Updated and Removed policies. It notifies the Server to send Notifications to + * the Client. * * @version 0.2 * */ public class NotificationController { - private static final Logger LOGGER = FlexLogger.getLogger(NotificationController.class); - private static Notification record = new Notification(); - private PDPStatus oldStatus = null; - private Removed removed = null; - private Updated updated = null; - private ManualNotificationUpdateThread registerMaunualNotificationRunnable = null; - private Thread manualNotificationThread = null; - private boolean manualThreadStarted = false; - - private static String notificationJSON = null; - private static String propNotificationType = null; - private static String pdpURL = null; - private static Boolean notificationFlag = false; - - public void check(PDPStatus newStatus,Map<String, PolicyDef> policyContainer) { - boolean updated = false; - boolean removed = false; - Notification notification = new Notification(); - HashSet<Removed> removedPolicies = new HashSet<>(); - HashSet<Updated> updatedPolicies = new HashSet<>(); + private static final Logger LOGGER = FlexLogger.getLogger(NotificationController.class); + private static Notification record = new Notification(); + private PDPStatus oldStatus = null; + private Removed removed = null; + private Updated updated = null; + private ManualNotificationUpdateThread registerMaunualNotificationRunnable = null; + private Thread manualNotificationThread = null; + private boolean manualThreadStarted = false; - if (oldStatus == null) { - oldStatus = newStatus; - } - // Debugging purpose only. - LOGGER.debug("old config Status :" + oldStatus.getStatus()); - LOGGER.debug("new config Status :" + newStatus.getStatus()); + private static String notificationJSON = null; + private static String propNotificationType = null; + private static String pdpURL = null; + private static Boolean notificationFlag = false; - // Depending on the above condition taking the Change as an Update. - if (oldStatus.getStatus().toString() != newStatus.getStatus().toString()) { - LOGGER.info("There is an Update to the PDP"); - LOGGER.debug(oldStatus.getLoadedPolicies()); - LOGGER.debug(newStatus.getLoadedPolicies()); - // Check if there is an Update/additions in the policy. - for (PDPPolicy newPolicy : newStatus.getLoadedPolicies()) { - boolean change = true; - for (PDPPolicy oldPolicy : oldStatus.getLoadedPolicies()) { - // Check if there are same policies. - if (oldPolicy.getId().equals(newPolicy.getId())) { - // Check if they have same version. - if (oldPolicy.getVersion().equals(newPolicy.getVersion())) { - change = false; - } - } - } - // if there is a change Send the notifications to the Client. - if (change) { - sendUpdate(newPolicy, policyContainer); - updated = true; - updatedPolicies.add(this.updated); - } - } - // Check if there is any removal of policy. - for (PDPPolicy oldPolicy : oldStatus.getLoadedPolicies()) { - boolean change = true; - for (PDPPolicy newPolicy : newStatus.getLoadedPolicies()) { - // Check if there are same policies. - if (oldPolicy.getId().equals(newPolicy.getId())) { - // Check if they have same version. - if (oldPolicy.getVersion().equals(newPolicy.getVersion())) { - change = false; - } - } - } - // if there is a change Send the notifications to the Client. - if (change) { - sendremove(oldPolicy); - removed = true; - removedPolicies.add(this.removed); - } - } - } - // At the end the oldStatus must be updated with the newStatus. - oldStatus = newStatus; - // Sending Notification to the Server to pass over to the clients - if (updated || removed) { - // Call the Notification Server.. - notification.setRemovedPolicies(removedPolicies); - notification.setLoadedPolicies(updatedPolicies); - notification = setUpdateTypes(updated, removed, notification); - ObjectWriter om = new ObjectMapper().writer(); - try { - notificationJSON = om.writeValueAsString(notification); - LOGGER.info(notificationJSON); - // NotificationServer Method here. - propNotificationType = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TYPE); - pdpURL = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID); - if (("ueb".equals(propNotificationType)||"dmaap".equals(propNotificationType)) && !manualThreadStarted) { - LOGGER.debug("Starting Thread to accept UEB or DMAAP notfications."); - this.registerMaunualNotificationRunnable = new ManualNotificationUpdateThread(); - this.manualNotificationThread = new Thread(this.registerMaunualNotificationRunnable); - this.manualNotificationThread.start(); - manualThreadStarted = true; - } - String notificationJSON= null; - notificationFlag = true; - try{ - notificationJSON= record(notification); - }catch(Exception e){ - LOGGER.error(e); - } - NotificationServer.setUpdate(notificationJSON); - ManualNotificationUpdateThread.setUpdate(notificationJSON); - } catch (JsonProcessingException e) { - LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e.getMessage() +e); - } - } - } - - public static void sendNotification(){ - if(notificationFlag){ - try { - NotificationServer.sendNotification(notificationJSON, propNotificationType, pdpURL); - } catch (Exception e) { - LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e); - } - notificationFlag = false; - } - } - - private void sendremove(PDPPolicy oldPolicy) { - removed = new Removed(); - // Want to know what is removed ? - LOGGER.info("Policy removed: " + oldPolicy.getId()+ " with version number: " + oldPolicy.getVersion()); - removed.setPolicyName(oldPolicy.getId()); - removed.setVersionNo(oldPolicy.getVersion()); - removeFile(oldPolicy); - } + public void check(PDPStatus newStatus, Map<String, PolicyDef> policyContainer) { + boolean updated = false; + boolean removed = false; + Notification notification = new Notification(); + HashSet<Removed> removedPolicies = new HashSet<>(); + HashSet<Updated> updatedPolicies = new HashSet<>(); - private void sendUpdate(PDPPolicy newPolicy,Map<String, PolicyDef> policyContainer) { - updated = new Updated(); - // Want to know what is new ? - LOGGER.info("The new Policy is: " + newPolicy.getId()); - LOGGER.info("The version no. is: " + newPolicy.getVersion()); - updated.setPolicyName(newPolicy.getId()); - updated.setVersionNo(newPolicy.getVersion()); - updated.setUpdateType(UpdateType.NEW); - // If the policy is of Config type then retrieve its matches. - if (newPolicy.getName().contains(".Config_")) { - // Take a Configuration copy to PDP webapps. - final String urlStart = "attributeId=URLID,expression"; - final String urlEnd = "}}},{"; - String policy = policyContainer.get(newPolicy.getId()).toString(); - if(policy.contains(urlStart)){ - String urlFinePartOne = policy.substring(policy.indexOf(urlStart)+urlStart.length()); - String urlFinePart = urlFinePartOne.substring(0,urlFinePartOne.indexOf(urlEnd)); - String urlString = urlFinePart.substring(urlFinePart.indexOf("value=$URL")+6); - callPap(urlString, "Config"); - } - Iterator<AnyOf> anyOfs = policyContainer.get(newPolicy.getId()).getTarget().getAnyOfs(); - while (anyOfs.hasNext()) { - AnyOf anyOf = anyOfs.next(); - Iterator<AllOf> allOfs = anyOf.getAllOfs(); - while (allOfs.hasNext()) { - AllOf allOf = allOfs.next(); - Iterator<Match> matches = allOf.getMatches(); - HashMap<String, String> matchValues = new HashMap<>(); - while (matches.hasNext()) { - Match match = matches.next(); - LOGGER.info("Attribute Value is: "+ match.getAttributeValue().getValue().toString()); - String[] result = match.getAttributeRetrievalBase().toString().split("attributeId="); - result[1] = result[1].replaceAll("}", ""); - if (!result[1].equals("urn:oasis:names:tc:xacml:1.0:subject:subject-id")) { - LOGGER.info("Attribute id is: " + result[1]); - } - matchValues.put(result[1], match.getAttributeValue().getValue().toString()); - LOGGER.info("Match is : "+ result[1]+ " , " + match.getAttributeValue().getValue().toString()); - } - updated.setMatches(matchValues); - } - } - }else if(newPolicy.getName().contains(".Action_")){ - // Take Configuration copy to PDP Webapps. - // Action policies have .json as extension. - String urlString = "$URL/Action/" + newPolicy.getId().substring(0, newPolicy.getId().lastIndexOf(".")) + ".json"; - callPap(urlString, "Action"); - } - } + if (oldStatus == null) { + oldStatus = newStatus; + } + // Debugging purpose only. + LOGGER.debug("old config Status :" + oldStatus.getStatus()); + LOGGER.debug("new config Status :" + newStatus.getStatus()); + + // Depending on the above condition taking the Change as an Update. + if (oldStatus.getStatus().toString() != newStatus.getStatus().toString()) { + LOGGER.info("There is an Update to the PDP"); + LOGGER.debug(oldStatus.getLoadedPolicies()); + LOGGER.debug(newStatus.getLoadedPolicies()); + // Check if there is an Update/additions in the policy. + for (PDPPolicy newPolicy : newStatus.getLoadedPolicies()) { + boolean change = true; + for (PDPPolicy oldPolicy : oldStatus.getLoadedPolicies()) { + // Check if there are same policies. + if (oldPolicy.getId().equals(newPolicy.getId())) { + // Check if they have same version. + if (oldPolicy.getVersion().equals(newPolicy.getVersion())) { + change = false; + } + } + } + // if there is a change Send the notifications to the Client. + if (change) { + sendUpdate(newPolicy, policyContainer); + updated = true; + updatedPolicies.add(this.updated); + } + } + // Check if there is any removal of policy. + for (PDPPolicy oldPolicy : oldStatus.getLoadedPolicies()) { + boolean change = true; + for (PDPPolicy newPolicy : newStatus.getLoadedPolicies()) { + // Check if there are same policies. + if (oldPolicy.getId().equals(newPolicy.getId())) { + // Check if they have same version. + if (oldPolicy.getVersion().equals(newPolicy.getVersion())) { + change = false; + } + } + } + // if there is a change Send the notifications to the Client. + if (change) { + sendremove(oldPolicy); + removed = true; + removedPolicies.add(this.removed); + } + } + } + // At the end the oldStatus must be updated with the newStatus. + oldStatus = newStatus; + // Sending Notification to the Server to pass over to the clients + if (updated || removed) { + // Call the Notification Server.. + notification.setRemovedPolicies(removedPolicies); + notification.setLoadedPolicies(updatedPolicies); + notification = setUpdateTypes(updated, removed, notification); + ObjectWriter om = new ObjectMapper().writer(); + try { + setNotificationJSON(om.writeValueAsString(notification)); + LOGGER.info(notificationJSON); + // NotificationServer Method here. + setPropNotification(); + if (("ueb".equals(propNotificationType) || "dmaap".equals(propNotificationType)) + && !manualThreadStarted) { + LOGGER.debug("Starting Thread to accept UEB or DMAAP notfications."); + this.registerMaunualNotificationRunnable = new ManualNotificationUpdateThread(); + this.manualNotificationThread = new Thread(this.registerMaunualNotificationRunnable); + this.manualNotificationThread.start(); + manualThreadStarted = true; + } + String notificationJSONString = null; + setNotificationFlag(true); + try { + notificationJSONString = record(notification); + } catch (Exception e) { + LOGGER.error(e); + } + NotificationServer.setUpdate(notificationJSONString); + ManualNotificationUpdateThread.setUpdate(notificationJSONString); + } catch (JsonProcessingException e) { + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e.getMessage() + e); + } + } + } + + private void setNotificationFlag(boolean value) { + notificationFlag = value; + } + + private static void setNotificationJSON(String message) { + notificationJSON = message; + } + + private static void setPropNotification() { + propNotificationType = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TYPE); + pdpURL = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID); + } + + public static void sendNotification() { + if (notificationFlag) { + try { + NotificationServer.sendNotification(notificationJSON, propNotificationType, pdpURL); + } catch (Exception e) { + LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: " + + e.getMessage() + e); + } + notificationFlag = false; + } + } + + private void sendremove(PDPPolicy oldPolicy) { + removed = new Removed(); + // Want to know what is removed ? + LOGGER.info("Policy removed: " + oldPolicy.getId() + " with version number: " + oldPolicy.getVersion()); + removed.setPolicyName(oldPolicy.getId()); + removed.setVersionNo(oldPolicy.getVersion()); + removeFile(oldPolicy); + } + + private void sendUpdate(PDPPolicy newPolicy, Map<String, PolicyDef> policyContainer) { + updated = new Updated(); + // Want to know what is new ? + LOGGER.info("The new Policy is: " + newPolicy.getId()); + LOGGER.info("The version no. is: " + newPolicy.getVersion()); + updated.setPolicyName(newPolicy.getId()); + updated.setVersionNo(newPolicy.getVersion()); + updated.setUpdateType(UpdateType.NEW); + // If the policy is of Config type then retrieve its matches. + if (newPolicy.getName().contains(".Config_")) { + // Take a Configuration copy to PDP webapps. + final String urlStart = "attributeId=URLID,expression"; + final String urlEnd = "}}},{"; + String policy = policyContainer.get(newPolicy.getId()).toString(); + if (policy.contains(urlStart)) { + String urlFinePartOne = policy.substring(policy.indexOf(urlStart) + urlStart.length()); + String urlFinePart = urlFinePartOne.substring(0, urlFinePartOne.indexOf(urlEnd)); + String urlString = urlFinePart.substring(urlFinePart.indexOf("value=$URL") + 6); + callPap(urlString, "Config"); + } + Iterator<AnyOf> anyOfs = policyContainer.get(newPolicy.getId()).getTarget().getAnyOfs(); + while (anyOfs.hasNext()) { + AnyOf anyOf = anyOfs.next(); + Iterator<AllOf> allOfs = anyOf.getAllOfs(); + while (allOfs.hasNext()) { + AllOf allOf = allOfs.next(); + Iterator<Match> matches = allOf.getMatches(); + HashMap<String, String> matchValues = new HashMap<>(); + while (matches.hasNext()) { + Match match = matches.next(); + LOGGER.info("Attribute Value is: " + match.getAttributeValue().getValue().toString()); + String[] result = match.getAttributeRetrievalBase().toString().split("attributeId="); + result[1] = result[1].replaceAll("}", ""); + if (!result[1].equals("urn:oasis:names:tc:xacml:1.0:subject:subject-id")) { + LOGGER.info("Attribute id is: " + result[1]); + } + matchValues.put(result[1], match.getAttributeValue().getValue().toString()); + LOGGER.info( + "Match is : " + result[1] + " , " + match.getAttributeValue().getValue().toString()); + } + updated.setMatches(matchValues); + } + } + } else if (newPolicy.getName().contains(".Action_")) { + // Take Configuration copy to PDP Webapps. + // Action policies have .json as extension. + String urlString = "$URL/Action/" + newPolicy.getId().substring(0, newPolicy.getId().lastIndexOf(".")) + + ".json"; + callPap(urlString, "Action"); + } + } + + // Adding this for Recording the changes to serve Polling requests.. + private static String record(Notification notification) { + // Initialization with updates. + if (record.getRemovedPolicies() == null || record.getLoadedPolicies() == null) { + record.setRemovedPolicies(notification.getRemovedPolicies()); + record.setLoadedPolicies(notification.getLoadedPolicies()); + } else { + // Check if there is anything new and update the record.. + if (record.getLoadedPolicies() != null || record.getRemovedPolicies() != null) { + HashSet<Removed> removedPolicies = (HashSet<Removed>) record.getRemovedPolicies(); + HashSet<Updated> updatedPolicies = (HashSet<Updated>) record.getLoadedPolicies(); - // Adding this for Recording the changes to serve Polling requests.. - private static String record(Notification notification) throws Exception { - // Initialization with updates. - if (record.getRemovedPolicies() == null || record.getLoadedPolicies() == null) { - record.setRemovedPolicies(notification.getRemovedPolicies()); - record.setLoadedPolicies(notification.getLoadedPolicies()); - } else { - // Check if there is anything new and update the record.. - if (record.getLoadedPolicies() != null || record.getRemovedPolicies() != null) { - HashSet<Removed> removedPolicies = (HashSet<Removed>) record.getRemovedPolicies(); - HashSet<Updated> updatedPolicies = (HashSet<Updated>) record.getLoadedPolicies(); + // Checking with New updated policies. + if (notification.getLoadedPolicies() != null && !notification.getLoadedPolicies().isEmpty()) { + for (Updated newUpdatedPolicy : notification.getLoadedPolicies()) { + // If it was removed earlier then we need to remove from our record + Iterator<Removed> oldRemovedPolicy = removedPolicies.iterator(); + while (oldRemovedPolicy.hasNext()) { + Removed policy = oldRemovedPolicy.next(); + if (newUpdatedPolicy.getPolicyName().equals(policy.getPolicyName())) { + if (newUpdatedPolicy.getVersionNo().equals(policy.getVersionNo())) { + oldRemovedPolicy.remove(); + } + } + } + // If it was previously updated need to Overwrite it to the record. + Iterator<Updated> oldUpdatedPolicy = updatedPolicies.iterator(); + while (oldUpdatedPolicy.hasNext()) { + Updated policy = oldUpdatedPolicy.next(); + if (newUpdatedPolicy.getPolicyName().equals(policy.getPolicyName())) { + if (newUpdatedPolicy.getVersionNo().equals(policy.getVersionNo())) { + oldUpdatedPolicy.remove(); + } + } + } + updatedPolicies.add(newUpdatedPolicy); + } + } + // Checking with New Removed policies. + if (notification.getRemovedPolicies() != null && !notification.getRemovedPolicies().isEmpty()) { + for (Removed newRemovedPolicy : notification.getRemovedPolicies()) { + // If it was previously removed Overwrite it to the record. + Iterator<Removed> oldRemovedPolicy = removedPolicies.iterator(); + while (oldRemovedPolicy.hasNext()) { + Removed policy = oldRemovedPolicy.next(); + if (newRemovedPolicy.getPolicyName().equals(policy.getPolicyName())) { + if (newRemovedPolicy.getVersionNo().equals(policy.getVersionNo())) { + oldRemovedPolicy.remove(); + } + } + } + // If it was added earlier then we need to remove from our record. + Iterator<Updated> oldUpdatedPolicy = updatedPolicies.iterator(); + while (oldUpdatedPolicy.hasNext()) { + Updated policy = oldUpdatedPolicy.next(); + if (newRemovedPolicy.getPolicyName().equals(policy.getPolicyName())) { + if (newRemovedPolicy.getVersionNo().equals(policy.getVersionNo())) { + oldUpdatedPolicy.remove(); + } + } + } + removedPolicies.add(newRemovedPolicy); + } + } + record.setRemovedPolicies(removedPolicies); + record.setLoadedPolicies(updatedPolicies); + } + } + // Send the Result to the caller. + ObjectWriter om = new ObjectMapper().writer(); + String json = null; + try { + json = om.writeValueAsString(record); + } catch (JsonProcessingException e) { + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e.getMessage() + e); + } + LOGGER.info(json); + return json; + } - // Checking with New updated policies. - if (notification.getLoadedPolicies() != null && !notification.getLoadedPolicies().isEmpty()) { - for (Updated newUpdatedPolicy : notification.getLoadedPolicies()) { - // If it was removed earlier then we need to remove from our record - Iterator<Removed> oldRemovedPolicy = removedPolicies.iterator(); - while (oldRemovedPolicy.hasNext()) { - Removed policy = oldRemovedPolicy.next(); - if (newUpdatedPolicy.getPolicyName().equals(policy.getPolicyName())) { - if (newUpdatedPolicy.getVersionNo().equals(policy.getVersionNo())) { - oldRemovedPolicy.remove(); - } - } - } - // If it was previously updated need to Overwrite it to the record. - Iterator<Updated> oldUpdatedPolicy = updatedPolicies.iterator(); - while (oldUpdatedPolicy.hasNext()) { - Updated policy = oldUpdatedPolicy.next(); - if (newUpdatedPolicy.getPolicyName().equals(policy.getPolicyName())) { - if (newUpdatedPolicy.getVersionNo().equals(policy.getVersionNo())) { - oldUpdatedPolicy.remove(); - } - } - } - updatedPolicies.add(newUpdatedPolicy); - } - } - // Checking with New Removed policies. - if (notification.getRemovedPolicies() != null && !notification.getRemovedPolicies().isEmpty()) { - for (Removed newRemovedPolicy : notification.getRemovedPolicies()) { - // If it was previously removed Overwrite it to the record. - Iterator<Removed> oldRemovedPolicy = removedPolicies.iterator(); - while (oldRemovedPolicy.hasNext()) { - Removed policy = oldRemovedPolicy.next(); - if (newRemovedPolicy.getPolicyName().equals(policy.getPolicyName())) { - if (newRemovedPolicy.getVersionNo().equals(policy.getVersionNo())) { - oldRemovedPolicy.remove(); - } - } - } - // If it was added earlier then we need to remove from our record. - Iterator<Updated> oldUpdatedPolicy = updatedPolicies.iterator(); - while (oldUpdatedPolicy.hasNext()) { - Updated policy = oldUpdatedPolicy.next(); - if (newRemovedPolicy.getPolicyName().equals(policy.getPolicyName())) { - if (newRemovedPolicy.getVersionNo().equals(policy.getVersionNo())) { - oldUpdatedPolicy.remove(); - } - } - } - removedPolicies.add(newRemovedPolicy); - } - } - record.setRemovedPolicies(removedPolicies); - record.setLoadedPolicies(updatedPolicies); - } - } - // Send the Result to the caller. - ObjectWriter om = new ObjectMapper().writer(); - String json = null; - try { - json = om.writeValueAsString(record); - } catch (JsonProcessingException e) { - LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e.getMessage() + e); - } - LOGGER.info(json); - return json; - } - - private static Notification setUpdateTypes(boolean updated, boolean removed, Notification notification) { - if(notification!=null){ - if(updated && removed){ + private static Notification setUpdateTypes(boolean updated, boolean removed, Notification notification) { + if (notification != null) { + if (updated && removed) { notification.setNotificationType(NotificationType.BOTH); - if(notification.getLoadedPolicies()!=null){ - HashSet<Updated> updatedPolicies = new HashSet<>(); - for(Updated oldUpdatedPolicy: notification.getLoadedPolicies()){ + if (notification.getLoadedPolicies() != null) { + HashSet<Updated> updatedPolicies = new HashSet<>(); + for (Updated oldUpdatedPolicy : notification.getLoadedPolicies()) { Updated updatePolicy = oldUpdatedPolicy; - if(notification.getRemovedPolicies()!=null){ - for(RemovedPolicy removedPolicy: notification.getRemovedPolicies()){ + if (notification.getRemovedPolicies() != null) { + for (RemovedPolicy removedPolicy : notification.getRemovedPolicies()) { String regex = ".(\\d)*.xml"; - if(removedPolicy.getPolicyName().replaceAll(regex, "").equals(oldUpdatedPolicy.getPolicyName().replaceAll(regex, ""))){ + if (removedPolicy.getPolicyName().replaceAll(regex, "") + .equals(oldUpdatedPolicy.getPolicyName().replaceAll(regex, ""))) { updatePolicy.setUpdateType(UpdateType.UPDATE); break; } @@ -346,70 +363,76 @@ public class NotificationController { } notification.setLoadedPolicies(updatedPolicies); } - }else if(updated){ + } else if (updated) { notification.setNotificationType(NotificationType.UPDATE); - }else if(removed){ + } else if (removed) { notification.setNotificationType(NotificationType.REMOVE); } } return notification; } - - private void removeFile(PDPPolicy oldPolicy) { - try{ - Path removedPolicyFile = Paths.get(XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_CONFIG)+File.separator+oldPolicy.getId()); - Files.deleteIfExists(removedPolicyFile); - boolean delete=false; - File dir= null; - if(oldPolicy.getName().contains(".Config_")){ - delete = true; - dir = new File(XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_WEBAPPS)+File.separator+"Config"); - }else if(oldPolicy.getName().contains(".Action_")){ - delete = true; - dir = new File(XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_WEBAPPS)+File.separator+"Action"); - } - if(delete){ - FileFilter fileFilter = new WildcardFileFilter(oldPolicy.getId().substring(0, oldPolicy.getId().lastIndexOf("."))+".*"); - File[] configFile = dir.listFiles(fileFilter); - if(configFile.length==1){ - Files.deleteIfExists(configFile[0].toPath()); - } - } - }catch(Exception e){ - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Couldn't remove the policy/config file " + oldPolicy.getName() + e); - } - } - - private void callPap(String urlString, String type) { - Path configLocation = Paths.get(XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_WEBAPPS)+File.separator+type); - if(Files.notExists(configLocation)){ - try { - Files.createDirectories(configLocation); - } catch (IOException e) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW +"Failed to create config directory: " + configLocation.toAbsolutePath().toString(), e); - } - } - PapUrlResolver papUrls = PapUrlResolver.getInstance(); - while(papUrls.hasMoreUrls()){ - String papPath = papUrls.getUrl(); - papPath = papPath.substring(0, papPath.lastIndexOf("/pap")); - String papAddress= urlString.replace("$URL", papPath); - String fileName = papAddress.substring(papAddress.lastIndexOf("/")+1); - String fileLocation = configLocation.toString() + File.separator + fileName; - try { - URL papURL = new URL(papAddress); - LOGGER.info("Calling " +papAddress + " for Configuration Copy."); - URLConnection urlConnection = papURL.openConnection(); - File file= new File(fileLocation); - try (InputStream is = urlConnection.getInputStream(); - OutputStream os = new FileOutputStream(file)) { - IOUtils.copy(is, os); - break; - } - } catch (Exception e) { - LOGGER.error(e + e.getMessage()); - } - papUrls.getNext(); - } - } + + private void removeFile(PDPPolicy oldPolicy) { + try { + Path removedPolicyFile = Paths.get(XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_CONFIG) + + File.separator + oldPolicy.getId()); + Files.deleteIfExists(removedPolicyFile); + boolean delete = false; + File dir = null; + if (oldPolicy.getName().contains(".Config_")) { + delete = true; + dir = new File( + XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_WEBAPPS) + File.separator + "Config"); + } else if (oldPolicy.getName().contains(".Action_")) { + delete = true; + dir = new File( + XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_WEBAPPS) + File.separator + "Action"); + } + if (delete) { + FileFilter fileFilter = new WildcardFileFilter( + oldPolicy.getId().substring(0, oldPolicy.getId().lastIndexOf(".")) + ".*"); + File[] configFile = dir.listFiles(fileFilter); + if (configFile.length == 1) { + Files.deleteIfExists(configFile[0].toPath()); + } + } + } catch (Exception e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Couldn't remove the policy/config file " + + oldPolicy.getName() + e); + } + } + + private void callPap(String urlString, String type) { + Path configLocation = Paths + .get(XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_WEBAPPS) + File.separator + type); + if (Files.notExists(configLocation)) { + try { + Files.createDirectories(configLocation); + } catch (IOException e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Failed to create config directory: " + + configLocation.toAbsolutePath().toString(), e); + } + } + PapUrlResolver papUrls = PapUrlResolver.getInstance(); + while (papUrls.hasMoreUrls()) { + String papPath = papUrls.getUrl(); + papPath = papPath.substring(0, papPath.lastIndexOf("/pap")); + String papAddress = urlString.replace("$URL", papPath); + String fileName = papAddress.substring(papAddress.lastIndexOf("/") + 1); + String fileLocation = configLocation.toString() + File.separator + fileName; + try { + URL papURL = new URL(papAddress); + LOGGER.info("Calling " + papAddress + " for Configuration Copy."); + URLConnection urlConnection = papURL.openConnection(); + File file = new File(fileLocation); + try (InputStream is = urlConnection.getInputStream(); OutputStream os = new FileOutputStream(file)) { + IOUtils.copy(is, os); + break; + } + } catch (Exception e) { + LOGGER.error(e + e.getMessage()); + } + papUrls.getNext(); + } + } } diff --git a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java index 690d8c517..2f3d58203 100644 --- a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java +++ b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java @@ -38,6 +38,7 @@ import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; +import org.onap.policy.api.PolicyEngineException; import org.onap.policy.common.logging.eelf.MessageCodes; import org.onap.policy.common.logging.eelf.PolicyLogger; import org.onap.policy.common.logging.flexlogger.FlexLogger; @@ -66,8 +67,6 @@ public class NotificationServer { private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class); private static Queue<Session> queue = new ConcurrentLinkedQueue<>(); private static String update = null; - private static String hosts = null; - private static URL aURL = null; @OnOpen public void openConnection(Session session) { @@ -88,7 +87,7 @@ public class NotificationServer { } @OnMessage - public void Message(String message, Session session) { + public void message(String message, Session session) { if(message.equalsIgnoreCase("Manual")) { try { @@ -101,14 +100,14 @@ public class NotificationServer { } } - public static void sendNotification(String notification, String propNotificationType, String pdpURL) throws Exception { + public static void sendNotification(String notification, String propNotificationType, String pdpURL) throws PolicyEngineException, IOException, InterruptedException { LOGGER.debug("Notification set to " + propNotificationType); if (propNotificationType.equals("ueb")){ String topic = null; try { - aURL = new URL(pdpURL); + URL aURL = new URL(pdpURL); topic = aURL.getHost() + aURL.getPort(); } catch (MalformedURLException e1) { pdpURL = pdpURL.replace("/", ""); @@ -116,7 +115,7 @@ public class NotificationServer { LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication "); PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1, "Error in parsing out pdpURL for UEB notfication "); } - hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS); + String hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS); String apiKey = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_KEY); String apiSecret = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_SECRET); @@ -125,13 +124,13 @@ public class NotificationServer { try { if(hosts==null || topic==null || apiKey==null || apiSecret==null){ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file "); - throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file "); + throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file "); } - hosts.trim(); - topic.trim(); - apiKey.trim(); - apiSecret.trim(); + hosts = hosts.trim(); + topic = topic.trim(); + apiKey = apiKey.trim(); + apiSecret = apiSecret.trim(); pub = new CambriaClientBuilders.PublisherBuilder () .usingHosts ( hosts ) .onTopic ( topic ) @@ -175,13 +174,13 @@ public class NotificationServer { try { if(dmaapServers==null || topic==null){ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); - throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); + throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); } - dmaapServers.trim(); - topic.trim(); - aafLogin.trim(); - aafPassword.trim(); + dmaapServers= dmaapServers.trim(); + topic= topic.trim(); + aafLogin= aafLogin.trim(); + aafPassword= aafPassword.trim(); List<String> dmaapList = null; if(dmaapServers.contains(",")) { |