aboutsummaryrefslogtreecommitdiffstats
path: root/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java
diff options
context:
space:
mode:
Diffstat (limited to 'ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java')
-rw-r--r--ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java136
1 files changed, 111 insertions, 25 deletions
diff --git a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java
index d6cda7491..fe295ebb8 100644
--- a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java
+++ b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java
@@ -24,8 +24,12 @@ import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
import javax.websocket.OnClose;
import javax.websocket.OnError;
@@ -34,28 +38,31 @@ import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
-import org.openecomp.policy.rest.XACMLRestProperties;
import org.openecomp.policy.common.logging.eelf.MessageCodes;
import org.openecomp.policy.common.logging.eelf.PolicyLogger;
-import com.att.nsa.cambria.client.CambriaClientFactory;
-import com.att.nsa.cambria.client.CambriaPublisher;
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.rest.XACMLRestProperties;
+import org.openecomp.policy.utils.BusPublisher;
import org.openecomp.policy.xacml.api.XACMLErrorConstants;
-import com.att.research.xacml.util.XACMLProperties;
-import org.openecomp.policy.common.logging.flexlogger.*;
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.research.xacml.util.XACMLProperties;
/**
* The NotificationServer sends the Server Notifications to the Clients once there is any Event.
* WebSockets is being used as a medium for sending Notifications.
- * UEB is being used as a medium for sending Notifications.
+ * UEB is being used as a medium for sending Notifications.
+ * DMAAP is being used as a medium for sending Notifications.
*
- * @version 0.1
+ * @version 0.2
*
**/
@ServerEndpoint(value = "/notifications")
public class NotificationServer {
- private static final Logger logger = FlexLogger.getLogger(NotificationServer.class);
+ private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class);
private static Queue<Session> queue = new ConcurrentLinkedQueue<Session>();
private static String update = null;
private static String hosts = null;
@@ -63,7 +70,7 @@ public class NotificationServer {
@OnOpen
public void openConnection(Session session) {
- logger.info("Session Connected: " + session.getId());
+ LOGGER.info("Session Connected: " + session.getId());
queue.add(session);
}
@@ -75,7 +82,7 @@ public class NotificationServer {
@OnError
public void error(Session session, Throwable t) {
queue.remove(session);
- logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage());
+ LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage());
}
@@ -87,16 +94,17 @@ public class NotificationServer {
session.getBasicRemote().sendText(update);
session.close();
} catch (IOException e) {
- logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
- logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
+ LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
}
}
}
- public static void sendNotification(String notification, String propNotificationType, String pdpURL){
+ public static void sendNotification(String notification, String propNotificationType, String pdpURL) throws Exception {
- logger.debug("Notification set to " + propNotificationType);
+ LOGGER.debug("Notification set to " + propNotificationType);
if (propNotificationType.equals("ueb")){
+
String topic = null;
try {
aURL = new URL(pdpURL);
@@ -104,33 +112,111 @@ public class NotificationServer {
} catch (MalformedURLException e1) {
pdpURL = pdpURL.replace("/", "");
topic = pdpURL.replace(":", "");
- logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication ");
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication ");
PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1, "Error in parsing out pdpURL for UEB notfication ");
}
- hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_UEB_CLUSTER);
- logger.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
- CambriaPublisher pub = null;
+ hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
+ String apiKey = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_KEY);
+ String apiSecret = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_SECRET);
+
+ LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
+ CambriaBatchingPublisher pub = null;
try {
- pub = CambriaClientFactory.createSimplePublisher (null, hosts, topic );
+ if(hosts==null || topic==null || apiKey==null || apiSecret==null){
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
+ throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
+ }
+
+ hosts.trim();
+ topic.trim();
+ apiKey.trim();
+ apiSecret.trim();
+ pub = new CambriaClientBuilders.PublisherBuilder ()
+ .usingHosts ( hosts )
+ .onTopic ( topic )
+ .authenticatedBy ( apiKey, apiSecret )
+ .build ()
+ ;
+
} catch (MalformedURLException e1) {
- // TODO Auto-generated catch block
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
e1.printStackTrace();
} catch (GeneralSecurityException e1) {
- // TODO Auto-generated catch block
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
e1.printStackTrace();
}
+
try {
pub.send( "MyPartitionKey", notification );
} catch (IOException e) {
- logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update");
- }
- pub.close();
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage());
+ }
+
+ // close the publisher. The batching publisher does not send events
+ // immediately, so you MUST use close to send any remaining messages.
+ // You provide the amount of time you're willing to wait for the sends
+ // to succeed before giving up. If any messages are unsent after that time,
+ // they're returned to your app. You could, for example, persist to disk
+ // and try again later.
+ final List stuck = pub.close ( 20, TimeUnit.SECONDS );
+
+ if ( stuck.size () > 0 )
+ {
+ System.err.println ( stuck.size() + " messages unsent" );
+ }
+ else
+ {
+ System.out.println ( "Clean exit; all messages sent: " + notification );
+ }
+
+ } else if (propNotificationType.equals("dmaap")) {
+
+ // Setting up the Publisher for DMaaP MR
+ String dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
+ String topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC);
+ String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
+ String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
+
+ try {
+ if(dmaapServers==null || topic==null){
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
+ throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
+ }
+
+ dmaapServers.trim();
+ topic.trim();
+ aafLogin.trim();
+ aafPassword.trim();
+
+ List<String> dmaapList = null;
+ if(dmaapServers.contains(",")) {
+ dmaapList = new ArrayList<String>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
+ } else {
+ dmaapList = new ArrayList<String>();
+ dmaapList.add(dmaapServers);
+ }
+
+ BusPublisher publisher =
+ new BusPublisher.DmaapPublisherWrapper(dmaapList,
+ topic,
+ aafLogin,
+ aafPassword);
+
+ // Sending notification through DMaaP Message Router
+ publisher.send( "MyPartitionKey", notification);
+ LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
+ publisher.close();
+
+ } catch (Exception e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage());
+ }
}
+
for(Session session: queue) {
try {
session.getBasicRemote().sendText(notification);
} catch (IOException e) {
- logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
+ LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
}
}
}