aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2018-01-10 11:00:50 -0500
committerAlex Shatov <alexs@att.com>2018-01-10 11:07:30 -0500
commit1369bea8b3c24ef063799acefbfc01659878f034 (patch)
tree95fa3e5580f62be9c1e1d630ed0c6496b9fb03a2 /policyhandler
parentdc5da5bf63ae4a4ac11b4b5c46407e58da16fbfe (diff)
variable collection of policies per component
* new feature variable collection of policies per component in DCAE * massive refactoring * dissolved the external PolicyEngine.py into policy_receiver.py - kept only the web-socket communication to PolicyEngine * new /healthcheck - shows some stats of service running * Unit Test coverage 75% Change-Id: I816b7d5713ae0dd88fa73d3656f272b4f3e7946e Issue-ID: DCAEGEN2-249 Signed-off-by: Alex Shatov <alexs@att.com>
Diffstat (limited to 'policyhandler')
-rw-r--r--policyhandler/PolicyEngine.py1219
-rw-r--r--policyhandler/config.py61
-rw-r--r--policyhandler/deploy_handler.py35
-rw-r--r--policyhandler/discovery.py30
-rw-r--r--policyhandler/onap/audit.py142
-rw-r--r--policyhandler/onap/health.py104
-rw-r--r--policyhandler/policy_consts.py2
-rw-r--r--policyhandler/policy_engine.py103
-rw-r--r--policyhandler/policy_handler.py13
-rw-r--r--policyhandler/policy_receiver.py195
-rw-r--r--policyhandler/policy_rest.py459
-rw-r--r--policyhandler/policy_updater.py107
-rw-r--r--policyhandler/policy_utils.py134
-rw-r--r--policyhandler/web_server.py215
14 files changed, 1075 insertions, 1744 deletions
diff --git a/policyhandler/PolicyEngine.py b/policyhandler/PolicyEngine.py
deleted file mode 100644
index 7a69894..0000000
--- a/policyhandler/PolicyEngine.py
+++ /dev/null
@@ -1,1219 +0,0 @@
-"""
-PolicyEngine API for Python
-
-@author: Tarun, Mike
-@version: 0.9
-@change:
- added Enum 'Other' Type and supported it in PolicyConfig class - 1/13
- supporting Remote URL capability to the PolicyEngine as the parameter - 1/13
- Request format has been updated accordingly. No need to send the PDP URLs anymore - 1/26
- Feature where the PolicyEngine chooses available PyPDP among different URLs. 1/26
- Major feature addition required for Notifications 2/17 , Fixed Session issues. 2/25
- Major change in API structure for combining results 3/4.
- Added Security support for Notifications and clearNotification Method 3/18
- newMethod for retrieving configuration using policyFileName 3/20
- logging 3/24
- Notification Bug Fixes 7/21
- basic Auth 7/22
- Notification Changes 9/3
- ECOMP Error codes included 10/1
- -2016
- Major Changes to the Policy Engine API 3/29
- DeletePolicy API and Changes to pushPolicy and getConfig 5/27
- ConfigRequestParmeters and Error code Change 7/11
- New Environment Variable and Client Authorizations Change 7/21
- Changes to the Policy Parameters 8/21
- Allow Clients to use their own Password Protection.
- Dictionary Types and its Fixes.
-"""
-
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-
-import json,sys, os, collections, websocket , logging, time, base64, uuid
-from websocket import create_connection
-from enum import Enum
-from xml.etree.ElementTree import XML
-try:
- # For Python 3.0 and Later
- from pip._vendor import requests
-except ImportError:
- # For backend Support to Python 2's urllib2
- import requests
-try:
- # For Python 3.0 and Later
- from urllib.request import urlopen
-except ImportError:
- # Fall back for Python 2.*
- from urllib2 import urlopen
-try:
- import thread
-except ImportError:
- #Fall Back for Python 2.x
- import _thread as thread
-
-PolicyConfigStatus = Enum('PolicyConfigStatus', 'CONFIG_NOT_FOUND CONFIG_RETRIEVED')
-PolicyType = Enum('PolicyType', 'JSON XML PROPERTIES OTHER')
-PolicyResponseStatus = Enum('PolicyResponseStatus', 'ACTION_ADVISED ACTION_TAKEN NO_ACTION_REQUIRED')
-NotificationType = Enum('NotificationType', 'BOTH UPDATE REMOVE')
-UpdateType = Enum('UpdateType', 'UPDATE NEW')
-NotificationScheme = Enum('NotificationScheme', 'AUTO_ALL_NOTIFICATIONS AUTO_NOTIFICATIONS MANUAL_ALL_NOTIFICATIONS MANUAL_NOTIFICATIONS')
-AttributeType = Enum('AttributeType', 'MATCHING MICROSERVICE RULE SETTINGS')
-PolicyClass = Enum('PolicyClass', 'Action Config Decision')
-PolicyConfigType = Enum('PolicyConfigType', 'Base BRMS_PARAM BRMS_RAW ClosedLoop_Fault ClosedLoop_PM Firewall MicroService Extended')
-DeletePolicyCondition = Enum('DeletePolicyCondition', 'ONE ALL')
-RuleProvider = Enum('RuleProvider', 'Custom AAF GUARD_YAML')
-DictionaryType = Enum('DictionaryType', 'Common Action ClosedLoop Firewall Decision BRMS GOC MicroService DescriptiveScope PolicyScope Enforcer SafePolicy' )
-ImportType = Enum('ImportType', 'MICROSERVICE')
-
-class PolicyEngine:
- """
- PolicyEngine is the Class which needs to be instantiated to call the PDP server.
- It needs the *.properties* file path as the parameter to the constructor.
- """
- def __init__(self, filename, scheme=None, handler=None, clientKey=None, basic_client_auth=True):
- """
- @param filename: String format of the path location of .properties file Could also be A remote URL eg: http://localhost:8080/config.properties
- @param scheme: NotificationScheme to select the scheme required for notification updates.
- @param handler: NotificationHandler object which will be called when an event is occurred.
- @param clientKey: Decoded Client Key to be used by PolicyEngine.
- @attention:The .properties file must contain the PYPDP_URL parameter in it. The parameter can have multiple URLs the PolicyEngine chooses the available PyPDP among them.
- """
- self.filename = filename
- self.urldict = {}
- self.matchStore = []
- self.autoURL = None
- self.scheme = None
- self.handler = None
- self.thread = thread
- self.autows = None
- self.mclose = False
- self.restart = False
- self.logger = logging.getLogger()
- self.resturl= []
- self.encoded= []
- self.clientInfo = None
- self.environment = None
- self.policyheader = {}
- if(filename.startswith("http")):
- try:
- policy_data = urlopen(filename)
- for line in policy_data :
- line = line.decode('utf-8')
- line = line.rstrip() # removes trailing whitespace and '\n' chars
- line = line.replace(" ","") # removing spaces
- if "=" not in line: continue #skips blanks and comments w/o =
- if line.startswith("#"): continue #skips comments which contain =
- key, value = line.split("=",1)
- key = key.rstrip().lstrip()
- value = value.lstrip()
- #print("key= "+key+" Value =" +value )
- self.urldict[key] = value
- except:
- self.logger.error("PE300 - Data Issue: Error with the Config URL: %s" , filename )
- print("PE300 - Data Issue: Config Properties URL Error")
- sys.exit(0)
- else :
- fileExtension = os.path.splitext(filename)
- if(fileExtension[1]!=".properties"):
- self.logger.error("PE300 - Data Issue: File is not in properties format: %s", filename)
- print("PE300 - Data Issue: Not a .properties file!")
- sys.exit(0)
- try :
- with open(self.filename, 'r') as f:
- for line in f:
- line = line.rstrip() # removes trailing whitespace and '\n' chars
- line = line.replace(" ","") # removing spaces
- if "=" not in line: continue #skips blanks and comments w/o =
- if line.startswith("#"): continue #skips comments which contain =
- key, value = line.split("=",1)
- key = key.rstrip().lstrip()
- value = value.lstrip()
- # self.logger.info("key=%s Value=%s", key, value)
- self.urldict[key] = value
- except FileNotFoundError:
- self.logger.error("PE300 - Data Issue: File Not found: %s", filename)
- print("PE300 - Data Issue: File Doesn't exist in the given Location")
- sys.exit(0)
- #TODO logic for best available PyPDP servers
- try:
- self.urldict = collections.OrderedDict(sorted(self.urldict.items()))
- clientID = self.urldict.get("CLIENT_ID")
- # self.logger.info("clientID decoded %s", base64.b64decode(clientID).decode("utf-8"))
- # client_parts = base64.b64decode(clientID).split(":")
- # client_parts = clientID.split(":")
- # self.logger.info("ClientAuth:Basic %s", base64.b64encode(clientID))
- # self.logger.info("CLIENT_ID[0] = %s", client_parts[0])
- # self.logger.info("CLIENT_ID[0] base64 = %s", base64.b64encode(client_parts[0]))
- # self.logger.info("CLIENT_KEY base64 = %s", base64.b64encode(client_parts[1]))
- if(clientKey is None):
- try:
- client = base64.b64decode(self.urldict.get("CLIENT_KEY")).decode("utf-8")
- except Exception:
- self.logger.warn("PE300 - Data Issue: CLIENT_KEY parameter is not in the required encoded Format taking Value as clear Text")
- client = self.urldict.get("CLIENT_KEY")
- else:
- client = clientKey
- if(clientID is None or client is None):
- self.logger.error("PE300 - Data Issue: No CLIENT_ID and/or CLIENT_KEY parameter found in the properties file: %s ", filename)
- print("PE300 - Data Issue: No CLIENT_ID and/or CLIENT_KEY parameter found in the properties file")
- sys.exit(0)
- else:
- uid = clientID.encode('ascii')
- password = client.encode('ascii')
- self.clientInfo = base64.b64encode(uid+ b':'+password).decode('utf-8')
- self.environment = self.urldict.get("ENVIRONMENT")
- if(self.environment is None):
- self.logger.info("Missing Environment Variable setting to Default Value.")
- self.environment = "DEVL"
- self.policyheader = {
- "Content-type" : "application/json",
- "Accept" : "application/json",
- "ClientAuth" : ("Basic " if basic_client_auth else "") + self.clientInfo,
- "Environment" : self.environment
- }
- for key in self.urldict.keys():
- if(key.startswith("PYPDP_URL")):
- pypdpVal = self.urldict.get(key)
- if pypdpVal is None:
- self.logger.error("PE300 - Data Issue: No PYPDP_URL Parameter found in the properties file: %s ", filename)
- print("PE300 - Data Issue: No PYPDP_URL parameter found in the properties file")
- sys.exit(0)
- if ";" in pypdpVal:
- pdpDefault = pypdpVal.split(";")
- if pdpDefault is None:
- self.logger.error("PE300 - Data Issue: No PYPDP_URL Parameter found in the properties file: %s ", filename)
- print("PE300 - Data Issue: No PYPDP_URL parameter found in the properties file")
- sys.exit(0)
- else:
- for count in range(0, len(pdpDefault)):
- self.__pdpParam(pdpDefault[count])
- else:
- self.__pdpParam(pypdpVal)
-
- self.logger.info("PolicyEngine url: %s policyheader: %s urldict: %s", \
- self.resturl, json.dumps(self.policyheader), json.dumps(self.urldict))
- if len(self.resturl)==0:
- self.logger.error("PE300 - Data Issue: No PYPDP_URL Parameter found in the properties file: %s ", filename)
- print("PE300 - Data Issue: No PYPDP_URL parameter found in the properties file")
- sys.exit(0)
- except:
- self.logger.error("PE300 - Data Issue: missing parameter(s) in the properties file: %s ", filename)
- print("PE300 - Data Issue: missing parameter(s) in the properties file")
- sys.exit(0)
- # Scheme and Handler code to be handled from here.
- if handler is not None:
- #if type(handler) is NotificationHandler:
- self.handler = handler
- #else:
- # print("handler should be a object of NotificationHandler class")
- # sys.exit(0)
- if scheme is not None:
- if ((scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
- # setup the Auto settings.
- self.scheme = scheme
- elif ((scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name)):
- # setup the Manual Settings
- self.scheme = scheme
- else:
- self.logger.error("PE300 - Data Issue: Scheme not a type of NotificationScheme: %s", scheme.name)
- print("PE300 - Data Issue: scheme must be a Type of NotificationScheme Enumeration ")
- sys.exit(0)
-
- def __pdpParam(self,pdpValue):
- """
- Internal Usage for reading PyPDP Parameters
- """
- if pdpValue is None:
- self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
- print("PE100 - Permissions Error: No Enough Credentials to send Request")
- sys.exit(0)
- elif "," in pdpValue:
- pdpValues = pdpValue.split(",")
- if (len(pdpValues)==3):
- # 0 is pypdp URL
- self.resturl.append(pdpValues[0])
- # 1 and 2 are user name password
- if pdpValues[1] and pdpValues[2]:
- uid = pdpValues[1].encode('ascii')
- password = pdpValues[2].encode('ascii')
- encoded = base64.b64encode(uid+ b':'+password).decode('utf-8')
- self.encoded.append(encoded)
- else:
- self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
- print("PE100 - Permissions Error: No Enough Credentials to send Request")
- sys.exit(0)
- else:
- self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
- print("PE100 - Permissions Error: No Enough Credentials to send Request")
- sys.exit(0)
- else:
- self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
- print("PE100 - Permissions Error: No Enough Credentials to send Request")
- sys.exit(0)
-
- def getConfigByPolicyName(self, policyName, requestID=None):
- """
- @param policyName: String format of the PolicyFile Name whose configuration is required.
- @return: Returns a List of PolicyConfig Object(s).
- @deprecated: use getConfig instead.
- """
- __policyNameURL = "/getConfigByPolicyName"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- self.__policyNamejson = {}
- self.__policyNamejson['policyName'] = policyName
- self.__cpnResponse = self.__callPDP(__policyNameURL, json.dumps(self.__policyNamejson), __headers, "POST")
- self.__cpnJSON = self.__cpnResponse.json()
- policyConfigs= self.__configResponse(self.__cpnJSON)
- return policyConfigs
-
- def listConfig(self, eCOMPComponentName=None, configName=None, configAttributes=None, policyName=None, unique= False, requestID=None):
- """
- listConfig function calls the PDP for the configuration required using the parameters and returns the PDP response.
- @param eCOMPComponentName: String of the eCOMPComponentName whose configuration is required.
- @param configName: String of the configName whose configuration is required. Not Mandatory field.
- @param configAttributes: Dictionary of the config attributes in Key and Value String format. Not mandatory field.
- @param policyName: String of the policyName whose configuration is required.
- @param unique: Boolean value which can be set to True if Unique results are required.
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a List of PolicyNames.
- """
- __configURL = "/listConfig"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- __configjson = self.__configRequestParametersJSON(eCOMPComponentName, configName, configAttributes, policyName, unique)
- #self.__configjson['pdp_URL'] = self.pdp_url
- __cResponse = self.__callPDP(__configURL, json.dumps(__configjson), __headers, "POST")
- return __cResponse.json()
-
-
- def getConfig(self, eCOMPComponentName=None, configName=None, configAttributes=None, policyName=None, unique= False, requestID=None):
- """
- getConfig function calls the PDP for the configuration required using the parameters and returns the PDP response.
- @param eCOMPComponentName: String of the eCOMPComponentName whose configuration is required.
- @param configName: String of the configName whose configuration is required. Not Mandatory field.
- @param configAttributes: Dictionary of the config attributes in Key and Value String format. Not mandatory field.
- @param policyName: String of the policyName whose configuration is required.
- @param unique: Boolean value which can be set to True if Unique results are required.
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a List of PolicyConfig Object(s).
- """
- __configURL = "/getConfig"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- self.__configjson = self.__configRequestParametersJSON(eCOMPComponentName, configName, configAttributes, policyName, unique)
- #self.__configjson['pdp_URL'] = self.pdp_url
- self.__cResponse = self.__callPDP(__configURL, json.dumps(self.__configjson), __headers, "POST")
- #self.__configURL = self.resturl+__configURL
- #self.__cResponse = requests.post(self.__configURL, data=json.dumps(self.__configjson), headers = __headers)
- self.__cJSON = self.__cResponse.json()
- policyConfigs= self.__configResponse(self.__cJSON)
- # if we have successfully retrieved a policy we will store the match values.
- matchFound = False
- for policyConfig in policyConfigs:
- if policyConfig._policyConfigStatus == PolicyConfigStatus.CONFIG_RETRIEVED.name:
- # self.logger.info("Policy has been Retrieved !!")
- matchFound = True
- if matchFound:
- __match = {}
- __match["ECOMPName"] = eCOMPComponentName
- if configName is not None:
- __match["ConfigName"] = configName
- if configAttributes is not None:
- __match.update(configAttributes)
- if not self.matchStore:
- self.matchStore.append(__match)
- else:
- __booMatch = False
- for eachDict in self.matchStore:
- if eachDict==__match:
- __booMatch = True
- break
- if __booMatch==False:
- self.matchStore.append(__match)
- return policyConfigs
-
- def __configRequestParametersJSON(self, eCOMPComponentName=None, configName=None, configAttributes=None, policyName=None, unique= False):
- """ Internal Function to set JSON from configRequestParameters
- """
- json= {}
- if eCOMPComponentName is not None:
- json['ecompName'] = eCOMPComponentName
- if configName is not None:
- json['configName'] = configName
- if configAttributes is not None:
- json['configAttributes'] = configAttributes
- if policyName is not None:
- json['policyName'] = policyName
- json['unique'] = unique
- return json
-
- def __configResponse(self, cJSON):
- """
- Internal function to take the convert JSON to Response Object.
- """
- policyConfigs=[]
- for configJSON in cJSON:
- policyConfig = PolicyConfig()
- policyConfig._policyConfigMessage = configJSON['policyConfigMessage']
- policyConfig._policyConfigStatus = configJSON['policyConfigStatus']
- policyConfig._policyType = configJSON['type']
- policyConfig._policyName = configJSON['policyName']
- policyConfig._policyVersion = configJSON['policyVersion']
- policyConfig._matchingConditions = configJSON['matchingConditions']
- policyConfig._responseAttributes = configJSON['responseAttributes']
- if PolicyType.JSON.name == policyConfig._policyType:
- policyConfig._json = configJSON['config']
- elif PolicyType.XML.name == policyConfig._policyType:
- policyConfig._xml = XML(configJSON['config'])
- elif PolicyType.PROPERTIES.name == policyConfig._policyType:
- policyConfig._properties = configJSON['property']
- elif PolicyType.OTHER.name == policyConfig._policyType:
- policyConfig._other = configJSON['config']
- policyConfigs.append(policyConfig)
- return policyConfigs
-
- def getDecision(self, decisionAttributes, ecompcomponentName, requestID = None):
- """
- getDecision function sends the Decision Attributes to the PDP server and gets the response to the client from PDP.
- @param decisionAttributes: Dictionary of Decision Attributes in Key and Value String formats.
- @param ecompcomponentName:
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a DecisionResponse Object.
- """
- __decisionurl = "/getDecision"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- self.__decisionjson={}
- self.__decisionjson['decisionAttributes'] = decisionAttributes
- self.__decisionjson['ecompcomponentName'] = ecompcomponentName
- self.__dResponse = self.__callPDP(__decisionurl, json.dumps(self.__decisionjson), __headers, "POST")
- self.__dJSON = self.__dResponse.json()
- decisionResponse = DecisionResponse()
- decisionResponse._decision = self.__dJSON['decision']
- decisionResponse._details = self.__dJSON['details']
- return decisionResponse
-
- def sendEvent(self, eventAttributes, requestID=None):
- """
- sendEvent function sends the Event to the PDP server and gets the response to the client from the PDP.
- @param eventAttributes:Dictonary of the EventAttributes in Key and Value String formats.
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a List of PolicyResponse Object(s).
- """
- __eventurl = "/sendEvent"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- self.__eventjson = {}
- self.__eventjson['eventAttributes'] = eventAttributes
- #self.__eventjson['pdp_URL'] = self.pdp_url
- self.__eResponse = self.__callPDP(__eventurl, json.dumps(self.__eventjson), __headers, "POST")
- #self.__eventurl = self.resturl+__eventurl
- #self.__eResponse = requests.post(self.__eventurl, data=json.dumps(self.__eventjson), headers = __headers)
- self.__eJSON = self.__eResponse.json()
- policyResponses=[]
- for eventJSON in self.__eJSON:
- policyResponse = PolicyResponse()
- policyResponse._policyResponseMessage = eventJSON['policyResponseMessage']
- policyResponse._policyResponseStatus = eventJSON['policyResponseStatus']
- policyResponse._actionAdvised = eventJSON['actionAdvised']
- policyResponse._actionTaken = eventJSON['actionTaken']
- policyResponse._requestAttributes = eventJSON['requestAttributes']
- policyResponses.append(policyResponse)
- return policyResponses
-
- def createPolicy(self, policyParameters):
- """
- 'createPolicy creates Policy using the policyParameters sent'
- @param policyParameters: This is an object of PolicyParameters which is required as a parameter to this method.
- @return: Returns a PolicyChangeResponse Object
- """
- __createurl = "/createPolicy"
- __headers = self.policyheader
- try:
- if policyParameters._requestID is None:
- policyParameters._requestID = str(uuid.uuid4())
- self.__createJson = {}
- self.__createJson = self.__policyParametersJSON(policyParameters)
- self.__createResponse = self.__callPDP(__createurl, json.dumps(self.__createJson), __headers, "PUT")
- policyChangeResponse = PolicyChangeResponse()
- policyChangeResponse._responseCode = self.__createResponse.status_code
- policyChangeResponse._responseMessage = self.__createResponse.text
- return policyChangeResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the policyParameters Object. It needs to be object of PolicyParameters ")
- print("PE300 - Data Issue: policyParamters object Error")
-
- def updatePolicy(self, policyParameters):
- """
- 'updatePolicy updates Policy using the policyParameters sent.'
- @param policyParameters: This is an object of PolicyParameters which is required as a parameter to this method.
- @return: Returns a PolicyChangeResponse Object
- """
- __updateurl = "/updatePolicy"
- __headers = self.policyheader
- try:
- if policyParameters._requestID is None:
- policyParameters._requestID = str(uuid.uuid4())
- self.__updateJson = {}
- self.__updateJson = self.__policyParametersJSON(policyParameters)
- self.__updateResponse = self.__callPDP(__updateurl, json.dumps(self.__updateJson), __headers, "PUT")
- policyChangeResponse = PolicyChangeResponse()
- policyChangeResponse._responseCode = self.__updateResponse.status_code
- policyChangeResponse._responseMessage = self.__updateResponse.text
- return policyChangeResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the policyParameters Object. It needs to be object of PolicyParameters ")
- print("PE300 - Data Issue: policyParamters object Error")
-
- def __policyParametersJSON(self, policyParameters):
- """ Internal Function to set JSON from policyParameters Object
- """
- json= {}
- if policyParameters._actionAttribute is not None:
- json['actionAttribute'] = policyParameters._actionAttribute
- if policyParameters._actionPerformer is not None:
- json['actionPerformer'] = policyParameters._actionPerformer
- if policyParameters._attributes is not None:
- json['attributes'] = policyParameters._attributes
- if policyParameters._configBody is not None:
- json['configBody'] = policyParameters._configBody
- if policyParameters._configBodyType is not None:
- json['configBodyType'] = policyParameters._configBodyType
- if policyParameters._configName is not None:
- json['configName'] = policyParameters._configName
- if policyParameters._controllerName is not None:
- json['controllerName'] = policyParameters._controllerName
- if policyParameters._dependencyNames is not None:
- json['dependencyNames'] = policyParameters._dependencyNames
- if policyParameters._dynamicRuleAlgorithmLabels is not None:
- json['dynamicRuleAlgorithmLabels'] = policyParameters._dynamicRuleAlgorithmLabels
- if policyParameters._dynamicRuleAlgorithmField1 is not None:
- json['dynamicRuleAlgorithmField1'] = policyParameters._dynamicRuleAlgorithmField1
- if policyParameters._dynamicRuleAlgorithmField2 is not None:
- json['dynamicRuleAlgorithmField2'] = policyParameters._dynamicRuleAlgorithmField2
- if policyParameters._dynamicRuleAlgorithmFunctions is not None:
- json['dynamicRuleAlgorithmFunctions'] = policyParameters._dynamicRuleAlgorithmFunctions
- if policyParameters._ecompName is not None:
- json['ecompName'] = policyParameters._ecompName
- if policyParameters._extendedOption is not None:
- json['extendedOption'] = policyParameters._extendedOption
- if policyParameters._guard is not None:
- json['guard'] = policyParameters._guard
- if policyParameters._policyClass is not None:
- json['policyClass'] = policyParameters._policyClass
- if policyParameters._policyConfigType is not None:
- json['policyConfigType'] = policyParameters._policyConfigType
- if policyParameters._policyName is not None:
- json['policyName'] = policyParameters._policyName
- if policyParameters._policyDescription is not None:
- json['policyDescription'] = policyParameters._policyDescription
- if policyParameters._priority is not None:
- json['priority'] = policyParameters._priority
- if policyParameters._requestID is not None:
- json['requestID'] = policyParameters._requestID
- if policyParameters._riskLevel is not None:
- json['riskLevel'] = policyParameters._riskLevel
- if policyParameters._riskType is not None:
- json['riskType'] = policyParameters._riskType
- if policyParameters._ruleProvider is not None:
- json['ruleProvider'] = policyParameters._ruleProvider
- if policyParameters._ttlDate is not None:
- json['ttlDate'] = policyParameters._ttlDate
- return json
-
- def pushPolicy(self, pushPolicyParameters, requestID = None):
- """
- 'pushPolicy pushes a policy based on the given Push Policy Parameters. '
- @param pushPolicyParameters: This is an object of PushPolicyParameters which is required as a parameter to this method.
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a PolicyChangeResponse Object
- """
- __pushurl = "/pushPolicy"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- try:
- self.__pushJson = {}
- self.__pushJson['pdpGroup'] = pushPolicyParameters._pdpGroup
- self.__pushJson['policyName'] = pushPolicyParameters._policyName
- self.__pushJson['policyType'] = pushPolicyParameters._policyType
- self.__pushResponse = self.__callPDP(__pushurl, json.dumps(self.__pushJson), __headers, "PUT")
- policyChangeResponse = PolicyChangeResponse()
- policyChangeResponse._responseCode = self.__pushResponse.status_code
- policyChangeResponse._responseMessage = self.__pushResponse.text
- return policyChangeResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the pushPolicyParameters Object. It needs to be object of PushPolicyParameters ")
- print("PE300 - Data Issue: pushPolicyParamters object Error")
-
- def deletePolicy(self, deletePolicyParameters):
- """
- 'deletePolicy Deletes a policy or all its version according to the given deletePolicyParameters'
- @param deletePolicyParameters: This is an Object of DeletePolicyParameters which is required as a parameter to this method.
- @return: Returns a PolicyChangeResponse Object
- """
- __deleteurl = "/deletePolicy"
- __createdictionaryurl = "/createDictionaryItem"
- __headers = self.policyheader
- try:
- if deletePolicyParameters._requestID is None:
- deletePolicyParameters._requestID = str(uuid.uuid4())
- self.__deleteJson = {}
- self.__deleteJson['deleteCondition'] = deletePolicyParameters._deleteCondition
- self.__deleteJson['pdpGroup'] = deletePolicyParameters._pdpGroup
- self.__deleteJson['policyComponent'] = deletePolicyParameters._policyComponent
- self.__deleteJson['policyName'] = deletePolicyParameters._policyName
- self.__deleteJson['policyType'] = deletePolicyParameters._policyType
- self.__deleteJson['requestID'] = deletePolicyParameters._requestID
- self.__deleteResponse = self.__callPDP(__deleteurl, json.dumps(self.__deleteJson), self.policyheader, "DELETE")
- policyChangeResponse = PolicyChangeResponse()
- policyChangeResponse._responseCode = self.__deleteResponse.status_code
- policyChangeResponse._responseMessage = self.__deleteResponse.text
- return policyChangeResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the deletePolicyParameters Object. It needs to be object of DeletePolicyParameters ")
- print("PE300 - Data Issue: deletePolicyParameters object Error")
-
- def createDictionaryItems(self, dictionaryParameters):
- """
- 'createDictionaryItems adds dictionary items to the database for a specific dictionary'
- @param dictionaryParameters: This is an Object of DictionaryParameters which is required as a parameter to this method
- @return: Returns a DictionaryResponse object
- """
- __createdictionaryurl = '/createDictionaryItem'
- __headers = self.policyheader
- try:
- if dictionaryParameters._requestID is None:
- dictionaryParameters._requestID = str(uuid.uuid4())
- self.__json={}
- self.__json['dictionaryType'] = dictionaryParameters._dictionaryType
- self.__json['dictionary'] = dictionaryParameters._dictionary
- self.__json['dictionaryJson'] = dictionaryParameters._dictionaryJson
- self.__json['requestID'] = dictionaryParameters._requestID
- self.__createResponse = self.__callPDP(__createdictionaryurl, json.dumps(self.__json), __headers, "PUT")
- dictionaryResponse = DictionaryResponse()
- dictionaryResponse._responseCode = self.__createResponse.status_code
- dictionaryResponse._responseMessage = self.__createResponse.text
- return dictionaryResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the dictionaryParameters object. It needs to be object of DictionaryParameters ")
- print("PE300 - Data Issue: dictionaryParameters object Error")
-
-
- def updateDictionaryItems(self, dictionaryParameters):
- """
- 'updateDictionaryItems edits dictionary items in the database for a specific dictionary'
- @param dictionaryParameters: This is an Object of DictionaryParameters which is required as a parameter to this method
- @return: Returns a DictionaryResponse object
- """
- __updatedictionaryurl = '/updateDictionaryItem'
- __headers = self.policyheader
- try:
- if dictionaryParameters._requestID is None:
- dictionaryParameters._requestID = str(uuid.uuid4())
- self.__json={}
- self.__json['dictionaryType'] = dictionaryParameters._dictionaryType
- self.__json['dictionary'] = dictionaryParameters._dictionary
- self.__json['dictionaryJson'] = dictionaryParameters._dictionaryJson
- self.__json['requestID'] = dictionaryParameters._requestID
- self.__updateResponse = self.__callPDP(__updatedictionaryurl, json.dumps(self.__json), __headers, "PUT")
- dictionaryResponse = DictionaryResponse()
- dictionaryResponse._responseCode = self.__updateResponse.status_code
- dictionaryResponse._responseMessage = self.__updateResponse.text
- return dictionaryResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the dictionaryParameters object. It needs to be object of DictionaryParameters ")
- print("PE300 - Data Issue: dictionaryParameters object Error")
-
- def getDictionaryItems(self, dictionaryParameters):
- """
- 'getDictionaryItems gets all the dictionary items stored in the database for a specified dictionary'
- @param dictionaryParameters: This is an Object of DictionaryParameters which is required as a parameter to this method.
- @return: Returns a DictionaryResponse object
- """
- __retrievedictionaryurl = "/getDictionaryItems"
- __headers = self.policyheader
- try:
- if dictionaryParameters._requestID is None:
- dictionaryParameters._requestID = str(uuid.uuid4())
- self.__json = {}
- self.__json['dictionaryType'] = dictionaryParameters._dictionaryType
- self.__json['dictionary'] = dictionaryParameters._dictionary
- self.__json['requestID'] = dictionaryParameters._requestID
- self.__getResponse = self.__callPDP(__retrievedictionaryurl, json.dumps(self.__json), __headers, "POST")
- dictionaryResponse = DictionaryResponse()
- dictionaryResponse._responseCode = self.__getResponse.status_code
- dictionaryResponse._responseMessage = self.__getResponse.text
- return dictionaryResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the dictionaryParameters object. It needs to be object of DictionaryParameters ")
- print("PE300 - Data Issue: dictionaryParameters object Error")
-
- def getNotification(self):
- """
- gets the PDPNotification if the appropriate NotificationScheme is selected.
- @return: Returns a PDPNotification Object.
- """
- if ((self.scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name)or(self.scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name)):
- # Manual Client for websocket Code in here.
- if(self.resturl[0].startswith("https")):
- __man_url = self.resturl[0].replace("https","wss")+"notifications"
- else:
- __man_url = self.resturl[0].replace("http","ws")+"notifications"
- __result = self.__manualRequest(__man_url)
- self.logger.debug("Manual Notification with server: %s \n result is: %s" , __man_url , __result)
- # TODO convert the result to PDP Notifications.
- if (self.scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name):
- # need to add all the values to the PDPNotification..
- pDPNotification = PDPNotification()
- boo_Remove = False
- boo_Update = False
- if __result is None:
- return None
- if __result['removedPolicies']:
- removedPolicies = []
- for removed in __result['removedPolicies']:
- removedPolicy = RemovedPolicy()
- removedPolicy._policyName = removed['policyName']
- removedPolicy._policyVersion = removed['versionNo']
- removedPolicies.append(removedPolicy)
- pDPNotification._removedPolicies= removedPolicies
- boo_Remove = True
- if __result['loadedPolicies']:
- updatedPolicies = []
- for updated in __result['loadedPolicies']:
- updatedPolicy = LoadedPolicy()
- updatedPolicy._policyName = updated['policyName']
- updatedPolicy._policyVersion = updated['versionNo']
- updatedPolicy._matchingConditions = updated['matches']
- updatedPolicy._updateType = updated['updateType']
- updatedPolicies.append(updatedPolicy)
- pDPNotification._loadedPolicies= updatedPolicies
- boo_Update = True
- if (boo_Update and boo_Remove):
- pDPNotification._notificationType = NotificationType.BOTH.name
- elif boo_Update:
- pDPNotification._notificationType = NotificationType.UPDATE.name
- elif boo_Remove:
- pDPNotification._notificationType = NotificationType.REMOVE.name
- return pDPNotification
- elif (self.scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name):
- return self.__checkNotification(__result)
- else:
- return None
-
- def setNotification(self, scheme, handler = None):
- """
- setNotification allows changes to the NotificationScheme and the NotificationHandler.
- @param scheme: NotificationScheme to select the scheme required for notification updates.
- @param handler: NotificationHandler object which will be called when an event is occurred.
- """
- if handler is not None:
- #if type(handler) is NotificationHandler:
- self.handler = handler
- #else:
- # print("Error: handler should be a object of NotificationHandler class")
- if scheme is not None:
- if ((scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
- # setup the Auto settings.
- self.scheme = scheme
- self.__startAuto()
- elif ((scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name)):
- # setup the Manual Settings
- self.scheme = scheme
- else:
- print("PE300 - Data Issue: scheme must be a Type of NotificationScheme Enumeration ")
-
- def clearNotification(self):
- """
- clearNotification ShutsDown the AutoNotification service if running.
- """
- if self.scheme is not None:
- if((self.scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(self.scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
- if self.autows.sock is not None:
- if(self.autows.sock.connected):
- self.mclose = True
- self.autows.close()
- self.logger.info("Notification Service Stopped.")
- print("Notification Service is Stopped!!")
-
- def __callPDP(self,urlFunction, jsonData, headerData,method, files= None, params = None):
- """
- This function call is for internal usage purpose only.
- Calls the available PyPDP
- """
- connected = False
- response = None
- errormessage = ''
- for count in range(0, len(self.resturl)):
- try:
- logging.basicConfig(level=logging.DEBUG)
- request_url = self.resturl[0]+ urlFunction
- self.logger.debug("--- Sending Request to : %s",request_url)
- try:
- self.logger.debug("Request ID %s :",headerData["X-ECOMP-RequestID"])
- except:
- if jsonData is not None:
- self.logger.debug("Request ID %s :",json.loads(jsonData)['requestID'])
- self.logger.debug("Request Data is: %s" ,jsonData)
- headerData["Authorization"]= "Basic " + self.encoded[0]
- if(method=="PUT"):
- response = requests.put(request_url, data = jsonData, headers = headerData)
- elif(method=="DELETE"):
- response = requests.delete(request_url, data = jsonData, headers = headerData)
- elif(method=="POST"):
- if params is not None:
- # files = files, params = params,
- response = requests.post(request_url, params = params, headers = headerData)
- else:
- response = requests.post(request_url, data = jsonData, headers = headerData)
- # when using self-signed server certificate, comment previous line and uncomment following:
- #response = requests.post(request_url, data = jsonData, headers = headerData, verify=False)
- self.logger.debug("--- Response is : ---")
- self.logger.debug(response.status_code)
- self.logger.debug(response.headers)
- self.logger.debug(response.text)
- if(response.status_code == 200) :
- connected = True
- self.logger.info("connected to the PyPDP: %s", request_url)
- break
- elif(response.status_code == 202) :
- connected = True
- break
- elif(response.status_code == 400):
- self.logger.debug("PE400 - Schema Issue: Incorrect Params passed: %s %s", self.resturl[0], response.status_code)
- errormessage+="\n PE400 - Schema Issue: Incorrect Params passed: "+ self.resturl[0]
- self.__rotatePDP()
- elif(response.status_code == 401):
- self.logger.debug("PE100 - Permissions Error: PyPDP Error: %s %s", self.resturl[0], response.status_code)
- errormessage+="\n PE100 - Permissions Error: PyPDP Error: "+ self.resturl[0]
- self.__rotatePDP()
- elif(response.status_code == 403):
- self.logger.debug("PE100 - Permissions Error: PyPDP Error: %s %s", self.resturl[0], response.status_code)
- errormessage+="\n PE100 - Permissions Error: PyPDP Error: "+ self.resturl[0]
- self.__rotatePDP()
- else:
- self.logger.debug("PE200 - System Error: PyPDP Error: %s %s", self.resturl[0], response.status_code)
- errormessage+="\n PE200 - System Error: PyPDP Error: "+ self.resturl[0]
- self.__rotatePDP()
- except Exception as e:
- print(str(e));
- self.logger.debug("PE200 - System Error: PyPDP Error: %s", self.resturl[0])
- errormessage+="\n PE200 - System Error: PyPDP Error: "+ self.resturl[0]
- self.__rotatePDP()
- if(connected):
- if(self.autoURL==None):
- self.__startAuto()
- elif(self.autoURL!= self.resturl[0]):
- self.__startAuto()
- return response
- else:
- self.logger.error("PE200 - System Error: cannot connect to given PYPDPServer(s) %s", self.resturl)
- print(errormessage)
- sys.exit(0)
-
- def __rotatePDP(self):
- self.resturl = collections.deque(self.resturl)
- self.resturl.rotate(-1)
- self.encoded = collections.deque(self.encoded)
- self.encoded.rotate(-1)
-
- def __checkNotification(self, resultJson):
- """
- This function call is for Internal usage purpose only.
- Checks the Notification JSON compares it with the MatchStore and returns the PDPNotification object.
- """
- if not resultJson:
- return None
- if not self.matchStore:
- return None
- pDPNotification = PDPNotification()
- boo_Remove = False
- boo_Update = False
- if resultJson['removedPolicies']:
- removedPolicies = []
- for removed in resultJson['removedPolicies']:
- removedPolicy = RemovedPolicy()
- removedPolicy._policyName = removed['policyName']
- removedPolicy._policyVersion = removed['versionNo']
- removedPolicies.append(removedPolicy)
- pDPNotification._removedPolicies= removedPolicies
- boo_Remove = True
- if resultJson['updatedPolicies']:
- updatedPolicies = []
- for updated in resultJson['updatedPolicies']:
- updatedPolicy = LoadedPolicy()
- # check if it has matches then it is a Config Policy and compare it with Match Store.
- if updated['matches']:
- # compare the matches with our Stored Matches
- for eachDict in self.matchStore:
- if eachDict==updated['matches']:
- updatedPolicy._policyName = updated['policyName']
- updatedPolicy._policyVersion = updated['versionNo']
- updatedPolicy._matchingConditions = updated['matches']
- updatedPolicy._updateType = updated['updateType']
- updatedPolicies.append(updatedPolicy)
- boo_Update = True
- else:
- updatedPolicy._policyName = updated['policyName']
- updatedPolicy._policyVersion = updated['versionNo']
- updatedPolicies.append(updatedPolicy)
- boo_Update = True
- pDPNotification._loadedPolicies= updatedPolicies
- if (boo_Update and boo_Remove):
- pDPNotification._notificationType = NotificationType.BOTH.name
- elif boo_Update:
- pDPNotification._notificationType = NotificationType.UPDATE.name
- elif boo_Remove:
- pDPNotification._notificationType = NotificationType.REMOVE.name
- return pDPNotification
-
- def __startAuto(self):
- """
- Starts the Auto Notification Feature..
- """
- if self.scheme is not None:
- if ((self.scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(self.scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
- if self.handler is None:
- if self.autows.sock is not None:
- if(self.autows.sock.connected):
- self.mclose= True
- self.autows.close()
- else:
- if self.autoURL is None:
- self.autoURL = self.resturl[0]
- elif self.autoURL != self.resturl[0]:
- self.autoURL = self.resturl[0]
- if self.autows.sock is not None:
- if(self.autows.sock.connected):
- self.mclose= True
- self.autows.close()
- else:
- self.autows = None
- if self.autows is None:
- if(self.autoURL.startswith("https")):
- __auto_url = self.autoURL.replace("https","wss")+"notifications"
- else:
- __auto_url = self.autoURL.replace("http","ws")+"notifications"
- def run(*args):
- self.__autoRequest(__auto_url)
- self.logger.info("Starting AutoNotification Service with : %s" , __auto_url)
- self.thread.start_new_thread(run , ())
- elif self.autows.sock is not None:
- if not (self.autows.sock.connected):
- self.mclose = True
- self.autows.close()
- self.restart = True
- self.__rotatePDP()
- if(self.autoURL.startswith("https")):
- __auto_url = self.autoURL.replace("https","wss")+"notifications"
- else:
- __auto_url = self.autoURL.replace("http","ws")+"notifications"
- def run(*args):
- self.__autoRequest(__auto_url)
- self.logger.info("Starting AutoNotification Service with : %s" , __auto_url)
- self.thread.start_new_thread(run , ())
-
- else:
- #stop the Auto Notification Service if it is running.
- if self.autows.sock is not None:
- if(self.autows.sock.connected):
- self.mclose= True
- self.autows.close()
-
- def __onEvent(self, message):
- """
- Handles the event Notification received.
- """
- message = json.loads(message)
- if self.handler is not None:
- if (self.scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name):
- pDPNotification = PDPNotification()
- boo_Remove = False
- boo_Update = False
- if message['removedPolicies']:
- removedPolicies = []
- for removed in message['removedPolicies']:
- removedPolicy = RemovedPolicy()
- removedPolicy._policyName = removed['policyName']
- removedPolicy._policyVersion = removed['versionNo']
- removedPolicies.append(removedPolicy)
- pDPNotification._removedPolicies= removedPolicies
- boo_Remove = True
- if message['loadedPolicies']:
- updatedPolicies = []
- for updated in message['loadedPolicies']:
- updatedPolicy = LoadedPolicy()
- updatedPolicy._policyName = updated['policyName']
- updatedPolicy._policyVersion = updated['versionNo']
- updatedPolicy._matchingConditions = updated['matches']
- updatedPolicy._updateType = updated['updateType']
- updatedPolicies.append(updatedPolicy)
- pDPNotification._loadedPolicies= updatedPolicies
- boo_Update = True
- if (boo_Update and boo_Remove):
- pDPNotification._notificationType = NotificationType.BOTH.name
- elif boo_Update:
- pDPNotification._notificationType = NotificationType.UPDATE.name
- elif boo_Remove:
- pDPNotification._notificationType = NotificationType.REMOVE.name
- # call the Handler.
- self.handler.notificationReceived(pDPNotification)
- elif (self.scheme == NotificationScheme.AUTO_NOTIFICATIONS.name):
- # call the handler
- self.handler(self.__checkNotification(message))
-
- def __manualRequest(self,request_url):
- """
- Takes the request_URL given and returns the JSON response back to the Caller.
- """
- ws = create_connection(request_url)
- # when using self-signed server certificate, comment previous line and uncomment following:
- #ws = create_connection(request_url, sslopt={"cert_reqs": ssl.CERT_NONE})
- ws.send("Manual")
- try:
- return json.loads(ws.recv())
- except:
- return None
- ws.close()
- ws.shutdown()
-
- def __onMessage(self, ws,message):
- """Occurs on Event
- """
- self.logger.info("Received AutoNotification message: %s" , message)
- self.__onEvent(message)
-
- def __onError(self, ws, error):
- """Self Restart the Notification Service on Error
- """
- self.logger.error("PE500 - Process Flow Issue: Auto Notification service Error!! : %s" , error)
-
- def __onclose(self, ws):
- """Occurs on Close ? Try to start again in case User didn't do it.
- """
- self.logger.debug("Connection has been Closed. ")
- if not self.mclose:
- self.__startAuto()
- self.mclose = False
-
- def __autoRequest(self, request_url):
- """
- Takes the request_URL and invokes the PolicyEngine method on any receiving a Message
- """
- websocket.enableTrace(True)
- self.autows = websocket.WebSocketApp(request_url, on_message= self.__onMessage, on_close= self.__onclose, on_error= self.__onError)
- # wait for to 5 seconds to restart
- if self.restart:
- time.sleep(5)
- self.autows.run_forever()
-
-class NotificationHandler:
- """
- 'Defines the methods which need to run when an Event or a Notification is received.'
- """
- def notificationReceived(self, notification):
- """
- Will be triggered automatically whenever a Notification is received by the PEP
- @param notification: PDPNotification object which has the information of the Policies.
- @attention: This method must be implemented by the user for AUTO type NotificationScheme
- """
- raise Exception("Unimplemented abstract method: %s" % __functionId(self, 1))
-
-def __functionId(obj, nFramesUp):
- """ Internal Usage only..
- Create a string naming the function n frames up on the stack. """
- fr = sys._getframe(nFramesUp+1)
- co = fr.f_code
- return "%s.%s" % (obj.__class__, co.co_name)
-
-class PolicyConfig:
- """
- 'PolicyConfig is the return object resulted by getConfig Call.'
- """
- def __init__(self):
- self._policyConfigMessage = None
- self._policyConfigStatus = None
- self._policyName = None
- self._policyVersion = None
- self._matchingConditions = None
- self._responseAttributes = None
- self._policyType = None
- self._json = None
- self._xml = None
- self._prop = None
- self._other = None
-
-class PolicyResponse:
- """
- 'PolicyResponse is the return object resulted by sendEvent Call.'
- """
- def __init__(self):
- self._policyResponseStatus = None
- self._policyResponseMessage = None
- self._requestAttributes = None
- self._actionTaken = None
- self._actionAdvised= None
-
-class PDPNotification:
- """
- 'Defines the Notification Event sent from the PDP to PEP Client.'
- """
- def __init__(self):
- self._removedPolicies = None
- self._loadedPolicies = None
- self._notificationType = None
-
-class RemovedPolicy:
- """
- 'Defines the structure of the Removed Policy'
- """
- def __init__(self):
- self._policyName = None
- self._policyVersion = None
-
-class LoadedPolicy:
- """
- 'Defines the Structure of the Loaded Policy'
- """
- def __init__(self):
- self._policyName = None
- self._policyVersion = None
- self._matchingConditions = None
- self._updateType = None
-
-class PolicyParameters:
- """
- 'Defines the Structure of the Policy to Create or Update'
- """
- def __init__(self):
- self._actionPerformer = None
- self._actionAttribute = None
- self._attributes = None
- self._configBody = None
- self._configBodyType = None
- self._configName = None
- self._controllerName = None
- self._dependencyNames = None
- self._dynamicRuleAlgorithmLabels = None
- self._dynamicRuleAlgorithmFunctions = None
- self._dynamicRuleAlgorithmField1 = None
- self._dynamicRuleAlgorithmField2 = None
- self._ecompName = None
- self._extendedOption = None
- self._guard = None
- self._policyClass = None
- self._policyConfigType = None
- self._policyName = None
- self._policyDescription = None
- self._priority = None
- self._requestID = None
- self._riskLevel = None
- self._riskType = None
- self._ruleProvider = None
- self._ttlDate = None
-
-class PushPolicyParameters:
- """
- 'Defines the Structure of the Push Policy Parameters'
- """
- def __init__(self):
- self._pdpGroup = None
- self._policyName = None
- self._policyType = None
-
-class PolicyChangeResponse:
- """
- 'Defines the Structure of the policy Changes made from PDP'
- """
- def __init__(self):
- self._responseMessage = None
- self._responseCode = None
-
-class DeletePolicyParameters:
- """
- 'Defines the Structure of the Delete Policy Parameters'
- """
- def __init__(self):
- self._deleteCondition = None
- self._pdpGroup = None
- self._policyComponent = None
- self._policyName = None
- self._policyType = None
- self._requestID = None
-
-class DictionaryParameters:
- """
- 'Defines the Structure of the Dictionary Parameters'
- """
- def __init__(self):
- self._dictionaryType = None
- self._dictionary = None
- self._dictionaryJson = None
- self._requestID = None
-
-class DictionaryResponse:
- """
- 'Defines the Structure of the dictionary response'
- """
- def __init__(self):
- self._responseMessage = None
- self._responseCode = None
- self._dictionaryJson = None
- self._dictionaryData = None
-
-class DecisionResponse:
- """
- 'Defines the Structure of Decision Response'
- """
- def __init__(self):
- self._decision = None
- self._details = None
-
-class ImportParameters:
- """
- 'Defines the Structure of Policy Model Import'
- """
- def __init__(self):
- self._serviceName = None
- self._description = None
- self._requestID = None
- self._filePath = None
- self._importBody = None
- self._version = None
- self._importType = None
diff --git a/policyhandler/config.py b/policyhandler/config.py
index 7033096..a36f032 100644
--- a/policyhandler/config.py
+++ b/policyhandler/config.py
@@ -22,8 +22,6 @@
import os
import json
import copy
-import re
-import base64
import logging
import logging.config
@@ -65,11 +63,7 @@ class Config(object):
"""find the name of the policy-handler system
to be used as the key in consul-kv for config of policy-handler
"""
- system_name = None
- if Config.config:
- system_name = Config.config.get(Config.FIELD_SYSTEM)
-
- return system_name or Config.SERVICE_NAME_POLICY_HANDLER
+ return (Config.config or {}).get(Config.FIELD_SYSTEM, Config.SERVICE_NAME_POLICY_HANDLER)
@staticmethod
def discover():
@@ -88,19 +82,6 @@ class Config(object):
Config._logger.debug("merged config from discovery: %s", json.dumps(Config.config))
@staticmethod
- def upload_to_discovery():
- """upload the current config settings to the discovery service"""
- if not Config.config or not isinstance(Config.config, dict):
- Config._logger.error("unexpected config: %s", Config.config)
- return
-
- discovery_key = Config.get_system_name()
- latest_config = json.dumps({Config.SERVICE_NAME_POLICY_HANDLER:Config.config})
- DiscoveryClient.put_kv(discovery_key, latest_config)
- Config._logger.debug("uploaded config to discovery(%s): %s", \
- discovery_key, latest_config)
-
- @staticmethod
def load_from_file(file_path=None):
"""read and store the config from config file"""
if not file_path:
@@ -123,43 +104,3 @@ class Config(object):
Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port)
Config.merge(loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER))
return True
-
-class PolicyEngineConfig(object):
- """main config of the application"""
- # PATH_TO_PROPERTIES = r'logs/policy_engine.properties'
- PATH_TO_PROPERTIES = r'tmp/policy_engine.properties'
- PYPDP_URL = "PYPDP_URL = {0}{1}, {2}, {3}\n"
- CLIENT_ID = "CLIENT_ID = {0}\n"
- CLIENT_KEY = "CLIENT_KEY = {0}\n"
- ENVIRONMENT = "ENVIRONMENT = {0}\n"
- _logger = logging.getLogger("policy_handler.pe_config")
-
- @staticmethod
- def save_to_file():
- """create the policy_engine.properties for policy-engine client"""
- file_path = PolicyEngineConfig.PATH_TO_PROPERTIES
-
- try:
- config = Config.config[Config.FIELD_POLICY_ENGINE]
- headers = config["headers"]
- remove_basic = re.compile(r"(^Basic )")
- client_auth = headers["ClientAuth"]
- basic_client_auth = bool(remove_basic.match(client_auth))
- client_parts = base64.b64decode(remove_basic.sub("", client_auth)).split(":")
- auth_parts = base64.b64decode(remove_basic.sub("", headers["Authorization"])).split(":")
-
- props = PolicyEngineConfig.PYPDP_URL.format(config["url"], config["path_pdp"],
- auth_parts[0], auth_parts[1])
- props += PolicyEngineConfig.CLIENT_ID.format(client_parts[0])
- props += PolicyEngineConfig.CLIENT_KEY.format(base64.b64encode(client_parts[1]))
- props += PolicyEngineConfig.ENVIRONMENT.format(headers["Environment"])
-
- with open(file_path, 'w') as prp_file:
- prp_file.write(props)
- PolicyEngineConfig._logger.info("created %s basic_client_auth %s",
- file_path, basic_client_auth)
- return basic_client_auth
- except IOError:
- PolicyEngineConfig._logger.error("failed to save to %s", file_path)
- except KeyError:
- PolicyEngineConfig._logger.error("unexpected config for %s", Config.FIELD_POLICY_ENGINE)
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
index 1d50fc3..a641a95 100644
--- a/policyhandler/deploy_handler.py
+++ b/policyhandler/deploy_handler.py
@@ -59,14 +59,27 @@ class DeployHandler(object):
DeployHandler._target_entity = Config.config["deploy_handler"]
DeployHandler._url = DiscoveryClient.get_service_url(DeployHandler._target_entity)
- DeployHandler._url_path = DeployHandler._url + '/policy'
+ DeployHandler._url_path = (DeployHandler._url or "") + '/policy'
DeployHandler._logger.info("DeployHandler url(%s)", DeployHandler._url)
@staticmethod
- def policy_update(audit, latest_policies):
- """ post policy_updated message to deploy-handler """
+ def policy_update(audit, latest_policies, removed_policies=None,
+ errored_policies=None, catch_up=False):
+ """post policy_updated message to deploy-handler"""
+ if not latest_policies and not removed_policies and not catch_up:
+ return
+
+ latest_policies = latest_policies or {}
+ removed_policies = removed_policies or {}
+ errored_policies = errored_policies or {}
+
DeployHandler._lazy_init()
- msg = {"latest_policies":latest_policies}
+ msg = {
+ "catch_up" : catch_up,
+ "latest_policies" : latest_policies,
+ "removed_policies" : removed_policies,
+ "errored_policies" : errored_policies
+ }
sub_aud = Audit(aud_parent=audit, targetEntity=DeployHandler._target_entity,
targetServiceName=DeployHandler._url_path)
headers = {REQUEST_X_ECOMP_REQUESTID : sub_aud.request_id}
@@ -74,10 +87,22 @@ class DeployHandler(object):
msg_str = json.dumps(msg)
headers_str = json.dumps(headers)
+ DeployHandler._logger.info(
+ "catch_up(%s) latest_policies[%s], removed_policies[%s], errored_policies[%s]",
+ catch_up, len(latest_policies), len(removed_policies), len(errored_policies))
log_line = "post to deployment-handler {0} msg={1} headers={2}".format(
DeployHandler._url_path, msg_str, headers_str)
- sub_aud.metrics_start(log_line)
+
DeployHandler._logger.info(log_line)
+ sub_aud.metrics_start(log_line)
+
+ if not DeployHandler._url:
+ error_msg = "no url found to {0}".format(log_line)
+ DeployHandler._logger.error(error_msg)
+ sub_aud.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
+ audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
+ sub_aud.metrics(error_msg)
+ return
res = None
try:
diff --git a/policyhandler/discovery.py b/policyhandler/discovery.py
index 7e16b90..33c3265 100644
--- a/policyhandler/discovery.py
+++ b/policyhandler/discovery.py
@@ -41,6 +41,8 @@ class DiscoveryClient(object):
CONSUL_SERVICE_MASK = "http://consul:8500/v1/catalog/service/{0}"
CONSUL_KV_MASK = "http://consul:8500/v1/kv/{0}"
SERVICE_MASK = "http://{0}:{1}"
+ SERVICE_ADDRESS = "ServiceAddress"
+ SERVICE_PORT = "ServicePort"
_logger = logging.getLogger("policy_handler.discovery")
@@ -51,23 +53,25 @@ class DiscoveryClient(object):
DiscoveryClient._logger.info("discover %s", service_path)
response = requests.get(service_path)
response.raise_for_status()
- service = response.json()[0]
- return DiscoveryClient.SERVICE_MASK.format( \
- service["ServiceAddress"], service["ServicePort"])
+ service = response.json()
+ if not service:
+ DiscoveryClient._logger.error("failed discover %s", service_path)
+ return
+ service = service[0]
+ return DiscoveryClient.SERVICE_MASK.format(
+ service[DiscoveryClient.SERVICE_ADDRESS], service[DiscoveryClient.SERVICE_PORT]
+ )
@staticmethod
def get_value(key):
"""get the value for the key from consul-kv"""
response = requests.get(DiscoveryClient.CONSUL_KV_MASK.format(key))
response.raise_for_status()
- data = response.json()[0]
- value = base64.b64decode(data["Value"]).decode("utf-8")
- DiscoveryClient._logger.info("consul-kv key=%s data=%s value(%s)", \
- key, json.dumps(data), value)
+ data = response.json()
+ if not data:
+ DiscoveryClient._logger.error("failed get_value %s", key)
+ return
+ value = base64.b64decode(data[0]["Value"]).decode("utf-8")
+ DiscoveryClient._logger.info("consul-kv key=%s value(%s) data=%s",
+ key, value, json.dumps(data))
return json.loads(value)
-
- @staticmethod
- def put_kv(key, value):
- """put the value under the key in consul-kv"""
- response = requests.put(DiscoveryClient.CONSUL_KV_MASK.format(key), data=value)
- response.raise_for_status()
diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py
index a1df861..c338b76 100644
--- a/policyhandler/onap/audit.py
+++ b/policyhandler/onap/audit.py
@@ -27,14 +27,18 @@
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
import os
+import sys
import json
import uuid
import time
import copy
+from datetime import datetime
from threading import Lock
from enum import Enum
+from pip import utils as pip_utils
from .CommonLogger import CommonLogger
+from .health import Health
REQUEST_X_ECOMP_REQUESTID = "X-ECOMP-RequestID"
REQUEST_REMOTE_ADDR = "Remote-Addr"
@@ -44,6 +48,7 @@ HOSTNAME = "HOSTNAME"
AUDIT_REQUESTID = 'requestID'
AUDIT_IPADDRESS = 'IPAddress'
AUDIT_SERVER = 'server'
+AUDIT_TARGET_ENTITY = 'targetEntity'
HEADER_CLIENTAUTH = "clientauth"
HEADER_AUTHORIZATION = "authorization"
@@ -51,9 +56,10 @@ HEADER_AUTHORIZATION = "authorization"
class AuditHttpCode(Enum):
"""audit http codes"""
HTTP_OK = 200
- DATA_NOT_FOUND_ERROR = 400
PERMISSION_UNAUTHORIZED_ERROR = 401
PERMISSION_FORBIDDEN_ERROR = 403
+ RESPONSE_ERROR = 400
+ DATA_NOT_FOUND_ERROR = 404
SERVER_INTERNAL_ERROR = 500
SERVICE_UNAVAILABLE_ERROR = 503
DATA_ERROR = 1030
@@ -72,23 +78,25 @@ class AuditResponseCode(Enum):
@staticmethod
def get_response_code(http_status_code):
"""calculates the response_code from max_http_status_code"""
+ response_code = AuditResponseCode.UNKNOWN_ERROR
if http_status_code <= AuditHttpCode.HTTP_OK.value:
- return AuditResponseCode.SUCCESS
-
- if http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value, \
- AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]:
- return AuditResponseCode.PERMISSION_ERROR
- if http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value:
- return AuditResponseCode.AVAILABILITY_ERROR
- if http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value:
- return AuditResponseCode.BUSINESS_PROCESS_ERROR
- if http_status_code in [AuditHttpCode.DATA_ERROR.value, \
- AuditHttpCode.DATA_NOT_FOUND_ERROR.value]:
- return AuditResponseCode.DATA_ERROR
- if http_status_code == AuditHttpCode.SCHEMA_ERROR.value:
- return AuditResponseCode.SCHEMA_ERROR
-
- return AuditResponseCode.UNKNOWN_ERROR
+ response_code = AuditResponseCode.SUCCESS
+
+ elif http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value,
+ AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]:
+ response_code = AuditResponseCode.PERMISSION_ERROR
+ elif http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value:
+ response_code = AuditResponseCode.AVAILABILITY_ERROR
+ elif http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value:
+ response_code = AuditResponseCode.BUSINESS_PROCESS_ERROR
+ elif http_status_code in [AuditHttpCode.DATA_ERROR.value,
+ AuditHttpCode.RESPONSE_ERROR.value,
+ AuditHttpCode.DATA_NOT_FOUND_ERROR.value]:
+ response_code = AuditResponseCode.DATA_ERROR
+ elif http_status_code == AuditHttpCode.SCHEMA_ERROR.value:
+ response_code = AuditResponseCode.SCHEMA_ERROR
+
+ return response_code
@staticmethod
def get_human_text(response_code):
@@ -109,16 +117,23 @@ class Audit(object):
:kwargs: - put any request related params into kwargs
"""
_service_name = ""
+ _service_version = ""
_service_instance_UUID = str(uuid.uuid4())
+ _started = datetime.now()
_logger_debug = None
_logger_error = None
_logger_metrics = None
_logger_audit = None
+ _health = Health()
+ _py_ver = sys.version.replace("\n", "")
+ _packages = sorted([pckg.project_name + "==" + pckg.version
+ for pckg in pip_utils.get_installed_distributions()])
@staticmethod
- def init(service_name, config_file_path):
+ def init(service_name, service_version, config_file_path):
"""init static invariants and loggers"""
Audit._service_name = service_name
+ Audit._service_version = service_version
Audit._logger_debug = CommonLogger(config_file_path, "debug", \
instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name)
Audit._logger_error = CommonLogger(config_file_path, "error", \
@@ -128,6 +143,22 @@ class Audit(object):
Audit._logger_audit = CommonLogger(config_file_path, "audit", \
instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name)
+ @staticmethod
+ def health():
+ """returns json for health check"""
+ now = datetime.now()
+ return {
+ "service_name" : Audit._service_name,
+ "service_version" : Audit._service_version,
+ "service_instance_UUID" : Audit._service_instance_UUID,
+ "python" : Audit._py_ver,
+ "started" : str(Audit._started),
+ "now" : str(now),
+ "uptime" : str(now - Audit._started),
+ "stats" : Audit._health.dump(),
+ "packages" : Audit._packages
+ }
+
def __init__(self, request_id=None, req_message=None, aud_parent=None, **kwargs):
"""create audit object per each request in the system
@@ -193,6 +224,13 @@ class Audit(object):
self.max_http_status_code = max(http_status_code, self.max_http_status_code)
self._lock.release()
+ def get_max_http_status_code(self):
+ """returns the highest(worst) http status code"""
+ self._lock.acquire()
+ max_http_status_code = self.max_http_status_code
+ self._lock.release()
+ return max_http_status_code
+
@staticmethod
def get_status_code(success):
"""COMPLETE versus ERROR"""
@@ -222,12 +260,24 @@ class Audit(object):
return json.dumps(Audit.hide_secrets(copy.deepcopy(obj)), **kwargs)
- def get_response_code(self):
- """calculates the response_code from max_http_status_code"""
- self._lock.acquire()
- max_http_status_code = self.max_http_status_code
- self._lock.release()
- return AuditResponseCode.get_response_code(max_http_status_code)
+ def is_serious_error(self, status_code):
+ """returns whether the response_code is success and a human text for response code"""
+ return AuditResponseCode.PERMISSION_ERROR.value \
+ == AuditResponseCode.get_response_code(status_code).value \
+ or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value
+
+ def _get_response_status(self):
+ """calculates the response status fields from max_http_status_code"""
+ max_http_status_code = self.get_max_http_status_code()
+ response_code = AuditResponseCode.get_response_code(max_http_status_code)
+ success = (response_code.value == AuditResponseCode.SUCCESS.value)
+ response_description = AuditResponseCode.get_human_text(response_code)
+ return success, max_http_status_code, response_code, response_description
+
+ def is_success(self):
+ """returns whether the response_code is success and a human text for response code"""
+ success, _, _, _ = self._get_response_status()
+ return success
def debug(self, log_line, **kwargs):
"""debug - the debug=lowest level of logging"""
@@ -275,46 +325,56 @@ class Audit(object):
def metrics(self, log_line, **kwargs):
"""debug+metrics - the metrics=sub-audit level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
- response_code = self.get_response_code()
- success = (response_code.value == AuditResponseCode.SUCCESS.value)
+ success, max_http_status_code, response_code, response_description = \
+ self._get_response_status()
metrics_func = None
+ timer = Audit.get_elapsed_time(self._metrics_started)
if success:
log_line = "done: {0}".format(log_line)
self.info(log_line, **all_kwargs)
metrics_func = Audit._logger_metrics.info
+ Audit._health.success(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer)
else:
log_line = "failed: {0}".format(log_line)
self.error(log_line, errorCode=response_code.value, \
- errorDescription=AuditResponseCode.get_human_text(response_code), **all_kwargs)
+ errorDescription=response_description, **all_kwargs)
metrics_func = Audit._logger_metrics.error
+ Audit._health.error(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer)
- metrics_func(log_line, begTime=self._metrics_start_event, \
- timer=Audit.get_elapsed_time(self._metrics_started), \
- statusCode=Audit.get_status_code(success), responseCode=response_code.value, \
- responseDescription=AuditResponseCode.get_human_text(response_code), \
- **all_kwargs)
+ metrics_func(log_line, begTime=self._metrics_start_event, timer=timer,
+ statusCode=Audit.get_status_code(success), responseCode=response_code.value,
+ responseDescription=response_description,
+ **all_kwargs
+ )
self.metrics_start()
+ return (success, max_http_status_code, response_description)
def audit_done(self, result=None, **kwargs):
"""debug+audit - the audit=top level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
- response_code = self.get_response_code()
- success = (response_code.value == AuditResponseCode.SUCCESS.value)
+ success, max_http_status_code, response_code, response_description = \
+ self._get_response_status()
log_line = "{0} {1}".format(self.req_message, result or "").strip()
audit_func = None
+ timer = Audit.get_elapsed_time(self._started)
if success:
log_line = "done: {0}".format(log_line)
self.info(log_line, **all_kwargs)
audit_func = Audit._logger_audit.info
+ Audit._health.success(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer)
else:
log_line = "failed: {0}".format(log_line)
- self.error(log_line, errorCode=response_code.value, \
- errorDescription=AuditResponseCode.get_human_text(response_code), **all_kwargs)
+ self.error(log_line, errorCode=response_code.value,
+ errorDescription=response_description, **all_kwargs)
audit_func = Audit._logger_audit.error
+ Audit._health.error(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer)
+
+ audit_func(log_line, begTime=self._start_event, timer=timer,
+ statusCode=Audit.get_status_code(success),
+ responseCode=response_code.value,
+ responseDescription=response_description,
+ **all_kwargs
+ )
- audit_func(log_line, begTime=self._start_event, \
- timer=Audit.get_elapsed_time(self._started), \
- statusCode=Audit.get_status_code(success), responseCode=response_code.value, \
- responseDescription=AuditResponseCode.get_human_text(response_code), \
- **all_kwargs)
+ return (success, max_http_status_code, response_description)
diff --git a/policyhandler/onap/health.py b/policyhandler/onap/health.py
new file mode 100644
index 0000000..eefa7d2
--- /dev/null
+++ b/policyhandler/onap/health.py
@@ -0,0 +1,104 @@
+"""generic class to keep track of app health"""
+
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import uuid
+from threading import Lock
+from datetime import datetime
+
+class HealthStats(object):
+ """keep track of stats for calls"""
+ def __init__(self, name):
+ """keep track of stats for metrics calls"""
+ self._name = name or "stats_" + str(uuid.uuid4())
+ self._lock = Lock()
+ self._call_count = 0
+ self._error_count = 0
+ self._longest_timer = 0
+ self._total_timer = 0
+ self._last_success = None
+ self._last_error = None
+
+ def dump(self):
+ """returns dict of stats"""
+ dump = None
+ with self._lock:
+ dump = {
+ "call_count" : self._call_count,
+ "error_count" : self._error_count,
+ "last_success" : str(self._last_success),
+ "last_error" : str(self._last_error),
+ "longest_timer_millisecs" : self._longest_timer,
+ "ave_timer_millisecs" : (float(self._total_timer)/self._call_count \
+ if self._call_count else 0)
+ }
+ return dump
+
+ def success(self, timer):
+ """records the successful execution"""
+ with self._lock:
+ self._call_count += 1
+ self._last_success = datetime.now()
+ self._total_timer += timer
+ if not self._longest_timer or self._longest_timer < timer:
+ self._longest_timer = timer
+
+ def error(self, timer):
+ """records the errored execution"""
+ with self._lock:
+ self._call_count += 1
+ self._error_count += 1
+ self._last_error = datetime.now()
+ self._total_timer += timer
+ if not self._longest_timer or self._longest_timer < timer:
+ self._longest_timer = timer
+
+class Health(object):
+ """Health stats for multiple requests"""
+ def __init__(self):
+ """Health stats for application"""
+ self._all_stats = {}
+ self._lock = Lock()
+
+ def _add_or_get_stats(self, stats_name):
+ """add to or get from the ever growing dict of HealthStats"""
+ stats = None
+ with self._lock:
+ stats = self._all_stats.get(stats_name)
+ if not stats:
+ self._all_stats[stats_name] = stats = HealthStats(stats_name)
+ return stats
+
+ def success(self, stats_name, timer):
+ """records the successful execution on stats_name"""
+ stats = self._add_or_get_stats(stats_name)
+ stats.success(timer)
+
+ def error(self, stats_name, timer):
+ """records the error execution on stats_name"""
+ stats = self._add_or_get_stats(stats_name)
+ stats.error(timer)
+
+ def dump(self):
+ """returns dict of stats"""
+ with self._lock:
+ stats = dict((k, v.dump()) for (k, v) in self._all_stats.iteritems())
+
+ return stats
diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py
index 640b724..30fe9b2 100644
--- a/policyhandler/policy_consts.py
+++ b/policyhandler/policy_consts.py
@@ -24,5 +24,3 @@ POLICY_VERSION = "policyVersion"
POLICY_NAME = "policyName"
POLICY_BODY = 'policy_body'
POLICY_CONFIG = 'config'
-
-POLICY_GET_CONFIG = 'getConfig'
diff --git a/policyhandler/policy_engine.py b/policyhandler/policy_engine.py
deleted file mode 100644
index a0ff697..0000000
--- a/policyhandler/policy_engine.py
+++ /dev/null
@@ -1,103 +0,0 @@
-"""policy-engine-client communicates with policy-engine thru PolicyEngine client object"""
-
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-
-import logging
-import re
-
-from .config import Config, PolicyEngineConfig
-from .onap.audit import Audit
-from .PolicyEngine import PolicyEngine, NotificationHandler, NotificationScheme
-from .policy_updater import PolicyUpdater
-
-class PolicyNotificationHandler(NotificationHandler):
- """handler of the policy-engine push notifications"""
- _logger = logging.getLogger("policy_handler.policy_notification")
-
- def __init__(self, policy_updater):
- scope_prefixes = [scope_prefix.replace(".", "[.]")
- for scope_prefix in Config.config["scope_prefixes"]]
- self._policy_scopes = re.compile("(" + "|".join(scope_prefixes) + ")")
- PolicyNotificationHandler._logger.info("_policy_scopes %s", self._policy_scopes.pattern)
- self._policy_updater = policy_updater
- self._policy_updater.start()
-
- def notificationReceived(self, notification):
- if not notification or not notification._loadedPolicies:
- return
-
- policy_names = [loaded._policyName
- for loaded in notification._loadedPolicies
- if self._policy_scopes.match(loaded._policyName)]
-
- if not policy_names:
- PolicyNotificationHandler._logger.info("no policy updated for scopes %s",
- self._policy_scopes.pattern)
- return
-
- audit = Audit(req_message="notificationReceived from PDP")
- audit.retry_get_config = True
- self._policy_updater.enqueue(audit, policy_names)
-
-class PolicyEngineClient(object):
- """ policy-engine client"""
- _logger = logging.getLogger("policy_handler.policy_engine")
- _policy_updater = None
- _pdp_notification_handler = None
- _policy_engine = None
-
- @staticmethod
- def shutdown(audit):
- """Shutdown the notification-handler"""
- PolicyEngineClient._policy_updater.shutdown(audit)
-
- @staticmethod
- def catch_up(audit):
- """bring the latest policies from policy-engine"""
- PolicyEngineClient._policy_updater.catch_up(audit)
-
- @staticmethod
- def create_policy_engine_properties():
- """create the policy_engine.properties file from config.json"""
- pass
-
- @staticmethod
- def run():
- """Using policy-engine client to talk to policy engine"""
- audit = Audit(req_message="start PDP client")
- PolicyEngineClient._policy_updater = PolicyUpdater()
- PolicyEngineClient._pdp_notification_handler = PolicyNotificationHandler(
- PolicyEngineClient._policy_updater)
-
- sub_aud = Audit(aud_parent=audit)
- sub_aud.metrics_start("create client to PDP")
- basic_client_auth = PolicyEngineConfig.save_to_file()
- PolicyEngineClient._policy_engine = PolicyEngine(
- PolicyEngineConfig.PATH_TO_PROPERTIES,
- scheme=NotificationScheme.AUTO_ALL_NOTIFICATIONS.name,
- handler=PolicyEngineClient._pdp_notification_handler,
- basic_client_auth=basic_client_auth
- )
- sub_aud.metrics("created client to PDP")
- seed_scope = ".*"
- PolicyEngineClient._policy_engine.getConfig(policyName=seed_scope)
- sub_aud.metrics("seeded client by PDP.getConfig for policyName={0}".format(seed_scope))
-
- PolicyEngineClient.catch_up(audit)
diff --git a/policyhandler/policy_handler.py b/policyhandler/policy_handler.py
index 50d59bc..1cd62db 100644
--- a/policyhandler/policy_handler.py
+++ b/policyhandler/policy_handler.py
@@ -18,14 +18,14 @@
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-
+import os
import sys
import logging
from policyhandler.config import Config
from policyhandler.onap.audit import Audit
from policyhandler.web_server import PolicyWeb
-from policyhandler.policy_engine import PolicyEngineClient
+from policyhandler.policy_receiver import PolicyReceiver
class LogWriter(object):
"""redirect the standard out + err to the logger"""
@@ -52,13 +52,16 @@ def run_policy_handler():
sys.stderr = LogWriter(logger.error)
logger.info("========== run_policy_handler ==========")
- Audit.init(Config.get_system_name(), Config.LOGGER_CONFIG_FILE_PATH)
+ policy_handler_version = os.getenv("APP_VER")
+ logger.info("policy_handler_version %s", policy_handler_version)
+ Audit.init(Config.get_system_name(), policy_handler_version, Config.LOGGER_CONFIG_FILE_PATH)
logger.info("starting policy_handler with config:")
logger.info(Audit.log_json_dumps(Config.config))
- PolicyEngineClient.run()
- PolicyWeb.run()
+ audit = Audit(req_message="start policy handler")
+ PolicyReceiver.run(audit)
+ PolicyWeb.run_forever(audit)
if __name__ == "__main__":
run_policy_handler()
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py
new file mode 100644
index 0000000..ec25987
--- /dev/null
+++ b/policyhandler/policy_receiver.py
@@ -0,0 +1,195 @@
+"""
+policy-receiver communicates with policy-engine
+thru web-socket to receive push notifications
+on updates and removal of policies.
+
+on receiving the policy-notifications, the policy-receiver
+filters them out by the policy scope(s) provided in policy-handler config
+and passes the notifications to policy-updater
+"""
+
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import json
+import logging
+import re
+import time
+from threading import Lock, Thread
+
+import websocket
+
+from .config import Config
+from .onap.audit import Audit
+from .policy_updater import PolicyUpdater
+
+LOADED_POLICIES = 'loadedPolicies'
+REMOVED_POLICIES = 'removedPolicies'
+POLICY_NAME = 'policyName'
+POLICY_VER = 'versionNo'
+
+class _PolicyReceiver(Thread):
+ """web-socket to PolicyEngine"""
+ _logger = logging.getLogger("policy_handler.policy_receiver")
+
+ def __init__(self):
+ """web-socket inside the thread to receive policy notifications from PolicyEngine"""
+ Thread.__init__(self, name="policy_receiver")
+ self.daemon = True
+
+ self._lock = Lock()
+ self._keep_running = True
+
+ config = Config.config[Config.FIELD_POLICY_ENGINE]
+ self.web_socket_url = resturl = config["url"] + config["path_pdp"]
+
+ if resturl.startswith("https:"):
+ self.web_socket_url = resturl.replace("https:", "wss:") + "notifications"
+ else:
+ self.web_socket_url = resturl.replace("http:", "ws:") + "notifications"
+
+ self._web_socket = None
+
+ scope_prefixes = [scope_prefix.replace(".", "[.]")
+ for scope_prefix in Config.config["scope_prefixes"]]
+ self._policy_scopes = re.compile("(" + "|".join(scope_prefixes) + ")")
+ _PolicyReceiver._logger.info("_policy_scopes %s", self._policy_scopes.pattern)
+ self._policy_updater = PolicyUpdater()
+ self._policy_updater.start()
+
+ def run(self):
+ """listen on web-socket and pass the policy notifications to policy-updater"""
+ websocket.enableTrace(True)
+ restarting = False
+ while True:
+ if not self._get_keep_running():
+ break
+
+ self._stop_notifications()
+
+ if restarting:
+ time.sleep(5)
+
+ _PolicyReceiver._logger.info(
+ "connecting to policy-notifications at: %s", self.web_socket_url)
+ self._web_socket = websocket.WebSocketApp(
+ self.web_socket_url,
+ on_message=self._on_pdp_message,
+ on_close=self._on_ws_close,
+ on_error=self._on_ws_error
+ )
+
+ _PolicyReceiver._logger.info("waiting for policy-notifications...")
+ self._web_socket.run_forever()
+ restarting = True
+
+ _PolicyReceiver._logger.info("exit policy-receiver")
+
+ def _get_keep_running(self):
+ """thread-safe check whether to continue running"""
+ with self._lock:
+ keep_running = self._keep_running
+ return keep_running
+
+ def _stop_notifications(self):
+ """Shuts down the AutoNotification service if running."""
+ with self._lock:
+ if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected:
+ self._web_socket.close()
+ _PolicyReceiver._logger.info("Stopped receiving notifications from PDP")
+
+ def _on_pdp_message(self, _, message):
+ """received the notification from PDP"""
+ _PolicyReceiver._logger.info("Received notification message: %s", message)
+ if not message:
+ return
+ message = json.loads(message)
+
+ if not message:
+ return
+
+ policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
+ for policy in message.get(LOADED_POLICIES, [])
+ if self._policy_scopes.match(policy.get(POLICY_NAME))]
+ policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
+ for policy in message.get(REMOVED_POLICIES, [])
+ if self._policy_scopes.match(policy.get(POLICY_NAME))]
+
+ if not policies_updated and not policies_removed:
+ _PolicyReceiver._logger.info(
+ "no policy updated or removed for scopes %s", self._policy_scopes.pattern
+ )
+ return
+
+ audit = Audit(req_message="policy-notification - updated[{0}], removed[{1}]" \
+ .format(len(policies_updated), len(policies_removed)))
+ audit.retry_get_config = True
+ self._policy_updater.enqueue(audit, policies_updated, policies_removed)
+
+ def _on_ws_error(self, _, error):
+ """report an error"""
+ _PolicyReceiver._logger.error("policy-notification error: %s", error)
+
+ def _on_ws_close(self, _):
+ """restart web-socket on close"""
+ _PolicyReceiver._logger.info("lost connection to PDP - restarting...")
+
+ def shutdown(self, audit):
+ """Shutdown the policy-receiver"""
+ _PolicyReceiver._logger.info("shutdown policy-receiver")
+ with self._lock:
+ self._keep_running = False
+
+ self._stop_notifications()
+
+ if self.is_alive():
+ self.join()
+
+ self._policy_updater.shutdown(audit)
+
+ def catch_up(self, audit):
+ """need to bring the latest policies to DCAE-Controller"""
+ self._policy_updater.catch_up(audit)
+
+class PolicyReceiver(object):
+ """policy-receiver - static singleton wrapper"""
+ _policy_receiver = None
+
+ @staticmethod
+ def shutdown(audit):
+ """Shutdown the notification-handler"""
+ PolicyReceiver._policy_receiver.shutdown(audit)
+
+ @staticmethod
+ def catch_up(audit):
+ """bring the latest policies from policy-engine"""
+ PolicyReceiver._policy_receiver.catch_up(audit)
+
+ @staticmethod
+ def run(audit):
+ """Using policy-engine client to talk to policy engine"""
+ sub_aud = Audit(aud_parent=audit)
+ sub_aud.metrics_start("start policy receiver")
+
+ PolicyReceiver._policy_receiver = _PolicyReceiver()
+ PolicyReceiver._policy_receiver.start()
+
+ sub_aud.metrics("started policy receiver")
+
+ PolicyReceiver.catch_up(audit)
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py
index bf8a31d..1e50693 100644
--- a/policyhandler/policy_rest.py
+++ b/policyhandler/policy_rest.py
@@ -22,123 +22,30 @@
import logging
import json
import copy
-import re
import time
from multiprocessing.dummy import Pool as ThreadPool
import requests
from .config import Config
-from .policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, POLICY_GET_CONFIG, \
- POLICY_BODY, POLICY_CONFIG
+from .policy_consts import POLICY_ID, POLICY_NAME, POLICY_BODY, POLICY_CONFIG
from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode, AuditResponseCode
-
-class PolicyUtils(object):
- """policy-client utils"""
- _logger = logging.getLogger("policy_handler.policy_utils")
- _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$')
-
- @staticmethod
- def safe_json_parse(json_str):
- """try parsing json without exception - returns the json_str back if fails"""
- if not json_str:
- return json_str
- try:
- return json.loads(json_str)
- except ValueError as err:
- PolicyUtils._logger.warn("unexpected json %s: %s", str(json_str), str(err))
- return json_str
-
- @staticmethod
- def extract_policy_id(policy_name):
- """ policy_name = policy_id + "." + <version> + "." + <extension>
- For instance,
- policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml
- policy_id = DCAE_alex.Config_alex_policy_number_1
- policy_scope = DCAE_alex
- policy_class = Config
- policy_version = 3
- type = extension = xml
- delimiter = "."
- policy_class_delimiter = "_"
- policy_name in PAP = DCAE_alex.alex_policy_number_1
- """
- if not policy_name:
- return
- return PolicyUtils._policy_name_ext.sub('', policy_name)
-
- @staticmethod
- def parse_policy_config(policy):
- """try parsing the config in policy."""
- if policy and POLICY_BODY in policy and POLICY_CONFIG in policy[POLICY_BODY]:
- policy[POLICY_BODY][POLICY_CONFIG] = PolicyUtils.safe_json_parse(
- policy[POLICY_BODY][POLICY_CONFIG])
- return policy
-
- @staticmethod
- def convert_to_policy(policy_config):
- """wrap policy_config received from policy-engine with policy_id."""
- if not policy_config or POLICY_NAME not in policy_config \
- or POLICY_VERSION not in policy_config or not policy_config[POLICY_VERSION]:
- return
- policy_id = PolicyUtils.extract_policy_id(policy_config[POLICY_NAME])
- if not policy_id:
- return
- return {POLICY_ID:policy_id, POLICY_BODY:policy_config}
-
- @staticmethod
- def select_latest_policy(policy_configs):
- """For some reason, the policy-engine returns all version of the policy_configs.
- DCAE-Controller is only interested in the latest version
- """
- if not policy_configs:
- return
- latest_policy_config = {}
- for policy_config in policy_configs:
- if POLICY_VERSION not in policy_config or not policy_config[POLICY_VERSION] \
- or not policy_config[POLICY_VERSION].isdigit():
- continue
- if not latest_policy_config \
- or int(policy_config[POLICY_VERSION]) \
- > int(latest_policy_config[POLICY_VERSION]):
- latest_policy_config = policy_config
-
- return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_config))
-
- @staticmethod
- def select_latest_policies(policy_configs):
- """For some reason, the policy-engine returns all version of the policy_configs.
- DCAE-Controller is only interested in the latest versions
- """
- if not policy_configs:
- return {}
- policies = {}
- for policy_config in policy_configs:
- policy = PolicyUtils.convert_to_policy(policy_config)
- if not policy or POLICY_ID not in policy or POLICY_BODY not in policy:
- continue
- if POLICY_VERSION not in policy[POLICY_BODY] \
- or not policy[POLICY_BODY][POLICY_VERSION] \
- or not policy[POLICY_BODY][POLICY_VERSION].isdigit():
- continue
- if policy[POLICY_ID] not in policies:
- policies[policy[POLICY_ID]] = policy
- continue
- if int(policy[POLICY_BODY][POLICY_VERSION]) \
- > int(policies[policy[POLICY_ID]][POLICY_BODY][POLICY_VERSION]):
- policies[policy[POLICY_ID]] = policy
-
- for policy_id in policies:
- policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id])
-
- return policies
+from .policy_utils import PolicyUtils
class PolicyRest(object):
""" policy-engine """
_logger = logging.getLogger("policy_handler.policy_rest")
_lazy_inited = False
+ POLICY_GET_CONFIG = 'getConfig'
+ POLICY_CONFIG_STATUS = "policyConfigStatus"
+ CONFIG_RETRIEVED = "CONFIG_RETRIEVED"
+ POLICY_CONFIG_MESSAGE = "policyConfigMessage"
+ NO_RESPONSE_RECEIVED = "No Response Received"
+
+ MIN_VERSION_EXPECTED = "min_version_expected"
+ IGNORE_POLICY_NAMES = "ignore_policy_names"
_requests_session = None
- _url = None
+ _url_get_config = None
_headers = None
_target_entity = None
_thread_pool_size = 4
@@ -167,7 +74,8 @@ class PolicyRest(object):
requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
)
- PolicyRest._url = config["url"] + config["path_api"]
+ PolicyRest._url_get_config = config["url"] \
+ + config["path_api"] + PolicyRest.POLICY_GET_CONFIG
PolicyRest._headers = config["headers"]
PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
PolicyRest._thread_pool_size = Config.config.get("thread_pool_size", 4)
@@ -181,31 +89,32 @@ class PolicyRest(object):
PolicyRest._policy_retry_sleep = Config.config.get("policy_retry_sleep", 0)
PolicyRest._logger.info("PolicyClient url(%s) headers(%s) scope-prefixes(%s)", \
- PolicyRest._url, Audit.log_json_dumps(PolicyRest._headers), \
+ PolicyRest._url_get_config, Audit.log_json_dumps(PolicyRest._headers), \
json.dumps(PolicyRest._scope_prefixes))
@staticmethod
- def _post(audit, path, json_body):
+ def _pdp_get_config(audit, json_body):
"""Communication with the policy-engine"""
- full_path = PolicyRest._url + path
sub_aud = Audit(aud_parent=audit, targetEntity=PolicyRest._target_entity, \
- targetServiceName=full_path)
+ targetServiceName=PolicyRest._url_get_config)
msg = json.dumps(json_body)
headers = copy.copy(PolicyRest._headers)
headers[REQUEST_X_ECOMP_REQUESTID] = sub_aud.request_id
headers_str = Audit.log_json_dumps(headers)
- log_line = "post to PDP {0} msg={1} headers={2}".format(full_path, msg, headers_str)
+ log_line = "post to PDP {0} msg={1} headers={2}".format(
+ PolicyRest._url_get_config, msg, headers_str)
sub_aud.metrics_start(log_line)
PolicyRest._logger.info(log_line)
res = None
try:
- res = PolicyRest._requests_session.post(full_path, json=json_body, headers=headers)
+ res = PolicyRest._requests_session.post(
+ PolicyRest._url_get_config, json=json_body, headers=headers)
except requests.exceptions.RequestException as ex:
error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
error_msg = "failed to post to PDP {0} {1} msg={2} headers={3}" \
- .format(full_path, str(ex), msg, headers_str)
+ .format(PolicyRest._url_get_config, str(ex), msg, headers_str)
PolicyRest._logger.exception(error_msg)
sub_aud.set_http_status_code(error_code)
@@ -213,145 +122,301 @@ class PolicyRest(object):
sub_aud.metrics(error_msg)
return (error_code, None)
- log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format( \
- full_path, res.status_code, msg, res.text, \
+ log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format(
+ PolicyRest._url_get_config, res.status_code, msg, res.text,
Audit.log_json_dumps(dict(res.request.headers.items())))
+
+ res_data = None
+ if res.status_code == requests.codes.ok:
+ res_data = res.json()
+
+ if res_data and isinstance(res_data, list) and len(res_data) == 1:
+ result = res_data[0]
+ if result and not result.get(POLICY_NAME):
+ res_data = None
+ if result.get(PolicyRest.POLICY_CONFIG_MESSAGE) == PolicyRest.NO_RESPONSE_RECEIVED:
+ error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+ error_msg = "unexpected {0}".format(log_line)
+
+ PolicyRest._logger.error(error_msg)
+ sub_aud.set_http_status_code(error_code)
+ audit.set_http_status_code(error_code)
+ sub_aud.metrics(error_msg)
+ return (error_code, None)
+
sub_aud.set_http_status_code(res.status_code)
sub_aud.metrics(log_line)
PolicyRest._logger.info(log_line)
+ return res.status_code, res_data
- if res.status_code == requests.codes.ok:
- return res.status_code, res.json()
+ @staticmethod
+ def validate_policy(policy):
+ """Validates the config on policy"""
+ if not policy:
+ return
- return res.status_code, None
+ policy_body = policy.get(POLICY_BODY)
+
+ return bool(
+ policy_body
+ and policy_body.get(PolicyRest.POLICY_CONFIG_STATUS) == PolicyRest.CONFIG_RETRIEVED
+ and policy_body.get(POLICY_CONFIG)
+ )
@staticmethod
- def get_latest_policy(aud_policy_name):
- """Get the latest policy for the policy_name from the policy-engine"""
+ def validate_policies(policies):
+ """Validate the config on policies. Returns (valid, errored) tuple"""
+ if not policies:
+ return None, policies
+
+ valid_policies = {}
+ errored_policies = {}
+ for (policy_id, policy) in policies.iteritems():
+ if PolicyRest.validate_policy(policy):
+ valid_policies[policy_id] = policy
+ else:
+ errored_policies[policy_id] = policy
+
+ return valid_policies, errored_policies
+
+ @staticmethod
+ def get_latest_policy(aud_policy_id):
+ """Get the latest policy for the policy_id from the policy-engine"""
PolicyRest._lazy_init()
- audit, policy_name = aud_policy_name
+ audit, policy_id, min_version_expected, ignore_policy_names = aud_policy_id
status_code = 0
+ policy_configs = None
latest_policy = None
+ expect_policy_removed = (ignore_policy_names and not min_version_expected)
+
for retry in xrange(1, PolicyRest._policy_retry_count + 1):
- PolicyRest._logger.debug("%s", policy_name)
- status_code, policy_configs = PolicyRest._post(audit, POLICY_GET_CONFIG, \
- {POLICY_NAME:policy_name})
- PolicyRest._logger.debug("%s %s policy_configs: %s", status_code, policy_name, \
- json.dumps(policy_configs or []))
- latest_policy = PolicyUtils.select_latest_policy(policy_configs)
- if not latest_policy:
- audit.error("received unexpected policy data from PDP for policy_name={0}: {1}" \
- .format(policy_name, json.dumps(policy_configs or [])), \
- errorCode=AuditResponseCode.DATA_ERROR.value, \
- errorDescription=AuditResponseCode.get_human_text( \
- AuditResponseCode.DATA_ERROR))
+ PolicyRest._logger.debug("%s", policy_id)
+
+ status_code, policy_configs = PolicyRest._pdp_get_config(
+ audit, {POLICY_NAME:policy_id}
+ )
+
+ PolicyRest._logger.debug("%s %s policy_configs: %s",
+ status_code, policy_id, json.dumps(policy_configs or []))
+
+ latest_policy = PolicyUtils.select_latest_policy(
+ policy_configs, min_version_expected, ignore_policy_names
+ )
+
+ if not latest_policy and not expect_policy_removed:
+ audit.error("received unexpected policy data from PDP for policy_id={0}: {1}"
+ .format(policy_id, json.dumps(policy_configs or [])),
+ errorCode=AuditResponseCode.DATA_ERROR.value,
+ errorDescription=AuditResponseCode.get_human_text(
+ AuditResponseCode.DATA_ERROR))
if latest_policy or not audit.retry_get_config \
+ or (expect_policy_removed and not policy_configs) \
or not PolicyRest._policy_retry_sleep \
- or AuditResponseCode.PERMISSION_ERROR.value \
- == AuditResponseCode.get_response_code(status_code).value:
+ or audit.is_serious_error(status_code):
break
if retry == PolicyRest._policy_retry_count:
- audit.warn("gave up retrying {0} from PDP after #{1} for policy_name={2}" \
- .format(POLICY_GET_CONFIG, retry, policy_name), \
- errorCode=AuditResponseCode.DATA_ERROR.value, \
- errorDescription=AuditResponseCode.get_human_text( \
- AuditResponseCode.DATA_ERROR))
+ audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}"
+ .format(PolicyRest._url_get_config, retry, policy_id),
+ errorCode=AuditResponseCode.DATA_ERROR.value,
+ errorDescription=AuditResponseCode.get_human_text(
+ AuditResponseCode.DATA_ERROR))
break
- audit.warn("retry #{0} {1} from PDP in {2} secs for policy_name={3}" \
- .format(retry, POLICY_GET_CONFIG, PolicyRest._policy_retry_sleep, policy_name), \
- errorCode=AuditResponseCode.DATA_ERROR.value, \
- errorDescription=AuditResponseCode.get_human_text( \
- AuditResponseCode.DATA_ERROR))
+ audit.warn(
+ "retry #{0} {1} from PDP in {2} secs for policy_id={3}".format(
+ retry, PolicyRest._url_get_config, PolicyRest._policy_retry_sleep, policy_id),
+ errorCode=AuditResponseCode.DATA_ERROR.value,
+ errorDescription=AuditResponseCode.get_human_text(
+ AuditResponseCode.DATA_ERROR))
time.sleep(PolicyRest._policy_retry_sleep)
+ if expect_policy_removed and not latest_policy \
+ and AuditHttpCode.RESPONSE_ERROR.value == status_code:
+ audit.set_http_status_code(AuditHttpCode.HTTP_OK.value)
+ return None
+
audit.set_http_status_code(status_code)
- if not latest_policy:
+ if not PolicyRest.validate_policy(latest_policy):
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.error(
+ "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)),
+ errorCode=AuditResponseCode.DATA_ERROR.value,
+ errorDescription=AuditResponseCode.get_human_text(AuditResponseCode.DATA_ERROR)
+ )
+
return latest_policy
@staticmethod
- def get_latest_policies_by_names(aud_policy_names):
+ def get_latest_updated_policies(aud_policy_updates):
"""Get the latest policies of the list of policy_names from the policy-engine"""
PolicyRest._lazy_init()
- audit, policy_names = aud_policy_names
- if not policy_names:
+ audit, policies_updated, policies_removed = aud_policy_updates
+ if not policies_updated and not policies_removed:
return
- audit.metrics_start("get_latest_policies_by_names {0} {1}".format( \
- len(policy_names), json.dumps(policy_names)))
- PolicyRest._logger.debug("%d %s", len(policy_names), json.dumps(policy_names))
+ str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format(
+ len(policies_updated), json.dumps(policies_updated),
+ len(policies_removed), json.dumps(policies_removed))
+ audit.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
+ PolicyRest._logger.debug(str_metrics)
+
+ policies_to_find = {}
+ for (policy_name, policy_version) in policies_updated:
+ policy_id = PolicyUtils.extract_policy_id(policy_name)
+ if not policy_id or not policy_version.isdigit():
+ continue
+ policy = policies_to_find.get(policy_id)
+ if not policy:
+ policies_to_find[policy_id] = {
+ POLICY_ID: policy_id,
+ PolicyRest.MIN_VERSION_EXPECTED: int(policy_version),
+ PolicyRest.IGNORE_POLICY_NAMES: {}
+ }
+ continue
+ if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version):
+ policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version)
+
+ for (policy_name, _) in policies_removed:
+ policy_id = PolicyUtils.extract_policy_id(policy_name)
+ if not policy_id:
+ continue
+ policy = policies_to_find.get(policy_id)
+ if not policy:
+ policies_to_find[policy_id] = {
+ POLICY_ID: policy_id,
+ PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True}
+ }
+ continue
+ policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True
+
+ apns = [(audit, policy_id,
+ policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED),
+ policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES))
+ for (policy_id, policy_to_find) in policies_to_find.iteritems()]
- thread_count = min(PolicyRest._thread_pool_size, len(policy_names))
- apns = [(audit, policy_name) for policy_name in policy_names]
policies = None
- if thread_count == 1:
+ apns_length = len(apns)
+ if apns_length == 1:
policies = [PolicyRest.get_latest_policy(apns[0])]
else:
- pool = ThreadPool(thread_count)
+ pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length))
policies = pool.map(PolicyRest.get_latest_policy, apns)
pool.close()
pool.join()
- audit.metrics("result get_latest_policies_by_names {0} {1}: {2} {3}".format( \
- len(policy_names), json.dumps(policy_names), len(policies), json.dumps(policies)), \
- targetEntity=PolicyRest._target_entity, targetServiceName=POLICY_GET_CONFIG)
- policies = dict([(policy[POLICY_ID], policy) \
- for policy in policies if policy and POLICY_ID in policy])
- PolicyRest._logger.debug("policies %s", json.dumps(policies))
- if not policies:
+ audit.metrics("result get_latest_updated_policies {0}: {1} {2}"
+ .format(str_metrics, len(policies), json.dumps(policies)),
+ targetEntity=PolicyRest._target_entity,
+ targetServiceName=PolicyRest._url_get_config)
+
+ updated_policies = dict((policy[POLICY_ID], policy)
+ for policy in policies
+ if policy and policy.get(POLICY_ID))
+
+ removed_policies = dict((policy_id, True)
+ for (policy_id, policy_to_find) in policies_to_find.iteritems()
+ if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED)
+ and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)
+ and policy_id not in updated_policies)
+
+ errored_policies = dict((policy_id, policy_to_find)
+ for (policy_id, policy_to_find) in policies_to_find.iteritems()
+ if policy_id not in updated_policies
+ and policy_id not in removed_policies)
+
+ PolicyRest._logger.debug(
+ "result updated_policies %s, removed_policies %s, errored_policies %s",
+ json.dumps(updated_policies), json.dumps(removed_policies),
+ json.dumps(errored_policies))
+
+ if errored_policies:
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- return policies
+ audit.error(
+ "errored_policies in PDP: {0}".format(json.dumps(errored_policies)),
+ errorCode=AuditResponseCode.DATA_ERROR.value,
+ errorDescription=AuditResponseCode.get_human_text(AuditResponseCode.DATA_ERROR)
+ )
+
+ return updated_policies, removed_policies
@staticmethod
- def _get_latest_policies(aud_scope_prefix):
- """Get the latest policies of the same scope from the policy-engine"""
- audit, scope_prefix = aud_scope_prefix
- PolicyRest._logger.debug("%s", scope_prefix)
- status_code, policy_configs = PolicyRest._post(audit, POLICY_GET_CONFIG, \
- {POLICY_NAME:scope_prefix + ".*"})
- audit.set_http_status_code(status_code)
- PolicyRest._logger.debug("%s policy_configs: %s %s", status_code, \
- scope_prefix, json.dumps(policy_configs or []))
- latest_policies = PolicyUtils.select_latest_policies(policy_configs)
+ def _get_latest_policies(aud_policy_filter):
+ """
+ get the latest policies by policy_filter
+ or all the latest policies of the same scope from the policy-engine
+ """
+ audit, policy_filter, error_if_not_found = aud_policy_filter
+ str_policy_filter = json.dumps(policy_filter)
+ PolicyRest._logger.debug("%s", str_policy_filter)
+
+ status_code, policy_configs = PolicyRest._pdp_get_config(audit, policy_filter)
+
+ PolicyRest._logger.debug("%s policy_configs: %s %s", status_code,
+ str_policy_filter, json.dumps(policy_configs or []))
+ latest_policies = PolicyUtils.select_latest_policies(policy_configs)
if not latest_policies:
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- audit.error("received unexpected policies data from PDP for scope {0}: {1}".format( \
- scope_prefix, json.dumps(policy_configs or [])), \
- errorCode=AuditResponseCode.DATA_ERROR.value, \
- errorDescription=AuditResponseCode.get_human_text( \
- AuditResponseCode.DATA_ERROR))
- return latest_policies
+ if error_if_not_found:
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.warn(
+ "received no policies from PDP for policy_filter {0}: {1}"
+ .format(str_policy_filter, json.dumps(policy_configs or [])),
+ errorCode=AuditResponseCode.DATA_ERROR.value,
+ errorDescription=AuditResponseCode.get_human_text(
+ AuditResponseCode.DATA_ERROR)
+ )
+ return None, latest_policies
+
+ audit.set_http_status_code(status_code)
+ return PolicyRest.validate_policies(latest_policies)
@staticmethod
- def get_latest_policies(audit):
+ def get_latest_policies(audit, policy_filter=None):
"""Get the latest policies of the same scope from the policy-engine"""
PolicyRest._lazy_init()
- PolicyRest._logger.debug("%s", json.dumps(PolicyRest._scope_prefixes))
- audit.metrics_start("get_latest_policies for scopes {0} {1}".format( \
- len(PolicyRest._scope_prefixes), json.dumps(PolicyRest._scope_prefixes)))
- asps = [(audit, scope_prefix) for scope_prefix in PolicyRest._scope_prefixes]
+ aud_policy_filters = None
+ str_metrics = None
+ str_policy_filters = json.dumps(policy_filter or PolicyRest._scope_prefixes)
+ if policy_filter is not None:
+ aud_policy_filters = [(audit, policy_filter, True)]
+ str_metrics = "get_latest_policies for policy_filter {0}".format(
+ str_policy_filters)
+ else:
+ aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, False)
+ for scope_prefix in PolicyRest._scope_prefixes]
+ str_metrics = "get_latest_policies for scopes {0} {1}".format( \
+ len(PolicyRest._scope_prefixes), str_policy_filters)
+
+ PolicyRest._logger.debug("%s", str_policy_filters)
+ audit.metrics_start(str_metrics)
+
latest_policies = None
- if PolicyRest._scope_thread_pool_size == 1:
- latest_policies = [PolicyRest._get_latest_policies(asps[0])]
+ apfs_length = len(aud_policy_filters)
+ if apfs_length == 1:
+ latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])]
else:
- pool = ThreadPool(PolicyRest._scope_thread_pool_size)
- latest_policies = pool.map(PolicyRest._get_latest_policies, asps)
+ pool = ThreadPool(min(PolicyRest._scope_thread_pool_size, apfs_length))
+ latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters)
pool.close()
pool.join()
- audit.metrics("total result get_latest_policies for scopes {0} {1}: {2} {3}".format( \
- len(PolicyRest._scope_prefixes), json.dumps(PolicyRest._scope_prefixes), \
- len(latest_policies), json.dumps(latest_policies)), \
- targetEntity=PolicyRest._target_entity, targetServiceName=POLICY_GET_CONFIG)
+ audit.metrics("total result {0}: {1} {2}".format(
+ str_metrics, len(latest_policies), json.dumps(latest_policies)), \
+ targetEntity=PolicyRest._target_entity, targetServiceName=PolicyRest._url_get_config)
+
+ # latest_policies == [(valid_policies, errored_policies), ...]
+ valid_policies = dict(
+ pair for (vps, _) in latest_policies if vps for pair in vps.iteritems())
+
+ errored_policies = dict(
+ pair for (_, eps) in latest_policies if eps for pair in eps.iteritems())
- latest_policies = dict(pair for lp in latest_policies if lp for pair in lp.items())
- PolicyRest._logger.debug("latest_policies: %s %s", \
- json.dumps(PolicyRest._scope_prefixes), json.dumps(latest_policies))
+ PolicyRest._logger.debug(
+ "got policies for policy_filters: %s. valid_policies: %s errored_policies: %s",
+ str_policy_filters, json.dumps(valid_policies), json.dumps(errored_policies))
- return latest_policies
+ return valid_policies, errored_policies
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index 1f1539f..9732f69 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -26,6 +26,7 @@ from threading import Thread, Lock
from .policy_rest import PolicyRest
from .deploy_handler import DeployHandler
+from .onap.audit import Audit
class PolicyUpdater(Thread):
"""queue and handle the policy-updates in a separate thread"""
@@ -33,40 +34,45 @@ class PolicyUpdater(Thread):
def __init__(self):
"""init static config of PolicyUpdater."""
- Thread.__init__(self)
- self.name = "policy_updater"
+ Thread.__init__(self, name="policy_updater")
self.daemon = True
- self._req_shutdown = None
- self._req_catch_up = None
+ self._aud_shutdown = None
+ self._aud_catch_up = None
self._lock = Lock()
self._queue = Queue()
- def enqueue(self, audit=None, policy_names=None):
- """enqueue the policy-names"""
- policy_names = policy_names or []
- PolicyUpdater._logger.info("policy_names %s", json.dumps(policy_names))
- self._queue.put((audit, policy_names))
+ def enqueue(self, audit=None, policies_updated=None, policies_removed=None):
+ """enqueue the policy-updates"""
+ policies_updated = policies_updated or []
+ policies_removed = policies_removed or []
+
+ PolicyUpdater._logger.info(
+ "policies_updated %s policies_removed %s",
+ json.dumps(policies_updated), json.dumps(policies_removed))
+ self._queue.put((audit, policies_updated, policies_removed))
def run(self):
"""wait and run the policy-update in thread"""
while True:
PolicyUpdater._logger.info("waiting for policy-updates...")
- audit, policy_names = self._queue.get()
- PolicyUpdater._logger.info("got policy-updates %s", json.dumps(policy_names))
+ audit, policies_updated, policies_removed = self._queue.get()
+ PolicyUpdater._logger.info(
+ "got policies_updated %s policies_removed %s",
+ json.dumps(policies_updated), json.dumps(policies_removed))
+
if not self._keep_running():
self._queue.task_done()
break
- if self._on_catch_up():
- continue
- if not policy_names:
- self._queue.task_done()
+ if self._on_catch_up(audit) or not audit:
continue
- updated_policies = PolicyRest.get_latest_policies_by_names((audit, policy_names))
- PolicyUpdater.policy_update(audit, updated_policies)
+ updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
+ (audit, policies_updated, policies_removed))
+
+ DeployHandler.policy_update(audit, updated_policies, removed_policies=removed_policies)
audit.audit_done()
self._queue.task_done()
@@ -74,51 +80,60 @@ class PolicyUpdater(Thread):
def _keep_running(self):
"""thread-safe check whether to continue running"""
- self._lock.acquire()
- keep_running = not self._req_shutdown
- self._lock.release()
- if self._req_shutdown:
- self._req_shutdown.audit_done()
+ with self._lock:
+ keep_running = not self._aud_shutdown
+
+ if self._aud_shutdown:
+ self._aud_shutdown.audit_done()
return keep_running
def catch_up(self, audit):
"""need to bring the latest policies to DCAE-Controller"""
- self._lock.acquire()
- self._req_catch_up = audit
- self._lock.release()
+ PolicyUpdater._logger.info("catch_up requested")
+ with self._lock:
+ self._aud_catch_up = audit
+
self.enqueue()
- def _on_catch_up(self):
- """Bring the latest policies to DCAE-Controller"""
- self._lock.acquire()
- req_catch_up = self._req_catch_up
- if self._req_catch_up:
- self._req_catch_up = None
+ def _reset_queue(self):
+ """clear up the queue"""
+ with self._lock:
+ self._aud_catch_up = None
self._queue.task_done()
self._queue = Queue()
+
+ def _on_catch_up(self, audit):
+ """Bring the latest policies to DCAE-Controller"""
+ self._lock.acquire()
+ aud_catch_up = self._aud_catch_up
+ if self._aud_catch_up:
+ self._aud_catch_up = None
self._lock.release()
- if not req_catch_up:
+
+ if not aud_catch_up:
return False
PolicyUpdater._logger.info("catch_up")
- latest_policies = PolicyRest.get_latest_policies(req_catch_up)
- PolicyUpdater.policy_update(req_catch_up, latest_policies)
- req_catch_up.audit_done()
- return True
-
- @staticmethod
- def policy_update(audit, updated_policies):
- """Invoke deploy-handler"""
- if updated_policies:
- PolicyUpdater._logger.info("updated_policies %s", json.dumps(updated_policies))
- DeployHandler.policy_update(audit, updated_policies)
+ latest_policies, errored_policies = PolicyRest.get_latest_policies(aud_catch_up)
+
+ if not aud_catch_up.is_success():
+ PolicyUpdater._logger.warn("not sending catch-up to deployment-handler due to errors")
+ if not audit:
+ self._queue.task_done()
+ else:
+ DeployHandler.policy_update(
+ aud_catch_up, latest_policies, errored_policies=errored_policies, catch_up=True)
+ self._reset_queue()
+ success, _, _ = aud_catch_up.audit_done()
+ PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health()))
+
+ return success
def shutdown(self, audit):
"""Shutdown the policy-updater"""
PolicyUpdater._logger.info("shutdown policy-updater")
- self._lock.acquire()
- self._req_shutdown = audit
- self._lock.release()
+ with self._lock:
+ self._aud_shutdown = audit
self.enqueue()
if self.is_alive():
self.join()
diff --git a/policyhandler/policy_utils.py b/policyhandler/policy_utils.py
new file mode 100644
index 0000000..d664b21
--- /dev/null
+++ b/policyhandler/policy_utils.py
@@ -0,0 +1,134 @@
+"""policy-client communicates with policy-engine thru REST API"""
+
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import logging
+import json
+import re
+
+from .policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, POLICY_BODY, POLICY_CONFIG
+
+class PolicyUtils(object):
+ """policy-client utils"""
+ _logger = logging.getLogger("policy_handler.policy_utils")
+ _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$')
+
+ @staticmethod
+ def safe_json_parse(json_str):
+ """try parsing json without exception - returns the json_str back if fails"""
+ if not json_str:
+ return json_str
+ try:
+ return json.loads(json_str)
+ except (ValueError, TypeError) as err:
+ PolicyUtils._logger.warn("unexpected json %s: %s", str(json_str), str(err))
+ return json_str
+
+ @staticmethod
+ def extract_policy_id(policy_name):
+ """ policy_name = policy_id + "." + <version> + "." + <extension>
+ For instance,
+ policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml
+ policy_id = DCAE_alex.Config_alex_policy_number_1
+ policy_scope = DCAE_alex
+ policy_class = Config
+ policy_version = 3
+ type = extension = xml
+ delimiter = "."
+ policy_class_delimiter = "_"
+ policy_name in PAP = DCAE_alex.alex_policy_number_1
+ """
+ if not policy_name:
+ return
+ return PolicyUtils._policy_name_ext.sub('', policy_name)
+
+ @staticmethod
+ def parse_policy_config(policy):
+ """try parsing the config in policy."""
+ if not policy:
+ return policy
+ config = policy.get(POLICY_BODY, {}).get(POLICY_CONFIG)
+ if config:
+ policy[POLICY_BODY][POLICY_CONFIG] = PolicyUtils.safe_json_parse(config)
+ return policy
+
+ @staticmethod
+ def convert_to_policy(policy_config):
+ """wrap policy_config received from policy-engine with policy_id."""
+ if not policy_config:
+ return
+ policy_name = policy_config.get(POLICY_NAME)
+ policy_version = policy_config.get(POLICY_VERSION)
+ if not policy_name or not policy_version:
+ return
+ policy_id = PolicyUtils.extract_policy_id(policy_name)
+ if not policy_id:
+ return
+ return {POLICY_ID:policy_id, POLICY_BODY:policy_config}
+
+ @staticmethod
+ def select_latest_policy(policy_configs, min_version_expected=None, ignore_policy_names=None):
+ """For some reason, the policy-engine returns all version of the policy_configs.
+ DCAE-Controller is only interested in the latest version
+ """
+ if not policy_configs:
+ return
+ latest_policy_config = {}
+ for policy_config in policy_configs:
+ policy_name = policy_config.get(POLICY_NAME)
+ policy_version = policy_config.get(POLICY_VERSION)
+ if not policy_name or not policy_version or not policy_version.isdigit():
+ continue
+ policy_version = int(policy_version)
+ if min_version_expected and policy_version < min_version_expected:
+ continue
+ if ignore_policy_names and policy_name in ignore_policy_names:
+ continue
+
+ if not latest_policy_config \
+ or int(latest_policy_config[POLICY_VERSION]) < policy_version:
+ latest_policy_config = policy_config
+
+ return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_config))
+
+ @staticmethod
+ def select_latest_policies(policy_configs):
+ """For some reason, the policy-engine returns all version of the policy_configs.
+ DCAE-Controller is only interested in the latest versions
+ """
+ if not policy_configs:
+ return {}
+ policies = {}
+ for policy_config in policy_configs:
+ policy = PolicyUtils.convert_to_policy(policy_config)
+ if not policy:
+ continue
+ policy_id = policy.get(POLICY_ID)
+ policy_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION)
+ if not policy_id or not policy_version or not policy_version.isdigit():
+ continue
+ if policy_id not in policies \
+ or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION]):
+ policies[policy_id] = policy
+
+ for policy_id in policies:
+ policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id])
+
+ return policies
diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py
index 9a5ee19..3d2503a 100644
--- a/policyhandler/web_server.py
+++ b/policyhandler/web_server.py
@@ -27,78 +27,187 @@ import cherrypy
from .config import Config
from .onap.audit import Audit
from .policy_rest import PolicyRest
-from .policy_engine import PolicyEngineClient
+from .policy_receiver import PolicyReceiver
class PolicyWeb(object):
- """Main static class for REST API of policy-handler"""
- logger = logging.getLogger("policy_handler.web_cherrypy")
+ """run REST API of policy-handler"""
+ logger = logging.getLogger("policy_handler.policy_web")
@staticmethod
- def run():
- """run forever the web-server of the policy-handler"""
- PolicyWeb.logger.info("policy_handler web-service at port(%d)...", \
- Config.wservice_port)
- cherrypy.config.update({"server.socket_host": "0.0.0.0", \
- 'server.socket_port': Config.wservice_port})
- cherrypy.tree.mount(PolicyLatest(), '/policy_latest')
- cherrypy.tree.mount(PoliciesLatest(), '/policies_latest')
- cherrypy.tree.mount(PoliciesCatchUp(), '/catch_up')
- cherrypy.quickstart(Shutdown(), '/shutdown')
-
-class Shutdown(object):
- """Shutdown the policy-handler"""
- @cherrypy.expose
- def index(self):
- """shutdown event"""
- audit = Audit(req_message="get /shutdown", headers=cherrypy.request.headers)
- PolicyWeb.logger.info("--------- stopping REST API of policy-handler -----------")
- cherrypy.engine.exit()
- PolicyEngineClient.shutdown(audit)
- PolicyWeb.logger.info("--------- the end -----------")
- res = str(datetime.now())
- audit.info_requested(res)
- return "goodbye! shutdown requested {0}".format(res)
+ def run_forever(audit):
+ """run the web-server of the policy-handler forever"""
+ PolicyWeb.logger.info("policy_handler web-service at port(%d)...", Config.wservice_port)
+ cherrypy.config.update({"server.socket_host": "0.0.0.0",
+ 'server.socket_port': Config.wservice_port})
+ cherrypy.tree.mount(_PolicyWeb(), '/')
+ audit.info("running policy_handler web-service at port({0})".format(Config.wservice_port))
+ cherrypy.engine.start()
+
+class _PolicyWeb(object):
+ """REST API of policy-handler"""
-class PoliciesLatest(object):
- """REST API of the policy-hanlder"""
+ @staticmethod
+ def _get_request_info(request):
+ """returns info about the http request"""
+ return "{0} {1}{2}".format(request.method, request.script_name, request.path_info)
@cherrypy.expose
+ @cherrypy.popargs('policy_id')
@cherrypy.tools.json_out()
- def index(self):
- """find the latest policy by policy_id or all latest policies"""
- audit = Audit(req_message="get /policies_latest", headers=cherrypy.request.headers)
- res = PolicyRest.get_latest_policies(audit) or {}
- PolicyWeb.logger.info("PoliciesLatest: %s", json.dumps(res))
- audit.audit_done(result=json.dumps(res))
+ def policy_latest(self, policy_id):
+ """retireves the latest policy identified by policy_id"""
+ req_info = _PolicyWeb._get_request_info(cherrypy.request)
+ audit = Audit(req_message=req_info, headers=cherrypy.request.headers)
+ PolicyWeb.logger.info("%s policy_id=%s headers=%s", \
+ req_info, policy_id, json.dumps(cherrypy.request.headers))
+
+ res = PolicyRest.get_latest_policy((audit, policy_id, None, None)) or {}
+
+ PolicyWeb.logger.info("res %s policy_id=%s res=%s", req_info, policy_id, json.dumps(res))
+
+ success, http_status_code, response_description = audit.audit_done(result=json.dumps(res))
+ if not success:
+ raise cherrypy.HTTPError(http_status_code, response_description)
return res
-@cherrypy.popargs('policy_id')
-class PolicyLatest(object):
- """REST API of the policy-hanlder"""
+ def _get_all_policies_latest(self):
+ """retireves all the latest policies on GET /policies_latest"""
+ req_info = _PolicyWeb._get_request_info(cherrypy.request)
+ audit = Audit(req_message=req_info, headers=cherrypy.request.headers)
+
+ PolicyWeb.logger.info("%s", req_info)
+
+ valid_policies, errored_policies = PolicyRest.get_latest_policies(audit)
+
+ res = {"valid_policies": valid_policies, "errored_policies": errored_policies}
+ PolicyWeb.logger.info("result %s: %s", req_info, json.dumps(res))
+
+ success, http_status_code, response_description = audit.audit_done(result=json.dumps(res))
+ if not success:
+ raise cherrypy.HTTPError(http_status_code, response_description)
+ return res
@cherrypy.expose
@cherrypy.tools.json_out()
- def index(self, policy_id):
- """find the latest policy by policy_id or all latest policies"""
- audit = Audit(req_message="get /policy_latest/{0}".format(policy_id or ""), \
+ @cherrypy.tools.json_in()
+ def policies_latest(self):
+ """
+ on :GET: retrieves all the latest policies from policy-engine that are
+ in the scope of the policy-handler.
+
+ on :POST: expects to receive the params that mimic the /getConfig of policy-engine
+ and retrieves the matching policies from policy-engine and picks the latest on each policy.
+
+ sample request - policies filter
+
+ {
+ "configAttributes": { "key1":"value1" },
+ "configName": "alex_config_name",
+ "ecompName": "DCAE",
+ "policyName": "DCAE_alex.Config_alex_.*",
+ "unique": false
+ }
+
+ sample response
+
+ {
+ "DCAE_alex.Config_alex_priority": {
+ "policy_body": {
+ "policyName": "DCAE_alex.Config_alex_priority.3.xml",
+ "policyConfigMessage": "Config Retrieved! ",
+ "responseAttributes": {},
+ "policyConfigStatus": "CONFIG_RETRIEVED",
+ "type": "JSON",
+ "matchingConditions": {
+ "priority": "10",
+ "key1": "value1",
+ "ECOMPName": "DCAE",
+ "ConfigName": "alex_config_name"
+ },
+ "property": null,
+ "config": {
+ "foo": "bar",
+ "foo_updated": "2017-10-06T16:54:31.696Z"
+ },
+ "policyVersion": "3"
+ },
+ "policy_id": "DCAE_alex.Config_alex_priority"
+ }
+ }
+ """
+ if cherrypy.request.method == "GET":
+ return self._get_all_policies_latest()
+
+ if cherrypy.request.method != "POST":
+ raise cherrypy.HTTPError(404, "unexpected method {0}".format(cherrypy.request.method))
+
+ policy_filter = cherrypy.request.json or {}
+ str_policy_filter = json.dumps(policy_filter)
+
+ req_info = _PolicyWeb._get_request_info(cherrypy.request)
+ audit = Audit(req_message="{0}: {1}".format(req_info, str_policy_filter), \
headers=cherrypy.request.headers)
- PolicyWeb.logger.info("PolicyLatest policy_id=%s headers=%s", \
- policy_id, json.dumps(cherrypy.request.headers))
- res = PolicyRest.get_latest_policy((audit, policy_id)) or {}
- PolicyWeb.logger.info("PolicyLatest policy_id=%s res=%s", policy_id, json.dumps(res))
- audit.audit_done(result=json.dumps(res))
+ PolicyWeb.logger.info("%s: policy_filter=%s headers=%s", \
+ req_info, str_policy_filter, json.dumps(cherrypy.request.headers))
+
+ res, _ = PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {}
+
+ PolicyWeb.logger.info("result %s: policy_filter=%s res=%s", \
+ req_info, str_policy_filter, json.dumps(res))
+
+ success, http_status_code, response_description = audit.audit_done(result=json.dumps(res))
+ if not success:
+ raise cherrypy.HTTPError(http_status_code, response_description)
return res
-class PoliciesCatchUp(object):
- """catch up with all DCAE policies"""
@cherrypy.expose
@cherrypy.tools.json_out()
- def index(self):
- """catch up with all policies"""
+ def catch_up(self):
+ """catch up with all DCAE policies"""
started = str(datetime.now())
- audit = Audit(req_message="get /catch_up", headers=cherrypy.request.headers)
- PolicyEngineClient.catch_up(audit)
+ req_info = _PolicyWeb._get_request_info(cherrypy.request)
+ audit = Audit(req_message=req_info, headers=cherrypy.request.headers)
+
+ PolicyWeb.logger.info("%s", req_info)
+ PolicyReceiver.catch_up(audit)
+
res = {"catch-up requested": started}
- PolicyWeb.logger.info("PoliciesCatchUp: %s", json.dumps(res))
+ PolicyWeb.logger.info("requested %s: %s", req_info, json.dumps(res))
audit.info_requested(started)
return res
+
+ @cherrypy.expose
+ def shutdown(self):
+ """Shutdown the policy-handler"""
+ req_info = _PolicyWeb._get_request_info(cherrypy.request)
+ audit = Audit(req_message=req_info, headers=cherrypy.request.headers)
+
+ PolicyWeb.logger.info("%s: --- stopping REST API of policy-handler ---", req_info)
+
+ cherrypy.engine.exit()
+
+ PolicyReceiver.shutdown(audit)
+
+ health = json.dumps(Audit.health())
+ audit.info("policy_handler health: {0}".format(health))
+ PolicyWeb.logger.info("policy_handler health: %s", health)
+ PolicyWeb.logger.info("%s: --------- the end -----------", req_info)
+ res = str(datetime.now())
+ audit.info_requested(res)
+ return "goodbye! shutdown requested {0}".format(res)
+
+ @cherrypy.expose
+ @cherrypy.tools.json_out()
+ def healthcheck(self):
+ """returns the healthcheck results"""
+ req_info = _PolicyWeb._get_request_info(cherrypy.request)
+ audit = Audit(req_message=req_info, headers=cherrypy.request.headers)
+
+ PolicyWeb.logger.info("%s", req_info)
+
+ res = Audit.health()
+
+ PolicyWeb.logger.info("healthcheck %s: res=%s", req_info, json.dumps(res))
+
+ audit.audit_done(result=json.dumps(res))
+ return res