diff options
author | Alex Shatov <alexs@att.com> | 2018-01-10 11:00:50 -0500 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2018-01-10 11:07:30 -0500 |
commit | 1369bea8b3c24ef063799acefbfc01659878f034 (patch) | |
tree | 95fa3e5580f62be9c1e1d630ed0c6496b9fb03a2 /policyhandler | |
parent | dc5da5bf63ae4a4ac11b4b5c46407e58da16fbfe (diff) |
variable collection of policies per component
* new feature variable collection of policies per component in DCAE
* massive refactoring
* dissolved the external PolicyEngine.py into policy_receiver.py
- kept only the web-socket communication to PolicyEngine
* new /healthcheck - shows some stats of service running
* Unit Test coverage 75%
Change-Id: I816b7d5713ae0dd88fa73d3656f272b4f3e7946e
Issue-ID: DCAEGEN2-249
Signed-off-by: Alex Shatov <alexs@att.com>
Diffstat (limited to 'policyhandler')
-rw-r--r-- | policyhandler/PolicyEngine.py | 1219 | ||||
-rw-r--r-- | policyhandler/config.py | 61 | ||||
-rw-r--r-- | policyhandler/deploy_handler.py | 35 | ||||
-rw-r--r-- | policyhandler/discovery.py | 30 | ||||
-rw-r--r-- | policyhandler/onap/audit.py | 142 | ||||
-rw-r--r-- | policyhandler/onap/health.py | 104 | ||||
-rw-r--r-- | policyhandler/policy_consts.py | 2 | ||||
-rw-r--r-- | policyhandler/policy_engine.py | 103 | ||||
-rw-r--r-- | policyhandler/policy_handler.py | 13 | ||||
-rw-r--r-- | policyhandler/policy_receiver.py | 195 | ||||
-rw-r--r-- | policyhandler/policy_rest.py | 459 | ||||
-rw-r--r-- | policyhandler/policy_updater.py | 107 | ||||
-rw-r--r-- | policyhandler/policy_utils.py | 134 | ||||
-rw-r--r-- | policyhandler/web_server.py | 215 |
14 files changed, 1075 insertions, 1744 deletions
diff --git a/policyhandler/PolicyEngine.py b/policyhandler/PolicyEngine.py deleted file mode 100644 index 7a69894..0000000 --- a/policyhandler/PolicyEngine.py +++ /dev/null @@ -1,1219 +0,0 @@ -"""
-PolicyEngine API for Python
-
-@author: Tarun, Mike
-@version: 0.9
-@change:
- added Enum 'Other' Type and supported it in PolicyConfig class - 1/13
- supporting Remote URL capability to the PolicyEngine as the parameter - 1/13
- Request format has been updated accordingly. No need to send the PDP URLs anymore - 1/26
- Feature where the PolicyEngine chooses available PyPDP among different URLs. 1/26
- Major feature addition required for Notifications 2/17 , Fixed Session issues. 2/25
- Major change in API structure for combining results 3/4.
- Added Security support for Notifications and clearNotification Method 3/18
- newMethod for retrieving configuration using policyFileName 3/20
- logging 3/24
- Notification Bug Fixes 7/21
- basic Auth 7/22
- Notification Changes 9/3
- ECOMP Error codes included 10/1
- -2016
- Major Changes to the Policy Engine API 3/29
- DeletePolicy API and Changes to pushPolicy and getConfig 5/27
- ConfigRequestParmeters and Error code Change 7/11
- New Environment Variable and Client Authorizations Change 7/21
- Changes to the Policy Parameters 8/21
- Allow Clients to use their own Password Protection.
- Dictionary Types and its Fixes.
-"""
-
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-
-import json,sys, os, collections, websocket , logging, time, base64, uuid
-from websocket import create_connection
-from enum import Enum
-from xml.etree.ElementTree import XML
-try:
- # For Python 3.0 and Later
- from pip._vendor import requests
-except ImportError:
- # For backend Support to Python 2's urllib2
- import requests
-try:
- # For Python 3.0 and Later
- from urllib.request import urlopen
-except ImportError:
- # Fall back for Python 2.*
- from urllib2 import urlopen
-try:
- import thread
-except ImportError:
- #Fall Back for Python 2.x
- import _thread as thread
-
-PolicyConfigStatus = Enum('PolicyConfigStatus', 'CONFIG_NOT_FOUND CONFIG_RETRIEVED')
-PolicyType = Enum('PolicyType', 'JSON XML PROPERTIES OTHER')
-PolicyResponseStatus = Enum('PolicyResponseStatus', 'ACTION_ADVISED ACTION_TAKEN NO_ACTION_REQUIRED')
-NotificationType = Enum('NotificationType', 'BOTH UPDATE REMOVE')
-UpdateType = Enum('UpdateType', 'UPDATE NEW')
-NotificationScheme = Enum('NotificationScheme', 'AUTO_ALL_NOTIFICATIONS AUTO_NOTIFICATIONS MANUAL_ALL_NOTIFICATIONS MANUAL_NOTIFICATIONS')
-AttributeType = Enum('AttributeType', 'MATCHING MICROSERVICE RULE SETTINGS')
-PolicyClass = Enum('PolicyClass', 'Action Config Decision')
-PolicyConfigType = Enum('PolicyConfigType', 'Base BRMS_PARAM BRMS_RAW ClosedLoop_Fault ClosedLoop_PM Firewall MicroService Extended')
-DeletePolicyCondition = Enum('DeletePolicyCondition', 'ONE ALL')
-RuleProvider = Enum('RuleProvider', 'Custom AAF GUARD_YAML')
-DictionaryType = Enum('DictionaryType', 'Common Action ClosedLoop Firewall Decision BRMS GOC MicroService DescriptiveScope PolicyScope Enforcer SafePolicy' )
-ImportType = Enum('ImportType', 'MICROSERVICE')
-
-class PolicyEngine:
- """
- PolicyEngine is the Class which needs to be instantiated to call the PDP server.
- It needs the *.properties* file path as the parameter to the constructor.
- """
- def __init__(self, filename, scheme=None, handler=None, clientKey=None, basic_client_auth=True):
- """
- @param filename: String format of the path location of .properties file Could also be A remote URL eg: http://localhost:8080/config.properties
- @param scheme: NotificationScheme to select the scheme required for notification updates.
- @param handler: NotificationHandler object which will be called when an event is occurred.
- @param clientKey: Decoded Client Key to be used by PolicyEngine.
- @attention:The .properties file must contain the PYPDP_URL parameter in it. The parameter can have multiple URLs the PolicyEngine chooses the available PyPDP among them.
- """
- self.filename = filename
- self.urldict = {}
- self.matchStore = []
- self.autoURL = None
- self.scheme = None
- self.handler = None
- self.thread = thread
- self.autows = None
- self.mclose = False
- self.restart = False
- self.logger = logging.getLogger()
- self.resturl= []
- self.encoded= []
- self.clientInfo = None
- self.environment = None
- self.policyheader = {}
- if(filename.startswith("http")):
- try:
- policy_data = urlopen(filename)
- for line in policy_data :
- line = line.decode('utf-8')
- line = line.rstrip() # removes trailing whitespace and '\n' chars
- line = line.replace(" ","") # removing spaces
- if "=" not in line: continue #skips blanks and comments w/o =
- if line.startswith("#"): continue #skips comments which contain =
- key, value = line.split("=",1)
- key = key.rstrip().lstrip()
- value = value.lstrip()
- #print("key= "+key+" Value =" +value )
- self.urldict[key] = value
- except:
- self.logger.error("PE300 - Data Issue: Error with the Config URL: %s" , filename )
- print("PE300 - Data Issue: Config Properties URL Error")
- sys.exit(0)
- else :
- fileExtension = os.path.splitext(filename)
- if(fileExtension[1]!=".properties"):
- self.logger.error("PE300 - Data Issue: File is not in properties format: %s", filename)
- print("PE300 - Data Issue: Not a .properties file!")
- sys.exit(0)
- try :
- with open(self.filename, 'r') as f:
- for line in f:
- line = line.rstrip() # removes trailing whitespace and '\n' chars
- line = line.replace(" ","") # removing spaces
- if "=" not in line: continue #skips blanks and comments w/o =
- if line.startswith("#"): continue #skips comments which contain =
- key, value = line.split("=",1)
- key = key.rstrip().lstrip()
- value = value.lstrip()
- # self.logger.info("key=%s Value=%s", key, value)
- self.urldict[key] = value
- except FileNotFoundError:
- self.logger.error("PE300 - Data Issue: File Not found: %s", filename)
- print("PE300 - Data Issue: File Doesn't exist in the given Location")
- sys.exit(0)
- #TODO logic for best available PyPDP servers
- try:
- self.urldict = collections.OrderedDict(sorted(self.urldict.items()))
- clientID = self.urldict.get("CLIENT_ID")
- # self.logger.info("clientID decoded %s", base64.b64decode(clientID).decode("utf-8"))
- # client_parts = base64.b64decode(clientID).split(":")
- # client_parts = clientID.split(":")
- # self.logger.info("ClientAuth:Basic %s", base64.b64encode(clientID))
- # self.logger.info("CLIENT_ID[0] = %s", client_parts[0])
- # self.logger.info("CLIENT_ID[0] base64 = %s", base64.b64encode(client_parts[0]))
- # self.logger.info("CLIENT_KEY base64 = %s", base64.b64encode(client_parts[1]))
- if(clientKey is None):
- try:
- client = base64.b64decode(self.urldict.get("CLIENT_KEY")).decode("utf-8")
- except Exception:
- self.logger.warn("PE300 - Data Issue: CLIENT_KEY parameter is not in the required encoded Format taking Value as clear Text")
- client = self.urldict.get("CLIENT_KEY")
- else:
- client = clientKey
- if(clientID is None or client is None):
- self.logger.error("PE300 - Data Issue: No CLIENT_ID and/or CLIENT_KEY parameter found in the properties file: %s ", filename)
- print("PE300 - Data Issue: No CLIENT_ID and/or CLIENT_KEY parameter found in the properties file")
- sys.exit(0)
- else:
- uid = clientID.encode('ascii')
- password = client.encode('ascii')
- self.clientInfo = base64.b64encode(uid+ b':'+password).decode('utf-8')
- self.environment = self.urldict.get("ENVIRONMENT")
- if(self.environment is None):
- self.logger.info("Missing Environment Variable setting to Default Value.")
- self.environment = "DEVL"
- self.policyheader = {
- "Content-type" : "application/json",
- "Accept" : "application/json",
- "ClientAuth" : ("Basic " if basic_client_auth else "") + self.clientInfo,
- "Environment" : self.environment
- }
- for key in self.urldict.keys():
- if(key.startswith("PYPDP_URL")):
- pypdpVal = self.urldict.get(key)
- if pypdpVal is None:
- self.logger.error("PE300 - Data Issue: No PYPDP_URL Parameter found in the properties file: %s ", filename)
- print("PE300 - Data Issue: No PYPDP_URL parameter found in the properties file")
- sys.exit(0)
- if ";" in pypdpVal:
- pdpDefault = pypdpVal.split(";")
- if pdpDefault is None:
- self.logger.error("PE300 - Data Issue: No PYPDP_URL Parameter found in the properties file: %s ", filename)
- print("PE300 - Data Issue: No PYPDP_URL parameter found in the properties file")
- sys.exit(0)
- else:
- for count in range(0, len(pdpDefault)):
- self.__pdpParam(pdpDefault[count])
- else:
- self.__pdpParam(pypdpVal)
-
- self.logger.info("PolicyEngine url: %s policyheader: %s urldict: %s", \
- self.resturl, json.dumps(self.policyheader), json.dumps(self.urldict))
- if len(self.resturl)==0:
- self.logger.error("PE300 - Data Issue: No PYPDP_URL Parameter found in the properties file: %s ", filename)
- print("PE300 - Data Issue: No PYPDP_URL parameter found in the properties file")
- sys.exit(0)
- except:
- self.logger.error("PE300 - Data Issue: missing parameter(s) in the properties file: %s ", filename)
- print("PE300 - Data Issue: missing parameter(s) in the properties file")
- sys.exit(0)
- # Scheme and Handler code to be handled from here.
- if handler is not None:
- #if type(handler) is NotificationHandler:
- self.handler = handler
- #else:
- # print("handler should be a object of NotificationHandler class")
- # sys.exit(0)
- if scheme is not None:
- if ((scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
- # setup the Auto settings.
- self.scheme = scheme
- elif ((scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name)):
- # setup the Manual Settings
- self.scheme = scheme
- else:
- self.logger.error("PE300 - Data Issue: Scheme not a type of NotificationScheme: %s", scheme.name)
- print("PE300 - Data Issue: scheme must be a Type of NotificationScheme Enumeration ")
- sys.exit(0)
-
- def __pdpParam(self,pdpValue):
- """
- Internal Usage for reading PyPDP Parameters
- """
- if pdpValue is None:
- self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
- print("PE100 - Permissions Error: No Enough Credentials to send Request")
- sys.exit(0)
- elif "," in pdpValue:
- pdpValues = pdpValue.split(",")
- if (len(pdpValues)==3):
- # 0 is pypdp URL
- self.resturl.append(pdpValues[0])
- # 1 and 2 are user name password
- if pdpValues[1] and pdpValues[2]:
- uid = pdpValues[1].encode('ascii')
- password = pdpValues[2].encode('ascii')
- encoded = base64.b64encode(uid+ b':'+password).decode('utf-8')
- self.encoded.append(encoded)
- else:
- self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
- print("PE100 - Permissions Error: No Enough Credentials to send Request")
- sys.exit(0)
- else:
- self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
- print("PE100 - Permissions Error: No Enough Credentials to send Request")
- sys.exit(0)
- else:
- self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
- print("PE100 - Permissions Error: No Enough Credentials to send Request")
- sys.exit(0)
-
- def getConfigByPolicyName(self, policyName, requestID=None):
- """
- @param policyName: String format of the PolicyFile Name whose configuration is required.
- @return: Returns a List of PolicyConfig Object(s).
- @deprecated: use getConfig instead.
- """
- __policyNameURL = "/getConfigByPolicyName"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- self.__policyNamejson = {}
- self.__policyNamejson['policyName'] = policyName
- self.__cpnResponse = self.__callPDP(__policyNameURL, json.dumps(self.__policyNamejson), __headers, "POST")
- self.__cpnJSON = self.__cpnResponse.json()
- policyConfigs= self.__configResponse(self.__cpnJSON)
- return policyConfigs
-
- def listConfig(self, eCOMPComponentName=None, configName=None, configAttributes=None, policyName=None, unique= False, requestID=None):
- """
- listConfig function calls the PDP for the configuration required using the parameters and returns the PDP response.
- @param eCOMPComponentName: String of the eCOMPComponentName whose configuration is required.
- @param configName: String of the configName whose configuration is required. Not Mandatory field.
- @param configAttributes: Dictionary of the config attributes in Key and Value String format. Not mandatory field.
- @param policyName: String of the policyName whose configuration is required.
- @param unique: Boolean value which can be set to True if Unique results are required.
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a List of PolicyNames.
- """
- __configURL = "/listConfig"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- __configjson = self.__configRequestParametersJSON(eCOMPComponentName, configName, configAttributes, policyName, unique)
- #self.__configjson['pdp_URL'] = self.pdp_url
- __cResponse = self.__callPDP(__configURL, json.dumps(__configjson), __headers, "POST")
- return __cResponse.json()
-
-
- def getConfig(self, eCOMPComponentName=None, configName=None, configAttributes=None, policyName=None, unique= False, requestID=None):
- """
- getConfig function calls the PDP for the configuration required using the parameters and returns the PDP response.
- @param eCOMPComponentName: String of the eCOMPComponentName whose configuration is required.
- @param configName: String of the configName whose configuration is required. Not Mandatory field.
- @param configAttributes: Dictionary of the config attributes in Key and Value String format. Not mandatory field.
- @param policyName: String of the policyName whose configuration is required.
- @param unique: Boolean value which can be set to True if Unique results are required.
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a List of PolicyConfig Object(s).
- """
- __configURL = "/getConfig"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- self.__configjson = self.__configRequestParametersJSON(eCOMPComponentName, configName, configAttributes, policyName, unique)
- #self.__configjson['pdp_URL'] = self.pdp_url
- self.__cResponse = self.__callPDP(__configURL, json.dumps(self.__configjson), __headers, "POST")
- #self.__configURL = self.resturl+__configURL
- #self.__cResponse = requests.post(self.__configURL, data=json.dumps(self.__configjson), headers = __headers)
- self.__cJSON = self.__cResponse.json()
- policyConfigs= self.__configResponse(self.__cJSON)
- # if we have successfully retrieved a policy we will store the match values.
- matchFound = False
- for policyConfig in policyConfigs:
- if policyConfig._policyConfigStatus == PolicyConfigStatus.CONFIG_RETRIEVED.name:
- # self.logger.info("Policy has been Retrieved !!")
- matchFound = True
- if matchFound:
- __match = {}
- __match["ECOMPName"] = eCOMPComponentName
- if configName is not None:
- __match["ConfigName"] = configName
- if configAttributes is not None:
- __match.update(configAttributes)
- if not self.matchStore:
- self.matchStore.append(__match)
- else:
- __booMatch = False
- for eachDict in self.matchStore:
- if eachDict==__match:
- __booMatch = True
- break
- if __booMatch==False:
- self.matchStore.append(__match)
- return policyConfigs
-
- def __configRequestParametersJSON(self, eCOMPComponentName=None, configName=None, configAttributes=None, policyName=None, unique= False):
- """ Internal Function to set JSON from configRequestParameters
- """
- json= {}
- if eCOMPComponentName is not None:
- json['ecompName'] = eCOMPComponentName
- if configName is not None:
- json['configName'] = configName
- if configAttributes is not None:
- json['configAttributes'] = configAttributes
- if policyName is not None:
- json['policyName'] = policyName
- json['unique'] = unique
- return json
-
- def __configResponse(self, cJSON):
- """
- Internal function to take the convert JSON to Response Object.
- """
- policyConfigs=[]
- for configJSON in cJSON:
- policyConfig = PolicyConfig()
- policyConfig._policyConfigMessage = configJSON['policyConfigMessage']
- policyConfig._policyConfigStatus = configJSON['policyConfigStatus']
- policyConfig._policyType = configJSON['type']
- policyConfig._policyName = configJSON['policyName']
- policyConfig._policyVersion = configJSON['policyVersion']
- policyConfig._matchingConditions = configJSON['matchingConditions']
- policyConfig._responseAttributes = configJSON['responseAttributes']
- if PolicyType.JSON.name == policyConfig._policyType:
- policyConfig._json = configJSON['config']
- elif PolicyType.XML.name == policyConfig._policyType:
- policyConfig._xml = XML(configJSON['config'])
- elif PolicyType.PROPERTIES.name == policyConfig._policyType:
- policyConfig._properties = configJSON['property']
- elif PolicyType.OTHER.name == policyConfig._policyType:
- policyConfig._other = configJSON['config']
- policyConfigs.append(policyConfig)
- return policyConfigs
-
- def getDecision(self, decisionAttributes, ecompcomponentName, requestID = None):
- """
- getDecision function sends the Decision Attributes to the PDP server and gets the response to the client from PDP.
- @param decisionAttributes: Dictionary of Decision Attributes in Key and Value String formats.
- @param ecompcomponentName:
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a DecisionResponse Object.
- """
- __decisionurl = "/getDecision"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- self.__decisionjson={}
- self.__decisionjson['decisionAttributes'] = decisionAttributes
- self.__decisionjson['ecompcomponentName'] = ecompcomponentName
- self.__dResponse = self.__callPDP(__decisionurl, json.dumps(self.__decisionjson), __headers, "POST")
- self.__dJSON = self.__dResponse.json()
- decisionResponse = DecisionResponse()
- decisionResponse._decision = self.__dJSON['decision']
- decisionResponse._details = self.__dJSON['details']
- return decisionResponse
-
- def sendEvent(self, eventAttributes, requestID=None):
- """
- sendEvent function sends the Event to the PDP server and gets the response to the client from the PDP.
- @param eventAttributes:Dictonary of the EventAttributes in Key and Value String formats.
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a List of PolicyResponse Object(s).
- """
- __eventurl = "/sendEvent"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- self.__eventjson = {}
- self.__eventjson['eventAttributes'] = eventAttributes
- #self.__eventjson['pdp_URL'] = self.pdp_url
- self.__eResponse = self.__callPDP(__eventurl, json.dumps(self.__eventjson), __headers, "POST")
- #self.__eventurl = self.resturl+__eventurl
- #self.__eResponse = requests.post(self.__eventurl, data=json.dumps(self.__eventjson), headers = __headers)
- self.__eJSON = self.__eResponse.json()
- policyResponses=[]
- for eventJSON in self.__eJSON:
- policyResponse = PolicyResponse()
- policyResponse._policyResponseMessage = eventJSON['policyResponseMessage']
- policyResponse._policyResponseStatus = eventJSON['policyResponseStatus']
- policyResponse._actionAdvised = eventJSON['actionAdvised']
- policyResponse._actionTaken = eventJSON['actionTaken']
- policyResponse._requestAttributes = eventJSON['requestAttributes']
- policyResponses.append(policyResponse)
- return policyResponses
-
- def createPolicy(self, policyParameters):
- """
- 'createPolicy creates Policy using the policyParameters sent'
- @param policyParameters: This is an object of PolicyParameters which is required as a parameter to this method.
- @return: Returns a PolicyChangeResponse Object
- """
- __createurl = "/createPolicy"
- __headers = self.policyheader
- try:
- if policyParameters._requestID is None:
- policyParameters._requestID = str(uuid.uuid4())
- self.__createJson = {}
- self.__createJson = self.__policyParametersJSON(policyParameters)
- self.__createResponse = self.__callPDP(__createurl, json.dumps(self.__createJson), __headers, "PUT")
- policyChangeResponse = PolicyChangeResponse()
- policyChangeResponse._responseCode = self.__createResponse.status_code
- policyChangeResponse._responseMessage = self.__createResponse.text
- return policyChangeResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the policyParameters Object. It needs to be object of PolicyParameters ")
- print("PE300 - Data Issue: policyParamters object Error")
-
- def updatePolicy(self, policyParameters):
- """
- 'updatePolicy updates Policy using the policyParameters sent.'
- @param policyParameters: This is an object of PolicyParameters which is required as a parameter to this method.
- @return: Returns a PolicyChangeResponse Object
- """
- __updateurl = "/updatePolicy"
- __headers = self.policyheader
- try:
- if policyParameters._requestID is None:
- policyParameters._requestID = str(uuid.uuid4())
- self.__updateJson = {}
- self.__updateJson = self.__policyParametersJSON(policyParameters)
- self.__updateResponse = self.__callPDP(__updateurl, json.dumps(self.__updateJson), __headers, "PUT")
- policyChangeResponse = PolicyChangeResponse()
- policyChangeResponse._responseCode = self.__updateResponse.status_code
- policyChangeResponse._responseMessage = self.__updateResponse.text
- return policyChangeResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the policyParameters Object. It needs to be object of PolicyParameters ")
- print("PE300 - Data Issue: policyParamters object Error")
-
- def __policyParametersJSON(self, policyParameters):
- """ Internal Function to set JSON from policyParameters Object
- """
- json= {}
- if policyParameters._actionAttribute is not None:
- json['actionAttribute'] = policyParameters._actionAttribute
- if policyParameters._actionPerformer is not None:
- json['actionPerformer'] = policyParameters._actionPerformer
- if policyParameters._attributes is not None:
- json['attributes'] = policyParameters._attributes
- if policyParameters._configBody is not None:
- json['configBody'] = policyParameters._configBody
- if policyParameters._configBodyType is not None:
- json['configBodyType'] = policyParameters._configBodyType
- if policyParameters._configName is not None:
- json['configName'] = policyParameters._configName
- if policyParameters._controllerName is not None:
- json['controllerName'] = policyParameters._controllerName
- if policyParameters._dependencyNames is not None:
- json['dependencyNames'] = policyParameters._dependencyNames
- if policyParameters._dynamicRuleAlgorithmLabels is not None:
- json['dynamicRuleAlgorithmLabels'] = policyParameters._dynamicRuleAlgorithmLabels
- if policyParameters._dynamicRuleAlgorithmField1 is not None:
- json['dynamicRuleAlgorithmField1'] = policyParameters._dynamicRuleAlgorithmField1
- if policyParameters._dynamicRuleAlgorithmField2 is not None:
- json['dynamicRuleAlgorithmField2'] = policyParameters._dynamicRuleAlgorithmField2
- if policyParameters._dynamicRuleAlgorithmFunctions is not None:
- json['dynamicRuleAlgorithmFunctions'] = policyParameters._dynamicRuleAlgorithmFunctions
- if policyParameters._ecompName is not None:
- json['ecompName'] = policyParameters._ecompName
- if policyParameters._extendedOption is not None:
- json['extendedOption'] = policyParameters._extendedOption
- if policyParameters._guard is not None:
- json['guard'] = policyParameters._guard
- if policyParameters._policyClass is not None:
- json['policyClass'] = policyParameters._policyClass
- if policyParameters._policyConfigType is not None:
- json['policyConfigType'] = policyParameters._policyConfigType
- if policyParameters._policyName is not None:
- json['policyName'] = policyParameters._policyName
- if policyParameters._policyDescription is not None:
- json['policyDescription'] = policyParameters._policyDescription
- if policyParameters._priority is not None:
- json['priority'] = policyParameters._priority
- if policyParameters._requestID is not None:
- json['requestID'] = policyParameters._requestID
- if policyParameters._riskLevel is not None:
- json['riskLevel'] = policyParameters._riskLevel
- if policyParameters._riskType is not None:
- json['riskType'] = policyParameters._riskType
- if policyParameters._ruleProvider is not None:
- json['ruleProvider'] = policyParameters._ruleProvider
- if policyParameters._ttlDate is not None:
- json['ttlDate'] = policyParameters._ttlDate
- return json
-
- def pushPolicy(self, pushPolicyParameters, requestID = None):
- """
- 'pushPolicy pushes a policy based on the given Push Policy Parameters. '
- @param pushPolicyParameters: This is an object of PushPolicyParameters which is required as a parameter to this method.
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a PolicyChangeResponse Object
- """
- __pushurl = "/pushPolicy"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- try:
- self.__pushJson = {}
- self.__pushJson['pdpGroup'] = pushPolicyParameters._pdpGroup
- self.__pushJson['policyName'] = pushPolicyParameters._policyName
- self.__pushJson['policyType'] = pushPolicyParameters._policyType
- self.__pushResponse = self.__callPDP(__pushurl, json.dumps(self.__pushJson), __headers, "PUT")
- policyChangeResponse = PolicyChangeResponse()
- policyChangeResponse._responseCode = self.__pushResponse.status_code
- policyChangeResponse._responseMessage = self.__pushResponse.text
- return policyChangeResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the pushPolicyParameters Object. It needs to be object of PushPolicyParameters ")
- print("PE300 - Data Issue: pushPolicyParamters object Error")
-
- def deletePolicy(self, deletePolicyParameters):
- """
- 'deletePolicy Deletes a policy or all its version according to the given deletePolicyParameters'
- @param deletePolicyParameters: This is an Object of DeletePolicyParameters which is required as a parameter to this method.
- @return: Returns a PolicyChangeResponse Object
- """
- __deleteurl = "/deletePolicy"
- __createdictionaryurl = "/createDictionaryItem"
- __headers = self.policyheader
- try:
- if deletePolicyParameters._requestID is None:
- deletePolicyParameters._requestID = str(uuid.uuid4())
- self.__deleteJson = {}
- self.__deleteJson['deleteCondition'] = deletePolicyParameters._deleteCondition
- self.__deleteJson['pdpGroup'] = deletePolicyParameters._pdpGroup
- self.__deleteJson['policyComponent'] = deletePolicyParameters._policyComponent
- self.__deleteJson['policyName'] = deletePolicyParameters._policyName
- self.__deleteJson['policyType'] = deletePolicyParameters._policyType
- self.__deleteJson['requestID'] = deletePolicyParameters._requestID
- self.__deleteResponse = self.__callPDP(__deleteurl, json.dumps(self.__deleteJson), self.policyheader, "DELETE")
- policyChangeResponse = PolicyChangeResponse()
- policyChangeResponse._responseCode = self.__deleteResponse.status_code
- policyChangeResponse._responseMessage = self.__deleteResponse.text
- return policyChangeResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the deletePolicyParameters Object. It needs to be object of DeletePolicyParameters ")
- print("PE300 - Data Issue: deletePolicyParameters object Error")
-
- def createDictionaryItems(self, dictionaryParameters):
- """
- 'createDictionaryItems adds dictionary items to the database for a specific dictionary'
- @param dictionaryParameters: This is an Object of DictionaryParameters which is required as a parameter to this method
- @return: Returns a DictionaryResponse object
- """
- __createdictionaryurl = '/createDictionaryItem'
- __headers = self.policyheader
- try:
- if dictionaryParameters._requestID is None:
- dictionaryParameters._requestID = str(uuid.uuid4())
- self.__json={}
- self.__json['dictionaryType'] = dictionaryParameters._dictionaryType
- self.__json['dictionary'] = dictionaryParameters._dictionary
- self.__json['dictionaryJson'] = dictionaryParameters._dictionaryJson
- self.__json['requestID'] = dictionaryParameters._requestID
- self.__createResponse = self.__callPDP(__createdictionaryurl, json.dumps(self.__json), __headers, "PUT")
- dictionaryResponse = DictionaryResponse()
- dictionaryResponse._responseCode = self.__createResponse.status_code
- dictionaryResponse._responseMessage = self.__createResponse.text
- return dictionaryResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the dictionaryParameters object. It needs to be object of DictionaryParameters ")
- print("PE300 - Data Issue: dictionaryParameters object Error")
-
-
- def updateDictionaryItems(self, dictionaryParameters):
- """
- 'updateDictionaryItems edits dictionary items in the database for a specific dictionary'
- @param dictionaryParameters: This is an Object of DictionaryParameters which is required as a parameter to this method
- @return: Returns a DictionaryResponse object
- """
- __updatedictionaryurl = '/updateDictionaryItem'
- __headers = self.policyheader
- try:
- if dictionaryParameters._requestID is None:
- dictionaryParameters._requestID = str(uuid.uuid4())
- self.__json={}
- self.__json['dictionaryType'] = dictionaryParameters._dictionaryType
- self.__json['dictionary'] = dictionaryParameters._dictionary
- self.__json['dictionaryJson'] = dictionaryParameters._dictionaryJson
- self.__json['requestID'] = dictionaryParameters._requestID
- self.__updateResponse = self.__callPDP(__updatedictionaryurl, json.dumps(self.__json), __headers, "PUT")
- dictionaryResponse = DictionaryResponse()
- dictionaryResponse._responseCode = self.__updateResponse.status_code
- dictionaryResponse._responseMessage = self.__updateResponse.text
- return dictionaryResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the dictionaryParameters object. It needs to be object of DictionaryParameters ")
- print("PE300 - Data Issue: dictionaryParameters object Error")
-
- def getDictionaryItems(self, dictionaryParameters):
- """
- 'getDictionaryItems gets all the dictionary items stored in the database for a specified dictionary'
- @param dictionaryParameters: This is an Object of DictionaryParameters which is required as a parameter to this method.
- @return: Returns a DictionaryResponse object
- """
- __retrievedictionaryurl = "/getDictionaryItems"
- __headers = self.policyheader
- try:
- if dictionaryParameters._requestID is None:
- dictionaryParameters._requestID = str(uuid.uuid4())
- self.__json = {}
- self.__json['dictionaryType'] = dictionaryParameters._dictionaryType
- self.__json['dictionary'] = dictionaryParameters._dictionary
- self.__json['requestID'] = dictionaryParameters._requestID
- self.__getResponse = self.__callPDP(__retrievedictionaryurl, json.dumps(self.__json), __headers, "POST")
- dictionaryResponse = DictionaryResponse()
- dictionaryResponse._responseCode = self.__getResponse.status_code
- dictionaryResponse._responseMessage = self.__getResponse.text
- return dictionaryResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the dictionaryParameters object. It needs to be object of DictionaryParameters ")
- print("PE300 - Data Issue: dictionaryParameters object Error")
-
- def getNotification(self):
- """
- gets the PDPNotification if the appropriate NotificationScheme is selected.
- @return: Returns a PDPNotification Object.
- """
- if ((self.scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name)or(self.scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name)):
- # Manual Client for websocket Code in here.
- if(self.resturl[0].startswith("https")):
- __man_url = self.resturl[0].replace("https","wss")+"notifications"
- else:
- __man_url = self.resturl[0].replace("http","ws")+"notifications"
- __result = self.__manualRequest(__man_url)
- self.logger.debug("Manual Notification with server: %s \n result is: %s" , __man_url , __result)
- # TODO convert the result to PDP Notifications.
- if (self.scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name):
- # need to add all the values to the PDPNotification..
- pDPNotification = PDPNotification()
- boo_Remove = False
- boo_Update = False
- if __result is None:
- return None
- if __result['removedPolicies']:
- removedPolicies = []
- for removed in __result['removedPolicies']:
- removedPolicy = RemovedPolicy()
- removedPolicy._policyName = removed['policyName']
- removedPolicy._policyVersion = removed['versionNo']
- removedPolicies.append(removedPolicy)
- pDPNotification._removedPolicies= removedPolicies
- boo_Remove = True
- if __result['loadedPolicies']:
- updatedPolicies = []
- for updated in __result['loadedPolicies']:
- updatedPolicy = LoadedPolicy()
- updatedPolicy._policyName = updated['policyName']
- updatedPolicy._policyVersion = updated['versionNo']
- updatedPolicy._matchingConditions = updated['matches']
- updatedPolicy._updateType = updated['updateType']
- updatedPolicies.append(updatedPolicy)
- pDPNotification._loadedPolicies= updatedPolicies
- boo_Update = True
- if (boo_Update and boo_Remove):
- pDPNotification._notificationType = NotificationType.BOTH.name
- elif boo_Update:
- pDPNotification._notificationType = NotificationType.UPDATE.name
- elif boo_Remove:
- pDPNotification._notificationType = NotificationType.REMOVE.name
- return pDPNotification
- elif (self.scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name):
- return self.__checkNotification(__result)
- else:
- return None
-
- def setNotification(self, scheme, handler = None):
- """
- setNotification allows changes to the NotificationScheme and the NotificationHandler.
- @param scheme: NotificationScheme to select the scheme required for notification updates.
- @param handler: NotificationHandler object which will be called when an event is occurred.
- """
- if handler is not None:
- #if type(handler) is NotificationHandler:
- self.handler = handler
- #else:
- # print("Error: handler should be a object of NotificationHandler class")
- if scheme is not None:
- if ((scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
- # setup the Auto settings.
- self.scheme = scheme
- self.__startAuto()
- elif ((scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name)):
- # setup the Manual Settings
- self.scheme = scheme
- else:
- print("PE300 - Data Issue: scheme must be a Type of NotificationScheme Enumeration ")
-
- def clearNotification(self):
- """
- clearNotification ShutsDown the AutoNotification service if running.
- """
- if self.scheme is not None:
- if((self.scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(self.scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
- if self.autows.sock is not None:
- if(self.autows.sock.connected):
- self.mclose = True
- self.autows.close()
- self.logger.info("Notification Service Stopped.")
- print("Notification Service is Stopped!!")
-
- def __callPDP(self,urlFunction, jsonData, headerData,method, files= None, params = None):
- """
- This function call is for internal usage purpose only.
- Calls the available PyPDP
- """
- connected = False
- response = None
- errormessage = ''
- for count in range(0, len(self.resturl)):
- try:
- logging.basicConfig(level=logging.DEBUG)
- request_url = self.resturl[0]+ urlFunction
- self.logger.debug("--- Sending Request to : %s",request_url)
- try:
- self.logger.debug("Request ID %s :",headerData["X-ECOMP-RequestID"])
- except:
- if jsonData is not None:
- self.logger.debug("Request ID %s :",json.loads(jsonData)['requestID'])
- self.logger.debug("Request Data is: %s" ,jsonData)
- headerData["Authorization"]= "Basic " + self.encoded[0]
- if(method=="PUT"):
- response = requests.put(request_url, data = jsonData, headers = headerData)
- elif(method=="DELETE"):
- response = requests.delete(request_url, data = jsonData, headers = headerData)
- elif(method=="POST"):
- if params is not None:
- # files = files, params = params,
- response = requests.post(request_url, params = params, headers = headerData)
- else:
- response = requests.post(request_url, data = jsonData, headers = headerData)
- # when using self-signed server certificate, comment previous line and uncomment following:
- #response = requests.post(request_url, data = jsonData, headers = headerData, verify=False)
- self.logger.debug("--- Response is : ---")
- self.logger.debug(response.status_code)
- self.logger.debug(response.headers)
- self.logger.debug(response.text)
- if(response.status_code == 200) :
- connected = True
- self.logger.info("connected to the PyPDP: %s", request_url)
- break
- elif(response.status_code == 202) :
- connected = True
- break
- elif(response.status_code == 400):
- self.logger.debug("PE400 - Schema Issue: Incorrect Params passed: %s %s", self.resturl[0], response.status_code)
- errormessage+="\n PE400 - Schema Issue: Incorrect Params passed: "+ self.resturl[0]
- self.__rotatePDP()
- elif(response.status_code == 401):
- self.logger.debug("PE100 - Permissions Error: PyPDP Error: %s %s", self.resturl[0], response.status_code)
- errormessage+="\n PE100 - Permissions Error: PyPDP Error: "+ self.resturl[0]
- self.__rotatePDP()
- elif(response.status_code == 403):
- self.logger.debug("PE100 - Permissions Error: PyPDP Error: %s %s", self.resturl[0], response.status_code)
- errormessage+="\n PE100 - Permissions Error: PyPDP Error: "+ self.resturl[0]
- self.__rotatePDP()
- else:
- self.logger.debug("PE200 - System Error: PyPDP Error: %s %s", self.resturl[0], response.status_code)
- errormessage+="\n PE200 - System Error: PyPDP Error: "+ self.resturl[0]
- self.__rotatePDP()
- except Exception as e:
- print(str(e));
- self.logger.debug("PE200 - System Error: PyPDP Error: %s", self.resturl[0])
- errormessage+="\n PE200 - System Error: PyPDP Error: "+ self.resturl[0]
- self.__rotatePDP()
- if(connected):
- if(self.autoURL==None):
- self.__startAuto()
- elif(self.autoURL!= self.resturl[0]):
- self.__startAuto()
- return response
- else:
- self.logger.error("PE200 - System Error: cannot connect to given PYPDPServer(s) %s", self.resturl)
- print(errormessage)
- sys.exit(0)
-
- def __rotatePDP(self):
- self.resturl = collections.deque(self.resturl)
- self.resturl.rotate(-1)
- self.encoded = collections.deque(self.encoded)
- self.encoded.rotate(-1)
-
- def __checkNotification(self, resultJson):
- """
- This function call is for Internal usage purpose only.
- Checks the Notification JSON compares it with the MatchStore and returns the PDPNotification object.
- """
- if not resultJson:
- return None
- if not self.matchStore:
- return None
- pDPNotification = PDPNotification()
- boo_Remove = False
- boo_Update = False
- if resultJson['removedPolicies']:
- removedPolicies = []
- for removed in resultJson['removedPolicies']:
- removedPolicy = RemovedPolicy()
- removedPolicy._policyName = removed['policyName']
- removedPolicy._policyVersion = removed['versionNo']
- removedPolicies.append(removedPolicy)
- pDPNotification._removedPolicies= removedPolicies
- boo_Remove = True
- if resultJson['updatedPolicies']:
- updatedPolicies = []
- for updated in resultJson['updatedPolicies']:
- updatedPolicy = LoadedPolicy()
- # check if it has matches then it is a Config Policy and compare it with Match Store.
- if updated['matches']:
- # compare the matches with our Stored Matches
- for eachDict in self.matchStore:
- if eachDict==updated['matches']:
- updatedPolicy._policyName = updated['policyName']
- updatedPolicy._policyVersion = updated['versionNo']
- updatedPolicy._matchingConditions = updated['matches']
- updatedPolicy._updateType = updated['updateType']
- updatedPolicies.append(updatedPolicy)
- boo_Update = True
- else:
- updatedPolicy._policyName = updated['policyName']
- updatedPolicy._policyVersion = updated['versionNo']
- updatedPolicies.append(updatedPolicy)
- boo_Update = True
- pDPNotification._loadedPolicies= updatedPolicies
- if (boo_Update and boo_Remove):
- pDPNotification._notificationType = NotificationType.BOTH.name
- elif boo_Update:
- pDPNotification._notificationType = NotificationType.UPDATE.name
- elif boo_Remove:
- pDPNotification._notificationType = NotificationType.REMOVE.name
- return pDPNotification
-
- def __startAuto(self):
- """
- Starts the Auto Notification Feature..
- """
- if self.scheme is not None:
- if ((self.scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(self.scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
- if self.handler is None:
- if self.autows.sock is not None:
- if(self.autows.sock.connected):
- self.mclose= True
- self.autows.close()
- else:
- if self.autoURL is None:
- self.autoURL = self.resturl[0]
- elif self.autoURL != self.resturl[0]:
- self.autoURL = self.resturl[0]
- if self.autows.sock is not None:
- if(self.autows.sock.connected):
- self.mclose= True
- self.autows.close()
- else:
- self.autows = None
- if self.autows is None:
- if(self.autoURL.startswith("https")):
- __auto_url = self.autoURL.replace("https","wss")+"notifications"
- else:
- __auto_url = self.autoURL.replace("http","ws")+"notifications"
- def run(*args):
- self.__autoRequest(__auto_url)
- self.logger.info("Starting AutoNotification Service with : %s" , __auto_url)
- self.thread.start_new_thread(run , ())
- elif self.autows.sock is not None:
- if not (self.autows.sock.connected):
- self.mclose = True
- self.autows.close()
- self.restart = True
- self.__rotatePDP()
- if(self.autoURL.startswith("https")):
- __auto_url = self.autoURL.replace("https","wss")+"notifications"
- else:
- __auto_url = self.autoURL.replace("http","ws")+"notifications"
- def run(*args):
- self.__autoRequest(__auto_url)
- self.logger.info("Starting AutoNotification Service with : %s" , __auto_url)
- self.thread.start_new_thread(run , ())
-
- else:
- #stop the Auto Notification Service if it is running.
- if self.autows.sock is not None:
- if(self.autows.sock.connected):
- self.mclose= True
- self.autows.close()
-
- def __onEvent(self, message):
- """
- Handles the event Notification received.
- """
- message = json.loads(message)
- if self.handler is not None:
- if (self.scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name):
- pDPNotification = PDPNotification()
- boo_Remove = False
- boo_Update = False
- if message['removedPolicies']:
- removedPolicies = []
- for removed in message['removedPolicies']:
- removedPolicy = RemovedPolicy()
- removedPolicy._policyName = removed['policyName']
- removedPolicy._policyVersion = removed['versionNo']
- removedPolicies.append(removedPolicy)
- pDPNotification._removedPolicies= removedPolicies
- boo_Remove = True
- if message['loadedPolicies']:
- updatedPolicies = []
- for updated in message['loadedPolicies']:
- updatedPolicy = LoadedPolicy()
- updatedPolicy._policyName = updated['policyName']
- updatedPolicy._policyVersion = updated['versionNo']
- updatedPolicy._matchingConditions = updated['matches']
- updatedPolicy._updateType = updated['updateType']
- updatedPolicies.append(updatedPolicy)
- pDPNotification._loadedPolicies= updatedPolicies
- boo_Update = True
- if (boo_Update and boo_Remove):
- pDPNotification._notificationType = NotificationType.BOTH.name
- elif boo_Update:
- pDPNotification._notificationType = NotificationType.UPDATE.name
- elif boo_Remove:
- pDPNotification._notificationType = NotificationType.REMOVE.name
- # call the Handler.
- self.handler.notificationReceived(pDPNotification)
- elif (self.scheme == NotificationScheme.AUTO_NOTIFICATIONS.name):
- # call the handler
- self.handler(self.__checkNotification(message))
-
- def __manualRequest(self,request_url):
- """
- Takes the request_URL given and returns the JSON response back to the Caller.
- """
- ws = create_connection(request_url)
- # when using self-signed server certificate, comment previous line and uncomment following:
- #ws = create_connection(request_url, sslopt={"cert_reqs": ssl.CERT_NONE})
- ws.send("Manual")
- try:
- return json.loads(ws.recv())
- except:
- return None
- ws.close()
- ws.shutdown()
-
- def __onMessage(self, ws,message):
- """Occurs on Event
- """
- self.logger.info("Received AutoNotification message: %s" , message)
- self.__onEvent(message)
-
- def __onError(self, ws, error):
- """Self Restart the Notification Service on Error
- """
- self.logger.error("PE500 - Process Flow Issue: Auto Notification service Error!! : %s" , error)
-
- def __onclose(self, ws):
- """Occurs on Close ? Try to start again in case User didn't do it.
- """
- self.logger.debug("Connection has been Closed. ")
- if not self.mclose:
- self.__startAuto()
- self.mclose = False
-
- def __autoRequest(self, request_url):
- """
- Takes the request_URL and invokes the PolicyEngine method on any receiving a Message
- """
- websocket.enableTrace(True)
- self.autows = websocket.WebSocketApp(request_url, on_message= self.__onMessage, on_close= self.__onclose, on_error= self.__onError)
- # wait for to 5 seconds to restart
- if self.restart:
- time.sleep(5)
- self.autows.run_forever()
-
-class NotificationHandler:
- """
- 'Defines the methods which need to run when an Event or a Notification is received.'
- """
- def notificationReceived(self, notification):
- """
- Will be triggered automatically whenever a Notification is received by the PEP
- @param notification: PDPNotification object which has the information of the Policies.
- @attention: This method must be implemented by the user for AUTO type NotificationScheme
- """
- raise Exception("Unimplemented abstract method: %s" % __functionId(self, 1))
-
-def __functionId(obj, nFramesUp):
- """ Internal Usage only..
- Create a string naming the function n frames up on the stack. """
- fr = sys._getframe(nFramesUp+1)
- co = fr.f_code
- return "%s.%s" % (obj.__class__, co.co_name)
-
-class PolicyConfig:
- """
- 'PolicyConfig is the return object resulted by getConfig Call.'
- """
- def __init__(self):
- self._policyConfigMessage = None
- self._policyConfigStatus = None
- self._policyName = None
- self._policyVersion = None
- self._matchingConditions = None
- self._responseAttributes = None
- self._policyType = None
- self._json = None
- self._xml = None
- self._prop = None
- self._other = None
-
-class PolicyResponse:
- """
- 'PolicyResponse is the return object resulted by sendEvent Call.'
- """
- def __init__(self):
- self._policyResponseStatus = None
- self._policyResponseMessage = None
- self._requestAttributes = None
- self._actionTaken = None
- self._actionAdvised= None
-
-class PDPNotification:
- """
- 'Defines the Notification Event sent from the PDP to PEP Client.'
- """
- def __init__(self):
- self._removedPolicies = None
- self._loadedPolicies = None
- self._notificationType = None
-
-class RemovedPolicy:
- """
- 'Defines the structure of the Removed Policy'
- """
- def __init__(self):
- self._policyName = None
- self._policyVersion = None
-
-class LoadedPolicy:
- """
- 'Defines the Structure of the Loaded Policy'
- """
- def __init__(self):
- self._policyName = None
- self._policyVersion = None
- self._matchingConditions = None
- self._updateType = None
-
-class PolicyParameters:
- """
- 'Defines the Structure of the Policy to Create or Update'
- """
- def __init__(self):
- self._actionPerformer = None
- self._actionAttribute = None
- self._attributes = None
- self._configBody = None
- self._configBodyType = None
- self._configName = None
- self._controllerName = None
- self._dependencyNames = None
- self._dynamicRuleAlgorithmLabels = None
- self._dynamicRuleAlgorithmFunctions = None
- self._dynamicRuleAlgorithmField1 = None
- self._dynamicRuleAlgorithmField2 = None
- self._ecompName = None
- self._extendedOption = None
- self._guard = None
- self._policyClass = None
- self._policyConfigType = None
- self._policyName = None
- self._policyDescription = None
- self._priority = None
- self._requestID = None
- self._riskLevel = None
- self._riskType = None
- self._ruleProvider = None
- self._ttlDate = None
-
-class PushPolicyParameters:
- """
- 'Defines the Structure of the Push Policy Parameters'
- """
- def __init__(self):
- self._pdpGroup = None
- self._policyName = None
- self._policyType = None
-
-class PolicyChangeResponse:
- """
- 'Defines the Structure of the policy Changes made from PDP'
- """
- def __init__(self):
- self._responseMessage = None
- self._responseCode = None
-
-class DeletePolicyParameters:
- """
- 'Defines the Structure of the Delete Policy Parameters'
- """
- def __init__(self):
- self._deleteCondition = None
- self._pdpGroup = None
- self._policyComponent = None
- self._policyName = None
- self._policyType = None
- self._requestID = None
-
-class DictionaryParameters:
- """
- 'Defines the Structure of the Dictionary Parameters'
- """
- def __init__(self):
- self._dictionaryType = None
- self._dictionary = None
- self._dictionaryJson = None
- self._requestID = None
-
-class DictionaryResponse:
- """
- 'Defines the Structure of the dictionary response'
- """
- def __init__(self):
- self._responseMessage = None
- self._responseCode = None
- self._dictionaryJson = None
- self._dictionaryData = None
-
-class DecisionResponse:
- """
- 'Defines the Structure of Decision Response'
- """
- def __init__(self):
- self._decision = None
- self._details = None
-
-class ImportParameters:
- """
- 'Defines the Structure of Policy Model Import'
- """
- def __init__(self):
- self._serviceName = None
- self._description = None
- self._requestID = None
- self._filePath = None
- self._importBody = None
- self._version = None
- self._importType = None
diff --git a/policyhandler/config.py b/policyhandler/config.py index 7033096..a36f032 100644 --- a/policyhandler/config.py +++ b/policyhandler/config.py @@ -22,8 +22,6 @@ import os import json import copy -import re -import base64 import logging import logging.config @@ -65,11 +63,7 @@ class Config(object): """find the name of the policy-handler system to be used as the key in consul-kv for config of policy-handler """ - system_name = None - if Config.config: - system_name = Config.config.get(Config.FIELD_SYSTEM) - - return system_name or Config.SERVICE_NAME_POLICY_HANDLER + return (Config.config or {}).get(Config.FIELD_SYSTEM, Config.SERVICE_NAME_POLICY_HANDLER) @staticmethod def discover(): @@ -88,19 +82,6 @@ class Config(object): Config._logger.debug("merged config from discovery: %s", json.dumps(Config.config)) @staticmethod - def upload_to_discovery(): - """upload the current config settings to the discovery service""" - if not Config.config or not isinstance(Config.config, dict): - Config._logger.error("unexpected config: %s", Config.config) - return - - discovery_key = Config.get_system_name() - latest_config = json.dumps({Config.SERVICE_NAME_POLICY_HANDLER:Config.config}) - DiscoveryClient.put_kv(discovery_key, latest_config) - Config._logger.debug("uploaded config to discovery(%s): %s", \ - discovery_key, latest_config) - - @staticmethod def load_from_file(file_path=None): """read and store the config from config file""" if not file_path: @@ -123,43 +104,3 @@ class Config(object): Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port) Config.merge(loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER)) return True - -class PolicyEngineConfig(object): - """main config of the application""" - # PATH_TO_PROPERTIES = r'logs/policy_engine.properties' - PATH_TO_PROPERTIES = r'tmp/policy_engine.properties' - PYPDP_URL = "PYPDP_URL = {0}{1}, {2}, {3}\n" - CLIENT_ID = "CLIENT_ID = {0}\n" - CLIENT_KEY = "CLIENT_KEY = {0}\n" - ENVIRONMENT = "ENVIRONMENT = {0}\n" - _logger = logging.getLogger("policy_handler.pe_config") - - @staticmethod - def save_to_file(): - """create the policy_engine.properties for policy-engine client""" - file_path = PolicyEngineConfig.PATH_TO_PROPERTIES - - try: - config = Config.config[Config.FIELD_POLICY_ENGINE] - headers = config["headers"] - remove_basic = re.compile(r"(^Basic )") - client_auth = headers["ClientAuth"] - basic_client_auth = bool(remove_basic.match(client_auth)) - client_parts = base64.b64decode(remove_basic.sub("", client_auth)).split(":") - auth_parts = base64.b64decode(remove_basic.sub("", headers["Authorization"])).split(":") - - props = PolicyEngineConfig.PYPDP_URL.format(config["url"], config["path_pdp"], - auth_parts[0], auth_parts[1]) - props += PolicyEngineConfig.CLIENT_ID.format(client_parts[0]) - props += PolicyEngineConfig.CLIENT_KEY.format(base64.b64encode(client_parts[1])) - props += PolicyEngineConfig.ENVIRONMENT.format(headers["Environment"]) - - with open(file_path, 'w') as prp_file: - prp_file.write(props) - PolicyEngineConfig._logger.info("created %s basic_client_auth %s", - file_path, basic_client_auth) - return basic_client_auth - except IOError: - PolicyEngineConfig._logger.error("failed to save to %s", file_path) - except KeyError: - PolicyEngineConfig._logger.error("unexpected config for %s", Config.FIELD_POLICY_ENGINE) diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index 1d50fc3..a641a95 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -59,14 +59,27 @@ class DeployHandler(object): DeployHandler._target_entity = Config.config["deploy_handler"] DeployHandler._url = DiscoveryClient.get_service_url(DeployHandler._target_entity) - DeployHandler._url_path = DeployHandler._url + '/policy' + DeployHandler._url_path = (DeployHandler._url or "") + '/policy' DeployHandler._logger.info("DeployHandler url(%s)", DeployHandler._url) @staticmethod - def policy_update(audit, latest_policies): - """ post policy_updated message to deploy-handler """ + def policy_update(audit, latest_policies, removed_policies=None, + errored_policies=None, catch_up=False): + """post policy_updated message to deploy-handler""" + if not latest_policies and not removed_policies and not catch_up: + return + + latest_policies = latest_policies or {} + removed_policies = removed_policies or {} + errored_policies = errored_policies or {} + DeployHandler._lazy_init() - msg = {"latest_policies":latest_policies} + msg = { + "catch_up" : catch_up, + "latest_policies" : latest_policies, + "removed_policies" : removed_policies, + "errored_policies" : errored_policies + } sub_aud = Audit(aud_parent=audit, targetEntity=DeployHandler._target_entity, targetServiceName=DeployHandler._url_path) headers = {REQUEST_X_ECOMP_REQUESTID : sub_aud.request_id} @@ -74,10 +87,22 @@ class DeployHandler(object): msg_str = json.dumps(msg) headers_str = json.dumps(headers) + DeployHandler._logger.info( + "catch_up(%s) latest_policies[%s], removed_policies[%s], errored_policies[%s]", + catch_up, len(latest_policies), len(removed_policies), len(errored_policies)) log_line = "post to deployment-handler {0} msg={1} headers={2}".format( DeployHandler._url_path, msg_str, headers_str) - sub_aud.metrics_start(log_line) + DeployHandler._logger.info(log_line) + sub_aud.metrics_start(log_line) + + if not DeployHandler._url: + error_msg = "no url found to {0}".format(log_line) + DeployHandler._logger.error(error_msg) + sub_aud.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) + audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) + sub_aud.metrics(error_msg) + return res = None try: diff --git a/policyhandler/discovery.py b/policyhandler/discovery.py index 7e16b90..33c3265 100644 --- a/policyhandler/discovery.py +++ b/policyhandler/discovery.py @@ -41,6 +41,8 @@ class DiscoveryClient(object): CONSUL_SERVICE_MASK = "http://consul:8500/v1/catalog/service/{0}" CONSUL_KV_MASK = "http://consul:8500/v1/kv/{0}" SERVICE_MASK = "http://{0}:{1}" + SERVICE_ADDRESS = "ServiceAddress" + SERVICE_PORT = "ServicePort" _logger = logging.getLogger("policy_handler.discovery") @@ -51,23 +53,25 @@ class DiscoveryClient(object): DiscoveryClient._logger.info("discover %s", service_path) response = requests.get(service_path) response.raise_for_status() - service = response.json()[0] - return DiscoveryClient.SERVICE_MASK.format( \ - service["ServiceAddress"], service["ServicePort"]) + service = response.json() + if not service: + DiscoveryClient._logger.error("failed discover %s", service_path) + return + service = service[0] + return DiscoveryClient.SERVICE_MASK.format( + service[DiscoveryClient.SERVICE_ADDRESS], service[DiscoveryClient.SERVICE_PORT] + ) @staticmethod def get_value(key): """get the value for the key from consul-kv""" response = requests.get(DiscoveryClient.CONSUL_KV_MASK.format(key)) response.raise_for_status() - data = response.json()[0] - value = base64.b64decode(data["Value"]).decode("utf-8") - DiscoveryClient._logger.info("consul-kv key=%s data=%s value(%s)", \ - key, json.dumps(data), value) + data = response.json() + if not data: + DiscoveryClient._logger.error("failed get_value %s", key) + return + value = base64.b64decode(data[0]["Value"]).decode("utf-8") + DiscoveryClient._logger.info("consul-kv key=%s value(%s) data=%s", + key, value, json.dumps(data)) return json.loads(value) - - @staticmethod - def put_kv(key, value): - """put the value under the key in consul-kv""" - response = requests.put(DiscoveryClient.CONSUL_KV_MASK.format(key), data=value) - response.raise_for_status() diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py index a1df861..c338b76 100644 --- a/policyhandler/onap/audit.py +++ b/policyhandler/onap/audit.py @@ -27,14 +27,18 @@ # ECOMP is a trademark and service mark of AT&T Intellectual Property. import os +import sys import json import uuid import time import copy +from datetime import datetime from threading import Lock from enum import Enum +from pip import utils as pip_utils from .CommonLogger import CommonLogger +from .health import Health REQUEST_X_ECOMP_REQUESTID = "X-ECOMP-RequestID" REQUEST_REMOTE_ADDR = "Remote-Addr" @@ -44,6 +48,7 @@ HOSTNAME = "HOSTNAME" AUDIT_REQUESTID = 'requestID' AUDIT_IPADDRESS = 'IPAddress' AUDIT_SERVER = 'server' +AUDIT_TARGET_ENTITY = 'targetEntity' HEADER_CLIENTAUTH = "clientauth" HEADER_AUTHORIZATION = "authorization" @@ -51,9 +56,10 @@ HEADER_AUTHORIZATION = "authorization" class AuditHttpCode(Enum): """audit http codes""" HTTP_OK = 200 - DATA_NOT_FOUND_ERROR = 400 PERMISSION_UNAUTHORIZED_ERROR = 401 PERMISSION_FORBIDDEN_ERROR = 403 + RESPONSE_ERROR = 400 + DATA_NOT_FOUND_ERROR = 404 SERVER_INTERNAL_ERROR = 500 SERVICE_UNAVAILABLE_ERROR = 503 DATA_ERROR = 1030 @@ -72,23 +78,25 @@ class AuditResponseCode(Enum): @staticmethod def get_response_code(http_status_code): """calculates the response_code from max_http_status_code""" + response_code = AuditResponseCode.UNKNOWN_ERROR if http_status_code <= AuditHttpCode.HTTP_OK.value: - return AuditResponseCode.SUCCESS - - if http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value, \ - AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]: - return AuditResponseCode.PERMISSION_ERROR - if http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value: - return AuditResponseCode.AVAILABILITY_ERROR - if http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value: - return AuditResponseCode.BUSINESS_PROCESS_ERROR - if http_status_code in [AuditHttpCode.DATA_ERROR.value, \ - AuditHttpCode.DATA_NOT_FOUND_ERROR.value]: - return AuditResponseCode.DATA_ERROR - if http_status_code == AuditHttpCode.SCHEMA_ERROR.value: - return AuditResponseCode.SCHEMA_ERROR - - return AuditResponseCode.UNKNOWN_ERROR + response_code = AuditResponseCode.SUCCESS + + elif http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value, + AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]: + response_code = AuditResponseCode.PERMISSION_ERROR + elif http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value: + response_code = AuditResponseCode.AVAILABILITY_ERROR + elif http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value: + response_code = AuditResponseCode.BUSINESS_PROCESS_ERROR + elif http_status_code in [AuditHttpCode.DATA_ERROR.value, + AuditHttpCode.RESPONSE_ERROR.value, + AuditHttpCode.DATA_NOT_FOUND_ERROR.value]: + response_code = AuditResponseCode.DATA_ERROR + elif http_status_code == AuditHttpCode.SCHEMA_ERROR.value: + response_code = AuditResponseCode.SCHEMA_ERROR + + return response_code @staticmethod def get_human_text(response_code): @@ -109,16 +117,23 @@ class Audit(object): :kwargs: - put any request related params into kwargs """ _service_name = "" + _service_version = "" _service_instance_UUID = str(uuid.uuid4()) + _started = datetime.now() _logger_debug = None _logger_error = None _logger_metrics = None _logger_audit = None + _health = Health() + _py_ver = sys.version.replace("\n", "") + _packages = sorted([pckg.project_name + "==" + pckg.version + for pckg in pip_utils.get_installed_distributions()]) @staticmethod - def init(service_name, config_file_path): + def init(service_name, service_version, config_file_path): """init static invariants and loggers""" Audit._service_name = service_name + Audit._service_version = service_version Audit._logger_debug = CommonLogger(config_file_path, "debug", \ instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name) Audit._logger_error = CommonLogger(config_file_path, "error", \ @@ -128,6 +143,22 @@ class Audit(object): Audit._logger_audit = CommonLogger(config_file_path, "audit", \ instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name) + @staticmethod + def health(): + """returns json for health check""" + now = datetime.now() + return { + "service_name" : Audit._service_name, + "service_version" : Audit._service_version, + "service_instance_UUID" : Audit._service_instance_UUID, + "python" : Audit._py_ver, + "started" : str(Audit._started), + "now" : str(now), + "uptime" : str(now - Audit._started), + "stats" : Audit._health.dump(), + "packages" : Audit._packages + } + def __init__(self, request_id=None, req_message=None, aud_parent=None, **kwargs): """create audit object per each request in the system @@ -193,6 +224,13 @@ class Audit(object): self.max_http_status_code = max(http_status_code, self.max_http_status_code) self._lock.release() + def get_max_http_status_code(self): + """returns the highest(worst) http status code""" + self._lock.acquire() + max_http_status_code = self.max_http_status_code + self._lock.release() + return max_http_status_code + @staticmethod def get_status_code(success): """COMPLETE versus ERROR""" @@ -222,12 +260,24 @@ class Audit(object): return json.dumps(Audit.hide_secrets(copy.deepcopy(obj)), **kwargs) - def get_response_code(self): - """calculates the response_code from max_http_status_code""" - self._lock.acquire() - max_http_status_code = self.max_http_status_code - self._lock.release() - return AuditResponseCode.get_response_code(max_http_status_code) + def is_serious_error(self, status_code): + """returns whether the response_code is success and a human text for response code""" + return AuditResponseCode.PERMISSION_ERROR.value \ + == AuditResponseCode.get_response_code(status_code).value \ + or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value + + def _get_response_status(self): + """calculates the response status fields from max_http_status_code""" + max_http_status_code = self.get_max_http_status_code() + response_code = AuditResponseCode.get_response_code(max_http_status_code) + success = (response_code.value == AuditResponseCode.SUCCESS.value) + response_description = AuditResponseCode.get_human_text(response_code) + return success, max_http_status_code, response_code, response_description + + def is_success(self): + """returns whether the response_code is success and a human text for response code""" + success, _, _, _ = self._get_response_status() + return success def debug(self, log_line, **kwargs): """debug - the debug=lowest level of logging""" @@ -275,46 +325,56 @@ class Audit(object): def metrics(self, log_line, **kwargs): """debug+metrics - the metrics=sub-audit level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - response_code = self.get_response_code() - success = (response_code.value == AuditResponseCode.SUCCESS.value) + success, max_http_status_code, response_code, response_description = \ + self._get_response_status() metrics_func = None + timer = Audit.get_elapsed_time(self._metrics_started) if success: log_line = "done: {0}".format(log_line) self.info(log_line, **all_kwargs) metrics_func = Audit._logger_metrics.info + Audit._health.success(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer) else: log_line = "failed: {0}".format(log_line) self.error(log_line, errorCode=response_code.value, \ - errorDescription=AuditResponseCode.get_human_text(response_code), **all_kwargs) + errorDescription=response_description, **all_kwargs) metrics_func = Audit._logger_metrics.error + Audit._health.error(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer) - metrics_func(log_line, begTime=self._metrics_start_event, \ - timer=Audit.get_elapsed_time(self._metrics_started), \ - statusCode=Audit.get_status_code(success), responseCode=response_code.value, \ - responseDescription=AuditResponseCode.get_human_text(response_code), \ - **all_kwargs) + metrics_func(log_line, begTime=self._metrics_start_event, timer=timer, + statusCode=Audit.get_status_code(success), responseCode=response_code.value, + responseDescription=response_description, + **all_kwargs + ) self.metrics_start() + return (success, max_http_status_code, response_description) def audit_done(self, result=None, **kwargs): """debug+audit - the audit=top level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - response_code = self.get_response_code() - success = (response_code.value == AuditResponseCode.SUCCESS.value) + success, max_http_status_code, response_code, response_description = \ + self._get_response_status() log_line = "{0} {1}".format(self.req_message, result or "").strip() audit_func = None + timer = Audit.get_elapsed_time(self._started) if success: log_line = "done: {0}".format(log_line) self.info(log_line, **all_kwargs) audit_func = Audit._logger_audit.info + Audit._health.success(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer) else: log_line = "failed: {0}".format(log_line) - self.error(log_line, errorCode=response_code.value, \ - errorDescription=AuditResponseCode.get_human_text(response_code), **all_kwargs) + self.error(log_line, errorCode=response_code.value, + errorDescription=response_description, **all_kwargs) audit_func = Audit._logger_audit.error + Audit._health.error(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer) + + audit_func(log_line, begTime=self._start_event, timer=timer, + statusCode=Audit.get_status_code(success), + responseCode=response_code.value, + responseDescription=response_description, + **all_kwargs + ) - audit_func(log_line, begTime=self._start_event, \ - timer=Audit.get_elapsed_time(self._started), \ - statusCode=Audit.get_status_code(success), responseCode=response_code.value, \ - responseDescription=AuditResponseCode.get_human_text(response_code), \ - **all_kwargs) + return (success, max_http_status_code, response_description) diff --git a/policyhandler/onap/health.py b/policyhandler/onap/health.py new file mode 100644 index 0000000..eefa7d2 --- /dev/null +++ b/policyhandler/onap/health.py @@ -0,0 +1,104 @@ +"""generic class to keep track of app health""" + +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +import uuid +from threading import Lock +from datetime import datetime + +class HealthStats(object): + """keep track of stats for calls""" + def __init__(self, name): + """keep track of stats for metrics calls""" + self._name = name or "stats_" + str(uuid.uuid4()) + self._lock = Lock() + self._call_count = 0 + self._error_count = 0 + self._longest_timer = 0 + self._total_timer = 0 + self._last_success = None + self._last_error = None + + def dump(self): + """returns dict of stats""" + dump = None + with self._lock: + dump = { + "call_count" : self._call_count, + "error_count" : self._error_count, + "last_success" : str(self._last_success), + "last_error" : str(self._last_error), + "longest_timer_millisecs" : self._longest_timer, + "ave_timer_millisecs" : (float(self._total_timer)/self._call_count \ + if self._call_count else 0) + } + return dump + + def success(self, timer): + """records the successful execution""" + with self._lock: + self._call_count += 1 + self._last_success = datetime.now() + self._total_timer += timer + if not self._longest_timer or self._longest_timer < timer: + self._longest_timer = timer + + def error(self, timer): + """records the errored execution""" + with self._lock: + self._call_count += 1 + self._error_count += 1 + self._last_error = datetime.now() + self._total_timer += timer + if not self._longest_timer or self._longest_timer < timer: + self._longest_timer = timer + +class Health(object): + """Health stats for multiple requests""" + def __init__(self): + """Health stats for application""" + self._all_stats = {} + self._lock = Lock() + + def _add_or_get_stats(self, stats_name): + """add to or get from the ever growing dict of HealthStats""" + stats = None + with self._lock: + stats = self._all_stats.get(stats_name) + if not stats: + self._all_stats[stats_name] = stats = HealthStats(stats_name) + return stats + + def success(self, stats_name, timer): + """records the successful execution on stats_name""" + stats = self._add_or_get_stats(stats_name) + stats.success(timer) + + def error(self, stats_name, timer): + """records the error execution on stats_name""" + stats = self._add_or_get_stats(stats_name) + stats.error(timer) + + def dump(self): + """returns dict of stats""" + with self._lock: + stats = dict((k, v.dump()) for (k, v) in self._all_stats.iteritems()) + + return stats diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py index 640b724..30fe9b2 100644 --- a/policyhandler/policy_consts.py +++ b/policyhandler/policy_consts.py @@ -24,5 +24,3 @@ POLICY_VERSION = "policyVersion" POLICY_NAME = "policyName" POLICY_BODY = 'policy_body' POLICY_CONFIG = 'config' - -POLICY_GET_CONFIG = 'getConfig' diff --git a/policyhandler/policy_engine.py b/policyhandler/policy_engine.py deleted file mode 100644 index a0ff697..0000000 --- a/policyhandler/policy_engine.py +++ /dev/null @@ -1,103 +0,0 @@ -"""policy-engine-client communicates with policy-engine thru PolicyEngine client object""" - -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. - -import logging -import re - -from .config import Config, PolicyEngineConfig -from .onap.audit import Audit -from .PolicyEngine import PolicyEngine, NotificationHandler, NotificationScheme -from .policy_updater import PolicyUpdater - -class PolicyNotificationHandler(NotificationHandler): - """handler of the policy-engine push notifications""" - _logger = logging.getLogger("policy_handler.policy_notification") - - def __init__(self, policy_updater): - scope_prefixes = [scope_prefix.replace(".", "[.]") - for scope_prefix in Config.config["scope_prefixes"]] - self._policy_scopes = re.compile("(" + "|".join(scope_prefixes) + ")") - PolicyNotificationHandler._logger.info("_policy_scopes %s", self._policy_scopes.pattern) - self._policy_updater = policy_updater - self._policy_updater.start() - - def notificationReceived(self, notification): - if not notification or not notification._loadedPolicies: - return - - policy_names = [loaded._policyName - for loaded in notification._loadedPolicies - if self._policy_scopes.match(loaded._policyName)] - - if not policy_names: - PolicyNotificationHandler._logger.info("no policy updated for scopes %s", - self._policy_scopes.pattern) - return - - audit = Audit(req_message="notificationReceived from PDP") - audit.retry_get_config = True - self._policy_updater.enqueue(audit, policy_names) - -class PolicyEngineClient(object): - """ policy-engine client""" - _logger = logging.getLogger("policy_handler.policy_engine") - _policy_updater = None - _pdp_notification_handler = None - _policy_engine = None - - @staticmethod - def shutdown(audit): - """Shutdown the notification-handler""" - PolicyEngineClient._policy_updater.shutdown(audit) - - @staticmethod - def catch_up(audit): - """bring the latest policies from policy-engine""" - PolicyEngineClient._policy_updater.catch_up(audit) - - @staticmethod - def create_policy_engine_properties(): - """create the policy_engine.properties file from config.json""" - pass - - @staticmethod - def run(): - """Using policy-engine client to talk to policy engine""" - audit = Audit(req_message="start PDP client") - PolicyEngineClient._policy_updater = PolicyUpdater() - PolicyEngineClient._pdp_notification_handler = PolicyNotificationHandler( - PolicyEngineClient._policy_updater) - - sub_aud = Audit(aud_parent=audit) - sub_aud.metrics_start("create client to PDP") - basic_client_auth = PolicyEngineConfig.save_to_file() - PolicyEngineClient._policy_engine = PolicyEngine( - PolicyEngineConfig.PATH_TO_PROPERTIES, - scheme=NotificationScheme.AUTO_ALL_NOTIFICATIONS.name, - handler=PolicyEngineClient._pdp_notification_handler, - basic_client_auth=basic_client_auth - ) - sub_aud.metrics("created client to PDP") - seed_scope = ".*" - PolicyEngineClient._policy_engine.getConfig(policyName=seed_scope) - sub_aud.metrics("seeded client by PDP.getConfig for policyName={0}".format(seed_scope)) - - PolicyEngineClient.catch_up(audit) diff --git a/policyhandler/policy_handler.py b/policyhandler/policy_handler.py index 50d59bc..1cd62db 100644 --- a/policyhandler/policy_handler.py +++ b/policyhandler/policy_handler.py @@ -18,14 +18,14 @@ # ============LICENSE_END========================================================= # # ECOMP is a trademark and service mark of AT&T Intellectual Property. - +import os import sys import logging from policyhandler.config import Config from policyhandler.onap.audit import Audit from policyhandler.web_server import PolicyWeb -from policyhandler.policy_engine import PolicyEngineClient +from policyhandler.policy_receiver import PolicyReceiver class LogWriter(object): """redirect the standard out + err to the logger""" @@ -52,13 +52,16 @@ def run_policy_handler(): sys.stderr = LogWriter(logger.error) logger.info("========== run_policy_handler ==========") - Audit.init(Config.get_system_name(), Config.LOGGER_CONFIG_FILE_PATH) + policy_handler_version = os.getenv("APP_VER") + logger.info("policy_handler_version %s", policy_handler_version) + Audit.init(Config.get_system_name(), policy_handler_version, Config.LOGGER_CONFIG_FILE_PATH) logger.info("starting policy_handler with config:") logger.info(Audit.log_json_dumps(Config.config)) - PolicyEngineClient.run() - PolicyWeb.run() + audit = Audit(req_message="start policy handler") + PolicyReceiver.run(audit) + PolicyWeb.run_forever(audit) if __name__ == "__main__": run_policy_handler() diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py new file mode 100644 index 0000000..ec25987 --- /dev/null +++ b/policyhandler/policy_receiver.py @@ -0,0 +1,195 @@ +""" +policy-receiver communicates with policy-engine +thru web-socket to receive push notifications +on updates and removal of policies. + +on receiving the policy-notifications, the policy-receiver +filters them out by the policy scope(s) provided in policy-handler config +and passes the notifications to policy-updater +""" + +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +import json +import logging +import re +import time +from threading import Lock, Thread + +import websocket + +from .config import Config +from .onap.audit import Audit +from .policy_updater import PolicyUpdater + +LOADED_POLICIES = 'loadedPolicies' +REMOVED_POLICIES = 'removedPolicies' +POLICY_NAME = 'policyName' +POLICY_VER = 'versionNo' + +class _PolicyReceiver(Thread): + """web-socket to PolicyEngine""" + _logger = logging.getLogger("policy_handler.policy_receiver") + + def __init__(self): + """web-socket inside the thread to receive policy notifications from PolicyEngine""" + Thread.__init__(self, name="policy_receiver") + self.daemon = True + + self._lock = Lock() + self._keep_running = True + + config = Config.config[Config.FIELD_POLICY_ENGINE] + self.web_socket_url = resturl = config["url"] + config["path_pdp"] + + if resturl.startswith("https:"): + self.web_socket_url = resturl.replace("https:", "wss:") + "notifications" + else: + self.web_socket_url = resturl.replace("http:", "ws:") + "notifications" + + self._web_socket = None + + scope_prefixes = [scope_prefix.replace(".", "[.]") + for scope_prefix in Config.config["scope_prefixes"]] + self._policy_scopes = re.compile("(" + "|".join(scope_prefixes) + ")") + _PolicyReceiver._logger.info("_policy_scopes %s", self._policy_scopes.pattern) + self._policy_updater = PolicyUpdater() + self._policy_updater.start() + + def run(self): + """listen on web-socket and pass the policy notifications to policy-updater""" + websocket.enableTrace(True) + restarting = False + while True: + if not self._get_keep_running(): + break + + self._stop_notifications() + + if restarting: + time.sleep(5) + + _PolicyReceiver._logger.info( + "connecting to policy-notifications at: %s", self.web_socket_url) + self._web_socket = websocket.WebSocketApp( + self.web_socket_url, + on_message=self._on_pdp_message, + on_close=self._on_ws_close, + on_error=self._on_ws_error + ) + + _PolicyReceiver._logger.info("waiting for policy-notifications...") + self._web_socket.run_forever() + restarting = True + + _PolicyReceiver._logger.info("exit policy-receiver") + + def _get_keep_running(self): + """thread-safe check whether to continue running""" + with self._lock: + keep_running = self._keep_running + return keep_running + + def _stop_notifications(self): + """Shuts down the AutoNotification service if running.""" + with self._lock: + if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected: + self._web_socket.close() + _PolicyReceiver._logger.info("Stopped receiving notifications from PDP") + + def _on_pdp_message(self, _, message): + """received the notification from PDP""" + _PolicyReceiver._logger.info("Received notification message: %s", message) + if not message: + return + message = json.loads(message) + + if not message: + return + + policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) + for policy in message.get(LOADED_POLICIES, []) + if self._policy_scopes.match(policy.get(POLICY_NAME))] + policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) + for policy in message.get(REMOVED_POLICIES, []) + if self._policy_scopes.match(policy.get(POLICY_NAME))] + + if not policies_updated and not policies_removed: + _PolicyReceiver._logger.info( + "no policy updated or removed for scopes %s", self._policy_scopes.pattern + ) + return + + audit = Audit(req_message="policy-notification - updated[{0}], removed[{1}]" \ + .format(len(policies_updated), len(policies_removed))) + audit.retry_get_config = True + self._policy_updater.enqueue(audit, policies_updated, policies_removed) + + def _on_ws_error(self, _, error): + """report an error""" + _PolicyReceiver._logger.error("policy-notification error: %s", error) + + def _on_ws_close(self, _): + """restart web-socket on close""" + _PolicyReceiver._logger.info("lost connection to PDP - restarting...") + + def shutdown(self, audit): + """Shutdown the policy-receiver""" + _PolicyReceiver._logger.info("shutdown policy-receiver") + with self._lock: + self._keep_running = False + + self._stop_notifications() + + if self.is_alive(): + self.join() + + self._policy_updater.shutdown(audit) + + def catch_up(self, audit): + """need to bring the latest policies to DCAE-Controller""" + self._policy_updater.catch_up(audit) + +class PolicyReceiver(object): + """policy-receiver - static singleton wrapper""" + _policy_receiver = None + + @staticmethod + def shutdown(audit): + """Shutdown the notification-handler""" + PolicyReceiver._policy_receiver.shutdown(audit) + + @staticmethod + def catch_up(audit): + """bring the latest policies from policy-engine""" + PolicyReceiver._policy_receiver.catch_up(audit) + + @staticmethod + def run(audit): + """Using policy-engine client to talk to policy engine""" + sub_aud = Audit(aud_parent=audit) + sub_aud.metrics_start("start policy receiver") + + PolicyReceiver._policy_receiver = _PolicyReceiver() + PolicyReceiver._policy_receiver.start() + + sub_aud.metrics("started policy receiver") + + PolicyReceiver.catch_up(audit) diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py index bf8a31d..1e50693 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/policy_rest.py @@ -22,123 +22,30 @@ import logging import json import copy -import re import time from multiprocessing.dummy import Pool as ThreadPool import requests from .config import Config -from .policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, POLICY_GET_CONFIG, \ - POLICY_BODY, POLICY_CONFIG +from .policy_consts import POLICY_ID, POLICY_NAME, POLICY_BODY, POLICY_CONFIG from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode, AuditResponseCode - -class PolicyUtils(object): - """policy-client utils""" - _logger = logging.getLogger("policy_handler.policy_utils") - _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$') - - @staticmethod - def safe_json_parse(json_str): - """try parsing json without exception - returns the json_str back if fails""" - if not json_str: - return json_str - try: - return json.loads(json_str) - except ValueError as err: - PolicyUtils._logger.warn("unexpected json %s: %s", str(json_str), str(err)) - return json_str - - @staticmethod - def extract_policy_id(policy_name): - """ policy_name = policy_id + "." + <version> + "." + <extension> - For instance, - policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml - policy_id = DCAE_alex.Config_alex_policy_number_1 - policy_scope = DCAE_alex - policy_class = Config - policy_version = 3 - type = extension = xml - delimiter = "." - policy_class_delimiter = "_" - policy_name in PAP = DCAE_alex.alex_policy_number_1 - """ - if not policy_name: - return - return PolicyUtils._policy_name_ext.sub('', policy_name) - - @staticmethod - def parse_policy_config(policy): - """try parsing the config in policy.""" - if policy and POLICY_BODY in policy and POLICY_CONFIG in policy[POLICY_BODY]: - policy[POLICY_BODY][POLICY_CONFIG] = PolicyUtils.safe_json_parse( - policy[POLICY_BODY][POLICY_CONFIG]) - return policy - - @staticmethod - def convert_to_policy(policy_config): - """wrap policy_config received from policy-engine with policy_id.""" - if not policy_config or POLICY_NAME not in policy_config \ - or POLICY_VERSION not in policy_config or not policy_config[POLICY_VERSION]: - return - policy_id = PolicyUtils.extract_policy_id(policy_config[POLICY_NAME]) - if not policy_id: - return - return {POLICY_ID:policy_id, POLICY_BODY:policy_config} - - @staticmethod - def select_latest_policy(policy_configs): - """For some reason, the policy-engine returns all version of the policy_configs. - DCAE-Controller is only interested in the latest version - """ - if not policy_configs: - return - latest_policy_config = {} - for policy_config in policy_configs: - if POLICY_VERSION not in policy_config or not policy_config[POLICY_VERSION] \ - or not policy_config[POLICY_VERSION].isdigit(): - continue - if not latest_policy_config \ - or int(policy_config[POLICY_VERSION]) \ - > int(latest_policy_config[POLICY_VERSION]): - latest_policy_config = policy_config - - return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_config)) - - @staticmethod - def select_latest_policies(policy_configs): - """For some reason, the policy-engine returns all version of the policy_configs. - DCAE-Controller is only interested in the latest versions - """ - if not policy_configs: - return {} - policies = {} - for policy_config in policy_configs: - policy = PolicyUtils.convert_to_policy(policy_config) - if not policy or POLICY_ID not in policy or POLICY_BODY not in policy: - continue - if POLICY_VERSION not in policy[POLICY_BODY] \ - or not policy[POLICY_BODY][POLICY_VERSION] \ - or not policy[POLICY_BODY][POLICY_VERSION].isdigit(): - continue - if policy[POLICY_ID] not in policies: - policies[policy[POLICY_ID]] = policy - continue - if int(policy[POLICY_BODY][POLICY_VERSION]) \ - > int(policies[policy[POLICY_ID]][POLICY_BODY][POLICY_VERSION]): - policies[policy[POLICY_ID]] = policy - - for policy_id in policies: - policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id]) - - return policies +from .policy_utils import PolicyUtils class PolicyRest(object): """ policy-engine """ _logger = logging.getLogger("policy_handler.policy_rest") _lazy_inited = False + POLICY_GET_CONFIG = 'getConfig' + POLICY_CONFIG_STATUS = "policyConfigStatus" + CONFIG_RETRIEVED = "CONFIG_RETRIEVED" + POLICY_CONFIG_MESSAGE = "policyConfigMessage" + NO_RESPONSE_RECEIVED = "No Response Received" + + MIN_VERSION_EXPECTED = "min_version_expected" + IGNORE_POLICY_NAMES = "ignore_policy_names" _requests_session = None - _url = None + _url_get_config = None _headers = None _target_entity = None _thread_pool_size = 4 @@ -167,7 +74,8 @@ class PolicyRest(object): requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) ) - PolicyRest._url = config["url"] + config["path_api"] + PolicyRest._url_get_config = config["url"] \ + + config["path_api"] + PolicyRest.POLICY_GET_CONFIG PolicyRest._headers = config["headers"] PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) PolicyRest._thread_pool_size = Config.config.get("thread_pool_size", 4) @@ -181,31 +89,32 @@ class PolicyRest(object): PolicyRest._policy_retry_sleep = Config.config.get("policy_retry_sleep", 0) PolicyRest._logger.info("PolicyClient url(%s) headers(%s) scope-prefixes(%s)", \ - PolicyRest._url, Audit.log_json_dumps(PolicyRest._headers), \ + PolicyRest._url_get_config, Audit.log_json_dumps(PolicyRest._headers), \ json.dumps(PolicyRest._scope_prefixes)) @staticmethod - def _post(audit, path, json_body): + def _pdp_get_config(audit, json_body): """Communication with the policy-engine""" - full_path = PolicyRest._url + path sub_aud = Audit(aud_parent=audit, targetEntity=PolicyRest._target_entity, \ - targetServiceName=full_path) + targetServiceName=PolicyRest._url_get_config) msg = json.dumps(json_body) headers = copy.copy(PolicyRest._headers) headers[REQUEST_X_ECOMP_REQUESTID] = sub_aud.request_id headers_str = Audit.log_json_dumps(headers) - log_line = "post to PDP {0} msg={1} headers={2}".format(full_path, msg, headers_str) + log_line = "post to PDP {0} msg={1} headers={2}".format( + PolicyRest._url_get_config, msg, headers_str) sub_aud.metrics_start(log_line) PolicyRest._logger.info(log_line) res = None try: - res = PolicyRest._requests_session.post(full_path, json=json_body, headers=headers) + res = PolicyRest._requests_session.post( + PolicyRest._url_get_config, json=json_body, headers=headers) except requests.exceptions.RequestException as ex: error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value error_msg = "failed to post to PDP {0} {1} msg={2} headers={3}" \ - .format(full_path, str(ex), msg, headers_str) + .format(PolicyRest._url_get_config, str(ex), msg, headers_str) PolicyRest._logger.exception(error_msg) sub_aud.set_http_status_code(error_code) @@ -213,145 +122,301 @@ class PolicyRest(object): sub_aud.metrics(error_msg) return (error_code, None) - log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format( \ - full_path, res.status_code, msg, res.text, \ + log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format( + PolicyRest._url_get_config, res.status_code, msg, res.text, Audit.log_json_dumps(dict(res.request.headers.items()))) + + res_data = None + if res.status_code == requests.codes.ok: + res_data = res.json() + + if res_data and isinstance(res_data, list) and len(res_data) == 1: + result = res_data[0] + if result and not result.get(POLICY_NAME): + res_data = None + if result.get(PolicyRest.POLICY_CONFIG_MESSAGE) == PolicyRest.NO_RESPONSE_RECEIVED: + error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + error_msg = "unexpected {0}".format(log_line) + + PolicyRest._logger.error(error_msg) + sub_aud.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + sub_aud.metrics(error_msg) + return (error_code, None) + sub_aud.set_http_status_code(res.status_code) sub_aud.metrics(log_line) PolicyRest._logger.info(log_line) + return res.status_code, res_data - if res.status_code == requests.codes.ok: - return res.status_code, res.json() + @staticmethod + def validate_policy(policy): + """Validates the config on policy""" + if not policy: + return - return res.status_code, None + policy_body = policy.get(POLICY_BODY) + + return bool( + policy_body + and policy_body.get(PolicyRest.POLICY_CONFIG_STATUS) == PolicyRest.CONFIG_RETRIEVED + and policy_body.get(POLICY_CONFIG) + ) @staticmethod - def get_latest_policy(aud_policy_name): - """Get the latest policy for the policy_name from the policy-engine""" + def validate_policies(policies): + """Validate the config on policies. Returns (valid, errored) tuple""" + if not policies: + return None, policies + + valid_policies = {} + errored_policies = {} + for (policy_id, policy) in policies.iteritems(): + if PolicyRest.validate_policy(policy): + valid_policies[policy_id] = policy + else: + errored_policies[policy_id] = policy + + return valid_policies, errored_policies + + @staticmethod + def get_latest_policy(aud_policy_id): + """Get the latest policy for the policy_id from the policy-engine""" PolicyRest._lazy_init() - audit, policy_name = aud_policy_name + audit, policy_id, min_version_expected, ignore_policy_names = aud_policy_id status_code = 0 + policy_configs = None latest_policy = None + expect_policy_removed = (ignore_policy_names and not min_version_expected) + for retry in xrange(1, PolicyRest._policy_retry_count + 1): - PolicyRest._logger.debug("%s", policy_name) - status_code, policy_configs = PolicyRest._post(audit, POLICY_GET_CONFIG, \ - {POLICY_NAME:policy_name}) - PolicyRest._logger.debug("%s %s policy_configs: %s", status_code, policy_name, \ - json.dumps(policy_configs or [])) - latest_policy = PolicyUtils.select_latest_policy(policy_configs) - if not latest_policy: - audit.error("received unexpected policy data from PDP for policy_name={0}: {1}" \ - .format(policy_name, json.dumps(policy_configs or [])), \ - errorCode=AuditResponseCode.DATA_ERROR.value, \ - errorDescription=AuditResponseCode.get_human_text( \ - AuditResponseCode.DATA_ERROR)) + PolicyRest._logger.debug("%s", policy_id) + + status_code, policy_configs = PolicyRest._pdp_get_config( + audit, {POLICY_NAME:policy_id} + ) + + PolicyRest._logger.debug("%s %s policy_configs: %s", + status_code, policy_id, json.dumps(policy_configs or [])) + + latest_policy = PolicyUtils.select_latest_policy( + policy_configs, min_version_expected, ignore_policy_names + ) + + if not latest_policy and not expect_policy_removed: + audit.error("received unexpected policy data from PDP for policy_id={0}: {1}" + .format(policy_id, json.dumps(policy_configs or [])), + errorCode=AuditResponseCode.DATA_ERROR.value, + errorDescription=AuditResponseCode.get_human_text( + AuditResponseCode.DATA_ERROR)) if latest_policy or not audit.retry_get_config \ + or (expect_policy_removed and not policy_configs) \ or not PolicyRest._policy_retry_sleep \ - or AuditResponseCode.PERMISSION_ERROR.value \ - == AuditResponseCode.get_response_code(status_code).value: + or audit.is_serious_error(status_code): break if retry == PolicyRest._policy_retry_count: - audit.warn("gave up retrying {0} from PDP after #{1} for policy_name={2}" \ - .format(POLICY_GET_CONFIG, retry, policy_name), \ - errorCode=AuditResponseCode.DATA_ERROR.value, \ - errorDescription=AuditResponseCode.get_human_text( \ - AuditResponseCode.DATA_ERROR)) + audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}" + .format(PolicyRest._url_get_config, retry, policy_id), + errorCode=AuditResponseCode.DATA_ERROR.value, + errorDescription=AuditResponseCode.get_human_text( + AuditResponseCode.DATA_ERROR)) break - audit.warn("retry #{0} {1} from PDP in {2} secs for policy_name={3}" \ - .format(retry, POLICY_GET_CONFIG, PolicyRest._policy_retry_sleep, policy_name), \ - errorCode=AuditResponseCode.DATA_ERROR.value, \ - errorDescription=AuditResponseCode.get_human_text( \ - AuditResponseCode.DATA_ERROR)) + audit.warn( + "retry #{0} {1} from PDP in {2} secs for policy_id={3}".format( + retry, PolicyRest._url_get_config, PolicyRest._policy_retry_sleep, policy_id), + errorCode=AuditResponseCode.DATA_ERROR.value, + errorDescription=AuditResponseCode.get_human_text( + AuditResponseCode.DATA_ERROR)) time.sleep(PolicyRest._policy_retry_sleep) + if expect_policy_removed and not latest_policy \ + and AuditHttpCode.RESPONSE_ERROR.value == status_code: + audit.set_http_status_code(AuditHttpCode.HTTP_OK.value) + return None + audit.set_http_status_code(status_code) - if not latest_policy: + if not PolicyRest.validate_policy(latest_policy): audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.error( + "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)), + errorCode=AuditResponseCode.DATA_ERROR.value, + errorDescription=AuditResponseCode.get_human_text(AuditResponseCode.DATA_ERROR) + ) + return latest_policy @staticmethod - def get_latest_policies_by_names(aud_policy_names): + def get_latest_updated_policies(aud_policy_updates): """Get the latest policies of the list of policy_names from the policy-engine""" PolicyRest._lazy_init() - audit, policy_names = aud_policy_names - if not policy_names: + audit, policies_updated, policies_removed = aud_policy_updates + if not policies_updated and not policies_removed: return - audit.metrics_start("get_latest_policies_by_names {0} {1}".format( \ - len(policy_names), json.dumps(policy_names))) - PolicyRest._logger.debug("%d %s", len(policy_names), json.dumps(policy_names)) + str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format( + len(policies_updated), json.dumps(policies_updated), + len(policies_removed), json.dumps(policies_removed)) + audit.metrics_start("get_latest_updated_policies {0}".format(str_metrics)) + PolicyRest._logger.debug(str_metrics) + + policies_to_find = {} + for (policy_name, policy_version) in policies_updated: + policy_id = PolicyUtils.extract_policy_id(policy_name) + if not policy_id or not policy_version.isdigit(): + continue + policy = policies_to_find.get(policy_id) + if not policy: + policies_to_find[policy_id] = { + POLICY_ID: policy_id, + PolicyRest.MIN_VERSION_EXPECTED: int(policy_version), + PolicyRest.IGNORE_POLICY_NAMES: {} + } + continue + if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version): + policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version) + + for (policy_name, _) in policies_removed: + policy_id = PolicyUtils.extract_policy_id(policy_name) + if not policy_id: + continue + policy = policies_to_find.get(policy_id) + if not policy: + policies_to_find[policy_id] = { + POLICY_ID: policy_id, + PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True} + } + continue + policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True + + apns = [(audit, policy_id, + policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED), + policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)) + for (policy_id, policy_to_find) in policies_to_find.iteritems()] - thread_count = min(PolicyRest._thread_pool_size, len(policy_names)) - apns = [(audit, policy_name) for policy_name in policy_names] policies = None - if thread_count == 1: + apns_length = len(apns) + if apns_length == 1: policies = [PolicyRest.get_latest_policy(apns[0])] else: - pool = ThreadPool(thread_count) + pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length)) policies = pool.map(PolicyRest.get_latest_policy, apns) pool.close() pool.join() - audit.metrics("result get_latest_policies_by_names {0} {1}: {2} {3}".format( \ - len(policy_names), json.dumps(policy_names), len(policies), json.dumps(policies)), \ - targetEntity=PolicyRest._target_entity, targetServiceName=POLICY_GET_CONFIG) - policies = dict([(policy[POLICY_ID], policy) \ - for policy in policies if policy and POLICY_ID in policy]) - PolicyRest._logger.debug("policies %s", json.dumps(policies)) - if not policies: + audit.metrics("result get_latest_updated_policies {0}: {1} {2}" + .format(str_metrics, len(policies), json.dumps(policies)), + targetEntity=PolicyRest._target_entity, + targetServiceName=PolicyRest._url_get_config) + + updated_policies = dict((policy[POLICY_ID], policy) + for policy in policies + if policy and policy.get(POLICY_ID)) + + removed_policies = dict((policy_id, True) + for (policy_id, policy_to_find) in policies_to_find.iteritems() + if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED) + and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES) + and policy_id not in updated_policies) + + errored_policies = dict((policy_id, policy_to_find) + for (policy_id, policy_to_find) in policies_to_find.iteritems() + if policy_id not in updated_policies + and policy_id not in removed_policies) + + PolicyRest._logger.debug( + "result updated_policies %s, removed_policies %s, errored_policies %s", + json.dumps(updated_policies), json.dumps(removed_policies), + json.dumps(errored_policies)) + + if errored_policies: audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - return policies + audit.error( + "errored_policies in PDP: {0}".format(json.dumps(errored_policies)), + errorCode=AuditResponseCode.DATA_ERROR.value, + errorDescription=AuditResponseCode.get_human_text(AuditResponseCode.DATA_ERROR) + ) + + return updated_policies, removed_policies @staticmethod - def _get_latest_policies(aud_scope_prefix): - """Get the latest policies of the same scope from the policy-engine""" - audit, scope_prefix = aud_scope_prefix - PolicyRest._logger.debug("%s", scope_prefix) - status_code, policy_configs = PolicyRest._post(audit, POLICY_GET_CONFIG, \ - {POLICY_NAME:scope_prefix + ".*"}) - audit.set_http_status_code(status_code) - PolicyRest._logger.debug("%s policy_configs: %s %s", status_code, \ - scope_prefix, json.dumps(policy_configs or [])) - latest_policies = PolicyUtils.select_latest_policies(policy_configs) + def _get_latest_policies(aud_policy_filter): + """ + get the latest policies by policy_filter + or all the latest policies of the same scope from the policy-engine + """ + audit, policy_filter, error_if_not_found = aud_policy_filter + str_policy_filter = json.dumps(policy_filter) + PolicyRest._logger.debug("%s", str_policy_filter) + + status_code, policy_configs = PolicyRest._pdp_get_config(audit, policy_filter) + + PolicyRest._logger.debug("%s policy_configs: %s %s", status_code, + str_policy_filter, json.dumps(policy_configs or [])) + latest_policies = PolicyUtils.select_latest_policies(policy_configs) if not latest_policies: - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - audit.error("received unexpected policies data from PDP for scope {0}: {1}".format( \ - scope_prefix, json.dumps(policy_configs or [])), \ - errorCode=AuditResponseCode.DATA_ERROR.value, \ - errorDescription=AuditResponseCode.get_human_text( \ - AuditResponseCode.DATA_ERROR)) - return latest_policies + if error_if_not_found: + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.warn( + "received no policies from PDP for policy_filter {0}: {1}" + .format(str_policy_filter, json.dumps(policy_configs or [])), + errorCode=AuditResponseCode.DATA_ERROR.value, + errorDescription=AuditResponseCode.get_human_text( + AuditResponseCode.DATA_ERROR) + ) + return None, latest_policies + + audit.set_http_status_code(status_code) + return PolicyRest.validate_policies(latest_policies) @staticmethod - def get_latest_policies(audit): + def get_latest_policies(audit, policy_filter=None): """Get the latest policies of the same scope from the policy-engine""" PolicyRest._lazy_init() - PolicyRest._logger.debug("%s", json.dumps(PolicyRest._scope_prefixes)) - audit.metrics_start("get_latest_policies for scopes {0} {1}".format( \ - len(PolicyRest._scope_prefixes), json.dumps(PolicyRest._scope_prefixes))) - asps = [(audit, scope_prefix) for scope_prefix in PolicyRest._scope_prefixes] + aud_policy_filters = None + str_metrics = None + str_policy_filters = json.dumps(policy_filter or PolicyRest._scope_prefixes) + if policy_filter is not None: + aud_policy_filters = [(audit, policy_filter, True)] + str_metrics = "get_latest_policies for policy_filter {0}".format( + str_policy_filters) + else: + aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, False) + for scope_prefix in PolicyRest._scope_prefixes] + str_metrics = "get_latest_policies for scopes {0} {1}".format( \ + len(PolicyRest._scope_prefixes), str_policy_filters) + + PolicyRest._logger.debug("%s", str_policy_filters) + audit.metrics_start(str_metrics) + latest_policies = None - if PolicyRest._scope_thread_pool_size == 1: - latest_policies = [PolicyRest._get_latest_policies(asps[0])] + apfs_length = len(aud_policy_filters) + if apfs_length == 1: + latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])] else: - pool = ThreadPool(PolicyRest._scope_thread_pool_size) - latest_policies = pool.map(PolicyRest._get_latest_policies, asps) + pool = ThreadPool(min(PolicyRest._scope_thread_pool_size, apfs_length)) + latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters) pool.close() pool.join() - audit.metrics("total result get_latest_policies for scopes {0} {1}: {2} {3}".format( \ - len(PolicyRest._scope_prefixes), json.dumps(PolicyRest._scope_prefixes), \ - len(latest_policies), json.dumps(latest_policies)), \ - targetEntity=PolicyRest._target_entity, targetServiceName=POLICY_GET_CONFIG) + audit.metrics("total result {0}: {1} {2}".format( + str_metrics, len(latest_policies), json.dumps(latest_policies)), \ + targetEntity=PolicyRest._target_entity, targetServiceName=PolicyRest._url_get_config) + + # latest_policies == [(valid_policies, errored_policies), ...] + valid_policies = dict( + pair for (vps, _) in latest_policies if vps for pair in vps.iteritems()) + + errored_policies = dict( + pair for (_, eps) in latest_policies if eps for pair in eps.iteritems()) - latest_policies = dict(pair for lp in latest_policies if lp for pair in lp.items()) - PolicyRest._logger.debug("latest_policies: %s %s", \ - json.dumps(PolicyRest._scope_prefixes), json.dumps(latest_policies)) + PolicyRest._logger.debug( + "got policies for policy_filters: %s. valid_policies: %s errored_policies: %s", + str_policy_filters, json.dumps(valid_policies), json.dumps(errored_policies)) - return latest_policies + return valid_policies, errored_policies diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 1f1539f..9732f69 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -26,6 +26,7 @@ from threading import Thread, Lock from .policy_rest import PolicyRest from .deploy_handler import DeployHandler +from .onap.audit import Audit class PolicyUpdater(Thread): """queue and handle the policy-updates in a separate thread""" @@ -33,40 +34,45 @@ class PolicyUpdater(Thread): def __init__(self): """init static config of PolicyUpdater.""" - Thread.__init__(self) - self.name = "policy_updater" + Thread.__init__(self, name="policy_updater") self.daemon = True - self._req_shutdown = None - self._req_catch_up = None + self._aud_shutdown = None + self._aud_catch_up = None self._lock = Lock() self._queue = Queue() - def enqueue(self, audit=None, policy_names=None): - """enqueue the policy-names""" - policy_names = policy_names or [] - PolicyUpdater._logger.info("policy_names %s", json.dumps(policy_names)) - self._queue.put((audit, policy_names)) + def enqueue(self, audit=None, policies_updated=None, policies_removed=None): + """enqueue the policy-updates""" + policies_updated = policies_updated or [] + policies_removed = policies_removed or [] + + PolicyUpdater._logger.info( + "policies_updated %s policies_removed %s", + json.dumps(policies_updated), json.dumps(policies_removed)) + self._queue.put((audit, policies_updated, policies_removed)) def run(self): """wait and run the policy-update in thread""" while True: PolicyUpdater._logger.info("waiting for policy-updates...") - audit, policy_names = self._queue.get() - PolicyUpdater._logger.info("got policy-updates %s", json.dumps(policy_names)) + audit, policies_updated, policies_removed = self._queue.get() + PolicyUpdater._logger.info( + "got policies_updated %s policies_removed %s", + json.dumps(policies_updated), json.dumps(policies_removed)) + if not self._keep_running(): self._queue.task_done() break - if self._on_catch_up(): - continue - if not policy_names: - self._queue.task_done() + if self._on_catch_up(audit) or not audit: continue - updated_policies = PolicyRest.get_latest_policies_by_names((audit, policy_names)) - PolicyUpdater.policy_update(audit, updated_policies) + updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( + (audit, policies_updated, policies_removed)) + + DeployHandler.policy_update(audit, updated_policies, removed_policies=removed_policies) audit.audit_done() self._queue.task_done() @@ -74,51 +80,60 @@ class PolicyUpdater(Thread): def _keep_running(self): """thread-safe check whether to continue running""" - self._lock.acquire() - keep_running = not self._req_shutdown - self._lock.release() - if self._req_shutdown: - self._req_shutdown.audit_done() + with self._lock: + keep_running = not self._aud_shutdown + + if self._aud_shutdown: + self._aud_shutdown.audit_done() return keep_running def catch_up(self, audit): """need to bring the latest policies to DCAE-Controller""" - self._lock.acquire() - self._req_catch_up = audit - self._lock.release() + PolicyUpdater._logger.info("catch_up requested") + with self._lock: + self._aud_catch_up = audit + self.enqueue() - def _on_catch_up(self): - """Bring the latest policies to DCAE-Controller""" - self._lock.acquire() - req_catch_up = self._req_catch_up - if self._req_catch_up: - self._req_catch_up = None + def _reset_queue(self): + """clear up the queue""" + with self._lock: + self._aud_catch_up = None self._queue.task_done() self._queue = Queue() + + def _on_catch_up(self, audit): + """Bring the latest policies to DCAE-Controller""" + self._lock.acquire() + aud_catch_up = self._aud_catch_up + if self._aud_catch_up: + self._aud_catch_up = None self._lock.release() - if not req_catch_up: + + if not aud_catch_up: return False PolicyUpdater._logger.info("catch_up") - latest_policies = PolicyRest.get_latest_policies(req_catch_up) - PolicyUpdater.policy_update(req_catch_up, latest_policies) - req_catch_up.audit_done() - return True - - @staticmethod - def policy_update(audit, updated_policies): - """Invoke deploy-handler""" - if updated_policies: - PolicyUpdater._logger.info("updated_policies %s", json.dumps(updated_policies)) - DeployHandler.policy_update(audit, updated_policies) + latest_policies, errored_policies = PolicyRest.get_latest_policies(aud_catch_up) + + if not aud_catch_up.is_success(): + PolicyUpdater._logger.warn("not sending catch-up to deployment-handler due to errors") + if not audit: + self._queue.task_done() + else: + DeployHandler.policy_update( + aud_catch_up, latest_policies, errored_policies=errored_policies, catch_up=True) + self._reset_queue() + success, _, _ = aud_catch_up.audit_done() + PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health())) + + return success def shutdown(self, audit): """Shutdown the policy-updater""" PolicyUpdater._logger.info("shutdown policy-updater") - self._lock.acquire() - self._req_shutdown = audit - self._lock.release() + with self._lock: + self._aud_shutdown = audit self.enqueue() if self.is_alive(): self.join() diff --git a/policyhandler/policy_utils.py b/policyhandler/policy_utils.py new file mode 100644 index 0000000..d664b21 --- /dev/null +++ b/policyhandler/policy_utils.py @@ -0,0 +1,134 @@ +"""policy-client communicates with policy-engine thru REST API""" + +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +import logging +import json +import re + +from .policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, POLICY_BODY, POLICY_CONFIG + +class PolicyUtils(object): + """policy-client utils""" + _logger = logging.getLogger("policy_handler.policy_utils") + _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$') + + @staticmethod + def safe_json_parse(json_str): + """try parsing json without exception - returns the json_str back if fails""" + if not json_str: + return json_str + try: + return json.loads(json_str) + except (ValueError, TypeError) as err: + PolicyUtils._logger.warn("unexpected json %s: %s", str(json_str), str(err)) + return json_str + + @staticmethod + def extract_policy_id(policy_name): + """ policy_name = policy_id + "." + <version> + "." + <extension> + For instance, + policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml + policy_id = DCAE_alex.Config_alex_policy_number_1 + policy_scope = DCAE_alex + policy_class = Config + policy_version = 3 + type = extension = xml + delimiter = "." + policy_class_delimiter = "_" + policy_name in PAP = DCAE_alex.alex_policy_number_1 + """ + if not policy_name: + return + return PolicyUtils._policy_name_ext.sub('', policy_name) + + @staticmethod + def parse_policy_config(policy): + """try parsing the config in policy.""" + if not policy: + return policy + config = policy.get(POLICY_BODY, {}).get(POLICY_CONFIG) + if config: + policy[POLICY_BODY][POLICY_CONFIG] = PolicyUtils.safe_json_parse(config) + return policy + + @staticmethod + def convert_to_policy(policy_config): + """wrap policy_config received from policy-engine with policy_id.""" + if not policy_config: + return + policy_name = policy_config.get(POLICY_NAME) + policy_version = policy_config.get(POLICY_VERSION) + if not policy_name or not policy_version: + return + policy_id = PolicyUtils.extract_policy_id(policy_name) + if not policy_id: + return + return {POLICY_ID:policy_id, POLICY_BODY:policy_config} + + @staticmethod + def select_latest_policy(policy_configs, min_version_expected=None, ignore_policy_names=None): + """For some reason, the policy-engine returns all version of the policy_configs. + DCAE-Controller is only interested in the latest version + """ + if not policy_configs: + return + latest_policy_config = {} + for policy_config in policy_configs: + policy_name = policy_config.get(POLICY_NAME) + policy_version = policy_config.get(POLICY_VERSION) + if not policy_name or not policy_version or not policy_version.isdigit(): + continue + policy_version = int(policy_version) + if min_version_expected and policy_version < min_version_expected: + continue + if ignore_policy_names and policy_name in ignore_policy_names: + continue + + if not latest_policy_config \ + or int(latest_policy_config[POLICY_VERSION]) < policy_version: + latest_policy_config = policy_config + + return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_config)) + + @staticmethod + def select_latest_policies(policy_configs): + """For some reason, the policy-engine returns all version of the policy_configs. + DCAE-Controller is only interested in the latest versions + """ + if not policy_configs: + return {} + policies = {} + for policy_config in policy_configs: + policy = PolicyUtils.convert_to_policy(policy_config) + if not policy: + continue + policy_id = policy.get(POLICY_ID) + policy_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION) + if not policy_id or not policy_version or not policy_version.isdigit(): + continue + if policy_id not in policies \ + or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION]): + policies[policy_id] = policy + + for policy_id in policies: + policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id]) + + return policies diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py index 9a5ee19..3d2503a 100644 --- a/policyhandler/web_server.py +++ b/policyhandler/web_server.py @@ -27,78 +27,187 @@ import cherrypy from .config import Config from .onap.audit import Audit from .policy_rest import PolicyRest -from .policy_engine import PolicyEngineClient +from .policy_receiver import PolicyReceiver class PolicyWeb(object): - """Main static class for REST API of policy-handler""" - logger = logging.getLogger("policy_handler.web_cherrypy") + """run REST API of policy-handler""" + logger = logging.getLogger("policy_handler.policy_web") @staticmethod - def run(): - """run forever the web-server of the policy-handler""" - PolicyWeb.logger.info("policy_handler web-service at port(%d)...", \ - Config.wservice_port) - cherrypy.config.update({"server.socket_host": "0.0.0.0", \ - 'server.socket_port': Config.wservice_port}) - cherrypy.tree.mount(PolicyLatest(), '/policy_latest') - cherrypy.tree.mount(PoliciesLatest(), '/policies_latest') - cherrypy.tree.mount(PoliciesCatchUp(), '/catch_up') - cherrypy.quickstart(Shutdown(), '/shutdown') - -class Shutdown(object): - """Shutdown the policy-handler""" - @cherrypy.expose - def index(self): - """shutdown event""" - audit = Audit(req_message="get /shutdown", headers=cherrypy.request.headers) - PolicyWeb.logger.info("--------- stopping REST API of policy-handler -----------") - cherrypy.engine.exit() - PolicyEngineClient.shutdown(audit) - PolicyWeb.logger.info("--------- the end -----------") - res = str(datetime.now()) - audit.info_requested(res) - return "goodbye! shutdown requested {0}".format(res) + def run_forever(audit): + """run the web-server of the policy-handler forever""" + PolicyWeb.logger.info("policy_handler web-service at port(%d)...", Config.wservice_port) + cherrypy.config.update({"server.socket_host": "0.0.0.0", + 'server.socket_port': Config.wservice_port}) + cherrypy.tree.mount(_PolicyWeb(), '/') + audit.info("running policy_handler web-service at port({0})".format(Config.wservice_port)) + cherrypy.engine.start() + +class _PolicyWeb(object): + """REST API of policy-handler""" -class PoliciesLatest(object): - """REST API of the policy-hanlder""" + @staticmethod + def _get_request_info(request): + """returns info about the http request""" + return "{0} {1}{2}".format(request.method, request.script_name, request.path_info) @cherrypy.expose + @cherrypy.popargs('policy_id') @cherrypy.tools.json_out() - def index(self): - """find the latest policy by policy_id or all latest policies""" - audit = Audit(req_message="get /policies_latest", headers=cherrypy.request.headers) - res = PolicyRest.get_latest_policies(audit) or {} - PolicyWeb.logger.info("PoliciesLatest: %s", json.dumps(res)) - audit.audit_done(result=json.dumps(res)) + def policy_latest(self, policy_id): + """retireves the latest policy identified by policy_id""" + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + PolicyWeb.logger.info("%s policy_id=%s headers=%s", \ + req_info, policy_id, json.dumps(cherrypy.request.headers)) + + res = PolicyRest.get_latest_policy((audit, policy_id, None, None)) or {} + + PolicyWeb.logger.info("res %s policy_id=%s res=%s", req_info, policy_id, json.dumps(res)) + + success, http_status_code, response_description = audit.audit_done(result=json.dumps(res)) + if not success: + raise cherrypy.HTTPError(http_status_code, response_description) return res -@cherrypy.popargs('policy_id') -class PolicyLatest(object): - """REST API of the policy-hanlder""" + def _get_all_policies_latest(self): + """retireves all the latest policies on GET /policies_latest""" + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + + PolicyWeb.logger.info("%s", req_info) + + valid_policies, errored_policies = PolicyRest.get_latest_policies(audit) + + res = {"valid_policies": valid_policies, "errored_policies": errored_policies} + PolicyWeb.logger.info("result %s: %s", req_info, json.dumps(res)) + + success, http_status_code, response_description = audit.audit_done(result=json.dumps(res)) + if not success: + raise cherrypy.HTTPError(http_status_code, response_description) + return res @cherrypy.expose @cherrypy.tools.json_out() - def index(self, policy_id): - """find the latest policy by policy_id or all latest policies""" - audit = Audit(req_message="get /policy_latest/{0}".format(policy_id or ""), \ + @cherrypy.tools.json_in() + def policies_latest(self): + """ + on :GET: retrieves all the latest policies from policy-engine that are + in the scope of the policy-handler. + + on :POST: expects to receive the params that mimic the /getConfig of policy-engine + and retrieves the matching policies from policy-engine and picks the latest on each policy. + + sample request - policies filter + + { + "configAttributes": { "key1":"value1" }, + "configName": "alex_config_name", + "ecompName": "DCAE", + "policyName": "DCAE_alex.Config_alex_.*", + "unique": false + } + + sample response + + { + "DCAE_alex.Config_alex_priority": { + "policy_body": { + "policyName": "DCAE_alex.Config_alex_priority.3.xml", + "policyConfigMessage": "Config Retrieved! ", + "responseAttributes": {}, + "policyConfigStatus": "CONFIG_RETRIEVED", + "type": "JSON", + "matchingConditions": { + "priority": "10", + "key1": "value1", + "ECOMPName": "DCAE", + "ConfigName": "alex_config_name" + }, + "property": null, + "config": { + "foo": "bar", + "foo_updated": "2017-10-06T16:54:31.696Z" + }, + "policyVersion": "3" + }, + "policy_id": "DCAE_alex.Config_alex_priority" + } + } + """ + if cherrypy.request.method == "GET": + return self._get_all_policies_latest() + + if cherrypy.request.method != "POST": + raise cherrypy.HTTPError(404, "unexpected method {0}".format(cherrypy.request.method)) + + policy_filter = cherrypy.request.json or {} + str_policy_filter = json.dumps(policy_filter) + + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(req_message="{0}: {1}".format(req_info, str_policy_filter), \ headers=cherrypy.request.headers) - PolicyWeb.logger.info("PolicyLatest policy_id=%s headers=%s", \ - policy_id, json.dumps(cherrypy.request.headers)) - res = PolicyRest.get_latest_policy((audit, policy_id)) or {} - PolicyWeb.logger.info("PolicyLatest policy_id=%s res=%s", policy_id, json.dumps(res)) - audit.audit_done(result=json.dumps(res)) + PolicyWeb.logger.info("%s: policy_filter=%s headers=%s", \ + req_info, str_policy_filter, json.dumps(cherrypy.request.headers)) + + res, _ = PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {} + + PolicyWeb.logger.info("result %s: policy_filter=%s res=%s", \ + req_info, str_policy_filter, json.dumps(res)) + + success, http_status_code, response_description = audit.audit_done(result=json.dumps(res)) + if not success: + raise cherrypy.HTTPError(http_status_code, response_description) return res -class PoliciesCatchUp(object): - """catch up with all DCAE policies""" @cherrypy.expose @cherrypy.tools.json_out() - def index(self): - """catch up with all policies""" + def catch_up(self): + """catch up with all DCAE policies""" started = str(datetime.now()) - audit = Audit(req_message="get /catch_up", headers=cherrypy.request.headers) - PolicyEngineClient.catch_up(audit) + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + + PolicyWeb.logger.info("%s", req_info) + PolicyReceiver.catch_up(audit) + res = {"catch-up requested": started} - PolicyWeb.logger.info("PoliciesCatchUp: %s", json.dumps(res)) + PolicyWeb.logger.info("requested %s: %s", req_info, json.dumps(res)) audit.info_requested(started) return res + + @cherrypy.expose + def shutdown(self): + """Shutdown the policy-handler""" + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + + PolicyWeb.logger.info("%s: --- stopping REST API of policy-handler ---", req_info) + + cherrypy.engine.exit() + + PolicyReceiver.shutdown(audit) + + health = json.dumps(Audit.health()) + audit.info("policy_handler health: {0}".format(health)) + PolicyWeb.logger.info("policy_handler health: %s", health) + PolicyWeb.logger.info("%s: --------- the end -----------", req_info) + res = str(datetime.now()) + audit.info_requested(res) + return "goodbye! shutdown requested {0}".format(res) + + @cherrypy.expose + @cherrypy.tools.json_out() + def healthcheck(self): + """returns the healthcheck results""" + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + + PolicyWeb.logger.info("%s", req_info) + + res = Audit.health() + + PolicyWeb.logger.info("healthcheck %s: res=%s", req_info, json.dumps(res)) + + audit.audit_done(result=json.dumps(res)) + return res |