summaryrefslogtreecommitdiffstats
path: root/policyhandler
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler')
-rw-r--r--policyhandler/PolicyEngine.py1219
-rw-r--r--policyhandler/__init__.py20
-rw-r--r--policyhandler/__main__.py (renamed from policyhandler/policy_handler.py)41
-rw-r--r--policyhandler/config.py89
-rw-r--r--policyhandler/customize/__init__.py32
-rw-r--r--policyhandler/customize/customizer.py34
-rw-r--r--policyhandler/customize/customizer_base.py63
-rw-r--r--policyhandler/deploy_handler.py170
-rw-r--r--policyhandler/discovery.py68
-rw-r--r--policyhandler/onap/CommonLogger.py30
-rw-r--r--policyhandler/onap/__init__.py3
-rw-r--r--policyhandler/onap/audit.py469
-rw-r--r--policyhandler/onap/health.py160
-rw-r--r--policyhandler/onap/process_info.py152
-rw-r--r--policyhandler/policy_consts.py16
-rw-r--r--policyhandler/policy_engine.py103
-rw-r--r--policyhandler/policy_receiver.py200
-rw-r--r--policyhandler/policy_rest.py655
-rw-r--r--policyhandler/policy_updater.py296
-rw-r--r--policyhandler/policy_utils.py175
-rw-r--r--policyhandler/step_timer.py175
-rw-r--r--policyhandler/web_server.py236
22 files changed, 2373 insertions, 2033 deletions
diff --git a/policyhandler/PolicyEngine.py b/policyhandler/PolicyEngine.py
deleted file mode 100644
index 7a69894..0000000
--- a/policyhandler/PolicyEngine.py
+++ /dev/null
@@ -1,1219 +0,0 @@
-"""
-PolicyEngine API for Python
-
-@author: Tarun, Mike
-@version: 0.9
-@change:
- added Enum 'Other' Type and supported it in PolicyConfig class - 1/13
- supporting Remote URL capability to the PolicyEngine as the parameter - 1/13
- Request format has been updated accordingly. No need to send the PDP URLs anymore - 1/26
- Feature where the PolicyEngine chooses available PyPDP among different URLs. 1/26
- Major feature addition required for Notifications 2/17 , Fixed Session issues. 2/25
- Major change in API structure for combining results 3/4.
- Added Security support for Notifications and clearNotification Method 3/18
- newMethod for retrieving configuration using policyFileName 3/20
- logging 3/24
- Notification Bug Fixes 7/21
- basic Auth 7/22
- Notification Changes 9/3
- ECOMP Error codes included 10/1
- -2016
- Major Changes to the Policy Engine API 3/29
- DeletePolicy API and Changes to pushPolicy and getConfig 5/27
- ConfigRequestParmeters and Error code Change 7/11
- New Environment Variable and Client Authorizations Change 7/21
- Changes to the Policy Parameters 8/21
- Allow Clients to use their own Password Protection.
- Dictionary Types and its Fixes.
-"""
-
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-
-import json,sys, os, collections, websocket , logging, time, base64, uuid
-from websocket import create_connection
-from enum import Enum
-from xml.etree.ElementTree import XML
-try:
- # For Python 3.0 and Later
- from pip._vendor import requests
-except ImportError:
- # For backend Support to Python 2's urllib2
- import requests
-try:
- # For Python 3.0 and Later
- from urllib.request import urlopen
-except ImportError:
- # Fall back for Python 2.*
- from urllib2 import urlopen
-try:
- import thread
-except ImportError:
- #Fall Back for Python 2.x
- import _thread as thread
-
-PolicyConfigStatus = Enum('PolicyConfigStatus', 'CONFIG_NOT_FOUND CONFIG_RETRIEVED')
-PolicyType = Enum('PolicyType', 'JSON XML PROPERTIES OTHER')
-PolicyResponseStatus = Enum('PolicyResponseStatus', 'ACTION_ADVISED ACTION_TAKEN NO_ACTION_REQUIRED')
-NotificationType = Enum('NotificationType', 'BOTH UPDATE REMOVE')
-UpdateType = Enum('UpdateType', 'UPDATE NEW')
-NotificationScheme = Enum('NotificationScheme', 'AUTO_ALL_NOTIFICATIONS AUTO_NOTIFICATIONS MANUAL_ALL_NOTIFICATIONS MANUAL_NOTIFICATIONS')
-AttributeType = Enum('AttributeType', 'MATCHING MICROSERVICE RULE SETTINGS')
-PolicyClass = Enum('PolicyClass', 'Action Config Decision')
-PolicyConfigType = Enum('PolicyConfigType', 'Base BRMS_PARAM BRMS_RAW ClosedLoop_Fault ClosedLoop_PM Firewall MicroService Extended')
-DeletePolicyCondition = Enum('DeletePolicyCondition', 'ONE ALL')
-RuleProvider = Enum('RuleProvider', 'Custom AAF GUARD_YAML')
-DictionaryType = Enum('DictionaryType', 'Common Action ClosedLoop Firewall Decision BRMS GOC MicroService DescriptiveScope PolicyScope Enforcer SafePolicy' )
-ImportType = Enum('ImportType', 'MICROSERVICE')
-
-class PolicyEngine:
- """
- PolicyEngine is the Class which needs to be instantiated to call the PDP server.
- It needs the *.properties* file path as the parameter to the constructor.
- """
- def __init__(self, filename, scheme=None, handler=None, clientKey=None, basic_client_auth=True):
- """
- @param filename: String format of the path location of .properties file Could also be A remote URL eg: http://localhost:8080/config.properties
- @param scheme: NotificationScheme to select the scheme required for notification updates.
- @param handler: NotificationHandler object which will be called when an event is occurred.
- @param clientKey: Decoded Client Key to be used by PolicyEngine.
- @attention:The .properties file must contain the PYPDP_URL parameter in it. The parameter can have multiple URLs the PolicyEngine chooses the available PyPDP among them.
- """
- self.filename = filename
- self.urldict = {}
- self.matchStore = []
- self.autoURL = None
- self.scheme = None
- self.handler = None
- self.thread = thread
- self.autows = None
- self.mclose = False
- self.restart = False
- self.logger = logging.getLogger()
- self.resturl= []
- self.encoded= []
- self.clientInfo = None
- self.environment = None
- self.policyheader = {}
- if(filename.startswith("http")):
- try:
- policy_data = urlopen(filename)
- for line in policy_data :
- line = line.decode('utf-8')
- line = line.rstrip() # removes trailing whitespace and '\n' chars
- line = line.replace(" ","") # removing spaces
- if "=" not in line: continue #skips blanks and comments w/o =
- if line.startswith("#"): continue #skips comments which contain =
- key, value = line.split("=",1)
- key = key.rstrip().lstrip()
- value = value.lstrip()
- #print("key= "+key+" Value =" +value )
- self.urldict[key] = value
- except:
- self.logger.error("PE300 - Data Issue: Error with the Config URL: %s" , filename )
- print("PE300 - Data Issue: Config Properties URL Error")
- sys.exit(0)
- else :
- fileExtension = os.path.splitext(filename)
- if(fileExtension[1]!=".properties"):
- self.logger.error("PE300 - Data Issue: File is not in properties format: %s", filename)
- print("PE300 - Data Issue: Not a .properties file!")
- sys.exit(0)
- try :
- with open(self.filename, 'r') as f:
- for line in f:
- line = line.rstrip() # removes trailing whitespace and '\n' chars
- line = line.replace(" ","") # removing spaces
- if "=" not in line: continue #skips blanks and comments w/o =
- if line.startswith("#"): continue #skips comments which contain =
- key, value = line.split("=",1)
- key = key.rstrip().lstrip()
- value = value.lstrip()
- # self.logger.info("key=%s Value=%s", key, value)
- self.urldict[key] = value
- except FileNotFoundError:
- self.logger.error("PE300 - Data Issue: File Not found: %s", filename)
- print("PE300 - Data Issue: File Doesn't exist in the given Location")
- sys.exit(0)
- #TODO logic for best available PyPDP servers
- try:
- self.urldict = collections.OrderedDict(sorted(self.urldict.items()))
- clientID = self.urldict.get("CLIENT_ID")
- # self.logger.info("clientID decoded %s", base64.b64decode(clientID).decode("utf-8"))
- # client_parts = base64.b64decode(clientID).split(":")
- # client_parts = clientID.split(":")
- # self.logger.info("ClientAuth:Basic %s", base64.b64encode(clientID))
- # self.logger.info("CLIENT_ID[0] = %s", client_parts[0])
- # self.logger.info("CLIENT_ID[0] base64 = %s", base64.b64encode(client_parts[0]))
- # self.logger.info("CLIENT_KEY base64 = %s", base64.b64encode(client_parts[1]))
- if(clientKey is None):
- try:
- client = base64.b64decode(self.urldict.get("CLIENT_KEY")).decode("utf-8")
- except Exception:
- self.logger.warn("PE300 - Data Issue: CLIENT_KEY parameter is not in the required encoded Format taking Value as clear Text")
- client = self.urldict.get("CLIENT_KEY")
- else:
- client = clientKey
- if(clientID is None or client is None):
- self.logger.error("PE300 - Data Issue: No CLIENT_ID and/or CLIENT_KEY parameter found in the properties file: %s ", filename)
- print("PE300 - Data Issue: No CLIENT_ID and/or CLIENT_KEY parameter found in the properties file")
- sys.exit(0)
- else:
- uid = clientID.encode('ascii')
- password = client.encode('ascii')
- self.clientInfo = base64.b64encode(uid+ b':'+password).decode('utf-8')
- self.environment = self.urldict.get("ENVIRONMENT")
- if(self.environment is None):
- self.logger.info("Missing Environment Variable setting to Default Value.")
- self.environment = "DEVL"
- self.policyheader = {
- "Content-type" : "application/json",
- "Accept" : "application/json",
- "ClientAuth" : ("Basic " if basic_client_auth else "") + self.clientInfo,
- "Environment" : self.environment
- }
- for key in self.urldict.keys():
- if(key.startswith("PYPDP_URL")):
- pypdpVal = self.urldict.get(key)
- if pypdpVal is None:
- self.logger.error("PE300 - Data Issue: No PYPDP_URL Parameter found in the properties file: %s ", filename)
- print("PE300 - Data Issue: No PYPDP_URL parameter found in the properties file")
- sys.exit(0)
- if ";" in pypdpVal:
- pdpDefault = pypdpVal.split(";")
- if pdpDefault is None:
- self.logger.error("PE300 - Data Issue: No PYPDP_URL Parameter found in the properties file: %s ", filename)
- print("PE300 - Data Issue: No PYPDP_URL parameter found in the properties file")
- sys.exit(0)
- else:
- for count in range(0, len(pdpDefault)):
- self.__pdpParam(pdpDefault[count])
- else:
- self.__pdpParam(pypdpVal)
-
- self.logger.info("PolicyEngine url: %s policyheader: %s urldict: %s", \
- self.resturl, json.dumps(self.policyheader), json.dumps(self.urldict))
- if len(self.resturl)==0:
- self.logger.error("PE300 - Data Issue: No PYPDP_URL Parameter found in the properties file: %s ", filename)
- print("PE300 - Data Issue: No PYPDP_URL parameter found in the properties file")
- sys.exit(0)
- except:
- self.logger.error("PE300 - Data Issue: missing parameter(s) in the properties file: %s ", filename)
- print("PE300 - Data Issue: missing parameter(s) in the properties file")
- sys.exit(0)
- # Scheme and Handler code to be handled from here.
- if handler is not None:
- #if type(handler) is NotificationHandler:
- self.handler = handler
- #else:
- # print("handler should be a object of NotificationHandler class")
- # sys.exit(0)
- if scheme is not None:
- if ((scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
- # setup the Auto settings.
- self.scheme = scheme
- elif ((scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name)):
- # setup the Manual Settings
- self.scheme = scheme
- else:
- self.logger.error("PE300 - Data Issue: Scheme not a type of NotificationScheme: %s", scheme.name)
- print("PE300 - Data Issue: scheme must be a Type of NotificationScheme Enumeration ")
- sys.exit(0)
-
- def __pdpParam(self,pdpValue):
- """
- Internal Usage for reading PyPDP Parameters
- """
- if pdpValue is None:
- self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
- print("PE100 - Permissions Error: No Enough Credentials to send Request")
- sys.exit(0)
- elif "," in pdpValue:
- pdpValues = pdpValue.split(",")
- if (len(pdpValues)==3):
- # 0 is pypdp URL
- self.resturl.append(pdpValues[0])
- # 1 and 2 are user name password
- if pdpValues[1] and pdpValues[2]:
- uid = pdpValues[1].encode('ascii')
- password = pdpValues[2].encode('ascii')
- encoded = base64.b64encode(uid+ b':'+password).decode('utf-8')
- self.encoded.append(encoded)
- else:
- self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
- print("PE100 - Permissions Error: No Enough Credentials to send Request")
- sys.exit(0)
- else:
- self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
- print("PE100 - Permissions Error: No Enough Credentials to send Request")
- sys.exit(0)
- else:
- self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
- print("PE100 - Permissions Error: No Enough Credentials to send Request")
- sys.exit(0)
-
- def getConfigByPolicyName(self, policyName, requestID=None):
- """
- @param policyName: String format of the PolicyFile Name whose configuration is required.
- @return: Returns a List of PolicyConfig Object(s).
- @deprecated: use getConfig instead.
- """
- __policyNameURL = "/getConfigByPolicyName"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- self.__policyNamejson = {}
- self.__policyNamejson['policyName'] = policyName
- self.__cpnResponse = self.__callPDP(__policyNameURL, json.dumps(self.__policyNamejson), __headers, "POST")
- self.__cpnJSON = self.__cpnResponse.json()
- policyConfigs= self.__configResponse(self.__cpnJSON)
- return policyConfigs
-
- def listConfig(self, eCOMPComponentName=None, configName=None, configAttributes=None, policyName=None, unique= False, requestID=None):
- """
- listConfig function calls the PDP for the configuration required using the parameters and returns the PDP response.
- @param eCOMPComponentName: String of the eCOMPComponentName whose configuration is required.
- @param configName: String of the configName whose configuration is required. Not Mandatory field.
- @param configAttributes: Dictionary of the config attributes in Key and Value String format. Not mandatory field.
- @param policyName: String of the policyName whose configuration is required.
- @param unique: Boolean value which can be set to True if Unique results are required.
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a List of PolicyNames.
- """
- __configURL = "/listConfig"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- __configjson = self.__configRequestParametersJSON(eCOMPComponentName, configName, configAttributes, policyName, unique)
- #self.__configjson['pdp_URL'] = self.pdp_url
- __cResponse = self.__callPDP(__configURL, json.dumps(__configjson), __headers, "POST")
- return __cResponse.json()
-
-
- def getConfig(self, eCOMPComponentName=None, configName=None, configAttributes=None, policyName=None, unique= False, requestID=None):
- """
- getConfig function calls the PDP for the configuration required using the parameters and returns the PDP response.
- @param eCOMPComponentName: String of the eCOMPComponentName whose configuration is required.
- @param configName: String of the configName whose configuration is required. Not Mandatory field.
- @param configAttributes: Dictionary of the config attributes in Key and Value String format. Not mandatory field.
- @param policyName: String of the policyName whose configuration is required.
- @param unique: Boolean value which can be set to True if Unique results are required.
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a List of PolicyConfig Object(s).
- """
- __configURL = "/getConfig"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- self.__configjson = self.__configRequestParametersJSON(eCOMPComponentName, configName, configAttributes, policyName, unique)
- #self.__configjson['pdp_URL'] = self.pdp_url
- self.__cResponse = self.__callPDP(__configURL, json.dumps(self.__configjson), __headers, "POST")
- #self.__configURL = self.resturl+__configURL
- #self.__cResponse = requests.post(self.__configURL, data=json.dumps(self.__configjson), headers = __headers)
- self.__cJSON = self.__cResponse.json()
- policyConfigs= self.__configResponse(self.__cJSON)
- # if we have successfully retrieved a policy we will store the match values.
- matchFound = False
- for policyConfig in policyConfigs:
- if policyConfig._policyConfigStatus == PolicyConfigStatus.CONFIG_RETRIEVED.name:
- # self.logger.info("Policy has been Retrieved !!")
- matchFound = True
- if matchFound:
- __match = {}
- __match["ECOMPName"] = eCOMPComponentName
- if configName is not None:
- __match["ConfigName"] = configName
- if configAttributes is not None:
- __match.update(configAttributes)
- if not self.matchStore:
- self.matchStore.append(__match)
- else:
- __booMatch = False
- for eachDict in self.matchStore:
- if eachDict==__match:
- __booMatch = True
- break
- if __booMatch==False:
- self.matchStore.append(__match)
- return policyConfigs
-
- def __configRequestParametersJSON(self, eCOMPComponentName=None, configName=None, configAttributes=None, policyName=None, unique= False):
- """ Internal Function to set JSON from configRequestParameters
- """
- json= {}
- if eCOMPComponentName is not None:
- json['ecompName'] = eCOMPComponentName
- if configName is not None:
- json['configName'] = configName
- if configAttributes is not None:
- json['configAttributes'] = configAttributes
- if policyName is not None:
- json['policyName'] = policyName
- json['unique'] = unique
- return json
-
- def __configResponse(self, cJSON):
- """
- Internal function to take the convert JSON to Response Object.
- """
- policyConfigs=[]
- for configJSON in cJSON:
- policyConfig = PolicyConfig()
- policyConfig._policyConfigMessage = configJSON['policyConfigMessage']
- policyConfig._policyConfigStatus = configJSON['policyConfigStatus']
- policyConfig._policyType = configJSON['type']
- policyConfig._policyName = configJSON['policyName']
- policyConfig._policyVersion = configJSON['policyVersion']
- policyConfig._matchingConditions = configJSON['matchingConditions']
- policyConfig._responseAttributes = configJSON['responseAttributes']
- if PolicyType.JSON.name == policyConfig._policyType:
- policyConfig._json = configJSON['config']
- elif PolicyType.XML.name == policyConfig._policyType:
- policyConfig._xml = XML(configJSON['config'])
- elif PolicyType.PROPERTIES.name == policyConfig._policyType:
- policyConfig._properties = configJSON['property']
- elif PolicyType.OTHER.name == policyConfig._policyType:
- policyConfig._other = configJSON['config']
- policyConfigs.append(policyConfig)
- return policyConfigs
-
- def getDecision(self, decisionAttributes, ecompcomponentName, requestID = None):
- """
- getDecision function sends the Decision Attributes to the PDP server and gets the response to the client from PDP.
- @param decisionAttributes: Dictionary of Decision Attributes in Key and Value String formats.
- @param ecompcomponentName:
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a DecisionResponse Object.
- """
- __decisionurl = "/getDecision"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- self.__decisionjson={}
- self.__decisionjson['decisionAttributes'] = decisionAttributes
- self.__decisionjson['ecompcomponentName'] = ecompcomponentName
- self.__dResponse = self.__callPDP(__decisionurl, json.dumps(self.__decisionjson), __headers, "POST")
- self.__dJSON = self.__dResponse.json()
- decisionResponse = DecisionResponse()
- decisionResponse._decision = self.__dJSON['decision']
- decisionResponse._details = self.__dJSON['details']
- return decisionResponse
-
- def sendEvent(self, eventAttributes, requestID=None):
- """
- sendEvent function sends the Event to the PDP server and gets the response to the client from the PDP.
- @param eventAttributes:Dictonary of the EventAttributes in Key and Value String formats.
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a List of PolicyResponse Object(s).
- """
- __eventurl = "/sendEvent"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- self.__eventjson = {}
- self.__eventjson['eventAttributes'] = eventAttributes
- #self.__eventjson['pdp_URL'] = self.pdp_url
- self.__eResponse = self.__callPDP(__eventurl, json.dumps(self.__eventjson), __headers, "POST")
- #self.__eventurl = self.resturl+__eventurl
- #self.__eResponse = requests.post(self.__eventurl, data=json.dumps(self.__eventjson), headers = __headers)
- self.__eJSON = self.__eResponse.json()
- policyResponses=[]
- for eventJSON in self.__eJSON:
- policyResponse = PolicyResponse()
- policyResponse._policyResponseMessage = eventJSON['policyResponseMessage']
- policyResponse._policyResponseStatus = eventJSON['policyResponseStatus']
- policyResponse._actionAdvised = eventJSON['actionAdvised']
- policyResponse._actionTaken = eventJSON['actionTaken']
- policyResponse._requestAttributes = eventJSON['requestAttributes']
- policyResponses.append(policyResponse)
- return policyResponses
-
- def createPolicy(self, policyParameters):
- """
- 'createPolicy creates Policy using the policyParameters sent'
- @param policyParameters: This is an object of PolicyParameters which is required as a parameter to this method.
- @return: Returns a PolicyChangeResponse Object
- """
- __createurl = "/createPolicy"
- __headers = self.policyheader
- try:
- if policyParameters._requestID is None:
- policyParameters._requestID = str(uuid.uuid4())
- self.__createJson = {}
- self.__createJson = self.__policyParametersJSON(policyParameters)
- self.__createResponse = self.__callPDP(__createurl, json.dumps(self.__createJson), __headers, "PUT")
- policyChangeResponse = PolicyChangeResponse()
- policyChangeResponse._responseCode = self.__createResponse.status_code
- policyChangeResponse._responseMessage = self.__createResponse.text
- return policyChangeResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the policyParameters Object. It needs to be object of PolicyParameters ")
- print("PE300 - Data Issue: policyParamters object Error")
-
- def updatePolicy(self, policyParameters):
- """
- 'updatePolicy updates Policy using the policyParameters sent.'
- @param policyParameters: This is an object of PolicyParameters which is required as a parameter to this method.
- @return: Returns a PolicyChangeResponse Object
- """
- __updateurl = "/updatePolicy"
- __headers = self.policyheader
- try:
- if policyParameters._requestID is None:
- policyParameters._requestID = str(uuid.uuid4())
- self.__updateJson = {}
- self.__updateJson = self.__policyParametersJSON(policyParameters)
- self.__updateResponse = self.__callPDP(__updateurl, json.dumps(self.__updateJson), __headers, "PUT")
- policyChangeResponse = PolicyChangeResponse()
- policyChangeResponse._responseCode = self.__updateResponse.status_code
- policyChangeResponse._responseMessage = self.__updateResponse.text
- return policyChangeResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the policyParameters Object. It needs to be object of PolicyParameters ")
- print("PE300 - Data Issue: policyParamters object Error")
-
- def __policyParametersJSON(self, policyParameters):
- """ Internal Function to set JSON from policyParameters Object
- """
- json= {}
- if policyParameters._actionAttribute is not None:
- json['actionAttribute'] = policyParameters._actionAttribute
- if policyParameters._actionPerformer is not None:
- json['actionPerformer'] = policyParameters._actionPerformer
- if policyParameters._attributes is not None:
- json['attributes'] = policyParameters._attributes
- if policyParameters._configBody is not None:
- json['configBody'] = policyParameters._configBody
- if policyParameters._configBodyType is not None:
- json['configBodyType'] = policyParameters._configBodyType
- if policyParameters._configName is not None:
- json['configName'] = policyParameters._configName
- if policyParameters._controllerName is not None:
- json['controllerName'] = policyParameters._controllerName
- if policyParameters._dependencyNames is not None:
- json['dependencyNames'] = policyParameters._dependencyNames
- if policyParameters._dynamicRuleAlgorithmLabels is not None:
- json['dynamicRuleAlgorithmLabels'] = policyParameters._dynamicRuleAlgorithmLabels
- if policyParameters._dynamicRuleAlgorithmField1 is not None:
- json['dynamicRuleAlgorithmField1'] = policyParameters._dynamicRuleAlgorithmField1
- if policyParameters._dynamicRuleAlgorithmField2 is not None:
- json['dynamicRuleAlgorithmField2'] = policyParameters._dynamicRuleAlgorithmField2
- if policyParameters._dynamicRuleAlgorithmFunctions is not None:
- json['dynamicRuleAlgorithmFunctions'] = policyParameters._dynamicRuleAlgorithmFunctions
- if policyParameters._ecompName is not None:
- json['ecompName'] = policyParameters._ecompName
- if policyParameters._extendedOption is not None:
- json['extendedOption'] = policyParameters._extendedOption
- if policyParameters._guard is not None:
- json['guard'] = policyParameters._guard
- if policyParameters._policyClass is not None:
- json['policyClass'] = policyParameters._policyClass
- if policyParameters._policyConfigType is not None:
- json['policyConfigType'] = policyParameters._policyConfigType
- if policyParameters._policyName is not None:
- json['policyName'] = policyParameters._policyName
- if policyParameters._policyDescription is not None:
- json['policyDescription'] = policyParameters._policyDescription
- if policyParameters._priority is not None:
- json['priority'] = policyParameters._priority
- if policyParameters._requestID is not None:
- json['requestID'] = policyParameters._requestID
- if policyParameters._riskLevel is not None:
- json['riskLevel'] = policyParameters._riskLevel
- if policyParameters._riskType is not None:
- json['riskType'] = policyParameters._riskType
- if policyParameters._ruleProvider is not None:
- json['ruleProvider'] = policyParameters._ruleProvider
- if policyParameters._ttlDate is not None:
- json['ttlDate'] = policyParameters._ttlDate
- return json
-
- def pushPolicy(self, pushPolicyParameters, requestID = None):
- """
- 'pushPolicy pushes a policy based on the given Push Policy Parameters. '
- @param pushPolicyParameters: This is an object of PushPolicyParameters which is required as a parameter to this method.
- @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
- @return: Returns a PolicyChangeResponse Object
- """
- __pushurl = "/pushPolicy"
- __headers = self.policyheader
- if requestID is not None:
- __headers["X-ECOMP-RequestID"] = str(requestID)
- else:
- __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
- try:
- self.__pushJson = {}
- self.__pushJson['pdpGroup'] = pushPolicyParameters._pdpGroup
- self.__pushJson['policyName'] = pushPolicyParameters._policyName
- self.__pushJson['policyType'] = pushPolicyParameters._policyType
- self.__pushResponse = self.__callPDP(__pushurl, json.dumps(self.__pushJson), __headers, "PUT")
- policyChangeResponse = PolicyChangeResponse()
- policyChangeResponse._responseCode = self.__pushResponse.status_code
- policyChangeResponse._responseMessage = self.__pushResponse.text
- return policyChangeResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the pushPolicyParameters Object. It needs to be object of PushPolicyParameters ")
- print("PE300 - Data Issue: pushPolicyParamters object Error")
-
- def deletePolicy(self, deletePolicyParameters):
- """
- 'deletePolicy Deletes a policy or all its version according to the given deletePolicyParameters'
- @param deletePolicyParameters: This is an Object of DeletePolicyParameters which is required as a parameter to this method.
- @return: Returns a PolicyChangeResponse Object
- """
- __deleteurl = "/deletePolicy"
- __createdictionaryurl = "/createDictionaryItem"
- __headers = self.policyheader
- try:
- if deletePolicyParameters._requestID is None:
- deletePolicyParameters._requestID = str(uuid.uuid4())
- self.__deleteJson = {}
- self.__deleteJson['deleteCondition'] = deletePolicyParameters._deleteCondition
- self.__deleteJson['pdpGroup'] = deletePolicyParameters._pdpGroup
- self.__deleteJson['policyComponent'] = deletePolicyParameters._policyComponent
- self.__deleteJson['policyName'] = deletePolicyParameters._policyName
- self.__deleteJson['policyType'] = deletePolicyParameters._policyType
- self.__deleteJson['requestID'] = deletePolicyParameters._requestID
- self.__deleteResponse = self.__callPDP(__deleteurl, json.dumps(self.__deleteJson), self.policyheader, "DELETE")
- policyChangeResponse = PolicyChangeResponse()
- policyChangeResponse._responseCode = self.__deleteResponse.status_code
- policyChangeResponse._responseMessage = self.__deleteResponse.text
- return policyChangeResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the deletePolicyParameters Object. It needs to be object of DeletePolicyParameters ")
- print("PE300 - Data Issue: deletePolicyParameters object Error")
-
- def createDictionaryItems(self, dictionaryParameters):
- """
- 'createDictionaryItems adds dictionary items to the database for a specific dictionary'
- @param dictionaryParameters: This is an Object of DictionaryParameters which is required as a parameter to this method
- @return: Returns a DictionaryResponse object
- """
- __createdictionaryurl = '/createDictionaryItem'
- __headers = self.policyheader
- try:
- if dictionaryParameters._requestID is None:
- dictionaryParameters._requestID = str(uuid.uuid4())
- self.__json={}
- self.__json['dictionaryType'] = dictionaryParameters._dictionaryType
- self.__json['dictionary'] = dictionaryParameters._dictionary
- self.__json['dictionaryJson'] = dictionaryParameters._dictionaryJson
- self.__json['requestID'] = dictionaryParameters._requestID
- self.__createResponse = self.__callPDP(__createdictionaryurl, json.dumps(self.__json), __headers, "PUT")
- dictionaryResponse = DictionaryResponse()
- dictionaryResponse._responseCode = self.__createResponse.status_code
- dictionaryResponse._responseMessage = self.__createResponse.text
- return dictionaryResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the dictionaryParameters object. It needs to be object of DictionaryParameters ")
- print("PE300 - Data Issue: dictionaryParameters object Error")
-
-
- def updateDictionaryItems(self, dictionaryParameters):
- """
- 'updateDictionaryItems edits dictionary items in the database for a specific dictionary'
- @param dictionaryParameters: This is an Object of DictionaryParameters which is required as a parameter to this method
- @return: Returns a DictionaryResponse object
- """
- __updatedictionaryurl = '/updateDictionaryItem'
- __headers = self.policyheader
- try:
- if dictionaryParameters._requestID is None:
- dictionaryParameters._requestID = str(uuid.uuid4())
- self.__json={}
- self.__json['dictionaryType'] = dictionaryParameters._dictionaryType
- self.__json['dictionary'] = dictionaryParameters._dictionary
- self.__json['dictionaryJson'] = dictionaryParameters._dictionaryJson
- self.__json['requestID'] = dictionaryParameters._requestID
- self.__updateResponse = self.__callPDP(__updatedictionaryurl, json.dumps(self.__json), __headers, "PUT")
- dictionaryResponse = DictionaryResponse()
- dictionaryResponse._responseCode = self.__updateResponse.status_code
- dictionaryResponse._responseMessage = self.__updateResponse.text
- return dictionaryResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the dictionaryParameters object. It needs to be object of DictionaryParameters ")
- print("PE300 - Data Issue: dictionaryParameters object Error")
-
- def getDictionaryItems(self, dictionaryParameters):
- """
- 'getDictionaryItems gets all the dictionary items stored in the database for a specified dictionary'
- @param dictionaryParameters: This is an Object of DictionaryParameters which is required as a parameter to this method.
- @return: Returns a DictionaryResponse object
- """
- __retrievedictionaryurl = "/getDictionaryItems"
- __headers = self.policyheader
- try:
- if dictionaryParameters._requestID is None:
- dictionaryParameters._requestID = str(uuid.uuid4())
- self.__json = {}
- self.__json['dictionaryType'] = dictionaryParameters._dictionaryType
- self.__json['dictionary'] = dictionaryParameters._dictionary
- self.__json['requestID'] = dictionaryParameters._requestID
- self.__getResponse = self.__callPDP(__retrievedictionaryurl, json.dumps(self.__json), __headers, "POST")
- dictionaryResponse = DictionaryResponse()
- dictionaryResponse._responseCode = self.__getResponse.status_code
- dictionaryResponse._responseMessage = self.__getResponse.text
- return dictionaryResponse
- except:
- self.logger.error("PE300 - Data Issue: Error with the dictionaryParameters object. It needs to be object of DictionaryParameters ")
- print("PE300 - Data Issue: dictionaryParameters object Error")
-
- def getNotification(self):
- """
- gets the PDPNotification if the appropriate NotificationScheme is selected.
- @return: Returns a PDPNotification Object.
- """
- if ((self.scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name)or(self.scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name)):
- # Manual Client for websocket Code in here.
- if(self.resturl[0].startswith("https")):
- __man_url = self.resturl[0].replace("https","wss")+"notifications"
- else:
- __man_url = self.resturl[0].replace("http","ws")+"notifications"
- __result = self.__manualRequest(__man_url)
- self.logger.debug("Manual Notification with server: %s \n result is: %s" , __man_url , __result)
- # TODO convert the result to PDP Notifications.
- if (self.scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name):
- # need to add all the values to the PDPNotification..
- pDPNotification = PDPNotification()
- boo_Remove = False
- boo_Update = False
- if __result is None:
- return None
- if __result['removedPolicies']:
- removedPolicies = []
- for removed in __result['removedPolicies']:
- removedPolicy = RemovedPolicy()
- removedPolicy._policyName = removed['policyName']
- removedPolicy._policyVersion = removed['versionNo']
- removedPolicies.append(removedPolicy)
- pDPNotification._removedPolicies= removedPolicies
- boo_Remove = True
- if __result['loadedPolicies']:
- updatedPolicies = []
- for updated in __result['loadedPolicies']:
- updatedPolicy = LoadedPolicy()
- updatedPolicy._policyName = updated['policyName']
- updatedPolicy._policyVersion = updated['versionNo']
- updatedPolicy._matchingConditions = updated['matches']
- updatedPolicy._updateType = updated['updateType']
- updatedPolicies.append(updatedPolicy)
- pDPNotification._loadedPolicies= updatedPolicies
- boo_Update = True
- if (boo_Update and boo_Remove):
- pDPNotification._notificationType = NotificationType.BOTH.name
- elif boo_Update:
- pDPNotification._notificationType = NotificationType.UPDATE.name
- elif boo_Remove:
- pDPNotification._notificationType = NotificationType.REMOVE.name
- return pDPNotification
- elif (self.scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name):
- return self.__checkNotification(__result)
- else:
- return None
-
- def setNotification(self, scheme, handler = None):
- """
- setNotification allows changes to the NotificationScheme and the NotificationHandler.
- @param scheme: NotificationScheme to select the scheme required for notification updates.
- @param handler: NotificationHandler object which will be called when an event is occurred.
- """
- if handler is not None:
- #if type(handler) is NotificationHandler:
- self.handler = handler
- #else:
- # print("Error: handler should be a object of NotificationHandler class")
- if scheme is not None:
- if ((scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
- # setup the Auto settings.
- self.scheme = scheme
- self.__startAuto()
- elif ((scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name)):
- # setup the Manual Settings
- self.scheme = scheme
- else:
- print("PE300 - Data Issue: scheme must be a Type of NotificationScheme Enumeration ")
-
- def clearNotification(self):
- """
- clearNotification ShutsDown the AutoNotification service if running.
- """
- if self.scheme is not None:
- if((self.scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(self.scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
- if self.autows.sock is not None:
- if(self.autows.sock.connected):
- self.mclose = True
- self.autows.close()
- self.logger.info("Notification Service Stopped.")
- print("Notification Service is Stopped!!")
-
- def __callPDP(self,urlFunction, jsonData, headerData,method, files= None, params = None):
- """
- This function call is for internal usage purpose only.
- Calls the available PyPDP
- """
- connected = False
- response = None
- errormessage = ''
- for count in range(0, len(self.resturl)):
- try:
- logging.basicConfig(level=logging.DEBUG)
- request_url = self.resturl[0]+ urlFunction
- self.logger.debug("--- Sending Request to : %s",request_url)
- try:
- self.logger.debug("Request ID %s :",headerData["X-ECOMP-RequestID"])
- except:
- if jsonData is not None:
- self.logger.debug("Request ID %s :",json.loads(jsonData)['requestID'])
- self.logger.debug("Request Data is: %s" ,jsonData)
- headerData["Authorization"]= "Basic " + self.encoded[0]
- if(method=="PUT"):
- response = requests.put(request_url, data = jsonData, headers = headerData)
- elif(method=="DELETE"):
- response = requests.delete(request_url, data = jsonData, headers = headerData)
- elif(method=="POST"):
- if params is not None:
- # files = files, params = params,
- response = requests.post(request_url, params = params, headers = headerData)
- else:
- response = requests.post(request_url, data = jsonData, headers = headerData)
- # when using self-signed server certificate, comment previous line and uncomment following:
- #response = requests.post(request_url, data = jsonData, headers = headerData, verify=False)
- self.logger.debug("--- Response is : ---")
- self.logger.debug(response.status_code)
- self.logger.debug(response.headers)
- self.logger.debug(response.text)
- if(response.status_code == 200) :
- connected = True
- self.logger.info("connected to the PyPDP: %s", request_url)
- break
- elif(response.status_code == 202) :
- connected = True
- break
- elif(response.status_code == 400):
- self.logger.debug("PE400 - Schema Issue: Incorrect Params passed: %s %s", self.resturl[0], response.status_code)
- errormessage+="\n PE400 - Schema Issue: Incorrect Params passed: "+ self.resturl[0]
- self.__rotatePDP()
- elif(response.status_code == 401):
- self.logger.debug("PE100 - Permissions Error: PyPDP Error: %s %s", self.resturl[0], response.status_code)
- errormessage+="\n PE100 - Permissions Error: PyPDP Error: "+ self.resturl[0]
- self.__rotatePDP()
- elif(response.status_code == 403):
- self.logger.debug("PE100 - Permissions Error: PyPDP Error: %s %s", self.resturl[0], response.status_code)
- errormessage+="\n PE100 - Permissions Error: PyPDP Error: "+ self.resturl[0]
- self.__rotatePDP()
- else:
- self.logger.debug("PE200 - System Error: PyPDP Error: %s %s", self.resturl[0], response.status_code)
- errormessage+="\n PE200 - System Error: PyPDP Error: "+ self.resturl[0]
- self.__rotatePDP()
- except Exception as e:
- print(str(e));
- self.logger.debug("PE200 - System Error: PyPDP Error: %s", self.resturl[0])
- errormessage+="\n PE200 - System Error: PyPDP Error: "+ self.resturl[0]
- self.__rotatePDP()
- if(connected):
- if(self.autoURL==None):
- self.__startAuto()
- elif(self.autoURL!= self.resturl[0]):
- self.__startAuto()
- return response
- else:
- self.logger.error("PE200 - System Error: cannot connect to given PYPDPServer(s) %s", self.resturl)
- print(errormessage)
- sys.exit(0)
-
- def __rotatePDP(self):
- self.resturl = collections.deque(self.resturl)
- self.resturl.rotate(-1)
- self.encoded = collections.deque(self.encoded)
- self.encoded.rotate(-1)
-
- def __checkNotification(self, resultJson):
- """
- This function call is for Internal usage purpose only.
- Checks the Notification JSON compares it with the MatchStore and returns the PDPNotification object.
- """
- if not resultJson:
- return None
- if not self.matchStore:
- return None
- pDPNotification = PDPNotification()
- boo_Remove = False
- boo_Update = False
- if resultJson['removedPolicies']:
- removedPolicies = []
- for removed in resultJson['removedPolicies']:
- removedPolicy = RemovedPolicy()
- removedPolicy._policyName = removed['policyName']
- removedPolicy._policyVersion = removed['versionNo']
- removedPolicies.append(removedPolicy)
- pDPNotification._removedPolicies= removedPolicies
- boo_Remove = True
- if resultJson['updatedPolicies']:
- updatedPolicies = []
- for updated in resultJson['updatedPolicies']:
- updatedPolicy = LoadedPolicy()
- # check if it has matches then it is a Config Policy and compare it with Match Store.
- if updated['matches']:
- # compare the matches with our Stored Matches
- for eachDict in self.matchStore:
- if eachDict==updated['matches']:
- updatedPolicy._policyName = updated['policyName']
- updatedPolicy._policyVersion = updated['versionNo']
- updatedPolicy._matchingConditions = updated['matches']
- updatedPolicy._updateType = updated['updateType']
- updatedPolicies.append(updatedPolicy)
- boo_Update = True
- else:
- updatedPolicy._policyName = updated['policyName']
- updatedPolicy._policyVersion = updated['versionNo']
- updatedPolicies.append(updatedPolicy)
- boo_Update = True
- pDPNotification._loadedPolicies= updatedPolicies
- if (boo_Update and boo_Remove):
- pDPNotification._notificationType = NotificationType.BOTH.name
- elif boo_Update:
- pDPNotification._notificationType = NotificationType.UPDATE.name
- elif boo_Remove:
- pDPNotification._notificationType = NotificationType.REMOVE.name
- return pDPNotification
-
- def __startAuto(self):
- """
- Starts the Auto Notification Feature..
- """
- if self.scheme is not None:
- if ((self.scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(self.scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
- if self.handler is None:
- if self.autows.sock is not None:
- if(self.autows.sock.connected):
- self.mclose= True
- self.autows.close()
- else:
- if self.autoURL is None:
- self.autoURL = self.resturl[0]
- elif self.autoURL != self.resturl[0]:
- self.autoURL = self.resturl[0]
- if self.autows.sock is not None:
- if(self.autows.sock.connected):
- self.mclose= True
- self.autows.close()
- else:
- self.autows = None
- if self.autows is None:
- if(self.autoURL.startswith("https")):
- __auto_url = self.autoURL.replace("https","wss")+"notifications"
- else:
- __auto_url = self.autoURL.replace("http","ws")+"notifications"
- def run(*args):
- self.__autoRequest(__auto_url)
- self.logger.info("Starting AutoNotification Service with : %s" , __auto_url)
- self.thread.start_new_thread(run , ())
- elif self.autows.sock is not None:
- if not (self.autows.sock.connected):
- self.mclose = True
- self.autows.close()
- self.restart = True
- self.__rotatePDP()
- if(self.autoURL.startswith("https")):
- __auto_url = self.autoURL.replace("https","wss")+"notifications"
- else:
- __auto_url = self.autoURL.replace("http","ws")+"notifications"
- def run(*args):
- self.__autoRequest(__auto_url)
- self.logger.info("Starting AutoNotification Service with : %s" , __auto_url)
- self.thread.start_new_thread(run , ())
-
- else:
- #stop the Auto Notification Service if it is running.
- if self.autows.sock is not None:
- if(self.autows.sock.connected):
- self.mclose= True
- self.autows.close()
-
- def __onEvent(self, message):
- """
- Handles the event Notification received.
- """
- message = json.loads(message)
- if self.handler is not None:
- if (self.scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name):
- pDPNotification = PDPNotification()
- boo_Remove = False
- boo_Update = False
- if message['removedPolicies']:
- removedPolicies = []
- for removed in message['removedPolicies']:
- removedPolicy = RemovedPolicy()
- removedPolicy._policyName = removed['policyName']
- removedPolicy._policyVersion = removed['versionNo']
- removedPolicies.append(removedPolicy)
- pDPNotification._removedPolicies= removedPolicies
- boo_Remove = True
- if message['loadedPolicies']:
- updatedPolicies = []
- for updated in message['loadedPolicies']:
- updatedPolicy = LoadedPolicy()
- updatedPolicy._policyName = updated['policyName']
- updatedPolicy._policyVersion = updated['versionNo']
- updatedPolicy._matchingConditions = updated['matches']
- updatedPolicy._updateType = updated['updateType']
- updatedPolicies.append(updatedPolicy)
- pDPNotification._loadedPolicies= updatedPolicies
- boo_Update = True
- if (boo_Update and boo_Remove):
- pDPNotification._notificationType = NotificationType.BOTH.name
- elif boo_Update:
- pDPNotification._notificationType = NotificationType.UPDATE.name
- elif boo_Remove:
- pDPNotification._notificationType = NotificationType.REMOVE.name
- # call the Handler.
- self.handler.notificationReceived(pDPNotification)
- elif (self.scheme == NotificationScheme.AUTO_NOTIFICATIONS.name):
- # call the handler
- self.handler(self.__checkNotification(message))
-
- def __manualRequest(self,request_url):
- """
- Takes the request_URL given and returns the JSON response back to the Caller.
- """
- ws = create_connection(request_url)
- # when using self-signed server certificate, comment previous line and uncomment following:
- #ws = create_connection(request_url, sslopt={"cert_reqs": ssl.CERT_NONE})
- ws.send("Manual")
- try:
- return json.loads(ws.recv())
- except:
- return None
- ws.close()
- ws.shutdown()
-
- def __onMessage(self, ws,message):
- """Occurs on Event
- """
- self.logger.info("Received AutoNotification message: %s" , message)
- self.__onEvent(message)
-
- def __onError(self, ws, error):
- """Self Restart the Notification Service on Error
- """
- self.logger.error("PE500 - Process Flow Issue: Auto Notification service Error!! : %s" , error)
-
- def __onclose(self, ws):
- """Occurs on Close ? Try to start again in case User didn't do it.
- """
- self.logger.debug("Connection has been Closed. ")
- if not self.mclose:
- self.__startAuto()
- self.mclose = False
-
- def __autoRequest(self, request_url):
- """
- Takes the request_URL and invokes the PolicyEngine method on any receiving a Message
- """
- websocket.enableTrace(True)
- self.autows = websocket.WebSocketApp(request_url, on_message= self.__onMessage, on_close= self.__onclose, on_error= self.__onError)
- # wait for to 5 seconds to restart
- if self.restart:
- time.sleep(5)
- self.autows.run_forever()
-
-class NotificationHandler:
- """
- 'Defines the methods which need to run when an Event or a Notification is received.'
- """
- def notificationReceived(self, notification):
- """
- Will be triggered automatically whenever a Notification is received by the PEP
- @param notification: PDPNotification object which has the information of the Policies.
- @attention: This method must be implemented by the user for AUTO type NotificationScheme
- """
- raise Exception("Unimplemented abstract method: %s" % __functionId(self, 1))
-
-def __functionId(obj, nFramesUp):
- """ Internal Usage only..
- Create a string naming the function n frames up on the stack. """
- fr = sys._getframe(nFramesUp+1)
- co = fr.f_code
- return "%s.%s" % (obj.__class__, co.co_name)
-
-class PolicyConfig:
- """
- 'PolicyConfig is the return object resulted by getConfig Call.'
- """
- def __init__(self):
- self._policyConfigMessage = None
- self._policyConfigStatus = None
- self._policyName = None
- self._policyVersion = None
- self._matchingConditions = None
- self._responseAttributes = None
- self._policyType = None
- self._json = None
- self._xml = None
- self._prop = None
- self._other = None
-
-class PolicyResponse:
- """
- 'PolicyResponse is the return object resulted by sendEvent Call.'
- """
- def __init__(self):
- self._policyResponseStatus = None
- self._policyResponseMessage = None
- self._requestAttributes = None
- self._actionTaken = None
- self._actionAdvised= None
-
-class PDPNotification:
- """
- 'Defines the Notification Event sent from the PDP to PEP Client.'
- """
- def __init__(self):
- self._removedPolicies = None
- self._loadedPolicies = None
- self._notificationType = None
-
-class RemovedPolicy:
- """
- 'Defines the structure of the Removed Policy'
- """
- def __init__(self):
- self._policyName = None
- self._policyVersion = None
-
-class LoadedPolicy:
- """
- 'Defines the Structure of the Loaded Policy'
- """
- def __init__(self):
- self._policyName = None
- self._policyVersion = None
- self._matchingConditions = None
- self._updateType = None
-
-class PolicyParameters:
- """
- 'Defines the Structure of the Policy to Create or Update'
- """
- def __init__(self):
- self._actionPerformer = None
- self._actionAttribute = None
- self._attributes = None
- self._configBody = None
- self._configBodyType = None
- self._configName = None
- self._controllerName = None
- self._dependencyNames = None
- self._dynamicRuleAlgorithmLabels = None
- self._dynamicRuleAlgorithmFunctions = None
- self._dynamicRuleAlgorithmField1 = None
- self._dynamicRuleAlgorithmField2 = None
- self._ecompName = None
- self._extendedOption = None
- self._guard = None
- self._policyClass = None
- self._policyConfigType = None
- self._policyName = None
- self._policyDescription = None
- self._priority = None
- self._requestID = None
- self._riskLevel = None
- self._riskType = None
- self._ruleProvider = None
- self._ttlDate = None
-
-class PushPolicyParameters:
- """
- 'Defines the Structure of the Push Policy Parameters'
- """
- def __init__(self):
- self._pdpGroup = None
- self._policyName = None
- self._policyType = None
-
-class PolicyChangeResponse:
- """
- 'Defines the Structure of the policy Changes made from PDP'
- """
- def __init__(self):
- self._responseMessage = None
- self._responseCode = None
-
-class DeletePolicyParameters:
- """
- 'Defines the Structure of the Delete Policy Parameters'
- """
- def __init__(self):
- self._deleteCondition = None
- self._pdpGroup = None
- self._policyComponent = None
- self._policyName = None
- self._policyType = None
- self._requestID = None
-
-class DictionaryParameters:
- """
- 'Defines the Structure of the Dictionary Parameters'
- """
- def __init__(self):
- self._dictionaryType = None
- self._dictionary = None
- self._dictionaryJson = None
- self._requestID = None
-
-class DictionaryResponse:
- """
- 'Defines the Structure of the dictionary response'
- """
- def __init__(self):
- self._responseMessage = None
- self._responseCode = None
- self._dictionaryJson = None
- self._dictionaryData = None
-
-class DecisionResponse:
- """
- 'Defines the Structure of Decision Response'
- """
- def __init__(self):
- self._decision = None
- self._details = None
-
-class ImportParameters:
- """
- 'Defines the Structure of Policy Model Import'
- """
- def __init__(self):
- self._serviceName = None
- self._description = None
- self._requestID = None
- self._filePath = None
- self._importBody = None
- self._version = None
- self._importType = None
diff --git a/policyhandler/__init__.py b/policyhandler/__init__.py
index a3220c4..3315706 100644
--- a/policyhandler/__init__.py
+++ b/policyhandler/__init__.py
@@ -1,6 +1,5 @@
-# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,3 +15,20 @@
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+"""policyhandler package"""
+
+class LogWriter(object):
+ """redirect the standard out + err to the logger"""
+ def __init__(self, logger_func):
+ self.logger_func = logger_func
+
+ def write(self, log_line):
+ """actual writer to be used in place of stdout or stderr"""
+ log_line = log_line.rstrip()
+ if log_line:
+ self.logger_func(log_line)
+
+ def flush(self):
+ """no real flushing of the buffer"""
+ pass
diff --git a/policyhandler/policy_handler.py b/policyhandler/__main__.py
index 50d59bc..04ca657 100644
--- a/policyhandler/policy_handler.py
+++ b/policyhandler/__main__.py
@@ -1,8 +1,5 @@
-"""run as server: python -m policyhandler/policy_handler"""
-
-# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,28 +16,22 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import sys
+"""
+ run as server:
+ python -m policyhandler
+
+ that will invoke this module __main__.py in folder of policyhandler
+"""
+
import logging
+import sys
+from policyhandler import LogWriter
from policyhandler.config import Config
from policyhandler.onap.audit import Audit
+from policyhandler.policy_receiver import PolicyReceiver
from policyhandler.web_server import PolicyWeb
-from policyhandler.policy_engine import PolicyEngineClient
-
-class LogWriter(object):
- """redirect the standard out + err to the logger"""
- def __init__(self, logger_func):
- self.logger_func = logger_func
- def write(self, log_line):
- """actual writer to be used in place of stdout or stderr"""
- log_line = log_line.rstrip()
- if log_line:
- self.logger_func(log_line)
-
- def flush(self):
- """no real flushing of the buffer"""
- pass
def run_policy_handler():
"""main run function for policy-handler"""
@@ -51,14 +42,16 @@ def run_policy_handler():
sys.stdout = LogWriter(logger.info)
sys.stderr = LogWriter(logger.error)
- logger.info("========== run_policy_handler ==========")
+ logger.info("========== run_policy_handler ========== %s", __package__)
Audit.init(Config.get_system_name(), Config.LOGGER_CONFIG_FILE_PATH)
logger.info("starting policy_handler with config:")
- logger.info(Audit.log_json_dumps(Config.config))
+ logger.info(Audit.log_json_dumps(Config.settings))
+
+ audit = Audit(req_message="start policy handler")
+ PolicyReceiver.run(audit)
+ PolicyWeb.run_forever(audit)
- PolicyEngineClient.run()
- PolicyWeb.run()
if __name__ == "__main__":
run_policy_handler()
diff --git a/policyhandler/config.py b/policyhandler/config.py
index 7033096..8e6edf9 100644
--- a/policyhandler/config.py
+++ b/policyhandler/config.py
@@ -1,8 +1,5 @@
-"""read and use the config"""
-
-# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,13 +16,13 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import os
-import json
+"""read and use the config"""
+
import copy
-import re
-import base64
+import json
import logging
import logging.config
+import os
from .discovery import DiscoveryClient
@@ -45,7 +42,7 @@ class Config(object):
FIELD_POLICY_ENGINE = "policy_engine"
wservice_port = 25577
_logger = logging.getLogger("policy_handler.config")
- config = None
+ settings = None
@staticmethod
def merge(new_config):
@@ -53,23 +50,19 @@ class Config(object):
if not new_config:
return
- if not Config.config:
- Config.config = new_config
+ if not Config.settings:
+ Config.settings = new_config
return
new_config = copy.deepcopy(new_config)
- Config.config.update(new_config)
+ Config.settings.update(new_config)
@staticmethod
def get_system_name():
"""find the name of the policy-handler system
to be used as the key in consul-kv for config of policy-handler
"""
- system_name = None
- if Config.config:
- system_name = Config.config.get(Config.FIELD_SYSTEM)
-
- return system_name or Config.SERVICE_NAME_POLICY_HANDLER
+ return (Config.settings or {}).get(Config.FIELD_SYSTEM, Config.SERVICE_NAME_POLICY_HANDLER)
@staticmethod
def discover():
@@ -78,27 +71,14 @@ class Config(object):
new_config = DiscoveryClient.get_value(discovery_key)
if not new_config or not isinstance(new_config, dict):
- Config._logger.warn("unexpected config from discovery: %s", new_config)
+ Config._logger.warning("unexpected config from discovery: %s", new_config)
return
Config._logger.debug("loaded config from discovery(%s): %s", \
discovery_key, json.dumps(new_config))
- Config._logger.debug("config before merge from discovery: %s", json.dumps(Config.config))
+ Config._logger.debug("config before merge from discovery: %s", json.dumps(Config.settings))
Config.merge(new_config.get(Config.SERVICE_NAME_POLICY_HANDLER))
- Config._logger.debug("merged config from discovery: %s", json.dumps(Config.config))
-
- @staticmethod
- def upload_to_discovery():
- """upload the current config settings to the discovery service"""
- if not Config.config or not isinstance(Config.config, dict):
- Config._logger.error("unexpected config: %s", Config.config)
- return
-
- discovery_key = Config.get_system_name()
- latest_config = json.dumps({Config.SERVICE_NAME_POLICY_HANDLER:Config.config})
- DiscoveryClient.put_kv(discovery_key, latest_config)
- Config._logger.debug("uploaded config to discovery(%s): %s", \
- discovery_key, latest_config)
+ Config._logger.info("merged config from discovery: %s", json.dumps(Config.settings))
@staticmethod
def load_from_file(file_path=None):
@@ -112,7 +92,7 @@ class Config(object):
loaded_config = json.load(config_json)
if not loaded_config:
- Config._logger.info("config not loaded from file: %s", file_path)
+ Config._logger.warning("config not loaded from file: %s", file_path)
return
Config._logger.info("config loaded from file: %s", file_path)
@@ -122,44 +102,5 @@ class Config(object):
Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port)
Config.merge(loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER))
+ Config._logger.info("config loaded from file: %s", json.dumps(Config.settings))
return True
-
-class PolicyEngineConfig(object):
- """main config of the application"""
- # PATH_TO_PROPERTIES = r'logs/policy_engine.properties'
- PATH_TO_PROPERTIES = r'tmp/policy_engine.properties'
- PYPDP_URL = "PYPDP_URL = {0}{1}, {2}, {3}\n"
- CLIENT_ID = "CLIENT_ID = {0}\n"
- CLIENT_KEY = "CLIENT_KEY = {0}\n"
- ENVIRONMENT = "ENVIRONMENT = {0}\n"
- _logger = logging.getLogger("policy_handler.pe_config")
-
- @staticmethod
- def save_to_file():
- """create the policy_engine.properties for policy-engine client"""
- file_path = PolicyEngineConfig.PATH_TO_PROPERTIES
-
- try:
- config = Config.config[Config.FIELD_POLICY_ENGINE]
- headers = config["headers"]
- remove_basic = re.compile(r"(^Basic )")
- client_auth = headers["ClientAuth"]
- basic_client_auth = bool(remove_basic.match(client_auth))
- client_parts = base64.b64decode(remove_basic.sub("", client_auth)).split(":")
- auth_parts = base64.b64decode(remove_basic.sub("", headers["Authorization"])).split(":")
-
- props = PolicyEngineConfig.PYPDP_URL.format(config["url"], config["path_pdp"],
- auth_parts[0], auth_parts[1])
- props += PolicyEngineConfig.CLIENT_ID.format(client_parts[0])
- props += PolicyEngineConfig.CLIENT_KEY.format(base64.b64encode(client_parts[1]))
- props += PolicyEngineConfig.ENVIRONMENT.format(headers["Environment"])
-
- with open(file_path, 'w') as prp_file:
- prp_file.write(props)
- PolicyEngineConfig._logger.info("created %s basic_client_auth %s",
- file_path, basic_client_auth)
- return basic_client_auth
- except IOError:
- PolicyEngineConfig._logger.error("failed to save to %s", file_path)
- except KeyError:
- PolicyEngineConfig._logger.error("unexpected config for %s", Config.FIELD_POLICY_ENGINE)
diff --git a/policyhandler/customize/__init__.py b/policyhandler/customize/__init__.py
new file mode 100644
index 0000000..7449528
--- /dev/null
+++ b/policyhandler/customize/__init__.py
@@ -0,0 +1,32 @@
+# ================================================================================
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+"""polymorphic customizer changes the behavior of predefined methods in policy-handler"""
+
+from .customizer import Customizer
+
+class CustomizerUser(object):
+ """unprotected singleton around Customizer"""
+ _customizer = None
+
+ @staticmethod
+ def get_customizer():
+ """get instance of customizer"""
+ if not CustomizerUser._customizer:
+ CustomizerUser._customizer = Customizer()
+ return CustomizerUser._customizer
diff --git a/policyhandler/customize/customizer.py b/policyhandler/customize/customizer.py
new file mode 100644
index 0000000..9ab5967
--- /dev/null
+++ b/policyhandler/customize/customizer.py
@@ -0,0 +1,34 @@
+# ================================================================================
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+"""contains the Customizer class with method overrides per company specification"""
+
+from .customizer_base import CustomizerBase
+
+class Customizer(CustomizerBase):
+ """
+ the Customizer class inherits CustomizerBase that is owned by ONAP
+
+ :Customizer: class is owned by the company that needs to customize the policy-handler
+
+ :override: any method defined in the CustomizerBase class to customize
+ the behavior of the policy-handler
+
+ see README.md for the sample of the customizer.py
+ """
+ pass
diff --git a/policyhandler/customize/customizer_base.py b/policyhandler/customize/customizer_base.py
new file mode 100644
index 0000000..c98a9eb
--- /dev/null
+++ b/policyhandler/customize/customizer_base.py
@@ -0,0 +1,63 @@
+# ================================================================================
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+"""
+contains the base class :CustomizerBase:
+that defines the signatures and default behavior of the methods called by the policy-handler
+
+the methods are expected to be overriden by the child class Cutomizer that is company specific
+
+:do NOT change: this class and/or this file - it is owned by ONAP
+"""
+
+import logging
+
+class CustomizerBase(object):
+ """
+ base class for Customizer class
+
+ do NOT change this class and/or this file - it is owned by ONAP
+
+ policy-hanlder is using the instance of the child Customizer class to get the overriden methods
+
+ the methods defined in this class are the placeholders and are expected
+ to be overriden by the Customizer class
+ """
+
+ def __init__(self):
+ """base class for customization contains the default methods"""
+ self._logger = logging.getLogger("policy_handler.customizer")
+ self._logger.info("created customizer")
+
+ def get_service_url(self, audit, service_name, service):
+ """returns the service url when called from DiscoveryClient"""
+ service_url = "http://{0}:{1}".format(
+ service.get("ServiceAddress", ""), service.get("ServicePort", ""))
+
+ info = "no customization for service_url: {0} on {1}".format(service_url, service_name)
+ self._logger.info(info)
+ audit.info(info)
+ return service_url
+
+ def get_deploy_handler_kwargs(self, audit):
+ """returns the optional dict-kwargs for requests.post to deploy_handler"""
+ info = "no optional kwargs for requests.post to deploy_handler"
+ self._logger.info(info)
+ audit.info(info)
+ kwargs = {}
+ return kwargs
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
index 1d50fc3..ea703f4 100644
--- a/policyhandler/deploy_handler.py
+++ b/policyhandler/deploy_handler.py
@@ -1,8 +1,5 @@
-""" send notification to deploy-handler"""
-
-# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,13 +16,17 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import logging
+""" send notification to deploy-handler"""
+
import json
+import logging
+
import requests
from .config import Config
+from .customize import CustomizerUser
from .discovery import DiscoveryClient
-from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode
+from .onap.audit import REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, Metrics
POOL_SIZE = 1
@@ -37,69 +38,136 @@ class DeployHandler(object):
_requests_session = None
_config = None
_url = None
- _url_path = None
+ _url_policy = None
_target_entity = None
+ _custom_kwargs = None
+ _server_instance_uuid = None
@staticmethod
- def _lazy_init():
+ def _lazy_init(audit, rediscover=False):
""" set static properties """
- if DeployHandler._lazy_inited:
+ if DeployHandler._lazy_inited and not rediscover:
return
- DeployHandler._lazy_inited = True
-
- DeployHandler._requests_session = requests.Session()
- DeployHandler._requests_session.mount(
- 'https://',
- requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE)
- )
- DeployHandler._requests_session.mount(
- 'http://',
- requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE)
- )
-
- DeployHandler._target_entity = Config.config["deploy_handler"]
- DeployHandler._url = DiscoveryClient.get_service_url(DeployHandler._target_entity)
- DeployHandler._url_path = DeployHandler._url + '/policy'
- DeployHandler._logger.info("DeployHandler url(%s)", DeployHandler._url)
+
+ DeployHandler._custom_kwargs = (CustomizerUser.get_customizer()
+ .get_deploy_handler_kwargs(audit))
+ if (not DeployHandler._custom_kwargs
+ or not isinstance(DeployHandler._custom_kwargs, dict)):
+ DeployHandler._custom_kwargs = {}
+
+ if not DeployHandler._requests_session:
+ DeployHandler._requests_session = requests.Session()
+ DeployHandler._requests_session.mount(
+ 'https://',
+ requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE)
+ )
+ DeployHandler._requests_session.mount(
+ 'http://',
+ requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE)
+ )
+
+ config_dh = Config.settings.get("deploy_handler")
+ if config_dh and isinstance(config_dh, dict):
+ # dns based routing to deployment-handler
+ # config for policy-handler >= 2.4.0
+ # "deploy_handler" : {
+ # "target_entity" : "deployment_handler",
+ # "url" : "http://deployment_handler:8188"
+ # }
+ DeployHandler._target_entity = config_dh.get("target_entity", "deployment_handler")
+ DeployHandler._url = config_dh.get("url")
+ DeployHandler._logger.info("dns based routing to %s: url(%s)",
+ DeployHandler._target_entity, DeployHandler._url)
+
+ if not DeployHandler._url:
+ # discover routing to deployment-handler at consul-services
+ if not isinstance(config_dh, dict):
+ # config for policy-handler <= 2.3.1
+ # "deploy_handler" : "deployment_handler"
+ DeployHandler._target_entity = str(config_dh or "deployment_handler")
+ DeployHandler._url = DiscoveryClient.get_service_url(audit,
+ DeployHandler._target_entity)
+
+ DeployHandler._url_policy = str(DeployHandler._url or "") + '/policy'
+ DeployHandler._logger.info(
+ "got %s policy url(%s)", DeployHandler._target_entity, DeployHandler._url_policy)
+
+ DeployHandler._lazy_inited = bool(DeployHandler._url)
+
@staticmethod
- def policy_update(audit, latest_policies):
- """ post policy_updated message to deploy-handler """
- DeployHandler._lazy_init()
- msg = {"latest_policies":latest_policies}
- sub_aud = Audit(aud_parent=audit, targetEntity=DeployHandler._target_entity,
- targetServiceName=DeployHandler._url_path)
- headers = {REQUEST_X_ECOMP_REQUESTID : sub_aud.request_id}
-
- msg_str = json.dumps(msg)
+ def policy_update(audit, message, rediscover=False):
+ """
+ post policy_updated message to deploy-handler
+
+ returns condition whether it needs to catch_up
+ """
+ if not message:
+ return
+
+ DeployHandler._lazy_init(audit, rediscover)
+ metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity,
+ targetServiceName=DeployHandler._url_policy)
+ headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
+
+ msg_str = json.dumps(message)
headers_str = json.dumps(headers)
- log_line = "post to deployment-handler {0} msg={1} headers={2}".format(
- DeployHandler._url_path, msg_str, headers_str)
- sub_aud.metrics_start(log_line)
+ log_action = "post to {0} at {1}".format(
+ DeployHandler._target_entity, DeployHandler._url_policy)
+ log_data = " msg={0} headers={1}".format(msg_str, headers_str)
+ log_line = log_action + log_data
DeployHandler._logger.info(log_line)
+ metrics.metrics_start(log_line)
+
+ if not DeployHandler._url:
+ error_msg = "no url found to {0}".format(log_line)
+ DeployHandler._logger.error(error_msg)
+ metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
+ audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
+ metrics.metrics(error_msg)
+ return
res = None
try:
res = DeployHandler._requests_session.post(
- DeployHandler._url_path, json=msg, headers=headers
+ DeployHandler._url_policy, json=message, headers=headers,
+ **DeployHandler._custom_kwargs
)
- except requests.exceptions.RequestException as ex:
- error_msg = "failed to post to deployment-handler {0} {1} msg={2} headers={3}" \
- .format(DeployHandler._url_path, str(ex), msg_str, headers_str)
+ except Exception as ex:
+ error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+ if isinstance(ex, requests.exceptions.RequestException)
+ else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ error_msg = ("failed to {0} {1}: {2}{3}"
+ .format(log_action, type(ex).__name__, str(ex), log_data))
DeployHandler._logger.exception(error_msg)
- sub_aud.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
- audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
- sub_aud.metrics(error_msg)
+ metrics.set_http_status_code(error_code)
+ audit.set_http_status_code(error_code)
+ metrics.metrics(error_msg)
return
- sub_aud.set_http_status_code(res.status_code)
+ metrics.set_http_status_code(res.status_code)
audit.set_http_status_code(res.status_code)
- sub_aud.metrics(
- "response from deployment-handler to post {0}: {1} msg={2} text={3} headers={4}" \
- .format(DeployHandler._url_path, res.status_code, msg_str, res.text,
- res.request.headers))
+ log_line = "response {0} from {1}: text={2}{3}" \
+ .format(res.status_code, log_action, res.text, log_data)
+ metrics.metrics(log_line)
- if res.status_code == requests.codes.ok:
- return res.json()
+ if res.status_code != requests.codes.ok:
+ DeployHandler._logger.error(log_line)
+ return
+
+ DeployHandler._logger.info(log_line)
+ result = res.json() or {}
+ prev_server_instance_uuid = DeployHandler._server_instance_uuid
+ DeployHandler._server_instance_uuid = result.get("server_instance_uuid")
+
+ deployment_handler_changed = (prev_server_instance_uuid
+ and prev_server_instance_uuid != DeployHandler._server_instance_uuid)
+ if deployment_handler_changed:
+ log_line = ("deployment_handler_changed: {1} != {0}"
+ .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid))
+ metrics.info(log_line)
+ DeployHandler._logger.info(log_line)
+
+ return deployment_handler_changed
diff --git a/policyhandler/discovery.py b/policyhandler/discovery.py
index 7e16b90..ce24c3d 100644
--- a/policyhandler/discovery.py
+++ b/policyhandler/discovery.py
@@ -1,8 +1,5 @@
-"""client to talk to consul at the standard port 8500"""
-
-# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,11 +16,16 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import logging
-import json
+"""client to talk to consul at the standard port 8500"""
+
import base64
+import json
+import logging
+
import requests
+from .customize import CustomizerUser
+
class DiscoveryClient(object):
"""talking to consul at http://consul:8500
@@ -40,34 +42,54 @@ class DiscoveryClient(object):
"""
CONSUL_SERVICE_MASK = "http://consul:8500/v1/catalog/service/{0}"
CONSUL_KV_MASK = "http://consul:8500/v1/kv/{0}"
- SERVICE_MASK = "http://{0}:{1}"
_logger = logging.getLogger("policy_handler.discovery")
-
@staticmethod
- def get_service_url(service_name):
+ def get_service_url(audit, service_name):
"""find the service record in consul"""
service_path = DiscoveryClient.CONSUL_SERVICE_MASK.format(service_name)
- DiscoveryClient._logger.info("discover %s", service_path)
+ log_line = "discover {0}".format(service_path)
+ DiscoveryClient._logger.info(log_line)
+ audit.info(log_line)
response = requests.get(service_path)
+
+ log_line = "response {0} for {1}: {2}".format(
+ response.status_code, service_path, response.text)
+ DiscoveryClient._logger.info(log_line)
+ audit.info(log_line)
+
response.raise_for_status()
- service = response.json()[0]
- return DiscoveryClient.SERVICE_MASK.format( \
- service["ServiceAddress"], service["ServicePort"])
+
+ service = response.json()
+ if not service:
+ log_line = "failed discover {0}".format(service_path)
+ DiscoveryClient._logger.error(log_line)
+ audit.error(log_line)
+ return
+ service = service[0]
+
+ service_url = CustomizerUser.get_customizer().get_service_url(audit, service_name, service)
+ if not service_url:
+ log_line = "failed to get service_url for {0}".format(service_name)
+ DiscoveryClient._logger.error(log_line)
+ audit.error(log_line)
+ return
+
+ log_line = "got service_url: {0} for {1}".format(service_url, service_name)
+ DiscoveryClient._logger.info(log_line)
+ audit.info(log_line)
+ return service_url
@staticmethod
def get_value(key):
"""get the value for the key from consul-kv"""
response = requests.get(DiscoveryClient.CONSUL_KV_MASK.format(key))
response.raise_for_status()
- data = response.json()[0]
- value = base64.b64decode(data["Value"]).decode("utf-8")
- DiscoveryClient._logger.info("consul-kv key=%s data=%s value(%s)", \
- key, json.dumps(data), value)
+ data = response.json()
+ if not data:
+ DiscoveryClient._logger.error("failed get_value %s", key)
+ return
+ value = base64.b64decode(data[0]["Value"]).decode("utf-8")
+ DiscoveryClient._logger.info("consul-kv key=%s value(%s) data=%s",
+ key, value, json.dumps(data))
return json.loads(value)
-
- @staticmethod
- def put_kv(key, value):
- """put the value under the key in consul-kv"""
- response = requests.put(DiscoveryClient.CONSUL_KV_MASK.format(key), data=value)
- response.raise_for_status()
diff --git a/policyhandler/onap/CommonLogger.py b/policyhandler/onap/CommonLogger.py
index a70ca28..957b3aa 100644
--- a/policyhandler/onap/CommonLogger.py
+++ b/policyhandler/onap/CommonLogger.py
@@ -1,19 +1,8 @@
#!/usr/bin/python
# -*- indent-tabs-mode: nil -*- vi: set expandtab:
-"""ECOMP Common Logging library in Python.
-
-CommonLogger.py
-
-Original Written by: Terry Schmalzried
-Date written: October 1, 2015
-Last updated: December 1, 2016
-
-version 0.8
-"""
-# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -30,6 +19,17 @@ version 0.8
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+"""ECOMP Common Logging library in Python.
+
+CommonLogger.py
+
+Original Written by: Terry Schmalzried
+Date written: October 1, 2015
+Last updated: December 1, 2016
+
+version 0.8
+"""
+
from __future__ import print_function
import os, sys, getopt, logging, logging.handlers, time, re, uuid, socket, threading, collections
@@ -203,7 +203,7 @@ class CommonLogger:
self._logHandler = logging.handlers.RotatingFileHandler(self._logFile, \
mode=self._sizeRotateMode, maxBytes=self._sizeMaxBytes, \
backupCount=self._backupCount, encoding=None, delay=False)
-
+
else:
self._logHandler = logging.handlers.WatchedFileHandler(self._logFile, \
mode=self._sizeRotateMode, \
@@ -860,7 +860,7 @@ if __name__ == "__main__":
parser.add_argument("-k", "--keeplogs", help="Keep the log files after finishing the tests", action="store_true")
parser.add_argument("-v", "--verbose", help="Print debugging messages", action="store_true")
args = parser.parse_args()
-
+
spid = str(os.getpid())
if args.keeplogs:
spid = ""
@@ -878,7 +878,7 @@ if __name__ == "__main__":
os.remove(f)
except:
pass
- if not args.keeplogs:
+ if not args.keeplogs:
atexit.register(cleanupTmps)
with open(logcfg, "w") as o:
diff --git a/policyhandler/onap/__init__.py b/policyhandler/onap/__init__.py
index a3220c4..e9d0246 100644
--- a/policyhandler/onap/__init__.py
+++ b/policyhandler/onap/__init__.py
@@ -1,6 +1,5 @@
-# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py
index a1df861..a007a26 100644
--- a/policyhandler/onap/audit.py
+++ b/policyhandler/onap/audit.py
@@ -1,15 +1,5 @@
-"""generic class to keep track of request handling
- from receiving it through reponse and log all the activities
-
- call Audit.init("component-name", "path/to/config_file") to init the loggers before any requests
-
- start each outside request with creation of the Audit object
- audit = Audit(request_id=None, headers=None, msg=None)
-"""
-
-# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -26,15 +16,30 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import os
+"""generic class to keep track of request handling
+ from receiving it through reponse and log all the activities
+
+ call Audit.init("component-name", "path/to/config_file") to init the loggers before any requests
+
+ start each outside request with creation of the Audit object
+ audit = Audit(request_id=None, headers=None, msg=None)
+"""
+
+import copy
import json
-import uuid
+import os
+import re
+import subprocess
+import sys
+import threading
import time
-import copy
-from threading import Lock
+import uuid
+from datetime import datetime
from enum import Enum
from .CommonLogger import CommonLogger
+from .health import Health
+from .process_info import ProcessInfo
REQUEST_X_ECOMP_REQUESTID = "X-ECOMP-RequestID"
REQUEST_REMOTE_ADDR = "Remote-Addr"
@@ -44,21 +49,31 @@ HOSTNAME = "HOSTNAME"
AUDIT_REQUESTID = 'requestID'
AUDIT_IPADDRESS = 'IPAddress'
AUDIT_SERVER = 'server'
+AUDIT_TARGET_ENTITY = 'targetEntity'
+AUDIT_METRICS = 'metrics'
+AUDIT_TOTAL_STATS = 'audit_total_stats'
+METRICS_TOTAL_STATS = 'metrics_total_stats'
HEADER_CLIENTAUTH = "clientauth"
HEADER_AUTHORIZATION = "authorization"
+ERROR_CODE = "errorCode"
+ERROR_DESCRIPTION = "errorDescription"
+
+
class AuditHttpCode(Enum):
"""audit http codes"""
HTTP_OK = 200
- DATA_NOT_FOUND_ERROR = 400
PERMISSION_UNAUTHORIZED_ERROR = 401
PERMISSION_FORBIDDEN_ERROR = 403
+ RESPONSE_ERROR = 400
+ DATA_NOT_FOUND_ERROR = 404
SERVER_INTERNAL_ERROR = 500
SERVICE_UNAVAILABLE_ERROR = 503
DATA_ERROR = 1030
SCHEMA_ERROR = 1040
+
class AuditResponseCode(Enum):
"""audit response codes"""
SUCCESS = 0
@@ -72,23 +87,25 @@ class AuditResponseCode(Enum):
@staticmethod
def get_response_code(http_status_code):
"""calculates the response_code from max_http_status_code"""
+ response_code = AuditResponseCode.UNKNOWN_ERROR
if http_status_code <= AuditHttpCode.HTTP_OK.value:
- return AuditResponseCode.SUCCESS
-
- if http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value, \
- AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]:
- return AuditResponseCode.PERMISSION_ERROR
- if http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value:
- return AuditResponseCode.AVAILABILITY_ERROR
- if http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value:
- return AuditResponseCode.BUSINESS_PROCESS_ERROR
- if http_status_code in [AuditHttpCode.DATA_ERROR.value, \
- AuditHttpCode.DATA_NOT_FOUND_ERROR.value]:
- return AuditResponseCode.DATA_ERROR
- if http_status_code == AuditHttpCode.SCHEMA_ERROR.value:
- return AuditResponseCode.SCHEMA_ERROR
-
- return AuditResponseCode.UNKNOWN_ERROR
+ response_code = AuditResponseCode.SUCCESS
+
+ elif http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value,
+ AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]:
+ response_code = AuditResponseCode.PERMISSION_ERROR
+ elif http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value:
+ response_code = AuditResponseCode.AVAILABILITY_ERROR
+ elif http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value:
+ response_code = AuditResponseCode.BUSINESS_PROCESS_ERROR
+ elif http_status_code in [AuditHttpCode.DATA_ERROR.value,
+ AuditHttpCode.RESPONSE_ERROR.value,
+ AuditHttpCode.DATA_NOT_FOUND_ERROR.value]:
+ response_code = AuditResponseCode.DATA_ERROR
+ elif http_status_code == AuditHttpCode.SCHEMA_ERROR.value:
+ response_code = AuditResponseCode.SCHEMA_ERROR
+
+ return response_code
@staticmethod
def get_human_text(response_code):
@@ -97,7 +114,8 @@ class AuditResponseCode(Enum):
return "unknown"
return response_code.name.lower().replace("_", " ")
-class Audit(object):
+
+class _Audit(object):
"""put the audit object on stack per each initiating request in the system
:request_id: is the X-ECOMP-RequestID for tracing
@@ -109,75 +127,90 @@ class Audit(object):
:kwargs: - put any request related params into kwargs
"""
_service_name = ""
- _service_instance_UUID = str(uuid.uuid4())
+ _service_version = ""
+ _service_instance_uuid = str(uuid.uuid4())
+ _started = datetime.utcnow()
+ _key_format = re.compile(r"\W")
_logger_debug = None
_logger_error = None
_logger_metrics = None
_logger_audit = None
+ _health = Health()
+ _py_ver = sys.version.replace("\n", "")
+ _packages = []
@staticmethod
def init(service_name, config_file_path):
"""init static invariants and loggers"""
- Audit._service_name = service_name
- Audit._logger_debug = CommonLogger(config_file_path, "debug", \
- instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name)
- Audit._logger_error = CommonLogger(config_file_path, "error", \
- instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name)
- Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \
- instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name)
- Audit._logger_audit = CommonLogger(config_file_path, "audit", \
- instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name)
-
- def __init__(self, request_id=None, req_message=None, aud_parent=None, **kwargs):
+ _Audit._service_name = service_name
+ _Audit._logger_debug = CommonLogger(config_file_path, "debug", \
+ instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+ _Audit._logger_error = CommonLogger(config_file_path, "error", \
+ instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+ _Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \
+ instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+ _Audit._logger_audit = CommonLogger(config_file_path, "audit", \
+ instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+ ProcessInfo.init()
+ try:
+ _Audit._service_version = subprocess.check_output(
+ ["python", "setup.py", "--version"], universal_newlines=True).strip()
+ except subprocess.CalledProcessError:
+ pass
+ try:
+ _Audit._packages = list(
+ filter(None, subprocess.check_output(["pip", "freeze"],
+ universal_newlines=True).splitlines()))
+ except subprocess.CalledProcessError:
+ pass
+
+
+ def health(self, full=False):
+ """returns json for health check"""
+ utcnow = datetime.utcnow()
+ health = {
+ "server" : {
+ "service_name" : _Audit._service_name,
+ "service_version" : _Audit._service_version,
+ "service_instance_uuid" : _Audit._service_instance_uuid
+ },
+ "runtime" : {
+ "started" : str(_Audit._started),
+ "utcnow" : str(utcnow),
+ "uptime" : str(utcnow - _Audit._started),
+ "active_threads" : ProcessInfo.active_threads(),
+ "gc" : ProcessInfo.gc_info(full),
+ "virtual_memory" : ProcessInfo.virtual_memory(),
+ "process_memory" : ProcessInfo.process_memory()
+ },
+ "stats" : _Audit._health.dump(),
+ "soft" : {"python" : _Audit._py_ver, "packages" : _Audit._packages}
+ }
+ self.info("{} health: {}".format(_Audit._service_name, json.dumps(health)))
+ return health
+
+ def process_info(self):
+ """get the debug info on all the threads and memory"""
+ process_info = ProcessInfo.get_all()
+ self.info("{} process_info: {}".format(_Audit._service_name, json.dumps(process_info)))
+ return process_info
+
+ def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs):
"""create audit object per each request in the system
+ :job_name: is the name of the audit job for health stats
:request_id: is the X-ECOMP-RequestID for tracing
:req_message: is the request message string for logging
- :aud_parent: is the parent Audit - used for sub-query metrics to other systems
:kwargs: - put any request related params into kwargs
"""
+ self.job_name = _Audit._key_format.sub('_', job_name or req_message or _Audit._service_name)
self.request_id = request_id
self.req_message = req_message or ""
- self.aud_parent = aud_parent
self.kwargs = kwargs or {}
- self.retry_get_config = False
self.max_http_status_code = 0
- self._lock = Lock()
-
- if self.aud_parent:
- if not self.request_id:
- self.request_id = self.aud_parent.request_id
- if not self.req_message:
- self.req_message = self.aud_parent.req_message
- self.kwargs = self.aud_parent.merge_all_kwargs(**self.kwargs)
- else:
- headers = self.kwargs.get("headers", {})
- if headers:
- if not self.request_id:
- self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID)
- if AUDIT_IPADDRESS not in self.kwargs:
- self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR)
- if AUDIT_SERVER not in self.kwargs:
- self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST)
-
- if AUDIT_SERVER not in self.kwargs:
- self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME)
-
- created_req = ""
- if not self.request_id:
- created_req = " with new"
- self.request_id = str(uuid.uuid4())
-
- self.kwargs[AUDIT_REQUESTID] = self.request_id
-
- self._started = time.time()
- self._start_event = Audit._logger_audit.getStartRecordEvent()
- self.metrics_start()
+ self._lock = threading.Lock()
- if not self.aud_parent:
- self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\
- .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs)))
def merge_all_kwargs(self, **kwargs):
"""returns the merge of copy of self.kwargs with the param kwargs"""
@@ -188,10 +221,15 @@ class Audit(object):
def set_http_status_code(self, http_status_code):
"""accumulate the highest(worst) http status code"""
- self._lock.acquire()
- if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value:
- self.max_http_status_code = max(http_status_code, self.max_http_status_code)
- self._lock.release()
+ with self._lock:
+ if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value:
+ self.max_http_status_code = max(http_status_code, self.max_http_status_code)
+
+ def get_max_http_status_code(self):
+ """returns the highest(worst) http status code"""
+ with self._lock:
+ max_http_status_code = self.max_http_status_code
+ return max_http_status_code
@staticmethod
def get_status_code(success):
@@ -200,6 +238,71 @@ class Audit(object):
return 'COMPLETE'
return 'ERROR'
+ def is_serious_error(self, status_code):
+ """returns whether the response_code is success and a human text for response code"""
+ return (AuditResponseCode.PERMISSION_ERROR.value
+ == AuditResponseCode.get_response_code(status_code).value
+ or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+
+ def _get_response_status(self):
+ """calculates the response status fields from max_http_status_code"""
+ max_http_status_code = self.get_max_http_status_code()
+ response_code = AuditResponseCode.get_response_code(max_http_status_code)
+ success = (response_code.value == AuditResponseCode.SUCCESS.value)
+ response_description = AuditResponseCode.get_human_text(response_code)
+ return success, max_http_status_code, response_code, response_description
+
+ def is_success(self):
+ """returns whether the response_code is success and a human text for response code"""
+ success, _, _, _ = self._get_response_status()
+ return success
+
+ def debug(self, log_line, **kwargs):
+ """debug - the debug=lowest level of logging"""
+ _Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs))
+
+ def info(self, log_line, **kwargs):
+ """debug - the info level of logging"""
+ _Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs))
+
+ def info_requested(self, result=None, **kwargs):
+ """info "requested ..." - the info level of logging"""
+ self.info("requested {0} {1}".format(self.req_message, result or ""), \
+ **self.merge_all_kwargs(**kwargs))
+
+ def warn(self, log_line, error_code=None, **kwargs):
+ """debug+error - the warn level of logging"""
+ all_kwargs = self.merge_all_kwargs(**kwargs)
+
+ if error_code and isinstance(error_code, AuditResponseCode):
+ all_kwargs[ERROR_CODE] = error_code.value
+ all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code)
+
+ _Audit._logger_debug.warn(log_line, **all_kwargs)
+ _Audit._logger_error.warn(log_line, **all_kwargs)
+
+ def error(self, log_line, error_code=None, **kwargs):
+ """debug+error - the error level of logging"""
+ all_kwargs = self.merge_all_kwargs(**kwargs)
+
+ if error_code and isinstance(error_code, AuditResponseCode):
+ all_kwargs[ERROR_CODE] = error_code.value
+ all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code)
+
+ _Audit._logger_debug.error(log_line, **all_kwargs)
+ _Audit._logger_error.error(log_line, **all_kwargs)
+
+ def fatal(self, log_line, error_code=None, **kwargs):
+ """debug+error - the fatal level of logging"""
+ all_kwargs = self.merge_all_kwargs(**kwargs)
+
+ if error_code and isinstance(error_code, AuditResponseCode):
+ all_kwargs[ERROR_CODE] = error_code.value
+ all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code)
+
+ _Audit._logger_debug.fatal(log_line, **all_kwargs)
+ _Audit._logger_error.fatal(log_line, **all_kwargs)
+
@staticmethod
def hide_secrets(obj):
"""hides the known secret field values of the dictionary"""
@@ -210,7 +313,7 @@ class Audit(object):
if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]:
obj[key] = "*"
elif isinstance(obj[key], dict):
- obj[key] = Audit.hide_secrets(obj[key])
+ obj[key] = _Audit.hide_secrets(obj[key])
return obj
@@ -220,101 +323,151 @@ class Audit(object):
if not isinstance(obj, dict):
return json.dumps(obj, **kwargs)
- return json.dumps(Audit.hide_secrets(copy.deepcopy(obj)), **kwargs)
+ return json.dumps(_Audit.hide_secrets(copy.deepcopy(obj)), **kwargs)
- def get_response_code(self):
- """calculates the response_code from max_http_status_code"""
- self._lock.acquire()
- max_http_status_code = self.max_http_status_code
- self._lock.release()
- return AuditResponseCode.get_response_code(max_http_status_code)
+ @staticmethod
+ def get_elapsed_time(started):
+ """returns the elapsed time since started in milliseconds"""
+ return int(round(1000 * (time.time() - (started or 0))))
- def debug(self, log_line, **kwargs):
- """debug - the debug=lowest level of logging"""
- Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs))
- def info(self, log_line, **kwargs):
- """debug - the info level of logging"""
- Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs))
+class Audit(_Audit):
+ """Audit class to track the high level operations"""
- def info_requested(self, result=None, **kwargs):
- """info "requested ..." - the info level of logging"""
- self.info("requested {0} {1}".format(self.req_message, result or ""), \
- **self.merge_all_kwargs(**kwargs))
+ def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs):
+ """create audit object per each request in the system
- def warn(self, log_line, **kwargs):
- """debug+error - the warn level of logging"""
- all_kwargs = self.merge_all_kwargs(**kwargs)
- Audit._logger_debug.warn(log_line, **all_kwargs)
- Audit._logger_error.warn(log_line, **all_kwargs)
+ :job_name: is the name of the audit job for health stats
+ :request_id: is the X-ECOMP-RequestID for tracing
+ :req_message: is the request message string for logging
+ :aud_parent: is the parent Audit - used for sub-query metrics to other systems
+ :kwargs: - put any request related params into kwargs
+ """
+ super().__init__(job_name=job_name,
+ request_id=request_id,
+ req_message=req_message,
+ **kwargs)
- def error(self, log_line, **kwargs):
- """debug+error - the error level of logging"""
- all_kwargs = self.merge_all_kwargs(**kwargs)
- Audit._logger_debug.error(log_line, **all_kwargs)
- Audit._logger_error.error(log_line, **all_kwargs)
+ headers = self.kwargs.get("headers", {})
+ if headers:
+ if not self.request_id:
+ self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID)
+ if AUDIT_IPADDRESS not in self.kwargs:
+ self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR)
+ if AUDIT_SERVER not in self.kwargs:
+ self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST)
- def fatal(self, log_line, **kwargs):
- """debug+error - the fatal level of logging"""
+ created_req = ""
+ if not self.request_id:
+ created_req = " with new"
+ self.request_id = str(uuid.uuid4())
+
+ if AUDIT_SERVER not in self.kwargs:
+ self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME)
+
+ self.kwargs[AUDIT_REQUESTID] = self.request_id
+
+ _Audit._health.start(self.job_name, self.request_id)
+ _Audit._health.start(AUDIT_TOTAL_STATS, self.request_id)
+
+ self._started = time.time()
+ self._start_event = Audit._logger_audit.getStartRecordEvent()
+
+ self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\
+ .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs)))
+
+
+ def audit_done(self, result=None, **kwargs):
+ """debug+audit - the audit=top level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
- Audit._logger_debug.fatal(log_line, **all_kwargs)
- Audit._logger_error.fatal(log_line, **all_kwargs)
+ success, max_http_status_code, response_code, response_description = \
+ self._get_response_status()
+ log_line = "{0} {1}".format(self.req_message, result or "").strip()
+ audit_func = None
+ timer = _Audit.get_elapsed_time(self._started)
+ if success:
+ log_line = "done: {0}".format(log_line)
+ self.info(log_line, **all_kwargs)
+ audit_func = _Audit._logger_audit.info
+ _Audit._health.success(self.job_name, timer, self.request_id)
+ _Audit._health.success(AUDIT_TOTAL_STATS, timer, self.request_id)
+ else:
+ log_line = "failed: {0}".format(log_line)
+ self.error(log_line, errorCode=response_code.value,
+ errorDescription=response_description, **all_kwargs)
+ audit_func = _Audit._logger_audit.error
+ _Audit._health.error(self.job_name, timer, self.request_id)
+ _Audit._health.error(AUDIT_TOTAL_STATS, timer, self.request_id)
+
+ audit_func(log_line, begTime=self._start_event, timer=timer,
+ statusCode=_Audit.get_status_code(success),
+ responseCode=response_code.value,
+ responseDescription=response_description,
+ **all_kwargs)
+
+ return (success, max_http_status_code, response_description)
+
+
+class Metrics(_Audit):
+ """Metrics class to track the calls to outside systems"""
+
+ def __init__(self, aud_parent, **kwargs):
+ """create audit object per each request in the system
+
+ :aud_parent: is the parent Audit - used for sub-query metrics to other systems
+ :kwargs: - put any request related params into kwargs
+ """
+ super().__init__(job_name=aud_parent.job_name,
+ request_id=aud_parent.request_id,
+ req_message=aud_parent.req_message,
+ **aud_parent.merge_all_kwargs(**kwargs))
+ self.aud_parent = aud_parent
+ self._metrics_name = _Audit._key_format.sub(
+ '_', AUDIT_METRICS + "_" + self.kwargs.get(AUDIT_TARGET_ENTITY, self.job_name))
+
+ self._metrics_started = None
+ self._metrics_start_event = None
- @staticmethod
- def get_elapsed_time(started):
- """returns the elapsed time since started in milliseconds"""
- return int(round(1000 * (time.time() - started)))
def metrics_start(self, log_line=None, **kwargs):
"""reset metrics timing"""
+ self.merge_all_kwargs(**kwargs)
self._metrics_started = time.time()
- self._metrics_start_event = Audit._logger_metrics.getStartRecordEvent()
+ self._metrics_start_event = _Audit._logger_metrics.getStartRecordEvent()
if log_line:
self.info(log_line, **self.merge_all_kwargs(**kwargs))
+ _Audit._health.start(self._metrics_name, self.request_id)
+ _Audit._health.start(METRICS_TOTAL_STATS, self.request_id)
+
def metrics(self, log_line, **kwargs):
"""debug+metrics - the metrics=sub-audit level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
- response_code = self.get_response_code()
- success = (response_code.value == AuditResponseCode.SUCCESS.value)
+ success, max_http_status_code, response_code, response_description = \
+ self._get_response_status()
metrics_func = None
+ timer = _Audit.get_elapsed_time(self._metrics_started)
if success:
log_line = "done: {0}".format(log_line)
self.info(log_line, **all_kwargs)
- metrics_func = Audit._logger_metrics.info
+ metrics_func = _Audit._logger_metrics.info
+ _Audit._health.success(self._metrics_name, timer, self.request_id)
+ _Audit._health.success(METRICS_TOTAL_STATS, timer, self.request_id)
else:
log_line = "failed: {0}".format(log_line)
- self.error(log_line, errorCode=response_code.value, \
- errorDescription=AuditResponseCode.get_human_text(response_code), **all_kwargs)
- metrics_func = Audit._logger_metrics.error
-
- metrics_func(log_line, begTime=self._metrics_start_event, \
- timer=Audit.get_elapsed_time(self._metrics_started), \
- statusCode=Audit.get_status_code(success), responseCode=response_code.value, \
- responseDescription=AuditResponseCode.get_human_text(response_code), \
+ self.error(log_line, errorCode=response_code.value,
+ errorDescription=response_description, **all_kwargs)
+ metrics_func = _Audit._logger_metrics.error
+ _Audit._health.error(self._metrics_name, timer, self.request_id)
+ _Audit._health.error(METRICS_TOTAL_STATS, timer, self.request_id)
+
+ metrics_func(
+ log_line,
+ begTime=(self._metrics_start_event or _Audit._logger_metrics.getStartRecordEvent()),
+ timer=timer,
+ statusCode=_Audit.get_status_code(success),
+ responseCode=response_code.value,
+ responseDescription=response_description,
**all_kwargs)
- self.metrics_start()
-
- def audit_done(self, result=None, **kwargs):
- """debug+audit - the audit=top level of logging"""
- all_kwargs = self.merge_all_kwargs(**kwargs)
- response_code = self.get_response_code()
- success = (response_code.value == AuditResponseCode.SUCCESS.value)
- log_line = "{0} {1}".format(self.req_message, result or "").strip()
- audit_func = None
- if success:
- log_line = "done: {0}".format(log_line)
- self.info(log_line, **all_kwargs)
- audit_func = Audit._logger_audit.info
- else:
- log_line = "failed: {0}".format(log_line)
- self.error(log_line, errorCode=response_code.value, \
- errorDescription=AuditResponseCode.get_human_text(response_code), **all_kwargs)
- audit_func = Audit._logger_audit.error
-
- audit_func(log_line, begTime=self._start_event, \
- timer=Audit.get_elapsed_time(self._started), \
- statusCode=Audit.get_status_code(success), responseCode=response_code.value, \
- responseDescription=AuditResponseCode.get_human_text(response_code), \
- **all_kwargs)
+ return (success, max_http_status_code, response_description)
diff --git a/policyhandler/onap/health.py b/policyhandler/onap/health.py
new file mode 100644
index 0000000..d117255
--- /dev/null
+++ b/policyhandler/onap/health.py
@@ -0,0 +1,160 @@
+# ================================================================================
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+"""generic class to keep track of app health"""
+
+import uuid
+from threading import Lock
+from datetime import datetime
+
+class HealthStats(object):
+ """keep track of stats for calls"""
+ def __init__(self, name):
+ """keep track of stats for metrics calls"""
+ self._name = name or "stats_" + str(uuid.uuid4())
+ self._lock = Lock()
+
+ self._call_count = 0
+ self._error_count = 0
+ self._active_count = 0
+
+ self._longest_timer = 0
+ self._total_timer = 0
+
+ self._last_success = None
+ self._last_error = None
+ self._last_start = None
+ self._longest_end_ts = None
+
+ self._last_success_request_id = None
+ self._last_error_request_id = None
+ self._last_started_request_id = None
+ self._longest_request_id = None
+
+
+ def dump(self):
+ """returns dict of stats"""
+ dump = None
+ with self._lock:
+ dump = {
+ "total" : {
+ "call_count" : self._call_count,
+ "ave_timer_millisecs" : (float(self._total_timer)/self._call_count
+ if self._call_count else 0)
+ },
+ "success" : {
+ "success_count" : (self._call_count - self._error_count),
+ "last_success" : str(self._last_success),
+ "last_success_request_id" : self._last_success_request_id
+ },
+ "error" : {
+ "error_count" : self._error_count,
+ "last_error" : str(self._last_error),
+ "last_error_request_id" : self._last_error_request_id
+ },
+ "active" : {
+ "active_count" : self._active_count,
+ "last_start" : str(self._last_start),
+ "last_started_request_id" : self._last_started_request_id
+ },
+ "longest" : {
+ "longest_timer_millisecs" : self._longest_timer,
+ "longest_request_id" : self._longest_request_id,
+ "longest_end" : str(self._longest_end_ts)
+ }
+ }
+ return dump
+
+
+ def start(self, request_id=None):
+ """records the start of active execution"""
+ with self._lock:
+ self._active_count += 1
+ self._last_start = datetime.utcnow()
+ self._last_started_request_id = request_id
+
+
+ def success(self, timer, request_id=None):
+ """records the successful execution"""
+ with self._lock:
+ self._active_count -= 1
+ self._call_count += 1
+ self._last_success = datetime.utcnow()
+ self._last_success_request_id = request_id
+ self._total_timer += timer
+ if not self._longest_timer or self._longest_timer < timer:
+ self._longest_timer = timer
+ self._longest_request_id = request_id
+ self._longest_end_ts = self._last_success
+
+
+ def error(self, timer, request_id=None):
+ """records the errored execution"""
+ with self._lock:
+ self._active_count -= 1
+ self._call_count += 1
+ self._error_count += 1
+ self._last_error = datetime.utcnow()
+ self._last_error_request_id = request_id
+ self._total_timer += timer
+ if not self._longest_timer or self._longest_timer < timer:
+ self._longest_timer = timer
+ self._longest_request_id = request_id
+ self._longest_end_ts = self._last_error
+
+
+class Health(object):
+ """Health stats for multiple requests"""
+ def __init__(self):
+ """Health stats for application"""
+ self._all_stats = {}
+ self._lock = Lock()
+
+
+ def _add_or_get_stats(self, stats_name):
+ """add to or get from the ever growing dict of HealthStats"""
+ with self._lock:
+ stats = self._all_stats.get(stats_name)
+ if not stats:
+ self._all_stats[stats_name] = stats = HealthStats(stats_name)
+ return stats
+
+
+ def start(self, stats_name, request_id=None):
+ """records the start of execution on stats_name"""
+ stats = self._add_or_get_stats(stats_name)
+ stats.start(request_id)
+
+
+ def success(self, stats_name, timer, request_id=None):
+ """records the successful execution on stats_name"""
+ stats = self._add_or_get_stats(stats_name)
+ stats.success(timer, request_id)
+
+
+ def error(self, stats_name, timer, request_id=None):
+ """records the error execution on stats_name"""
+ stats = self._add_or_get_stats(stats_name)
+ stats.error(timer, request_id)
+
+
+ def dump(self):
+ """returns dict of stats"""
+ with self._lock:
+ stats = dict((k, v.dump()) for (k, v) in self._all_stats.items())
+ return stats
diff --git a/policyhandler/onap/process_info.py b/policyhandler/onap/process_info.py
new file mode 100644
index 0000000..a22ccf9
--- /dev/null
+++ b/policyhandler/onap/process_info.py
@@ -0,0 +1,152 @@
+# ================================================================================
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+"""generic class to keep get real time info about the current process"""
+
+import gc
+import sys
+import threading
+import traceback
+from functools import wraps
+
+import psutil
+
+
+def safe_operation(func):
+ """safequard the function against any exception"""
+ if not func:
+ return
+
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ """wrapper around the function"""
+ try:
+ return func(*args, **kwargs)
+ except Exception as ex:
+ return {type(ex).__name__ : str(ex)}
+ return wrapper
+
+
+class ProcessInfo(object):
+ """static class to calculate process info"""
+ _BIBYTES_SYMBOLS = ('KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
+ _BIBYTES_VALS = {}
+ _inited = False
+ _lock = threading.Lock()
+
+ @staticmethod
+ def init():
+ """init static constants"""
+ if ProcessInfo._inited:
+ return
+ with ProcessInfo._lock:
+ if ProcessInfo._inited:
+ return
+
+ for i, bibytes_symbol in enumerate(ProcessInfo._BIBYTES_SYMBOLS):
+ ProcessInfo._BIBYTES_VALS[bibytes_symbol] = 1 << (i + 1) * 10
+ ProcessInfo._BIBYTES_SYMBOLS = list(reversed(ProcessInfo._BIBYTES_SYMBOLS))
+ ProcessInfo._inited = True
+
+ @staticmethod
+ def bytes_to_bibytes(byte_count):
+ """converts byte count to human value in kibi-mebi-gibi-...-bytes"""
+ if byte_count is None:
+ return "unknown"
+ if not byte_count or not isinstance(byte_count, int):
+ return byte_count
+ ProcessInfo.init()
+
+ for bibytes_symbol in ProcessInfo._BIBYTES_SYMBOLS:
+ bibytes_value = ProcessInfo._BIBYTES_VALS[bibytes_symbol]
+ if byte_count >= bibytes_value:
+ value = float(byte_count) / bibytes_value
+ return '%.2f %s' % (value, bibytes_symbol)
+ return "%s B" % byte_count
+
+ @staticmethod
+ @safe_operation
+ def process_memory():
+ """calculates the memory usage of the current process"""
+ process = psutil.Process()
+ with process.oneshot():
+ return dict((k, ProcessInfo.bytes_to_bibytes(v))
+ for k, v in process.memory_full_info()._asdict().items())
+
+
+ @staticmethod
+ @safe_operation
+ def virtual_memory():
+ """calculates the virtual memory usage of the whole vm"""
+ return dict((k, ProcessInfo.bytes_to_bibytes(v))
+ for k, v in psutil.virtual_memory()._asdict().items())
+
+
+ @staticmethod
+ @safe_operation
+ def active_threads():
+ """list of active threads"""
+ return sorted([thr.name + "(" + str(thr.ident) + ")" for thr in threading.enumerate()])
+
+
+ @staticmethod
+ @safe_operation
+ def thread_stacks():
+ """returns the current threads with their stack"""
+ thread_names = dict((thr.ident, thr.name) for thr in threading.enumerate())
+ return [
+ {
+ "thread_id" : thread_id,
+ "thread_name" : thread_names.get(thread_id),
+ "thread_stack" : [
+ {
+ "filename" : filename,
+ "lineno" : lineno,
+ "function" : function_name,
+ "line" : line.strip() if line else None
+ }
+ for filename, lineno, function_name, line in traceback.extract_stack(stack)
+ ]
+ }
+ for thread_id, stack in sys._current_frames().items()
+ ]
+
+
+ @staticmethod
+ @safe_operation
+ def gc_info(full=False):
+ """gets info from garbage collector"""
+ gc_info = {
+ "gc_count" : str(gc.get_count()),
+ "gc_threshold" : str(gc.get_threshold())
+ }
+ if gc.garbage:
+ gc_info["gc_garbage"] = ([repr(stuck) for stuck in gc.garbage]
+ if full else len(gc.garbage))
+ return gc_info
+
+ @staticmethod
+ def get_all():
+ """all info"""
+ return {
+ "active_threads" : ProcessInfo.active_threads(),
+ "gc" : ProcessInfo.gc_info(full=True),
+ "process_memory" : ProcessInfo.process_memory(),
+ "virtual_memory" : ProcessInfo.virtual_memory(),
+ "thread_stacks" : ProcessInfo.thread_stacks()
+ }
diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py
index 640b724..90ede47 100644
--- a/policyhandler/policy_consts.py
+++ b/policyhandler/policy_consts.py
@@ -1,8 +1,5 @@
-"""contants of policy-handler"""
-
-# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,10 +16,19 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+"""contants of policy-handler"""
+
POLICY_ID = 'policy_id'
POLICY_VERSION = "policyVersion"
POLICY_NAME = "policyName"
POLICY_BODY = 'policy_body'
POLICY_CONFIG = 'config'
-POLICY_GET_CONFIG = 'getConfig'
+CATCH_UP = "catch_up"
+AUTO_CATCH_UP = "auto catch_up"
+LATEST_POLICIES = "latest_policies"
+REMOVED_POLICIES = "removed_policies"
+ERRORED_POLICIES = "errored_policies"
+ERRORED_SCOPES = "errored_scopes"
+SCOPE_PREFIXES = "scope_prefixes"
+POLICY_FILTER = "policy_filter"
diff --git a/policyhandler/policy_engine.py b/policyhandler/policy_engine.py
deleted file mode 100644
index a0ff697..0000000
--- a/policyhandler/policy_engine.py
+++ /dev/null
@@ -1,103 +0,0 @@
-"""policy-engine-client communicates with policy-engine thru PolicyEngine client object"""
-
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-
-import logging
-import re
-
-from .config import Config, PolicyEngineConfig
-from .onap.audit import Audit
-from .PolicyEngine import PolicyEngine, NotificationHandler, NotificationScheme
-from .policy_updater import PolicyUpdater
-
-class PolicyNotificationHandler(NotificationHandler):
- """handler of the policy-engine push notifications"""
- _logger = logging.getLogger("policy_handler.policy_notification")
-
- def __init__(self, policy_updater):
- scope_prefixes = [scope_prefix.replace(".", "[.]")
- for scope_prefix in Config.config["scope_prefixes"]]
- self._policy_scopes = re.compile("(" + "|".join(scope_prefixes) + ")")
- PolicyNotificationHandler._logger.info("_policy_scopes %s", self._policy_scopes.pattern)
- self._policy_updater = policy_updater
- self._policy_updater.start()
-
- def notificationReceived(self, notification):
- if not notification or not notification._loadedPolicies:
- return
-
- policy_names = [loaded._policyName
- for loaded in notification._loadedPolicies
- if self._policy_scopes.match(loaded._policyName)]
-
- if not policy_names:
- PolicyNotificationHandler._logger.info("no policy updated for scopes %s",
- self._policy_scopes.pattern)
- return
-
- audit = Audit(req_message="notificationReceived from PDP")
- audit.retry_get_config = True
- self._policy_updater.enqueue(audit, policy_names)
-
-class PolicyEngineClient(object):
- """ policy-engine client"""
- _logger = logging.getLogger("policy_handler.policy_engine")
- _policy_updater = None
- _pdp_notification_handler = None
- _policy_engine = None
-
- @staticmethod
- def shutdown(audit):
- """Shutdown the notification-handler"""
- PolicyEngineClient._policy_updater.shutdown(audit)
-
- @staticmethod
- def catch_up(audit):
- """bring the latest policies from policy-engine"""
- PolicyEngineClient._policy_updater.catch_up(audit)
-
- @staticmethod
- def create_policy_engine_properties():
- """create the policy_engine.properties file from config.json"""
- pass
-
- @staticmethod
- def run():
- """Using policy-engine client to talk to policy engine"""
- audit = Audit(req_message="start PDP client")
- PolicyEngineClient._policy_updater = PolicyUpdater()
- PolicyEngineClient._pdp_notification_handler = PolicyNotificationHandler(
- PolicyEngineClient._policy_updater)
-
- sub_aud = Audit(aud_parent=audit)
- sub_aud.metrics_start("create client to PDP")
- basic_client_auth = PolicyEngineConfig.save_to_file()
- PolicyEngineClient._policy_engine = PolicyEngine(
- PolicyEngineConfig.PATH_TO_PROPERTIES,
- scheme=NotificationScheme.AUTO_ALL_NOTIFICATIONS.name,
- handler=PolicyEngineClient._pdp_notification_handler,
- basic_client_auth=basic_client_auth
- )
- sub_aud.metrics("created client to PDP")
- seed_scope = ".*"
- PolicyEngineClient._policy_engine.getConfig(policyName=seed_scope)
- sub_aud.metrics("seeded client by PDP.getConfig for policyName={0}".format(seed_scope))
-
- PolicyEngineClient.catch_up(audit)
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py
new file mode 100644
index 0000000..e1584a3
--- /dev/null
+++ b/policyhandler/policy_receiver.py
@@ -0,0 +1,200 @@
+# ================================================================================
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+"""
+policy-receiver communicates with policy-engine
+thru web-socket to receive push notifications
+on updates and removal of policies.
+
+on receiving the policy-notifications, the policy-receiver
+filters them out by the policy scope(s) provided in policy-handler config
+and passes the notifications to policy-updater
+"""
+
+import json
+import logging
+import re
+import time
+from threading import Lock, Thread
+
+import websocket
+
+from .config import Config
+from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
+from .policy_updater import PolicyUpdater
+
+LOADED_POLICIES = 'loadedPolicies'
+REMOVED_POLICIES = 'removedPolicies'
+POLICY_NAME = 'policyName'
+POLICY_VER = 'versionNo'
+
+class _PolicyReceiver(Thread):
+ """web-socket to PolicyEngine"""
+ _logger = logging.getLogger("policy_handler.policy_receiver")
+
+ def __init__(self):
+ """web-socket inside the thread to receive policy notifications from PolicyEngine"""
+ Thread.__init__(self, name="policy_receiver")
+ self.daemon = True
+
+ self._lock = Lock()
+ self._keep_running = True
+
+ config = Config.settings[Config.FIELD_POLICY_ENGINE]
+ self.web_socket_url = resturl = config["url"] + config["path_pdp"]
+
+ if resturl.startswith("https:"):
+ self.web_socket_url = resturl.replace("https:", "wss:") + "notifications"
+ else:
+ self.web_socket_url = resturl.replace("http:", "ws:") + "notifications"
+
+ self._web_socket = None
+
+ scope_prefixes = [scope_prefix.replace(".", "[.]")
+ for scope_prefix in Config.settings["scope_prefixes"]]
+ self._policy_scopes = re.compile("(" + "|".join(scope_prefixes) + ")")
+ _PolicyReceiver._logger.info("_policy_scopes %s", self._policy_scopes.pattern)
+ self._policy_updater = PolicyUpdater()
+ self._policy_updater.start()
+
+ def run(self):
+ """listen on web-socket and pass the policy notifications to policy-updater"""
+ websocket.enableTrace(True)
+ restarting = False
+ while True:
+ if not self._get_keep_running():
+ break
+
+ self._stop_notifications()
+
+ if restarting:
+ time.sleep(5)
+
+ _PolicyReceiver._logger.info(
+ "connecting to policy-notifications at: %s", self.web_socket_url)
+ self._web_socket = websocket.WebSocketApp(
+ self.web_socket_url,
+ on_message=self._on_pdp_message,
+ on_close=self._on_ws_close,
+ on_error=self._on_ws_error
+ )
+
+ _PolicyReceiver._logger.info("waiting for policy-notifications...")
+ self._web_socket.run_forever()
+ restarting = True
+
+ _PolicyReceiver._logger.info("exit policy-receiver")
+
+ def _get_keep_running(self):
+ """thread-safe check whether to continue running"""
+ with self._lock:
+ keep_running = self._keep_running
+ return keep_running
+
+ def _stop_notifications(self):
+ """Shuts down the AutoNotification service if running."""
+ with self._lock:
+ if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected:
+ self._web_socket.close()
+ _PolicyReceiver._logger.info("Stopped receiving notifications from PDP")
+
+ def _on_pdp_message(self, _, message):
+ """received the notification from PDP"""
+ try:
+ _PolicyReceiver._logger.info("Received notification message: %s", message)
+ if not message:
+ return
+ message = json.loads(message)
+
+ if not message or not isinstance(message, dict):
+ _PolicyReceiver._logger.warning("unexpected message from PDP: %s",
+ json.dumps(message))
+ return
+
+ policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
+ for policy in message.get(LOADED_POLICIES, [])
+ if self._policy_scopes.match(policy.get(POLICY_NAME))]
+ policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
+ for policy in message.get(REMOVED_POLICIES, [])
+ if self._policy_scopes.match(policy.get(POLICY_NAME))]
+
+ if not policies_updated and not policies_removed:
+ _PolicyReceiver._logger.info("no policy updated or removed for scopes %s",
+ self._policy_scopes.pattern)
+ return
+
+ audit = Audit(job_name="policy_update",
+ req_message="policy-notification - updated[{0}], removed[{1}]"
+ .format(len(policies_updated), len(policies_removed)),
+ retry_get_config=True)
+ self._policy_updater.enqueue(audit, policies_updated, policies_removed)
+ except Exception as ex:
+ error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex),
+ "on_pdp_message", json.dumps(message))
+
+ _PolicyReceiver._logger.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+
+
+ def _on_ws_error(self, _, error):
+ """report an error"""
+ _PolicyReceiver._logger.error("policy-notification error: %s", error)
+
+ def _on_ws_close(self, _):
+ """restart web-socket on close"""
+ _PolicyReceiver._logger.info("lost connection to PDP - restarting...")
+
+ def shutdown(self, audit):
+ """Shutdown the policy-receiver"""
+ _PolicyReceiver._logger.info("shutdown policy-receiver")
+ with self._lock:
+ self._keep_running = False
+
+ self._stop_notifications()
+
+ if self.is_alive():
+ self.join()
+
+ self._policy_updater.shutdown(audit)
+
+ def catch_up(self, audit):
+ """need to bring the latest policies to DCAE-Controller"""
+ self._policy_updater.catch_up(audit)
+
+class PolicyReceiver(object):
+ """policy-receiver - static singleton wrapper"""
+ _policy_receiver = None
+
+ @staticmethod
+ def shutdown(audit):
+ """Shutdown the notification-handler"""
+ PolicyReceiver._policy_receiver.shutdown(audit)
+
+ @staticmethod
+ def catch_up(audit):
+ """bring the latest policies from policy-engine"""
+ PolicyReceiver._policy_receiver.catch_up(audit)
+
+ @staticmethod
+ def run(audit):
+ """Using policy-engine client to talk to policy engine"""
+ PolicyReceiver._policy_receiver = _PolicyReceiver()
+ PolicyReceiver._policy_receiver.start()
+
+ PolicyReceiver.catch_up(audit)
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py
index bf8a31d..c8018f6 100644
--- a/policyhandler/policy_rest.py
+++ b/policyhandler/policy_rest.py
@@ -1,8 +1,5 @@
-"""policy-client communicates with policy-engine thru REST API"""
-
-# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,126 +16,43 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import logging
-import json
+"""policy-client communicates with policy-engine thru REST API"""
+
import copy
-import re
+import json
+import logging
import time
from multiprocessing.dummy import Pool as ThreadPool
+
import requests
from .config import Config
-from .policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, POLICY_GET_CONFIG, \
- POLICY_BODY, POLICY_CONFIG
-from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode, AuditResponseCode
-
-class PolicyUtils(object):
- """policy-client utils"""
- _logger = logging.getLogger("policy_handler.policy_utils")
- _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$')
-
- @staticmethod
- def safe_json_parse(json_str):
- """try parsing json without exception - returns the json_str back if fails"""
- if not json_str:
- return json_str
- try:
- return json.loads(json_str)
- except ValueError as err:
- PolicyUtils._logger.warn("unexpected json %s: %s", str(json_str), str(err))
- return json_str
-
- @staticmethod
- def extract_policy_id(policy_name):
- """ policy_name = policy_id + "." + <version> + "." + <extension>
- For instance,
- policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml
- policy_id = DCAE_alex.Config_alex_policy_number_1
- policy_scope = DCAE_alex
- policy_class = Config
- policy_version = 3
- type = extension = xml
- delimiter = "."
- policy_class_delimiter = "_"
- policy_name in PAP = DCAE_alex.alex_policy_number_1
- """
- if not policy_name:
- return
- return PolicyUtils._policy_name_ext.sub('', policy_name)
-
- @staticmethod
- def parse_policy_config(policy):
- """try parsing the config in policy."""
- if policy and POLICY_BODY in policy and POLICY_CONFIG in policy[POLICY_BODY]:
- policy[POLICY_BODY][POLICY_CONFIG] = PolicyUtils.safe_json_parse(
- policy[POLICY_BODY][POLICY_CONFIG])
- return policy
-
- @staticmethod
- def convert_to_policy(policy_config):
- """wrap policy_config received from policy-engine with policy_id."""
- if not policy_config or POLICY_NAME not in policy_config \
- or POLICY_VERSION not in policy_config or not policy_config[POLICY_VERSION]:
- return
- policy_id = PolicyUtils.extract_policy_id(policy_config[POLICY_NAME])
- if not policy_id:
- return
- return {POLICY_ID:policy_id, POLICY_BODY:policy_config}
-
- @staticmethod
- def select_latest_policy(policy_configs):
- """For some reason, the policy-engine returns all version of the policy_configs.
- DCAE-Controller is only interested in the latest version
- """
- if not policy_configs:
- return
- latest_policy_config = {}
- for policy_config in policy_configs:
- if POLICY_VERSION not in policy_config or not policy_config[POLICY_VERSION] \
- or not policy_config[POLICY_VERSION].isdigit():
- continue
- if not latest_policy_config \
- or int(policy_config[POLICY_VERSION]) \
- > int(latest_policy_config[POLICY_VERSION]):
- latest_policy_config = policy_config
-
- return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_config))
-
- @staticmethod
- def select_latest_policies(policy_configs):
- """For some reason, the policy-engine returns all version of the policy_configs.
- DCAE-Controller is only interested in the latest versions
- """
- if not policy_configs:
- return {}
- policies = {}
- for policy_config in policy_configs:
- policy = PolicyUtils.convert_to_policy(policy_config)
- if not policy or POLICY_ID not in policy or POLICY_BODY not in policy:
- continue
- if POLICY_VERSION not in policy[POLICY_BODY] \
- or not policy[POLICY_BODY][POLICY_VERSION] \
- or not policy[POLICY_BODY][POLICY_VERSION].isdigit():
- continue
- if policy[POLICY_ID] not in policies:
- policies[policy[POLICY_ID]] = policy
- continue
- if int(policy[POLICY_BODY][POLICY_VERSION]) \
- > int(policies[policy[POLICY_ID]][POLICY_BODY][POLICY_VERSION]):
- policies[policy[POLICY_ID]] = policy
-
- for policy_id in policies:
- policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id])
+from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
+ AuditResponseCode, Metrics)
+from .policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, LATEST_POLICIES,
+ POLICY_BODY, POLICY_CONFIG, POLICY_FILTER,
+ POLICY_ID, POLICY_NAME, SCOPE_PREFIXES)
+from .policy_utils import PolicyUtils
- return policies
class PolicyRest(object):
- """ policy-engine """
+ """using the http API to policy-engine"""
_logger = logging.getLogger("policy_handler.policy_rest")
_lazy_inited = False
+ POLICY_GET_CONFIG = 'getConfig'
+ PDP_CONFIG_STATUS = "policyConfigStatus"
+ PDP_CONFIG_RETRIEVED = "CONFIG_RETRIEVED"
+ PDP_CONFIG_NOT_FOUND = "CONFIG_NOT_FOUND"
+ PDP_CONFIG_MESSAGE = "policyConfigMessage"
+ PDP_NO_RESPONSE_RECEIVED = "No Response Received"
+ PDP_STATUS_CODE_ERROR = 400
+ PDP_DATA_NOT_FOUND = "PE300 - Data Issue: Incorrect Params passed: Decision not a Permit."
+
+ MIN_VERSION_EXPECTED = "min_version_expected"
+ IGNORE_POLICY_NAMES = "ignore_policy_names"
_requests_session = None
- _url = None
+ _url_get_config = None
_headers = None
_target_entity = None
_thread_pool_size = 4
@@ -154,7 +68,7 @@ class PolicyRest(object):
return
PolicyRest._lazy_inited = True
- config = Config.config[Config.FIELD_POLICY_ENGINE]
+ config = Config.settings[Config.FIELD_POLICY_ENGINE]
pool_size = config.get("pool_connections", 20)
PolicyRest._requests_session = requests.Session()
@@ -167,191 +81,460 @@ class PolicyRest(object):
requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
)
- PolicyRest._url = config["url"] + config["path_api"]
+ PolicyRest._url_get_config = config["url"] \
+ + config["path_api"] + PolicyRest.POLICY_GET_CONFIG
PolicyRest._headers = config["headers"]
PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
- PolicyRest._thread_pool_size = Config.config.get("thread_pool_size", 4)
+ PolicyRest._thread_pool_size = Config.settings.get("thread_pool_size", 4)
if PolicyRest._thread_pool_size < 2:
PolicyRest._thread_pool_size = 2
- PolicyRest._scope_prefixes = Config.config["scope_prefixes"]
+ PolicyRest._scope_prefixes = Config.settings["scope_prefixes"]
PolicyRest._scope_thread_pool_size = min(PolicyRest._thread_pool_size, \
len(PolicyRest._scope_prefixes))
- PolicyRest._policy_retry_count = Config.config.get("policy_retry_count", 1) or 1
- PolicyRest._policy_retry_sleep = Config.config.get("policy_retry_sleep", 0)
+ PolicyRest._policy_retry_count = Config.settings.get("policy_retry_count", 1) or 1
+ PolicyRest._policy_retry_sleep = Config.settings.get("policy_retry_sleep", 0)
- PolicyRest._logger.info("PolicyClient url(%s) headers(%s) scope-prefixes(%s)", \
- PolicyRest._url, Audit.log_json_dumps(PolicyRest._headers), \
+ PolicyRest._logger.info(
+ "PolicyClient url(%s) headers(%s) scope-prefixes(%s)",
+ PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers),
json.dumps(PolicyRest._scope_prefixes))
@staticmethod
- def _post(audit, path, json_body):
+ def _pdp_get_config(audit, json_body):
"""Communication with the policy-engine"""
- full_path = PolicyRest._url + path
- sub_aud = Audit(aud_parent=audit, targetEntity=PolicyRest._target_entity, \
- targetServiceName=full_path)
+ metrics = Metrics(aud_parent=audit, targetEntity=PolicyRest._target_entity,
+ targetServiceName=PolicyRest._url_get_config)
msg = json.dumps(json_body)
headers = copy.copy(PolicyRest._headers)
- headers[REQUEST_X_ECOMP_REQUESTID] = sub_aud.request_id
- headers_str = Audit.log_json_dumps(headers)
+ headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id
+ headers_str = Metrics.log_json_dumps(headers)
- log_line = "post to PDP {0} msg={1} headers={2}".format(full_path, msg, headers_str)
- sub_aud.metrics_start(log_line)
+ log_line = "post to PDP {0} msg={1} headers={2}".format(
+ PolicyRest._url_get_config, msg, headers_str)
+ metrics.metrics_start(log_line)
PolicyRest._logger.info(log_line)
res = None
try:
- res = PolicyRest._requests_session.post(full_path, json=json_body, headers=headers)
- except requests.exceptions.RequestException as ex:
- error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
- error_msg = "failed to post to PDP {0} {1} msg={2} headers={3}" \
- .format(full_path, str(ex), msg, headers_str)
+ res = PolicyRest._requests_session.post(
+ PolicyRest._url_get_config, json=json_body, headers=headers)
+ except Exception as ex:
+ error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+ if isinstance(ex, requests.exceptions.RequestException)
+ else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ error_msg = (
+ "failed to post to PDP {0} {1}: {2} msg={3} headers={4}"
+ .format(PolicyRest._url_get_config, type(ex).__name__, str(ex), msg, headers_str))
PolicyRest._logger.exception(error_msg)
- sub_aud.set_http_status_code(error_code)
+ metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
- sub_aud.metrics(error_msg)
+ metrics.metrics(error_msg)
return (error_code, None)
- log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format( \
- full_path, res.status_code, msg, res.text, \
- Audit.log_json_dumps(dict(res.request.headers.items())))
- sub_aud.set_http_status_code(res.status_code)
- sub_aud.metrics(log_line)
+ log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format(
+ PolicyRest._url_get_config, res.status_code, msg, res.text,
+ Metrics.log_json_dumps(dict(res.request.headers.items())))
+
+ status_code, res_data = PolicyRest._extract_pdp_res_data(audit, metrics, log_line, res)
+
+ if status_code:
+ return status_code, res_data
+
+ metrics.set_http_status_code(res.status_code)
+ metrics.metrics(log_line)
PolicyRest._logger.info(log_line)
+ return res.status_code, res_data
+ @staticmethod
+ def _extract_pdp_res_data(audit, metrics, log_line, res):
+ """special treatment of pdp response"""
+ res_data = None
if res.status_code == requests.codes.ok:
- return res.status_code, res.json()
+ res_data = res.json()
+
+ if res_data and isinstance(res_data, list) and len(res_data) == 1:
+ rslt = res_data[0]
+ if rslt and not rslt.get(POLICY_NAME):
+ res_data = None
+ if rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_NO_RESPONSE_RECEIVED:
+ error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+ error_msg = "unexpected {0}".format(log_line)
+
+ PolicyRest._logger.error(error_msg)
+ metrics.set_http_status_code(error_code)
+ audit.set_http_status_code(error_code)
+ metrics.metrics(error_msg)
+ return error_code, None
+ return None, res_data
+
+ if res.status_code == PolicyRest.PDP_STATUS_CODE_ERROR:
+ try:
+ res_data = res.json()
+ except ValueError:
+ return None, None
+
+ if not res_data or not isinstance(res_data, list) or len(res_data) != 1:
+ return None, None
+
+ rslt = res_data[0]
+ if (rslt and not rslt.get(POLICY_NAME)
+ and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND
+ and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND):
+ status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value
+ info_msg = "not found {0}".format(log_line)
+
+ PolicyRest._logger.info(info_msg)
+ metrics.set_http_status_code(status_code)
+ metrics.metrics(info_msg)
+ return status_code, None
+ return None, None
- return res.status_code, None
@staticmethod
- def get_latest_policy(aud_policy_name):
- """Get the latest policy for the policy_name from the policy-engine"""
- PolicyRest._lazy_init()
- audit, policy_name = aud_policy_name
+ def _validate_policy(policy):
+ """Validates the config on policy"""
+ if not policy:
+ return
- status_code = 0
+ policy_body = policy.get(POLICY_BODY)
+
+ return bool(
+ policy_body
+ and policy_body.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_RETRIEVED
+ and policy_body.get(POLICY_CONFIG)
+ )
+
+ @staticmethod
+ def get_latest_policy(aud_policy_id):
+ """safely try retrieving the latest policy for the policy_id from the policy-engine"""
+ audit, policy_id, min_version_expected, ignore_policy_names = aud_policy_id
+ str_metrics = "policy_id({0}), min_version_expected({1}) ignore_policy_names({2})".format(
+ policy_id, min_version_expected, json.dumps(ignore_policy_names))
+
+ try:
+ return PolicyRest._get_latest_policy(
+ audit, policy_id, min_version_expected, ignore_policy_names, str_metrics)
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(audit.request_id, type(ex).__name__, str(ex),
+ "get_latest_policy", str_metrics))
+
+ PolicyRest._logger.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None
+
+
+ @staticmethod
+ def _get_latest_policy(audit, policy_id,
+ min_version_expected, ignore_policy_names, str_metrics):
+ """retry several times getting the latest policy for the policy_id from the policy-engine"""
+ PolicyRest._lazy_init()
latest_policy = None
- for retry in xrange(1, PolicyRest._policy_retry_count + 1):
- PolicyRest._logger.debug("%s", policy_name)
- status_code, policy_configs = PolicyRest._post(audit, POLICY_GET_CONFIG, \
- {POLICY_NAME:policy_name})
- PolicyRest._logger.debug("%s %s policy_configs: %s", status_code, policy_name, \
- json.dumps(policy_configs or []))
- latest_policy = PolicyUtils.select_latest_policy(policy_configs)
- if not latest_policy:
- audit.error("received unexpected policy data from PDP for policy_name={0}: {1}" \
- .format(policy_name, json.dumps(policy_configs or [])), \
- errorCode=AuditResponseCode.DATA_ERROR.value, \
- errorDescription=AuditResponseCode.get_human_text( \
- AuditResponseCode.DATA_ERROR))
-
- if latest_policy or not audit.retry_get_config \
- or not PolicyRest._policy_retry_sleep \
- or AuditResponseCode.PERMISSION_ERROR.value \
- == AuditResponseCode.get_response_code(status_code).value:
+ status_code = 0
+ retry_get_config = audit.kwargs.get("retry_get_config")
+ expect_policy_removed = (ignore_policy_names and not min_version_expected)
+
+ for retry in range(1, PolicyRest._policy_retry_count + 1):
+ PolicyRest._logger.debug(str_metrics)
+
+ done, latest_policy, status_code = PolicyRest._get_latest_policy_once(
+ audit, policy_id, min_version_expected, ignore_policy_names,
+ expect_policy_removed)
+
+ if done or not retry_get_config or not PolicyRest._policy_retry_sleep:
break
if retry == PolicyRest._policy_retry_count:
- audit.warn("gave up retrying {0} from PDP after #{1} for policy_name={2}" \
- .format(POLICY_GET_CONFIG, retry, policy_name), \
- errorCode=AuditResponseCode.DATA_ERROR.value, \
- errorDescription=AuditResponseCode.get_human_text( \
- AuditResponseCode.DATA_ERROR))
+ audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}"
+ .format(PolicyRest._url_get_config, retry, policy_id),
+ error_code=AuditResponseCode.DATA_ERROR)
break
- audit.warn("retry #{0} {1} from PDP in {2} secs for policy_name={3}" \
- .format(retry, POLICY_GET_CONFIG, PolicyRest._policy_retry_sleep, policy_name), \
- errorCode=AuditResponseCode.DATA_ERROR.value, \
- errorDescription=AuditResponseCode.get_human_text( \
- AuditResponseCode.DATA_ERROR))
+ audit.warn(
+ "retry #{0} {1} from PDP in {2} secs for policy_id={3}".format(
+ retry, PolicyRest._url_get_config,
+ PolicyRest._policy_retry_sleep, policy_id),
+ error_code=AuditResponseCode.DATA_ERROR)
time.sleep(PolicyRest._policy_retry_sleep)
+ if (expect_policy_removed and not latest_policy
+ and AuditHttpCode.RESPONSE_ERROR.value == status_code):
+ audit.set_http_status_code(AuditHttpCode.HTTP_OK.value)
+ return None
+
audit.set_http_status_code(status_code)
- if not latest_policy:
+ if not PolicyRest._validate_policy(latest_policy):
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.error(
+ "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)),
+ error_code=AuditResponseCode.DATA_ERROR)
+
return latest_policy
@staticmethod
- def get_latest_policies_by_names(aud_policy_names):
+ def _get_latest_policy_once(audit, policy_id,
+ min_version_expected, ignore_policy_names,
+ expect_policy_removed):
+ """single attempt to get the latest policy for the policy_id from the policy-engine"""
+
+ status_code, policy_configs = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id})
+
+ PolicyRest._logger.debug("%s %s policy_configs: %s",
+ status_code, policy_id, json.dumps(policy_configs or []))
+
+ latest_policy = PolicyUtils.select_latest_policy(
+ policy_configs, min_version_expected, ignore_policy_names
+ )
+
+ if not latest_policy and not expect_policy_removed:
+ audit.error("received unexpected policy data from PDP for policy_id={0}: {1}"
+ .format(policy_id, json.dumps(policy_configs or [])),
+ error_code=AuditResponseCode.DATA_ERROR)
+
+ done = bool(latest_policy
+ or (expect_policy_removed and not policy_configs)
+ or audit.is_serious_error(status_code))
+
+ return done, latest_policy, status_code
+
+ @staticmethod
+ def get_latest_updated_policies(aud_policy_updates):
+ """safely try retrieving the latest policies for the list of policy_names"""
+ audit, policies_updated, policies_removed = aud_policy_updates
+ if not policies_updated and not policies_removed:
+ return None, None
+
+ str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format(
+ len(policies_updated), json.dumps(policies_updated),
+ len(policies_removed), json.dumps(policies_removed))
+
+ try:
+ return PolicyRest._get_latest_updated_policies(
+ audit, str_metrics, policies_updated, policies_removed)
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(audit.request_id, type(ex).__name__, str(ex),
+ "get_latest_updated_policies", str_metrics))
+
+ PolicyRest._logger.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None, None
+
+ @staticmethod
+ def _get_latest_updated_policies(audit, str_metrics, policies_updated, policies_removed):
"""Get the latest policies of the list of policy_names from the policy-engine"""
PolicyRest._lazy_init()
- audit, policy_names = aud_policy_names
- if not policy_names:
- return
+ metrics = Metrics(
+ aud_parent=audit,
+ targetEntity="{0} total get_latest_updated_policies".format(PolicyRest._target_entity),
+ targetServiceName=PolicyRest._url_get_config)
+
+ metrics.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
+ PolicyRest._logger.debug(str_metrics)
+
+ policies_to_find = {}
+ for (policy_name, policy_version) in policies_updated:
+ policy_id = PolicyUtils.extract_policy_id(policy_name)
+ if not policy_id or not policy_version.isdigit():
+ continue
+ policy = policies_to_find.get(policy_id)
+ if not policy:
+ policies_to_find[policy_id] = {
+ POLICY_ID: policy_id,
+ PolicyRest.MIN_VERSION_EXPECTED: int(policy_version),
+ PolicyRest.IGNORE_POLICY_NAMES: {}
+ }
+ continue
+ if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version):
+ policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version)
+
+ for (policy_name, _) in policies_removed:
+ policy_id = PolicyUtils.extract_policy_id(policy_name)
+ if not policy_id:
+ continue
+ policy = policies_to_find.get(policy_id)
+ if not policy:
+ policies_to_find[policy_id] = {
+ POLICY_ID: policy_id,
+ PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True}
+ }
+ continue
+ policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True
- audit.metrics_start("get_latest_policies_by_names {0} {1}".format( \
- len(policy_names), json.dumps(policy_names)))
- PolicyRest._logger.debug("%d %s", len(policy_names), json.dumps(policy_names))
+ apns = [(audit, policy_id,
+ policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED),
+ policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES))
+ for (policy_id, policy_to_find) in policies_to_find.items()]
- thread_count = min(PolicyRest._thread_pool_size, len(policy_names))
- apns = [(audit, policy_name) for policy_name in policy_names]
policies = None
- if thread_count == 1:
+ apns_length = len(apns)
+ if apns_length == 1:
policies = [PolicyRest.get_latest_policy(apns[0])]
else:
- pool = ThreadPool(thread_count)
+ pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length))
policies = pool.map(PolicyRest.get_latest_policy, apns)
pool.close()
pool.join()
- audit.metrics("result get_latest_policies_by_names {0} {1}: {2} {3}".format( \
- len(policy_names), json.dumps(policy_names), len(policies), json.dumps(policies)), \
- targetEntity=PolicyRest._target_entity, targetServiceName=POLICY_GET_CONFIG)
- policies = dict([(policy[POLICY_ID], policy) \
- for policy in policies if policy and POLICY_ID in policy])
- PolicyRest._logger.debug("policies %s", json.dumps(policies))
- if not policies:
+ metrics.metrics("result get_latest_updated_policies {0}: {1} {2}"
+ .format(str_metrics, len(policies), json.dumps(policies)))
+
+ updated_policies = dict((policy[POLICY_ID], policy)
+ for policy in policies
+ if policy and policy.get(POLICY_ID))
+
+ removed_policies = dict((policy_id, True)
+ for (policy_id, policy_to_find) in policies_to_find.items()
+ if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED)
+ and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)
+ and policy_id not in updated_policies)
+
+ errored_policies = dict((policy_id, policy_to_find)
+ for (policy_id, policy_to_find) in policies_to_find.items()
+ if policy_id not in updated_policies
+ and policy_id not in removed_policies)
+
+ PolicyRest._logger.debug(
+ "result updated_policies %s, removed_policies %s, errored_policies %s",
+ json.dumps(updated_policies), json.dumps(removed_policies),
+ json.dumps(errored_policies))
+
+ if errored_policies:
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- return policies
+ audit.error(
+ "errored_policies in PDP: {0}".format(json.dumps(errored_policies)),
+ error_code=AuditResponseCode.DATA_ERROR)
+
+ return updated_policies, removed_policies
+
@staticmethod
- def _get_latest_policies(aud_scope_prefix):
- """Get the latest policies of the same scope from the policy-engine"""
- audit, scope_prefix = aud_scope_prefix
- PolicyRest._logger.debug("%s", scope_prefix)
- status_code, policy_configs = PolicyRest._post(audit, POLICY_GET_CONFIG, \
- {POLICY_NAME:scope_prefix + ".*"})
- audit.set_http_status_code(status_code)
- PolicyRest._logger.debug("%s policy_configs: %s %s", status_code, \
- scope_prefix, json.dumps(policy_configs or []))
- latest_policies = PolicyUtils.select_latest_policies(policy_configs)
+ def _get_latest_policies(aud_policy_filter):
+ """
+ get the latest policies by policy_filter
+ or all the latest policies of the same scope from the policy-engine
+ """
+ audit, policy_filter, scope_prefix = aud_policy_filter
+ try:
+ str_policy_filter = json.dumps(policy_filter)
+ PolicyRest._logger.debug("%s", str_policy_filter)
+
+ status_code, policy_configs = PolicyRest._pdp_get_config(audit, policy_filter)
+
+ PolicyRest._logger.debug("%s policy_configs: %s %s", status_code,
+ str_policy_filter, json.dumps(policy_configs or []))
+
+ latest_policies = PolicyUtils.select_latest_policies(policy_configs)
+
+ if (scope_prefix and not policy_configs
+ and status_code != AuditHttpCode.DATA_NOT_FOUND_ERROR.value):
+ audit.warn("PDP error {0} on scope_prefix {1}".format(status_code, scope_prefix),
+ error_code=AuditResponseCode.DATA_ERROR)
+ return None, latest_policies, scope_prefix
+
+ if not latest_policies:
+ if not scope_prefix:
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.warn(
+ "received no policies from PDP for policy_filter {0}: {1}"
+ .format(str_policy_filter, json.dumps(policy_configs or [])),
+ error_code=AuditResponseCode.DATA_ERROR)
+ return None, latest_policies, None
+
+ audit.set_http_status_code(status_code)
+ valid_policies = {}
+ errored_policies = {}
+ for (policy_id, policy) in latest_policies.items():
+ if PolicyRest._validate_policy(policy):
+ valid_policies[policy_id] = policy
+ else:
+ errored_policies[policy_id] = policy
+ return valid_policies, errored_policies, None
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: policy_filter({4}), scope_prefix({5})"
+ .format(audit.request_id, type(ex).__name__, str(ex),
+ "_get_latest_policies", json.dumps(policy_filter), scope_prefix))
+
+ PolicyRest._logger.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None, None, scope_prefix
- if not latest_policies:
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- audit.error("received unexpected policies data from PDP for scope {0}: {1}".format( \
- scope_prefix, json.dumps(policy_configs or [])), \
- errorCode=AuditResponseCode.DATA_ERROR.value, \
- errorDescription=AuditResponseCode.get_human_text( \
- AuditResponseCode.DATA_ERROR))
- return latest_policies
@staticmethod
- def get_latest_policies(audit):
+ def get_latest_policies(audit, policy_filter=None):
"""Get the latest policies of the same scope from the policy-engine"""
- PolicyRest._lazy_init()
- PolicyRest._logger.debug("%s", json.dumps(PolicyRest._scope_prefixes))
-
- audit.metrics_start("get_latest_policies for scopes {0} {1}".format( \
- len(PolicyRest._scope_prefixes), json.dumps(PolicyRest._scope_prefixes)))
- asps = [(audit, scope_prefix) for scope_prefix in PolicyRest._scope_prefixes]
- latest_policies = None
- if PolicyRest._scope_thread_pool_size == 1:
- latest_policies = [PolicyRest._get_latest_policies(asps[0])]
- else:
- pool = ThreadPool(PolicyRest._scope_thread_pool_size)
- latest_policies = pool.map(PolicyRest._get_latest_policies, asps)
- pool.close()
- pool.join()
+ result = {}
+ aud_policy_filters = None
+ str_policy_filters = None
+ str_metrics = None
+ target_entity = None
- audit.metrics("total result get_latest_policies for scopes {0} {1}: {2} {3}".format( \
- len(PolicyRest._scope_prefixes), json.dumps(PolicyRest._scope_prefixes), \
- len(latest_policies), json.dumps(latest_policies)), \
- targetEntity=PolicyRest._target_entity, targetServiceName=POLICY_GET_CONFIG)
-
- latest_policies = dict(pair for lp in latest_policies if lp for pair in lp.items())
- PolicyRest._logger.debug("latest_policies: %s %s", \
- json.dumps(PolicyRest._scope_prefixes), json.dumps(latest_policies))
+ try:
+ PolicyRest._lazy_init()
+ if policy_filter is not None:
+ aud_policy_filters = [(audit, policy_filter, None)]
+ str_policy_filters = json.dumps(policy_filter)
+ str_metrics = "get_latest_policies for policy_filter {0}".format(
+ str_policy_filters)
+ target_entity = ("{0} total get_latest_policies by policy_filter"
+ .format(PolicyRest._target_entity))
+ result[POLICY_FILTER] = copy.deepcopy(policy_filter)
+ else:
+ aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, scope_prefix)
+ for scope_prefix in PolicyRest._scope_prefixes]
+ str_policy_filters = json.dumps(PolicyRest._scope_prefixes)
+ str_metrics = "get_latest_policies for scopes {0} {1}".format( \
+ len(PolicyRest._scope_prefixes), str_policy_filters)
+ target_entity = ("{0} total get_latest_policies by scope_prefixes"
+ .format(PolicyRest._target_entity))
+ result[SCOPE_PREFIXES] = copy.deepcopy(PolicyRest._scope_prefixes)
+
+ PolicyRest._logger.debug("%s", str_policy_filters)
+ metrics = Metrics(aud_parent=audit, targetEntity=target_entity,
+ targetServiceName=PolicyRest._url_get_config)
+
+ metrics.metrics_start(str_metrics)
+
+ latest_policies = None
+ apfs_length = len(aud_policy_filters)
+ if apfs_length == 1:
+ latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])]
+ else:
+ pool = ThreadPool(min(PolicyRest._scope_thread_pool_size, apfs_length))
+ latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters)
+ pool.close()
+ pool.join()
+
+ metrics.metrics("total result {0}: {1} {2}".format(
+ str_metrics, len(latest_policies), json.dumps(latest_policies)))
+
+ # latest_policies == [(valid_policies, errored_policies, errored_scope_prefix), ...]
+ result[LATEST_POLICIES] = dict(
+ pair for (vps, _, _) in latest_policies if vps for pair in vps.items())
+
+ result[ERRORED_POLICIES] = dict(
+ pair for (_, eps, _) in latest_policies if eps for pair in eps.items())
+
+ result[ERRORED_SCOPES] = sorted([esp for (_, _, esp) in latest_policies if esp])
+
+ PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s",
+ str_policy_filters, json.dumps(result))
+ return result
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(audit.request_id, type(ex).__name__, str(ex),
+ "get_latest_policies", str_metrics))
- return latest_policies
+ PolicyRest._logger.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index 1f1539f..5ba7c29 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -1,8 +1,5 @@
-"""policy-updater thread"""
-
-# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,13 +16,23 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import logging
+"""policy-updater thread"""
+
+import copy
import json
-from Queue import Queue
-from threading import Thread, Lock
+import logging
+from queue import Queue
+from threading import Lock, Thread
-from .policy_rest import PolicyRest
+from .config import Config
from .deploy_handler import DeployHandler
+from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
+from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, LATEST_POLICIES,
+ REMOVED_POLICIES)
+from .policy_rest import PolicyRest
+from .policy_utils import Utils
+from .step_timer import StepTimer
+
class PolicyUpdater(Thread):
"""queue and handle the policy-updates in a separate thread"""
@@ -33,92 +40,259 @@ class PolicyUpdater(Thread):
def __init__(self):
"""init static config of PolicyUpdater."""
- Thread.__init__(self)
- self.name = "policy_updater"
+ Thread.__init__(self, name="policy_updater")
self.daemon = True
- self._req_shutdown = None
- self._req_catch_up = None
+ self._catch_up_timer = None
+ self._aud_shutdown = None
+ self._aud_catch_up = None
+
+ catch_up_config = Config.settings.get(CATCH_UP, {})
+ self._catch_up_interval = catch_up_config.get("interval") or 15*60
+ self._catch_up_max_skips = catch_up_config.get("max_skips") or 3
+ self._catch_up_skips = 0
+ self._catch_up_prev_message = None
self._lock = Lock()
self._queue = Queue()
- def enqueue(self, audit=None, policy_names=None):
- """enqueue the policy-names"""
- policy_names = policy_names or []
- PolicyUpdater._logger.info("policy_names %s", json.dumps(policy_names))
- self._queue.put((audit, policy_names))
+
+ def enqueue(self, audit=None, policies_updated=None, policies_removed=None):
+ """enqueue the policy-updates"""
+ policies_updated = policies_updated or []
+ policies_removed = policies_removed or []
+
+ PolicyUpdater._logger.info(
+ "enqueue request_id %s policies_updated %s policies_removed %s",
+ ((audit and audit.request_id) or "none"),
+ json.dumps(policies_updated), json.dumps(policies_removed))
+
+ with self._lock:
+ self._queue.put((audit, policies_updated, policies_removed))
+
+
+ def catch_up(self, audit=None):
+ """need to bring the latest policies to DCAE-Controller"""
+ with self._lock:
+ if not self._aud_catch_up:
+ self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP)
+ PolicyUpdater._logger.info(
+ "catch_up %s request_id %s",
+ self._aud_catch_up.req_message, self._aud_catch_up.request_id
+ )
+
+ self.enqueue()
+
def run(self):
"""wait and run the policy-update in thread"""
while True:
PolicyUpdater._logger.info("waiting for policy-updates...")
- audit, policy_names = self._queue.get()
- PolicyUpdater._logger.info("got policy-updates %s", json.dumps(policy_names))
+ queued_audit, policies_updated, policies_removed = self._queue.get()
+ PolicyUpdater._logger.info(
+ "got request_id %s policies_updated %s policies_removed %s",
+ ((queued_audit and queued_audit.request_id) or "none"),
+ json.dumps(policies_updated), json.dumps(policies_removed))
+
if not self._keep_running():
- self._queue.task_done()
break
+
if self._on_catch_up():
+ self._reset_queue()
continue
-
- if not policy_names:
- self._queue.task_done()
+ elif not queued_audit:
continue
- updated_policies = PolicyRest.get_latest_policies_by_names((audit, policy_names))
- PolicyUpdater.policy_update(audit, updated_policies)
- audit.audit_done()
- self._queue.task_done()
+ self._on_policies_update(queued_audit, policies_updated, policies_removed)
PolicyUpdater._logger.info("exit policy-updater")
def _keep_running(self):
"""thread-safe check whether to continue running"""
- self._lock.acquire()
- keep_running = not self._req_shutdown
- self._lock.release()
- if self._req_shutdown:
- self._req_shutdown.audit_done()
+ with self._lock:
+ keep_running = not self._aud_shutdown
+
+ if self._aud_shutdown:
+ self._aud_shutdown.audit_done()
return keep_running
- def catch_up(self, audit):
- """need to bring the latest policies to DCAE-Controller"""
- self._lock.acquire()
- self._req_catch_up = audit
- self._lock.release()
- self.enqueue()
+ def _run_catch_up_timer(self):
+ """create and start the catch_up timer"""
+ if not self._catch_up_interval:
+ return
+
+ if self._catch_up_timer:
+ self._logger.info("next step catch_up_timer in %s", self._catch_up_interval)
+ self._catch_up_timer.next()
+ return
+
+ self._catch_up_timer = StepTimer(
+ "catch_up_timer",
+ self._catch_up_interval,
+ PolicyUpdater.catch_up,
+ PolicyUpdater._logger,
+ self
+ )
+ self._logger.info("started catch_up_timer in %s", self._catch_up_interval)
+ self._catch_up_timer.start()
+
+ def _pause_catch_up_timer(self):
+ """pause catch_up_timer"""
+ if self._catch_up_timer:
+ self._logger.info("pause catch_up_timer")
+ self._catch_up_timer.pause()
+
+ def _stop_catch_up_timer(self):
+ """stop and destroy the catch_up_timer"""
+ if self._catch_up_timer:
+ self._logger.info("stopping catch_up_timer")
+ self._catch_up_timer.stop()
+ self._catch_up_timer.join()
+ self._catch_up_timer = None
+ self._logger.info("stopped catch_up_timer")
+
+ def _need_to_send_catch_up(self, aud_catch_up, catch_up_message):
+ """try not to send the duplicate messages on auto catchup unless hitting the max count"""
+ if aud_catch_up.req_message != AUTO_CATCH_UP \
+ or self._catch_up_skips >= self._catch_up_max_skips \
+ or not Utils.are_the_same(catch_up_message, self._catch_up_prev_message):
+ self._catch_up_skips = 0
+ self._catch_up_prev_message = copy.deepcopy(catch_up_message)
+ log_line = "going to send the catch_up {0}: {1}".format(
+ aud_catch_up.req_message,
+ json.dumps(self._catch_up_prev_message)
+ )
+ self._logger.info(log_line)
+ aud_catch_up.info(log_line)
+ return True
+
+ self._catch_up_skips += 1
+ self._catch_up_prev_message = copy.deepcopy(catch_up_message)
+ log_line = "skip {0}/{1} sending the same catch_up {2}: {3}".format(
+ self._catch_up_skips, self._catch_up_max_skips,
+ aud_catch_up.req_message, json.dumps(self._catch_up_prev_message)
+ )
+ self._logger.info(log_line)
+ aud_catch_up.info(log_line)
+ return False
+
+ def _reset_queue(self):
+ """clear up the queue"""
+ with self._lock:
+ if not self._aud_catch_up and not self._aud_shutdown:
+ with self._queue.mutex:
+ self._queue.queue.clear()
def _on_catch_up(self):
- """Bring the latest policies to DCAE-Controller"""
- self._lock.acquire()
- req_catch_up = self._req_catch_up
- if self._req_catch_up:
- self._req_catch_up = None
- self._queue.task_done()
- self._queue = Queue()
- self._lock.release()
- if not req_catch_up:
+ """bring all the latest policies to DCAE-Controller"""
+ with self._lock:
+ aud_catch_up = self._aud_catch_up
+ if self._aud_catch_up:
+ self._aud_catch_up = None
+
+ if not aud_catch_up:
return False
- PolicyUpdater._logger.info("catch_up")
- latest_policies = PolicyRest.get_latest_policies(req_catch_up)
- PolicyUpdater.policy_update(req_catch_up, latest_policies)
- req_catch_up.audit_done()
- return True
+ log_line = "catch_up {0} request_id {1}".format(
+ aud_catch_up.req_message, aud_catch_up.request_id
+ )
+ try:
+ PolicyUpdater._logger.info(log_line)
+ self._pause_catch_up_timer()
+
+ catch_up_message = PolicyRest.get_latest_policies(aud_catch_up)
+ catch_up_message[CATCH_UP] = True
+
+ catch_up_result = ""
+ if not aud_catch_up.is_success():
+ catch_up_result = "- not sending catch-up to deployment-handler due to errors"
+ PolicyUpdater._logger.warning(catch_up_result)
+ elif not self._need_to_send_catch_up(aud_catch_up, catch_up_message):
+ catch_up_result = "- skipped sending the same policies"
+ else:
+ DeployHandler.policy_update(aud_catch_up, catch_up_message, rediscover=True)
+ if not aud_catch_up.is_success():
+ catch_up_result = "- failed to send catch-up to deployment-handler"
+ PolicyUpdater._logger.warning(catch_up_result)
+ else:
+ catch_up_result = "- sent catch-up to deployment-handler"
+ success, _, _ = aud_catch_up.audit_done(result=catch_up_result)
+ PolicyUpdater._logger.info(log_line + " " + catch_up_result)
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(aud_catch_up.request_id, type(ex).__name__, str(ex),
+ "on_catch_up", log_line + " " + catch_up_result))
+
+ PolicyUpdater._logger.exception(error_msg)
+ aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ success = False
+
+ if not success:
+ self._catch_up_prev_message = None
+
+ self._run_catch_up_timer()
+
+ PolicyUpdater._logger.info("policy_handler health: %s",
+ json.dumps(aud_catch_up.health(full=True)))
+ PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_catch_up.process_info()))
+ return success
+
+
+ def _on_policies_update(self, queued_audit, policies_updated, policies_removed):
+ """handle the event of policy-updates from the queue"""
+ deployment_handler_changed = None
+ result = ""
+
+ log_line = "request_id: {} policies_updated: {} policies_removed: {}".format(
+ ((queued_audit and queued_audit.request_id) or "none"),
+ json.dumps(policies_updated), json.dumps(policies_removed))
+
+ try:
+ updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
+ (queued_audit, policies_updated, policies_removed))
+
+ if not queued_audit.is_success():
+ result = "- not sending policy-updates to deployment-handler due to errors"
+ PolicyUpdater._logger.warning(result)
+ else:
+ message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies}
+ deployment_handler_changed = DeployHandler.policy_update(queued_audit, message)
+ if not queued_audit.is_success():
+ result = "- failed to send policy-updates to deployment-handler"
+ PolicyUpdater._logger.warning(result)
+ else:
+ result = "- sent policy-updates to deployment-handler"
+
+ success, _, _ = queued_audit.audit_done(result=result)
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(queued_audit.request_id, type(ex).__name__, str(ex),
+ "on_policies_update", log_line + " " + result))
+
+ PolicyUpdater._logger.exception(error_msg)
+ queued_audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ queued_audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ success = False
+
+ if deployment_handler_changed:
+ self._catch_up_prev_message = None
+ self._pause_catch_up_timer()
+ self.catch_up()
+ elif not success:
+ self._catch_up_prev_message = None
- @staticmethod
- def policy_update(audit, updated_policies):
- """Invoke deploy-handler"""
- if updated_policies:
- PolicyUpdater._logger.info("updated_policies %s", json.dumps(updated_policies))
- DeployHandler.policy_update(audit, updated_policies)
def shutdown(self, audit):
"""Shutdown the policy-updater"""
PolicyUpdater._logger.info("shutdown policy-updater")
- self._lock.acquire()
- self._req_shutdown = audit
- self._lock.release()
+ with self._lock:
+ self._aud_shutdown = audit
self.enqueue()
+
+ self._stop_catch_up_timer()
+
if self.is_alive():
self.join()
diff --git a/policyhandler/policy_utils.py b/policyhandler/policy_utils.py
new file mode 100644
index 0000000..c2a8b07
--- /dev/null
+++ b/policyhandler/policy_utils.py
@@ -0,0 +1,175 @@
+# ================================================================================
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+"""policy-client communicates with policy-engine thru REST API"""
+
+import json
+import logging
+import re
+
+from .policy_consts import (POLICY_BODY, POLICY_CONFIG, POLICY_ID, POLICY_NAME,
+ POLICY_VERSION)
+
+class PolicyUtils(object):
+ """policy-client utils"""
+ _logger = logging.getLogger("policy_handler.policy_utils")
+ _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$')
+
+ @staticmethod
+ def extract_policy_id(policy_name):
+ """ policy_name = policy_id + "." + <version> + "." + <extension>
+ For instance,
+ policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml
+ policy_id = DCAE_alex.Config_alex_policy_number_1
+ policy_scope = DCAE_alex
+ policy_class = Config
+ policy_version = 3
+ type = extension = xml
+ delimiter = "."
+ policy_class_delimiter = "_"
+ policy_name in PAP = DCAE_alex.alex_policy_number_1
+ """
+ if not policy_name:
+ return
+ return PolicyUtils._policy_name_ext.sub('', policy_name)
+
+ @staticmethod
+ def parse_policy_config(policy):
+ """try parsing the config in policy."""
+ if not policy:
+ return policy
+ config = policy.get(POLICY_BODY, {}).get(POLICY_CONFIG)
+ if config:
+ policy[POLICY_BODY][POLICY_CONFIG] = Utils.safe_json_parse(config)
+ return policy
+
+ @staticmethod
+ def convert_to_policy(policy_config):
+ """wrap policy_config received from policy-engine with policy_id."""
+ if not policy_config:
+ return
+ policy_name = policy_config.get(POLICY_NAME)
+ policy_version = policy_config.get(POLICY_VERSION)
+ if not policy_name or not policy_version:
+ return
+ policy_id = PolicyUtils.extract_policy_id(policy_name)
+ if not policy_id:
+ return
+ return {POLICY_ID:policy_id, POLICY_BODY:policy_config}
+
+ @staticmethod
+ def select_latest_policy(policy_configs, min_version_expected=None, ignore_policy_names=None):
+ """For some reason, the policy-engine returns all version of the policy_configs.
+ DCAE-Controller is only interested in the latest version
+ """
+ if not policy_configs:
+ return
+ latest_policy_config = {}
+ for policy_config in policy_configs:
+ policy_name = policy_config.get(POLICY_NAME)
+ policy_version = policy_config.get(POLICY_VERSION)
+ if not policy_name or not policy_version or not policy_version.isdigit():
+ continue
+ policy_version = int(policy_version)
+ if min_version_expected and policy_version < min_version_expected:
+ continue
+ if ignore_policy_names and policy_name in ignore_policy_names:
+ continue
+
+ if not latest_policy_config \
+ or int(latest_policy_config[POLICY_VERSION]) < policy_version:
+ latest_policy_config = policy_config
+
+ return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_config))
+
+ @staticmethod
+ def select_latest_policies(policy_configs):
+ """For some reason, the policy-engine returns all version of the policy_configs.
+ DCAE-Controller is only interested in the latest versions
+ """
+ if not policy_configs:
+ return {}
+ policies = {}
+ for policy_config in policy_configs:
+ policy = PolicyUtils.convert_to_policy(policy_config)
+ if not policy:
+ continue
+ policy_id = policy.get(POLICY_ID)
+ policy_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION)
+ if not policy_id or not policy_version or not policy_version.isdigit():
+ continue
+ if policy_id not in policies \
+ or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION]):
+ policies[policy_id] = policy
+
+ for policy_id in policies:
+ policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id])
+
+ return policies
+
+class Utils(object):
+ """general purpose utils"""
+ _logger = logging.getLogger("policy_handler.utils")
+
+ @staticmethod
+ def safe_json_parse(json_str):
+ """try parsing json without exception - returns the json_str back if fails"""
+ if not json_str:
+ return json_str
+ try:
+ return json.loads(json_str)
+ except (ValueError, TypeError) as err:
+ Utils._logger.warning("unexpected json error(%s): len(%s) str[:100]: (%s)",
+ str(err), len(json_str), str(json_str)[:100])
+ return json_str
+
+ @staticmethod
+ def are_the_same(body_1, body_2):
+ """check whether both objects are the same"""
+ if (body_1 and not body_2) or (not body_1 and body_2):
+ Utils._logger.debug("only one is empty %s != %s", body_1, body_2)
+ return False
+
+ if body_1 is None and body_2 is None:
+ return True
+
+ if isinstance(body_1, list) and isinstance(body_2, list):
+ if len(body_1) != len(body_2):
+ Utils._logger.debug("len %s != %s", json.dumps(body_1), json.dumps(body_2))
+ return False
+
+ for val_1, val_2 in zip(body_1, body_2):
+ if not Utils.are_the_same(val_1, val_2):
+ return False
+ return True
+
+ if isinstance(body_1, dict) and isinstance(body_2, dict):
+ if body_1.keys() ^ body_2.keys():
+ Utils._logger.debug("keys %s != %s", json.dumps(body_1), json.dumps(body_2))
+ return False
+
+ for key, val_1 in body_1.items():
+ if not Utils.are_the_same(val_1, body_2[key]):
+ return False
+ return True
+
+ # ... here when primitive values or mismatched types ...
+ the_same_values = (body_1 == body_2)
+ if not the_same_values:
+ Utils._logger.debug("values %s != %s", body_1, body_2)
+ return the_same_values
diff --git a/policyhandler/step_timer.py b/policyhandler/step_timer.py
new file mode 100644
index 0000000..768b400
--- /dev/null
+++ b/policyhandler/step_timer.py
@@ -0,0 +1,175 @@
+# ================================================================================
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+"""periodically callback"""
+
+import json
+from datetime import datetime
+from threading import Event, RLock, Thread
+
+
+class StepTimer(Thread):
+ """call on_time after interval number of seconds, then wait to continue"""
+ STATE_INIT = "init"
+ STATE_NEXT = "next"
+ STATE_STARTED = "started"
+ STATE_PAUSED = "paused"
+ STATE_STOPPING = "stopping"
+ STATE_STOPPED = "stopped"
+
+ def __init__(self, name, interval, on_time, logger, *args, **kwargs):
+ """create step timer with controlled start. next step and pause"""
+ Thread.__init__(self, name=name)
+ self._interval = interval
+ self._on_time = on_time
+ self._logger = logger
+ self._args = args
+ self._kwargs = kwargs
+
+ self._lock = RLock()
+
+ self._timeout = Event()
+ self._waiting_for_timeout = False
+ self._next = Event()
+ self._paused = False
+ self._finished = False
+
+ self._request = StepTimer.STATE_INIT
+ self._req_count = 0
+ self._req_time = 0
+ self._req_ts = datetime.utcnow()
+
+ self._substep = None
+ self._substep_time = 0
+ self._substep_ts = datetime.utcnow()
+
+ def get_timer_status(self):
+ """returns timer status"""
+ with self._lock:
+ return "{0}[{1}] {2}: timeout({3}), paused({4}), next({5}), finished({6})".format(
+ self._request,
+ self._req_count,
+ self._substep,
+ self._timeout.is_set(),
+ self._paused,
+ self._next.is_set(),
+ self._finished,
+ )
+
+ def next(self):
+ """continue with the next timeout"""
+ with self._lock:
+ self._paused = False
+ if self._waiting_for_timeout:
+ self._next.set()
+ self._timeout.set()
+ else:
+ self._next.set()
+ self._request_to_timer(StepTimer.STATE_NEXT)
+
+ def pause(self):
+ """pause the timer"""
+ with self._lock:
+ self._paused = True
+ self._next.clear()
+ self._request_to_timer(StepTimer.STATE_PAUSED)
+
+ def stop(self):
+ """stop the timer if it hasn't finished yet"""
+ with self._lock:
+ self._finished = True
+ self._timeout.set()
+ self._next.set()
+ self._request_to_timer(StepTimer.STATE_STOPPING)
+
+ def _request_to_timer(self, request):
+ """set the request on the timer"""
+ with self._lock:
+ if request in [StepTimer.STATE_NEXT, StepTimer.STATE_STARTED]:
+ self._req_count += 1
+
+ prev_req = self._request
+ self._request = request
+ utcnow = datetime.utcnow()
+ self._req_time = (utcnow - self._req_ts).total_seconds()
+ self._req_ts = utcnow
+ self._logger.info("{0}[{1}] {2}->{3}".format(
+ self.name, self._req_time, prev_req, self.get_timer_status()))
+
+ def _log_substep(self, substep):
+ """log timer substep"""
+ with self._lock:
+ self._substep = substep
+ utcnow = datetime.utcnow()
+ self._substep_time = (utcnow - self._substep_ts).total_seconds()
+ self._substep_ts = utcnow
+ self._logger.info("[{0}] {1}".format(self._substep_time, self.get_timer_status()))
+
+ def _on_time_event(self):
+ """execute the _on_time event"""
+ if self._paused:
+ self._log_substep("paused - skip on_time event")
+ return
+
+ try:
+ self._log_substep("on_time event")
+ self._on_time(*self._args, **self._kwargs)
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: args({4}), kwargs({5})"
+ .format(self.name, type(ex).__name__, str(ex), "_on_time",
+ json.dumps(self._args), json.dumps(self._kwargs)))
+ self._logger.exception(error_msg)
+
+ def run(self):
+ """loop one step a time until stopped=finished"""
+ self._request_to_timer(StepTimer.STATE_STARTED)
+ while True:
+ with self._lock:
+ self._timeout.clear()
+ self._waiting_for_timeout = True
+ self._log_substep("waiting for timeout {0}...".format(self._interval))
+
+ interrupted = self._timeout.wait(self._interval)
+
+ with self._lock:
+ self._waiting_for_timeout = False
+ self._log_substep("woke up after {0}timeout"
+ .format((interrupted and "interrupted ") or ""))
+
+ if self._finished:
+ self._log_substep("finished")
+ break
+
+ if self._next.is_set() and interrupted:
+ self._next.clear()
+ self._log_substep("restart timer")
+ continue
+
+ self._on_time_event()
+
+ self._log_substep("waiting for next...")
+ self._next.wait()
+ with self._lock:
+ self._next.clear()
+ self._log_substep("woke up on next")
+
+ if self._finished:
+ self._log_substep("finished")
+ break
+
+ self._request_to_timer(StepTimer.STATE_STOPPED)
diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py
index 9a5ee19..c49536f 100644
--- a/policyhandler/web_server.py
+++ b/policyhandler/web_server.py
@@ -1,8 +1,5 @@
-"""web-service for policy_handler"""
-
-# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,6 +16,8 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+"""web-service for policy_handler"""
+
import logging
import json
from datetime import datetime
@@ -27,78 +26,195 @@ import cherrypy
from .config import Config
from .onap.audit import Audit
from .policy_rest import PolicyRest
-from .policy_engine import PolicyEngineClient
+from .policy_receiver import PolicyReceiver
class PolicyWeb(object):
- """Main static class for REST API of policy-handler"""
- logger = logging.getLogger("policy_handler.web_cherrypy")
+ """run REST API of policy-handler"""
+ SERVER_HOST = "0.0.0.0"
+ logger = logging.getLogger("policy_handler.policy_web")
@staticmethod
- def run():
- """run forever the web-server of the policy-handler"""
- PolicyWeb.logger.info("policy_handler web-service at port(%d)...", \
- Config.wservice_port)
- cherrypy.config.update({"server.socket_host": "0.0.0.0", \
- 'server.socket_port': Config.wservice_port})
- cherrypy.tree.mount(PolicyLatest(), '/policy_latest')
- cherrypy.tree.mount(PoliciesLatest(), '/policies_latest')
- cherrypy.tree.mount(PoliciesCatchUp(), '/catch_up')
- cherrypy.quickstart(Shutdown(), '/shutdown')
-
-class Shutdown(object):
- """Shutdown the policy-handler"""
- @cherrypy.expose
- def index(self):
- """shutdown event"""
- audit = Audit(req_message="get /shutdown", headers=cherrypy.request.headers)
- PolicyWeb.logger.info("--------- stopping REST API of policy-handler -----------")
- cherrypy.engine.exit()
- PolicyEngineClient.shutdown(audit)
- PolicyWeb.logger.info("--------- the end -----------")
- res = str(datetime.now())
- audit.info_requested(res)
- return "goodbye! shutdown requested {0}".format(res)
+ def run_forever(audit):
+ """run the web-server of the policy-handler forever"""
+ PolicyWeb.logger.info("policy_handler web-service at port(%d)...", Config.wservice_port)
+ cherrypy.config.update({"server.socket_host": PolicyWeb.SERVER_HOST,
+ 'server.socket_port': Config.wservice_port})
+ cherrypy.tree.mount(_PolicyWeb(), '/')
+ audit.info("running policy_handler web-service at port({0})".format(Config.wservice_port))
+ cherrypy.engine.start()
+
+class _PolicyWeb(object):
+ """REST API of policy-handler"""
-class PoliciesLatest(object):
- """REST API of the policy-hanlder"""
+ @staticmethod
+ def _get_request_info(request):
+ """returns info about the http request"""
+ return "{0} {1}{2}".format(request.method, request.script_name, request.path_info)
@cherrypy.expose
+ @cherrypy.popargs('policy_id')
@cherrypy.tools.json_out()
- def index(self):
- """find the latest policy by policy_id or all latest policies"""
- audit = Audit(req_message="get /policies_latest", headers=cherrypy.request.headers)
- res = PolicyRest.get_latest_policies(audit) or {}
- PolicyWeb.logger.info("PoliciesLatest: %s", json.dumps(res))
- audit.audit_done(result=json.dumps(res))
- return res
+ def policy_latest(self, policy_id):
+ """retireves the latest policy identified by policy_id"""
+ req_info = _PolicyWeb._get_request_info(cherrypy.request)
+ audit = Audit(job_name="get_latest_policy",
+ req_message=req_info, headers=cherrypy.request.headers)
+ PolicyWeb.logger.info("%s policy_id=%s headers=%s", \
+ req_info, policy_id, json.dumps(cherrypy.request.headers))
+
+ latest_policy = PolicyRest.get_latest_policy((audit, policy_id, None, None)) or {}
+
+ PolicyWeb.logger.info("res %s policy_id=%s latest_policy=%s",
+ req_info, policy_id, json.dumps(latest_policy))
-@cherrypy.popargs('policy_id')
-class PolicyLatest(object):
- """REST API of the policy-hanlder"""
+ success, http_status_code, _ = audit.audit_done(result=json.dumps(latest_policy))
+ if not success:
+ cherrypy.response.status = http_status_code
+
+ return latest_policy
+
+ def _get_all_policies_latest(self):
+ """retireves all the latest policies on GET /policies_latest"""
+ req_info = _PolicyWeb._get_request_info(cherrypy.request)
+ audit = Audit(job_name="get_all_policies_latest",
+ req_message=req_info, headers=cherrypy.request.headers)
+
+ PolicyWeb.logger.info("%s", req_info)
+
+ result = PolicyRest.get_latest_policies(audit)
+
+ PolicyWeb.logger.info("result %s: %s", req_info, json.dumps(result))
+
+ success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
+ if not success:
+ cherrypy.response.status = http_status_code
+
+ return result
@cherrypy.expose
@cherrypy.tools.json_out()
- def index(self, policy_id):
- """find the latest policy by policy_id or all latest policies"""
- audit = Audit(req_message="get /policy_latest/{0}".format(policy_id or ""), \
+ @cherrypy.tools.json_in()
+ def policies_latest(self):
+ """
+ on :GET: retrieves all the latest policies from policy-engine that are
+ in the scope of the policy-handler.
+
+ on :POST: expects to receive the params that mimic the /getConfig of policy-engine
+ and retrieves the matching policies from policy-engine and picks the latest on each policy.
+
+ sample request - policies filter
+
+ {
+ "configAttributes": { "key1":"value1" },
+ "configName": "alex_config_name",
+ "onapName": "DCAE",
+ "policyName": "DCAE_alex.Config_alex_.*",
+ "unique": false
+ }
+
+ sample response
+
+ {
+ "DCAE_alex.Config_alex_priority": {
+ "policy_body": {
+ "policyName": "DCAE_alex.Config_alex_priority.3.xml",
+ "policyConfigMessage": "Config Retrieved! ",
+ "responseAttributes": {},
+ "policyConfigStatus": "CONFIG_RETRIEVED",
+ "type": "JSON",
+ "matchingConditions": {
+ "priority": "10",
+ "key1": "value1",
+ "ONAPName": "DCAE",
+ "ConfigName": "alex_config_name"
+ },
+ "property": null,
+ "config": {
+ "foo": "bar",
+ "foo_updated": "2018-10-06T16:54:31.696Z"
+ },
+ "policyVersion": "3"
+ },
+ "policy_id": "DCAE_alex.Config_alex_priority"
+ }
+ }
+ """
+ if cherrypy.request.method == "GET":
+ return self._get_all_policies_latest()
+
+ if cherrypy.request.method != "POST":
+ raise cherrypy.HTTPError(404, "unexpected method {0}".format(cherrypy.request.method))
+
+ policy_filter = cherrypy.request.json or {}
+ str_policy_filter = json.dumps(policy_filter)
+
+ req_info = _PolicyWeb._get_request_info(cherrypy.request)
+ audit = Audit(job_name="get_latest_policies",
+ req_message="{0}: {1}".format(req_info, str_policy_filter), \
headers=cherrypy.request.headers)
- PolicyWeb.logger.info("PolicyLatest policy_id=%s headers=%s", \
- policy_id, json.dumps(cherrypy.request.headers))
- res = PolicyRest.get_latest_policy((audit, policy_id)) or {}
- PolicyWeb.logger.info("PolicyLatest policy_id=%s res=%s", policy_id, json.dumps(res))
- audit.audit_done(result=json.dumps(res))
- return res
+ PolicyWeb.logger.info("%s: policy_filter=%s headers=%s", \
+ req_info, str_policy_filter, json.dumps(cherrypy.request.headers))
+
+ result = PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {}
+
+ PolicyWeb.logger.info("result %s: policy_filter=%s result=%s", \
+ req_info, str_policy_filter, json.dumps(result))
+
+ success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
+ if not success:
+ cherrypy.response.status = http_status_code
+
+ return result
-class PoliciesCatchUp(object):
- """catch up with all DCAE policies"""
@cherrypy.expose
@cherrypy.tools.json_out()
- def index(self):
- """catch up with all policies"""
- started = str(datetime.now())
- audit = Audit(req_message="get /catch_up", headers=cherrypy.request.headers)
- PolicyEngineClient.catch_up(audit)
+ def catch_up(self):
+ """catch up with all DCAE policies"""
+ started = str(datetime.utcnow())
+ req_info = _PolicyWeb._get_request_info(cherrypy.request)
+ audit = Audit(job_name="catch_up", req_message=req_info, headers=cherrypy.request.headers)
+
+ PolicyWeb.logger.info("%s", req_info)
+ PolicyReceiver.catch_up(audit)
+
res = {"catch-up requested": started}
- PolicyWeb.logger.info("PoliciesCatchUp: %s", json.dumps(res))
+ PolicyWeb.logger.info("requested %s: %s", req_info, json.dumps(res))
audit.info_requested(started)
return res
+
+ @cherrypy.expose
+ def shutdown(self):
+ """Shutdown the policy-handler"""
+ req_info = _PolicyWeb._get_request_info(cherrypy.request)
+ audit = Audit(job_name="shutdown", req_message=req_info, headers=cherrypy.request.headers)
+
+ PolicyWeb.logger.info("%s: --- stopping REST API of policy-handler ---", req_info)
+
+ cherrypy.engine.exit()
+
+ PolicyReceiver.shutdown(audit)
+
+ PolicyWeb.logger.info("policy_handler health: {0}"
+ .format(json.dumps(audit.health(full=True))))
+ PolicyWeb.logger.info("%s: --------- the end -----------", req_info)
+ res = str(datetime.utcnow())
+ audit.info_requested(res)
+ PolicyWeb.logger.info("process_info: %s", json.dumps(audit.process_info()))
+ return "goodbye! shutdown requested {0}".format(res)
+
+ @cherrypy.expose
+ @cherrypy.tools.json_out()
+ def healthcheck(self):
+ """returns the healthcheck results"""
+ req_info = _PolicyWeb._get_request_info(cherrypy.request)
+ audit = Audit(job_name="healthcheck",
+ req_message=req_info, headers=cherrypy.request.headers)
+
+ PolicyWeb.logger.info("%s", req_info)
+
+ res = audit.health()
+
+ PolicyWeb.logger.info("healthcheck %s: res=%s", req_info, json.dumps(res))
+
+ audit.audit_done(result=json.dumps(res))
+ return res