diff options
-rw-r--r-- | ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/XACMLPdpLoader.java | 1158 | ||||
-rw-r--r-- | ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java | 359 |
2 files changed, 782 insertions, 735 deletions
diff --git a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/XACMLPdpLoader.java b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/XACMLPdpLoader.java index 52dcd96f3..9fd655194 100644 --- a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/XACMLPdpLoader.java +++ b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/XACMLPdpLoader.java @@ -3,6 +3,7 @@ * ONAP-PDP-REST * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2019 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +21,22 @@ package org.onap.policy.pdp.rest; +import com.att.research.xacml.api.pap.PAPException; +import com.att.research.xacml.api.pap.PDPStatus; +import com.att.research.xacml.api.pap.PDPStatus.Status; +import com.att.research.xacml.api.pdp.PDPEngine; +import com.att.research.xacml.api.pdp.PDPEngineFactory; +import com.att.research.xacml.api.pip.PIPEngine; +import com.att.research.xacml.api.pip.PIPException; +import com.att.research.xacml.api.pip.PIPFinder; +import com.att.research.xacml.api.pip.PIPFinderFactory; +import com.att.research.xacml.util.FactoryException; +import com.att.research.xacml.util.XACMLProperties; +import com.att.research.xacmlatt.pdp.policy.PolicyDef; +import com.att.research.xacmlatt.pdp.policy.dom.DOMPolicyDef; +import com.att.research.xacmlatt.pdp.std.StdPolicyFinderFactory; +import com.google.common.base.Splitter; + import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; @@ -39,603 +56,608 @@ import java.util.Properties; import java.util.Set; import org.apache.commons.io.IOUtils; +import org.onap.policy.common.logging.flexlogger.FlexLogger; +import org.onap.policy.common.logging.flexlogger.Logger; import org.onap.policy.pdp.rest.notifications.NotificationController; import org.onap.policy.rest.XacmlRest; import org.onap.policy.rest.XacmlRestProperties; -import org.onap.policy.common.logging.flexlogger.FlexLogger; -import org.onap.policy.common.logging.flexlogger.Logger; - import org.onap.policy.xacml.api.XACMLErrorConstants; -import com.att.research.xacml.api.pap.PAPException; -import com.att.research.xacml.api.pap.PDPStatus; -import com.att.research.xacml.api.pap.PDPStatus.Status; -import com.att.research.xacml.api.pdp.PDPEngine; -import com.att.research.xacml.api.pdp.PDPEngineFactory; -import com.att.research.xacml.api.pip.PIPEngine; -import com.att.research.xacml.api.pip.PIPException; -import com.att.research.xacml.api.pip.PIPFinder; -import com.att.research.xacml.api.pip.PIPFinderFactory; import org.onap.policy.xacml.std.pap.StdPDPPIPConfig; import org.onap.policy.xacml.std.pap.StdPDPPolicy; import org.onap.policy.xacml.std.pap.StdPDPStatus; -import com.att.research.xacml.util.FactoryException; -import com.att.research.xacml.util.XACMLProperties; -import com.att.research.xacmlatt.pdp.policy.PolicyDef; -import com.att.research.xacmlatt.pdp.policy.dom.DOMPolicyDef; -import com.att.research.xacmlatt.pdp.std.StdPolicyFinderFactory; -import com.google.common.base.Splitter; /** - * Does the work for loading policy and PIP configurations sent from the PAP - * servlet. - * - * - * + * Does the work for loading policy and PIP configurations sent from the PAP servlet. */ public class XACMLPdpLoader { - private static final Logger LOGGER = FlexLogger.getLogger(XACMLPdpLoader.class); - private static NotificationController notificationController = new NotificationController(); - private static final Long notifyDelay = (long) XACMLPdpServlet.getNotificationDelay(); + private static final Logger LOGGER = FlexLogger.getLogger(XACMLPdpLoader.class); + // Repeated string constants. + private static final String DOES_NOT_EXIST = " does NOT exist."; + private static final String CORRUPTED_POLICY_FILE_DELETING = "Corrupted policy file, deleting: "; + private static final String DOT_FILE = ".file"; - public static synchronized PDPEngine loadEngine(StdPDPStatus status, - Properties policyProperties, Properties pipProperties) { - LOGGER.info("loadEngine: " + policyProperties + " " + pipProperties); - // - // First load our policies - // - try { - // - // Were we given some properties? - // - if (policyProperties == null) { - // - // On init we have no incoming configuration, so just - // Load our current saved configuration - // - policyProperties = new Properties(); - try (InputStream is = Files.newInputStream(getPDPPolicyCache())) { - policyProperties.load(is); - } - } + private static NotificationController notificationController = new NotificationController(); + private static final Long notifyDelay = (long) XACMLPdpServlet.getNotificationDelay(); - // - // Get our policy cache up-to-date - // - // Side effects of this include: - // - downloading of policies from remote locations, and - // - creating new "<PolicyId>.file" properties for files existing - // local - // - LOGGER.info("XACMLPdpLoader: cache the policies."); - XACMLPdpLoader.cachePolicies(policyProperties); - // - // Validate the policies - // - LOGGER.info("XACMLPdpLoader: validating the policies."); - XACMLPdpLoader.validatePolicies(policyProperties, status); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Status: " + status); - } - } catch (ConcurrentModificationException e) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + e.getMessage() + e); - } catch (Exception e) { - String error = "Failed to load Policy Cache properties file: " - + e.getMessage(); - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + error, e); - status.addLoadError(error); - status.setStatus(PDPStatus.Status.LOAD_ERRORS); - } - // - // Load our PIP configuration - // - try { - // - // Were we given some properties to use? - // - if (pipProperties == null) { - // - // Load our current saved configuration - // - pipProperties = new Properties(); - try (InputStream is = Files.newInputStream(getPIPConfig())) { - pipProperties.load(is); - } - } - // - // Validate our PIP configurations - // - XACMLPdpLoader.validatePipConfiguration(pipProperties, status); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Status: " + status); - } - } catch (Exception e) { - String error = "Failed to load/validate Pip Config properties file: " - + e.getMessage(); - LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + error, e); - status.addLoadError(XACMLErrorConstants.ERROR_PROCESS_FLOW + error); - status.setStatus(PDPStatus.Status.LOAD_ERRORS); - } - // - // Were they validated? - // - if (status.getStatus() == Status.LOAD_ERRORS) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW +"there were load errors"); - return null; - } - // - // Reset our official properties the PDP factory - // uses to configure the PDP engine. - // - XacmlRest.loadXacmlProperties(policyProperties, pipProperties); - // - // Dump ALL our properties that we are trying to load - // - try { - LOGGER.info(XACMLProperties.getProperties().toString()); - } catch (IOException e) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Failed to get XACML Properties", e); - } - // - // Now load the PDP engine - // - PDPEngineFactory factory = null; - PDPEngine engine = null; - try { - factory = PDPEngineFactory.newInstance(); - engine = factory.newEngine(); - LOGGER.info("Loaded new PDP engine."); - status.setStatus(Status.UP_TO_DATE); - } catch (FactoryException e) { - String error = "Failed to create new PDP Engine"; - LOGGER.error(XACMLErrorConstants.ERROR_SYSTEM_ERROR +error, e); - status.addLoadError(error); - } - return engine; - } + public static synchronized PDPEngine loadEngine(StdPDPStatus status, Properties policyProperties, + Properties pipProperties) { + LOGGER.info("loadEngine: " + policyProperties + " " + pipProperties); + // + // First load our policies + // + try { + // + // Were we given some properties? + // + if (policyProperties == null) { + // + // On init we have no incoming configuration, so just + // Load our current saved configuration + // + policyProperties = new Properties(); + try (InputStream is = Files.newInputStream(getPDPPolicyCache())) { + policyProperties.load(is); + } + } - private static HashMap<String, PolicyDef> policyContainer = null; + // + // Get our policy cache up-to-date + // + // Side effects of this include: + // - downloading of policies from remote locations, and + // - creating new "<PolicyId>.file" properties for files existing + // local + // + LOGGER.info("XACMLPdpLoader: cache the policies."); + XACMLPdpLoader.cachePolicies(policyProperties); + // + // Validate the policies + // + LOGGER.info("XACMLPdpLoader: validating the policies."); + XACMLPdpLoader.validatePolicies(policyProperties, status); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Status: " + status); + } + } catch (ConcurrentModificationException e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + e.getMessage() + e); + } catch (Exception e) { + String error = "Failed to load Policy Cache properties file: " + e.getMessage(); + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + error, e); + status.addLoadError(error); + status.setStatus(PDPStatus.Status.LOAD_ERRORS); + } + // + // Load our PIP configuration + // + try { + // + // Were we given some properties to use? + // + if (pipProperties == null) { + // + // Load our current saved configuration + // + pipProperties = new Properties(); + try (InputStream is = Files.newInputStream(getPIPConfig())) { + pipProperties.load(is); + } + } + // + // Validate our PIP configurations + // + XACMLPdpLoader.validatePipConfiguration(pipProperties, status); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Status: " + status); + } + } catch (Exception e) { + String error = "Failed to load/validate Pip Config properties file: " + e.getMessage(); + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + error, e); + status.addLoadError(XACMLErrorConstants.ERROR_PROCESS_FLOW + error); + status.setStatus(PDPStatus.Status.LOAD_ERRORS); + } + // + // Were they validated? + // + if (status.getStatus() == Status.LOAD_ERRORS) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "there were load errors"); + return null; + } + // + // Reset our official properties the PDP factory + // uses to configure the PDP engine. + // + XacmlRest.loadXacmlProperties(policyProperties, pipProperties); + // + // Dump ALL our properties that we are trying to load + // + try { + LOGGER.info(XACMLProperties.getProperties().toString()); + } catch (IOException e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Failed to get XACML Properties", e); + } + // + // Now load the PDP engine + // + PDPEngineFactory factory = null; + PDPEngine engine = null; + try { + factory = PDPEngineFactory.newInstance(); + engine = factory.newEngine(); + LOGGER.info("Loaded new PDP engine."); + status.setStatus(Status.UP_TO_DATE); + } catch (FactoryException e) { + String error = "Failed to create new PDP Engine"; + LOGGER.error(XACMLErrorConstants.ERROR_SYSTEM_ERROR + error, e); + status.addLoadError(error); + } + return engine; + } - public static synchronized void sendNotification(){ - Thread notify = new Thread(){ - @Override - public void run(){ - try{ - Thread.sleep(notifyDelay); - NotificationController.sendNotification(); - }catch(Exception e){ - LOGGER.error(XACMLErrorConstants.ERROR_UNKNOWN + e); - } - } - }; - notify.start(); - } + private static HashMap<String, PolicyDef> policyContainer = null; - public static synchronized void validatePolicies(Properties properties, - StdPDPStatus status) throws PAPException { - Set<String> rootPolicies = XACMLProperties.getRootPolicyIDs(properties); - Set<String> refPolicies = XACMLProperties - .getReferencedPolicyIDs(properties); - policyContainer = new HashMap<String, PolicyDef>(); + /** + * Thread for sending notifications. + */ + public static synchronized void sendNotification() { + Thread notify = new Thread() { + @Override + public void run() { + try { + Thread.sleep(notifyDelay); + NotificationController.sendNotification(); + } catch (Exception e) { + LOGGER.error(XACMLErrorConstants.ERROR_UNKNOWN + e); + } + } + }; + notify.start(); + } - LOGGER.info("XACMLPdpLoader: load rootPolicies"); - for (String id : rootPolicies) { - loadPolicy(properties, status, id, true); - } - // remember which policies were root policies - status.addAllLoadedRootPolicies(status.getLoadedPolicies()); - LOGGER.info("XACMLPdpLoader: load referencedPolicies"); - for (String id : refPolicies) { - loadPolicy(properties, status, id, false); - } - LOGGER.info("Loaded " + status.getLoadedPolicies().size() - + " policies, failed to load " - + status.getFailedPolicies().size() + " policies, " - + status.getLoadedRootPolicies().size() + " root policies"); - notificationController.check(status, policyContainer); - if (status.getLoadedRootPolicies().size() == 0) { - LOGGER.warn(XACMLErrorConstants.ERROR_PROCESS_FLOW +"NO ROOT POLICIES LOADED!!! Cannot serve PEP Requests."); - status.addLoadWarning("NO ROOT POLICIES LOADED!!! Cannot serve PEP Requests."); - } - policyContainer.clear(); - } + /** + * Method to validate policies. + * + * @param properties The properties of the policies + * @param status the PDP status + * @throws PAPException on validation errors + */ + public static synchronized void validatePolicies(Properties properties, StdPDPStatus status) throws PAPException { + Set<String> rootPolicies = XACMLProperties.getRootPolicyIDs(properties); + policyContainer = new HashMap<>(); + LOGGER.info("XACMLPdpLoader: load rootPolicies"); + for (String id : rootPolicies) { + loadPolicy(properties, status, id, true); + } + // remember which policies were root policies + status.addAllLoadedRootPolicies(status.getLoadedPolicies()); + LOGGER.info("XACMLPdpLoader: load referencedPolicies"); + Set<String> refPolicies = XACMLProperties.getReferencedPolicyIDs(properties); + for (String id : refPolicies) { + loadPolicy(properties, status, id, false); + } + LOGGER.info("Loaded " + status.getLoadedPolicies().size() + " policies, failed to load " + + status.getFailedPolicies().size() + " policies, " + status.getLoadedRootPolicies().size() + + " root policies"); + notificationController.check(status, policyContainer); + if (status.getLoadedRootPolicies().isEmpty()) { + LOGGER.warn( + XACMLErrorConstants.ERROR_PROCESS_FLOW + "NO ROOT POLICIES LOADED!!! Cannot serve PEP Requests."); + status.addLoadWarning("NO ROOT POLICIES LOADED!!! Cannot serve PEP Requests."); + } + policyContainer.clear(); + } - public static synchronized void loadPolicy(Properties properties, - StdPDPStatus status, String id, boolean isRoot) throws PAPException { - PolicyDef policy = null; - String location = null; - URI locationURI = null; - boolean isFile = false; - boolean rougeFile = false; - try { - location = properties.getProperty(id + ".file"); - if(location != null){ - isFile = true; - locationURI = Paths.get(location).toUri(); - try (InputStream is = Files.newInputStream(Paths.get(location))) { - policy = DOMPolicyDef.load(is); - } catch (Exception e){ - // This Happens if a any issue with the error policyFile. Lets remove it. - try { - LOGGER.error("Corrupted policy file, deleting: " + location + e); - Files.delete(Paths.get(location)); - properties.remove(id + ".file"); - rougeFile = true; - } catch (IOException e1) { - LOGGER.error(e1); - } - } - } - if(location==null || rougeFile){ - if(rougeFile){ - rougeFile = false; - } - location = properties.getProperty(id + ".url"); - if (location != null) { - // - // Construct the URL - // - int errorCount=0; - boolean error= false; - do{ - error=false; - PapUrlResolver papUrls = PapUrlResolver.getInstance(); - while(papUrls.hasMoreUrls()){ - String papID = papUrls.getUserId(); - String papPass = papUrls.getPass(); - Base64.Encoder encoder = Base64.getEncoder(); - String encoding = encoder.encodeToString((papID+":"+papPass).getBytes(StandardCharsets.UTF_8)); - locationURI = URI.create(papUrls.getUrl(PapUrlResolver.extractIdFromUrl(location))); - URL url = locationURI.toURL(); - URLConnection urlConnection = null; - try{ - urlConnection = url.openConnection(); - } catch (IOException e){ - LOGGER.error("Exception Occured while opening connection" +e); - papUrls.failed(); - papUrls.getNext(); - break; - } - urlConnection.setRequestProperty(XacmlRestProperties.PROP_PDP_HTTP_HEADER_ID, - XACMLProperties.getProperty(XacmlRestProperties.PROP_PDP_ID)); - urlConnection.setRequestProperty("Authorization", "Basic " + encoding); - // - // Now construct the output file name - // - Path outFile = Paths.get(getPDPConfig().toAbsolutePath() - .toString(), id); - // - // Copy it to disk - // - try (FileOutputStream fos = new FileOutputStream( - outFile.toFile())) { - IOUtils.copy(urlConnection.getInputStream(), fos); - } catch(IOException e){ - LOGGER.error("Exception Occured while Copying input stream" +e); - papUrls.failed(); - papUrls.getNext(); - break; - } - // - // Now try to load - // - isFile = true; - try (InputStream fis = Files.newInputStream(outFile)) { - policy = DOMPolicyDef.load(fis); - }catch(Exception e){ - try { - LOGGER.error("Corrupted policy file, deleting: " + location +e); - Files.delete(outFile); - error = true; - errorCount++; - break; - } catch (IOException e1) { - LOGGER.error(e1); - } - } - // - // Save it - // - properties.setProperty(id + ".file", outFile - .toAbsolutePath().toString()); - error = false; - break; - } - }while(error && errorCount>2); - } - } - if (policy != null) { - status.addLoadedPolicy(new StdPDPPolicy(id, isRoot, - locationURI, properties)); - LOGGER.info("Loaded policy: " + policy.getIdentifier() - + " version: " + policy.getVersion().stringValue()); - // Sending the policy objects to the Notification Controller. - policyContainer.put(id, policy); - } else { - String error = "Failed to load policy " + location; - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + error); - status.setStatus(PDPStatus.Status.LOAD_ERRORS); - status.addLoadError(error); - status.addFailedPolicy(new StdPDPPolicy(id, isRoot)); - } - } catch (Exception e) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW +"Failed to load policy '" + id + "' from location '" - + location + "'", e); - status.setStatus(PDPStatus.Status.LOAD_ERRORS); - status.addFailedPolicy(new StdPDPPolicy(id, isRoot)); - // - // Is it a file? - // - if (isFile) { - // - // Let's remove it - // - try { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Corrupted policy file, deleting: " + location); - Files.delete(Paths.get(location)); + /** + * Load a policy. + * + * @param properties the policy properties + * @param status the PDP status + * @param id the policy ID + * @param isRoot indicates if operation being done as root + * @throws PAPException on loading errors + */ + public static synchronized void loadPolicy(Properties properties, StdPDPStatus status, String id, boolean isRoot) + throws PAPException { + PolicyDef policy = null; + String location = null; + URI locationUri = null; + boolean isFile = false; + boolean rougeFile = false; + try { + location = properties.getProperty(id + DOT_FILE); + if (location != null) { + isFile = true; + locationUri = Paths.get(location).toUri(); + try (InputStream is = Files.newInputStream(Paths.get(location))) { + policy = DOMPolicyDef.load(is); + } catch (Exception e) { + // This Happens if a any issue with the error policyFile. Lets remove it. + try { + LOGGER.error(CORRUPTED_POLICY_FILE_DELETING + location + e); + Files.delete(Paths.get(location)); + properties.remove(id + DOT_FILE); + rougeFile = true; + } catch (IOException e1) { + LOGGER.error(e1); + } + } + } + if (location == null || rougeFile) { + if (rougeFile) { + rougeFile = false; + } + location = properties.getProperty(id + ".url"); + if (location != null) { + // + // Construct the URL + // + int errorCount = 0; + boolean error = false; + do { + error = false; + PapUrlResolver papUrls = PapUrlResolver.getInstance(); + while (papUrls.hasMoreUrls()) { + String papID = papUrls.getUserId(); + String papPass = papUrls.getPass(); + Base64.Encoder encoder = Base64.getEncoder(); + locationUri = URI.create(papUrls.getUrl(PapUrlResolver.extractIdFromUrl(location))); + URL url = locationUri.toURL(); + URLConnection urlConnection = null; + try { + urlConnection = url.openConnection(); + } catch (IOException e) { + LOGGER.error("Exception Occured while opening connection" + e); + papUrls.failed(); + papUrls.getNext(); + break; + } + String encoding = + encoder.encodeToString((papID + ":" + papPass).getBytes(StandardCharsets.UTF_8)); + urlConnection.setRequestProperty(XacmlRestProperties.PROP_PDP_HTTP_HEADER_ID, + XACMLProperties.getProperty(XacmlRestProperties.PROP_PDP_ID)); + urlConnection.setRequestProperty("Authorization", "Basic " + encoding); + // + // Now construct the output file name + // + Path outFile = Paths.get(getPDPConfig().toAbsolutePath().toString(), id); + // + // Copy it to disk + // + try (FileOutputStream fos = new FileOutputStream(outFile.toFile())) { + IOUtils.copy(urlConnection.getInputStream(), fos); + } catch (IOException e) { + LOGGER.error("Exception Occured while Copying input stream" + e); + papUrls.failed(); + papUrls.getNext(); + break; + } + // + // Now try to load + // + isFile = true; + try (InputStream fis = Files.newInputStream(outFile)) { + policy = DOMPolicyDef.load(fis); + } catch (Exception e) { + try { + LOGGER.error(CORRUPTED_POLICY_FILE_DELETING + location + e); + Files.delete(outFile); + error = true; + errorCount++; + break; + } catch (IOException e1) { + LOGGER.error(e1); + } + } + // + // Save it + // + properties.setProperty(id + DOT_FILE, outFile.toAbsolutePath().toString()); + error = false; + break; + } + } + while (error && errorCount > 2); + } + } + if (policy != null) { + status.addLoadedPolicy(new StdPDPPolicy(id, isRoot, locationUri, properties)); + LOGGER.info( + "Loaded policy: " + policy.getIdentifier() + " version: " + policy.getVersion().stringValue()); + // Sending the policy objects to the Notification Controller. + policyContainer.put(id, policy); + } else { + String error = "Failed to load policy " + location; + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + error); + status.setStatus(PDPStatus.Status.LOAD_ERRORS); + status.addLoadError(error); + status.addFailedPolicy(new StdPDPPolicy(id, isRoot)); + } + } catch (Exception e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Failed to load policy '" + id + "' from location '" + + location + "'", e); + status.setStatus(PDPStatus.Status.LOAD_ERRORS); + status.addFailedPolicy(new StdPDPPolicy(id, isRoot)); + // + // Is it a file? + // + if (isFile) { + // + // Let's remove it + // + try { + LOGGER.error( + XACMLErrorConstants.ERROR_PROCESS_FLOW + CORRUPTED_POLICY_FILE_DELETING + location); + Files.delete(Paths.get(location)); - } catch (IOException e1) { - LOGGER.error(XACMLErrorConstants.ERROR_SYSTEM_ERROR + e1); - } - } - } - } + } catch (IOException e1) { + LOGGER.error(XACMLErrorConstants.ERROR_SYSTEM_ERROR + e1); + } + } + } + } - public static synchronized void validatePipConfiguration( - Properties properties, StdPDPStatus status) throws PAPException { - try { - PIPFinderFactory factory = PIPFinderFactory.newInstance(properties); - if (factory == null) { - throw new FactoryException( - "Could not create PIP Finder Factory: " - + properties - .getProperty(XACMLProperties.PROP_PIPFINDERFACTORY)); - } - PIPFinder finder = factory.getFinder(properties); - // - // Check for this, although it should always return something - // - if (finder == null) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "pip finder factory returned a null engine."); - throw new PIPException("Could not create PIP Finder"); - } else { - LOGGER.info("Loaded PIP finder"); - } - for (PIPEngine engine : finder.getPIPEngines()) { - LOGGER.info("Configured PIP Engine: " + engine.getName()); - StdPDPPIPConfig config = new StdPDPPIPConfig(); - config.setName(engine.getName()); - status.addLoadedPipConfig(config); - } - } catch (FactoryException | PIPException e) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "validate PIP configuration failed: " - + e.getLocalizedMessage()); - status.addLoadError(e.getLocalizedMessage()); - status.setStatus(Status.LOAD_ERRORS); - throw new PAPException(e); - } - } + /** + * Validate PIP configuration. + * + * @param properties the properties to validate + * @param status the PDP status + * @throws PAPException on validation exceptions + */ + public static synchronized void validatePipConfiguration(Properties properties, StdPDPStatus status) + throws PAPException { + try { + PIPFinderFactory factory = PIPFinderFactory.newInstance(properties); + if (factory == null) { + throw new FactoryException("Could not create PIP Finder Factory: " + + properties.getProperty(XACMLProperties.PROP_PIPFINDERFACTORY)); + } + PIPFinder finder = factory.getFinder(properties); + // + // Check for this, although it should always return something + // + if (finder == null) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "pip finder factory returned a null engine."); + throw new PIPException("Could not create PIP Finder"); + } else { + LOGGER.info("Loaded PIP finder"); + } + for (PIPEngine engine : finder.getPIPEngines()) { + LOGGER.info("Configured PIP Engine: " + engine.getName()); + StdPDPPIPConfig config = new StdPDPPIPConfig(); + config.setName(engine.getName()); + status.addLoadedPipConfig(config); + } + } catch (FactoryException | PIPException e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "validate PIP configuration failed: " + + e.getLocalizedMessage()); + status.addLoadError(e.getLocalizedMessage()); + status.setStatus(Status.LOAD_ERRORS); + throw new PAPException(e); + } + } - /** - * Iterates the policies defined in the props object to ensure they are - * loaded locally. Policies are searched for in the following order: - see - * if the current properties has a "<PolicyID>.file" entry and that - * file exists in the local directory - if not, see if the file exists in - * the local directory; if so create a ".file" property for it. - if not, - * get the "<PolicyID>.url" property and try to GET the policy from - * that location (and set the ".file" property) - * - * If the ".file" property is created, then true is returned to tell the - * caller that the props object changed. - * - * @param props - * @return true/false if anything was changed in the props object - * @throws PAPException - */ - public static synchronized boolean cachePolicies(Properties props) - throws PAPException { - boolean changed = false; - String[] lists = new String[2]; - lists[0] = props.getProperty(XACMLProperties.PROP_ROOTPOLICIES); - lists[1] = props.getProperty(XACMLProperties.PROP_REFERENCEDPOLICIES); - for (String list : lists) { - // - // Check for a null or empty parameter - // - if (list == null || list.length() == 0) { - continue; - } - Iterable<String> policies = Splitter.on(',').trimResults() - .omitEmptyStrings().split(list); - for (String policy : policies) { - boolean policyExists = false; + /** + * Iterates the policies defined in the props object to ensure they are loaded locally. Policies are searched for in + * the following order: - see if the current properties has a "<PolicyID>.file" entry and that file exists in + * the local directory - if not, see if the file exists in the local directory; if so create a ".file" property for + * it. - if not, get the "<PolicyID>.url" property and try to GET the policy from that location (and set the + * ".file" property) + * + * <p>If the ".file" property is created, then true is returned to tell the caller that the props object changed. + * + * @param props the properties to cache + * @return true/false if anything was changed in the props object + * @throws PAPException on cacheing exceptions + */ + public static synchronized boolean cachePolicies(Properties props) throws PAPException { + boolean changed = false; + String[] lists = new String[2]; + lists[0] = props.getProperty(XACMLProperties.PROP_ROOTPOLICIES); + lists[1] = props.getProperty(XACMLProperties.PROP_REFERENCEDPOLICIES); + for (String list : lists) { + // + // Check for a null or empty parameter + // + if (list == null || list.length() == 0) { + continue; + } + Iterable<String> policies = Splitter.on(',').trimResults().omitEmptyStrings().split(list); + for (String policy : policies) { + boolean policyExists = false; - // First look for ".file" property and verify the file exists - String propLocation = props.getProperty(policy - + StdPolicyFinderFactory.PROP_FILE); - if (propLocation != null) { - // - // Does it exist? - // - policyExists = Files.exists(Paths.get(propLocation)); - if (policyExists == false) { - LOGGER.warn(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Policy file " + policy + " expected at " - + propLocation + " does NOT exist."); - } - } + // First look for ".file" property and verify the file exists + String propLocation = props.getProperty(policy + StdPolicyFinderFactory.PROP_FILE); + if (propLocation != null) { + // + // Does it exist? + // + policyExists = Paths.get(propLocation).toFile().exists(); + if (!policyExists) { + LOGGER.warn(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Policy file " + policy + " expected at " + + propLocation + DOES_NOT_EXIST); + } + } - // If ".file" property does not exist, try looking for the local - // file anyway - // (it might exist without having a ".file" property set for it) - if (policyExists == false) { - // - // Now construct the output file name - // - Path outFile = Paths.get(getPDPConfig().toAbsolutePath() - .toString(), policy); - // - // Double check to see if we pulled it at some point - // - policyExists = Files.exists(outFile); - if (policyExists) { - // - // Set the property so the PDP engine doesn't have - // to pull it from the URL but rather the FILE. - // - LOGGER.info("Policy does exist: " - + outFile.toAbsolutePath().toString()); - props.setProperty(policy - + StdPolicyFinderFactory.PROP_FILE, outFile - .toAbsolutePath().toString()); - // - // Indicate that there were changes made to the - // properties - // - changed = true; - } else { + // If ".file" property does not exist, try looking for the local + // file anyway + // (it might exist without having a ".file" property set for it) + if (!policyExists) { + // + // Now construct the output file name + // + Path outFile = Paths.get(getPDPConfig().toAbsolutePath().toString(), policy); + // + // Double check to see if we pulled it at some point + // + policyExists = outFile.toFile().exists(); + if (policyExists) { + // + // Set the property so the PDP engine doesn't have + // to pull it from the URL but rather the FILE. + // + LOGGER.info("Policy does exist: " + outFile.toAbsolutePath().toString()); + props.setProperty(policy + StdPolicyFinderFactory.PROP_FILE, + outFile.toAbsolutePath().toString()); + // + // Indicate that there were changes made to the + // properties + // + changed = true; + } else { - // File does not exist locally, so we need to get it - // from the location given in the ".url" property (which - // MUST exist) + // File does not exist locally, so we need to get it + // from the location given in the ".url" property (which + // MUST exist) - // - // There better be a URL to retrieve it - // - propLocation = props.getProperty(policy - + StdPolicyFinderFactory.PROP_URL); - if (propLocation != null) { - // - // Get it - // - PapUrlResolver papUrls = PapUrlResolver.getInstance(); - while(papUrls.hasMoreUrls()){ - String papID = papUrls.getUserId(); - String papPass = papUrls.getPass(); - Base64.Encoder encoder = Base64.getEncoder(); - String encoding = encoder.encodeToString((papID+":"+papPass).getBytes(StandardCharsets.UTF_8)); - URL url = null; - try { - // - // Create the URL - // - url = new URL(papUrls.getUrl(PapUrlResolver.extractIdFromUrl(propLocation))); - LOGGER.info("Pulling " + url.toString()); - // - // Open the connection - // - URLConnection urlConnection = url.openConnection(); - urlConnection.setRequestProperty(XacmlRestProperties.PROP_PDP_HTTP_HEADER_ID, - XACMLProperties.getProperty(XacmlRestProperties.PROP_PDP_ID)); - urlConnection.setRequestProperty("Authorization", "Basic " + encoding); - // - // Copy it to disk - // - try (InputStream is = urlConnection - .getInputStream(); - OutputStream os = new FileOutputStream( - outFile.toFile())) { - IOUtils.copy(is, os); - } - // - // Now save it in the properties as a .file - // - LOGGER.info("Pulled policy: " - + outFile.toAbsolutePath().toString()); - props.setProperty(policy - + StdPolicyFinderFactory.PROP_FILE, - outFile.toAbsolutePath().toString()); - papUrls.succeeded(); - // - // Indicate that there were changes made to the - // properties - // - changed = true; - } catch (MalformedURLException e) { - papUrls.failed(); - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Policy '" + policy - + "' had bad URL in new configuration, URL='" + propLocation + "'"); - } catch (Exception e) { - papUrls.failed(); - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error while retrieving policy " - + policy + " from URL " + url + ", e=" + e); - } - papUrls.getNext(); - } - } else { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Policy " + policy - + " does NOT exist and does NOT have a URL"); - } - } - } - } - } - return changed; - } + // + // There better be a URL to retrieve it + // + propLocation = props.getProperty(policy + StdPolicyFinderFactory.PROP_URL); + if (propLocation != null) { + // + // Get it + // + PapUrlResolver papUrls = PapUrlResolver.getInstance(); + while (papUrls.hasMoreUrls()) { + String papID = papUrls.getUserId(); + String papPass = papUrls.getPass(); + Base64.Encoder encoder = Base64.getEncoder(); + String encoding = encoder + .encodeToString((papID + ":" + papPass).getBytes(StandardCharsets.UTF_8)); + URL url = null; + try { + // + // Create the URL + // + url = new URL(papUrls.getUrl(PapUrlResolver.extractIdFromUrl(propLocation))); + LOGGER.info("Pulling " + url.toString()); + // + // Open the connection + // + URLConnection urlConnection = url.openConnection(); + urlConnection.setRequestProperty(XacmlRestProperties.PROP_PDP_HTTP_HEADER_ID, + XACMLProperties.getProperty(XacmlRestProperties.PROP_PDP_ID)); + urlConnection.setRequestProperty("Authorization", "Basic " + encoding); + // + // Copy it to disk + // + try (InputStream is = urlConnection.getInputStream(); + OutputStream os = new FileOutputStream(outFile.toFile())) { + IOUtils.copy(is, os); + } + // + // Now save it in the properties as a .file + // + LOGGER.info("Pulled policy: " + outFile.toAbsolutePath().toString()); + props.setProperty(policy + StdPolicyFinderFactory.PROP_FILE, + outFile.toAbsolutePath().toString()); + papUrls.succeeded(); + // + // Indicate that there were changes made to the + // properties + // + changed = true; + } catch (MalformedURLException e) { + papUrls.failed(); + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Policy '" + policy + + "' had bad URL in new configuration, URL='" + propLocation + "'"); + } catch (Exception e) { + papUrls.failed(); + LOGGER.error( + XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error while retrieving policy " + + policy + " from URL " + url + ", e=" + e); + } + papUrls.getNext(); + } + } else { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Policy " + policy + + " does NOT exist and does NOT have a URL"); + } + } + } + } + } + return changed; + } - public static synchronized Path getPDPPolicyCache() throws PAPException { - Path config = getPDPConfig(); - Path policyProperties = Paths.get(config.toAbsolutePath().toString(), - "xacml.policy.properties"); - if (Files.notExists(policyProperties)) { - LOGGER.warn(XACMLErrorConstants.ERROR_PROCESS_FLOW + policyProperties.toAbsolutePath().toString() - + " does NOT exist."); - // - // Try to create the file - // - try { - Files.createFile(policyProperties); - } catch (IOException e) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Failed to create policy properties file: " - + policyProperties.toAbsolutePath().toString() +e); - throw new PAPException( - "Failed to create policy properties file: " - + policyProperties.toAbsolutePath().toString()); - } - } - return policyProperties; - } + /** + * Get the PDP policy cache. + * + * @return the PDP policy cache + * @throws PAPException on cache get errors + */ + public static synchronized Path getPDPPolicyCache() throws PAPException { + Path config = getPDPConfig(); + Path policyProperties = Paths.get(config.toAbsolutePath().toString(), "xacml.policy.properties"); + if (!policyProperties.toFile().exists()) { + LOGGER.warn(XACMLErrorConstants.ERROR_PROCESS_FLOW + policyProperties.toAbsolutePath().toString() + + DOES_NOT_EXIST); + // + // Try to create the file + // + try { + Files.createFile(policyProperties); + } catch (IOException e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Failed to create policy properties file: " + + policyProperties.toAbsolutePath().toString() + e); + throw new PAPException( + "Failed to create policy properties file: " + policyProperties.toAbsolutePath().toString()); + } + } + return policyProperties; + } - public static synchronized Path getPIPConfig() throws PAPException { - Path config = getPDPConfig(); - Path pipConfigProperties = Paths.get( - config.toAbsolutePath().toString(), "xacml.pip.properties"); - if (Files.notExists(pipConfigProperties)) { - LOGGER.warn(XACMLErrorConstants.ERROR_PROCESS_FLOW + pipConfigProperties.toAbsolutePath().toString() - + " does NOT exist."); - // - // Try to create the file - // - try { - Files.createFile(pipConfigProperties); - } catch (IOException e) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Failed to create pip properties file: " - + pipConfigProperties.toAbsolutePath().toString() +e); - throw new PAPException("Failed to create pip properties file: " - + pipConfigProperties.toAbsolutePath().toString()); - } - } - return pipConfigProperties; - } + /** + * Get the PIP configuration. + * + * @return the PIP configuration + * @throws PAPException on get exceptions + */ + public static synchronized Path getPIPConfig() throws PAPException { + Path config = getPDPConfig(); + Path pipConfigProperties = Paths.get(config.toAbsolutePath().toString(), "xacml.pip.properties"); + if (!pipConfigProperties.toFile().exists()) { + LOGGER.warn(XACMLErrorConstants.ERROR_PROCESS_FLOW + pipConfigProperties.toAbsolutePath().toString() + + DOES_NOT_EXIST); + // + // Try to create the file + // + try { + Files.createFile(pipConfigProperties); + } catch (IOException e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Failed to create pip properties file: " + + pipConfigProperties.toAbsolutePath().toString() + e); + throw new PAPException( + "Failed to create pip properties file: " + pipConfigProperties.toAbsolutePath().toString()); + } + } + return pipConfigProperties; + } - public static synchronized Path getPDPConfig() throws PAPException { - Path config = Paths.get(XACMLProperties - .getProperty(XacmlRestProperties.PROP_PDP_CONFIG)); - if (Files.notExists(config)) { - LOGGER.warn(XACMLErrorConstants.ERROR_PROCESS_FLOW + config.toAbsolutePath().toString() + " does NOT exist."); - // - // Try to create the directory - // - try { - Files.createDirectories(config); - } catch (IOException e) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Failed to create config directory: " - + config.toAbsolutePath().toString(), e); - throw new PAPException("Failed to create config directory: " - + config.toAbsolutePath().toString()); - } - } - return config; - } + /** + * Get the PDP configuration. + * + * @return the PDP configuration + * @throws PAPException on get exceptions + */ + public static synchronized Path getPDPConfig() throws PAPException { + Path config = Paths.get(XACMLProperties.getProperty(XacmlRestProperties.PROP_PDP_CONFIG)); + if (!config.toFile().exists()) { + LOGGER.warn( + XACMLErrorConstants.ERROR_PROCESS_FLOW + config.toAbsolutePath().toString() + DOES_NOT_EXIST); + // + // Try to create the directory + // + try { + Files.createDirectories(config); + } catch (IOException e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Failed to create config directory: " + + config.toAbsolutePath().toString(), e); + throw new PAPException("Failed to create config directory: " + config.toAbsolutePath().toString()); + } + } + return config; + } } diff --git a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java index 8c962192b..5c1162a7d 100644 --- a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java +++ b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java @@ -3,6 +3,7 @@ * ONAP-PDP-REST * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2019 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +21,10 @@ package org.onap.policy.pdp.rest.notifications; +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.research.xacml.util.XACMLProperties; + import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; @@ -48,180 +53,200 @@ import org.onap.policy.rest.XacmlRestProperties; import org.onap.policy.utils.BusPublisher; import org.onap.policy.xacml.api.XACMLErrorConstants; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.research.xacml.util.XACMLProperties; - /** - * The NotificationServer sends the Server Notifications to the Clients once there is any Event. - * WebSockets is being used as a medium for sending Notifications. - * UEB is being used as a medium for sending Notifications. - * DMAAP is being used as a medium for sending Notifications. + * The NotificationServer sends the Server Notifications to the Clients once there is any Event. WebSockets is being + * used as a medium for sending Notifications. UEB is being used as a medium for sending Notifications. DMAAP is being + * used as a medium for sending Notifications. * * @version 0.2 * **/ @ServerEndpoint(value = "/notifications") public class NotificationServer { - private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class); - private static Queue<Session> queue = new ConcurrentLinkedQueue<>(); - private static String update = null; - - @OnOpen - public void openConnection(Session session) { - LOGGER.info("Session Connected: " + session.getId()); - queue.add(session); - } - - @OnClose - public void closeConnection(Session session) { - queue.remove(session); - } - - @OnError - public void error(Session session, Throwable t) { - queue.remove(session); - LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage()); - - } - - @OnMessage - public void message(String message, Session session) { - - if(message.equalsIgnoreCase("Manual")) { - try { - session.getBasicRemote().sendText(update); - session.close(); - } catch (IOException e) { - LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e); - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update"); - } - } - } - - public static void sendNotification(String notification, String propNotificationType, String pdpURL) throws PolicyEngineException, IOException, InterruptedException { - - LOGGER.debug("Notification set to " + propNotificationType); - if (propNotificationType.equals("ueb")){ - - String topic = null; - try { - URL aURL = new URL(pdpURL); - topic = aURL.getHost() + aURL.getPort(); - } catch (MalformedURLException e1) { - pdpURL = pdpURL.replace("/", ""); - topic = pdpURL.replace(":", ""); - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication "); - PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1, "Error in parsing out pdpURL for UEB notfication "); - } - String hosts = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS); - String apiKey = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_KEY); - String apiSecret = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_SECRET); - - LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic); - CambriaBatchingPublisher pub = null; - try { - if(hosts==null || topic==null || apiKey==null || apiSecret==null){ - LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file "); - throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file "); - } - - hosts = hosts.trim(); - topic = topic.trim(); - apiKey = apiKey.trim(); - apiSecret = apiSecret.trim(); - pub = new CambriaClientBuilders.PublisherBuilder () - .usingHosts ( hosts ) - .onTopic ( topic ) - .authenticatedBy ( apiKey, apiSecret ) - .build () - ; - - } catch (MalformedURLException e1) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage()); - } catch (GeneralSecurityException e1) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage() +e1); - } - if(pub != null){ - try { - pub.send( "MyPartitionKey", notification ); - } catch (IOException e) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e); - } - // close the publisher. The batching publisher does not send events - // immediately, so you MUST use close to send any remaining messages. - // You provide the amount of time you're willing to wait for the sends - // to succeed before giving up. If any messages are unsent after that time, - // they're returned to your app. You could, for example, persist to disk - // and try again later. - final List<?> stuck = pub.close ( 20, TimeUnit.SECONDS ); - - if (!stuck.isEmpty()){ - LOGGER.error( stuck.size() + " messages unsent" ); - }else{ - LOGGER.info( "Clean exit; all messages sent: " + notification ); - } - } - } else if (propNotificationType.equals("dmaap")) { - - // Setting up the Publisher for DMaaP MR - String dmaapServers = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS); - String topic = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_TOPIC); - String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN"); - String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD"); - - try { - if(dmaapServers==null || topic==null){ - LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); - throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); - } - - dmaapServers= dmaapServers.trim(); - topic= topic.trim(); - aafLogin= aafLogin.trim(); - aafPassword= aafPassword.trim(); - - List<String> dmaapList = null; - if(dmaapServers.contains(",")) { - dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*"))); - } else { - dmaapList = new ArrayList<>(); - dmaapList.add(dmaapServers); - } - - BusPublisher publisher = - new BusPublisher.DmaapPublisherWrapper(dmaapList, - topic, - aafLogin, - aafPassword); - - // Sending notification through DMaaP Message Router - publisher.send( "MyPartitionKey", notification); - LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic); - publisher.close(); - - } catch (Exception e) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e); - } - } - - for(Session session: queue) { - try { - LOGGER.info("\n Sending Notification: " + notification + " for client session id: " + session.getId() + "\n " - + "PDPUrl is " + pdpURL); - LOGGER.info("NotificationServer: sending text message"); - session.getBasicRemote().sendText(notification); - } catch (IOException e) { - LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e); - } - } - - NotificationService.sendNotification(notification); - } - - public static void setUpdate(String update) { - NotificationServer.update = update; - } + private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class); + private static Queue<Session> queue = new ConcurrentLinkedQueue<>(); + private static String update = null; + + @OnOpen + public void openConnection(Session session) { + LOGGER.info("Session Connected: " + session.getId()); + queue.add(session); + } + + @OnClose + public void closeConnection(Session session) { + queue.remove(session); + } + + /** + * Error callback method. + * @param session The session on which the error occurs + * @param throwable exception thrown on the error callback + */ + @OnError + public void error(Session session, Throwable throwable) { + queue.remove(session); + LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + + throwable.getMessage()); + } + + /** + * Message callback method. + * @param message the message on the callback + * @param session The session on which the error occurs + */ + @OnMessage + public void message(String message, Session session) { + + if (message.equalsIgnoreCase("Manual")) { + try { + session.getBasicRemote().sendText(update); + session.close(); + } catch (IOException e) { + LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: " + + e.getMessage() + e); + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update"); + } + } + } + + /** + * Send a notification. + * @param notification The notification type + * @param propNotificationType Notification type properties + * @param pdpUrl URL of the PDP + * @throws PolicyEngineException on errors from the policy engine + * @throws IOException exceptions on IO errors + * @throws InterruptedException interrupts + */ + public static void sendNotification(String notification, String propNotificationType, String pdpUrl) + throws PolicyEngineException, IOException, InterruptedException { + + LOGGER.debug("Notification set to " + propNotificationType); + if (propNotificationType.equals("ueb")) { + + String topic = null; + try { + URL notificationUrl = new URL(pdpUrl); + topic = notificationUrl.getHost() + notificationUrl.getPort(); + } catch (MalformedURLException e1) { + pdpUrl = pdpUrl.replace("/", ""); + topic = pdpUrl.replace(":", ""); + LOGGER.error( + XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication "); + PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1, + "Error in parsing out pdpURL for UEB notfication "); + } + String hosts = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS); + String apiKey = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_KEY); + String apiSecret = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_SECRET); + + LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic); + CambriaBatchingPublisher pub = null; + try { + if (hosts == null || topic == null || apiKey == null || apiSecret == null) { + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + + "UEB properties are missing from the property file "); + throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + + "UEB properties are missing from the property file "); + } + + hosts = hosts.trim(); + topic = topic.trim(); + apiKey = apiKey.trim(); + apiSecret = apiSecret.trim(); + pub = new CambriaClientBuilders.PublisherBuilder().usingHosts(hosts).onTopic(topic) + .authenticatedBy(apiKey, apiSecret).build(); + + } catch (MalformedURLException e1) { + LOGGER.error( + XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage()); + } catch (GeneralSecurityException e1) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + + e1.getMessage() + e1); + } + if (pub != null) { + try { + pub.send("MyPartitionKey", notification); + } catch (IOException e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + + e.getMessage() + e); + } + // close the publisher. The batching publisher does not send events + // immediately, so you MUST use close to send any remaining messages. + // You provide the amount of time you're willing to wait for the sends + // to succeed before giving up. If any messages are unsent after that time, + // they're returned to your app. You could, for example, persist to disk + // and try again later. + final List<?> stuck = pub.close(20, TimeUnit.SECONDS); + + if (!stuck.isEmpty()) { + LOGGER.error(stuck.size() + " messages unsent"); + } else { + LOGGER.info("Clean exit; all messages sent: " + notification); + } + } + } else if (propNotificationType.equals("dmaap")) { + + // Setting up the Publisher for DMaaP MR + String dmaapServers = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS); + String topic = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_TOPIC); + String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN"); + String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD"); + + try { + if (dmaapServers == null || topic == null) { + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + + "DMaaP properties are missing from the property file "); + throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + + "DMaaP properties are missing from the property file "); + } + + dmaapServers = dmaapServers.trim(); + topic = topic.trim(); + aafLogin = aafLogin.trim(); + aafPassword = aafPassword.trim(); + + List<String> dmaapList = null; + if (dmaapServers.contains(",")) { + dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*"))); + } else { + dmaapList = new ArrayList<>(); + dmaapList.add(dmaapServers); + } + + BusPublisher publisher = + new BusPublisher.DmaapPublisherWrapper(dmaapList, topic, aafLogin, aafPassword); + + // Sending notification through DMaaP Message Router + publisher.send("MyPartitionKey", notification); + LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic); + publisher.close(); + + } catch (Exception e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + + e.getMessage() + e); + } + } + + for (Session session : queue) { + try { + LOGGER.info("\n Sending Notification: " + notification + " for client session id: " + session.getId() + + "\n " + "PDPUrl is " + pdpUrl); + LOGGER.info("NotificationServer: sending text message"); + session.getBasicRemote().sendText(notification); + } catch (IOException e) { + LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: " + + e.getMessage() + e); + } + } + + NotificationService.sendNotification(notification); + } + + public static void setUpdate(String update) { + NotificationServer.update = update; + } } |