diff options
Diffstat (limited to 'ONAP-PDP-REST/src/main/java')
-rw-r--r-- | ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/api/services/NotificationService.java | 517 |
1 files changed, 274 insertions, 243 deletions
diff --git a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/api/services/NotificationService.java b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/api/services/NotificationService.java index 3806d26d9..d61b7c006 100644 --- a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/api/services/NotificationService.java +++ b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/api/services/NotificationService.java @@ -47,276 +47,307 @@ import org.springframework.http.HttpStatus; import com.att.research.xacml.util.XACMLProperties; public class NotificationService { - public static final String BACKUPFILE = "topicBackup.txt"; - private static Logger logger = FlexLogger.getLogger(GetDictionaryService.class.getName()); - private static ConcurrentHashMap<String, Date> topicQueue = new ConcurrentHashMap<>(); - private static int interval = 15000; - private static Thread backUpthread = null; - private static Object resourceLock = new Object(); - private static List<String> dmaapList = null; - private static String dmaapServers = null; - private static String aafLogin = null; - private static String aafPassword = null; - - private String notificationResponse = null; - private HttpStatus status = HttpStatus.BAD_REQUEST; - - /** - * NotificationService Constructor. - * - * @param notificationTopic Topic Name in String format. - * @param requestID Request ID in String format. - * @param serviceType Needs to be NotificationServiceType based enumeration value. - */ - public NotificationService(String notificationTopic, String requestID, NotificationServiceType serviceType) { - init(); - if(dmaapServers==null || aafLogin==null || aafPassword==null){ - notificationResponse = XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file"; - return; - } - UUID requestUUID = null; - if (requestID != null && !requestID.isEmpty()) { - try { - requestUUID = UUID.fromString(requestID); - } catch (IllegalArgumentException e) { - requestUUID = UUID.randomUUID(); - logger.info("Generated Random UUID: " + requestUUID.toString(), e); - } - }else{ - requestUUID = UUID.randomUUID(); - logger.info("Generated Random UUID: " + requestUUID.toString()); - } - try{ + public static final String BACKUPFILE = "topicBackup.txt"; + private static Logger logger = FlexLogger.getLogger(GetDictionaryService.class.getName()); + private static ConcurrentHashMap<String, Date> topicQueue = new ConcurrentHashMap<>(); + private static int interval = 15000; + private static Thread backUpthread = null; + private static Object resourceLock = new Object(); + private static List<String> dmaapList = null; + private static String dmaapServers = null; + private static String aafLogin = null; + private static String aafPassword = null; + + private String notificationResponse = null; + private HttpStatus status = HttpStatus.BAD_REQUEST; + + /** + * NotificationService Constructor. + * + * @param notificationTopic Topic Name in String format. + * @param requestID Request ID in String format. + * @param serviceType Needs to be NotificationServiceType based enumeration value. + */ + public NotificationService( + String notificationTopic, String requestID, NotificationServiceType serviceType) { + init(); + if (dmaapServers == null || aafLogin == null || aafPassword == null) { + notificationResponse = + XACMLErrorConstants.ERROR_DATA_ISSUE + + "DMaaP properties are missing from the property file"; + return; + } + UUID requestUUID = null; + if (requestID != null && !requestID.isEmpty()) { + try { + requestUUID = UUID.fromString(requestID); + } catch (IllegalArgumentException e) { + requestUUID = UUID.randomUUID(); + logger.info("Generated Random UUID: " + requestUUID.toString(), e); + } + } else { + requestUUID = UUID.randomUUID(); + logger.info("Generated Random UUID: " + requestUUID.toString()); + } + try { run(notificationTopic, serviceType); - }catch(PolicyException e){ - notificationResponse = XACMLErrorConstants.ERROR_DATA_ISSUE + e; + } catch (PolicyException e) { + notificationResponse = XACMLErrorConstants.ERROR_DATA_ISSUE + e; status = HttpStatus.BAD_REQUEST; } - } + } - private static void init() { - if(dmaapServers==null || aafLogin==null || aafPassword==null){ - dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS); - aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN"); - aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD"); - interval = Integer.parseInt(XACMLProperties.getProperty("CLIENT_INTERVAL", Integer.toString(interval))); - if(dmaapServers==null || aafLogin==null || aafPassword==null){ - logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); - return; - } - // Cleanup Values. - dmaapServers= dmaapServers.trim(); - aafLogin = aafLogin.trim(); - aafPassword = aafPassword.trim(); - // Get servers to List. - if(dmaapServers.contains(",")) { - dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*"))); - } else { - dmaapList = new ArrayList<>(); - dmaapList.add(dmaapServers); - } - callThread(); - } - } - - public static void reloadProps(){ - dmaapServers = null; - aafLogin = null; - aafPassword = null; - backUpthread = null; - } + private static void init() { + if (dmaapServers == null || aafLogin == null || aafPassword == null) { + dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS); + aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN"); + aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD"); + interval = + Integer.parseInt( + XACMLProperties.getProperty("CLIENT_INTERVAL", Integer.toString(interval))); + if (dmaapServers == null || aafLogin == null || aafPassword == null) { + logger.error( + XACMLErrorConstants.ERROR_DATA_ISSUE + + "DMaaP properties are missing from the property file "); + return; + } + // Cleanup Values. + dmaapServers = dmaapServers.trim(); + aafLogin = aafLogin.trim(); + aafPassword = aafPassword.trim(); + // Get servers to List. + if (dmaapServers.contains(",")) { + dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*"))); + } else { + dmaapList = new ArrayList<>(); + dmaapList.add(dmaapServers); + } + callThread(); + } + } - private void run(String notificationTopic, NotificationServiceType serviceType) throws PolicyException{ - // Check Validation - if(notificationTopic==null){ - String message = "Notification Topic is null"; + public static void reloadProps() { + dmaapServers = null; + aafLogin = null; + aafPassword = null; + backUpthread = null; + } + + private void run(String notificationTopic, NotificationServiceType serviceType) + throws PolicyException { + // Check Validation + if (notificationTopic == null) { + String message = "Notification Topic is null"; logger.error(message); throw new PolicyException(message); } notificationTopic = notificationTopic.trim(); - if(notificationTopic.isEmpty()){ - String message = "Notification Topic is not valid. "; + if (notificationTopic.isEmpty()) { + String message = "Notification Topic is not valid. "; logger.error(message); throw new PolicyException(message); } - // if already exists give error.Saying already registered. - // Get Result. - try{ + // if already exists give error.Saying already registered. + // Get Result. + try { status = HttpStatus.OK; switch (serviceType) { - case ADD: - addTopic(notificationTopic); - notificationResponse = "Success!! Please give permissions to " + aafLogin + " that PDP will use to publish on given topic :" + notificationTopic + - "\n Start calling /sendHeartbeat API at an interval less than " + Integer.toString(interval) + "ms"; - break; - case REMOVE: - removeTopic(notificationTopic); - notificationResponse = "Notification Topic :" + notificationTopic + " has been removed and PDP will not publish notifications to this Topic."; - break; - case HB: - heartBeat(notificationTopic); - notificationResponse = "Success!! HeartBeat registered."; - break; - } - }catch (Exception e){ + case ADD: + addTopic(notificationTopic); + notificationResponse = + "Success!! Please give permissions to " + + aafLogin + + " that PDP will use to publish on given topic :" + + notificationTopic + + "\n Start calling /sendHeartbeat API at an interval less than " + + Integer.toString(interval) + + "ms"; + break; + case REMOVE: + removeTopic(notificationTopic); + notificationResponse = + "Notification Topic :" + + notificationTopic + + " has been removed and PDP will not publish notifications to this Topic."; + break; + case HB: + heartBeat(notificationTopic); + notificationResponse = "Success!! HeartBeat registered."; + break; + } + } catch (Exception e) { logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e); status = HttpStatus.BAD_REQUEST; throw new PolicyException(e); } - } - - // Used to register Heart beat. - private void heartBeat(String notificationTopic) throws PolicyException{ - if(!topicQueue.isEmpty()&& topicQueue.containsKey(notificationTopic)){ - topicQueue.put(notificationTopic, new Date()); - }else{ - logger.info("Failed HeartBeat, Topic " + notificationTopic + "is not registered."); - throw new PolicyException("Failed HeartBeat, Topic " + notificationTopic + "is not registered."); - } - } + } - // Used to remove Topic. - private static void removeTopic(String notificationTopic) throws PolicyException{ - if(topicQueue.containsKey(notificationTopic)){ - topicQueue.remove(notificationTopic); - removeTopicFromBackup(notificationTopic); - }else{ - logger.info("Failed Removal, Topic " + notificationTopic + " is not registered."); - throw new PolicyException("Failed Removal, Topic " + notificationTopic + " is not registered."); - } - } + // Used to register Heart beat. + private void heartBeat(String notificationTopic) throws PolicyException { + if (!topicQueue.isEmpty() && topicQueue.containsKey(notificationTopic)) { + topicQueue.put(notificationTopic, new Date()); + } else { + logger.info("Failed HeartBeat, Topic " + notificationTopic + "is not registered."); + throw new PolicyException( + "Failed HeartBeat, Topic " + notificationTopic + "is not registered."); + } + } - private static void removeTopicFromBackup(String notificationTopic) { - synchronized (resourceLock) { - try (Stream<String> lines = Files.lines(Paths.get(BACKUPFILE))) { - List<String> replaced = lines.map(line-> (line.split("=")[0].equals(notificationTopic)?"":line)).collect(Collectors.toList()); - try (PrintWriter pw = new PrintWriter( BACKUPFILE, "UTF-8")) { - replaced.forEach(line-> { - if(line.trim().isEmpty()){ - return; - } - pw.println(line); - }); - } - lines.close(); - } catch (IOException e) { - logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not remove/recreate the backup. ", e); - } - } - } + // Used to remove Topic. + private static void removeTopic(String notificationTopic) throws PolicyException { + if (topicQueue.containsKey(notificationTopic)) { + topicQueue.remove(notificationTopic); + removeTopicFromBackup(notificationTopic); + } else { + logger.info("Failed Removal, Topic " + notificationTopic + " is not registered."); + throw new PolicyException( + "Failed Removal, Topic " + notificationTopic + " is not registered."); + } + } - // Used to add Topic. - private void addTopic(String notificationTopic) throws PolicyException{ - // validate if topic exists. - if(!topicQueue.isEmpty()&& topicQueue.containsKey(notificationTopic)){ - topicQueue.put(notificationTopic, new Date()); - logger.info("Topic " + notificationTopic + " is already registered."); - throw new PolicyException("Topic " + notificationTopic + " is already registered."); - } - topicQueue.put(notificationTopic, new Date()); - addTopictoBackUp(notificationTopic); - } + private static void removeTopicFromBackup(String notificationTopic) { + synchronized (resourceLock) { + try (Stream<String> lines = Files.lines(Paths.get(BACKUPFILE))) { + List<String> replaced = + lines + .map(line -> (line.split("=")[0].equals(notificationTopic) ? "" : line)) + .collect(Collectors.toList()); + try (PrintWriter pw = new PrintWriter(BACKUPFILE, "UTF-8")) { + replaced.forEach( + line -> { + if (line.trim().isEmpty()) { + return; + } + pw.println(line); + }); + } + } catch (IOException e) { + logger.error( + XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not remove/recreate the backup. ", e); + } + } + } - private void addTopictoBackUp(String notificationTopic) { - synchronized (resourceLock) { - try { - Files.write(Paths.get(BACKUPFILE),( notificationTopic+"="+new Date().toString()+"\n").getBytes() , StandardOpenOption.APPEND); - } catch (IOException e) { - logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not add to the backup. ", e); - } - } - } + // Used to add Topic. + private void addTopic(String notificationTopic) throws PolicyException { + // validate if topic exists. + if (!topicQueue.isEmpty() && topicQueue.containsKey(notificationTopic)) { + topicQueue.put(notificationTopic, new Date()); + logger.info("Topic " + notificationTopic + " is already registered."); + throw new PolicyException("Topic " + notificationTopic + " is already registered."); + } + topicQueue.put(notificationTopic, new Date()); + addTopictoBackUp(notificationTopic); + } + + private void addTopictoBackUp(String notificationTopic) { + synchronized (resourceLock) { + try { + Files.write( + Paths.get(BACKUPFILE), + (notificationTopic + "=" + new Date().toString() + "\n").getBytes(), + StandardOpenOption.APPEND); + } catch (IOException e) { + logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not add to the backup. ", e); + } + } + } + + // Maintains BackUp and Queue Topic. + private static void callThread() { + // Create the backup file if it not exists. + backup(); + if (backUpthread == null) { + Runnable task = + () -> { + logger.info("BackUpThread not set. Starting now !"); + threadTask(); + }; + backUpthread = new Thread(task); + backUpthread.start(); + } + } - // Maintains BackUp and Queue Topic. - private static void callThread() { - // Create the backup file if it not exists. - backup(); - if(backUpthread==null){ - Runnable task = () -> { - logger.info("BackUpThread not set. Starting now !"); - threadTask(); - }; - backUpthread = new Thread(task); - backUpthread.start(); - } - } + private static void backup() { + synchronized (resourceLock) { + try { + File backUpFile = new File(BACKUPFILE); + if (!backUpFile.exists() && backUpFile.createNewFile()) { + logger.info(" BackUp File for topic's has been created !"); + } else { + // File Already exists. Process file and load the Memory. + Stream<String> stream = Files.lines(Paths.get(BACKUPFILE)); + Map<String, Date> data = + stream + .map(line -> line.split(",")) + .collect(Collectors.toMap(e -> e[0], e -> new Date())); + stream.close(); + data.forEach( + (key, value) -> + logger.debug("Topic retrieved from backUp : " + key + " with Time : " + value)); + topicQueue.putAll(data); + } + } catch (IOException e) { + logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not process the backup. ", e); + } + } + } - private static void backup(){ - synchronized (resourceLock) { - try{ - File backUpFile = new File(BACKUPFILE); - if(!backUpFile.exists() && backUpFile.createNewFile()){ - logger.info(" BackUp File for topic's has been created !"); - }else{ - // File Already exists. Process file and load the Memory. - Stream<String> stream = Files.lines(Paths.get(BACKUPFILE)); - Map<String,Date> data = stream.map(line -> line.split(",")).collect(Collectors.toMap(e->e[0],e-> new Date())); - stream.close(); - data.forEach((key, value)->logger.debug("Topic retrieved from backUp : " + key + " with Time : " + value)); - topicQueue.putAll(data); - } - }catch(IOException e){ - logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not process the backup. ", e); - } - } - } + private static void threadTask() { + while (true) { + try { + TimeUnit.MILLISECONDS.sleep(interval); + for (Map.Entry<String, Date> map : topicQueue.entrySet()) { + Date currentTime = new Date(); + long timeDiff = 0; + timeDiff = currentTime.getTime() - map.getValue().getTime(); + if (timeDiff > (interval + 1500)) { + removeTopic(map.getKey()); + } + } + } catch (InterruptedException e) { + logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error during thread execution ", e); + Thread.currentThread().interrupt(); + } catch (PolicyException e) { + logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error during thread execution ", e); + } + } + } - private static void threadTask() { - while(true){ - try { - TimeUnit.MILLISECONDS.sleep(interval); - for(Map.Entry<String, Date> map : topicQueue.entrySet()){ - Date currentTime = new Date(); - long timeDiff = 0; - timeDiff = currentTime.getTime()-map.getValue().getTime(); - if(timeDiff > (interval+1500)){ - removeTopic(map.getKey()); - } - } - } catch (InterruptedException | PolicyException e) { - logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error during thread execution ", e); - } - } - } + public String getResult() { + return notificationResponse; + } - public String getResult() { - return notificationResponse; - } + public HttpStatus getResponseCode() { + return status; + } - public HttpStatus getResponseCode() { - return status; - } + /** + * Entry point for sending Notifications from Notification Server. + * + * @param notification String JSON format of notification message which needs to be sent. + */ + public static void sendNotification(String notification) { + init(); + for (String topic : topicQueue.keySet()) { + sendDmaapMessage(topic, notification); + } + } - /** - * Entry point for sending Notifications from Notification Server. - * @param notification String JSON format of notification message which needs to be sent. - */ - public static void sendNotification(String notification) { - init(); - for (String topic: topicQueue.keySet()){ - sendDmaapMessage(topic, notification); - } - } - - private static void sendDmaapMessage(String topic, String notification) { - BusPublisher publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList, - topic, - aafLogin, - aafPassword); - // Sending notification through DMaaP Message Router - logger.info("NotificationService: send DMaaP Message. "); - publisher.send( "MyPartitionKey", notification); - logger.info("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic); - publisher.close(); - } - - /** - * Notification service Type Enumeration - */ - public enum NotificationServiceType{ - ADD, - REMOVE, - HB - } + private static void sendDmaapMessage(String topic, String notification) { + BusPublisher publisher = + new BusPublisher.DmaapPublisherWrapper(dmaapList, topic, aafLogin, aafPassword); + // Sending notification through DMaaP Message Router + logger.info("NotificationService: send DMaaP Message. "); + publisher.send("MyPartitionKey", notification); + logger.info("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic); + publisher.close(); + } + /** Notification service Type Enumeration */ + public enum NotificationServiceType { + ADD, + REMOVE, + HB + } }
\ No newline at end of file |