aboutsummaryrefslogtreecommitdiffstats
path: root/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java
diff options
context:
space:
mode:
Diffstat (limited to 'ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java')
-rw-r--r--ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java359
1 files changed, 192 insertions, 167 deletions
diff --git a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java
index 8c962192b..5c1162a7d 100644
--- a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java
+++ b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java
@@ -3,6 +3,7 @@
* ONAP-PDP-REST
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +21,10 @@
package org.onap.policy.pdp.rest.notifications;
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.research.xacml.util.XACMLProperties;
+
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
@@ -48,180 +53,200 @@ import org.onap.policy.rest.XacmlRestProperties;
import org.onap.policy.utils.BusPublisher;
import org.onap.policy.xacml.api.XACMLErrorConstants;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.research.xacml.util.XACMLProperties;
-
/**
- * The NotificationServer sends the Server Notifications to the Clients once there is any Event.
- * WebSockets is being used as a medium for sending Notifications.
- * UEB is being used as a medium for sending Notifications.
- * DMAAP is being used as a medium for sending Notifications.
+ * The NotificationServer sends the Server Notifications to the Clients once there is any Event. WebSockets is being
+ * used as a medium for sending Notifications. UEB is being used as a medium for sending Notifications. DMAAP is being
+ * used as a medium for sending Notifications.
*
* @version 0.2
*
**/
@ServerEndpoint(value = "/notifications")
public class NotificationServer {
- private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class);
- private static Queue<Session> queue = new ConcurrentLinkedQueue<>();
- private static String update = null;
-
- @OnOpen
- public void openConnection(Session session) {
- LOGGER.info("Session Connected: " + session.getId());
- queue.add(session);
- }
-
- @OnClose
- public void closeConnection(Session session) {
- queue.remove(session);
- }
-
- @OnError
- public void error(Session session, Throwable t) {
- queue.remove(session);
- LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage());
-
- }
-
- @OnMessage
- public void message(String message, Session session) {
-
- if(message.equalsIgnoreCase("Manual")) {
- try {
- session.getBasicRemote().sendText(update);
- session.close();
- } catch (IOException e) {
- LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e);
- LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
- }
- }
- }
-
- public static void sendNotification(String notification, String propNotificationType, String pdpURL) throws PolicyEngineException, IOException, InterruptedException {
-
- LOGGER.debug("Notification set to " + propNotificationType);
- if (propNotificationType.equals("ueb")){
-
- String topic = null;
- try {
- URL aURL = new URL(pdpURL);
- topic = aURL.getHost() + aURL.getPort();
- } catch (MalformedURLException e1) {
- pdpURL = pdpURL.replace("/", "");
- topic = pdpURL.replace(":", "");
- LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication ");
- PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1, "Error in parsing out pdpURL for UEB notfication ");
- }
- String hosts = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS);
- String apiKey = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_KEY);
- String apiSecret = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_SECRET);
-
- LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
- CambriaBatchingPublisher pub = null;
- try {
- if(hosts==null || topic==null || apiKey==null || apiSecret==null){
- LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
- throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
- }
-
- hosts = hosts.trim();
- topic = topic.trim();
- apiKey = apiKey.trim();
- apiSecret = apiSecret.trim();
- pub = new CambriaClientBuilders.PublisherBuilder ()
- .usingHosts ( hosts )
- .onTopic ( topic )
- .authenticatedBy ( apiKey, apiSecret )
- .build ()
- ;
-
- } catch (MalformedURLException e1) {
- LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
- } catch (GeneralSecurityException e1) {
- LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage() +e1);
- }
- if(pub != null){
- try {
- pub.send( "MyPartitionKey", notification );
- } catch (IOException e) {
- LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e);
- }
- // close the publisher. The batching publisher does not send events
- // immediately, so you MUST use close to send any remaining messages.
- // You provide the amount of time you're willing to wait for the sends
- // to succeed before giving up. If any messages are unsent after that time,
- // they're returned to your app. You could, for example, persist to disk
- // and try again later.
- final List<?> stuck = pub.close ( 20, TimeUnit.SECONDS );
-
- if (!stuck.isEmpty()){
- LOGGER.error( stuck.size() + " messages unsent" );
- }else{
- LOGGER.info( "Clean exit; all messages sent: " + notification );
- }
- }
- } else if (propNotificationType.equals("dmaap")) {
-
- // Setting up the Publisher for DMaaP MR
- String dmaapServers = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS);
- String topic = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_TOPIC);
- String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
- String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
-
- try {
- if(dmaapServers==null || topic==null){
- LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
- throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
- }
-
- dmaapServers= dmaapServers.trim();
- topic= topic.trim();
- aafLogin= aafLogin.trim();
- aafPassword= aafPassword.trim();
-
- List<String> dmaapList = null;
- if(dmaapServers.contains(",")) {
- dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
- } else {
- dmaapList = new ArrayList<>();
- dmaapList.add(dmaapServers);
- }
-
- BusPublisher publisher =
- new BusPublisher.DmaapPublisherWrapper(dmaapList,
- topic,
- aafLogin,
- aafPassword);
-
- // Sending notification through DMaaP Message Router
- publisher.send( "MyPartitionKey", notification);
- LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
- publisher.close();
-
- } catch (Exception e) {
- LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e);
- }
- }
-
- for(Session session: queue) {
- try {
- LOGGER.info("\n Sending Notification: " + notification + " for client session id: " + session.getId() + "\n "
- + "PDPUrl is " + pdpURL);
- LOGGER.info("NotificationServer: sending text message");
- session.getBasicRemote().sendText(notification);
- } catch (IOException e) {
- LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e);
- }
- }
-
- NotificationService.sendNotification(notification);
- }
-
- public static void setUpdate(String update) {
- NotificationServer.update = update;
- }
+ private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class);
+ private static Queue<Session> queue = new ConcurrentLinkedQueue<>();
+ private static String update = null;
+
+ @OnOpen
+ public void openConnection(Session session) {
+ LOGGER.info("Session Connected: " + session.getId());
+ queue.add(session);
+ }
+
+ @OnClose
+ public void closeConnection(Session session) {
+ queue.remove(session);
+ }
+
+ /**
+ * Error callback method.
+ * @param session The session on which the error occurs
+ * @param throwable exception thrown on the error callback
+ */
+ @OnError
+ public void error(Session session, Throwable throwable) {
+ queue.remove(session);
+ LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: "
+ + throwable.getMessage());
+ }
+
+ /**
+ * Message callback method.
+ * @param message the message on the callback
+ * @param session The session on which the error occurs
+ */
+ @OnMessage
+ public void message(String message, Session session) {
+
+ if (message.equalsIgnoreCase("Manual")) {
+ try {
+ session.getBasicRemote().sendText(update);
+ session.close();
+ } catch (IOException e) {
+ LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "
+ + e.getMessage() + e);
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
+ }
+ }
+ }
+
+ /**
+ * Send a notification.
+ * @param notification The notification type
+ * @param propNotificationType Notification type properties
+ * @param pdpUrl URL of the PDP
+ * @throws PolicyEngineException on errors from the policy engine
+ * @throws IOException exceptions on IO errors
+ * @throws InterruptedException interrupts
+ */
+ public static void sendNotification(String notification, String propNotificationType, String pdpUrl)
+ throws PolicyEngineException, IOException, InterruptedException {
+
+ LOGGER.debug("Notification set to " + propNotificationType);
+ if (propNotificationType.equals("ueb")) {
+
+ String topic = null;
+ try {
+ URL notificationUrl = new URL(pdpUrl);
+ topic = notificationUrl.getHost() + notificationUrl.getPort();
+ } catch (MalformedURLException e1) {
+ pdpUrl = pdpUrl.replace("/", "");
+ topic = pdpUrl.replace(":", "");
+ LOGGER.error(
+ XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication ");
+ PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1,
+ "Error in parsing out pdpURL for UEB notfication ");
+ }
+ String hosts = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS);
+ String apiKey = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_KEY);
+ String apiSecret = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_SECRET);
+
+ LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
+ CambriaBatchingPublisher pub = null;
+ try {
+ if (hosts == null || topic == null || apiKey == null || apiSecret == null) {
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE
+ + "UEB properties are missing from the property file ");
+ throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE
+ + "UEB properties are missing from the property file ");
+ }
+
+ hosts = hosts.trim();
+ topic = topic.trim();
+ apiKey = apiKey.trim();
+ apiSecret = apiSecret.trim();
+ pub = new CambriaClientBuilders.PublisherBuilder().usingHosts(hosts).onTopic(topic)
+ .authenticatedBy(apiKey, apiSecret).build();
+
+ } catch (MalformedURLException e1) {
+ LOGGER.error(
+ XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
+ } catch (GeneralSecurityException e1) {
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher"
+ + e1.getMessage() + e1);
+ }
+ if (pub != null) {
+ try {
+ pub.send("MyPartitionKey", notification);
+ } catch (IOException e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update"
+ + e.getMessage() + e);
+ }
+ // close the publisher. The batching publisher does not send events
+ // immediately, so you MUST use close to send any remaining messages.
+ // You provide the amount of time you're willing to wait for the sends
+ // to succeed before giving up. If any messages are unsent after that time,
+ // they're returned to your app. You could, for example, persist to disk
+ // and try again later.
+ final List<?> stuck = pub.close(20, TimeUnit.SECONDS);
+
+ if (!stuck.isEmpty()) {
+ LOGGER.error(stuck.size() + " messages unsent");
+ } else {
+ LOGGER.info("Clean exit; all messages sent: " + notification);
+ }
+ }
+ } else if (propNotificationType.equals("dmaap")) {
+
+ // Setting up the Publisher for DMaaP MR
+ String dmaapServers = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS);
+ String topic = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_TOPIC);
+ String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
+ String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
+
+ try {
+ if (dmaapServers == null || topic == null) {
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE
+ + "DMaaP properties are missing from the property file ");
+ throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE
+ + "DMaaP properties are missing from the property file ");
+ }
+
+ dmaapServers = dmaapServers.trim();
+ topic = topic.trim();
+ aafLogin = aafLogin.trim();
+ aafPassword = aafPassword.trim();
+
+ List<String> dmaapList = null;
+ if (dmaapServers.contains(",")) {
+ dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
+ } else {
+ dmaapList = new ArrayList<>();
+ dmaapList.add(dmaapServers);
+ }
+
+ BusPublisher publisher =
+ new BusPublisher.DmaapPublisherWrapper(dmaapList, topic, aafLogin, aafPassword);
+
+ // Sending notification through DMaaP Message Router
+ publisher.send("MyPartitionKey", notification);
+ LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
+ publisher.close();
+
+ } catch (Exception e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update"
+ + e.getMessage() + e);
+ }
+ }
+
+ for (Session session : queue) {
+ try {
+ LOGGER.info("\n Sending Notification: " + notification + " for client session id: " + session.getId()
+ + "\n " + "PDPUrl is " + pdpUrl);
+ LOGGER.info("NotificationServer: sending text message");
+ session.getBasicRemote().sendText(notification);
+ } catch (IOException e) {
+ LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "
+ + e.getMessage() + e);
+ }
+ }
+
+ NotificationService.sendNotification(notification);
+ }
+
+ public static void setUpdate(String update) {
+ NotificationServer.update = update;
+ }
}