diff options
author | ITSERVICES\rb7147 <rb7147@att.com> | 2017-04-25 11:46:00 -0400 |
---|---|---|
committer | ITSERVICES\rb7147 <rb7147@att.com> | 2017-05-03 09:58:17 -0400 |
commit | e0addf5b588a1244f9679becd90999dfcb4c3a94 (patch) | |
tree | 1212772d6366730266ff0e093c874b07aa716c29 /ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications | |
parent | 39fb0f30472777e4b60d6a7ac8aa4eb9773961ff (diff) |
Policy 1707 commit to LF
Change-Id: Ibe6f01d92f9a434c040abb05d5386e89d675ae65
Signed-off-by: ITSERVICES\rb7147 <rb7147@att.com>
Diffstat (limited to 'ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications')
6 files changed, 349 insertions, 122 deletions
diff --git a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java index 6b8857273..5f62be3ac 100644 --- a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java +++ b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java @@ -24,28 +24,36 @@ import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.security.GeneralSecurityException; +import java.util.ArrayList; import java.util.LinkedList; +import java.util.List; import java.util.UUID; +import org.openecomp.policy.common.logging.flexlogger.FlexLogger; +import org.openecomp.policy.common.logging.flexlogger.Logger; import org.openecomp.policy.rest.XACMLRestProperties; +import org.openecomp.policy.utils.BusConsumer; +import org.openecomp.policy.utils.BusPublisher; +import org.openecomp.policy.xacml.api.XACMLErrorConstants; import com.att.nsa.cambria.client.CambriaClientFactory; import com.att.nsa.cambria.client.CambriaConsumer; import com.att.nsa.cambria.client.CambriaPublisher; -import org.openecomp.policy.xacml.api.XACMLErrorConstants; import com.att.research.xacml.util.XACMLProperties; -import org.openecomp.policy.common.logging.flexlogger.*; - public class ManualNotificationUpdateThread implements Runnable { - private static final Logger logger = FlexLogger.getLogger(ManualNotificationUpdateThread.class); -// private static List<String> uebURLList = null; + + private static final Logger LOGGER = FlexLogger.getLogger(ManualNotificationUpdateThread.class); + private static String topic = null; private static CambriaConsumer CConsumer = null; -// private static Collection<String> clusterList = null; private static String clusterList = null; -// private Collection<String> urlList = null; private static String update = null; + private static BusConsumer dmaapConsumer = null; + private static List<String> dmaapList = null; + private static String propNotificationType = null; + private static String aafLogin = null; + private static String aafPassword = null; public volatile boolean isRunning = false; @@ -67,77 +75,145 @@ public class ManualNotificationUpdateThread implements Runnable { synchronized(this) { this.isRunning = true; } + URL aURL = null; String group = UUID.randomUUID ().toString (); String id = "0"; String returnTopic = null; - try { - ManualNotificationUpdateThread.clusterList = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_UEB_CLUSTER); - String url = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID); - aURL = new URL(url); - topic = aURL.getHost() + aURL.getPort(); - } catch (NumberFormatException e) { - logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e); - this.isRunning = false; - } catch (MalformedURLException e) { - logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e); - } - String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest"; - SendMessage(consumerTopic, "Starting-Topic"); - final LinkedList<String> urlList = new LinkedList<String> (); - for ( String u : clusterList.split ( "," ) ){ - urlList.add ( u ); - } - - try { - CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 ); - } catch (MalformedURLException | GeneralSecurityException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - } - - - while (this.isRunning()) { - logger.debug("While loop test _ take out "); + propNotificationType = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TYPE); + if ("ueb".equals(propNotificationType)){ + try { + clusterList = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS).trim(); + String url = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID); + aURL = new URL(url); + topic = aURL.getHost() + aURL.getPort(); + } catch (NumberFormatException e) { + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e); + this.isRunning = false; + } catch (MalformedURLException e) { + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e); + } + + String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest"; + SendMessage(consumerTopic, "Starting-Topic"); + final LinkedList<String> urlList = new LinkedList<String> (); + for ( String u : clusterList.split ( "," ) ){ + urlList.add ( u ); + } + try { - for ( String msg : CConsumer.fetch () ){ - logger.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : "); - returnTopic = processMessage(msg); - if(returnTopic != null){ - SendMessage(returnTopic, update); + CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 ); + } catch (MalformedURLException | GeneralSecurityException e1) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1); + } + + while (this.isRunning()) { + LOGGER.debug("While loop test _ take out "); + try { + for ( String msg : CConsumer.fetch () ){ + LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : "); + returnTopic = processMessage(msg); + if(returnTopic != null){ + SendMessage(returnTopic, update); + } } + } catch (IOException e) { + LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message"); } - } catch (IOException e) { - logger.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message"); } - } - logger.debug("Stopping UEB Consuer loop will not logger fetch messages from the cluser"); + LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster"); + } else if ("dmaap".equals(propNotificationType)) { + String dmaapServers = null; + try { + dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS); + topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC); + aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN"); + aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD"); + } catch (Exception e) { + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get DMaaP servers list:", e); + this.isRunning = false; + } + + if(dmaapServers==null || topic==null){ + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); + try { + throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + dmaapServers.trim(); + topic.trim(); + aafLogin.trim(); + aafPassword.trim(); + + String consumerTopic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC).trim(); + SendMessage(consumerTopic, "Starting-Topic"); + dmaapList = new ArrayList<String>(); + for ( String u : dmaapServers.split ( "," ) ){ + dmaapList.add ( u ); + } + + try { + + dmaapConsumer = new BusConsumer.DmaapConsumerWrapper(dmaapList, consumerTopic, aafLogin, aafPassword, group, id, 20*1000, 1000); + } catch (Exception e1) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e1); + } + while (this.isRunning()) { + LOGGER.debug("While loop test _ take out "); + try { + for ( String msg : dmaapConsumer.fetch () ){ + LOGGER.debug("Manual Notification Recieved Message " + msg + " from DMaaP server : "); + returnTopic = processMessage(msg); + if(returnTopic != null){ + SendMessage(returnTopic, update); + } + } + }catch (Exception e) { + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing DMaaP message: ", e); } + } + LOGGER.debug("Stopping DMaaP Consumer loop will no longer fetch messages from the servers"); } + } private void SendMessage( String topic, String message) { CambriaPublisher pub = null; + BusPublisher publisher = null; try { - pub = CambriaClientFactory.createSimplePublisher (null, clusterList, topic ); - } catch (MalformedURLException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - } catch (GeneralSecurityException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); + if ("ueb".equals(propNotificationType)) { + pub = CambriaClientFactory.createSimplePublisher (null, clusterList, topic ); + pub.send( "pdpReturnMessage", message ); + LOGGER.debug("Sending Message to UEB topic: " + topic); + pub.close(); + + } else if ("dmaap".equals(propNotificationType)){ + publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList,topic,aafLogin,aafPassword); + publisher.send( "pdpReturnMessage", message ); + LOGGER.debug("Sending to Message to DMaaP topic: " + topic); + publisher.close(); + } + + } catch (Exception e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update: ", e); } + try { pub.send( "pdpReturnMessage", message ); - logger.debug("Sending to Message to tpoic" + topic); + LOGGER.debug("Sending to Message to tpoic" + topic); } catch (IOException e) { - logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update"); + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update"); } pub.close(); } private String processMessage(String msg) { - logger.debug("notification message: " + msg); + LOGGER.debug("notification message: " + msg); String[] UID = msg.split("=")[1].split("\""); + String returnTopic = topic + UID[0]; if(msg.contains("Starting-Topic")){ return null; diff --git a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Notification.java b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Notification.java index 5ab165b0a..c9b510ec3 100644 --- a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Notification.java +++ b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Notification.java @@ -22,16 +22,19 @@ package org.openecomp.policy.pdp.rest.notifications; import java.util.Collection; +import org.openecomp.policy.api.NotificationType; + /** * Notification is the POJO which will be used to send the Notifications to the Server. * Notification must contain the Removal and Updated policies. * - * @version 0.1 + * @version 0.2 * */ public class Notification { private Collection<Removed> removedPolicies = null; private Collection<Updated> loadedPolicies = null; + private NotificationType notificationType= null; public Collection<Removed> getRemovedPolicies() { return removedPolicies; @@ -49,4 +52,11 @@ public class Notification { this.loadedPolicies = loadedPolicies; } + public NotificationType getNotificationType() { + return notificationType; + } + + public void setNotificationType(NotificationType notificationType){ + this.notificationType= notificationType; + } } diff --git a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationController.java b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationController.java index 1dfd07422..14d7aa165 100644 --- a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationController.java +++ b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationController.java @@ -39,12 +39,15 @@ import java.util.Iterator; import org.apache.commons.io.IOUtils; import org.apache.commons.io.filefilter.WildcardFileFilter; -import org.openecomp.policy.pdp.rest.PapUrlResolver; -import org.openecomp.policy.rest.XACMLRestProperties; +import org.openecomp.policy.api.NotificationType; +import org.openecomp.policy.api.RemovedPolicy; +import org.openecomp.policy.api.UpdateType; import org.openecomp.policy.common.logging.flexlogger.FlexLogger; import org.openecomp.policy.common.logging.flexlogger.Logger; - +import org.openecomp.policy.pdp.rest.PapUrlResolver; +import org.openecomp.policy.rest.XACMLRestProperties; import org.openecomp.policy.xacml.api.XACMLErrorConstants; + import com.att.research.xacml.api.pap.PDPPolicy; import com.att.research.xacml.api.pap.PDPStatus; import com.att.research.xacml.util.XACMLProperties; @@ -64,7 +67,7 @@ import com.fasterxml.jackson.databind.ObjectWriter; * */ public class NotificationController { - private static final Logger logger = FlexLogger.getLogger(NotificationController.class); + private static final Logger LOGGER = FlexLogger.getLogger(NotificationController.class); private static Notification record = new Notification(); private PDPStatus oldStatus = null; private Removed removed = null; @@ -89,14 +92,14 @@ public class NotificationController { oldStatus = newStatus; } // Debugging purpose only. - logger.debug("old config Status :" + oldStatus.getStatus()); - logger.debug("new config Status :" + newStatus.getStatus()); + LOGGER.debug("old config Status :" + oldStatus.getStatus()); + LOGGER.debug("new config Status :" + newStatus.getStatus()); // Depending on the above condition taking the Change as an Update. if (oldStatus.getStatus().toString() != newStatus.getStatus().toString()) { - logger.info("There is an Update to the PDP"); - logger.debug(oldStatus.getLoadedPolicies()); - logger.debug(newStatus.getLoadedPolicies()); + LOGGER.info("There is an Update to the PDP"); + LOGGER.debug(oldStatus.getLoadedPolicies()); + LOGGER.debug(newStatus.getLoadedPolicies()); // Check if there is an Update/additions in the policy. for (PDPPolicy newPolicy : newStatus.getLoadedPolicies()) { boolean change = true; @@ -143,15 +146,16 @@ public class NotificationController { // Call the Notification Server.. notification.setRemovedPolicies(removedPolicies); notification.setLoadedPolicies(updatedPolicies); + notification = setUpdateTypes(updated, removed, notification); ObjectWriter om = new ObjectMapper().writer(); try { notificationJSON = om.writeValueAsString(notification); - logger.info(notificationJSON); + LOGGER.info(notificationJSON); // NotificationServer Method here. propNotificationType = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TYPE); pdpURL = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID); - if (propNotificationType!=null && propNotificationType.equals("ueb") && !manualThreadStarted) { - logger.debug("Starting Thread to accept UEB notfications."); + if (("ueb".equals(propNotificationType)||"dmaap".equals(propNotificationType)) && !manualThreadStarted) { + LOGGER.debug("Starting Thread to accept UEB or DMAAP notfications."); this.registerMaunualNotificationRunnable = new ManualNotificationUpdateThread(); this.manualNotificationThread = new Thread(this.registerMaunualNotificationRunnable); this.manualNotificationThread.start(); @@ -162,15 +166,15 @@ public class NotificationController { try{ notificationJSON= record(notification); }catch(Exception e){ - logger.error(e); - // TODO:EELF Cleanup - Remove logger + LOGGER.error(e); + // TODO:EELF Cleanup - Remove LOGGER //PolicyLogger.error(MessageCodes.ERROR_DATA_ISSUE, e, ""); } NotificationServer.setUpdate(notificationJSON); ManualNotificationUpdateThread.setUpdate(notificationJSON); } catch (JsonProcessingException e) { - logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e.getMessage()); - // TODO:EELF Cleanup - Remove logger + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e.getMessage()); + // TODO:EELF Cleanup - Remove LOGGER //PolicyLogger.error(MessageCodes.ERROR_DATA_ISSUE, e, ""); } } @@ -178,7 +182,12 @@ public class NotificationController { public static void sendNotification(){ if(notificationFlag){ - NotificationServer.sendNotification(notificationJSON, propNotificationType, pdpURL); + try { + NotificationServer.sendNotification(notificationJSON, propNotificationType, pdpURL); + } catch (Exception e) { + LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage()); + e.printStackTrace(); + } notificationFlag = false; } } @@ -186,9 +195,9 @@ public class NotificationController { private void sendremove(PDPPolicy oldPolicy) { removed = new Removed(); // Want to know what is removed ? - // logger.info("The Policy removed is: " + oldPolicy.getId()); - // logger.info("The version no. is: " + oldPolicy.getVersion()); - logger.info("Policy removed: " + oldPolicy.getId()+ " with version number: " + oldPolicy.getVersion()); + // LOGGER.info("The Policy removed is: " + oldPolicy.getId()); + // LOGGER.info("The version no. is: " + oldPolicy.getVersion()); + LOGGER.info("Policy removed: " + oldPolicy.getId()+ " with version number: " + oldPolicy.getVersion()); removed.setPolicyName(oldPolicy.getId()); removed.setVersionNo(oldPolicy.getVersion()); removeFile(oldPolicy); @@ -197,12 +206,13 @@ public class NotificationController { private void sendUpdate(PDPPolicy newPolicy,HashMap<String, PolicyDef> policyContainer) { updated = new Updated(); // Want to know what is new ? - logger.info("The new Policy is: " + newPolicy.getId()); - logger.info("The version no. is: " + newPolicy.getVersion()); + LOGGER.info("The new Policy is: " + newPolicy.getId()); + LOGGER.info("The version no. is: " + newPolicy.getVersion()); updated.setPolicyName(newPolicy.getId()); updated.setVersionNo(newPolicy.getVersion()); + updated.setUpdateType(UpdateType.NEW); // If the policy is of Config type then retrieve its matches. - if (newPolicy.getName().startsWith("Config")) { + if (newPolicy.getName().contains(".Config_")) { // Take a Configuration copy to PDP webapps. final String urlStart = "attributeId=URLID,expression"; final String urlEnd = "}}},{"; @@ -223,19 +233,19 @@ public class NotificationController { HashMap<String, String> matchValues = new HashMap<String, String>(); while (matches.hasNext()) { Match match = matches.next(); - logger.info("Attribute Value is: "+ match.getAttributeValue().getValue().toString()); + LOGGER.info("Attribute Value is: "+ match.getAttributeValue().getValue().toString()); String[] result = match.getAttributeRetrievalBase().toString().split("attributeId="); result[1] = result[1].replaceAll("}", ""); if (!result[1].equals("urn:oasis:names:tc:xacml:1.0:subject:subject-id")) { - logger.info("Attribute id is: " + result[1]); + LOGGER.info("Attribute id is: " + result[1]); } matchValues.put(result[1], match.getAttributeValue().getValue().toString()); - logger.info("Match is : "+ result[1]+ " , " + match.getAttributeValue().getValue().toString()); + LOGGER.info("Match is : "+ result[1]+ " , " + match.getAttributeValue().getValue().toString()); } updated.setMatches(matchValues); } } - }else if(newPolicy.getName().startsWith("Action")){ + }else if(newPolicy.getName().contains(".Action_")){ // Take Configuration copy to PDP Webapps. // Action policies have .json as extension. String urlString = "$URL/Action/" + newPolicy.getId().substring(0, newPolicy.getId().lastIndexOf(".")) + ".json"; @@ -244,7 +254,7 @@ public class NotificationController { } // Adding this for Recording the changes to serve Polling requests.. - public static String record(Notification notification) throws Exception { + private static String record(Notification notification) throws Exception { // Initialization with updates. if (record.getRemovedPolicies() == null || record.getLoadedPolicies() == null) { record.setRemovedPolicies(notification.getRemovedPolicies()); @@ -317,14 +327,44 @@ public class NotificationController { try { json = om.writeValueAsString(record); } catch (JsonProcessingException e) { - logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e.getMessage()); - // TODO:EELF Cleanup - Remove logger + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e.getMessage()); + // TODO:EELF Cleanup - Remove LOGGER //PolicyLogger.error(MessageCodes.ERROR_DATA_ISSUE, e, ""); } - logger.info(json); + LOGGER.info(json); return json; } + private static Notification setUpdateTypes(boolean updated, boolean removed, Notification notification) { + if(notification!=null){ + if(updated && removed){ + notification.setNotificationType(NotificationType.BOTH); + if(notification.getLoadedPolicies()!=null){ + HashSet<Updated> updatedPolicies = new HashSet<Updated>(); + for(Updated oldUpdatedPolicy: notification.getLoadedPolicies()){ + Updated updatePolicy = oldUpdatedPolicy; + if(notification.getRemovedPolicies()!=null){ + for(RemovedPolicy removedPolicy: notification.getRemovedPolicies()){ + String regex = ".(\\d)*.xml"; + if(removedPolicy.getPolicyName().replaceAll(regex, "").equals(oldUpdatedPolicy.getPolicyName().replaceAll(regex, ""))){ + updatePolicy.setUpdateType(UpdateType.UPDATE); + break; + } + } + } + updatedPolicies.add(updatePolicy); + } + notification.setLoadedPolicies(updatedPolicies); + } + }else if(updated){ + notification.setNotificationType(NotificationType.UPDATE); + }else if(removed){ + notification.setNotificationType(NotificationType.REMOVE); + } + } + return notification; + } + private void removeFile(PDPPolicy oldPolicy) { try{ Path removedPolicyFile = Paths.get(XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_CONFIG)+File.separator+oldPolicy.getId()); @@ -346,8 +386,8 @@ public class NotificationController { } } }catch(Exception e){ - logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Couldn't remove the policy/config file " + oldPolicy.getName()); - // TODO:EELF Cleanup - Remove logger + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Couldn't remove the policy/config file " + oldPolicy.getName()); + // TODO:EELF Cleanup - Remove LOGGER //PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e, "Couldn't remove the policy file " + oldPolicy.getName()); } } @@ -358,7 +398,7 @@ public class NotificationController { try { Files.createDirectories(configLocation); } catch (IOException e) { - logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW +"Failed to create config directory: " + configLocation.toAbsolutePath().toString(), e); + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW +"Failed to create config directory: " + configLocation.toAbsolutePath().toString(), e); } } PapUrlResolver papUrls = PapUrlResolver.getInstance(); @@ -370,7 +410,7 @@ public class NotificationController { String fileLocation = configLocation.toString() + File.separator + fileName; try { URL papURL = new URL(papAddress); - logger.info("Calling " +papAddress + " for Configuration Copy."); + LOGGER.info("Calling " +papAddress + " for Configuration Copy."); URLConnection urlConnection = papURL.openConnection(); File file= new File(fileLocation); try (InputStream is = urlConnection.getInputStream(); @@ -379,11 +419,11 @@ public class NotificationController { break; } } catch (MalformedURLException e) { - logger.error(e + e.getMessage()); + LOGGER.error(e + e.getMessage()); } catch(FileNotFoundException e){ - logger.error(e + e.getMessage()); + LOGGER.error(e + e.getMessage()); } catch (IOException e) { - logger.error(e + e.getMessage()); + LOGGER.error(e + e.getMessage()); } papUrls.getNext(); } diff --git a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java index d6cda7491..fe295ebb8 100644 --- a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java +++ b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java @@ -24,8 +24,12 @@ import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; import javax.websocket.OnClose; import javax.websocket.OnError; @@ -34,28 +38,31 @@ import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; -import org.openecomp.policy.rest.XACMLRestProperties; import org.openecomp.policy.common.logging.eelf.MessageCodes; import org.openecomp.policy.common.logging.eelf.PolicyLogger; -import com.att.nsa.cambria.client.CambriaClientFactory; -import com.att.nsa.cambria.client.CambriaPublisher; +import org.openecomp.policy.common.logging.flexlogger.FlexLogger; +import org.openecomp.policy.common.logging.flexlogger.Logger; +import org.openecomp.policy.rest.XACMLRestProperties; +import org.openecomp.policy.utils.BusPublisher; import org.openecomp.policy.xacml.api.XACMLErrorConstants; -import com.att.research.xacml.util.XACMLProperties; -import org.openecomp.policy.common.logging.flexlogger.*; +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.research.xacml.util.XACMLProperties; /** * The NotificationServer sends the Server Notifications to the Clients once there is any Event. * WebSockets is being used as a medium for sending Notifications. - * UEB is being used as a medium for sending Notifications. + * UEB is being used as a medium for sending Notifications. + * DMAAP is being used as a medium for sending Notifications. * - * @version 0.1 + * @version 0.2 * **/ @ServerEndpoint(value = "/notifications") public class NotificationServer { - private static final Logger logger = FlexLogger.getLogger(NotificationServer.class); + private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class); private static Queue<Session> queue = new ConcurrentLinkedQueue<Session>(); private static String update = null; private static String hosts = null; @@ -63,7 +70,7 @@ public class NotificationServer { @OnOpen public void openConnection(Session session) { - logger.info("Session Connected: " + session.getId()); + LOGGER.info("Session Connected: " + session.getId()); queue.add(session); } @@ -75,7 +82,7 @@ public class NotificationServer { @OnError public void error(Session session, Throwable t) { queue.remove(session); - logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage()); + LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage()); } @@ -87,16 +94,17 @@ public class NotificationServer { session.getBasicRemote().sendText(update); session.close(); } catch (IOException e) { - logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage()); - logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update"); + LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage()); + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update"); } } } - public static void sendNotification(String notification, String propNotificationType, String pdpURL){ + public static void sendNotification(String notification, String propNotificationType, String pdpURL) throws Exception { - logger.debug("Notification set to " + propNotificationType); + LOGGER.debug("Notification set to " + propNotificationType); if (propNotificationType.equals("ueb")){ + String topic = null; try { aURL = new URL(pdpURL); @@ -104,33 +112,111 @@ public class NotificationServer { } catch (MalformedURLException e1) { pdpURL = pdpURL.replace("/", ""); topic = pdpURL.replace(":", ""); - logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication "); + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication "); PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1, "Error in parsing out pdpURL for UEB notfication "); } - hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_UEB_CLUSTER); - logger.debug("Creating Publisher for host: " + hosts + " with topic: " + topic); - CambriaPublisher pub = null; + hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS); + String apiKey = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_KEY); + String apiSecret = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_SECRET); + + LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic); + CambriaBatchingPublisher pub = null; try { - pub = CambriaClientFactory.createSimplePublisher (null, hosts, topic ); + if(hosts==null || topic==null || apiKey==null || apiSecret==null){ + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file "); + throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file "); + } + + hosts.trim(); + topic.trim(); + apiKey.trim(); + apiSecret.trim(); + pub = new CambriaClientBuilders.PublisherBuilder () + .usingHosts ( hosts ) + .onTopic ( topic ) + .authenticatedBy ( apiKey, apiSecret ) + .build () + ; + } catch (MalformedURLException e1) { - // TODO Auto-generated catch block + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage()); e1.printStackTrace(); } catch (GeneralSecurityException e1) { - // TODO Auto-generated catch block + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage()); e1.printStackTrace(); } + try { pub.send( "MyPartitionKey", notification ); } catch (IOException e) { - logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update"); - } - pub.close(); + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage()); + } + + // close the publisher. The batching publisher does not send events + // immediately, so you MUST use close to send any remaining messages. + // You provide the amount of time you're willing to wait for the sends + // to succeed before giving up. If any messages are unsent after that time, + // they're returned to your app. You could, for example, persist to disk + // and try again later. + final List stuck = pub.close ( 20, TimeUnit.SECONDS ); + + if ( stuck.size () > 0 ) + { + System.err.println ( stuck.size() + " messages unsent" ); + } + else + { + System.out.println ( "Clean exit; all messages sent: " + notification ); + } + + } else if (propNotificationType.equals("dmaap")) { + + // Setting up the Publisher for DMaaP MR + String dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS); + String topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC); + String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN"); + String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD"); + + try { + if(dmaapServers==null || topic==null){ + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); + throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); + } + + dmaapServers.trim(); + topic.trim(); + aafLogin.trim(); + aafPassword.trim(); + + List<String> dmaapList = null; + if(dmaapServers.contains(",")) { + dmaapList = new ArrayList<String>(Arrays.asList(dmaapServers.split("\\s*,\\s*"))); + } else { + dmaapList = new ArrayList<String>(); + dmaapList.add(dmaapServers); + } + + BusPublisher publisher = + new BusPublisher.DmaapPublisherWrapper(dmaapList, + topic, + aafLogin, + aafPassword); + + // Sending notification through DMaaP Message Router + publisher.send( "MyPartitionKey", notification); + LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic); + publisher.close(); + + } catch (Exception e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage()); + } } + for(Session session: queue) { try { session.getBasicRemote().sendText(notification); } catch (IOException e) { - logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage()); + LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage()); } } } diff --git a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Removed.java b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Removed.java index 6ba073815..151b40125 100644 --- a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Removed.java +++ b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Removed.java @@ -20,15 +20,17 @@ package org.openecomp.policy.pdp.rest.notifications; +import org.openecomp.policy.api.RemovedPolicy; + /** * Removal is the POJO for removal updates of the Policy. * It must have the Policy removed and its Version number. * - * @version 0.1 + * @version 0.2 * */ -public class Removed { +public class Removed implements RemovedPolicy{ private String policyName = null; private String versionNo = null; diff --git a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Updated.java b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Updated.java index 39236bada..f0d9aa38d 100644 --- a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Updated.java +++ b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/Updated.java @@ -22,17 +22,21 @@ package org.openecomp.policy.pdp.rest.notifications; import java.util.HashMap; +import org.openecomp.policy.api.LoadedPolicy; +import org.openecomp.policy.api.UpdateType; + /** * Updated is the POJO which consists of any new or Updated Policy information. * It must hold the Policy Name, version Number, Matches. * - * @version 0.1 + * @version 0.2 * */ -public class Updated { +public class Updated implements LoadedPolicy{ private String policyName = null; private String versionNo = null; private HashMap<String,String> matches = null; + private UpdateType updateType = null; public String getPolicyName() { return policyName; @@ -57,5 +61,14 @@ public class Updated { public void setMatches(HashMap<String,String> matches) { this.matches = matches; } + + @Override + public UpdateType getUpdateType() { + return this.updateType; + } + + public void setUpdateType(UpdateType updateType){ + this.updateType = updateType; + } } |