diff options
Diffstat (limited to 'policyhandler')
22 files changed, 2373 insertions, 2033 deletions
diff --git a/policyhandler/PolicyEngine.py b/policyhandler/PolicyEngine.py deleted file mode 100644 index 7a69894..0000000 --- a/policyhandler/PolicyEngine.py +++ /dev/null @@ -1,1219 +0,0 @@ -"""
-PolicyEngine API for Python
-
-@author: Tarun, Mike
-@version: 0.9
-@change:
- added Enum 'Other' Type and supported it in PolicyConfig class - 1/13
- supporting Remote URL capability to the PolicyEngine as the parameter - 1/13
- Request format has been updated accordingly. No need to send the PDP URLs anymore - 1/26
- Feature where the PolicyEngine chooses available PyPDP among different URLs. 1/26
- Major feature addition required for Notifications 2/17 , Fixed Session issues. 2/25
- Major change in API structure for combining results 3/4.
- Added Security support for Notifications and clearNotification Method 3/18
- newMethod for retrieving configuration using policyFileName 3/20
- logging 3/24
- Notification Bug Fixes 7/21
- basic Auth 7/22
- Notification Changes 9/3
- ECOMP Error codes included 10/1
- -2016
- Major Changes to the Policy Engine API 3/29
- DeletePolicy API and Changes to pushPolicy and getConfig 5/27
- ConfigRequestParmeters and Error code Change 7/11
- New Environment Variable and Client Authorizations Change 7/21
- Changes to the Policy Parameters 8/21
- Allow Clients to use their own Password Protection.
- Dictionary Types and its Fixes.
-"""
-
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-
-import json,sys, os, collections, websocket , logging, time, base64, uuid
-from websocket import create_connection
-from enum import Enum
-from xml.etree.ElementTree import XML
-try:
- # For Python 3.0 and Later
- from pip._vendor import requests
-except ImportError:
- # For backend Support to Python 2's urllib2
- import requests
-try:
- # For Python 3.0 and Later
- from urllib.request import urlopen
-except ImportError:
- # Fall back for Python 2.*
- from urllib2 import urlopen
-try:
- import thread
-except ImportError:
- #Fall Back for Python 2.x
- import _thread as thread
-
-PolicyConfigStatus = Enum('PolicyConfigStatus', 'CONFIG_NOT_FOUND CONFIG_RETRIEVED')
-PolicyType = Enum('PolicyType', 'JSON XML PROPERTIES OTHER')
-PolicyResponseStatus = Enum('PolicyResponseStatus', 'ACTION_ADVISED ACTION_TAKEN NO_ACTION_REQUIRED')
-NotificationType = Enum('NotificationType', 'BOTH UPDATE REMOVE')
-UpdateType = Enum('UpdateType', 'UPDATE NEW')
-NotificationScheme = Enum('NotificationScheme', 'AUTO_ALL_NOTIFICATIONS AUTO_NOTIFICATIONS MANUAL_ALL_NOTIFICATIONS MANUAL_NOTIFICATIONS')
-AttributeType = Enum('AttributeType', 'MATCHING MICROSERVICE RULE SETTINGS')
-PolicyClass = Enum('PolicyClass', 'Action Config Decision')
-PolicyConfigType = Enum('PolicyConfigType', 'Base BRMS_PARAM BRMS_RAW ClosedLoop_Fault ClosedLoop_PM Firewall MicroService Extended')
-DeletePolicyCondition = Enum('DeletePolicyCondition', 'ONE ALL')
-RuleProvider = Enum('RuleProvider', 'Custom AAF GUARD_YAML')
-DictionaryType = Enum('DictionaryType', 'Common Action ClosedLoop Firewall Decision BRMS GOC MicroService DescriptiveScope PolicyScope Enforcer SafePolicy' )
-ImportType = Enum('ImportType', 'MICROSERVICE')
-
-class PolicyEngine:
- """
- PolicyEngine is the Class which needs to be instantiated to call the PDP server.
- It needs the *.properties* file path as the parameter to the constructor.
- """
- def __init__(self, filename, scheme=None, handler=None, clientKey=None, basic_client_auth=True):
- """
- @param filename: String format of the path location of .properties file Could also be A remote URL eg: http://localhost:8080/config.properties
- @param scheme: NotificationScheme to select the scheme required for notification updates.
- @param handler: NotificationHandler object which will be called when an event is occurred.
- @param clientKey: Decoded Client Key to be used by PolicyEngine.
- @attention:The .properties file must contain the PYPDP_URL parameter in it. The parameter can have multiple URLs the PolicyEngine chooses the available PyPDP among them.
- """
- self.filename = filename
- self.urldict = {}
- self.matchStore = []
- self.autoURL = None
- self.scheme = None
- self.handler = None
- self.thread = thread
- self.autows = None
- self.mclose = False
- self.restart = False
- self.logger = logging.getLogger()
- self.resturl= []
- self.encoded= []
- self.clientInfo = None
- self.environment = None
- self.policyheader = {}
- if(filename.startswith("http")):
- try:
- policy_data = urlopen(filename)
- for line in policy_data :
- line = line.decode('utf-8')
- line = line.rstrip() # removes trailing whitespace and '\n' chars
- line = line.replace(" ","") # removing spaces
- if "=" not in line: continue #skips blanks and comments w/o =
- if line.startswith("#"): continue #skips comments which contain =
- key, value = line.split("=",1)
- key = key.rstrip().lstrip()
- value = value.lstrip()
- #print("key= "+key+" Value =" +value )
- self.urldict[key] = value
- except:
- self.logger.error("PE300 - Data Issue: Error with the Config URL: %s" , filename )
- print("PE300 - Data Issue: Config Properties URL Error")
- sys.exit(0)
- else :
- fileExtension = os.path.splitext(filename)
- if(fileExtension[1]!=".properties"):
- self.logger.error("PE300 - Data Issue: File is not in properties format: %s", filename)
- print("PE300 - Data Issue: Not a .properties file!")
- sys.exit(0)
- try :
- with open(self.filename, 'r') as f:
- for line in f:
- line = line.rstrip() # removes trailing whitespace and '\n' chars
- line = line.replace(" ","") # removing spaces
- if "=" not in line: continue #skips blanks and comments w/o =
- if line.startswith("#"): continue #skips comments which contain =
- key, value = line.split("=",1)
- key = key.rstrip().lstrip()
- value = value.lstrip()
- # self.logger.info("key=%s Value=%s", key, value)
- self.urldict[key] = value
- except FileNotFoundError:
- self.logger.error("PE300 - Data Issue: File Not found: %s", filename)
- print("PE300 - Data Issue: File Doesn't exist in the given Location")
- sys.exit(0)
- #TODO logic for best available PyPDP servers
- try:
- self.urldict = collections.OrderedDict(sorted(self.urldict.items()))
- clientID = self.urldict.get("CLIENT_ID")
- # self.logger.info("clientID decoded %s", base64.b64decode(clientID).decode("utf-8"))
- # client_parts = base64.b64decode(clientID).split(":")
- # client_parts = clientID.split(":")
- # self.logger.info("ClientAuth:Basic %s", base64.b64encode(clientID))
- # self.logger.info("CLIENT_ID[0] = %s", client_parts[0])
- # self.logger.info("CLIENT_ID[0] base64 = %s", base64.b64encode(client_parts[0]))
- # self.logger.info("CLIENT_KEY base64 = %s", base64.b64encode(client_parts[1]))
- if(clientKey is None):
- try:
- client = base64.b64decode(self.urldict.get("CLIENT_KEY")).decode("utf-8")
- except Exception:
- self.logger.warn("PE300 - Data Issue: CLIENT_KEY parameter is not in the required encoded Format taking Value as clear Text")
- client = self.urldict.get("CLIENT_KEY")
- else:
- client = clientKey
- if(clientID is None or client is None):
- self.logger.error("PE300 - Data Issue: No CLIENT_ID and/or CLIENT_KEY parameter found in the properties file: %s ", filename)
- print("PE300 - Data Issue: No CLIENT_ID and/or CLIENT_KEY parameter found in the properties file")
- sys.exit(0)
- else:
- uid = clientID.encode('ascii')
- password = client.encode('ascii')
- self.clientInfo = base64.b64encode(uid+ b':'+password).decode('utf-8')
- self.environment = self.urldict.get("ENVIRONMENT")
- if(self.environment is None):
- self.logger.info("Missing Environment Variable setting to Default Value.")
- self.environment = "DEVL"
- self.policyheader = {
- "Content-type" : "application/json",
- "Accept" : "application/json",
- "ClientAuth" : ("Basic " if basic_client_auth else "") + self.clientInfo,
- "Environment" : self.environment
- }
- for key in self.urldict.keys():
- if(key.startswith("PYPDP_URL")):
- pypdpVal = self.urldict.get(key)
- if pypdpVal is None:
- self.logger.error("PE300 - Data Issue: No PYPDP_URL Parameter found in the properties file: %s ", filename)
- print("PE300 - Data Issue: No PYPDP_URL parameter found in the properties file")
- sys.exit(0)
- if ";" in pypdpVal:
- pdpDefault = pypdpVal.split(";")
- if pdpDefault is None:
- self.logger.error("PE300 - Data Issue: No PYPDP_URL Parameter found in the properties file: %s ", filename)
- print("PE300 - Data Issue: No PYPDP_URL parameter found in the properties file")
- sys.exit(0)
- else:
- for count in range(0, len(pdpDefault)):
- self.__pdpParam(pdpDefault[count])
- else:
- self.__pdpParam(pypdpVal)
-
- self.logger.info("PolicyEngine url: %s policyheader: %s urldict: %s", \
- self.resturl, json.dumps(self.policyheader), json.dumps(self.urldict))
- if len(self.resturl)==0:
- self.logger.error("PE300 - Data Issue: No PYPDP_URL Parameter found in the properties file: %s ", filename)
- print("PE300 - Data Issue: No PYPDP_URL parameter found in the properties file")
- sys.exit(0)
- except:
- self.logger.error("PE300 - Data Issue: missing parameter(s) in the properties file: %s ", filename)
- print("PE300 - Data Issue: missing parameter(s) in the properties file")
- sys.exit(0)
- # Scheme and Handler code to be handled from here.
- if handler is not None:
- #if type(handler) is NotificationHandler:
- self.handler = handler
- #else:
- # print("handler should be a object of NotificationHandler class")
- # sys.exit(0)
- if scheme is not None:
- if ((scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
- # setup the Auto settings.
- self.scheme = scheme
- elif ((scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name)):
- # setup the Manual Settings
- self.scheme = scheme
- else:
- self.logger.error("PE300 - Data Issue: Scheme not a type of NotificationScheme: %s", scheme.name)
- print("PE300 - Data Issue: scheme must be a Type of NotificationScheme Enumeration ")
- sys.exit(0)
-
- def __pdpParam(self,pdpValue):
- """
- Internal Usage for reading PyPDP Parameters
- """
- if pdpValue is None:
- self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
- print("PE100 - Permissions Error: No Enough Credentials to send Request")
- sys.exit(0)
- elif "," in pdpValue:
- pdpValues = pdpValue.split(",")
- if (len(pdpValues)==3):
- # 0 is pypdp URL
- self.resturl.append(pdpValues[0])
- # 1 and 2 are user name password
- if pdpValues[1] and pdpValues[2]:
- uid = pdpValues[1].encode('ascii')
- password = pdpValues[2].encode('ascii')
- encoded = base64.b64encode(uid+ b':'+password).decode('utf-8')
- self.encoded.append(encoded)
- else:
- self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
- print("PE100 - Permissions Error: No Enough Credentials to send Request")
- sys.exit(0)
- else:
- self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
- print("PE100 - Permissions Error: No Enough Credentials to send Request")
- sys.exit(0)
- else:
- self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
- print("PE100 - Permissions Error: No Enough Credentials to send Request")
- sys.exit(0)
-
- def getConfigByPolicyName(self, policyName, requestID=None):
- """
- @param policyName: String format of the PolicyFile Name whose configuration is required.
- @return: Returns a List of PolicyConfig Object(s).
- @deprecated: use getConfig instead.
- """
- __policyNameURL = "/getConfigByPolicyName"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- self.__policyNamejson = {}
- self.__policyNamejson['policyName'] = policyName
- self.__cpnResponse = self.__callPDP(__policyNameURL, json.dumps(self.__policyNamejson), __headers, "POST")
- self.__cpnJSON = self.__cpnResponse.json()
- policyConfigs= self.__configResponse(self.__cpnJSON)
- return policyConfigs
-
- def listConfig(self, eCOMPComponentName=None, configName=None, configAttributes=None, policyName=None, unique= False, requestID=None):
- """
- listConfig function calls the PDP for the configuration required using the parameters and returns the PDP response.
- @param eCOMPComponentName: String of the eCOMPComponentName whose configuration is required.
- @param configName: String of the configName whose configuration is required. Not Mandatory field.
- @param configAttributes: Dictionary of the config attributes in Key and Value String format. Not mandatory field.
- @param policyName: String of the policyName whose configuration is required.
- @param unique: Boolean value which can be set to True if Unique results are required.
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a List of PolicyNames.
- """
- __configURL = "/listConfig"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- __configjson = self.__configRequestParametersJSON(eCOMPComponentName, configName, configAttributes, policyName, unique)
- #self.__configjson['pdp_URL'] = self.pdp_url
- __cResponse = self.__callPDP(__configURL, json.dumps(__configjson), __headers, "POST")
- return __cResponse.json()
-
-
- def getConfig(self, eCOMPComponentName=None, configName=None, configAttributes=None, policyName=None, unique= False, requestID=None):
- """
- getConfig function calls the PDP for the configuration required using the parameters and returns the PDP response.
- @param eCOMPComponentName: String of the eCOMPComponentName whose configuration is required.
- @param configName: String of the configName whose configuration is required. Not Mandatory field.
- @param configAttributes: Dictionary of the config attributes in Key and Value String format. Not mandatory field.
- @param policyName: String of the policyName whose configuration is required.
- @param unique: Boolean value which can be set to True if Unique results are required.
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a List of PolicyConfig Object(s).
- """
- __configURL = "/getConfig"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- self.__configjson = self.__configRequestParametersJSON(eCOMPComponentName, configName, configAttributes, policyName, unique)
- #self.__configjson['pdp_URL'] = self.pdp_url
- self.__cResponse = self.__callPDP(__configURL, json.dumps(self.__configjson), __headers, "POST")
- #self.__configURL = self.resturl+__configURL
- #self.__cResponse = requests.post(self.__configURL, data=json.dumps(self.__configjson), headers = __headers)
- self.__cJSON = self.__cResponse.json()
- policyConfigs= self.__configResponse(self.__cJSON)
- # if we have successfully retrieved a policy we will store the match values.
- matchFound = False
- for policyConfig in policyConfigs:
- if policyConfig._policyConfigStatus == PolicyConfigStatus.CONFIG_RETRIEVED.name:
- # self.logger.info("Policy has been Retrieved !!")
- matchFound = True
- if matchFound:
- __match = {}
- __match["ECOMPName"] = eCOMPComponentName
- if configName is not None:
- __match["ConfigName"] = configName
- if configAttributes is not None:
- __match.update(configAttributes)
- if not self.matchStore:
- self.matchStore.append(__match)
- else:
- __booMatch = False
- for eachDict in self.matchStore:
- if eachDict==__match:
- __booMatch = True
- break
- if __booMatch==False:
- self.matchStore.append(__match)
- return policyConfigs
-
- def __configRequestParametersJSON(self, eCOMPComponentName=None, configName=None, configAttributes=None, policyName=None, unique= False):
- """ Internal Function to set JSON from configRequestParameters
- """
- json= {}
- if eCOMPComponentName is not None:
- json['ecompName'] = eCOMPComponentName
- if configName is not None:
- json['configName'] = configName
- if configAttributes is not None:
- json['configAttributes'] = configAttributes
- if policyName is not None:
- json['policyName'] = policyName
- json['unique'] = unique
- return json
-
- def __configResponse(self, cJSON):
- """
- Internal function to take the convert JSON to Response Object.
- """
- policyConfigs=[]
- for configJSON in cJSON:
- policyConfig = PolicyConfig()
- policyConfig._policyConfigMessage = configJSON['policyConfigMessage']
- policyConfig._policyConfigStatus = configJSON['policyConfigStatus']
- policyConfig._policyType = configJSON['type']
- policyConfig._policyName = configJSON['policyName']
- policyConfig._policyVersion = configJSON['policyVersion']
- policyConfig._matchingConditions = configJSON['matchingConditions']
- policyConfig._responseAttributes = configJSON['responseAttributes']
- if PolicyType.JSON.name == policyConfig._policyType:
- policyConfig._json = configJSON['config']
- elif PolicyType.XML.name == policyConfig._policyType:
- policyConfig._xml = XML(configJSON['config'])
- elif PolicyType.PROPERTIES.name == policyConfig._policyType:
- policyConfig._properties = configJSON['property']
- elif PolicyType.OTHER.name == policyConfig._policyType:
- policyConfig._other = configJSON['config']
- policyConfigs.append(policyConfig)
- return policyConfigs
-
- def getDecision(self, decisionAttributes, ecompcomponentName, requestID = None):
- """
- getDecision function sends the Decision Attributes to the PDP server and gets the response to the client from PDP.
- @param decisionAttributes: Dictionary of Decision Attributes in Key and Value String formats.
- @param ecompcomponentName:
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a DecisionResponse Object.
- """
- __decisionurl = "/getDecision"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- self.__decisionjson={}
- self.__decisionjson['decisionAttributes'] = decisionAttributes
- self.__decisionjson['ecompcomponentName'] = ecompcomponentName
- self.__dResponse = self.__callPDP(__decisionurl, json.dumps(self.__decisionjson), __headers, "POST")
- self.__dJSON = self.__dResponse.json()
- decisionResponse = DecisionResponse()
- decisionResponse._decision = self.__dJSON['decision']
- decisionResponse._details = self.__dJSON['details']
- return decisionResponse
-
- def sendEvent(self, eventAttributes, requestID=None):
- """
- sendEvent function sends the Event to the PDP server and gets the response to the client from the PDP.
- @param eventAttributes:Dictonary of the EventAttributes in Key and Value String formats.
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a List of PolicyResponse Object(s).
- """
- __eventurl = "/sendEvent"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- self.__eventjson = {}
- self.__eventjson['eventAttributes'] = eventAttributes
- #self.__eventjson['pdp_URL'] = self.pdp_url
- self.__eResponse = self.__callPDP(__eventurl, json.dumps(self.__eventjson), __headers, "POST")
- #self.__eventurl = self.resturl+__eventurl
- #self.__eResponse = requests.post(self.__eventurl, data=json.dumps(self.__eventjson), headers = __headers)
- self.__eJSON = self.__eResponse.json()
- policyResponses=[]
- for eventJSON in self.__eJSON:
- policyResponse = PolicyResponse()
- policyResponse._policyResponseMessage = eventJSON['policyResponseMessage']
- policyResponse._policyResponseStatus = eventJSON['policyResponseStatus']
- policyResponse._actionAdvised = eventJSON['actionAdvised']
- policyResponse._actionTaken = eventJSON['actionTaken']
- policyResponse._requestAttributes = eventJSON['requestAttributes']
- policyResponses.append(policyResponse)
- return policyResponses
-
- def createPolicy(self, policyParameters):
- """
- 'createPolicy creates Policy using the policyParameters sent'
- @param policyParameters: This is an object of PolicyParameters which is required as a parameter to this method.
- @return: Returns a PolicyChangeResponse Object
- """
- __createurl = "/createPolicy"
- __headers = self.policyheader
- try:
- if policyParameters._requestID is None:
- policyParameters._requestID = str(uuid.uuid4())
- self.__createJson = {}
- self.__createJson = self.__policyParametersJSON(policyParameters)
- self.__createResponse = self.__callPDP(__createurl, json.dumps(self.__createJson), __headers, "PUT")
- policyChangeResponse = PolicyChangeResponse()
- policyChangeResponse._responseCode = self.__createResponse.status_code
- policyChangeResponse._responseMessage = self.__createResponse.text
- return policyChangeResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the policyParameters Object. It needs to be object of PolicyParameters ")
- print("PE300 - Data Issue: policyParamters object Error")
-
- def updatePolicy(self, policyParameters):
- """
- 'updatePolicy updates Policy using the policyParameters sent.'
- @param policyParameters: This is an object of PolicyParameters which is required as a parameter to this method.
- @return: Returns a PolicyChangeResponse Object
- """
- __updateurl = "/updatePolicy"
- __headers = self.policyheader
- try:
- if policyParameters._requestID is None:
- policyParameters._requestID = str(uuid.uuid4())
- self.__updateJson = {}
- self.__updateJson = self.__policyParametersJSON(policyParameters)
- self.__updateResponse = self.__callPDP(__updateurl, json.dumps(self.__updateJson), __headers, "PUT")
- policyChangeResponse = PolicyChangeResponse()
- policyChangeResponse._responseCode = self.__updateResponse.status_code
- policyChangeResponse._responseMessage = self.__updateResponse.text
- return policyChangeResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the policyParameters Object. It needs to be object of PolicyParameters ")
- print("PE300 - Data Issue: policyParamters object Error")
-
- def __policyParametersJSON(self, policyParameters):
- """ Internal Function to set JSON from policyParameters Object
- """
- json= {}
- if policyParameters._actionAttribute is not None:
- json['actionAttribute'] = policyParameters._actionAttribute
- if policyParameters._actionPerformer is not None:
- json['actionPerformer'] = policyParameters._actionPerformer
- if policyParameters._attributes is not None:
- json['attributes'] = policyParameters._attributes
- if policyParameters._configBody is not None:
- json['configBody'] = policyParameters._configBody
- if policyParameters._configBodyType is not None:
- json['configBodyType'] = policyParameters._configBodyType
- if policyParameters._configName is not None:
- json['configName'] = policyParameters._configName
- if policyParameters._controllerName is not None:
- json['controllerName'] = policyParameters._controllerName
- if policyParameters._dependencyNames is not None:
- json['dependencyNames'] = policyParameters._dependencyNames
- if policyParameters._dynamicRuleAlgorithmLabels is not None:
- json['dynamicRuleAlgorithmLabels'] = policyParameters._dynamicRuleAlgorithmLabels
- if policyParameters._dynamicRuleAlgorithmField1 is not None:
- json['dynamicRuleAlgorithmField1'] = policyParameters._dynamicRuleAlgorithmField1
- if policyParameters._dynamicRuleAlgorithmField2 is not None:
- json['dynamicRuleAlgorithmField2'] = policyParameters._dynamicRuleAlgorithmField2
- if policyParameters._dynamicRuleAlgorithmFunctions is not None:
- json['dynamicRuleAlgorithmFunctions'] = policyParameters._dynamicRuleAlgorithmFunctions
- if policyParameters._ecompName is not None:
- json['ecompName'] = policyParameters._ecompName
- if policyParameters._extendedOption is not None:
- json['extendedOption'] = policyParameters._extendedOption
- if policyParameters._guard is not None:
- json['guard'] = policyParameters._guard
- if policyParameters._policyClass is not None:
- json['policyClass'] = policyParameters._policyClass
- if policyParameters._policyConfigType is not None:
- json['policyConfigType'] = policyParameters._policyConfigType
- if policyParameters._policyName is not None:
- json['policyName'] = policyParameters._policyName
- if policyParameters._policyDescription is not None:
- json['policyDescription'] = policyParameters._policyDescription
- if policyParameters._priority is not None:
- json['priority'] = policyParameters._priority
- if policyParameters._requestID is not None:
- json['requestID'] = policyParameters._requestID
- if policyParameters._riskLevel is not None:
- json['riskLevel'] = policyParameters._riskLevel
- if policyParameters._riskType is not None:
- json['riskType'] = policyParameters._riskType
- if policyParameters._ruleProvider is not None:
- json['ruleProvider'] = policyParameters._ruleProvider
- if policyParameters._ttlDate is not None:
- json['ttlDate'] = policyParameters._ttlDate
- return json
-
- def pushPolicy(self, pushPolicyParameters, requestID = None):
- """
- 'pushPolicy pushes a policy based on the given Push Policy Parameters. '
- @param pushPolicyParameters: This is an object of PushPolicyParameters which is required as a parameter to this method.
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a PolicyChangeResponse Object
- """
- __pushurl = "/pushPolicy"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- try:
- self.__pushJson = {}
- self.__pushJson['pdpGroup'] = pushPolicyParameters._pdpGroup
- self.__pushJson['policyName'] = pushPolicyParameters._policyName
- self.__pushJson['policyType'] = pushPolicyParameters._policyType
- self.__pushResponse = self.__callPDP(__pushurl, json.dumps(self.__pushJson), __headers, "PUT")
- policyChangeResponse = PolicyChangeResponse()
- policyChangeResponse._responseCode = self.__pushResponse.status_code
- policyChangeResponse._responseMessage = self.__pushResponse.text
- return policyChangeResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the pushPolicyParameters Object. It needs to be object of PushPolicyParameters ")
- print("PE300 - Data Issue: pushPolicyParamters object Error")
-
- def deletePolicy(self, deletePolicyParameters):
- """
- 'deletePolicy Deletes a policy or all its version according to the given deletePolicyParameters'
- @param deletePolicyParameters: This is an Object of DeletePolicyParameters which is required as a parameter to this method.
- @return: Returns a PolicyChangeResponse Object
- """
- __deleteurl = "/deletePolicy"
- __createdictionaryurl = "/createDictionaryItem"
- __headers = self.policyheader
- try:
- if deletePolicyParameters._requestID is None:
- deletePolicyParameters._requestID = str(uuid.uuid4())
- self.__deleteJson = {}
- self.__deleteJson['deleteCondition'] = deletePolicyParameters._deleteCondition
- self.__deleteJson['pdpGroup'] = deletePolicyParameters._pdpGroup
- self.__deleteJson['policyComponent'] = deletePolicyParameters._policyComponent
- self.__deleteJson['policyName'] = deletePolicyParameters._policyName
- self.__deleteJson['policyType'] = deletePolicyParameters._policyType
- self.__deleteJson['requestID'] = deletePolicyParameters._requestID
- self.__deleteResponse = self.__callPDP(__deleteurl, json.dumps(self.__deleteJson), self.policyheader, "DELETE")
- policyChangeResponse = PolicyChangeResponse()
- policyChangeResponse._responseCode = self.__deleteResponse.status_code
- policyChangeResponse._responseMessage = self.__deleteResponse.text
- return policyChangeResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the deletePolicyParameters Object. It needs to be object of DeletePolicyParameters ")
- print("PE300 - Data Issue: deletePolicyParameters object Error")
-
- def createDictionaryItems(self, dictionaryParameters):
- """
- 'createDictionaryItems adds dictionary items to the database for a specific dictionary'
- @param dictionaryParameters: This is an Object of DictionaryParameters which is required as a parameter to this method
- @return: Returns a DictionaryResponse object
- """
- __createdictionaryurl = '/createDictionaryItem'
- __headers = self.policyheader
- try:
- if dictionaryParameters._requestID is None:
- dictionaryParameters._requestID = str(uuid.uuid4())
- self.__json={}
- self.__json['dictionaryType'] = dictionaryParameters._dictionaryType
- self.__json['dictionary'] = dictionaryParameters._dictionary
- self.__json['dictionaryJson'] = dictionaryParameters._dictionaryJson
- self.__json['requestID'] = dictionaryParameters._requestID
- self.__createResponse = self.__callPDP(__createdictionaryurl, json.dumps(self.__json), __headers, "PUT")
- dictionaryResponse = DictionaryResponse()
- dictionaryResponse._responseCode = self.__createResponse.status_code
- dictionaryResponse._responseMessage = self.__createResponse.text
- return dictionaryResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the dictionaryParameters object. It needs to be object of DictionaryParameters ")
- print("PE300 - Data Issue: dictionaryParameters object Error")
-
-
- def updateDictionaryItems(self, dictionaryParameters):
- """
- 'updateDictionaryItems edits dictionary items in the database for a specific dictionary'
- @param dictionaryParameters: This is an Object of DictionaryParameters which is required as a parameter to this method
- @return: Returns a DictionaryResponse object
- """
- __updatedictionaryurl = '/updateDictionaryItem'
- __headers = self.policyheader
- try:
- if dictionaryParameters._requestID is None:
- dictionaryParameters._requestID = str(uuid.uuid4())
- self.__json={}
- self.__json['dictionaryType'] = dictionaryParameters._dictionaryType
- self.__json['dictionary'] = dictionaryParameters._dictionary
- self.__json['dictionaryJson'] = dictionaryParameters._dictionaryJson
- self.__json['requestID'] = dictionaryParameters._requestID
- self.__updateResponse = self.__callPDP(__updatedictionaryurl, json.dumps(self.__json), __headers, "PUT")
- dictionaryResponse = DictionaryResponse()
- dictionaryResponse._responseCode = self.__updateResponse.status_code
- dictionaryResponse._responseMessage = self.__updateResponse.text
- return dictionaryResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the dictionaryParameters object. It needs to be object of DictionaryParameters ")
- print("PE300 - Data Issue: dictionaryParameters object Error")
-
- def getDictionaryItems(self, dictionaryParameters):
- """
- 'getDictionaryItems gets all the dictionary items stored in the database for a specified dictionary'
- @param dictionaryParameters: This is an Object of DictionaryParameters which is required as a parameter to this method.
- @return: Returns a DictionaryResponse object
- """
- __retrievedictionaryurl = "/getDictionaryItems"
- __headers = self.policyheader
- try:
- if dictionaryParameters._requestID is None:
- dictionaryParameters._requestID = str(uuid.uuid4())
- self.__json = {}
- self.__json['dictionaryType'] = dictionaryParameters._dictionaryType
- self.__json['dictionary'] = dictionaryParameters._dictionary
- self.__json['requestID'] = dictionaryParameters._requestID
- self.__getResponse = self.__callPDP(__retrievedictionaryurl, json.dumps(self.__json), __headers, "POST")
- dictionaryResponse = DictionaryResponse()
- dictionaryResponse._responseCode = self.__getResponse.status_code
- dictionaryResponse._responseMessage = self.__getResponse.text
- return dictionaryResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the dictionaryParameters object. It needs to be object of DictionaryParameters ")
- print("PE300 - Data Issue: dictionaryParameters object Error")
-
- def getNotification(self):
- """
- gets the PDPNotification if the appropriate NotificationScheme is selected.
- @return: Returns a PDPNotification Object.
- """
- if ((self.scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name)or(self.scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name)):
- # Manual Client for websocket Code in here.
- if(self.resturl[0].startswith("https")):
- __man_url = self.resturl[0].replace("https","wss")+"notifications"
- else:
- __man_url = self.resturl[0].replace("http","ws")+"notifications"
- __result = self.__manualRequest(__man_url)
- self.logger.debug("Manual Notification with server: %s \n result is: %s" , __man_url , __result)
- # TODO convert the result to PDP Notifications.
- if (self.scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name):
- # need to add all the values to the PDPNotification..
- pDPNotification = PDPNotification()
- boo_Remove = False
- boo_Update = False
- if __result is None:
- return None
- if __result['removedPolicies']:
- removedPolicies = []
- for removed in __result['removedPolicies']:
- removedPolicy = RemovedPolicy()
- removedPolicy._policyName = removed['policyName']
- removedPolicy._policyVersion = removed['versionNo']
- removedPolicies.append(removedPolicy)
- pDPNotification._removedPolicies= removedPolicies
- boo_Remove = True
- if __result['loadedPolicies']:
- updatedPolicies = []
- for updated in __result['loadedPolicies']:
- updatedPolicy = LoadedPolicy()
- updatedPolicy._policyName = updated['policyName']
- updatedPolicy._policyVersion = updated['versionNo']
- updatedPolicy._matchingConditions = updated['matches']
- updatedPolicy._updateType = updated['updateType']
- updatedPolicies.append(updatedPolicy)
- pDPNotification._loadedPolicies= updatedPolicies
- boo_Update = True
- if (boo_Update and boo_Remove):
- pDPNotification._notificationType = NotificationType.BOTH.name
- elif boo_Update:
- pDPNotification._notificationType = NotificationType.UPDATE.name
- elif boo_Remove:
- pDPNotification._notificationType = NotificationType.REMOVE.name
- return pDPNotification
- elif (self.scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name):
- return self.__checkNotification(__result)
- else:
- return None
-
- def setNotification(self, scheme, handler = None):
- """
- setNotification allows changes to the NotificationScheme and the NotificationHandler.
- @param scheme: NotificationScheme to select the scheme required for notification updates.
- @param handler: NotificationHandler object which will be called when an event is occurred.
- """
- if handler is not None:
- #if type(handler) is NotificationHandler:
- self.handler = handler
- #else:
- # print("Error: handler should be a object of NotificationHandler class")
- if scheme is not None:
- if ((scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
- # setup the Auto settings.
- self.scheme = scheme
- self.__startAuto()
- elif ((scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name)):
- # setup the Manual Settings
- self.scheme = scheme
- else:
- print("PE300 - Data Issue: scheme must be a Type of NotificationScheme Enumeration ")
-
- def clearNotification(self):
- """
- clearNotification ShutsDown the AutoNotification service if running.
- """
- if self.scheme is not None:
- if((self.scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(self.scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
- if self.autows.sock is not None:
- if(self.autows.sock.connected):
- self.mclose = True
- self.autows.close()
- self.logger.info("Notification Service Stopped.")
- print("Notification Service is Stopped!!")
-
- def __callPDP(self,urlFunction, jsonData, headerData,method, files= None, params = None):
- """
- This function call is for internal usage purpose only.
- Calls the available PyPDP
- """
- connected = False
- response = None
- errormessage = ''
- for count in range(0, len(self.resturl)):
- try:
- logging.basicConfig(level=logging.DEBUG)
- request_url = self.resturl[0]+ urlFunction
- self.logger.debug("--- Sending Request to : %s",request_url)
- try:
- self.logger.debug("Request ID %s :",headerData["X-ECOMP-RequestID"])
- except:
- if jsonData is not None:
- self.logger.debug("Request ID %s :",json.loads(jsonData)['requestID'])
- self.logger.debug("Request Data is: %s" ,jsonData)
- headerData["Authorization"]= "Basic " + self.encoded[0]
- if(method=="PUT"):
- response = requests.put(request_url, data = jsonData, headers = headerData)
- elif(method=="DELETE"):
- response = requests.delete(request_url, data = jsonData, headers = headerData)
- elif(method=="POST"):
- if params is not None:
- # files = files, params = params,
- response = requests.post(request_url, params = params, headers = headerData)
- else:
- response = requests.post(request_url, data = jsonData, headers = headerData)
- # when using self-signed server certificate, comment previous line and uncomment following:
- #response = requests.post(request_url, data = jsonData, headers = headerData, verify=False)
- self.logger.debug("--- Response is : ---")
- self.logger.debug(response.status_code)
- self.logger.debug(response.headers)
- self.logger.debug(response.text)
- if(response.status_code == 200) :
- connected = True
- self.logger.info("connected to the PyPDP: %s", request_url)
- break
- elif(response.status_code == 202) :
- connected = True
- break
- elif(response.status_code == 400):
- self.logger.debug("PE400 - Schema Issue: Incorrect Params passed: %s %s", self.resturl[0], response.status_code)
- errormessage+="\n PE400 - Schema Issue: Incorrect Params passed: "+ self.resturl[0]
- self.__rotatePDP()
- elif(response.status_code == 401):
- self.logger.debug("PE100 - Permissions Error: PyPDP Error: %s %s", self.resturl[0], response.status_code)
- errormessage+="\n PE100 - Permissions Error: PyPDP Error: "+ self.resturl[0]
- self.__rotatePDP()
- elif(response.status_code == 403):
- self.logger.debug("PE100 - Permissions Error: PyPDP Error: %s %s", self.resturl[0], response.status_code)
- errormessage+="\n PE100 - Permissions Error: PyPDP Error: "+ self.resturl[0]
- self.__rotatePDP()
- else:
- self.logger.debug("PE200 - System Error: PyPDP Error: %s %s", self.resturl[0], response.status_code)
- errormessage+="\n PE200 - System Error: PyPDP Error: "+ self.resturl[0]
- self.__rotatePDP()
- except Exception as e:
- print(str(e));
- self.logger.debug("PE200 - System Error: PyPDP Error: %s", self.resturl[0])
- errormessage+="\n PE200 - System Error: PyPDP Error: "+ self.resturl[0]
- self.__rotatePDP()
- if(connected):
- if(self.autoURL==None):
- self.__startAuto()
- elif(self.autoURL!= self.resturl[0]):
- self.__startAuto()
- return response
- else:
- self.logger.error("PE200 - System Error: cannot connect to given PYPDPServer(s) %s", self.resturl)
- print(errormessage)
- sys.exit(0)
-
- def __rotatePDP(self):
- self.resturl = collections.deque(self.resturl)
- self.resturl.rotate(-1)
- self.encoded = collections.deque(self.encoded)
- self.encoded.rotate(-1)
-
- def __checkNotification(self, resultJson):
- """
- This function call is for Internal usage purpose only.
- Checks the Notification JSON compares it with the MatchStore and returns the PDPNotification object.
- """
- if not resultJson:
- return None
- if not self.matchStore:
- return None
- pDPNotification = PDPNotification()
- boo_Remove = False
- boo_Update = False
- if resultJson['removedPolicies']:
- removedPolicies = []
- for removed in resultJson['removedPolicies']:
- removedPolicy = RemovedPolicy()
- removedPolicy._policyName = removed['policyName']
- removedPolicy._policyVersion = removed['versionNo']
- removedPolicies.append(removedPolicy)
- pDPNotification._removedPolicies= removedPolicies
- boo_Remove = True
- if resultJson['updatedPolicies']:
- updatedPolicies = []
- for updated in resultJson['updatedPolicies']:
- updatedPolicy = LoadedPolicy()
- # check if it has matches then it is a Config Policy and compare it with Match Store.
- if updated['matches']:
- # compare the matches with our Stored Matches
- for eachDict in self.matchStore:
- if eachDict==updated['matches']:
- updatedPolicy._policyName = updated['policyName']
- updatedPolicy._policyVersion = updated['versionNo']
- updatedPolicy._matchingConditions = updated['matches']
- updatedPolicy._updateType = updated['updateType']
- updatedPolicies.append(updatedPolicy)
- boo_Update = True
- else:
- updatedPolicy._policyName = updated['policyName']
- updatedPolicy._policyVersion = updated['versionNo']
- updatedPolicies.append(updatedPolicy)
- boo_Update = True
- pDPNotification._loadedPolicies= updatedPolicies
- if (boo_Update and boo_Remove):
- pDPNotification._notificationType = NotificationType.BOTH.name
- elif boo_Update:
- pDPNotification._notificationType = NotificationType.UPDATE.name
- elif boo_Remove:
- pDPNotification._notificationType = NotificationType.REMOVE.name
- return pDPNotification
-
- def __startAuto(self):
- """
- Starts the Auto Notification Feature..
- """
- if self.scheme is not None:
- if ((self.scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(self.scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
- if self.handler is None:
- if self.autows.sock is not None:
- if(self.autows.sock.connected):
- self.mclose= True
- self.autows.close()
- else:
- if self.autoURL is None:
- self.autoURL = self.resturl[0]
- elif self.autoURL != self.resturl[0]:
- self.autoURL = self.resturl[0]
- if self.autows.sock is not None:
- if(self.autows.sock.connected):
- self.mclose= True
- self.autows.close()
- else:
- self.autows = None
- if self.autows is None:
- if(self.autoURL.startswith("https")):
- __auto_url = self.autoURL.replace("https","wss")+"notifications"
- else:
- __auto_url = self.autoURL.replace("http","ws")+"notifications"
- def run(*args):
- self.__autoRequest(__auto_url)
- self.logger.info("Starting AutoNotification Service with : %s" , __auto_url)
- self.thread.start_new_thread(run , ())
- elif self.autows.sock is not None:
- if not (self.autows.sock.connected):
- self.mclose = True
- self.autows.close()
- self.restart = True
- self.__rotatePDP()
- if(self.autoURL.startswith("https")):
- __auto_url = self.autoURL.replace("https","wss")+"notifications"
- else:
- __auto_url = self.autoURL.replace("http","ws")+"notifications"
- def run(*args):
- self.__autoRequest(__auto_url)
- self.logger.info("Starting AutoNotification Service with : %s" , __auto_url)
- self.thread.start_new_thread(run , ())
-
- else:
- #stop the Auto Notification Service if it is running.
- if self.autows.sock is not None:
- if(self.autows.sock.connected):
- self.mclose= True
- self.autows.close()
-
- def __onEvent(self, message):
- """
- Handles the event Notification received.
- """
- message = json.loads(message)
- if self.handler is not None:
- if (self.scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name):
- pDPNotification = PDPNotification()
- boo_Remove = False
- boo_Update = False
- if message['removedPolicies']:
- removedPolicies = []
- for removed in message['removedPolicies']:
- removedPolicy = RemovedPolicy()
- removedPolicy._policyName = removed['policyName']
- removedPolicy._policyVersion = removed['versionNo']
- removedPolicies.append(removedPolicy)
- pDPNotification._removedPolicies= removedPolicies
- boo_Remove = True
- if message['loadedPolicies']:
- updatedPolicies = []
- for updated in message['loadedPolicies']:
- updatedPolicy = LoadedPolicy()
- updatedPolicy._policyName = updated['policyName']
- updatedPolicy._policyVersion = updated['versionNo']
- updatedPolicy._matchingConditions = updated['matches']
- updatedPolicy._updateType = updated['updateType']
- updatedPolicies.append(updatedPolicy)
- pDPNotification._loadedPolicies= updatedPolicies
- boo_Update = True
- if (boo_Update and boo_Remove):
- pDPNotification._notificationType = NotificationType.BOTH.name
- elif boo_Update:
- pDPNotification._notificationType = NotificationType.UPDATE.name
- elif boo_Remove:
- pDPNotification._notificationType = NotificationType.REMOVE.name
- # call the Handler.
- self.handler.notificationReceived(pDPNotification)
- elif (self.scheme == NotificationScheme.AUTO_NOTIFICATIONS.name):
- # call the handler
- self.handler(self.__checkNotification(message))
-
- def __manualRequest(self,request_url):
- """
- Takes the request_URL given and returns the JSON response back to the Caller.
- """
- ws = create_connection(request_url)
- # when using self-signed server certificate, comment previous line and uncomment following:
- #ws = create_connection(request_url, sslopt={"cert_reqs": ssl.CERT_NONE})
- ws.send("Manual")
- try:
- return json.loads(ws.recv())
- except:
- return None
- ws.close()
- ws.shutdown()
-
- def __onMessage(self, ws,message):
- """Occurs on Event
- """
- self.logger.info("Received AutoNotification message: %s" , message)
- self.__onEvent(message)
-
- def __onError(self, ws, error):
- """Self Restart the Notification Service on Error
- """
- self.logger.error("PE500 - Process Flow Issue: Auto Notification service Error!! : %s" , error)
-
- def __onclose(self, ws):
- """Occurs on Close ? Try to start again in case User didn't do it.
- """
- self.logger.debug("Connection has been Closed. ")
- if not self.mclose:
- self.__startAuto()
- self.mclose = False
-
- def __autoRequest(self, request_url):
- """
- Takes the request_URL and invokes the PolicyEngine method on any receiving a Message
- """
- websocket.enableTrace(True)
- self.autows = websocket.WebSocketApp(request_url, on_message= self.__onMessage, on_close= self.__onclose, on_error= self.__onError)
- # wait for to 5 seconds to restart
- if self.restart:
- time.sleep(5)
- self.autows.run_forever()
-
-class NotificationHandler:
- """
- 'Defines the methods which need to run when an Event or a Notification is received.'
- """
- def notificationReceived(self, notification):
- """
- Will be triggered automatically whenever a Notification is received by the PEP
- @param notification: PDPNotification object which has the information of the Policies.
- @attention: This method must be implemented by the user for AUTO type NotificationScheme
- """
- raise Exception("Unimplemented abstract method: %s" % __functionId(self, 1))
-
-def __functionId(obj, nFramesUp):
- """ Internal Usage only..
- Create a string naming the function n frames up on the stack. """
- fr = sys._getframe(nFramesUp+1)
- co = fr.f_code
- return "%s.%s" % (obj.__class__, co.co_name)
-
-class PolicyConfig:
- """
- 'PolicyConfig is the return object resulted by getConfig Call.'
- """
- def __init__(self):
- self._policyConfigMessage = None
- self._policyConfigStatus = None
- self._policyName = None
- self._policyVersion = None
- self._matchingConditions = None
- self._responseAttributes = None
- self._policyType = None
- self._json = None
- self._xml = None
- self._prop = None
- self._other = None
-
-class PolicyResponse:
- """
- 'PolicyResponse is the return object resulted by sendEvent Call.'
- """
- def __init__(self):
- self._policyResponseStatus = None
- self._policyResponseMessage = None
- self._requestAttributes = None
- self._actionTaken = None
- self._actionAdvised= None
-
-class PDPNotification:
- """
- 'Defines the Notification Event sent from the PDP to PEP Client.'
- """
- def __init__(self):
- self._removedPolicies = None
- self._loadedPolicies = None
- self._notificationType = None
-
-class RemovedPolicy:
- """
- 'Defines the structure of the Removed Policy'
- """
- def __init__(self):
- self._policyName = None
- self._policyVersion = None
-
-class LoadedPolicy:
- """
- 'Defines the Structure of the Loaded Policy'
- """
- def __init__(self):
- self._policyName = None
- self._policyVersion = None
- self._matchingConditions = None
- self._updateType = None
-
-class PolicyParameters:
- """
- 'Defines the Structure of the Policy to Create or Update'
- """
- def __init__(self):
- self._actionPerformer = None
- self._actionAttribute = None
- self._attributes = None
- self._configBody = None
- self._configBodyType = None
- self._configName = None
- self._controllerName = None
- self._dependencyNames = None
- self._dynamicRuleAlgorithmLabels = None
- self._dynamicRuleAlgorithmFunctions = None
- self._dynamicRuleAlgorithmField1 = None
- self._dynamicRuleAlgorithmField2 = None
- self._ecompName = None
- self._extendedOption = None
- self._guard = None
- self._policyClass = None
- self._policyConfigType = None
- self._policyName = None
- self._policyDescription = None
- self._priority = None
- self._requestID = None
- self._riskLevel = None
- self._riskType = None
- self._ruleProvider = None
- self._ttlDate = None
-
-class PushPolicyParameters:
- """
- 'Defines the Structure of the Push Policy Parameters'
- """
- def __init__(self):
- self._pdpGroup = None
- self._policyName = None
- self._policyType = None
-
-class PolicyChangeResponse:
- """
- 'Defines the Structure of the policy Changes made from PDP'
- """
- def __init__(self):
- self._responseMessage = None
- self._responseCode = None
-
-class DeletePolicyParameters:
- """
- 'Defines the Structure of the Delete Policy Parameters'
- """
- def __init__(self):
- self._deleteCondition = None
- self._pdpGroup = None
- self._policyComponent = None
- self._policyName = None
- self._policyType = None
- self._requestID = None
-
-class DictionaryParameters:
- """
- 'Defines the Structure of the Dictionary Parameters'
- """
- def __init__(self):
- self._dictionaryType = None
- self._dictionary = None
- self._dictionaryJson = None
- self._requestID = None
-
-class DictionaryResponse:
- """
- 'Defines the Structure of the dictionary response'
- """
- def __init__(self):
- self._responseMessage = None
- self._responseCode = None
- self._dictionaryJson = None
- self._dictionaryData = None
-
-class DecisionResponse:
- """
- 'Defines the Structure of Decision Response'
- """
- def __init__(self):
- self._decision = None
- self._details = None
-
-class ImportParameters:
- """
- 'Defines the Structure of Policy Model Import'
- """
- def __init__(self):
- self._serviceName = None
- self._description = None
- self._requestID = None
- self._filePath = None
- self._importBody = None
- self._version = None
- self._importType = None
diff --git a/policyhandler/__init__.py b/policyhandler/__init__.py index a3220c4..3315706 100644 --- a/policyhandler/__init__.py +++ b/policyhandler/__init__.py @@ -1,6 +1,5 @@ -# org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,3 +15,20 @@ # ============LICENSE_END========================================================= # # ECOMP is a trademark and service mark of AT&T Intellectual Property. + +"""policyhandler package""" + +class LogWriter(object): + """redirect the standard out + err to the logger""" + def __init__(self, logger_func): + self.logger_func = logger_func + + def write(self, log_line): + """actual writer to be used in place of stdout or stderr""" + log_line = log_line.rstrip() + if log_line: + self.logger_func(log_line) + + def flush(self): + """no real flushing of the buffer""" + pass diff --git a/policyhandler/policy_handler.py b/policyhandler/__main__.py index 50d59bc..04ca657 100644 --- a/policyhandler/policy_handler.py +++ b/policyhandler/__main__.py @@ -1,8 +1,5 @@ -"""run as server: python -m policyhandler/policy_handler""" - -# org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,28 +16,22 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -import sys +""" + run as server: + python -m policyhandler + + that will invoke this module __main__.py in folder of policyhandler +""" + import logging +import sys +from policyhandler import LogWriter from policyhandler.config import Config from policyhandler.onap.audit import Audit +from policyhandler.policy_receiver import PolicyReceiver from policyhandler.web_server import PolicyWeb -from policyhandler.policy_engine import PolicyEngineClient - -class LogWriter(object): - """redirect the standard out + err to the logger""" - def __init__(self, logger_func): - self.logger_func = logger_func - def write(self, log_line): - """actual writer to be used in place of stdout or stderr""" - log_line = log_line.rstrip() - if log_line: - self.logger_func(log_line) - - def flush(self): - """no real flushing of the buffer""" - pass def run_policy_handler(): """main run function for policy-handler""" @@ -51,14 +42,16 @@ def run_policy_handler(): sys.stdout = LogWriter(logger.info) sys.stderr = LogWriter(logger.error) - logger.info("========== run_policy_handler ==========") + logger.info("========== run_policy_handler ========== %s", __package__) Audit.init(Config.get_system_name(), Config.LOGGER_CONFIG_FILE_PATH) logger.info("starting policy_handler with config:") - logger.info(Audit.log_json_dumps(Config.config)) + logger.info(Audit.log_json_dumps(Config.settings)) + + audit = Audit(req_message="start policy handler") + PolicyReceiver.run(audit) + PolicyWeb.run_forever(audit) - PolicyEngineClient.run() - PolicyWeb.run() if __name__ == "__main__": run_policy_handler() diff --git a/policyhandler/config.py b/policyhandler/config.py index 7033096..8e6edf9 100644 --- a/policyhandler/config.py +++ b/policyhandler/config.py @@ -1,8 +1,5 @@ -"""read and use the config""" - -# org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,13 +16,13 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -import os -import json +"""read and use the config""" + import copy -import re -import base64 +import json import logging import logging.config +import os from .discovery import DiscoveryClient @@ -45,7 +42,7 @@ class Config(object): FIELD_POLICY_ENGINE = "policy_engine" wservice_port = 25577 _logger = logging.getLogger("policy_handler.config") - config = None + settings = None @staticmethod def merge(new_config): @@ -53,23 +50,19 @@ class Config(object): if not new_config: return - if not Config.config: - Config.config = new_config + if not Config.settings: + Config.settings = new_config return new_config = copy.deepcopy(new_config) - Config.config.update(new_config) + Config.settings.update(new_config) @staticmethod def get_system_name(): """find the name of the policy-handler system to be used as the key in consul-kv for config of policy-handler """ - system_name = None - if Config.config: - system_name = Config.config.get(Config.FIELD_SYSTEM) - - return system_name or Config.SERVICE_NAME_POLICY_HANDLER + return (Config.settings or {}).get(Config.FIELD_SYSTEM, Config.SERVICE_NAME_POLICY_HANDLER) @staticmethod def discover(): @@ -78,27 +71,14 @@ class Config(object): new_config = DiscoveryClient.get_value(discovery_key) if not new_config or not isinstance(new_config, dict): - Config._logger.warn("unexpected config from discovery: %s", new_config) + Config._logger.warning("unexpected config from discovery: %s", new_config) return Config._logger.debug("loaded config from discovery(%s): %s", \ discovery_key, json.dumps(new_config)) - Config._logger.debug("config before merge from discovery: %s", json.dumps(Config.config)) + Config._logger.debug("config before merge from discovery: %s", json.dumps(Config.settings)) Config.merge(new_config.get(Config.SERVICE_NAME_POLICY_HANDLER)) - Config._logger.debug("merged config from discovery: %s", json.dumps(Config.config)) - - @staticmethod - def upload_to_discovery(): - """upload the current config settings to the discovery service""" - if not Config.config or not isinstance(Config.config, dict): - Config._logger.error("unexpected config: %s", Config.config) - return - - discovery_key = Config.get_system_name() - latest_config = json.dumps({Config.SERVICE_NAME_POLICY_HANDLER:Config.config}) - DiscoveryClient.put_kv(discovery_key, latest_config) - Config._logger.debug("uploaded config to discovery(%s): %s", \ - discovery_key, latest_config) + Config._logger.info("merged config from discovery: %s", json.dumps(Config.settings)) @staticmethod def load_from_file(file_path=None): @@ -112,7 +92,7 @@ class Config(object): loaded_config = json.load(config_json) if not loaded_config: - Config._logger.info("config not loaded from file: %s", file_path) + Config._logger.warning("config not loaded from file: %s", file_path) return Config._logger.info("config loaded from file: %s", file_path) @@ -122,44 +102,5 @@ class Config(object): Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port) Config.merge(loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER)) + Config._logger.info("config loaded from file: %s", json.dumps(Config.settings)) return True - -class PolicyEngineConfig(object): - """main config of the application""" - # PATH_TO_PROPERTIES = r'logs/policy_engine.properties' - PATH_TO_PROPERTIES = r'tmp/policy_engine.properties' - PYPDP_URL = "PYPDP_URL = {0}{1}, {2}, {3}\n" - CLIENT_ID = "CLIENT_ID = {0}\n" - CLIENT_KEY = "CLIENT_KEY = {0}\n" - ENVIRONMENT = "ENVIRONMENT = {0}\n" - _logger = logging.getLogger("policy_handler.pe_config") - - @staticmethod - def save_to_file(): - """create the policy_engine.properties for policy-engine client""" - file_path = PolicyEngineConfig.PATH_TO_PROPERTIES - - try: - config = Config.config[Config.FIELD_POLICY_ENGINE] - headers = config["headers"] - remove_basic = re.compile(r"(^Basic )") - client_auth = headers["ClientAuth"] - basic_client_auth = bool(remove_basic.match(client_auth)) - client_parts = base64.b64decode(remove_basic.sub("", client_auth)).split(":") - auth_parts = base64.b64decode(remove_basic.sub("", headers["Authorization"])).split(":") - - props = PolicyEngineConfig.PYPDP_URL.format(config["url"], config["path_pdp"], - auth_parts[0], auth_parts[1]) - props += PolicyEngineConfig.CLIENT_ID.format(client_parts[0]) - props += PolicyEngineConfig.CLIENT_KEY.format(base64.b64encode(client_parts[1])) - props += PolicyEngineConfig.ENVIRONMENT.format(headers["Environment"]) - - with open(file_path, 'w') as prp_file: - prp_file.write(props) - PolicyEngineConfig._logger.info("created %s basic_client_auth %s", - file_path, basic_client_auth) - return basic_client_auth - except IOError: - PolicyEngineConfig._logger.error("failed to save to %s", file_path) - except KeyError: - PolicyEngineConfig._logger.error("unexpected config for %s", Config.FIELD_POLICY_ENGINE) diff --git a/policyhandler/customize/__init__.py b/policyhandler/customize/__init__.py new file mode 100644 index 0000000..7449528 --- /dev/null +++ b/policyhandler/customize/__init__.py @@ -0,0 +1,32 @@ +# ================================================================================ +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +"""polymorphic customizer changes the behavior of predefined methods in policy-handler""" + +from .customizer import Customizer + +class CustomizerUser(object): + """unprotected singleton around Customizer""" + _customizer = None + + @staticmethod + def get_customizer(): + """get instance of customizer""" + if not CustomizerUser._customizer: + CustomizerUser._customizer = Customizer() + return CustomizerUser._customizer diff --git a/policyhandler/customize/customizer.py b/policyhandler/customize/customizer.py new file mode 100644 index 0000000..9ab5967 --- /dev/null +++ b/policyhandler/customize/customizer.py @@ -0,0 +1,34 @@ +# ================================================================================ +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +"""contains the Customizer class with method overrides per company specification""" + +from .customizer_base import CustomizerBase + +class Customizer(CustomizerBase): + """ + the Customizer class inherits CustomizerBase that is owned by ONAP + + :Customizer: class is owned by the company that needs to customize the policy-handler + + :override: any method defined in the CustomizerBase class to customize + the behavior of the policy-handler + + see README.md for the sample of the customizer.py + """ + pass diff --git a/policyhandler/customize/customizer_base.py b/policyhandler/customize/customizer_base.py new file mode 100644 index 0000000..c98a9eb --- /dev/null +++ b/policyhandler/customize/customizer_base.py @@ -0,0 +1,63 @@ +# ================================================================================ +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +""" +contains the base class :CustomizerBase: +that defines the signatures and default behavior of the methods called by the policy-handler + +the methods are expected to be overriden by the child class Cutomizer that is company specific + +:do NOT change: this class and/or this file - it is owned by ONAP +""" + +import logging + +class CustomizerBase(object): + """ + base class for Customizer class + + do NOT change this class and/or this file - it is owned by ONAP + + policy-hanlder is using the instance of the child Customizer class to get the overriden methods + + the methods defined in this class are the placeholders and are expected + to be overriden by the Customizer class + """ + + def __init__(self): + """base class for customization contains the default methods""" + self._logger = logging.getLogger("policy_handler.customizer") + self._logger.info("created customizer") + + def get_service_url(self, audit, service_name, service): + """returns the service url when called from DiscoveryClient""" + service_url = "http://{0}:{1}".format( + service.get("ServiceAddress", ""), service.get("ServicePort", "")) + + info = "no customization for service_url: {0} on {1}".format(service_url, service_name) + self._logger.info(info) + audit.info(info) + return service_url + + def get_deploy_handler_kwargs(self, audit): + """returns the optional dict-kwargs for requests.post to deploy_handler""" + info = "no optional kwargs for requests.post to deploy_handler" + self._logger.info(info) + audit.info(info) + kwargs = {} + return kwargs diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index 1d50fc3..ea703f4 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -1,8 +1,5 @@ -""" send notification to deploy-handler""" - -# org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,13 +16,17 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -import logging +""" send notification to deploy-handler""" + import json +import logging + import requests from .config import Config +from .customize import CustomizerUser from .discovery import DiscoveryClient -from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode +from .onap.audit import REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, Metrics POOL_SIZE = 1 @@ -37,69 +38,136 @@ class DeployHandler(object): _requests_session = None _config = None _url = None - _url_path = None + _url_policy = None _target_entity = None + _custom_kwargs = None + _server_instance_uuid = None @staticmethod - def _lazy_init(): + def _lazy_init(audit, rediscover=False): """ set static properties """ - if DeployHandler._lazy_inited: + if DeployHandler._lazy_inited and not rediscover: return - DeployHandler._lazy_inited = True - - DeployHandler._requests_session = requests.Session() - DeployHandler._requests_session.mount( - 'https://', - requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE) - ) - DeployHandler._requests_session.mount( - 'http://', - requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE) - ) - - DeployHandler._target_entity = Config.config["deploy_handler"] - DeployHandler._url = DiscoveryClient.get_service_url(DeployHandler._target_entity) - DeployHandler._url_path = DeployHandler._url + '/policy' - DeployHandler._logger.info("DeployHandler url(%s)", DeployHandler._url) + + DeployHandler._custom_kwargs = (CustomizerUser.get_customizer() + .get_deploy_handler_kwargs(audit)) + if (not DeployHandler._custom_kwargs + or not isinstance(DeployHandler._custom_kwargs, dict)): + DeployHandler._custom_kwargs = {} + + if not DeployHandler._requests_session: + DeployHandler._requests_session = requests.Session() + DeployHandler._requests_session.mount( + 'https://', + requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE) + ) + DeployHandler._requests_session.mount( + 'http://', + requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE) + ) + + config_dh = Config.settings.get("deploy_handler") + if config_dh and isinstance(config_dh, dict): + # dns based routing to deployment-handler + # config for policy-handler >= 2.4.0 + # "deploy_handler" : { + # "target_entity" : "deployment_handler", + # "url" : "http://deployment_handler:8188" + # } + DeployHandler._target_entity = config_dh.get("target_entity", "deployment_handler") + DeployHandler._url = config_dh.get("url") + DeployHandler._logger.info("dns based routing to %s: url(%s)", + DeployHandler._target_entity, DeployHandler._url) + + if not DeployHandler._url: + # discover routing to deployment-handler at consul-services + if not isinstance(config_dh, dict): + # config for policy-handler <= 2.3.1 + # "deploy_handler" : "deployment_handler" + DeployHandler._target_entity = str(config_dh or "deployment_handler") + DeployHandler._url = DiscoveryClient.get_service_url(audit, + DeployHandler._target_entity) + + DeployHandler._url_policy = str(DeployHandler._url or "") + '/policy' + DeployHandler._logger.info( + "got %s policy url(%s)", DeployHandler._target_entity, DeployHandler._url_policy) + + DeployHandler._lazy_inited = bool(DeployHandler._url) + @staticmethod - def policy_update(audit, latest_policies): - """ post policy_updated message to deploy-handler """ - DeployHandler._lazy_init() - msg = {"latest_policies":latest_policies} - sub_aud = Audit(aud_parent=audit, targetEntity=DeployHandler._target_entity, - targetServiceName=DeployHandler._url_path) - headers = {REQUEST_X_ECOMP_REQUESTID : sub_aud.request_id} - - msg_str = json.dumps(msg) + def policy_update(audit, message, rediscover=False): + """ + post policy_updated message to deploy-handler + + returns condition whether it needs to catch_up + """ + if not message: + return + + DeployHandler._lazy_init(audit, rediscover) + metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity, + targetServiceName=DeployHandler._url_policy) + headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id} + + msg_str = json.dumps(message) headers_str = json.dumps(headers) - log_line = "post to deployment-handler {0} msg={1} headers={2}".format( - DeployHandler._url_path, msg_str, headers_str) - sub_aud.metrics_start(log_line) + log_action = "post to {0} at {1}".format( + DeployHandler._target_entity, DeployHandler._url_policy) + log_data = " msg={0} headers={1}".format(msg_str, headers_str) + log_line = log_action + log_data DeployHandler._logger.info(log_line) + metrics.metrics_start(log_line) + + if not DeployHandler._url: + error_msg = "no url found to {0}".format(log_line) + DeployHandler._logger.error(error_msg) + metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) + audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) + metrics.metrics(error_msg) + return res = None try: res = DeployHandler._requests_session.post( - DeployHandler._url_path, json=msg, headers=headers + DeployHandler._url_policy, json=message, headers=headers, + **DeployHandler._custom_kwargs ) - except requests.exceptions.RequestException as ex: - error_msg = "failed to post to deployment-handler {0} {1} msg={2} headers={3}" \ - .format(DeployHandler._url_path, str(ex), msg_str, headers_str) + except Exception as ex: + error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + if isinstance(ex, requests.exceptions.RequestException) + else AuditHttpCode.SERVER_INTERNAL_ERROR.value) + error_msg = ("failed to {0} {1}: {2}{3}" + .format(log_action, type(ex).__name__, str(ex), log_data)) DeployHandler._logger.exception(error_msg) - sub_aud.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) - audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) - sub_aud.metrics(error_msg) + metrics.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + metrics.metrics(error_msg) return - sub_aud.set_http_status_code(res.status_code) + metrics.set_http_status_code(res.status_code) audit.set_http_status_code(res.status_code) - sub_aud.metrics( - "response from deployment-handler to post {0}: {1} msg={2} text={3} headers={4}" \ - .format(DeployHandler._url_path, res.status_code, msg_str, res.text, - res.request.headers)) + log_line = "response {0} from {1}: text={2}{3}" \ + .format(res.status_code, log_action, res.text, log_data) + metrics.metrics(log_line) - if res.status_code == requests.codes.ok: - return res.json() + if res.status_code != requests.codes.ok: + DeployHandler._logger.error(log_line) + return + + DeployHandler._logger.info(log_line) + result = res.json() or {} + prev_server_instance_uuid = DeployHandler._server_instance_uuid + DeployHandler._server_instance_uuid = result.get("server_instance_uuid") + + deployment_handler_changed = (prev_server_instance_uuid + and prev_server_instance_uuid != DeployHandler._server_instance_uuid) + if deployment_handler_changed: + log_line = ("deployment_handler_changed: {1} != {0}" + .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid)) + metrics.info(log_line) + DeployHandler._logger.info(log_line) + + return deployment_handler_changed diff --git a/policyhandler/discovery.py b/policyhandler/discovery.py index 7e16b90..ce24c3d 100644 --- a/policyhandler/discovery.py +++ b/policyhandler/discovery.py @@ -1,8 +1,5 @@ -"""client to talk to consul at the standard port 8500""" - -# org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,11 +16,16 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -import logging -import json +"""client to talk to consul at the standard port 8500""" + import base64 +import json +import logging + import requests +from .customize import CustomizerUser + class DiscoveryClient(object): """talking to consul at http://consul:8500 @@ -40,34 +42,54 @@ class DiscoveryClient(object): """ CONSUL_SERVICE_MASK = "http://consul:8500/v1/catalog/service/{0}" CONSUL_KV_MASK = "http://consul:8500/v1/kv/{0}" - SERVICE_MASK = "http://{0}:{1}" _logger = logging.getLogger("policy_handler.discovery") - @staticmethod - def get_service_url(service_name): + def get_service_url(audit, service_name): """find the service record in consul""" service_path = DiscoveryClient.CONSUL_SERVICE_MASK.format(service_name) - DiscoveryClient._logger.info("discover %s", service_path) + log_line = "discover {0}".format(service_path) + DiscoveryClient._logger.info(log_line) + audit.info(log_line) response = requests.get(service_path) + + log_line = "response {0} for {1}: {2}".format( + response.status_code, service_path, response.text) + DiscoveryClient._logger.info(log_line) + audit.info(log_line) + response.raise_for_status() - service = response.json()[0] - return DiscoveryClient.SERVICE_MASK.format( \ - service["ServiceAddress"], service["ServicePort"]) + + service = response.json() + if not service: + log_line = "failed discover {0}".format(service_path) + DiscoveryClient._logger.error(log_line) + audit.error(log_line) + return + service = service[0] + + service_url = CustomizerUser.get_customizer().get_service_url(audit, service_name, service) + if not service_url: + log_line = "failed to get service_url for {0}".format(service_name) + DiscoveryClient._logger.error(log_line) + audit.error(log_line) + return + + log_line = "got service_url: {0} for {1}".format(service_url, service_name) + DiscoveryClient._logger.info(log_line) + audit.info(log_line) + return service_url @staticmethod def get_value(key): """get the value for the key from consul-kv""" response = requests.get(DiscoveryClient.CONSUL_KV_MASK.format(key)) response.raise_for_status() - data = response.json()[0] - value = base64.b64decode(data["Value"]).decode("utf-8") - DiscoveryClient._logger.info("consul-kv key=%s data=%s value(%s)", \ - key, json.dumps(data), value) + data = response.json() + if not data: + DiscoveryClient._logger.error("failed get_value %s", key) + return + value = base64.b64decode(data[0]["Value"]).decode("utf-8") + DiscoveryClient._logger.info("consul-kv key=%s value(%s) data=%s", + key, value, json.dumps(data)) return json.loads(value) - - @staticmethod - def put_kv(key, value): - """put the value under the key in consul-kv""" - response = requests.put(DiscoveryClient.CONSUL_KV_MASK.format(key), data=value) - response.raise_for_status() diff --git a/policyhandler/onap/CommonLogger.py b/policyhandler/onap/CommonLogger.py index a70ca28..957b3aa 100644 --- a/policyhandler/onap/CommonLogger.py +++ b/policyhandler/onap/CommonLogger.py @@ -1,19 +1,8 @@ #!/usr/bin/python
# -*- indent-tabs-mode: nil -*- vi: set expandtab:
-"""ECOMP Common Logging library in Python.
-
-CommonLogger.py
-
-Original Written by: Terry Schmalzried
-Date written: October 1, 2015
-Last updated: December 1, 2016
-
-version 0.8
-"""
-# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -30,6 +19,17 @@ version 0.8 #
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+"""ECOMP Common Logging library in Python.
+
+CommonLogger.py
+
+Original Written by: Terry Schmalzried
+Date written: October 1, 2015
+Last updated: December 1, 2016
+
+version 0.8
+"""
+
from __future__ import print_function
import os, sys, getopt, logging, logging.handlers, time, re, uuid, socket, threading, collections
@@ -203,7 +203,7 @@ class CommonLogger: self._logHandler = logging.handlers.RotatingFileHandler(self._logFile, \
mode=self._sizeRotateMode, maxBytes=self._sizeMaxBytes, \
backupCount=self._backupCount, encoding=None, delay=False)
-
+
else:
self._logHandler = logging.handlers.WatchedFileHandler(self._logFile, \
mode=self._sizeRotateMode, \
@@ -860,7 +860,7 @@ if __name__ == "__main__": parser.add_argument("-k", "--keeplogs", help="Keep the log files after finishing the tests", action="store_true")
parser.add_argument("-v", "--verbose", help="Print debugging messages", action="store_true")
args = parser.parse_args()
-
+
spid = str(os.getpid())
if args.keeplogs:
spid = ""
@@ -878,7 +878,7 @@ if __name__ == "__main__": os.remove(f)
except:
pass
- if not args.keeplogs:
+ if not args.keeplogs:
atexit.register(cleanupTmps)
with open(logcfg, "w") as o:
diff --git a/policyhandler/onap/__init__.py b/policyhandler/onap/__init__.py index a3220c4..e9d0246 100644 --- a/policyhandler/onap/__init__.py +++ b/policyhandler/onap/__init__.py @@ -1,6 +1,5 @@ -# org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py index a1df861..a007a26 100644 --- a/policyhandler/onap/audit.py +++ b/policyhandler/onap/audit.py @@ -1,15 +1,5 @@ -"""generic class to keep track of request handling - from receiving it through reponse and log all the activities - - call Audit.init("component-name", "path/to/config_file") to init the loggers before any requests - - start each outside request with creation of the Audit object - audit = Audit(request_id=None, headers=None, msg=None) -""" - -# org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -26,15 +16,30 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -import os +"""generic class to keep track of request handling + from receiving it through reponse and log all the activities + + call Audit.init("component-name", "path/to/config_file") to init the loggers before any requests + + start each outside request with creation of the Audit object + audit = Audit(request_id=None, headers=None, msg=None) +""" + +import copy import json -import uuid +import os +import re +import subprocess +import sys +import threading import time -import copy -from threading import Lock +import uuid +from datetime import datetime from enum import Enum from .CommonLogger import CommonLogger +from .health import Health +from .process_info import ProcessInfo REQUEST_X_ECOMP_REQUESTID = "X-ECOMP-RequestID" REQUEST_REMOTE_ADDR = "Remote-Addr" @@ -44,21 +49,31 @@ HOSTNAME = "HOSTNAME" AUDIT_REQUESTID = 'requestID' AUDIT_IPADDRESS = 'IPAddress' AUDIT_SERVER = 'server' +AUDIT_TARGET_ENTITY = 'targetEntity' +AUDIT_METRICS = 'metrics' +AUDIT_TOTAL_STATS = 'audit_total_stats' +METRICS_TOTAL_STATS = 'metrics_total_stats' HEADER_CLIENTAUTH = "clientauth" HEADER_AUTHORIZATION = "authorization" +ERROR_CODE = "errorCode" +ERROR_DESCRIPTION = "errorDescription" + + class AuditHttpCode(Enum): """audit http codes""" HTTP_OK = 200 - DATA_NOT_FOUND_ERROR = 400 PERMISSION_UNAUTHORIZED_ERROR = 401 PERMISSION_FORBIDDEN_ERROR = 403 + RESPONSE_ERROR = 400 + DATA_NOT_FOUND_ERROR = 404 SERVER_INTERNAL_ERROR = 500 SERVICE_UNAVAILABLE_ERROR = 503 DATA_ERROR = 1030 SCHEMA_ERROR = 1040 + class AuditResponseCode(Enum): """audit response codes""" SUCCESS = 0 @@ -72,23 +87,25 @@ class AuditResponseCode(Enum): @staticmethod def get_response_code(http_status_code): """calculates the response_code from max_http_status_code""" + response_code = AuditResponseCode.UNKNOWN_ERROR if http_status_code <= AuditHttpCode.HTTP_OK.value: - return AuditResponseCode.SUCCESS - - if http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value, \ - AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]: - return AuditResponseCode.PERMISSION_ERROR - if http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value: - return AuditResponseCode.AVAILABILITY_ERROR - if http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value: - return AuditResponseCode.BUSINESS_PROCESS_ERROR - if http_status_code in [AuditHttpCode.DATA_ERROR.value, \ - AuditHttpCode.DATA_NOT_FOUND_ERROR.value]: - return AuditResponseCode.DATA_ERROR - if http_status_code == AuditHttpCode.SCHEMA_ERROR.value: - return AuditResponseCode.SCHEMA_ERROR - - return AuditResponseCode.UNKNOWN_ERROR + response_code = AuditResponseCode.SUCCESS + + elif http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value, + AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]: + response_code = AuditResponseCode.PERMISSION_ERROR + elif http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value: + response_code = AuditResponseCode.AVAILABILITY_ERROR + elif http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value: + response_code = AuditResponseCode.BUSINESS_PROCESS_ERROR + elif http_status_code in [AuditHttpCode.DATA_ERROR.value, + AuditHttpCode.RESPONSE_ERROR.value, + AuditHttpCode.DATA_NOT_FOUND_ERROR.value]: + response_code = AuditResponseCode.DATA_ERROR + elif http_status_code == AuditHttpCode.SCHEMA_ERROR.value: + response_code = AuditResponseCode.SCHEMA_ERROR + + return response_code @staticmethod def get_human_text(response_code): @@ -97,7 +114,8 @@ class AuditResponseCode(Enum): return "unknown" return response_code.name.lower().replace("_", " ") -class Audit(object): + +class _Audit(object): """put the audit object on stack per each initiating request in the system :request_id: is the X-ECOMP-RequestID for tracing @@ -109,75 +127,90 @@ class Audit(object): :kwargs: - put any request related params into kwargs """ _service_name = "" - _service_instance_UUID = str(uuid.uuid4()) + _service_version = "" + _service_instance_uuid = str(uuid.uuid4()) + _started = datetime.utcnow() + _key_format = re.compile(r"\W") _logger_debug = None _logger_error = None _logger_metrics = None _logger_audit = None + _health = Health() + _py_ver = sys.version.replace("\n", "") + _packages = [] @staticmethod def init(service_name, config_file_path): """init static invariants and loggers""" - Audit._service_name = service_name - Audit._logger_debug = CommonLogger(config_file_path, "debug", \ - instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name) - Audit._logger_error = CommonLogger(config_file_path, "error", \ - instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name) - Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \ - instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name) - Audit._logger_audit = CommonLogger(config_file_path, "audit", \ - instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name) - - def __init__(self, request_id=None, req_message=None, aud_parent=None, **kwargs): + _Audit._service_name = service_name + _Audit._logger_debug = CommonLogger(config_file_path, "debug", \ + instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + _Audit._logger_error = CommonLogger(config_file_path, "error", \ + instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + _Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \ + instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + _Audit._logger_audit = CommonLogger(config_file_path, "audit", \ + instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + ProcessInfo.init() + try: + _Audit._service_version = subprocess.check_output( + ["python", "setup.py", "--version"], universal_newlines=True).strip() + except subprocess.CalledProcessError: + pass + try: + _Audit._packages = list( + filter(None, subprocess.check_output(["pip", "freeze"], + universal_newlines=True).splitlines())) + except subprocess.CalledProcessError: + pass + + + def health(self, full=False): + """returns json for health check""" + utcnow = datetime.utcnow() + health = { + "server" : { + "service_name" : _Audit._service_name, + "service_version" : _Audit._service_version, + "service_instance_uuid" : _Audit._service_instance_uuid + }, + "runtime" : { + "started" : str(_Audit._started), + "utcnow" : str(utcnow), + "uptime" : str(utcnow - _Audit._started), + "active_threads" : ProcessInfo.active_threads(), + "gc" : ProcessInfo.gc_info(full), + "virtual_memory" : ProcessInfo.virtual_memory(), + "process_memory" : ProcessInfo.process_memory() + }, + "stats" : _Audit._health.dump(), + "soft" : {"python" : _Audit._py_ver, "packages" : _Audit._packages} + } + self.info("{} health: {}".format(_Audit._service_name, json.dumps(health))) + return health + + def process_info(self): + """get the debug info on all the threads and memory""" + process_info = ProcessInfo.get_all() + self.info("{} process_info: {}".format(_Audit._service_name, json.dumps(process_info))) + return process_info + + def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs): """create audit object per each request in the system + :job_name: is the name of the audit job for health stats :request_id: is the X-ECOMP-RequestID for tracing :req_message: is the request message string for logging - :aud_parent: is the parent Audit - used for sub-query metrics to other systems :kwargs: - put any request related params into kwargs """ + self.job_name = _Audit._key_format.sub('_', job_name or req_message or _Audit._service_name) self.request_id = request_id self.req_message = req_message or "" - self.aud_parent = aud_parent self.kwargs = kwargs or {} - self.retry_get_config = False self.max_http_status_code = 0 - self._lock = Lock() - - if self.aud_parent: - if not self.request_id: - self.request_id = self.aud_parent.request_id - if not self.req_message: - self.req_message = self.aud_parent.req_message - self.kwargs = self.aud_parent.merge_all_kwargs(**self.kwargs) - else: - headers = self.kwargs.get("headers", {}) - if headers: - if not self.request_id: - self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID) - if AUDIT_IPADDRESS not in self.kwargs: - self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR) - if AUDIT_SERVER not in self.kwargs: - self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST) - - if AUDIT_SERVER not in self.kwargs: - self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME) - - created_req = "" - if not self.request_id: - created_req = " with new" - self.request_id = str(uuid.uuid4()) - - self.kwargs[AUDIT_REQUESTID] = self.request_id - - self._started = time.time() - self._start_event = Audit._logger_audit.getStartRecordEvent() - self.metrics_start() + self._lock = threading.Lock() - if not self.aud_parent: - self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\ - .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs))) def merge_all_kwargs(self, **kwargs): """returns the merge of copy of self.kwargs with the param kwargs""" @@ -188,10 +221,15 @@ class Audit(object): def set_http_status_code(self, http_status_code): """accumulate the highest(worst) http status code""" - self._lock.acquire() - if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value: - self.max_http_status_code = max(http_status_code, self.max_http_status_code) - self._lock.release() + with self._lock: + if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value: + self.max_http_status_code = max(http_status_code, self.max_http_status_code) + + def get_max_http_status_code(self): + """returns the highest(worst) http status code""" + with self._lock: + max_http_status_code = self.max_http_status_code + return max_http_status_code @staticmethod def get_status_code(success): @@ -200,6 +238,71 @@ class Audit(object): return 'COMPLETE' return 'ERROR' + def is_serious_error(self, status_code): + """returns whether the response_code is success and a human text for response code""" + return (AuditResponseCode.PERMISSION_ERROR.value + == AuditResponseCode.get_response_code(status_code).value + or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value) + + def _get_response_status(self): + """calculates the response status fields from max_http_status_code""" + max_http_status_code = self.get_max_http_status_code() + response_code = AuditResponseCode.get_response_code(max_http_status_code) + success = (response_code.value == AuditResponseCode.SUCCESS.value) + response_description = AuditResponseCode.get_human_text(response_code) + return success, max_http_status_code, response_code, response_description + + def is_success(self): + """returns whether the response_code is success and a human text for response code""" + success, _, _, _ = self._get_response_status() + return success + + def debug(self, log_line, **kwargs): + """debug - the debug=lowest level of logging""" + _Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs)) + + def info(self, log_line, **kwargs): + """debug - the info level of logging""" + _Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs)) + + def info_requested(self, result=None, **kwargs): + """info "requested ..." - the info level of logging""" + self.info("requested {0} {1}".format(self.req_message, result or ""), \ + **self.merge_all_kwargs(**kwargs)) + + def warn(self, log_line, error_code=None, **kwargs): + """debug+error - the warn level of logging""" + all_kwargs = self.merge_all_kwargs(**kwargs) + + if error_code and isinstance(error_code, AuditResponseCode): + all_kwargs[ERROR_CODE] = error_code.value + all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code) + + _Audit._logger_debug.warn(log_line, **all_kwargs) + _Audit._logger_error.warn(log_line, **all_kwargs) + + def error(self, log_line, error_code=None, **kwargs): + """debug+error - the error level of logging""" + all_kwargs = self.merge_all_kwargs(**kwargs) + + if error_code and isinstance(error_code, AuditResponseCode): + all_kwargs[ERROR_CODE] = error_code.value + all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code) + + _Audit._logger_debug.error(log_line, **all_kwargs) + _Audit._logger_error.error(log_line, **all_kwargs) + + def fatal(self, log_line, error_code=None, **kwargs): + """debug+error - the fatal level of logging""" + all_kwargs = self.merge_all_kwargs(**kwargs) + + if error_code and isinstance(error_code, AuditResponseCode): + all_kwargs[ERROR_CODE] = error_code.value + all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code) + + _Audit._logger_debug.fatal(log_line, **all_kwargs) + _Audit._logger_error.fatal(log_line, **all_kwargs) + @staticmethod def hide_secrets(obj): """hides the known secret field values of the dictionary""" @@ -210,7 +313,7 @@ class Audit(object): if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]: obj[key] = "*" elif isinstance(obj[key], dict): - obj[key] = Audit.hide_secrets(obj[key]) + obj[key] = _Audit.hide_secrets(obj[key]) return obj @@ -220,101 +323,151 @@ class Audit(object): if not isinstance(obj, dict): return json.dumps(obj, **kwargs) - return json.dumps(Audit.hide_secrets(copy.deepcopy(obj)), **kwargs) + return json.dumps(_Audit.hide_secrets(copy.deepcopy(obj)), **kwargs) - def get_response_code(self): - """calculates the response_code from max_http_status_code""" - self._lock.acquire() - max_http_status_code = self.max_http_status_code - self._lock.release() - return AuditResponseCode.get_response_code(max_http_status_code) + @staticmethod + def get_elapsed_time(started): + """returns the elapsed time since started in milliseconds""" + return int(round(1000 * (time.time() - (started or 0)))) - def debug(self, log_line, **kwargs): - """debug - the debug=lowest level of logging""" - Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs)) - def info(self, log_line, **kwargs): - """debug - the info level of logging""" - Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs)) +class Audit(_Audit): + """Audit class to track the high level operations""" - def info_requested(self, result=None, **kwargs): - """info "requested ..." - the info level of logging""" - self.info("requested {0} {1}".format(self.req_message, result or ""), \ - **self.merge_all_kwargs(**kwargs)) + def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs): + """create audit object per each request in the system - def warn(self, log_line, **kwargs): - """debug+error - the warn level of logging""" - all_kwargs = self.merge_all_kwargs(**kwargs) - Audit._logger_debug.warn(log_line, **all_kwargs) - Audit._logger_error.warn(log_line, **all_kwargs) + :job_name: is the name of the audit job for health stats + :request_id: is the X-ECOMP-RequestID for tracing + :req_message: is the request message string for logging + :aud_parent: is the parent Audit - used for sub-query metrics to other systems + :kwargs: - put any request related params into kwargs + """ + super().__init__(job_name=job_name, + request_id=request_id, + req_message=req_message, + **kwargs) - def error(self, log_line, **kwargs): - """debug+error - the error level of logging""" - all_kwargs = self.merge_all_kwargs(**kwargs) - Audit._logger_debug.error(log_line, **all_kwargs) - Audit._logger_error.error(log_line, **all_kwargs) + headers = self.kwargs.get("headers", {}) + if headers: + if not self.request_id: + self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID) + if AUDIT_IPADDRESS not in self.kwargs: + self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR) + if AUDIT_SERVER not in self.kwargs: + self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST) - def fatal(self, log_line, **kwargs): - """debug+error - the fatal level of logging""" + created_req = "" + if not self.request_id: + created_req = " with new" + self.request_id = str(uuid.uuid4()) + + if AUDIT_SERVER not in self.kwargs: + self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME) + + self.kwargs[AUDIT_REQUESTID] = self.request_id + + _Audit._health.start(self.job_name, self.request_id) + _Audit._health.start(AUDIT_TOTAL_STATS, self.request_id) + + self._started = time.time() + self._start_event = Audit._logger_audit.getStartRecordEvent() + + self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\ + .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs))) + + + def audit_done(self, result=None, **kwargs): + """debug+audit - the audit=top level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - Audit._logger_debug.fatal(log_line, **all_kwargs) - Audit._logger_error.fatal(log_line, **all_kwargs) + success, max_http_status_code, response_code, response_description = \ + self._get_response_status() + log_line = "{0} {1}".format(self.req_message, result or "").strip() + audit_func = None + timer = _Audit.get_elapsed_time(self._started) + if success: + log_line = "done: {0}".format(log_line) + self.info(log_line, **all_kwargs) + audit_func = _Audit._logger_audit.info + _Audit._health.success(self.job_name, timer, self.request_id) + _Audit._health.success(AUDIT_TOTAL_STATS, timer, self.request_id) + else: + log_line = "failed: {0}".format(log_line) + self.error(log_line, errorCode=response_code.value, + errorDescription=response_description, **all_kwargs) + audit_func = _Audit._logger_audit.error + _Audit._health.error(self.job_name, timer, self.request_id) + _Audit._health.error(AUDIT_TOTAL_STATS, timer, self.request_id) + + audit_func(log_line, begTime=self._start_event, timer=timer, + statusCode=_Audit.get_status_code(success), + responseCode=response_code.value, + responseDescription=response_description, + **all_kwargs) + + return (success, max_http_status_code, response_description) + + +class Metrics(_Audit): + """Metrics class to track the calls to outside systems""" + + def __init__(self, aud_parent, **kwargs): + """create audit object per each request in the system + + :aud_parent: is the parent Audit - used for sub-query metrics to other systems + :kwargs: - put any request related params into kwargs + """ + super().__init__(job_name=aud_parent.job_name, + request_id=aud_parent.request_id, + req_message=aud_parent.req_message, + **aud_parent.merge_all_kwargs(**kwargs)) + self.aud_parent = aud_parent + self._metrics_name = _Audit._key_format.sub( + '_', AUDIT_METRICS + "_" + self.kwargs.get(AUDIT_TARGET_ENTITY, self.job_name)) + + self._metrics_started = None + self._metrics_start_event = None - @staticmethod - def get_elapsed_time(started): - """returns the elapsed time since started in milliseconds""" - return int(round(1000 * (time.time() - started))) def metrics_start(self, log_line=None, **kwargs): """reset metrics timing""" + self.merge_all_kwargs(**kwargs) self._metrics_started = time.time() - self._metrics_start_event = Audit._logger_metrics.getStartRecordEvent() + self._metrics_start_event = _Audit._logger_metrics.getStartRecordEvent() if log_line: self.info(log_line, **self.merge_all_kwargs(**kwargs)) + _Audit._health.start(self._metrics_name, self.request_id) + _Audit._health.start(METRICS_TOTAL_STATS, self.request_id) + def metrics(self, log_line, **kwargs): """debug+metrics - the metrics=sub-audit level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - response_code = self.get_response_code() - success = (response_code.value == AuditResponseCode.SUCCESS.value) + success, max_http_status_code, response_code, response_description = \ + self._get_response_status() metrics_func = None + timer = _Audit.get_elapsed_time(self._metrics_started) if success: log_line = "done: {0}".format(log_line) self.info(log_line, **all_kwargs) - metrics_func = Audit._logger_metrics.info + metrics_func = _Audit._logger_metrics.info + _Audit._health.success(self._metrics_name, timer, self.request_id) + _Audit._health.success(METRICS_TOTAL_STATS, timer, self.request_id) else: log_line = "failed: {0}".format(log_line) - self.error(log_line, errorCode=response_code.value, \ - errorDescription=AuditResponseCode.get_human_text(response_code), **all_kwargs) - metrics_func = Audit._logger_metrics.error - - metrics_func(log_line, begTime=self._metrics_start_event, \ - timer=Audit.get_elapsed_time(self._metrics_started), \ - statusCode=Audit.get_status_code(success), responseCode=response_code.value, \ - responseDescription=AuditResponseCode.get_human_text(response_code), \ + self.error(log_line, errorCode=response_code.value, + errorDescription=response_description, **all_kwargs) + metrics_func = _Audit._logger_metrics.error + _Audit._health.error(self._metrics_name, timer, self.request_id) + _Audit._health.error(METRICS_TOTAL_STATS, timer, self.request_id) + + metrics_func( + log_line, + begTime=(self._metrics_start_event or _Audit._logger_metrics.getStartRecordEvent()), + timer=timer, + statusCode=_Audit.get_status_code(success), + responseCode=response_code.value, + responseDescription=response_description, **all_kwargs) - self.metrics_start() - - def audit_done(self, result=None, **kwargs): - """debug+audit - the audit=top level of logging""" - all_kwargs = self.merge_all_kwargs(**kwargs) - response_code = self.get_response_code() - success = (response_code.value == AuditResponseCode.SUCCESS.value) - log_line = "{0} {1}".format(self.req_message, result or "").strip() - audit_func = None - if success: - log_line = "done: {0}".format(log_line) - self.info(log_line, **all_kwargs) - audit_func = Audit._logger_audit.info - else: - log_line = "failed: {0}".format(log_line) - self.error(log_line, errorCode=response_code.value, \ - errorDescription=AuditResponseCode.get_human_text(response_code), **all_kwargs) - audit_func = Audit._logger_audit.error - - audit_func(log_line, begTime=self._start_event, \ - timer=Audit.get_elapsed_time(self._started), \ - statusCode=Audit.get_status_code(success), responseCode=response_code.value, \ - responseDescription=AuditResponseCode.get_human_text(response_code), \ - **all_kwargs) + return (success, max_http_status_code, response_description) diff --git a/policyhandler/onap/health.py b/policyhandler/onap/health.py new file mode 100644 index 0000000..d117255 --- /dev/null +++ b/policyhandler/onap/health.py @@ -0,0 +1,160 @@ +# ================================================================================ +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +"""generic class to keep track of app health""" + +import uuid +from threading import Lock +from datetime import datetime + +class HealthStats(object): + """keep track of stats for calls""" + def __init__(self, name): + """keep track of stats for metrics calls""" + self._name = name or "stats_" + str(uuid.uuid4()) + self._lock = Lock() + + self._call_count = 0 + self._error_count = 0 + self._active_count = 0 + + self._longest_timer = 0 + self._total_timer = 0 + + self._last_success = None + self._last_error = None + self._last_start = None + self._longest_end_ts = None + + self._last_success_request_id = None + self._last_error_request_id = None + self._last_started_request_id = None + self._longest_request_id = None + + + def dump(self): + """returns dict of stats""" + dump = None + with self._lock: + dump = { + "total" : { + "call_count" : self._call_count, + "ave_timer_millisecs" : (float(self._total_timer)/self._call_count + if self._call_count else 0) + }, + "success" : { + "success_count" : (self._call_count - self._error_count), + "last_success" : str(self._last_success), + "last_success_request_id" : self._last_success_request_id + }, + "error" : { + "error_count" : self._error_count, + "last_error" : str(self._last_error), + "last_error_request_id" : self._last_error_request_id + }, + "active" : { + "active_count" : self._active_count, + "last_start" : str(self._last_start), + "last_started_request_id" : self._last_started_request_id + }, + "longest" : { + "longest_timer_millisecs" : self._longest_timer, + "longest_request_id" : self._longest_request_id, + "longest_end" : str(self._longest_end_ts) + } + } + return dump + + + def start(self, request_id=None): + """records the start of active execution""" + with self._lock: + self._active_count += 1 + self._last_start = datetime.utcnow() + self._last_started_request_id = request_id + + + def success(self, timer, request_id=None): + """records the successful execution""" + with self._lock: + self._active_count -= 1 + self._call_count += 1 + self._last_success = datetime.utcnow() + self._last_success_request_id = request_id + self._total_timer += timer + if not self._longest_timer or self._longest_timer < timer: + self._longest_timer = timer + self._longest_request_id = request_id + self._longest_end_ts = self._last_success + + + def error(self, timer, request_id=None): + """records the errored execution""" + with self._lock: + self._active_count -= 1 + self._call_count += 1 + self._error_count += 1 + self._last_error = datetime.utcnow() + self._last_error_request_id = request_id + self._total_timer += timer + if not self._longest_timer or self._longest_timer < timer: + self._longest_timer = timer + self._longest_request_id = request_id + self._longest_end_ts = self._last_error + + +class Health(object): + """Health stats for multiple requests""" + def __init__(self): + """Health stats for application""" + self._all_stats = {} + self._lock = Lock() + + + def _add_or_get_stats(self, stats_name): + """add to or get from the ever growing dict of HealthStats""" + with self._lock: + stats = self._all_stats.get(stats_name) + if not stats: + self._all_stats[stats_name] = stats = HealthStats(stats_name) + return stats + + + def start(self, stats_name, request_id=None): + """records the start of execution on stats_name""" + stats = self._add_or_get_stats(stats_name) + stats.start(request_id) + + + def success(self, stats_name, timer, request_id=None): + """records the successful execution on stats_name""" + stats = self._add_or_get_stats(stats_name) + stats.success(timer, request_id) + + + def error(self, stats_name, timer, request_id=None): + """records the error execution on stats_name""" + stats = self._add_or_get_stats(stats_name) + stats.error(timer, request_id) + + + def dump(self): + """returns dict of stats""" + with self._lock: + stats = dict((k, v.dump()) for (k, v) in self._all_stats.items()) + return stats diff --git a/policyhandler/onap/process_info.py b/policyhandler/onap/process_info.py new file mode 100644 index 0000000..a22ccf9 --- /dev/null +++ b/policyhandler/onap/process_info.py @@ -0,0 +1,152 @@ +# ================================================================================ +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +"""generic class to keep get real time info about the current process""" + +import gc +import sys +import threading +import traceback +from functools import wraps + +import psutil + + +def safe_operation(func): + """safequard the function against any exception""" + if not func: + return + + @wraps(func) + def wrapper(*args, **kwargs): + """wrapper around the function""" + try: + return func(*args, **kwargs) + except Exception as ex: + return {type(ex).__name__ : str(ex)} + return wrapper + + +class ProcessInfo(object): + """static class to calculate process info""" + _BIBYTES_SYMBOLS = ('KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB') + _BIBYTES_VALS = {} + _inited = False + _lock = threading.Lock() + + @staticmethod + def init(): + """init static constants""" + if ProcessInfo._inited: + return + with ProcessInfo._lock: + if ProcessInfo._inited: + return + + for i, bibytes_symbol in enumerate(ProcessInfo._BIBYTES_SYMBOLS): + ProcessInfo._BIBYTES_VALS[bibytes_symbol] = 1 << (i + 1) * 10 + ProcessInfo._BIBYTES_SYMBOLS = list(reversed(ProcessInfo._BIBYTES_SYMBOLS)) + ProcessInfo._inited = True + + @staticmethod + def bytes_to_bibytes(byte_count): + """converts byte count to human value in kibi-mebi-gibi-...-bytes""" + if byte_count is None: + return "unknown" + if not byte_count or not isinstance(byte_count, int): + return byte_count + ProcessInfo.init() + + for bibytes_symbol in ProcessInfo._BIBYTES_SYMBOLS: + bibytes_value = ProcessInfo._BIBYTES_VALS[bibytes_symbol] + if byte_count >= bibytes_value: + value = float(byte_count) / bibytes_value + return '%.2f %s' % (value, bibytes_symbol) + return "%s B" % byte_count + + @staticmethod + @safe_operation + def process_memory(): + """calculates the memory usage of the current process""" + process = psutil.Process() + with process.oneshot(): + return dict((k, ProcessInfo.bytes_to_bibytes(v)) + for k, v in process.memory_full_info()._asdict().items()) + + + @staticmethod + @safe_operation + def virtual_memory(): + """calculates the virtual memory usage of the whole vm""" + return dict((k, ProcessInfo.bytes_to_bibytes(v)) + for k, v in psutil.virtual_memory()._asdict().items()) + + + @staticmethod + @safe_operation + def active_threads(): + """list of active threads""" + return sorted([thr.name + "(" + str(thr.ident) + ")" for thr in threading.enumerate()]) + + + @staticmethod + @safe_operation + def thread_stacks(): + """returns the current threads with their stack""" + thread_names = dict((thr.ident, thr.name) for thr in threading.enumerate()) + return [ + { + "thread_id" : thread_id, + "thread_name" : thread_names.get(thread_id), + "thread_stack" : [ + { + "filename" : filename, + "lineno" : lineno, + "function" : function_name, + "line" : line.strip() if line else None + } + for filename, lineno, function_name, line in traceback.extract_stack(stack) + ] + } + for thread_id, stack in sys._current_frames().items() + ] + + + @staticmethod + @safe_operation + def gc_info(full=False): + """gets info from garbage collector""" + gc_info = { + "gc_count" : str(gc.get_count()), + "gc_threshold" : str(gc.get_threshold()) + } + if gc.garbage: + gc_info["gc_garbage"] = ([repr(stuck) for stuck in gc.garbage] + if full else len(gc.garbage)) + return gc_info + + @staticmethod + def get_all(): + """all info""" + return { + "active_threads" : ProcessInfo.active_threads(), + "gc" : ProcessInfo.gc_info(full=True), + "process_memory" : ProcessInfo.process_memory(), + "virtual_memory" : ProcessInfo.virtual_memory(), + "thread_stacks" : ProcessInfo.thread_stacks() + } diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py index 640b724..90ede47 100644 --- a/policyhandler/policy_consts.py +++ b/policyhandler/policy_consts.py @@ -1,8 +1,5 @@ -"""contants of policy-handler""" - -# org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,10 +16,19 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. +"""contants of policy-handler""" + POLICY_ID = 'policy_id' POLICY_VERSION = "policyVersion" POLICY_NAME = "policyName" POLICY_BODY = 'policy_body' POLICY_CONFIG = 'config' -POLICY_GET_CONFIG = 'getConfig' +CATCH_UP = "catch_up" +AUTO_CATCH_UP = "auto catch_up" +LATEST_POLICIES = "latest_policies" +REMOVED_POLICIES = "removed_policies" +ERRORED_POLICIES = "errored_policies" +ERRORED_SCOPES = "errored_scopes" +SCOPE_PREFIXES = "scope_prefixes" +POLICY_FILTER = "policy_filter" diff --git a/policyhandler/policy_engine.py b/policyhandler/policy_engine.py deleted file mode 100644 index a0ff697..0000000 --- a/policyhandler/policy_engine.py +++ /dev/null @@ -1,103 +0,0 @@ -"""policy-engine-client communicates with policy-engine thru PolicyEngine client object""" - -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. - -import logging -import re - -from .config import Config, PolicyEngineConfig -from .onap.audit import Audit -from .PolicyEngine import PolicyEngine, NotificationHandler, NotificationScheme -from .policy_updater import PolicyUpdater - -class PolicyNotificationHandler(NotificationHandler): - """handler of the policy-engine push notifications""" - _logger = logging.getLogger("policy_handler.policy_notification") - - def __init__(self, policy_updater): - scope_prefixes = [scope_prefix.replace(".", "[.]") - for scope_prefix in Config.config["scope_prefixes"]] - self._policy_scopes = re.compile("(" + "|".join(scope_prefixes) + ")") - PolicyNotificationHandler._logger.info("_policy_scopes %s", self._policy_scopes.pattern) - self._policy_updater = policy_updater - self._policy_updater.start() - - def notificationReceived(self, notification): - if not notification or not notification._loadedPolicies: - return - - policy_names = [loaded._policyName - for loaded in notification._loadedPolicies - if self._policy_scopes.match(loaded._policyName)] - - if not policy_names: - PolicyNotificationHandler._logger.info("no policy updated for scopes %s", - self._policy_scopes.pattern) - return - - audit = Audit(req_message="notificationReceived from PDP") - audit.retry_get_config = True - self._policy_updater.enqueue(audit, policy_names) - -class PolicyEngineClient(object): - """ policy-engine client""" - _logger = logging.getLogger("policy_handler.policy_engine") - _policy_updater = None - _pdp_notification_handler = None - _policy_engine = None - - @staticmethod - def shutdown(audit): - """Shutdown the notification-handler""" - PolicyEngineClient._policy_updater.shutdown(audit) - - @staticmethod - def catch_up(audit): - """bring the latest policies from policy-engine""" - PolicyEngineClient._policy_updater.catch_up(audit) - - @staticmethod - def create_policy_engine_properties(): - """create the policy_engine.properties file from config.json""" - pass - - @staticmethod - def run(): - """Using policy-engine client to talk to policy engine""" - audit = Audit(req_message="start PDP client") - PolicyEngineClient._policy_updater = PolicyUpdater() - PolicyEngineClient._pdp_notification_handler = PolicyNotificationHandler( - PolicyEngineClient._policy_updater) - - sub_aud = Audit(aud_parent=audit) - sub_aud.metrics_start("create client to PDP") - basic_client_auth = PolicyEngineConfig.save_to_file() - PolicyEngineClient._policy_engine = PolicyEngine( - PolicyEngineConfig.PATH_TO_PROPERTIES, - scheme=NotificationScheme.AUTO_ALL_NOTIFICATIONS.name, - handler=PolicyEngineClient._pdp_notification_handler, - basic_client_auth=basic_client_auth - ) - sub_aud.metrics("created client to PDP") - seed_scope = ".*" - PolicyEngineClient._policy_engine.getConfig(policyName=seed_scope) - sub_aud.metrics("seeded client by PDP.getConfig for policyName={0}".format(seed_scope)) - - PolicyEngineClient.catch_up(audit) diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py new file mode 100644 index 0000000..e1584a3 --- /dev/null +++ b/policyhandler/policy_receiver.py @@ -0,0 +1,200 @@ +# ================================================================================ +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +""" +policy-receiver communicates with policy-engine +thru web-socket to receive push notifications +on updates and removal of policies. + +on receiving the policy-notifications, the policy-receiver +filters them out by the policy scope(s) provided in policy-handler config +and passes the notifications to policy-updater +""" + +import json +import logging +import re +import time +from threading import Lock, Thread + +import websocket + +from .config import Config +from .onap.audit import Audit, AuditHttpCode, AuditResponseCode +from .policy_updater import PolicyUpdater + +LOADED_POLICIES = 'loadedPolicies' +REMOVED_POLICIES = 'removedPolicies' +POLICY_NAME = 'policyName' +POLICY_VER = 'versionNo' + +class _PolicyReceiver(Thread): + """web-socket to PolicyEngine""" + _logger = logging.getLogger("policy_handler.policy_receiver") + + def __init__(self): + """web-socket inside the thread to receive policy notifications from PolicyEngine""" + Thread.__init__(self, name="policy_receiver") + self.daemon = True + + self._lock = Lock() + self._keep_running = True + + config = Config.settings[Config.FIELD_POLICY_ENGINE] + self.web_socket_url = resturl = config["url"] + config["path_pdp"] + + if resturl.startswith("https:"): + self.web_socket_url = resturl.replace("https:", "wss:") + "notifications" + else: + self.web_socket_url = resturl.replace("http:", "ws:") + "notifications" + + self._web_socket = None + + scope_prefixes = [scope_prefix.replace(".", "[.]") + for scope_prefix in Config.settings["scope_prefixes"]] + self._policy_scopes = re.compile("(" + "|".join(scope_prefixes) + ")") + _PolicyReceiver._logger.info("_policy_scopes %s", self._policy_scopes.pattern) + self._policy_updater = PolicyUpdater() + self._policy_updater.start() + + def run(self): + """listen on web-socket and pass the policy notifications to policy-updater""" + websocket.enableTrace(True) + restarting = False + while True: + if not self._get_keep_running(): + break + + self._stop_notifications() + + if restarting: + time.sleep(5) + + _PolicyReceiver._logger.info( + "connecting to policy-notifications at: %s", self.web_socket_url) + self._web_socket = websocket.WebSocketApp( + self.web_socket_url, + on_message=self._on_pdp_message, + on_close=self._on_ws_close, + on_error=self._on_ws_error + ) + + _PolicyReceiver._logger.info("waiting for policy-notifications...") + self._web_socket.run_forever() + restarting = True + + _PolicyReceiver._logger.info("exit policy-receiver") + + def _get_keep_running(self): + """thread-safe check whether to continue running""" + with self._lock: + keep_running = self._keep_running + return keep_running + + def _stop_notifications(self): + """Shuts down the AutoNotification service if running.""" + with self._lock: + if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected: + self._web_socket.close() + _PolicyReceiver._logger.info("Stopped receiving notifications from PDP") + + def _on_pdp_message(self, _, message): + """received the notification from PDP""" + try: + _PolicyReceiver._logger.info("Received notification message: %s", message) + if not message: + return + message = json.loads(message) + + if not message or not isinstance(message, dict): + _PolicyReceiver._logger.warning("unexpected message from PDP: %s", + json.dumps(message)) + return + + policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) + for policy in message.get(LOADED_POLICIES, []) + if self._policy_scopes.match(policy.get(POLICY_NAME))] + policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) + for policy in message.get(REMOVED_POLICIES, []) + if self._policy_scopes.match(policy.get(POLICY_NAME))] + + if not policies_updated and not policies_removed: + _PolicyReceiver._logger.info("no policy updated or removed for scopes %s", + self._policy_scopes.pattern) + return + + audit = Audit(job_name="policy_update", + req_message="policy-notification - updated[{0}], removed[{1}]" + .format(len(policies_updated), len(policies_removed)), + retry_get_config=True) + self._policy_updater.enqueue(audit, policies_updated, policies_removed) + except Exception as ex: + error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex), + "on_pdp_message", json.dumps(message)) + + _PolicyReceiver._logger.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + + + def _on_ws_error(self, _, error): + """report an error""" + _PolicyReceiver._logger.error("policy-notification error: %s", error) + + def _on_ws_close(self, _): + """restart web-socket on close""" + _PolicyReceiver._logger.info("lost connection to PDP - restarting...") + + def shutdown(self, audit): + """Shutdown the policy-receiver""" + _PolicyReceiver._logger.info("shutdown policy-receiver") + with self._lock: + self._keep_running = False + + self._stop_notifications() + + if self.is_alive(): + self.join() + + self._policy_updater.shutdown(audit) + + def catch_up(self, audit): + """need to bring the latest policies to DCAE-Controller""" + self._policy_updater.catch_up(audit) + +class PolicyReceiver(object): + """policy-receiver - static singleton wrapper""" + _policy_receiver = None + + @staticmethod + def shutdown(audit): + """Shutdown the notification-handler""" + PolicyReceiver._policy_receiver.shutdown(audit) + + @staticmethod + def catch_up(audit): + """bring the latest policies from policy-engine""" + PolicyReceiver._policy_receiver.catch_up(audit) + + @staticmethod + def run(audit): + """Using policy-engine client to talk to policy engine""" + PolicyReceiver._policy_receiver = _PolicyReceiver() + PolicyReceiver._policy_receiver.start() + + PolicyReceiver.catch_up(audit) diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py index bf8a31d..c8018f6 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/policy_rest.py @@ -1,8 +1,5 @@ -"""policy-client communicates with policy-engine thru REST API""" - -# org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,126 +16,43 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -import logging -import json +"""policy-client communicates with policy-engine thru REST API""" + import copy -import re +import json +import logging import time from multiprocessing.dummy import Pool as ThreadPool + import requests from .config import Config -from .policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, POLICY_GET_CONFIG, \ - POLICY_BODY, POLICY_CONFIG -from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode, AuditResponseCode - -class PolicyUtils(object): - """policy-client utils""" - _logger = logging.getLogger("policy_handler.policy_utils") - _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$') - - @staticmethod - def safe_json_parse(json_str): - """try parsing json without exception - returns the json_str back if fails""" - if not json_str: - return json_str - try: - return json.loads(json_str) - except ValueError as err: - PolicyUtils._logger.warn("unexpected json %s: %s", str(json_str), str(err)) - return json_str - - @staticmethod - def extract_policy_id(policy_name): - """ policy_name = policy_id + "." + <version> + "." + <extension> - For instance, - policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml - policy_id = DCAE_alex.Config_alex_policy_number_1 - policy_scope = DCAE_alex - policy_class = Config - policy_version = 3 - type = extension = xml - delimiter = "." - policy_class_delimiter = "_" - policy_name in PAP = DCAE_alex.alex_policy_number_1 - """ - if not policy_name: - return - return PolicyUtils._policy_name_ext.sub('', policy_name) - - @staticmethod - def parse_policy_config(policy): - """try parsing the config in policy.""" - if policy and POLICY_BODY in policy and POLICY_CONFIG in policy[POLICY_BODY]: - policy[POLICY_BODY][POLICY_CONFIG] = PolicyUtils.safe_json_parse( - policy[POLICY_BODY][POLICY_CONFIG]) - return policy - - @staticmethod - def convert_to_policy(policy_config): - """wrap policy_config received from policy-engine with policy_id.""" - if not policy_config or POLICY_NAME not in policy_config \ - or POLICY_VERSION not in policy_config or not policy_config[POLICY_VERSION]: - return - policy_id = PolicyUtils.extract_policy_id(policy_config[POLICY_NAME]) - if not policy_id: - return - return {POLICY_ID:policy_id, POLICY_BODY:policy_config} - - @staticmethod - def select_latest_policy(policy_configs): - """For some reason, the policy-engine returns all version of the policy_configs. - DCAE-Controller is only interested in the latest version - """ - if not policy_configs: - return - latest_policy_config = {} - for policy_config in policy_configs: - if POLICY_VERSION not in policy_config or not policy_config[POLICY_VERSION] \ - or not policy_config[POLICY_VERSION].isdigit(): - continue - if not latest_policy_config \ - or int(policy_config[POLICY_VERSION]) \ - > int(latest_policy_config[POLICY_VERSION]): - latest_policy_config = policy_config - - return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_config)) - - @staticmethod - def select_latest_policies(policy_configs): - """For some reason, the policy-engine returns all version of the policy_configs. - DCAE-Controller is only interested in the latest versions - """ - if not policy_configs: - return {} - policies = {} - for policy_config in policy_configs: - policy = PolicyUtils.convert_to_policy(policy_config) - if not policy or POLICY_ID not in policy or POLICY_BODY not in policy: - continue - if POLICY_VERSION not in policy[POLICY_BODY] \ - or not policy[POLICY_BODY][POLICY_VERSION] \ - or not policy[POLICY_BODY][POLICY_VERSION].isdigit(): - continue - if policy[POLICY_ID] not in policies: - policies[policy[POLICY_ID]] = policy - continue - if int(policy[POLICY_BODY][POLICY_VERSION]) \ - > int(policies[policy[POLICY_ID]][POLICY_BODY][POLICY_VERSION]): - policies[policy[POLICY_ID]] = policy - - for policy_id in policies: - policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id]) +from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, + AuditResponseCode, Metrics) +from .policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, LATEST_POLICIES, + POLICY_BODY, POLICY_CONFIG, POLICY_FILTER, + POLICY_ID, POLICY_NAME, SCOPE_PREFIXES) +from .policy_utils import PolicyUtils - return policies class PolicyRest(object): - """ policy-engine """ + """using the http API to policy-engine""" _logger = logging.getLogger("policy_handler.policy_rest") _lazy_inited = False + POLICY_GET_CONFIG = 'getConfig' + PDP_CONFIG_STATUS = "policyConfigStatus" + PDP_CONFIG_RETRIEVED = "CONFIG_RETRIEVED" + PDP_CONFIG_NOT_FOUND = "CONFIG_NOT_FOUND" + PDP_CONFIG_MESSAGE = "policyConfigMessage" + PDP_NO_RESPONSE_RECEIVED = "No Response Received" + PDP_STATUS_CODE_ERROR = 400 + PDP_DATA_NOT_FOUND = "PE300 - Data Issue: Incorrect Params passed: Decision not a Permit." + + MIN_VERSION_EXPECTED = "min_version_expected" + IGNORE_POLICY_NAMES = "ignore_policy_names" _requests_session = None - _url = None + _url_get_config = None _headers = None _target_entity = None _thread_pool_size = 4 @@ -154,7 +68,7 @@ class PolicyRest(object): return PolicyRest._lazy_inited = True - config = Config.config[Config.FIELD_POLICY_ENGINE] + config = Config.settings[Config.FIELD_POLICY_ENGINE] pool_size = config.get("pool_connections", 20) PolicyRest._requests_session = requests.Session() @@ -167,191 +81,460 @@ class PolicyRest(object): requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) ) - PolicyRest._url = config["url"] + config["path_api"] + PolicyRest._url_get_config = config["url"] \ + + config["path_api"] + PolicyRest.POLICY_GET_CONFIG PolicyRest._headers = config["headers"] PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) - PolicyRest._thread_pool_size = Config.config.get("thread_pool_size", 4) + PolicyRest._thread_pool_size = Config.settings.get("thread_pool_size", 4) if PolicyRest._thread_pool_size < 2: PolicyRest._thread_pool_size = 2 - PolicyRest._scope_prefixes = Config.config["scope_prefixes"] + PolicyRest._scope_prefixes = Config.settings["scope_prefixes"] PolicyRest._scope_thread_pool_size = min(PolicyRest._thread_pool_size, \ len(PolicyRest._scope_prefixes)) - PolicyRest._policy_retry_count = Config.config.get("policy_retry_count", 1) or 1 - PolicyRest._policy_retry_sleep = Config.config.get("policy_retry_sleep", 0) + PolicyRest._policy_retry_count = Config.settings.get("policy_retry_count", 1) or 1 + PolicyRest._policy_retry_sleep = Config.settings.get("policy_retry_sleep", 0) - PolicyRest._logger.info("PolicyClient url(%s) headers(%s) scope-prefixes(%s)", \ - PolicyRest._url, Audit.log_json_dumps(PolicyRest._headers), \ + PolicyRest._logger.info( + "PolicyClient url(%s) headers(%s) scope-prefixes(%s)", + PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers), json.dumps(PolicyRest._scope_prefixes)) @staticmethod - def _post(audit, path, json_body): + def _pdp_get_config(audit, json_body): """Communication with the policy-engine""" - full_path = PolicyRest._url + path - sub_aud = Audit(aud_parent=audit, targetEntity=PolicyRest._target_entity, \ - targetServiceName=full_path) + metrics = Metrics(aud_parent=audit, targetEntity=PolicyRest._target_entity, + targetServiceName=PolicyRest._url_get_config) msg = json.dumps(json_body) headers = copy.copy(PolicyRest._headers) - headers[REQUEST_X_ECOMP_REQUESTID] = sub_aud.request_id - headers_str = Audit.log_json_dumps(headers) + headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id + headers_str = Metrics.log_json_dumps(headers) - log_line = "post to PDP {0} msg={1} headers={2}".format(full_path, msg, headers_str) - sub_aud.metrics_start(log_line) + log_line = "post to PDP {0} msg={1} headers={2}".format( + PolicyRest._url_get_config, msg, headers_str) + metrics.metrics_start(log_line) PolicyRest._logger.info(log_line) res = None try: - res = PolicyRest._requests_session.post(full_path, json=json_body, headers=headers) - except requests.exceptions.RequestException as ex: - error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value - error_msg = "failed to post to PDP {0} {1} msg={2} headers={3}" \ - .format(full_path, str(ex), msg, headers_str) + res = PolicyRest._requests_session.post( + PolicyRest._url_get_config, json=json_body, headers=headers) + except Exception as ex: + error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + if isinstance(ex, requests.exceptions.RequestException) + else AuditHttpCode.SERVER_INTERNAL_ERROR.value) + error_msg = ( + "failed to post to PDP {0} {1}: {2} msg={3} headers={4}" + .format(PolicyRest._url_get_config, type(ex).__name__, str(ex), msg, headers_str)) PolicyRest._logger.exception(error_msg) - sub_aud.set_http_status_code(error_code) + metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) - sub_aud.metrics(error_msg) + metrics.metrics(error_msg) return (error_code, None) - log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format( \ - full_path, res.status_code, msg, res.text, \ - Audit.log_json_dumps(dict(res.request.headers.items()))) - sub_aud.set_http_status_code(res.status_code) - sub_aud.metrics(log_line) + log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format( + PolicyRest._url_get_config, res.status_code, msg, res.text, + Metrics.log_json_dumps(dict(res.request.headers.items()))) + + status_code, res_data = PolicyRest._extract_pdp_res_data(audit, metrics, log_line, res) + + if status_code: + return status_code, res_data + + metrics.set_http_status_code(res.status_code) + metrics.metrics(log_line) PolicyRest._logger.info(log_line) + return res.status_code, res_data + @staticmethod + def _extract_pdp_res_data(audit, metrics, log_line, res): + """special treatment of pdp response""" + res_data = None if res.status_code == requests.codes.ok: - return res.status_code, res.json() + res_data = res.json() + + if res_data and isinstance(res_data, list) and len(res_data) == 1: + rslt = res_data[0] + if rslt and not rslt.get(POLICY_NAME): + res_data = None + if rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_NO_RESPONSE_RECEIVED: + error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + error_msg = "unexpected {0}".format(log_line) + + PolicyRest._logger.error(error_msg) + metrics.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + metrics.metrics(error_msg) + return error_code, None + return None, res_data + + if res.status_code == PolicyRest.PDP_STATUS_CODE_ERROR: + try: + res_data = res.json() + except ValueError: + return None, None + + if not res_data or not isinstance(res_data, list) or len(res_data) != 1: + return None, None + + rslt = res_data[0] + if (rslt and not rslt.get(POLICY_NAME) + and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND + and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND): + status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value + info_msg = "not found {0}".format(log_line) + + PolicyRest._logger.info(info_msg) + metrics.set_http_status_code(status_code) + metrics.metrics(info_msg) + return status_code, None + return None, None - return res.status_code, None @staticmethod - def get_latest_policy(aud_policy_name): - """Get the latest policy for the policy_name from the policy-engine""" - PolicyRest._lazy_init() - audit, policy_name = aud_policy_name + def _validate_policy(policy): + """Validates the config on policy""" + if not policy: + return - status_code = 0 + policy_body = policy.get(POLICY_BODY) + + return bool( + policy_body + and policy_body.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_RETRIEVED + and policy_body.get(POLICY_CONFIG) + ) + + @staticmethod + def get_latest_policy(aud_policy_id): + """safely try retrieving the latest policy for the policy_id from the policy-engine""" + audit, policy_id, min_version_expected, ignore_policy_names = aud_policy_id + str_metrics = "policy_id({0}), min_version_expected({1}) ignore_policy_names({2})".format( + policy_id, min_version_expected, json.dumps(ignore_policy_names)) + + try: + return PolicyRest._get_latest_policy( + audit, policy_id, min_version_expected, ignore_policy_names, str_metrics) + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(audit.request_id, type(ex).__name__, str(ex), + "get_latest_policy", str_metrics)) + + PolicyRest._logger.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None + + + @staticmethod + def _get_latest_policy(audit, policy_id, + min_version_expected, ignore_policy_names, str_metrics): + """retry several times getting the latest policy for the policy_id from the policy-engine""" + PolicyRest._lazy_init() latest_policy = None - for retry in xrange(1, PolicyRest._policy_retry_count + 1): - PolicyRest._logger.debug("%s", policy_name) - status_code, policy_configs = PolicyRest._post(audit, POLICY_GET_CONFIG, \ - {POLICY_NAME:policy_name}) - PolicyRest._logger.debug("%s %s policy_configs: %s", status_code, policy_name, \ - json.dumps(policy_configs or [])) - latest_policy = PolicyUtils.select_latest_policy(policy_configs) - if not latest_policy: - audit.error("received unexpected policy data from PDP for policy_name={0}: {1}" \ - .format(policy_name, json.dumps(policy_configs or [])), \ - errorCode=AuditResponseCode.DATA_ERROR.value, \ - errorDescription=AuditResponseCode.get_human_text( \ - AuditResponseCode.DATA_ERROR)) - - if latest_policy or not audit.retry_get_config \ - or not PolicyRest._policy_retry_sleep \ - or AuditResponseCode.PERMISSION_ERROR.value \ - == AuditResponseCode.get_response_code(status_code).value: + status_code = 0 + retry_get_config = audit.kwargs.get("retry_get_config") + expect_policy_removed = (ignore_policy_names and not min_version_expected) + + for retry in range(1, PolicyRest._policy_retry_count + 1): + PolicyRest._logger.debug(str_metrics) + + done, latest_policy, status_code = PolicyRest._get_latest_policy_once( + audit, policy_id, min_version_expected, ignore_policy_names, + expect_policy_removed) + + if done or not retry_get_config or not PolicyRest._policy_retry_sleep: break if retry == PolicyRest._policy_retry_count: - audit.warn("gave up retrying {0} from PDP after #{1} for policy_name={2}" \ - .format(POLICY_GET_CONFIG, retry, policy_name), \ - errorCode=AuditResponseCode.DATA_ERROR.value, \ - errorDescription=AuditResponseCode.get_human_text( \ - AuditResponseCode.DATA_ERROR)) + audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}" + .format(PolicyRest._url_get_config, retry, policy_id), + error_code=AuditResponseCode.DATA_ERROR) break - audit.warn("retry #{0} {1} from PDP in {2} secs for policy_name={3}" \ - .format(retry, POLICY_GET_CONFIG, PolicyRest._policy_retry_sleep, policy_name), \ - errorCode=AuditResponseCode.DATA_ERROR.value, \ - errorDescription=AuditResponseCode.get_human_text( \ - AuditResponseCode.DATA_ERROR)) + audit.warn( + "retry #{0} {1} from PDP in {2} secs for policy_id={3}".format( + retry, PolicyRest._url_get_config, + PolicyRest._policy_retry_sleep, policy_id), + error_code=AuditResponseCode.DATA_ERROR) time.sleep(PolicyRest._policy_retry_sleep) + if (expect_policy_removed and not latest_policy + and AuditHttpCode.RESPONSE_ERROR.value == status_code): + audit.set_http_status_code(AuditHttpCode.HTTP_OK.value) + return None + audit.set_http_status_code(status_code) - if not latest_policy: + if not PolicyRest._validate_policy(latest_policy): audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.error( + "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)), + error_code=AuditResponseCode.DATA_ERROR) + return latest_policy @staticmethod - def get_latest_policies_by_names(aud_policy_names): + def _get_latest_policy_once(audit, policy_id, + min_version_expected, ignore_policy_names, + expect_policy_removed): + """single attempt to get the latest policy for the policy_id from the policy-engine""" + + status_code, policy_configs = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id}) + + PolicyRest._logger.debug("%s %s policy_configs: %s", + status_code, policy_id, json.dumps(policy_configs or [])) + + latest_policy = PolicyUtils.select_latest_policy( + policy_configs, min_version_expected, ignore_policy_names + ) + + if not latest_policy and not expect_policy_removed: + audit.error("received unexpected policy data from PDP for policy_id={0}: {1}" + .format(policy_id, json.dumps(policy_configs or [])), + error_code=AuditResponseCode.DATA_ERROR) + + done = bool(latest_policy + or (expect_policy_removed and not policy_configs) + or audit.is_serious_error(status_code)) + + return done, latest_policy, status_code + + @staticmethod + def get_latest_updated_policies(aud_policy_updates): + """safely try retrieving the latest policies for the list of policy_names""" + audit, policies_updated, policies_removed = aud_policy_updates + if not policies_updated and not policies_removed: + return None, None + + str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format( + len(policies_updated), json.dumps(policies_updated), + len(policies_removed), json.dumps(policies_removed)) + + try: + return PolicyRest._get_latest_updated_policies( + audit, str_metrics, policies_updated, policies_removed) + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(audit.request_id, type(ex).__name__, str(ex), + "get_latest_updated_policies", str_metrics)) + + PolicyRest._logger.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None, None + + @staticmethod + def _get_latest_updated_policies(audit, str_metrics, policies_updated, policies_removed): """Get the latest policies of the list of policy_names from the policy-engine""" PolicyRest._lazy_init() - audit, policy_names = aud_policy_names - if not policy_names: - return + metrics = Metrics( + aud_parent=audit, + targetEntity="{0} total get_latest_updated_policies".format(PolicyRest._target_entity), + targetServiceName=PolicyRest._url_get_config) + + metrics.metrics_start("get_latest_updated_policies {0}".format(str_metrics)) + PolicyRest._logger.debug(str_metrics) + + policies_to_find = {} + for (policy_name, policy_version) in policies_updated: + policy_id = PolicyUtils.extract_policy_id(policy_name) + if not policy_id or not policy_version.isdigit(): + continue + policy = policies_to_find.get(policy_id) + if not policy: + policies_to_find[policy_id] = { + POLICY_ID: policy_id, + PolicyRest.MIN_VERSION_EXPECTED: int(policy_version), + PolicyRest.IGNORE_POLICY_NAMES: {} + } + continue + if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version): + policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version) + + for (policy_name, _) in policies_removed: + policy_id = PolicyUtils.extract_policy_id(policy_name) + if not policy_id: + continue + policy = policies_to_find.get(policy_id) + if not policy: + policies_to_find[policy_id] = { + POLICY_ID: policy_id, + PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True} + } + continue + policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True - audit.metrics_start("get_latest_policies_by_names {0} {1}".format( \ - len(policy_names), json.dumps(policy_names))) - PolicyRest._logger.debug("%d %s", len(policy_names), json.dumps(policy_names)) + apns = [(audit, policy_id, + policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED), + policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)) + for (policy_id, policy_to_find) in policies_to_find.items()] - thread_count = min(PolicyRest._thread_pool_size, len(policy_names)) - apns = [(audit, policy_name) for policy_name in policy_names] policies = None - if thread_count == 1: + apns_length = len(apns) + if apns_length == 1: policies = [PolicyRest.get_latest_policy(apns[0])] else: - pool = ThreadPool(thread_count) + pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length)) policies = pool.map(PolicyRest.get_latest_policy, apns) pool.close() pool.join() - audit.metrics("result get_latest_policies_by_names {0} {1}: {2} {3}".format( \ - len(policy_names), json.dumps(policy_names), len(policies), json.dumps(policies)), \ - targetEntity=PolicyRest._target_entity, targetServiceName=POLICY_GET_CONFIG) - policies = dict([(policy[POLICY_ID], policy) \ - for policy in policies if policy and POLICY_ID in policy]) - PolicyRest._logger.debug("policies %s", json.dumps(policies)) - if not policies: + metrics.metrics("result get_latest_updated_policies {0}: {1} {2}" + .format(str_metrics, len(policies), json.dumps(policies))) + + updated_policies = dict((policy[POLICY_ID], policy) + for policy in policies + if policy and policy.get(POLICY_ID)) + + removed_policies = dict((policy_id, True) + for (policy_id, policy_to_find) in policies_to_find.items() + if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED) + and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES) + and policy_id not in updated_policies) + + errored_policies = dict((policy_id, policy_to_find) + for (policy_id, policy_to_find) in policies_to_find.items() + if policy_id not in updated_policies + and policy_id not in removed_policies) + + PolicyRest._logger.debug( + "result updated_policies %s, removed_policies %s, errored_policies %s", + json.dumps(updated_policies), json.dumps(removed_policies), + json.dumps(errored_policies)) + + if errored_policies: audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - return policies + audit.error( + "errored_policies in PDP: {0}".format(json.dumps(errored_policies)), + error_code=AuditResponseCode.DATA_ERROR) + + return updated_policies, removed_policies + @staticmethod - def _get_latest_policies(aud_scope_prefix): - """Get the latest policies of the same scope from the policy-engine""" - audit, scope_prefix = aud_scope_prefix - PolicyRest._logger.debug("%s", scope_prefix) - status_code, policy_configs = PolicyRest._post(audit, POLICY_GET_CONFIG, \ - {POLICY_NAME:scope_prefix + ".*"}) - audit.set_http_status_code(status_code) - PolicyRest._logger.debug("%s policy_configs: %s %s", status_code, \ - scope_prefix, json.dumps(policy_configs or [])) - latest_policies = PolicyUtils.select_latest_policies(policy_configs) + def _get_latest_policies(aud_policy_filter): + """ + get the latest policies by policy_filter + or all the latest policies of the same scope from the policy-engine + """ + audit, policy_filter, scope_prefix = aud_policy_filter + try: + str_policy_filter = json.dumps(policy_filter) + PolicyRest._logger.debug("%s", str_policy_filter) + + status_code, policy_configs = PolicyRest._pdp_get_config(audit, policy_filter) + + PolicyRest._logger.debug("%s policy_configs: %s %s", status_code, + str_policy_filter, json.dumps(policy_configs or [])) + + latest_policies = PolicyUtils.select_latest_policies(policy_configs) + + if (scope_prefix and not policy_configs + and status_code != AuditHttpCode.DATA_NOT_FOUND_ERROR.value): + audit.warn("PDP error {0} on scope_prefix {1}".format(status_code, scope_prefix), + error_code=AuditResponseCode.DATA_ERROR) + return None, latest_policies, scope_prefix + + if not latest_policies: + if not scope_prefix: + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.warn( + "received no policies from PDP for policy_filter {0}: {1}" + .format(str_policy_filter, json.dumps(policy_configs or [])), + error_code=AuditResponseCode.DATA_ERROR) + return None, latest_policies, None + + audit.set_http_status_code(status_code) + valid_policies = {} + errored_policies = {} + for (policy_id, policy) in latest_policies.items(): + if PolicyRest._validate_policy(policy): + valid_policies[policy_id] = policy + else: + errored_policies[policy_id] = policy + return valid_policies, errored_policies, None + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: policy_filter({4}), scope_prefix({5})" + .format(audit.request_id, type(ex).__name__, str(ex), + "_get_latest_policies", json.dumps(policy_filter), scope_prefix)) + + PolicyRest._logger.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None, None, scope_prefix - if not latest_policies: - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - audit.error("received unexpected policies data from PDP for scope {0}: {1}".format( \ - scope_prefix, json.dumps(policy_configs or [])), \ - errorCode=AuditResponseCode.DATA_ERROR.value, \ - errorDescription=AuditResponseCode.get_human_text( \ - AuditResponseCode.DATA_ERROR)) - return latest_policies @staticmethod - def get_latest_policies(audit): + def get_latest_policies(audit, policy_filter=None): """Get the latest policies of the same scope from the policy-engine""" - PolicyRest._lazy_init() - PolicyRest._logger.debug("%s", json.dumps(PolicyRest._scope_prefixes)) - - audit.metrics_start("get_latest_policies for scopes {0} {1}".format( \ - len(PolicyRest._scope_prefixes), json.dumps(PolicyRest._scope_prefixes))) - asps = [(audit, scope_prefix) for scope_prefix in PolicyRest._scope_prefixes] - latest_policies = None - if PolicyRest._scope_thread_pool_size == 1: - latest_policies = [PolicyRest._get_latest_policies(asps[0])] - else: - pool = ThreadPool(PolicyRest._scope_thread_pool_size) - latest_policies = pool.map(PolicyRest._get_latest_policies, asps) - pool.close() - pool.join() + result = {} + aud_policy_filters = None + str_policy_filters = None + str_metrics = None + target_entity = None - audit.metrics("total result get_latest_policies for scopes {0} {1}: {2} {3}".format( \ - len(PolicyRest._scope_prefixes), json.dumps(PolicyRest._scope_prefixes), \ - len(latest_policies), json.dumps(latest_policies)), \ - targetEntity=PolicyRest._target_entity, targetServiceName=POLICY_GET_CONFIG) - - latest_policies = dict(pair for lp in latest_policies if lp for pair in lp.items()) - PolicyRest._logger.debug("latest_policies: %s %s", \ - json.dumps(PolicyRest._scope_prefixes), json.dumps(latest_policies)) + try: + PolicyRest._lazy_init() + if policy_filter is not None: + aud_policy_filters = [(audit, policy_filter, None)] + str_policy_filters = json.dumps(policy_filter) + str_metrics = "get_latest_policies for policy_filter {0}".format( + str_policy_filters) + target_entity = ("{0} total get_latest_policies by policy_filter" + .format(PolicyRest._target_entity)) + result[POLICY_FILTER] = copy.deepcopy(policy_filter) + else: + aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, scope_prefix) + for scope_prefix in PolicyRest._scope_prefixes] + str_policy_filters = json.dumps(PolicyRest._scope_prefixes) + str_metrics = "get_latest_policies for scopes {0} {1}".format( \ + len(PolicyRest._scope_prefixes), str_policy_filters) + target_entity = ("{0} total get_latest_policies by scope_prefixes" + .format(PolicyRest._target_entity)) + result[SCOPE_PREFIXES] = copy.deepcopy(PolicyRest._scope_prefixes) + + PolicyRest._logger.debug("%s", str_policy_filters) + metrics = Metrics(aud_parent=audit, targetEntity=target_entity, + targetServiceName=PolicyRest._url_get_config) + + metrics.metrics_start(str_metrics) + + latest_policies = None + apfs_length = len(aud_policy_filters) + if apfs_length == 1: + latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])] + else: + pool = ThreadPool(min(PolicyRest._scope_thread_pool_size, apfs_length)) + latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters) + pool.close() + pool.join() + + metrics.metrics("total result {0}: {1} {2}".format( + str_metrics, len(latest_policies), json.dumps(latest_policies))) + + # latest_policies == [(valid_policies, errored_policies, errored_scope_prefix), ...] + result[LATEST_POLICIES] = dict( + pair for (vps, _, _) in latest_policies if vps for pair in vps.items()) + + result[ERRORED_POLICIES] = dict( + pair for (_, eps, _) in latest_policies if eps for pair in eps.items()) + + result[ERRORED_SCOPES] = sorted([esp for (_, _, esp) in latest_policies if esp]) + + PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s", + str_policy_filters, json.dumps(result)) + return result + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(audit.request_id, type(ex).__name__, str(ex), + "get_latest_policies", str_metrics)) - return latest_policies + PolicyRest._logger.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 1f1539f..5ba7c29 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -1,8 +1,5 @@ -"""policy-updater thread""" - -# org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,13 +16,23 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -import logging +"""policy-updater thread""" + +import copy import json -from Queue import Queue -from threading import Thread, Lock +import logging +from queue import Queue +from threading import Lock, Thread -from .policy_rest import PolicyRest +from .config import Config from .deploy_handler import DeployHandler +from .onap.audit import Audit, AuditHttpCode, AuditResponseCode +from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, LATEST_POLICIES, + REMOVED_POLICIES) +from .policy_rest import PolicyRest +from .policy_utils import Utils +from .step_timer import StepTimer + class PolicyUpdater(Thread): """queue and handle the policy-updates in a separate thread""" @@ -33,92 +40,259 @@ class PolicyUpdater(Thread): def __init__(self): """init static config of PolicyUpdater.""" - Thread.__init__(self) - self.name = "policy_updater" + Thread.__init__(self, name="policy_updater") self.daemon = True - self._req_shutdown = None - self._req_catch_up = None + self._catch_up_timer = None + self._aud_shutdown = None + self._aud_catch_up = None + + catch_up_config = Config.settings.get(CATCH_UP, {}) + self._catch_up_interval = catch_up_config.get("interval") or 15*60 + self._catch_up_max_skips = catch_up_config.get("max_skips") or 3 + self._catch_up_skips = 0 + self._catch_up_prev_message = None self._lock = Lock() self._queue = Queue() - def enqueue(self, audit=None, policy_names=None): - """enqueue the policy-names""" - policy_names = policy_names or [] - PolicyUpdater._logger.info("policy_names %s", json.dumps(policy_names)) - self._queue.put((audit, policy_names)) + + def enqueue(self, audit=None, policies_updated=None, policies_removed=None): + """enqueue the policy-updates""" + policies_updated = policies_updated or [] + policies_removed = policies_removed or [] + + PolicyUpdater._logger.info( + "enqueue request_id %s policies_updated %s policies_removed %s", + ((audit and audit.request_id) or "none"), + json.dumps(policies_updated), json.dumps(policies_removed)) + + with self._lock: + self._queue.put((audit, policies_updated, policies_removed)) + + + def catch_up(self, audit=None): + """need to bring the latest policies to DCAE-Controller""" + with self._lock: + if not self._aud_catch_up: + self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP) + PolicyUpdater._logger.info( + "catch_up %s request_id %s", + self._aud_catch_up.req_message, self._aud_catch_up.request_id + ) + + self.enqueue() + def run(self): """wait and run the policy-update in thread""" while True: PolicyUpdater._logger.info("waiting for policy-updates...") - audit, policy_names = self._queue.get() - PolicyUpdater._logger.info("got policy-updates %s", json.dumps(policy_names)) + queued_audit, policies_updated, policies_removed = self._queue.get() + PolicyUpdater._logger.info( + "got request_id %s policies_updated %s policies_removed %s", + ((queued_audit and queued_audit.request_id) or "none"), + json.dumps(policies_updated), json.dumps(policies_removed)) + if not self._keep_running(): - self._queue.task_done() break + if self._on_catch_up(): + self._reset_queue() continue - - if not policy_names: - self._queue.task_done() + elif not queued_audit: continue - updated_policies = PolicyRest.get_latest_policies_by_names((audit, policy_names)) - PolicyUpdater.policy_update(audit, updated_policies) - audit.audit_done() - self._queue.task_done() + self._on_policies_update(queued_audit, policies_updated, policies_removed) PolicyUpdater._logger.info("exit policy-updater") def _keep_running(self): """thread-safe check whether to continue running""" - self._lock.acquire() - keep_running = not self._req_shutdown - self._lock.release() - if self._req_shutdown: - self._req_shutdown.audit_done() + with self._lock: + keep_running = not self._aud_shutdown + + if self._aud_shutdown: + self._aud_shutdown.audit_done() return keep_running - def catch_up(self, audit): - """need to bring the latest policies to DCAE-Controller""" - self._lock.acquire() - self._req_catch_up = audit - self._lock.release() - self.enqueue() + def _run_catch_up_timer(self): + """create and start the catch_up timer""" + if not self._catch_up_interval: + return + + if self._catch_up_timer: + self._logger.info("next step catch_up_timer in %s", self._catch_up_interval) + self._catch_up_timer.next() + return + + self._catch_up_timer = StepTimer( + "catch_up_timer", + self._catch_up_interval, + PolicyUpdater.catch_up, + PolicyUpdater._logger, + self + ) + self._logger.info("started catch_up_timer in %s", self._catch_up_interval) + self._catch_up_timer.start() + + def _pause_catch_up_timer(self): + """pause catch_up_timer""" + if self._catch_up_timer: + self._logger.info("pause catch_up_timer") + self._catch_up_timer.pause() + + def _stop_catch_up_timer(self): + """stop and destroy the catch_up_timer""" + if self._catch_up_timer: + self._logger.info("stopping catch_up_timer") + self._catch_up_timer.stop() + self._catch_up_timer.join() + self._catch_up_timer = None + self._logger.info("stopped catch_up_timer") + + def _need_to_send_catch_up(self, aud_catch_up, catch_up_message): + """try not to send the duplicate messages on auto catchup unless hitting the max count""" + if aud_catch_up.req_message != AUTO_CATCH_UP \ + or self._catch_up_skips >= self._catch_up_max_skips \ + or not Utils.are_the_same(catch_up_message, self._catch_up_prev_message): + self._catch_up_skips = 0 + self._catch_up_prev_message = copy.deepcopy(catch_up_message) + log_line = "going to send the catch_up {0}: {1}".format( + aud_catch_up.req_message, + json.dumps(self._catch_up_prev_message) + ) + self._logger.info(log_line) + aud_catch_up.info(log_line) + return True + + self._catch_up_skips += 1 + self._catch_up_prev_message = copy.deepcopy(catch_up_message) + log_line = "skip {0}/{1} sending the same catch_up {2}: {3}".format( + self._catch_up_skips, self._catch_up_max_skips, + aud_catch_up.req_message, json.dumps(self._catch_up_prev_message) + ) + self._logger.info(log_line) + aud_catch_up.info(log_line) + return False + + def _reset_queue(self): + """clear up the queue""" + with self._lock: + if not self._aud_catch_up and not self._aud_shutdown: + with self._queue.mutex: + self._queue.queue.clear() def _on_catch_up(self): - """Bring the latest policies to DCAE-Controller""" - self._lock.acquire() - req_catch_up = self._req_catch_up - if self._req_catch_up: - self._req_catch_up = None - self._queue.task_done() - self._queue = Queue() - self._lock.release() - if not req_catch_up: + """bring all the latest policies to DCAE-Controller""" + with self._lock: + aud_catch_up = self._aud_catch_up + if self._aud_catch_up: + self._aud_catch_up = None + + if not aud_catch_up: return False - PolicyUpdater._logger.info("catch_up") - latest_policies = PolicyRest.get_latest_policies(req_catch_up) - PolicyUpdater.policy_update(req_catch_up, latest_policies) - req_catch_up.audit_done() - return True + log_line = "catch_up {0} request_id {1}".format( + aud_catch_up.req_message, aud_catch_up.request_id + ) + try: + PolicyUpdater._logger.info(log_line) + self._pause_catch_up_timer() + + catch_up_message = PolicyRest.get_latest_policies(aud_catch_up) + catch_up_message[CATCH_UP] = True + + catch_up_result = "" + if not aud_catch_up.is_success(): + catch_up_result = "- not sending catch-up to deployment-handler due to errors" + PolicyUpdater._logger.warning(catch_up_result) + elif not self._need_to_send_catch_up(aud_catch_up, catch_up_message): + catch_up_result = "- skipped sending the same policies" + else: + DeployHandler.policy_update(aud_catch_up, catch_up_message, rediscover=True) + if not aud_catch_up.is_success(): + catch_up_result = "- failed to send catch-up to deployment-handler" + PolicyUpdater._logger.warning(catch_up_result) + else: + catch_up_result = "- sent catch-up to deployment-handler" + success, _, _ = aud_catch_up.audit_done(result=catch_up_result) + PolicyUpdater._logger.info(log_line + " " + catch_up_result) + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(aud_catch_up.request_id, type(ex).__name__, str(ex), + "on_catch_up", log_line + " " + catch_up_result)) + + PolicyUpdater._logger.exception(error_msg) + aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + success = False + + if not success: + self._catch_up_prev_message = None + + self._run_catch_up_timer() + + PolicyUpdater._logger.info("policy_handler health: %s", + json.dumps(aud_catch_up.health(full=True))) + PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_catch_up.process_info())) + return success + + + def _on_policies_update(self, queued_audit, policies_updated, policies_removed): + """handle the event of policy-updates from the queue""" + deployment_handler_changed = None + result = "" + + log_line = "request_id: {} policies_updated: {} policies_removed: {}".format( + ((queued_audit and queued_audit.request_id) or "none"), + json.dumps(policies_updated), json.dumps(policies_removed)) + + try: + updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( + (queued_audit, policies_updated, policies_removed)) + + if not queued_audit.is_success(): + result = "- not sending policy-updates to deployment-handler due to errors" + PolicyUpdater._logger.warning(result) + else: + message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies} + deployment_handler_changed = DeployHandler.policy_update(queued_audit, message) + if not queued_audit.is_success(): + result = "- failed to send policy-updates to deployment-handler" + PolicyUpdater._logger.warning(result) + else: + result = "- sent policy-updates to deployment-handler" + + success, _, _ = queued_audit.audit_done(result=result) + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(queued_audit.request_id, type(ex).__name__, str(ex), + "on_policies_update", log_line + " " + result)) + + PolicyUpdater._logger.exception(error_msg) + queued_audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + queued_audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + success = False + + if deployment_handler_changed: + self._catch_up_prev_message = None + self._pause_catch_up_timer() + self.catch_up() + elif not success: + self._catch_up_prev_message = None - @staticmethod - def policy_update(audit, updated_policies): - """Invoke deploy-handler""" - if updated_policies: - PolicyUpdater._logger.info("updated_policies %s", json.dumps(updated_policies)) - DeployHandler.policy_update(audit, updated_policies) def shutdown(self, audit): """Shutdown the policy-updater""" PolicyUpdater._logger.info("shutdown policy-updater") - self._lock.acquire() - self._req_shutdown = audit - self._lock.release() + with self._lock: + self._aud_shutdown = audit self.enqueue() + + self._stop_catch_up_timer() + if self.is_alive(): self.join() diff --git a/policyhandler/policy_utils.py b/policyhandler/policy_utils.py new file mode 100644 index 0000000..c2a8b07 --- /dev/null +++ b/policyhandler/policy_utils.py @@ -0,0 +1,175 @@ +# ================================================================================ +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +"""policy-client communicates with policy-engine thru REST API""" + +import json +import logging +import re + +from .policy_consts import (POLICY_BODY, POLICY_CONFIG, POLICY_ID, POLICY_NAME, + POLICY_VERSION) + +class PolicyUtils(object): + """policy-client utils""" + _logger = logging.getLogger("policy_handler.policy_utils") + _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$') + + @staticmethod + def extract_policy_id(policy_name): + """ policy_name = policy_id + "." + <version> + "." + <extension> + For instance, + policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml + policy_id = DCAE_alex.Config_alex_policy_number_1 + policy_scope = DCAE_alex + policy_class = Config + policy_version = 3 + type = extension = xml + delimiter = "." + policy_class_delimiter = "_" + policy_name in PAP = DCAE_alex.alex_policy_number_1 + """ + if not policy_name: + return + return PolicyUtils._policy_name_ext.sub('', policy_name) + + @staticmethod + def parse_policy_config(policy): + """try parsing the config in policy.""" + if not policy: + return policy + config = policy.get(POLICY_BODY, {}).get(POLICY_CONFIG) + if config: + policy[POLICY_BODY][POLICY_CONFIG] = Utils.safe_json_parse(config) + return policy + + @staticmethod + def convert_to_policy(policy_config): + """wrap policy_config received from policy-engine with policy_id.""" + if not policy_config: + return + policy_name = policy_config.get(POLICY_NAME) + policy_version = policy_config.get(POLICY_VERSION) + if not policy_name or not policy_version: + return + policy_id = PolicyUtils.extract_policy_id(policy_name) + if not policy_id: + return + return {POLICY_ID:policy_id, POLICY_BODY:policy_config} + + @staticmethod + def select_latest_policy(policy_configs, min_version_expected=None, ignore_policy_names=None): + """For some reason, the policy-engine returns all version of the policy_configs. + DCAE-Controller is only interested in the latest version + """ + if not policy_configs: + return + latest_policy_config = {} + for policy_config in policy_configs: + policy_name = policy_config.get(POLICY_NAME) + policy_version = policy_config.get(POLICY_VERSION) + if not policy_name or not policy_version or not policy_version.isdigit(): + continue + policy_version = int(policy_version) + if min_version_expected and policy_version < min_version_expected: + continue + if ignore_policy_names and policy_name in ignore_policy_names: + continue + + if not latest_policy_config \ + or int(latest_policy_config[POLICY_VERSION]) < policy_version: + latest_policy_config = policy_config + + return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_config)) + + @staticmethod + def select_latest_policies(policy_configs): + """For some reason, the policy-engine returns all version of the policy_configs. + DCAE-Controller is only interested in the latest versions + """ + if not policy_configs: + return {} + policies = {} + for policy_config in policy_configs: + policy = PolicyUtils.convert_to_policy(policy_config) + if not policy: + continue + policy_id = policy.get(POLICY_ID) + policy_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION) + if not policy_id or not policy_version or not policy_version.isdigit(): + continue + if policy_id not in policies \ + or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION]): + policies[policy_id] = policy + + for policy_id in policies: + policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id]) + + return policies + +class Utils(object): + """general purpose utils""" + _logger = logging.getLogger("policy_handler.utils") + + @staticmethod + def safe_json_parse(json_str): + """try parsing json without exception - returns the json_str back if fails""" + if not json_str: + return json_str + try: + return json.loads(json_str) + except (ValueError, TypeError) as err: + Utils._logger.warning("unexpected json error(%s): len(%s) str[:100]: (%s)", + str(err), len(json_str), str(json_str)[:100]) + return json_str + + @staticmethod + def are_the_same(body_1, body_2): + """check whether both objects are the same""" + if (body_1 and not body_2) or (not body_1 and body_2): + Utils._logger.debug("only one is empty %s != %s", body_1, body_2) + return False + + if body_1 is None and body_2 is None: + return True + + if isinstance(body_1, list) and isinstance(body_2, list): + if len(body_1) != len(body_2): + Utils._logger.debug("len %s != %s", json.dumps(body_1), json.dumps(body_2)) + return False + + for val_1, val_2 in zip(body_1, body_2): + if not Utils.are_the_same(val_1, val_2): + return False + return True + + if isinstance(body_1, dict) and isinstance(body_2, dict): + if body_1.keys() ^ body_2.keys(): + Utils._logger.debug("keys %s != %s", json.dumps(body_1), json.dumps(body_2)) + return False + + for key, val_1 in body_1.items(): + if not Utils.are_the_same(val_1, body_2[key]): + return False + return True + + # ... here when primitive values or mismatched types ... + the_same_values = (body_1 == body_2) + if not the_same_values: + Utils._logger.debug("values %s != %s", body_1, body_2) + return the_same_values diff --git a/policyhandler/step_timer.py b/policyhandler/step_timer.py new file mode 100644 index 0000000..768b400 --- /dev/null +++ b/policyhandler/step_timer.py @@ -0,0 +1,175 @@ +# ================================================================================ +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +"""periodically callback""" + +import json +from datetime import datetime +from threading import Event, RLock, Thread + + +class StepTimer(Thread): + """call on_time after interval number of seconds, then wait to continue""" + STATE_INIT = "init" + STATE_NEXT = "next" + STATE_STARTED = "started" + STATE_PAUSED = "paused" + STATE_STOPPING = "stopping" + STATE_STOPPED = "stopped" + + def __init__(self, name, interval, on_time, logger, *args, **kwargs): + """create step timer with controlled start. next step and pause""" + Thread.__init__(self, name=name) + self._interval = interval + self._on_time = on_time + self._logger = logger + self._args = args + self._kwargs = kwargs + + self._lock = RLock() + + self._timeout = Event() + self._waiting_for_timeout = False + self._next = Event() + self._paused = False + self._finished = False + + self._request = StepTimer.STATE_INIT + self._req_count = 0 + self._req_time = 0 + self._req_ts = datetime.utcnow() + + self._substep = None + self._substep_time = 0 + self._substep_ts = datetime.utcnow() + + def get_timer_status(self): + """returns timer status""" + with self._lock: + return "{0}[{1}] {2}: timeout({3}), paused({4}), next({5}), finished({6})".format( + self._request, + self._req_count, + self._substep, + self._timeout.is_set(), + self._paused, + self._next.is_set(), + self._finished, + ) + + def next(self): + """continue with the next timeout""" + with self._lock: + self._paused = False + if self._waiting_for_timeout: + self._next.set() + self._timeout.set() + else: + self._next.set() + self._request_to_timer(StepTimer.STATE_NEXT) + + def pause(self): + """pause the timer""" + with self._lock: + self._paused = True + self._next.clear() + self._request_to_timer(StepTimer.STATE_PAUSED) + + def stop(self): + """stop the timer if it hasn't finished yet""" + with self._lock: + self._finished = True + self._timeout.set() + self._next.set() + self._request_to_timer(StepTimer.STATE_STOPPING) + + def _request_to_timer(self, request): + """set the request on the timer""" + with self._lock: + if request in [StepTimer.STATE_NEXT, StepTimer.STATE_STARTED]: + self._req_count += 1 + + prev_req = self._request + self._request = request + utcnow = datetime.utcnow() + self._req_time = (utcnow - self._req_ts).total_seconds() + self._req_ts = utcnow + self._logger.info("{0}[{1}] {2}->{3}".format( + self.name, self._req_time, prev_req, self.get_timer_status())) + + def _log_substep(self, substep): + """log timer substep""" + with self._lock: + self._substep = substep + utcnow = datetime.utcnow() + self._substep_time = (utcnow - self._substep_ts).total_seconds() + self._substep_ts = utcnow + self._logger.info("[{0}] {1}".format(self._substep_time, self.get_timer_status())) + + def _on_time_event(self): + """execute the _on_time event""" + if self._paused: + self._log_substep("paused - skip on_time event") + return + + try: + self._log_substep("on_time event") + self._on_time(*self._args, **self._kwargs) + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: args({4}), kwargs({5})" + .format(self.name, type(ex).__name__, str(ex), "_on_time", + json.dumps(self._args), json.dumps(self._kwargs))) + self._logger.exception(error_msg) + + def run(self): + """loop one step a time until stopped=finished""" + self._request_to_timer(StepTimer.STATE_STARTED) + while True: + with self._lock: + self._timeout.clear() + self._waiting_for_timeout = True + self._log_substep("waiting for timeout {0}...".format(self._interval)) + + interrupted = self._timeout.wait(self._interval) + + with self._lock: + self._waiting_for_timeout = False + self._log_substep("woke up after {0}timeout" + .format((interrupted and "interrupted ") or "")) + + if self._finished: + self._log_substep("finished") + break + + if self._next.is_set() and interrupted: + self._next.clear() + self._log_substep("restart timer") + continue + + self._on_time_event() + + self._log_substep("waiting for next...") + self._next.wait() + with self._lock: + self._next.clear() + self._log_substep("woke up on next") + + if self._finished: + self._log_substep("finished") + break + + self._request_to_timer(StepTimer.STATE_STOPPED) diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py index 9a5ee19..c49536f 100644 --- a/policyhandler/web_server.py +++ b/policyhandler/web_server.py @@ -1,8 +1,5 @@ -"""web-service for policy_handler""" - -# org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,6 +16,8 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. +"""web-service for policy_handler""" + import logging import json from datetime import datetime @@ -27,78 +26,195 @@ import cherrypy from .config import Config from .onap.audit import Audit from .policy_rest import PolicyRest -from .policy_engine import PolicyEngineClient +from .policy_receiver import PolicyReceiver class PolicyWeb(object): - """Main static class for REST API of policy-handler""" - logger = logging.getLogger("policy_handler.web_cherrypy") + """run REST API of policy-handler""" + SERVER_HOST = "0.0.0.0" + logger = logging.getLogger("policy_handler.policy_web") @staticmethod - def run(): - """run forever the web-server of the policy-handler""" - PolicyWeb.logger.info("policy_handler web-service at port(%d)...", \ - Config.wservice_port) - cherrypy.config.update({"server.socket_host": "0.0.0.0", \ - 'server.socket_port': Config.wservice_port}) - cherrypy.tree.mount(PolicyLatest(), '/policy_latest') - cherrypy.tree.mount(PoliciesLatest(), '/policies_latest') - cherrypy.tree.mount(PoliciesCatchUp(), '/catch_up') - cherrypy.quickstart(Shutdown(), '/shutdown') - -class Shutdown(object): - """Shutdown the policy-handler""" - @cherrypy.expose - def index(self): - """shutdown event""" - audit = Audit(req_message="get /shutdown", headers=cherrypy.request.headers) - PolicyWeb.logger.info("--------- stopping REST API of policy-handler -----------") - cherrypy.engine.exit() - PolicyEngineClient.shutdown(audit) - PolicyWeb.logger.info("--------- the end -----------") - res = str(datetime.now()) - audit.info_requested(res) - return "goodbye! shutdown requested {0}".format(res) + def run_forever(audit): + """run the web-server of the policy-handler forever""" + PolicyWeb.logger.info("policy_handler web-service at port(%d)...", Config.wservice_port) + cherrypy.config.update({"server.socket_host": PolicyWeb.SERVER_HOST, + 'server.socket_port': Config.wservice_port}) + cherrypy.tree.mount(_PolicyWeb(), '/') + audit.info("running policy_handler web-service at port({0})".format(Config.wservice_port)) + cherrypy.engine.start() + +class _PolicyWeb(object): + """REST API of policy-handler""" -class PoliciesLatest(object): - """REST API of the policy-hanlder""" + @staticmethod + def _get_request_info(request): + """returns info about the http request""" + return "{0} {1}{2}".format(request.method, request.script_name, request.path_info) @cherrypy.expose + @cherrypy.popargs('policy_id') @cherrypy.tools.json_out() - def index(self): - """find the latest policy by policy_id or all latest policies""" - audit = Audit(req_message="get /policies_latest", headers=cherrypy.request.headers) - res = PolicyRest.get_latest_policies(audit) or {} - PolicyWeb.logger.info("PoliciesLatest: %s", json.dumps(res)) - audit.audit_done(result=json.dumps(res)) - return res + def policy_latest(self, policy_id): + """retireves the latest policy identified by policy_id""" + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(job_name="get_latest_policy", + req_message=req_info, headers=cherrypy.request.headers) + PolicyWeb.logger.info("%s policy_id=%s headers=%s", \ + req_info, policy_id, json.dumps(cherrypy.request.headers)) + + latest_policy = PolicyRest.get_latest_policy((audit, policy_id, None, None)) or {} + + PolicyWeb.logger.info("res %s policy_id=%s latest_policy=%s", + req_info, policy_id, json.dumps(latest_policy)) -@cherrypy.popargs('policy_id') -class PolicyLatest(object): - """REST API of the policy-hanlder""" + success, http_status_code, _ = audit.audit_done(result=json.dumps(latest_policy)) + if not success: + cherrypy.response.status = http_status_code + + return latest_policy + + def _get_all_policies_latest(self): + """retireves all the latest policies on GET /policies_latest""" + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(job_name="get_all_policies_latest", + req_message=req_info, headers=cherrypy.request.headers) + + PolicyWeb.logger.info("%s", req_info) + + result = PolicyRest.get_latest_policies(audit) + + PolicyWeb.logger.info("result %s: %s", req_info, json.dumps(result)) + + success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) + if not success: + cherrypy.response.status = http_status_code + + return result @cherrypy.expose @cherrypy.tools.json_out() - def index(self, policy_id): - """find the latest policy by policy_id or all latest policies""" - audit = Audit(req_message="get /policy_latest/{0}".format(policy_id or ""), \ + @cherrypy.tools.json_in() + def policies_latest(self): + """ + on :GET: retrieves all the latest policies from policy-engine that are + in the scope of the policy-handler. + + on :POST: expects to receive the params that mimic the /getConfig of policy-engine + and retrieves the matching policies from policy-engine and picks the latest on each policy. + + sample request - policies filter + + { + "configAttributes": { "key1":"value1" }, + "configName": "alex_config_name", + "onapName": "DCAE", + "policyName": "DCAE_alex.Config_alex_.*", + "unique": false + } + + sample response + + { + "DCAE_alex.Config_alex_priority": { + "policy_body": { + "policyName": "DCAE_alex.Config_alex_priority.3.xml", + "policyConfigMessage": "Config Retrieved! ", + "responseAttributes": {}, + "policyConfigStatus": "CONFIG_RETRIEVED", + "type": "JSON", + "matchingConditions": { + "priority": "10", + "key1": "value1", + "ONAPName": "DCAE", + "ConfigName": "alex_config_name" + }, + "property": null, + "config": { + "foo": "bar", + "foo_updated": "2018-10-06T16:54:31.696Z" + }, + "policyVersion": "3" + }, + "policy_id": "DCAE_alex.Config_alex_priority" + } + } + """ + if cherrypy.request.method == "GET": + return self._get_all_policies_latest() + + if cherrypy.request.method != "POST": + raise cherrypy.HTTPError(404, "unexpected method {0}".format(cherrypy.request.method)) + + policy_filter = cherrypy.request.json or {} + str_policy_filter = json.dumps(policy_filter) + + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(job_name="get_latest_policies", + req_message="{0}: {1}".format(req_info, str_policy_filter), \ headers=cherrypy.request.headers) - PolicyWeb.logger.info("PolicyLatest policy_id=%s headers=%s", \ - policy_id, json.dumps(cherrypy.request.headers)) - res = PolicyRest.get_latest_policy((audit, policy_id)) or {} - PolicyWeb.logger.info("PolicyLatest policy_id=%s res=%s", policy_id, json.dumps(res)) - audit.audit_done(result=json.dumps(res)) - return res + PolicyWeb.logger.info("%s: policy_filter=%s headers=%s", \ + req_info, str_policy_filter, json.dumps(cherrypy.request.headers)) + + result = PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {} + + PolicyWeb.logger.info("result %s: policy_filter=%s result=%s", \ + req_info, str_policy_filter, json.dumps(result)) + + success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) + if not success: + cherrypy.response.status = http_status_code + + return result -class PoliciesCatchUp(object): - """catch up with all DCAE policies""" @cherrypy.expose @cherrypy.tools.json_out() - def index(self): - """catch up with all policies""" - started = str(datetime.now()) - audit = Audit(req_message="get /catch_up", headers=cherrypy.request.headers) - PolicyEngineClient.catch_up(audit) + def catch_up(self): + """catch up with all DCAE policies""" + started = str(datetime.utcnow()) + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(job_name="catch_up", req_message=req_info, headers=cherrypy.request.headers) + + PolicyWeb.logger.info("%s", req_info) + PolicyReceiver.catch_up(audit) + res = {"catch-up requested": started} - PolicyWeb.logger.info("PoliciesCatchUp: %s", json.dumps(res)) + PolicyWeb.logger.info("requested %s: %s", req_info, json.dumps(res)) audit.info_requested(started) return res + + @cherrypy.expose + def shutdown(self): + """Shutdown the policy-handler""" + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(job_name="shutdown", req_message=req_info, headers=cherrypy.request.headers) + + PolicyWeb.logger.info("%s: --- stopping REST API of policy-handler ---", req_info) + + cherrypy.engine.exit() + + PolicyReceiver.shutdown(audit) + + PolicyWeb.logger.info("policy_handler health: {0}" + .format(json.dumps(audit.health(full=True)))) + PolicyWeb.logger.info("%s: --------- the end -----------", req_info) + res = str(datetime.utcnow()) + audit.info_requested(res) + PolicyWeb.logger.info("process_info: %s", json.dumps(audit.process_info())) + return "goodbye! shutdown requested {0}".format(res) + + @cherrypy.expose + @cherrypy.tools.json_out() + def healthcheck(self): + """returns the healthcheck results""" + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(job_name="healthcheck", + req_message=req_info, headers=cherrypy.request.headers) + + PolicyWeb.logger.info("%s", req_info) + + res = audit.health() + + PolicyWeb.logger.info("healthcheck %s: res=%s", req_info, json.dumps(res)) + + audit.audit_done(result=json.dumps(res)) + return res |