aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler')
-rw-r--r--policyhandler/PolicyEngine.py1214
-rw-r--r--policyhandler/__init__.py18
-rw-r--r--policyhandler/config.py194
-rw-r--r--policyhandler/deploy_handler.py88
-rw-r--r--policyhandler/discovery.py73
-rw-r--r--policyhandler/onap/CommonLogger.py953
-rw-r--r--policyhandler/onap/__init__.py18
-rw-r--r--policyhandler/onap/audit.py320
-rw-r--r--policyhandler/onap/crypto.py72
-rw-r--r--policyhandler/policy_consts.py28
-rw-r--r--policyhandler/policy_engine.py100
-rw-r--r--policyhandler/policy_handler.py87
-rw-r--r--policyhandler/policy_rest.py342
-rw-r--r--policyhandler/policy_updater.py124
-rw-r--r--policyhandler/web_server.py104
15 files changed, 3735 insertions, 0 deletions
diff --git a/policyhandler/PolicyEngine.py b/policyhandler/PolicyEngine.py
new file mode 100644
index 0000000..df83c78
--- /dev/null
+++ b/policyhandler/PolicyEngine.py
@@ -0,0 +1,1214 @@
+"""
+PolicyEngine API for Python
+
+@author: Tarun, Mike
+@version: 0.9
+@change:
+ added Enum 'Other' Type and supported it in PolicyConfig class - 1/13
+ supporting Remote URL capability to the PolicyEngine as the parameter - 1/13
+ Request format has been updated accordingly. No need to send the PDP URLs anymore - 1/26
+ Feature where the PolicyEngine chooses available PyPDP among different URLs. 1/26
+ Major feature addition required for Notifications 2/17 , Fixed Session issues. 2/25
+ Major change in API structure for combining results 3/4.
+ Added Security support for Notifications and clearNotification Method 3/18
+ newMethod for retrieving configuration using policyFileName 3/20
+ logging 3/24
+ Notification Bug Fixes 7/21
+ basic Auth 7/22
+ Notification Changes 9/3
+ ECOMP Error codes included 10/1
+ -2016
+ Major Changes to the Policy Engine API 3/29
+ DeletePolicy API and Changes to pushPolicy and getConfig 5/27
+ ConfigRequestParmeters and Error code Change 7/11
+ New Environment Variable and Client Authorizations Change 7/21
+ Changes to the Policy Parameters 8/21
+ Allow Clients to use their own Password Protection.
+ Dictionary Types and its Fixes.
+"""
+
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import json,sys, os, collections, websocket , logging, time, base64, uuid
+from websocket import create_connection
+from enum import Enum
+from xml.etree.ElementTree import XML
+try:
+ # For Python 3.0 and Later
+ from pip._vendor import requests
+except ImportError:
+ # For backend Support to Python 2's urllib2
+ import requests
+try:
+ # For Python 3.0 and Later
+ from urllib.request import urlopen
+except ImportError:
+ # Fall back for Python 2.*
+ from urllib2 import urlopen
+try:
+ import thread
+except ImportError:
+ #Fall Back for Python 2.x
+ import _thread as thread
+
+PolicyConfigStatus = Enum('PolicyConfigStatus', 'CONFIG_NOT_FOUND CONFIG_RETRIEVED')
+PolicyType = Enum('PolicyType', 'JSON XML PROPERTIES OTHER')
+PolicyResponseStatus = Enum('PolicyResponseStatus', 'ACTION_ADVISED ACTION_TAKEN NO_ACTION_REQUIRED')
+NotificationType = Enum('NotificationType', 'BOTH UPDATE REMOVE')
+UpdateType = Enum('UpdateType', 'UPDATE NEW')
+NotificationScheme = Enum('NotificationScheme', 'AUTO_ALL_NOTIFICATIONS AUTO_NOTIFICATIONS MANUAL_ALL_NOTIFICATIONS MANUAL_NOTIFICATIONS')
+AttributeType = Enum('AttributeType', 'MATCHING MICROSERVICE RULE SETTINGS')
+PolicyClass = Enum('PolicyClass', 'Action Config Decision')
+PolicyConfigType = Enum('PolicyConfigType', 'Base BRMS_PARAM BRMS_RAW ClosedLoop_Fault ClosedLoop_PM Firewall MicroService Extended')
+DeletePolicyCondition = Enum('DeletePolicyCondition', 'ONE ALL')
+RuleProvider = Enum('RuleProvider', 'Custom AAF GUARD_YAML')
+DictionaryType = Enum('DictionaryType', 'Common Action ClosedLoop Firewall Decision BRMS GOC MicroService DescriptiveScope PolicyScope Enforcer SafePolicy' )
+ImportType = Enum('ImportType', 'MICROSERVICE')
+
+class PolicyEngine:
+ """
+ PolicyEngine is the Class which needs to be instantiated to call the PDP server.
+ It needs the *.properties* file path as the parameter to the constructor.
+ """
+ def __init__(self, filename, scheme = None, handler = None, clientKey = None):
+ """
+ @param filename: String format of the path location of .properties file Could also be A remote URL eg: http://localhost:8080/config.properties
+ @param scheme: NotificationScheme to select the scheme required for notification updates.
+ @param handler: NotificationHandler object which will be called when an event is occurred.
+ @param clientKey: Decoded Client Key to be used by PolicyEngine.
+ @attention:The .properties file must contain the PYPDP_URL parameter in it. The parameter can have multiple URLs the PolicyEngine chooses the available PyPDP among them.
+ """
+ self.filename = filename
+ self.urldict = {}
+ self.matchStore = []
+ self.autoURL = None
+ self.scheme = None
+ self.handler = None
+ self.thread = thread
+ self.autows = None
+ self.mclose = False
+ self.restart = False
+ self.logger = logging.getLogger()
+ self.resturl= []
+ self.encoded= []
+ self.clientInfo = None
+ self.environment = None
+ self.policyheader = {}
+ if(filename.startswith("http")):
+ try:
+ policy_data = urlopen(filename)
+ for line in policy_data :
+ line = line.decode('utf-8')
+ line = line.rstrip() # removes trailing whitespace and '\n' chars
+ line = line.replace(" ","") # removing spaces
+ if "=" not in line: continue #skips blanks and comments w/o =
+ if line.startswith("#"): continue #skips comments which contain =
+ key, value = line.split("=",1)
+ key = key.rstrip().lstrip()
+ value = value.lstrip()
+ #print("key= "+key+" Value =" +value )
+ self.urldict[key] = value
+ except:
+ self.logger.error("PE300 - Data Issue: Error with the Config URL: %s" , filename )
+ print("PE300 - Data Issue: Config Properties URL Error")
+ sys.exit(0)
+ else :
+ fileExtension = os.path.splitext(filename)
+ if(fileExtension[1]!=".properties"):
+ self.logger.error("PE300 - Data Issue: File is not in properties format: %s", filename)
+ print("PE300 - Data Issue: Not a .properties file!")
+ sys.exit(0)
+ try :
+ with open(self.filename, 'r') as f:
+ for line in f:
+ line = line.rstrip() # removes trailing whitespace and '\n' chars
+ line = line.replace(" ","") # removing spaces
+ if "=" not in line: continue #skips blanks and comments w/o =
+ if line.startswith("#"): continue #skips comments which contain =
+ key, value = line.split("=",1)
+ key = key.rstrip().lstrip()
+ value = value.lstrip()
+ # self.logger.info("key=%s Value=%s", key, value)
+ self.urldict[key] = value
+ except FileNotFoundError:
+ self.logger.error("PE300 - Data Issue: File Not found: %s", filename)
+ print("PE300 - Data Issue: File Doesn't exist in the given Location")
+ sys.exit(0)
+ #TODO logic for best available PyPDP servers
+ try:
+ self.urldict = collections.OrderedDict(sorted(self.urldict.items()))
+ clientID = self.urldict.get("CLIENT_ID")
+ # self.logger.info("clientID decoded %s", base64.b64decode(clientID).decode("utf-8"))
+ # client_parts = base64.b64decode(clientID).split(":")
+ # client_parts = clientID.split(":")
+ # self.logger.info("ClientAuth:Basic %s", base64.b64encode(clientID))
+ # self.logger.info("CLIENT_ID[0] = %s", client_parts[0])
+ # self.logger.info("CLIENT_ID[0] base64 = %s", base64.b64encode(client_parts[0]))
+ # self.logger.info("CLIENT_KEY base64 = %s", base64.b64encode(client_parts[1]))
+ if(clientKey is None):
+ try:
+ client = base64.b64decode(self.urldict.get("CLIENT_KEY")).decode("utf-8")
+ except Exception:
+ self.logger.warn("PE300 - Data Issue: CLIENT_KEY parameter is not in the required encoded Format taking Value as clear Text")
+ client = self.urldict.get("CLIENT_KEY")
+ else:
+ client = clientKey
+ if(clientID is None or client is None):
+ self.logger.error("PE300 - Data Issue: No CLIENT_ID and/or CLIENT_KEY parameter found in the properties file: %s ", filename)
+ print("PE300 - Data Issue: No CLIENT_ID and/or CLIENT_KEY parameter found in the properties file")
+ sys.exit(0)
+ else:
+ uid = clientID.encode('ascii')
+ password = client.encode('ascii')
+ self.clientInfo = base64.b64encode(uid+ b':'+password).decode('utf-8')
+ self.environment = self.urldict.get("ENVIRONMENT")
+ if(self.environment is None):
+ self.logger.info("Missing Environment Variable setting to Default Value.")
+ self.environment = "DEVL"
+ self.policyheader = {"Content-type" : "application/json", "Accept" : "application/json", "ClientAuth" : "Basic "+self.clientInfo, "Environment" : self.environment}
+ for key in self.urldict.keys():
+ if(key.startswith("PYPDP_URL")):
+ pypdpVal = self.urldict.get(key)
+ if pypdpVal is None:
+ self.logger.error("PE300 - Data Issue: No PYPDP_URL Parameter found in the properties file: %s ", filename)
+ print("PE300 - Data Issue: No PYPDP_URL parameter found in the properties file")
+ sys.exit(0)
+ if ";" in pypdpVal:
+ pdpDefault = pypdpVal.split(";")
+ if pdpDefault is None:
+ self.logger.error("PE300 - Data Issue: No PYPDP_URL Parameter found in the properties file: %s ", filename)
+ print("PE300 - Data Issue: No PYPDP_URL parameter found in the properties file")
+ sys.exit(0)
+ else:
+ for count in range(0, len(pdpDefault)):
+ self.__pdpParam(pdpDefault[count])
+ else:
+ self.__pdpParam(pypdpVal)
+
+ self.logger.info("PolicyEngine url: %s policyheader: %s urldict: %s", \
+ self.resturl, json.dumps(self.policyheader), json.dumps(self.urldict))
+ if len(self.resturl)==0:
+ self.logger.error("PE300 - Data Issue: No PYPDP_URL Parameter found in the properties file: %s ", filename)
+ print("PE300 - Data Issue: No PYPDP_URL parameter found in the properties file")
+ sys.exit(0)
+ except:
+ self.logger.error("PE300 - Data Issue: missing parameter(s) in the properties file: %s ", filename)
+ print("PE300 - Data Issue: missing parameter(s) in the properties file")
+ sys.exit(0)
+ # Scheme and Handler code to be handled from here.
+ if handler is not None:
+ #if type(handler) is NotificationHandler:
+ self.handler = handler
+ #else:
+ # print("handler should be a object of NotificationHandler class")
+ # sys.exit(0)
+ if scheme is not None:
+ if ((scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
+ # setup the Auto settings.
+ self.scheme = scheme
+ elif ((scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name)):
+ # setup the Manual Settings
+ self.scheme = scheme
+ else:
+ self.logger.error("PE300 - Data Issue: Scheme not a type of NotificationScheme: %s", scheme.name)
+ print("PE300 - Data Issue: scheme must be a Type of NotificationScheme Enumeration ")
+ sys.exit(0)
+
+ def __pdpParam(self,pdpValue):
+ """
+ Internal Usage for reading PyPDP Parameters
+ """
+ if pdpValue is None:
+ self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
+ print("PE100 - Permissions Error: No Enough Credentials to send Request")
+ sys.exit(0)
+ elif "," in pdpValue:
+ pdpValues = pdpValue.split(",")
+ if (len(pdpValues)==3):
+ # 0 is pypdp URL
+ self.resturl.append(pdpValues[0])
+ # 1 and 2 are user name password
+ if pdpValues[1] and pdpValues[2]:
+ uid = pdpValues[1].encode('ascii')
+ password = pdpValues[2].encode('ascii')
+ encoded = base64.b64encode(uid+ b':'+password).decode('utf-8')
+ self.encoded.append(encoded)
+ else:
+ self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
+ print("PE100 - Permissions Error: No Enough Credentials to send Request")
+ sys.exit(0)
+ else:
+ self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
+ print("PE100 - Permissions Error: No Enough Credentials to send Request")
+ sys.exit(0)
+ else:
+ self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request")
+ print("PE100 - Permissions Error: No Enough Credentials to send Request")
+ sys.exit(0)
+
+ def getConfigByPolicyName(self, policyName, requestID=None):
+ """
+ @param policyName: String format of the PolicyFile Name whose configuration is required.
+ @return: Returns a List of PolicyConfig Object(s).
+ @deprecated: use getConfig instead.
+ """
+ __policyNameURL = "/getConfigByPolicyName"
+ __headers = self.policyheader
+ if requestID is not None:
+ __headers["X-ECOMP-RequestID"] = str(requestID)
+ else:
+ __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
+ self.__policyNamejson = {}
+ self.__policyNamejson['policyName'] = policyName
+ self.__cpnResponse = self.__callPDP(__policyNameURL, json.dumps(self.__policyNamejson), __headers, "POST")
+ self.__cpnJSON = self.__cpnResponse.json()
+ policyConfigs= self.__configResponse(self.__cpnJSON)
+ return policyConfigs
+
+ def listConfig(self, eCOMPComponentName=None, configName=None, configAttributes=None, policyName=None, unique= False, requestID=None):
+ """
+ listConfig function calls the PDP for the configuration required using the parameters and returns the PDP response.
+ @param eCOMPComponentName: String of the eCOMPComponentName whose configuration is required.
+ @param configName: String of the configName whose configuration is required. Not Mandatory field.
+ @param configAttributes: Dictionary of the config attributes in Key and Value String format. Not mandatory field.
+ @param policyName: String of the policyName whose configuration is required.
+ @param unique: Boolean value which can be set to True if Unique results are required.
+ @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
+ @return: Returns a List of PolicyNames.
+ """
+ __configURL = "/listConfig"
+ __headers = self.policyheader
+ if requestID is not None:
+ __headers["X-ECOMP-RequestID"] = str(requestID)
+ else:
+ __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
+ __configjson = self.__configRequestParametersJSON(eCOMPComponentName, configName, configAttributes, policyName, unique)
+ #self.__configjson['pdp_URL'] = self.pdp_url
+ __cResponse = self.__callPDP(__configURL, json.dumps(__configjson), __headers, "POST")
+ return __cResponse.json()
+
+
+ def getConfig(self, eCOMPComponentName=None, configName=None, configAttributes=None, policyName=None, unique= False, requestID=None):
+ """
+ getConfig function calls the PDP for the configuration required using the parameters and returns the PDP response.
+ @param eCOMPComponentName: String of the eCOMPComponentName whose configuration is required.
+ @param configName: String of the configName whose configuration is required. Not Mandatory field.
+ @param configAttributes: Dictionary of the config attributes in Key and Value String format. Not mandatory field.
+ @param policyName: String of the policyName whose configuration is required.
+ @param unique: Boolean value which can be set to True if Unique results are required.
+ @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
+ @return: Returns a List of PolicyConfig Object(s).
+ """
+ __configURL = "/getConfig"
+ __headers = self.policyheader
+ if requestID is not None:
+ __headers["X-ECOMP-RequestID"] = str(requestID)
+ else:
+ __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
+ self.__configjson = self.__configRequestParametersJSON(eCOMPComponentName, configName, configAttributes, policyName, unique)
+ #self.__configjson['pdp_URL'] = self.pdp_url
+ self.__cResponse = self.__callPDP(__configURL, json.dumps(self.__configjson), __headers, "POST")
+ #self.__configURL = self.resturl+__configURL
+ #self.__cResponse = requests.post(self.__configURL, data=json.dumps(self.__configjson), headers = __headers)
+ self.__cJSON = self.__cResponse.json()
+ policyConfigs= self.__configResponse(self.__cJSON)
+ # if we have successfully retrieved a policy we will store the match values.
+ matchFound = False
+ for policyConfig in policyConfigs:
+ if policyConfig._policyConfigStatus == PolicyConfigStatus.CONFIG_RETRIEVED.name:
+ # self.logger.info("Policy has been Retrieved !!")
+ matchFound = True
+ if matchFound:
+ __match = {}
+ __match["ECOMPName"] = eCOMPComponentName
+ if configName is not None:
+ __match["ConfigName"] = configName
+ if configAttributes is not None:
+ __match.update(configAttributes)
+ if not self.matchStore:
+ self.matchStore.append(__match)
+ else:
+ __booMatch = False
+ for eachDict in self.matchStore:
+ if eachDict==__match:
+ __booMatch = True
+ break
+ if __booMatch==False:
+ self.matchStore.append(__match)
+ return policyConfigs
+
+ def __configRequestParametersJSON(self, eCOMPComponentName=None, configName=None, configAttributes=None, policyName=None, unique= False):
+ """ Internal Function to set JSON from configRequestParameters
+ """
+ json= {}
+ if eCOMPComponentName is not None:
+ json['ecompName'] = eCOMPComponentName
+ if configName is not None:
+ json['configName'] = configName
+ if configAttributes is not None:
+ json['configAttributes'] = configAttributes
+ if policyName is not None:
+ json['policyName'] = policyName
+ json['unique'] = unique
+ return json
+
+ def __configResponse(self, cJSON):
+ """
+ Internal function to take the convert JSON to Response Object.
+ """
+ policyConfigs=[]
+ for configJSON in cJSON:
+ policyConfig = PolicyConfig()
+ policyConfig._policyConfigMessage = configJSON['policyConfigMessage']
+ policyConfig._policyConfigStatus = configJSON['policyConfigStatus']
+ policyConfig._policyType = configJSON['type']
+ policyConfig._policyName = configJSON['policyName']
+ policyConfig._policyVersion = configJSON['policyVersion']
+ policyConfig._matchingConditions = configJSON['matchingConditions']
+ policyConfig._responseAttributes = configJSON['responseAttributes']
+ if PolicyType.JSON.name == policyConfig._policyType:
+ policyConfig._json = configJSON['config']
+ elif PolicyType.XML.name == policyConfig._policyType:
+ policyConfig._xml = XML(configJSON['config'])
+ elif PolicyType.PROPERTIES.name == policyConfig._policyType:
+ policyConfig._properties = configJSON['property']
+ elif PolicyType.OTHER.name == policyConfig._policyType:
+ policyConfig._other = configJSON['config']
+ policyConfigs.append(policyConfig)
+ return policyConfigs
+
+ def getDecision(self, decisionAttributes, ecompcomponentName, requestID = None):
+ """
+ getDecision function sends the Decision Attributes to the PDP server and gets the response to the client from PDP.
+ @param decisionAttributes: Dictionary of Decision Attributes in Key and Value String formats.
+ @param ecompcomponentName:
+ @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
+ @return: Returns a DecisionResponse Object.
+ """
+ __decisionurl = "/getDecision"
+ __headers = self.policyheader
+ if requestID is not None:
+ __headers["X-ECOMP-RequestID"] = str(requestID)
+ else:
+ __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
+ self.__decisionjson={}
+ self.__decisionjson['decisionAttributes'] = decisionAttributes
+ self.__decisionjson['ecompcomponentName'] = ecompcomponentName
+ self.__dResponse = self.__callPDP(__decisionurl, json.dumps(self.__decisionjson), __headers, "POST")
+ self.__dJSON = self.__dResponse.json()
+ decisionResponse = DecisionResponse()
+ decisionResponse._decision = self.__dJSON['decision']
+ decisionResponse._details = self.__dJSON['details']
+ return decisionResponse
+
+ def sendEvent(self, eventAttributes, requestID=None):
+ """
+ sendEvent function sends the Event to the PDP server and gets the response to the client from the PDP.
+ @param eventAttributes:Dictonary of the EventAttributes in Key and Value String formats.
+ @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
+ @return: Returns a List of PolicyResponse Object(s).
+ """
+ __eventurl = "/sendEvent"
+ __headers = self.policyheader
+ if requestID is not None:
+ __headers["X-ECOMP-RequestID"] = str(requestID)
+ else:
+ __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
+ self.__eventjson = {}
+ self.__eventjson['eventAttributes'] = eventAttributes
+ #self.__eventjson['pdp_URL'] = self.pdp_url
+ self.__eResponse = self.__callPDP(__eventurl, json.dumps(self.__eventjson), __headers, "POST")
+ #self.__eventurl = self.resturl+__eventurl
+ #self.__eResponse = requests.post(self.__eventurl, data=json.dumps(self.__eventjson), headers = __headers)
+ self.__eJSON = self.__eResponse.json()
+ policyResponses=[]
+ for eventJSON in self.__eJSON:
+ policyResponse = PolicyResponse()
+ policyResponse._policyResponseMessage = eventJSON['policyResponseMessage']
+ policyResponse._policyResponseStatus = eventJSON['policyResponseStatus']
+ policyResponse._actionAdvised = eventJSON['actionAdvised']
+ policyResponse._actionTaken = eventJSON['actionTaken']
+ policyResponse._requestAttributes = eventJSON['requestAttributes']
+ policyResponses.append(policyResponse)
+ return policyResponses
+
+ def createPolicy(self, policyParameters):
+ """
+ 'createPolicy creates Policy using the policyParameters sent'
+ @param policyParameters: This is an object of PolicyParameters which is required as a parameter to this method.
+ @return: Returns a PolicyChangeResponse Object
+ """
+ __createurl = "/createPolicy"
+ __headers = self.policyheader
+ try:
+ if policyParameters._requestID is None:
+ policyParameters._requestID = str(uuid.uuid4())
+ self.__createJson = {}
+ self.__createJson = self.__policyParametersJSON(policyParameters)
+ self.__createResponse = self.__callPDP(__createurl, json.dumps(self.__createJson), __headers, "PUT")
+ policyChangeResponse = PolicyChangeResponse()
+ policyChangeResponse._responseCode = self.__createResponse.status_code
+ policyChangeResponse._responseMessage = self.__createResponse.text
+ return policyChangeResponse
+ except:
+ self.logger.error("PE300 - Data Issue: Error with the policyParameters Object. It needs to be object of PolicyParameters ")
+ print("PE300 - Data Issue: policyParamters object Error")
+
+ def updatePolicy(self, policyParameters):
+ """
+ 'updatePolicy updates Policy using the policyParameters sent.'
+ @param policyParameters: This is an object of PolicyParameters which is required as a parameter to this method.
+ @return: Returns a PolicyChangeResponse Object
+ """
+ __updateurl = "/updatePolicy"
+ __headers = self.policyheader
+ try:
+ if policyParameters._requestID is None:
+ policyParameters._requestID = str(uuid.uuid4())
+ self.__updateJson = {}
+ self.__updateJson = self.__policyParametersJSON(policyParameters)
+ self.__updateResponse = self.__callPDP(__updateurl, json.dumps(self.__updateJson), __headers, "PUT")
+ policyChangeResponse = PolicyChangeResponse()
+ policyChangeResponse._responseCode = self.__updateResponse.status_code
+ policyChangeResponse._responseMessage = self.__updateResponse.text
+ return policyChangeResponse
+ except:
+ self.logger.error("PE300 - Data Issue: Error with the policyParameters Object. It needs to be object of PolicyParameters ")
+ print("PE300 - Data Issue: policyParamters object Error")
+
+ def __policyParametersJSON(self, policyParameters):
+ """ Internal Function to set JSON from policyParameters Object
+ """
+ json= {}
+ if policyParameters._actionAttribute is not None:
+ json['actionAttribute'] = policyParameters._actionAttribute
+ if policyParameters._actionPerformer is not None:
+ json['actionPerformer'] = policyParameters._actionPerformer
+ if policyParameters._attributes is not None:
+ json['attributes'] = policyParameters._attributes
+ if policyParameters._configBody is not None:
+ json['configBody'] = policyParameters._configBody
+ if policyParameters._configBodyType is not None:
+ json['configBodyType'] = policyParameters._configBodyType
+ if policyParameters._configName is not None:
+ json['configName'] = policyParameters._configName
+ if policyParameters._controllerName is not None:
+ json['controllerName'] = policyParameters._controllerName
+ if policyParameters._dependencyNames is not None:
+ json['dependencyNames'] = policyParameters._dependencyNames
+ if policyParameters._dynamicRuleAlgorithmLabels is not None:
+ json['dynamicRuleAlgorithmLabels'] = policyParameters._dynamicRuleAlgorithmLabels
+ if policyParameters._dynamicRuleAlgorithmField1 is not None:
+ json['dynamicRuleAlgorithmField1'] = policyParameters._dynamicRuleAlgorithmField1
+ if policyParameters._dynamicRuleAlgorithmField2 is not None:
+ json['dynamicRuleAlgorithmField2'] = policyParameters._dynamicRuleAlgorithmField2
+ if policyParameters._dynamicRuleAlgorithmFunctions is not None:
+ json['dynamicRuleAlgorithmFunctions'] = policyParameters._dynamicRuleAlgorithmFunctions
+ if policyParameters._ecompName is not None:
+ json['ecompName'] = policyParameters._ecompName
+ if policyParameters._extendedOption is not None:
+ json['extendedOption'] = policyParameters._extendedOption
+ if policyParameters._guard is not None:
+ json['guard'] = policyParameters._guard
+ if policyParameters._policyClass is not None:
+ json['policyClass'] = policyParameters._policyClass
+ if policyParameters._policyConfigType is not None:
+ json['policyConfigType'] = policyParameters._policyConfigType
+ if policyParameters._policyName is not None:
+ json['policyName'] = policyParameters._policyName
+ if policyParameters._policyDescription is not None:
+ json['policyDescription'] = policyParameters._policyDescription
+ if policyParameters._priority is not None:
+ json['priority'] = policyParameters._priority
+ if policyParameters._requestID is not None:
+ json['requestID'] = policyParameters._requestID
+ if policyParameters._riskLevel is not None:
+ json['riskLevel'] = policyParameters._riskLevel
+ if policyParameters._riskType is not None:
+ json['riskType'] = policyParameters._riskType
+ if policyParameters._ruleProvider is not None:
+ json['ruleProvider'] = policyParameters._ruleProvider
+ if policyParameters._ttlDate is not None:
+ json['ttlDate'] = policyParameters._ttlDate
+ return json
+
+ def pushPolicy(self, pushPolicyParameters, requestID = None):
+ """
+ 'pushPolicy pushes a policy based on the given Push Policy Parameters. '
+ @param pushPolicyParameters: This is an object of PushPolicyParameters which is required as a parameter to this method.
+ @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated.
+ @return: Returns a PolicyChangeResponse Object
+ """
+ __pushurl = "/pushPolicy"
+ __headers = self.policyheader
+ if requestID is not None:
+ __headers["X-ECOMP-RequestID"] = str(requestID)
+ else:
+ __headers["X-ECOMP-RequestID"] = str(uuid.uuid4())
+ try:
+ self.__pushJson = {}
+ self.__pushJson['pdpGroup'] = pushPolicyParameters._pdpGroup
+ self.__pushJson['policyName'] = pushPolicyParameters._policyName
+ self.__pushJson['policyType'] = pushPolicyParameters._policyType
+ self.__pushResponse = self.__callPDP(__pushurl, json.dumps(self.__pushJson), __headers, "PUT")
+ policyChangeResponse = PolicyChangeResponse()
+ policyChangeResponse._responseCode = self.__pushResponse.status_code
+ policyChangeResponse._responseMessage = self.__pushResponse.text
+ return policyChangeResponse
+ except:
+ self.logger.error("PE300 - Data Issue: Error with the pushPolicyParameters Object. It needs to be object of PushPolicyParameters ")
+ print("PE300 - Data Issue: pushPolicyParamters object Error")
+
+ def deletePolicy(self, deletePolicyParameters):
+ """
+ 'deletePolicy Deletes a policy or all its version according to the given deletePolicyParameters'
+ @param deletePolicyParameters: This is an Object of DeletePolicyParameters which is required as a parameter to this method.
+ @return: Returns a PolicyChangeResponse Object
+ """
+ __deleteurl = "/deletePolicy"
+ __createdictionaryurl = "/createDictionaryItem"
+ __headers = self.policyheader
+ try:
+ if deletePolicyParameters._requestID is None:
+ deletePolicyParameters._requestID = str(uuid.uuid4())
+ self.__deleteJson = {}
+ self.__deleteJson['deleteCondition'] = deletePolicyParameters._deleteCondition
+ self.__deleteJson['pdpGroup'] = deletePolicyParameters._pdpGroup
+ self.__deleteJson['policyComponent'] = deletePolicyParameters._policyComponent
+ self.__deleteJson['policyName'] = deletePolicyParameters._policyName
+ self.__deleteJson['policyType'] = deletePolicyParameters._policyType
+ self.__deleteJson['requestID'] = deletePolicyParameters._requestID
+ self.__deleteResponse = self.__callPDP(__deleteurl, json.dumps(self.__deleteJson), self.policyheader, "DELETE")
+ policyChangeResponse = PolicyChangeResponse()
+ policyChangeResponse._responseCode = self.__deleteResponse.status_code
+ policyChangeResponse._responseMessage = self.__deleteResponse.text
+ return policyChangeResponse
+ except:
+ self.logger.error("PE300 - Data Issue: Error with the deletePolicyParameters Object. It needs to be object of DeletePolicyParameters ")
+ print("PE300 - Data Issue: deletePolicyParameters object Error")
+
+ def createDictionaryItems(self, dictionaryParameters):
+ """
+ 'createDictionaryItems adds dictionary items to the database for a specific dictionary'
+ @param dictionaryParameters: This is an Object of DictionaryParameters which is required as a parameter to this method
+ @return: Returns a DictionaryResponse object
+ """
+ __createdictionaryurl = '/createDictionaryItem'
+ __headers = self.policyheader
+ try:
+ if dictionaryParameters._requestID is None:
+ dictionaryParameters._requestID = str(uuid.uuid4())
+ self.__json={}
+ self.__json['dictionaryType'] = dictionaryParameters._dictionaryType
+ self.__json['dictionary'] = dictionaryParameters._dictionary
+ self.__json['dictionaryJson'] = dictionaryParameters._dictionaryJson
+ self.__json['requestID'] = dictionaryParameters._requestID
+ self.__createResponse = self.__callPDP(__createdictionaryurl, json.dumps(self.__json), __headers, "PUT")
+ dictionaryResponse = DictionaryResponse()
+ dictionaryResponse._responseCode = self.__createResponse.status_code
+ dictionaryResponse._responseMessage = self.__createResponse.text
+ return dictionaryResponse
+ except:
+ self.logger.error("PE300 - Data Issue: Error with the dictionaryParameters object. It needs to be object of DictionaryParameters ")
+ print("PE300 - Data Issue: dictionaryParameters object Error")
+
+
+ def updateDictionaryItems(self, dictionaryParameters):
+ """
+ 'updateDictionaryItems edits dictionary items in the database for a specific dictionary'
+ @param dictionaryParameters: This is an Object of DictionaryParameters which is required as a parameter to this method
+ @return: Returns a DictionaryResponse object
+ """
+ __updatedictionaryurl = '/updateDictionaryItem'
+ __headers = self.policyheader
+ try:
+ if dictionaryParameters._requestID is None:
+ dictionaryParameters._requestID = str(uuid.uuid4())
+ self.__json={}
+ self.__json['dictionaryType'] = dictionaryParameters._dictionaryType
+ self.__json['dictionary'] = dictionaryParameters._dictionary
+ self.__json['dictionaryJson'] = dictionaryParameters._dictionaryJson
+ self.__json['requestID'] = dictionaryParameters._requestID
+ self.__updateResponse = self.__callPDP(__updatedictionaryurl, json.dumps(self.__json), __headers, "PUT")
+ dictionaryResponse = DictionaryResponse()
+ dictionaryResponse._responseCode = self.__updateResponse.status_code
+ dictionaryResponse._responseMessage = self.__updateResponse.text
+ return dictionaryResponse
+ except:
+ self.logger.error("PE300 - Data Issue: Error with the dictionaryParameters object. It needs to be object of DictionaryParameters ")
+ print("PE300 - Data Issue: dictionaryParameters object Error")
+
+ def getDictionaryItems(self, dictionaryParameters):
+ """
+ 'getDictionaryItems gets all the dictionary items stored in the database for a specified dictionary'
+ @param dictionaryParameters: This is an Object of DictionaryParameters which is required as a parameter to this method.
+ @return: Returns a DictionaryResponse object
+ """
+ __retrievedictionaryurl = "/getDictionaryItems"
+ __headers = self.policyheader
+ try:
+ if dictionaryParameters._requestID is None:
+ dictionaryParameters._requestID = str(uuid.uuid4())
+ self.__json = {}
+ self.__json['dictionaryType'] = dictionaryParameters._dictionaryType
+ self.__json['dictionary'] = dictionaryParameters._dictionary
+ self.__json['requestID'] = dictionaryParameters._requestID
+ self.__getResponse = self.__callPDP(__retrievedictionaryurl, json.dumps(self.__json), __headers, "POST")
+ dictionaryResponse = DictionaryResponse()
+ dictionaryResponse._responseCode = self.__getResponse.status_code
+ dictionaryResponse._responseMessage = self.__getResponse.text
+ return dictionaryResponse
+ except:
+ self.logger.error("PE300 - Data Issue: Error with the dictionaryParameters object. It needs to be object of DictionaryParameters ")
+ print("PE300 - Data Issue: dictionaryParameters object Error")
+
+ def getNotification(self):
+ """
+ gets the PDPNotification if the appropriate NotificationScheme is selected.
+ @return: Returns a PDPNotification Object.
+ """
+ if ((self.scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name)or(self.scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name)):
+ # Manual Client for websocket Code in here.
+ if(self.resturl[0].startswith("https")):
+ __man_url = self.resturl[0].replace("https","wss")+"notifications"
+ else:
+ __man_url = self.resturl[0].replace("http","ws")+"notifications"
+ __result = self.__manualRequest(__man_url)
+ self.logger.debug("Manual Notification with server: %s \n result is: %s" , __man_url , __result)
+ # TODO convert the result to PDP Notifications.
+ if (self.scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name):
+ # need to add all the values to the PDPNotification..
+ pDPNotification = PDPNotification()
+ boo_Remove = False
+ boo_Update = False
+ if __result is None:
+ return None
+ if __result['removedPolicies']:
+ removedPolicies = []
+ for removed in __result['removedPolicies']:
+ removedPolicy = RemovedPolicy()
+ removedPolicy._policyName = removed['policyName']
+ removedPolicy._policyVersion = removed['versionNo']
+ removedPolicies.append(removedPolicy)
+ pDPNotification._removedPolicies= removedPolicies
+ boo_Remove = True
+ if __result['loadedPolicies']:
+ updatedPolicies = []
+ for updated in __result['loadedPolicies']:
+ updatedPolicy = LoadedPolicy()
+ updatedPolicy._policyName = updated['policyName']
+ updatedPolicy._policyVersion = updated['versionNo']
+ updatedPolicy._matchingConditions = updated['matches']
+ updatedPolicy._updateType = updated['updateType']
+ updatedPolicies.append(updatedPolicy)
+ pDPNotification._loadedPolicies= updatedPolicies
+ boo_Update = True
+ if (boo_Update and boo_Remove):
+ pDPNotification._notificationType = NotificationType.BOTH.name
+ elif boo_Update:
+ pDPNotification._notificationType = NotificationType.UPDATE.name
+ elif boo_Remove:
+ pDPNotification._notificationType = NotificationType.REMOVE.name
+ return pDPNotification
+ elif (self.scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name):
+ return self.__checkNotification(__result)
+ else:
+ return None
+
+ def setNotification(self, scheme, handler = None):
+ """
+ setNotification allows changes to the NotificationScheme and the NotificationHandler.
+ @param scheme: NotificationScheme to select the scheme required for notification updates.
+ @param handler: NotificationHandler object which will be called when an event is occurred.
+ """
+ if handler is not None:
+ #if type(handler) is NotificationHandler:
+ self.handler = handler
+ #else:
+ # print("Error: handler should be a object of NotificationHandler class")
+ if scheme is not None:
+ if ((scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
+ # setup the Auto settings.
+ self.scheme = scheme
+ self.__startAuto()
+ elif ((scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name)):
+ # setup the Manual Settings
+ self.scheme = scheme
+ else:
+ print("PE300 - Data Issue: scheme must be a Type of NotificationScheme Enumeration ")
+
+ def clearNotification(self):
+ """
+ clearNotification ShutsDown the AutoNotification service if running.
+ """
+ if self.scheme is not None:
+ if((self.scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(self.scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
+ if self.autows.sock is not None:
+ if(self.autows.sock.connected):
+ self.mclose = True
+ self.autows.close()
+ self.logger.info("Notification Service Stopped.")
+ print("Notification Service is Stopped!!")
+
+ def __callPDP(self,urlFunction, jsonData, headerData,method, files= None, params = None):
+ """
+ This function call is for internal usage purpose only.
+ Calls the available PyPDP
+ """
+ connected = False
+ response = None
+ errormessage = ''
+ for count in range(0, len(self.resturl)):
+ try:
+ logging.basicConfig(level=logging.DEBUG)
+ request_url = self.resturl[0]+ urlFunction
+ self.logger.debug("--- Sending Request to : %s",request_url)
+ try:
+ self.logger.debug("Request ID %s :",headerData["X-ECOMP-RequestID"])
+ except:
+ if jsonData is not None:
+ self.logger.debug("Request ID %s :",json.loads(jsonData)['requestID'])
+ self.logger.debug("Request Data is: %s" ,jsonData)
+ headerData["Authorization"]= "Basic " + self.encoded[0]
+ if(method=="PUT"):
+ response = requests.put(request_url, data = jsonData, headers = headerData)
+ elif(method=="DELETE"):
+ response = requests.delete(request_url, data = jsonData, headers = headerData)
+ elif(method=="POST"):
+ if params is not None:
+ # files = files, params = params,
+ response = requests.post(request_url, params = params, headers = headerData)
+ else:
+ response = requests.post(request_url, data = jsonData, headers = headerData)
+ # when using self-signed server certificate, comment previous line and uncomment following:
+ #response = requests.post(request_url, data = jsonData, headers = headerData, verify=False)
+ self.logger.debug("--- Response is : ---")
+ self.logger.debug(response.status_code)
+ self.logger.debug(response.headers)
+ self.logger.debug(response.text)
+ if(response.status_code == 200) :
+ connected = True
+ self.logger.info("connected to the PyPDP: %s", request_url)
+ break
+ elif(response.status_code == 202) :
+ connected = True
+ break
+ elif(response.status_code == 400):
+ self.logger.debug("PE400 - Schema Issue: Incorrect Params passed: %s %s", self.resturl[0], response.status_code)
+ errormessage+="\n PE400 - Schema Issue: Incorrect Params passed: "+ self.resturl[0]
+ self.__rotatePDP()
+ elif(response.status_code == 401):
+ self.logger.debug("PE100 - Permissions Error: PyPDP Error: %s %s", self.resturl[0], response.status_code)
+ errormessage+="\n PE100 - Permissions Error: PyPDP Error: "+ self.resturl[0]
+ self.__rotatePDP()
+ elif(response.status_code == 403):
+ self.logger.debug("PE100 - Permissions Error: PyPDP Error: %s %s", self.resturl[0], response.status_code)
+ errormessage+="\n PE100 - Permissions Error: PyPDP Error: "+ self.resturl[0]
+ self.__rotatePDP()
+ else:
+ self.logger.debug("PE200 - System Error: PyPDP Error: %s %s", self.resturl[0], response.status_code)
+ errormessage+="\n PE200 - System Error: PyPDP Error: "+ self.resturl[0]
+ self.__rotatePDP()
+ except Exception as e:
+ print(str(e));
+ self.logger.debug("PE200 - System Error: PyPDP Error: %s", self.resturl[0])
+ errormessage+="\n PE200 - System Error: PyPDP Error: "+ self.resturl[0]
+ self.__rotatePDP()
+ if(connected):
+ if(self.autoURL==None):
+ self.__startAuto()
+ elif(self.autoURL!= self.resturl[0]):
+ self.__startAuto()
+ return response
+ else:
+ self.logger.error("PE200 - System Error: cannot connect to given PYPDPServer(s) %s", self.resturl)
+ print(errormessage)
+ sys.exit(0)
+
+ def __rotatePDP(self):
+ self.resturl = collections.deque(self.resturl)
+ self.resturl.rotate(-1)
+ self.encoded = collections.deque(self.encoded)
+ self.encoded.rotate(-1)
+
+ def __checkNotification(self, resultJson):
+ """
+ This function call is for Internal usage purpose only.
+ Checks the Notification JSON compares it with the MatchStore and returns the PDPNotification object.
+ """
+ if not resultJson:
+ return None
+ if not self.matchStore:
+ return None
+ pDPNotification = PDPNotification()
+ boo_Remove = False
+ boo_Update = False
+ if resultJson['removedPolicies']:
+ removedPolicies = []
+ for removed in resultJson['removedPolicies']:
+ removedPolicy = RemovedPolicy()
+ removedPolicy._policyName = removed['policyName']
+ removedPolicy._policyVersion = removed['versionNo']
+ removedPolicies.append(removedPolicy)
+ pDPNotification._removedPolicies= removedPolicies
+ boo_Remove = True
+ if resultJson['updatedPolicies']:
+ updatedPolicies = []
+ for updated in resultJson['updatedPolicies']:
+ updatedPolicy = LoadedPolicy()
+ # check if it has matches then it is a Config Policy and compare it with Match Store.
+ if updated['matches']:
+ # compare the matches with our Stored Matches
+ for eachDict in self.matchStore:
+ if eachDict==updated['matches']:
+ updatedPolicy._policyName = updated['policyName']
+ updatedPolicy._policyVersion = updated['versionNo']
+ updatedPolicy._matchingConditions = updated['matches']
+ updatedPolicy._updateType = updated['updateType']
+ updatedPolicies.append(updatedPolicy)
+ boo_Update = True
+ else:
+ updatedPolicy._policyName = updated['policyName']
+ updatedPolicy._policyVersion = updated['versionNo']
+ updatedPolicies.append(updatedPolicy)
+ boo_Update = True
+ pDPNotification._loadedPolicies= updatedPolicies
+ if (boo_Update and boo_Remove):
+ pDPNotification._notificationType = NotificationType.BOTH.name
+ elif boo_Update:
+ pDPNotification._notificationType = NotificationType.UPDATE.name
+ elif boo_Remove:
+ pDPNotification._notificationType = NotificationType.REMOVE.name
+ return pDPNotification
+
+ def __startAuto(self):
+ """
+ Starts the Auto Notification Feature..
+ """
+ if self.scheme is not None:
+ if ((self.scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(self.scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)):
+ if self.handler is None:
+ if self.autows.sock is not None:
+ if(self.autows.sock.connected):
+ self.mclose= True
+ self.autows.close()
+ else:
+ if self.autoURL is None:
+ self.autoURL = self.resturl[0]
+ elif self.autoURL != self.resturl[0]:
+ self.autoURL = self.resturl[0]
+ if self.autows.sock is not None:
+ if(self.autows.sock.connected):
+ self.mclose= True
+ self.autows.close()
+ else:
+ self.autows = None
+ if self.autows is None:
+ if(self.autoURL.startswith("https")):
+ __auto_url = self.autoURL.replace("https","wss")+"notifications"
+ else:
+ __auto_url = self.autoURL.replace("http","ws")+"notifications"
+ def run(*args):
+ self.__autoRequest(__auto_url)
+ self.logger.info("Starting AutoNotification Service with : %s" , __auto_url)
+ self.thread.start_new_thread(run , ())
+ elif self.autows.sock is not None:
+ if not (self.autows.sock.connected):
+ self.mclose = True
+ self.autows.close()
+ self.restart = True
+ self.__rotatePDP()
+ if(self.autoURL.startswith("https")):
+ __auto_url = self.autoURL.replace("https","wss")+"notifications"
+ else:
+ __auto_url = self.autoURL.replace("http","ws")+"notifications"
+ def run(*args):
+ self.__autoRequest(__auto_url)
+ self.logger.info("Starting AutoNotification Service with : %s" , __auto_url)
+ self.thread.start_new_thread(run , ())
+
+ else:
+ #stop the Auto Notification Service if it is running.
+ if self.autows.sock is not None:
+ if(self.autows.sock.connected):
+ self.mclose= True
+ self.autows.close()
+
+ def __onEvent(self, message):
+ """
+ Handles the event Notification received.
+ """
+ message = json.loads(message)
+ if self.handler is not None:
+ if (self.scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name):
+ pDPNotification = PDPNotification()
+ boo_Remove = False
+ boo_Update = False
+ if message['removedPolicies']:
+ removedPolicies = []
+ for removed in message['removedPolicies']:
+ removedPolicy = RemovedPolicy()
+ removedPolicy._policyName = removed['policyName']
+ removedPolicy._policyVersion = removed['versionNo']
+ removedPolicies.append(removedPolicy)
+ pDPNotification._removedPolicies= removedPolicies
+ boo_Remove = True
+ if message['loadedPolicies']:
+ updatedPolicies = []
+ for updated in message['loadedPolicies']:
+ updatedPolicy = LoadedPolicy()
+ updatedPolicy._policyName = updated['policyName']
+ updatedPolicy._policyVersion = updated['versionNo']
+ updatedPolicy._matchingConditions = updated['matches']
+ updatedPolicy._updateType = updated['updateType']
+ updatedPolicies.append(updatedPolicy)
+ pDPNotification._loadedPolicies= updatedPolicies
+ boo_Update = True
+ if (boo_Update and boo_Remove):
+ pDPNotification._notificationType = NotificationType.BOTH.name
+ elif boo_Update:
+ pDPNotification._notificationType = NotificationType.UPDATE.name
+ elif boo_Remove:
+ pDPNotification._notificationType = NotificationType.REMOVE.name
+ # call the Handler.
+ self.handler.notificationReceived(pDPNotification)
+ elif (self.scheme == NotificationScheme.AUTO_NOTIFICATIONS.name):
+ # call the handler
+ self.handler(self.__checkNotification(message))
+
+ def __manualRequest(self,request_url):
+ """
+ Takes the request_URL given and returns the JSON response back to the Caller.
+ """
+ ws = create_connection(request_url)
+ # when using self-signed server certificate, comment previous line and uncomment following:
+ #ws = create_connection(request_url, sslopt={"cert_reqs": ssl.CERT_NONE})
+ ws.send("Manual")
+ try:
+ return json.loads(ws.recv())
+ except:
+ return None
+ ws.close()
+ ws.shutdown()
+
+ def __onMessage(self, ws,message):
+ """Occurs on Event
+ """
+ self.logger.info("Received AutoNotification message: %s" , message)
+ self.__onEvent(message)
+
+ def __onError(self, ws, error):
+ """Self Restart the Notification Service on Error
+ """
+ self.logger.error("PE500 - Process Flow Issue: Auto Notification service Error!! : %s" , error)
+
+ def __onclose(self, ws):
+ """Occurs on Close ? Try to start again in case User didn't do it.
+ """
+ self.logger.debug("Connection has been Closed. ")
+ if not self.mclose:
+ self.__startAuto()
+ self.mclose = False
+
+ def __autoRequest(self, request_url):
+ """
+ Takes the request_URL and invokes the PolicyEngine method on any receiving a Message
+ """
+ websocket.enableTrace(True)
+ self.autows = websocket.WebSocketApp(request_url, on_message= self.__onMessage, on_close= self.__onclose, on_error= self.__onError)
+ # wait for to 5 seconds to restart
+ if self.restart:
+ time.sleep(5)
+ self.autows.run_forever()
+
+class NotificationHandler:
+ """
+ 'Defines the methods which need to run when an Event or a Notification is received.'
+ """
+ def notificationReceived(self, notification):
+ """
+ Will be triggered automatically whenever a Notification is received by the PEP
+ @param notification: PDPNotification object which has the information of the Policies.
+ @attention: This method must be implemented by the user for AUTO type NotificationScheme
+ """
+ raise Exception("Unimplemented abstract method: %s" % __functionId(self, 1))
+
+def __functionId(obj, nFramesUp):
+ """ Internal Usage only..
+ Create a string naming the function n frames up on the stack. """
+ fr = sys._getframe(nFramesUp+1)
+ co = fr.f_code
+ return "%s.%s" % (obj.__class__, co.co_name)
+
+class PolicyConfig:
+ """
+ 'PolicyConfig is the return object resulted by getConfig Call.'
+ """
+ def __init__(self):
+ self._policyConfigMessage = None
+ self._policyConfigStatus = None
+ self._policyName = None
+ self._policyVersion = None
+ self._matchingConditions = None
+ self._responseAttributes = None
+ self._policyType = None
+ self._json = None
+ self._xml = None
+ self._prop = None
+ self._other = None
+
+class PolicyResponse:
+ """
+ 'PolicyResponse is the return object resulted by sendEvent Call.'
+ """
+ def __init__(self):
+ self._policyResponseStatus = None
+ self._policyResponseMessage = None
+ self._requestAttributes = None
+ self._actionTaken = None
+ self._actionAdvised= None
+
+class PDPNotification:
+ """
+ 'Defines the Notification Event sent from the PDP to PEP Client.'
+ """
+ def __init__(self):
+ self._removedPolicies = None
+ self._loadedPolicies = None
+ self._notificationType = None
+
+class RemovedPolicy:
+ """
+ 'Defines the structure of the Removed Policy'
+ """
+ def __init__(self):
+ self._policyName = None
+ self._policyVersion = None
+
+class LoadedPolicy:
+ """
+ 'Defines the Structure of the Loaded Policy'
+ """
+ def __init__(self):
+ self._policyName = None
+ self._policyVersion = None
+ self._matchingConditions = None
+ self._updateType = None
+
+class PolicyParameters:
+ """
+ 'Defines the Structure of the Policy to Create or Update'
+ """
+ def __init__(self):
+ self._actionPerformer = None
+ self._actionAttribute = None
+ self._attributes = None
+ self._configBody = None
+ self._configBodyType = None
+ self._configName = None
+ self._controllerName = None
+ self._dependencyNames = None
+ self._dynamicRuleAlgorithmLabels = None
+ self._dynamicRuleAlgorithmFunctions = None
+ self._dynamicRuleAlgorithmField1 = None
+ self._dynamicRuleAlgorithmField2 = None
+ self._ecompName = None
+ self._extendedOption = None
+ self._guard = None
+ self._policyClass = None
+ self._policyConfigType = None
+ self._policyName = None
+ self._policyDescription = None
+ self._priority = None
+ self._requestID = None
+ self._riskLevel = None
+ self._riskType = None
+ self._ruleProvider = None
+ self._ttlDate = None
+
+class PushPolicyParameters:
+ """
+ 'Defines the Structure of the Push Policy Parameters'
+ """
+ def __init__(self):
+ self._pdpGroup = None
+ self._policyName = None
+ self._policyType = None
+
+class PolicyChangeResponse:
+ """
+ 'Defines the Structure of the policy Changes made from PDP'
+ """
+ def __init__(self):
+ self._responseMessage = None
+ self._responseCode = None
+
+class DeletePolicyParameters:
+ """
+ 'Defines the Structure of the Delete Policy Parameters'
+ """
+ def __init__(self):
+ self._deleteCondition = None
+ self._pdpGroup = None
+ self._policyComponent = None
+ self._policyName = None
+ self._policyType = None
+ self._requestID = None
+
+class DictionaryParameters:
+ """
+ 'Defines the Structure of the Dictionary Parameters'
+ """
+ def __init__(self):
+ self._dictionaryType = None
+ self._dictionary = None
+ self._dictionaryJson = None
+ self._requestID = None
+
+class DictionaryResponse:
+ """
+ 'Defines the Structure of the dictionary response'
+ """
+ def __init__(self):
+ self._responseMessage = None
+ self._responseCode = None
+ self._dictionaryJson = None
+ self._dictionaryData = None
+
+class DecisionResponse:
+ """
+ 'Defines the Structure of Decision Response'
+ """
+ def __init__(self):
+ self._decision = None
+ self._details = None
+
+class ImportParameters:
+ """
+ 'Defines the Structure of Policy Model Import'
+ """
+ def __init__(self):
+ self._serviceName = None
+ self._description = None
+ self._requestID = None
+ self._filePath = None
+ self._importBody = None
+ self._version = None
+ self._importType = None
diff --git a/policyhandler/__init__.py b/policyhandler/__init__.py
new file mode 100644
index 0000000..a3220c4
--- /dev/null
+++ b/policyhandler/__init__.py
@@ -0,0 +1,18 @@
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
diff --git a/policyhandler/config.py b/policyhandler/config.py
new file mode 100644
index 0000000..ea10167
--- /dev/null
+++ b/policyhandler/config.py
@@ -0,0 +1,194 @@
+"""read and use the config"""
+
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import os
+import json
+import copy
+import base64
+import logging
+
+from .discovery import DiscoveryClient
+from .onap.crypto import Cipher
+
+logging.basicConfig(
+ filename='logs/policy_handler.log', \
+ format='%(asctime)s.%(msecs)03d %(levelname)+8s ' + \
+ '%(threadName)s %(name)s.%(funcName)s: %(message)s', \
+ datefmt='%Y%m%d_%H%M%S', level=logging.DEBUG)
+
+class Config(object):
+ """main config of the application"""
+ CONFIG_FILE_PATH = "etc/config.json"
+ LOGGER_CONFIG_FILE_PATH = "etc/common_logger.config"
+ UPLOAD_CONFIG_FILE_PATH = "etc_upload/config.json"
+ SERVICE_NAME_POLICY_HANDLER = "policy_handler"
+ FIELD_SYSTEM = "system"
+ FIELD_CONFIG_PWD = "config_pwd"
+ FIELD_WSERVICE_PORT = "wservice_port"
+ FIELD_POLICY_ENGINE = "policy_engine"
+ CRYPTED_FIELDS = ["ClientAuth", "Authorization", "config_pwd"]
+ config_pwd = "donottell150&$"
+ wservice_port = 25577
+ _logger = logging.getLogger("policy_handler.config")
+ config = None
+
+ @staticmethod
+ def merge(new_config):
+ """merge the new_config into current config - override the values"""
+ if not new_config:
+ return
+
+ if not Config.config:
+ Config.config = new_config
+ return
+
+ new_config = copy.deepcopy(new_config)
+ Config.config.update(new_config)
+
+ @staticmethod
+ def decrypt_secret_value(field_name, field_value):
+ """decrypt the value of the secret field"""
+ if field_name in Config.CRYPTED_FIELDS and isinstance(field_value, basestring):
+ return Cipher.decrypt(Config.config_pwd, field_value)
+ return field_value
+
+ @staticmethod
+ def encrypt_secret_value(field_name, field_value):
+ """encrypt the value of the secret field"""
+ if field_name in Config.CRYPTED_FIELDS and isinstance(field_value, basestring):
+ return Cipher.encrypt(Config.config_pwd, field_value)
+ return field_value
+
+ @staticmethod
+ def update_tree_leaves(tree_element, func_on_leaf):
+ """traverse through json tree and apply function func_on_leaf to each leaf"""
+ if not tree_element:
+ return
+
+ for field_name in tree_element:
+ field_value = func_on_leaf(field_name, tree_element[field_name])
+ tree_element[field_name] = field_value
+ if isinstance(field_value, dict):
+ Config.update_tree_leaves(field_value, func_on_leaf)
+
+ @staticmethod
+ def get_system_name():
+ """find the name of the policy-handler system
+ to be used as the key in consul-kv for config of policy-handler
+ """
+ system_name = None
+ if Config.config:
+ system_name = Config.config.get(Config.FIELD_SYSTEM)
+
+ return system_name or Config.SERVICE_NAME_POLICY_HANDLER
+
+ @staticmethod
+ def discover():
+ """bring and merge the config settings from the discovery service"""
+ discovery_key = Config.get_system_name()
+ new_config = DiscoveryClient.get_value(discovery_key)
+
+ if not new_config or not isinstance(new_config, dict):
+ Config._logger.warn("unexpected config from discovery: %s", new_config)
+ return
+
+ Config._logger.debug("loaded config from discovery(%s): %s", \
+ discovery_key, json.dumps(new_config))
+ Config.update_tree_leaves(new_config, Config.decrypt_secret_value)
+ Config._logger.debug("config before merge from discovery: %s", json.dumps(Config.config))
+ Config.merge(new_config)
+ Config._logger.debug("merged config from discovery: %s", json.dumps(Config.config))
+
+ @staticmethod
+ def upload_to_discovery():
+ """upload the current config settings to the discovery service"""
+ if not Config.config or not isinstance(Config.config, dict):
+ Config._logger.error("unexpected config: %s", Config.config)
+ return
+
+ latest_config = copy.deepcopy(Config.config)
+ Config.update_tree_leaves(latest_config, Config.encrypt_secret_value)
+
+ discovery_key = Config.get_system_name()
+ latest_config = json.dumps(latest_config)
+ DiscoveryClient.put_kv(discovery_key, latest_config)
+ Config._logger.debug("uploaded config to discovery(%s): %s", \
+ discovery_key, latest_config)
+
+ @staticmethod
+ def load_from_file(file_path=None):
+ """read and store the config from config file"""
+ if not file_path:
+ file_path = Config.CONFIG_FILE_PATH
+
+ loaded_config = None
+ if os.access(file_path, os.R_OK):
+ with open(file_path, 'r') as config_json:
+ loaded_config = json.load(config_json)
+
+ if not loaded_config:
+ Config._logger.info("config not loaded from file: %s", file_path)
+ return
+
+ Config._logger.info("config loaded from file: %s", file_path)
+ logging_config = loaded_config.get("logging")
+ if logging_config:
+ logging.config.dictConfig(logging_config)
+
+ Config.config_pwd = loaded_config.get(Config.FIELD_CONFIG_PWD, Config.config_pwd)
+ Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port)
+ Config.merge(loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER))
+ return True
+
+class PolicyEngineConfig(object):
+ """main config of the application"""
+ # PATH_TO_PROPERTIES = r'logs/policy_engine.properties'
+ PATH_TO_PROPERTIES = r'tmp/policy_engine.properties'
+ PYPDP_URL = "PYPDP_URL = {0}{1}, {2}, {3}\n"
+ CLIENT_ID = "CLIENT_ID = {0}\n"
+ CLIENT_KEY = "CLIENT_KEY = {0}\n"
+ ENVIRONMENT = "ENVIRONMENT = {0}\n"
+ _logger = logging.getLogger("policy_handler.pe_config")
+
+ @staticmethod
+ def save_to_file():
+ """create the policy_engine.properties for policy-engine client"""
+ file_path = PolicyEngineConfig.PATH_TO_PROPERTIES
+
+ try:
+ config = Config.config[Config.FIELD_POLICY_ENGINE]
+ headers = config["headers"]
+ client_parts = base64.b64decode(headers["ClientAuth"].split()[1]).split(":")
+ auth_parts = base64.b64decode(headers["Authorization"].split()[1]).split(":")
+
+ props = PolicyEngineConfig.PYPDP_URL.format(config["url"], config["path_pdp"],
+ auth_parts[0], auth_parts[1])
+ props += PolicyEngineConfig.CLIENT_ID.format(client_parts[0])
+ props += PolicyEngineConfig.CLIENT_KEY.format(base64.b64encode(client_parts[1]))
+ props += PolicyEngineConfig.ENVIRONMENT.format(headers["Environment"])
+
+ with open(file_path, 'w') as prp_file:
+ prp_file.write(props)
+ PolicyEngineConfig._logger.info("created %s", file_path)
+ except IOError:
+ PolicyEngineConfig._logger.error("failed to save to %s", file_path)
+ except KeyError:
+ PolicyEngineConfig._logger.error("unexpected config for %s", Config.FIELD_POLICY_ENGINE)
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
new file mode 100644
index 0000000..02807f8
--- /dev/null
+++ b/policyhandler/deploy_handler.py
@@ -0,0 +1,88 @@
+""" send notification to deploy-handler"""
+
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import logging
+import json
+import requests
+
+from .config import Config
+from .discovery import DiscoveryClient
+from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode
+
+class DeployHandler(object):
+ """ deploy-handler """
+ _logger = logging.getLogger("policy_handler.deploy_handler")
+ _policy_update = '/policy_update'
+ _lazy_inited = False
+ _config = None
+ _url = None
+ _path = None
+ _target_entity = None
+
+ @staticmethod
+ def _lazy_init():
+ """ set static properties """
+ if DeployHandler._lazy_inited:
+ return
+ DeployHandler._lazy_inited = True
+ DeployHandler._target_entity = Config.config["deploy_handler"]
+ DeployHandler._url = DiscoveryClient.get_service_url(DeployHandler._target_entity)
+ DeployHandler._path = DeployHandler._url + DeployHandler._policy_update
+ DeployHandler._logger.info("DeployHandler url(%s)", DeployHandler._url)
+
+ @staticmethod
+ def policy_update(audit, latest_policies):
+ """ post policy_updated message to deploy-handler """
+ DeployHandler._lazy_init()
+ msg = {"latest_policies":latest_policies}
+ sub_aud = Audit(aud_parent=audit, targetEntity=DeployHandler._target_entity, \
+ targetServiceName=DeployHandler._path)
+ headers = {REQUEST_X_ECOMP_REQUESTID : sub_aud.request_id}
+
+ msg_str = json.dumps(msg)
+ headers_str = json.dumps(headers)
+
+ log_line = "post to deployment-handler {0} msg={1} headers={2}".format(\
+ DeployHandler._path, msg_str, headers_str)
+ sub_aud.metrics_start(log_line)
+ DeployHandler._logger.info(log_line)
+
+ res = None
+ try:
+ res = requests.post(DeployHandler._path, json=msg, headers=headers)
+ except requests.exceptions.RequestException as ex:
+ error_msg = "failed to post to deployment-handler {0} {1} msg={2} headers={3}" \
+ .format(DeployHandler._path, str(ex), msg_str, headers_str)
+ DeployHandler._logger.exception(error_msg)
+ sub_aud.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
+ audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
+ sub_aud.metrics(error_msg)
+ return
+
+ sub_aud.set_http_status_code(res.status_code)
+ audit.set_http_status_code(res.status_code)
+
+ sub_aud.metrics( \
+ "response from deployment-handler to post {0}: {1} msg={2} text={3} headers={4}" \
+ .format(DeployHandler._path, res.status_code, msg_str, res.text, res.request.headers))
+
+ if res.status_code == requests.codes.ok:
+ return res.json()
diff --git a/policyhandler/discovery.py b/policyhandler/discovery.py
new file mode 100644
index 0000000..7e16b90
--- /dev/null
+++ b/policyhandler/discovery.py
@@ -0,0 +1,73 @@
+"""client to talk to consul at the standard port 8500"""
+
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import logging
+import json
+import base64
+import requests
+
+class DiscoveryClient(object):
+ """talking to consul at http://consul:8500
+
+ relies on proper --add-host "consul:<consul-agent ip>" in
+ docker run command that runs along the consul-agent:
+
+ docker run --name ${APPNAME} -d
+ -e HOSTNAME
+ --add-host "consul:<consul-agent ip>"
+ -v ${BASEDIR}/logs:${TARGETDIR}/logs
+ -v ${BASEDIR}/etc:${TARGETDIR}/etc
+ -p <outport>:<innerport>
+ ${APPNAME}:latest
+ """
+ CONSUL_SERVICE_MASK = "http://consul:8500/v1/catalog/service/{0}"
+ CONSUL_KV_MASK = "http://consul:8500/v1/kv/{0}"
+ SERVICE_MASK = "http://{0}:{1}"
+ _logger = logging.getLogger("policy_handler.discovery")
+
+
+ @staticmethod
+ def get_service_url(service_name):
+ """find the service record in consul"""
+ service_path = DiscoveryClient.CONSUL_SERVICE_MASK.format(service_name)
+ DiscoveryClient._logger.info("discover %s", service_path)
+ response = requests.get(service_path)
+ response.raise_for_status()
+ service = response.json()[0]
+ return DiscoveryClient.SERVICE_MASK.format( \
+ service["ServiceAddress"], service["ServicePort"])
+
+ @staticmethod
+ def get_value(key):
+ """get the value for the key from consul-kv"""
+ response = requests.get(DiscoveryClient.CONSUL_KV_MASK.format(key))
+ response.raise_for_status()
+ data = response.json()[0]
+ value = base64.b64decode(data["Value"]).decode("utf-8")
+ DiscoveryClient._logger.info("consul-kv key=%s data=%s value(%s)", \
+ key, json.dumps(data), value)
+ return json.loads(value)
+
+ @staticmethod
+ def put_kv(key, value):
+ """put the value under the key in consul-kv"""
+ response = requests.put(DiscoveryClient.CONSUL_KV_MASK.format(key), data=value)
+ response.raise_for_status()
diff --git a/policyhandler/onap/CommonLogger.py b/policyhandler/onap/CommonLogger.py
new file mode 100644
index 0000000..a70ca28
--- /dev/null
+++ b/policyhandler/onap/CommonLogger.py
@@ -0,0 +1,953 @@
+#!/usr/bin/python
+# -*- indent-tabs-mode: nil -*- vi: set expandtab:
+"""ECOMP Common Logging library in Python.
+
+CommonLogger.py
+
+Original Written by: Terry Schmalzried
+Date written: October 1, 2015
+Last updated: December 1, 2016
+
+version 0.8
+"""
+
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+from __future__ import print_function
+import os, sys, getopt, logging, logging.handlers, time, re, uuid, socket, threading, collections
+
+class CommonLogger:
+ """ECOMP Common Logging object.
+
+ Public methods:
+ __init__
+ setFields
+ debug
+ info
+ warn
+ error
+ fatal
+ """
+
+ UnknownFile = -1
+ ErrorFile = 0
+ DebugFile = 1
+ AuditFile = 2
+ MetricsFile = 3
+ DateFmt = '%Y-%m-%dT%H:%M:%S'
+ verbose = False
+
+ def __init__(self, configFile, logKey, **kwargs):
+ """Construct a Common Logger for one Log File.
+
+ Arguments:
+ configFile -- configuration filename.
+ logKey -- the keyword in configFile that identifies the log filename.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages,
+ one of CommonLogger.ErrorFile, CommonLogger.DebugFile,
+ CommonLogger.AuditFile and CommonLogger.MetricsFile, or
+ one of the strings "error", "debug", "audit" or "metrics".
+ May also be set in the config file using a field named
+ <logKey>Style (where <logKey> is the value of the logKey
+ parameter). The keyword value overrides the value in the
+ config file.
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ self._monitorFlag = False
+
+ # Get configuration parameters
+ self._logKey = str(logKey)
+ self._configFile = str(configFile)
+ self._rotateMethod = 'time'
+ self._timeRotateIntervalType = 'midnight'
+ self._timeRotateInterval = 1
+ self._sizeMaxBytes = 0
+ self._sizeRotateMode = 'a'
+ self._socketHost = None
+ self._socketPort = 0
+ self._typeLogger = 'filelogger'
+ self._backupCount = 6
+ self._logLevelThreshold = self._intLogLevel('')
+ self._logFile = None
+ self._begTime = None
+ self._begMsec = 0
+ self._fields = {}
+ self._fields["style"] = CommonLogger.UnknownFile
+ try:
+ self._configFileModified = os.path.getmtime(self._configFile)
+ for line in open(self._configFile):
+ line = line.split('#',1)[0] # remove comments
+ if '=' in line:
+ key, value = [x.strip() for x in line.split('=',1)]
+ if key == 'rotateMethod' and value.lower() in ['time', 'size', 'none']:
+ self._rotateMethod = value.lower()
+ elif key == 'timeRotateIntervalType' and value in ['S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight']:
+ self._timeRotateIntervalType = value
+ elif key == 'timeRotateInterval' and int( value ) > 0:
+ self._timeRotateInterval = int( value )
+ elif key == 'sizeMaxBytes' and int( value ) >= 0:
+ self._sizeMaxBytes = int( value )
+ elif key == 'sizeRotateMode' and value in ['a']:
+ self._sizeRotateMode = value
+ elif key == 'backupCount' and int( value ) >= 0:
+ self._backupCount = int( value )
+ elif key == self._logKey + 'SocketHost':
+ self._socketHost = value
+ elif key == self._logKey + 'SocketPort' and int( value ) == 0:
+ self._socketPort = int( value )
+ elif key == self._logKey + 'LogType' and value.lower() in ['filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger']:
+ self._typeLogger = value.lower()
+ elif key == self._logKey + 'LogLevel':
+ self._logLevelThreshold = self._intLogLevel(value.upper())
+ elif key == self._logKey + 'Style':
+ self._fields["style"] = value
+ elif key == self._logKey:
+ self._logFile = value
+ except Exception as x:
+ print("exception reading '%s' configuration file: %s" %(self._configFile, str(x)), file=sys.stderr)
+ sys.exit(2)
+ except:
+ print("exception reading '%s' configuration file" %(self._configFile), file=sys.stderr)
+ sys.exit(2)
+
+ if self._logFile is None:
+ print('configuration file %s is missing definition %s for log file' %(self._configFile, self._logKey), file=sys.stderr)
+ sys.exit(2)
+
+
+ # initialize default log fields
+ # timestamp will automatically be generated
+ for key in ['style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', \
+ 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer', \
+ 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode', \
+ 'responseDescription', 'processKey', 'targetVirtualEntity', 'customField1', \
+ 'customField2', 'customField3', 'customField4', 'errorCategory', 'errorCode', \
+ 'errorDescription' ]:
+ if key in kwargs and kwargs[key] != None:
+ self._fields[key] = kwargs[key]
+
+ self._resetStyleField()
+
+ # Set up logger
+ self._logLock = threading.Lock()
+ with self._logLock:
+ self._logger = logging.getLogger(self._logKey)
+ self._logger.propagate = False
+ self._createLogger()
+
+ self._defaultServerInfo()
+
+ # spawn a thread to monitor configFile for logLevel and logFile changes
+ self._monitorFlag = True
+ self._monitorThread = threading.Thread(target=self._monitorConfigFile, args=())
+ self._monitorThread.daemon = True
+ self._monitorThread.start()
+
+
+ def _createLogger(self):
+ if self._typeLogger == 'filelogger':
+ self._mkdir_p(self._logFile)
+ if self._rotateMethod == 'time':
+ self._logHandler = logging.handlers.TimedRotatingFileHandler(self._logFile, \
+ when=self._timeRotateIntervalType, interval=self._timeRotateInterval, \
+ backupCount=self._backupCount, encoding=None, delay=False, utc=True)
+ elif self._rotateMethod == 'size':
+ self._logHandler = logging.handlers.RotatingFileHandler(self._logFile, \
+ mode=self._sizeRotateMode, maxBytes=self._sizeMaxBytes, \
+ backupCount=self._backupCount, encoding=None, delay=False)
+
+ else:
+ self._logHandler = logging.handlers.WatchedFileHandler(self._logFile, \
+ mode=self._sizeRotateMode, \
+ encoding=None, delay=False)
+ elif self._typeLogger == 'stderrlogger':
+ self._logHandler = logging.handlers.StreamHandler(sys.stderr)
+ elif self._typeLogger == 'stdoutlogger':
+ self._logHandler = logging.handlers.StreamHandler(sys.stdout)
+ elif self._typeLogger == 'socketlogger':
+ self._logHandler = logging.handlers.SocketHandler(self._socketHost, self._socketPort)
+ elif self._typeLogger == 'nulllogger':
+ self._logHandler = logging.handlers.NullHandler()
+
+ if self._fields["style"] == CommonLogger.AuditFile or self._fields["style"] == CommonLogger.MetricsFile:
+ self._logFormatter = logging.Formatter(fmt='%(begtime)s,%(begmsecs)03d+00:00|%(endtime)s,%(endmsecs)03d+00:00|%(message)s', datefmt=CommonLogger.DateFmt)
+ else:
+ self._logFormatter = logging.Formatter(fmt='%(asctime)s,%(msecs)03d+00:00|%(message)s', datefmt='%Y-%m-%dT%H:%M:%S')
+ self._logFormatter.converter = time.gmtime
+ self._logHandler.setFormatter(self._logFormatter)
+ self._logger.addHandler(self._logHandler)
+
+ def _resetStyleField(self):
+ styleFields = ["error", "debug", "audit", "metrics"]
+ if self._fields['style'] in styleFields:
+ self._fields['style'] = styleFields.index(self._fields['style'])
+
+ def __del__(self):
+ if self._monitorFlag == False:
+ return
+
+ self._monitorFlag = False
+
+ if self._monitorThread is not None and self._monitorThread.is_alive():
+ self._monitorThread.join()
+
+ self._monitorThread = None
+
+
+ def _defaultServerInfo(self):
+
+ # If not set or purposely set = None, then set default
+ if self._fields.get('server') is None:
+ try:
+ self._fields['server'] = socket.getfqdn()
+ except Exception as err:
+ try:
+ self._fields['server'] = socket.gethostname()
+ except Exception as err:
+ self._fields['server'] = ""
+
+ # If not set or purposely set = None, then set default
+ if self._fields.get('serverIPAddress') is None:
+ try:
+ self._fields['serverIPAddress'] = socket.gethostbyname(self._fields['server'])
+ except Exception as err:
+ self._fields['serverIPAddress'] = ""
+
+
+ def _monitorConfigFile(self):
+ while self._monitorFlag:
+ try:
+ fileTime = os.path.getmtime(self._configFile)
+ if fileTime > self._configFileModified:
+ self._configFileModified = fileTime
+ ReopenLogFile = False
+ logFile = self._logFile
+ with open(self._configFile) as fp:
+ for line in fp:
+ line = line.split('#',1)[0] # remove comments
+ if '=' in line:
+ key, value = [x.strip() for x in line.split('=',1)]
+ if key == 'rotateMethod' and value.lower() in ['time', 'size', 'none'] and self._rotateMethod != value:
+ self._rotateMethod = value.lower()
+ ReopenLogFile = True
+ elif key == 'timeRotateIntervalType' and value in ['S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight']:
+ self._timeRotateIntervalType = value
+ ReopenLogFile = True
+ elif key == 'timeRotateInterval' and int( value ) > 0:
+ self._timeRotateInterval = int( value )
+ ReopenLogFile = True
+ elif key == 'sizeMaxBytes' and int( value ) >= 0:
+ self._sizeMaxBytes = int( value )
+ ReopenLogFile = True
+ elif key == 'sizeRotateMode' and value in ['a']:
+ self._sizeRotateMode = value
+ ReopenLogFile = True
+ elif key == 'backupCount' and int( value ) >= 0:
+ self._backupCount = int( value )
+ ReopenLogFile = True
+ elif key == self._logKey + 'SocketHost' and self._socketHost != value:
+ self._socketHost = value
+ ReopenLogFile = True
+ elif key == self._logKey + 'SocketPort' and self._socketPort > 0 and self._socketPort != int( value ):
+ self._socketPort = int( value )
+ ReopenLogFile = True
+ elif key == self._logKey + 'LogLevel' and self._logLevelThreshold != self._intLogLevel( value.upper() ):
+ self._logLevelThreshold = self._intLogLevel(value.upper())
+ elif key == self._logKey + 'LogType' and self._typeLogger != value and value.lower() in ['filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger']:
+ self._typeLogger = value.lower()
+ ReopenLogFile = True
+ elif key == self._logKey + 'Style':
+ self._fields["style"] = value
+ self._resetStyleField()
+ elif key == self._logKey and self._logFile != value:
+ logFile = value
+ ReopenLogFile = True
+ if ReopenLogFile:
+ with self._logLock:
+ self._logger.removeHandler(self._logHandler)
+ self._logFile = logFile
+ self._createLogger()
+ except Exception as err:
+ pass
+
+ time.sleep(5)
+
+
+ def setFields(self, **kwargs):
+ """Set default values for log fields.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ for key in ['style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', \
+ 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer', \
+ 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode', \
+ 'responseDescription', 'processKey', 'targetVirtualEntity', 'customField1', \
+ 'customField2', 'customField3', 'customField4', 'errorCategory', 'errorCode', \
+ 'errorDescription' ]:
+ if key in kwargs:
+ if kwargs[key] != None:
+ self._fields[key] = kwargs[key]
+ elif key in self._fields:
+ del self._fields[key]
+
+ self._defaultServerInfo()
+
+
+ def debug(self, message, **kwargs):
+ """Write a DEBUG level message to the log file.
+
+ Arguments:
+ message -- value for the last log record field.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+ begTime (am) -- optional starting time for this audit/metrics log record.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ self._log('DEBUG', message, errorCategory = 'DEBUG', **kwargs)
+
+ def info(self, message, **kwargs):
+ """Write an INFO level message to the log file.
+
+ Arguments:
+ message -- value for the last log record field.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+ begTime (am) -- optional starting time for this audit/metrics log record.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ self._log('INFO', message, errorCategory = 'INFO', **kwargs)
+
+ def warn(self, message, **kwargs):
+ """Write a WARN level message to the log file.
+
+ Arguments:
+ message -- value for the last log record field.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+ begTime (am) -- optional starting time for this audit/metrics log record.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ self._log('WARN', message, errorCategory = 'WARN', **kwargs)
+
+ def error(self, message, **kwargs):
+ """Write an ERROR level message to the log file.
+
+ Arguments:
+ message -- value for the last log record field.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+ begTime (am) -- optional starting time for this audit/metrics log record.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ self._log('ERROR', message, errorCategory = 'ERROR', **kwargs)
+
+ def fatal(self, message, **kwargs):
+ """Write a FATAL level message to the log file.
+
+ Arguments:
+ message -- value for the last log record field.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+ begTime (am) -- optional starting time for this audit/metrics log record.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ self._log('FATAL', message, errorCategory = 'FATAL', **kwargs)
+
+ def _log(self, logLevel, message, **kwargs):
+ """Write a message to the log file.
+
+ Arguments:
+ logLevel -- value ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL', ...) for the log record.
+ message -- value for the last log record field.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+ begTime (am) -- optional starting time for this audit/metrics log record.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ # timestamp will automatically be inserted
+ style = int(self._getVal('style', '', **kwargs))
+ requestID = self._getVal('requestID', '', **kwargs)
+ serviceInstanceID = self._getVal('serviceInstanceID', '', **kwargs)
+ threadID = self._getVal('threadID', threading.currentThread().getName(), **kwargs)
+ serverName = self._getVal('serverName', '', **kwargs)
+ serviceName = self._getVal('serviceName', '', **kwargs)
+ instanceUUID = self._getVal('instanceUUID', '', **kwargs)
+ upperLogLevel = self._noSep(logLevel.upper())
+ severity = self._getVal('severity', '', **kwargs)
+ serverIPAddress = self._getVal('serverIPAddress', '', **kwargs)
+ server = self._getVal('server', '', **kwargs)
+ IPAddress = self._getVal('IPAddress', '', **kwargs)
+ className = self._getVal('className', '', **kwargs)
+ timer = self._getVal('timer', '', **kwargs)
+ partnerName = self._getVal('partnerName', '', **kwargs)
+ targetEntity = self._getVal('targetEntity', '', **kwargs)
+ targetServiceName = self._getVal('targetServiceName', '', **kwargs)
+ statusCode = self._getVal('statusCode', '', **kwargs)
+ responseCode = self._getVal('responseCode', '', **kwargs)
+ responseDescription = self._noSep(self._getVal('responseDescription', '', **kwargs))
+ processKey = self._getVal('processKey', '', **kwargs)
+ targetVirtualEntity = self._getVal('targetVirtualEntity', '', **kwargs)
+ customField1 = self._getVal('customField1', '', **kwargs)
+ customField2 = self._getVal('customField2', '', **kwargs)
+ customField3 = self._getVal('customField3', '', **kwargs)
+ customField4 = self._getVal('customField4', '', **kwargs)
+ errorCategory = self._getVal('errorCategory', '', **kwargs)
+ errorCode = self._getVal('errorCode', '', **kwargs)
+ errorDescription = self._noSep(self._getVal('errorDescription', '', **kwargs))
+ nbegTime = self._getArg('begTime', {}, **kwargs)
+
+ detailMessage = self._noSep(message)
+ if bool(re.match(r" *$", detailMessage)):
+ return # don't log empty messages
+
+ useLevel = self._intLogLevel(upperLogLevel)
+ if CommonLogger.verbose: print("logger STYLE=%s" % style)
+ if useLevel < self._logLevelThreshold:
+ if CommonLogger.verbose: print("skipping because of level")
+ pass
+ else:
+ with self._logLock:
+ if style == CommonLogger.ErrorFile:
+ if CommonLogger.verbose: print("using CommonLogger.ErrorFile")
+ self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
+ %(requestID, threadID, serviceName, partnerName, targetEntity, targetServiceName,
+ errorCategory, errorCode, errorDescription, detailMessage))
+ elif style == CommonLogger.DebugFile:
+ if CommonLogger.verbose: print("using CommonLogger.DebugFile")
+ self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
+ %(requestID, threadID, serverName, serviceName, instanceUUID, upperLogLevel,
+ severity, serverIPAddress, server, IPAddress, className, timer, detailMessage))
+ elif style == CommonLogger.AuditFile:
+ if CommonLogger.verbose: print("using CommonLogger.AuditFile")
+ endAuditTime, endAuditMsec = self._getTime()
+ if type(nbegTime) is dict and 'begTime' in nbegTime and 'begMsec' in nbegTime:
+ d = { 'begtime': self._noSep(nbegTime['begTime']), 'begmsecs': float(self._noSep(nbegTime['begMsec'])), 'endtime': endAuditTime, 'endmsecs': endAuditMsec }
+ elif self._begTime is not None:
+ d = { 'begtime': self._begTime, 'begmsecs': self._begMsec, 'endtime': endAuditTime, 'endmsecs': endAuditMsec }
+ else:
+ d = { 'begtime': endAuditTime, 'begmsecs': endAuditMsec, 'endtime': endAuditTime, 'endmsecs': endAuditMsec }
+ self._begTime = None
+ unused = ""
+ self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
+ %(requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName,
+ statusCode, responseCode, responseDescription, instanceUUID, upperLogLevel,
+ severity, serverIPAddress, timer, server, IPAddress, className, unused,
+ processKey, customField1, customField2, customField3, customField4, detailMessage), extra=d)
+ elif style == CommonLogger.MetricsFile:
+ if CommonLogger.verbose: print("using CommonLogger.MetricsFile")
+ endMetricsTime, endMetricsMsec = self._getTime()
+ if type(nbegTime) is dict and 'begTime' in nbegTime and 'begMsec' in nbegTime:
+ d = { 'begtime': self._noSep(nbegTime['begTime']), 'begmsecs': float(self._noSep(nbegTime['begMsec'])), 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec }
+ elif self._begTime is not None:
+ d = { 'begtime': self._begTime, 'begmsecs': self._begMsec, 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec }
+ else:
+ d = { 'begtime': endMetricsTime, 'begmsecs': endMetricsMsec, 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec }
+ self._begTime = None
+ unused = ""
+ self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
+ %(requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName,
+ targetEntity, targetServiceName, statusCode, responseCode, responseDescription,
+ instanceUUID, upperLogLevel, severity, serverIPAddress, timer, server, IPAddress,
+ className, unused, processKey, targetVirtualEntity, customField1, customField2,
+ customField3, customField4, detailMessage), extra=d)
+ else:
+ print("!!!!!!!!!!!!!!!! style not set: %s" % self._fields["style"])
+
+ def _getTime(self):
+ ct = time.time()
+ lt = time.localtime(ct)
+ return (time.strftime(CommonLogger.DateFmt, lt), (ct - int(ct)) * 1000)
+
+ def setStartRecordEvent(self):
+ """
+ Set the start time to be saved for both audit and metrics records
+ """
+ self._begTime, self._begMsec = self._getTime()
+
+ def getStartRecordEvent(self):
+ """
+ Retrieve the start time to be used for either audit and metrics records
+ """
+ begTime, begMsec = self._getTime()
+ return {'begTime':begTime, 'begMsec':begMsec}
+
+ def _getVal(self, key, default, **kwargs):
+ val = self._fields.get(key)
+ if key in kwargs: val = kwargs[key]
+ if val is None: val = default
+ return self._noSep(val)
+
+ def _getArg(self, key, default, **kwargs):
+ val = None
+ if key in kwargs: val = kwargs[key]
+ if val is None: val = default
+ return val
+
+ def _noSep(self, message):
+ if message is None: return ''
+ return re.sub(r'[\|\n]', ' ', str(message))
+
+ def _intLogLevel(self, logLevel):
+ if logLevel == 'FATAL': useLevel = 50
+ elif logLevel == 'ERROR': useLevel = 40
+ elif logLevel == 'WARN': useLevel = 30
+ elif logLevel == 'INFO': useLevel = 20
+ elif logLevel == 'DEBUG': useLevel = 10
+ else: useLevel = 0
+ return useLevel
+
+ def _mkdir_p(self, filename):
+ """Create missing directories from a full filename path like mkdir -p"""
+
+ if filename is None:
+ return
+
+ folder=os.path.dirname(filename)
+
+ if folder == "":
+ return
+
+ if not os.path.exists(folder):
+ try:
+ os.makedirs(folder)
+ except OSError as err:
+ print("error number %d creating %s directory to hold %s logfile: %s" %(err.errno, err.filename, filename, err.strerror), file=sys.stderr)
+ sys.exit(2)
+ except Exception as err:
+ print("error creating %s directory to hold %s logfile: %s" %(folder, filename, str(err)), file=sys.stderr)
+ sys.exit(2)
+
+if __name__ == "__main__":
+
+ def __checkOneTime(line):
+ format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}[+]00:00[|]'
+ m = re.match(format, line)
+ if not m:
+ print("ERROR: time string did not match proper time format, %s" %line)
+ print("\t: format=%s" % format)
+ return 1
+ return 0
+
+ def __checkTwoTimes(line, different):
+ format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|][0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|]'
+ m = re.match(format, line)
+ if not m:
+ print("ERROR: time strings did not match proper time format, %s" %line)
+ print("\t: format=%s" % format)
+ return 1
+ second1 = int(m.group(1))
+ msec1 = int(m.group(2))
+ second2 = int(m.group(3))
+ msec2 = int(m.group(4))
+ if second1 > second2: second2 += 60
+ t1 = second1 * 1000 + msec1
+ t2 = second2 * 1000 + msec2
+ diff = t2 - t1
+ # print("t1=%d (%d,%d) t2=%d (%d,%d), diff = %d" % (t1, second1, msec1, t2, second2, msec2, diff))
+ if different:
+ if diff < 500:
+ print("ERROR: times did not differ enough: %s" % line)
+ return 1
+ else:
+ if diff > 10:
+ print("ERROR: times were too far apart: %s" % line)
+ return 1
+ return 0
+
+ def __checkBegTime(line):
+ format = "begTime should be ([-0-9T:]+)"
+ # print("checkBegTime(%s)" % line)
+ strt = 'begTime should be '
+ i = line.index(strt)
+ rest = line[i+len(strt):].rstrip()
+ if not line.startswith(rest + ","):
+ print("ERROR: line %s should start with %s" % (line,rest))
+ return 1
+ return 0
+
+ def __checkLog(logfile, numLines, numFields):
+ lineCount = 0
+ errorCount = 0
+ with open(logfile, "r") as fp:
+ for line in fp:
+ # print("saw line %s" % line)
+ lineCount += 1
+ c = line.count('|')
+ if c != numFields:
+ print("ERROR: wrong number of fields. Expected %d, got %d: %s" % (numFields, c, line))
+ errorCount += 1
+ if re.search("should not appear", line):
+ print("ERROR: a line appeared that should not have appeared, %s" % line)
+ errorCount += 1
+ elif re.search("single time", line):
+ errorCount += __checkOneTime(line)
+ elif re.search("time should be the same", line):
+ errorCount += __checkTwoTimes(line, different=False)
+ elif re.search("time should be ", line):
+ errorCount += __checkTwoTimes(line, different=True)
+ elif re.search("begTime should be ", line):
+ errorCount += __checkBegTime(line)
+ else:
+ print("ERROR: an unknown message appeared, %s" % line)
+ errorCount += 1
+
+ if lineCount != numLines:
+ print("ERROR: expected %d lines, but got %d lines" % (numLines, lineCount))
+ errorCount += 1
+ return errorCount
+
+ import os, argparse
+ parser = argparse.ArgumentParser(description="test the CommonLogger functions")
+ parser.add_argument("-k", "--keeplogs", help="Keep the log files after finishing the tests", action="store_true")
+ parser.add_argument("-v", "--verbose", help="Print debugging messages", action="store_true")
+ args = parser.parse_args()
+
+ spid = str(os.getpid())
+ if args.keeplogs:
+ spid = ""
+ logcfg = "/tmp/cl.log" + spid + ".cfg"
+ errorLog = "/tmp/cl.error" + spid + ".log"
+ metricsLog = "/tmp/cl.metrics" + spid + ".log"
+ auditLog = "/tmp/cl.audit" + spid + ".log"
+ debugLog = "/tmp/cl.debug" + spid + ".log"
+ if args.verbose: CommonLogger.verbose = True
+
+ import atexit
+ def cleanupTmps():
+ for f in [ logcfg, errorLog, metricsLog, auditLog, debugLog ]:
+ try:
+ os.remove(f)
+ except:
+ pass
+ if not args.keeplogs:
+ atexit.register(cleanupTmps)
+
+ with open(logcfg, "w") as o:
+ o.write("error = " + errorLog + "\n" +
+ "errorLogLevel = WARN\n" +
+ "metrics = " + metricsLog + "\n" +
+ "metricsLogLevel = INFO\n" +
+ "audit = " + auditLog + "\n" +
+ "auditLogLevel = INFO\n" +
+ "debug = " + debugLog + "\n" +
+ "debugLogLevel = DEBUG\n")
+
+ import uuid
+ instanceUUID = uuid.uuid1()
+ serviceName = "testharness"
+ errorLogger = CommonLogger(logcfg, "error", style=CommonLogger.ErrorFile, instanceUUID=instanceUUID, serviceName=serviceName)
+ debugLogger = CommonLogger(logcfg, "debug", style=CommonLogger.DebugFile, instanceUUID=instanceUUID, serviceName=serviceName)
+ auditLogger = CommonLogger(logcfg, "audit", style=CommonLogger.AuditFile, instanceUUID=instanceUUID, serviceName=serviceName)
+ metricsLogger = CommonLogger(logcfg, "metrics", style=CommonLogger.MetricsFile, instanceUUID=instanceUUID, serviceName=serviceName)
+
+ testsRun = 0
+ errorCount = 0
+ errorLogger.debug("error calling debug (should not appear)")
+ errorLogger.info("error calling info (should not appear)")
+ errorLogger.warn("error calling warn (single time)")
+ errorLogger.error("error calling error (single time)")
+ errorLogger.setStartRecordEvent()
+ time.sleep(1)
+ errorLogger.fatal("error calling fatal, after setStartRecordEvent and sleep (start should be ignored, single time)")
+ testsRun += 6
+ errorCount += __checkLog(errorLog, 3, 10)
+
+ auditLogger.debug("audit calling debug (should not appear)")
+ auditLogger.info("audit calling info (time should be the same)")
+ auditLogger.warn("audit calling warn (time should be the same)")
+ auditLogger.error("audit calling error (time should be the same)")
+ bt = auditLogger.getStartRecordEvent()
+ # print("bt=%s" % bt)
+ time.sleep(1)
+ auditLogger.setStartRecordEvent()
+ time.sleep(1)
+ auditLogger.fatal("audit calling fatal, after setStartRecordEvent and sleep, time should be different)")
+ time.sleep(1)
+ auditLogger.fatal("audit calling fatal, begTime should be %s" % bt['begTime'], begTime=bt)
+ testsRun += 7
+ errorCount += __checkLog(auditLog, 5, 25)
+
+ debugLogger.debug("debug calling debug (single time)")
+ debugLogger.info("debug calling info (single time)")
+ debugLogger.warn("debug calling warn (single time)")
+ debugLogger.setStartRecordEvent()
+ time.sleep(1)
+ debugLogger.error("debug calling error, after SetStartRecordEvent and sleep (start should be ignored, single time)")
+ debugLogger.fatal("debug calling fatal (single time)")
+ errorCount += __checkLog(debugLog, 5, 13)
+ testsRun += 6
+
+ metricsLogger.debug("metrics calling debug (should not appear)")
+ metricsLogger.info("metrics calling info (time should be the same)")
+ metricsLogger.warn("metrics calling warn (time should be the same)")
+ bt = metricsLogger.getStartRecordEvent()
+ time.sleep(1)
+ metricsLogger.setStartRecordEvent()
+ time.sleep(1)
+ metricsLogger.error("metrics calling error, after SetStartRecordEvent and sleep, time should be different")
+ metricsLogger.fatal("metrics calling fatal (time should be the same)")
+ time.sleep(1)
+ metricsLogger.fatal("metrics calling fatal, begTime should be %s" % bt['begTime'], begTime=bt)
+ testsRun += 6
+ errorCount += __checkLog(metricsLog, 5, 28)
+
+ print("%d tests run, %d errors found" % (testsRun, errorCount))
diff --git a/policyhandler/onap/__init__.py b/policyhandler/onap/__init__.py
new file mode 100644
index 0000000..a3220c4
--- /dev/null
+++ b/policyhandler/onap/__init__.py
@@ -0,0 +1,18 @@
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py
new file mode 100644
index 0000000..a1df861
--- /dev/null
+++ b/policyhandler/onap/audit.py
@@ -0,0 +1,320 @@
+"""generic class to keep track of request handling
+ from receiving it through reponse and log all the activities
+
+ call Audit.init("component-name", "path/to/config_file") to init the loggers before any requests
+
+ start each outside request with creation of the Audit object
+ audit = Audit(request_id=None, headers=None, msg=None)
+"""
+
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import os
+import json
+import uuid
+import time
+import copy
+from threading import Lock
+from enum import Enum
+
+from .CommonLogger import CommonLogger
+
+REQUEST_X_ECOMP_REQUESTID = "X-ECOMP-RequestID"
+REQUEST_REMOTE_ADDR = "Remote-Addr"
+REQUEST_HOST = "Host"
+HOSTNAME = "HOSTNAME"
+
+AUDIT_REQUESTID = 'requestID'
+AUDIT_IPADDRESS = 'IPAddress'
+AUDIT_SERVER = 'server'
+
+HEADER_CLIENTAUTH = "clientauth"
+HEADER_AUTHORIZATION = "authorization"
+
+class AuditHttpCode(Enum):
+ """audit http codes"""
+ HTTP_OK = 200
+ DATA_NOT_FOUND_ERROR = 400
+ PERMISSION_UNAUTHORIZED_ERROR = 401
+ PERMISSION_FORBIDDEN_ERROR = 403
+ SERVER_INTERNAL_ERROR = 500
+ SERVICE_UNAVAILABLE_ERROR = 503
+ DATA_ERROR = 1030
+ SCHEMA_ERROR = 1040
+
+class AuditResponseCode(Enum):
+ """audit response codes"""
+ SUCCESS = 0
+ PERMISSION_ERROR = 100
+ AVAILABILITY_ERROR = 200
+ DATA_ERROR = 300
+ SCHEMA_ERROR = 400
+ BUSINESS_PROCESS_ERROR = 500
+ UNKNOWN_ERROR = 900
+
+ @staticmethod
+ def get_response_code(http_status_code):
+ """calculates the response_code from max_http_status_code"""
+ if http_status_code <= AuditHttpCode.HTTP_OK.value:
+ return AuditResponseCode.SUCCESS
+
+ if http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value, \
+ AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]:
+ return AuditResponseCode.PERMISSION_ERROR
+ if http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value:
+ return AuditResponseCode.AVAILABILITY_ERROR
+ if http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value:
+ return AuditResponseCode.BUSINESS_PROCESS_ERROR
+ if http_status_code in [AuditHttpCode.DATA_ERROR.value, \
+ AuditHttpCode.DATA_NOT_FOUND_ERROR.value]:
+ return AuditResponseCode.DATA_ERROR
+ if http_status_code == AuditHttpCode.SCHEMA_ERROR.value:
+ return AuditResponseCode.SCHEMA_ERROR
+
+ return AuditResponseCode.UNKNOWN_ERROR
+
+ @staticmethod
+ def get_human_text(response_code):
+ """convert enum name into human readable text"""
+ if not response_code:
+ return "unknown"
+ return response_code.name.lower().replace("_", " ")
+
+class Audit(object):
+ """put the audit object on stack per each initiating request in the system
+
+ :request_id: is the X-ECOMP-RequestID for tracing
+
+ :req_message: is the request message string for logging
+
+ :aud_parent: is the parent request - used for sub-query metrics to other systems
+
+ :kwargs: - put any request related params into kwargs
+ """
+ _service_name = ""
+ _service_instance_UUID = str(uuid.uuid4())
+ _logger_debug = None
+ _logger_error = None
+ _logger_metrics = None
+ _logger_audit = None
+
+ @staticmethod
+ def init(service_name, config_file_path):
+ """init static invariants and loggers"""
+ Audit._service_name = service_name
+ Audit._logger_debug = CommonLogger(config_file_path, "debug", \
+ instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name)
+ Audit._logger_error = CommonLogger(config_file_path, "error", \
+ instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name)
+ Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \
+ instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name)
+ Audit._logger_audit = CommonLogger(config_file_path, "audit", \
+ instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name)
+
+ def __init__(self, request_id=None, req_message=None, aud_parent=None, **kwargs):
+ """create audit object per each request in the system
+
+ :request_id: is the X-ECOMP-RequestID for tracing
+ :req_message: is the request message string for logging
+ :aud_parent: is the parent Audit - used for sub-query metrics to other systems
+ :kwargs: - put any request related params into kwargs
+ """
+ self.request_id = request_id
+ self.req_message = req_message or ""
+ self.aud_parent = aud_parent
+ self.kwargs = kwargs or {}
+
+ self.retry_get_config = False
+ self.max_http_status_code = 0
+ self._lock = Lock()
+
+ if self.aud_parent:
+ if not self.request_id:
+ self.request_id = self.aud_parent.request_id
+ if not self.req_message:
+ self.req_message = self.aud_parent.req_message
+ self.kwargs = self.aud_parent.merge_all_kwargs(**self.kwargs)
+ else:
+ headers = self.kwargs.get("headers", {})
+ if headers:
+ if not self.request_id:
+ self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID)
+ if AUDIT_IPADDRESS not in self.kwargs:
+ self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR)
+ if AUDIT_SERVER not in self.kwargs:
+ self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST)
+
+ if AUDIT_SERVER not in self.kwargs:
+ self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME)
+
+ created_req = ""
+ if not self.request_id:
+ created_req = " with new"
+ self.request_id = str(uuid.uuid4())
+
+ self.kwargs[AUDIT_REQUESTID] = self.request_id
+
+ self._started = time.time()
+ self._start_event = Audit._logger_audit.getStartRecordEvent()
+ self.metrics_start()
+
+ if not self.aud_parent:
+ self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\
+ .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs)))
+
+ def merge_all_kwargs(self, **kwargs):
+ """returns the merge of copy of self.kwargs with the param kwargs"""
+ all_kwargs = self.kwargs.copy()
+ if kwargs:
+ all_kwargs.update(kwargs)
+ return all_kwargs
+
+ def set_http_status_code(self, http_status_code):
+ """accumulate the highest(worst) http status code"""
+ self._lock.acquire()
+ if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value:
+ self.max_http_status_code = max(http_status_code, self.max_http_status_code)
+ self._lock.release()
+
+ @staticmethod
+ def get_status_code(success):
+ """COMPLETE versus ERROR"""
+ if success:
+ return 'COMPLETE'
+ return 'ERROR'
+
+ @staticmethod
+ def hide_secrets(obj):
+ """hides the known secret field values of the dictionary"""
+ if not isinstance(obj, dict):
+ return obj
+
+ for key in obj:
+ if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]:
+ obj[key] = "*"
+ elif isinstance(obj[key], dict):
+ obj[key] = Audit.hide_secrets(obj[key])
+
+ return obj
+
+ @staticmethod
+ def log_json_dumps(obj, **kwargs):
+ """hide the known secret field values of the dictionary and return json.dumps"""
+ if not isinstance(obj, dict):
+ return json.dumps(obj, **kwargs)
+
+ return json.dumps(Audit.hide_secrets(copy.deepcopy(obj)), **kwargs)
+
+ def get_response_code(self):
+ """calculates the response_code from max_http_status_code"""
+ self._lock.acquire()
+ max_http_status_code = self.max_http_status_code
+ self._lock.release()
+ return AuditResponseCode.get_response_code(max_http_status_code)
+
+ def debug(self, log_line, **kwargs):
+ """debug - the debug=lowest level of logging"""
+ Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs))
+
+ def info(self, log_line, **kwargs):
+ """debug - the info level of logging"""
+ Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs))
+
+ def info_requested(self, result=None, **kwargs):
+ """info "requested ..." - the info level of logging"""
+ self.info("requested {0} {1}".format(self.req_message, result or ""), \
+ **self.merge_all_kwargs(**kwargs))
+
+ def warn(self, log_line, **kwargs):
+ """debug+error - the warn level of logging"""
+ all_kwargs = self.merge_all_kwargs(**kwargs)
+ Audit._logger_debug.warn(log_line, **all_kwargs)
+ Audit._logger_error.warn(log_line, **all_kwargs)
+
+ def error(self, log_line, **kwargs):
+ """debug+error - the error level of logging"""
+ all_kwargs = self.merge_all_kwargs(**kwargs)
+ Audit._logger_debug.error(log_line, **all_kwargs)
+ Audit._logger_error.error(log_line, **all_kwargs)
+
+ def fatal(self, log_line, **kwargs):
+ """debug+error - the fatal level of logging"""
+ all_kwargs = self.merge_all_kwargs(**kwargs)
+ Audit._logger_debug.fatal(log_line, **all_kwargs)
+ Audit._logger_error.fatal(log_line, **all_kwargs)
+
+ @staticmethod
+ def get_elapsed_time(started):
+ """returns the elapsed time since started in milliseconds"""
+ return int(round(1000 * (time.time() - started)))
+
+ def metrics_start(self, log_line=None, **kwargs):
+ """reset metrics timing"""
+ self._metrics_started = time.time()
+ self._metrics_start_event = Audit._logger_metrics.getStartRecordEvent()
+ if log_line:
+ self.info(log_line, **self.merge_all_kwargs(**kwargs))
+
+ def metrics(self, log_line, **kwargs):
+ """debug+metrics - the metrics=sub-audit level of logging"""
+ all_kwargs = self.merge_all_kwargs(**kwargs)
+ response_code = self.get_response_code()
+ success = (response_code.value == AuditResponseCode.SUCCESS.value)
+ metrics_func = None
+ if success:
+ log_line = "done: {0}".format(log_line)
+ self.info(log_line, **all_kwargs)
+ metrics_func = Audit._logger_metrics.info
+ else:
+ log_line = "failed: {0}".format(log_line)
+ self.error(log_line, errorCode=response_code.value, \
+ errorDescription=AuditResponseCode.get_human_text(response_code), **all_kwargs)
+ metrics_func = Audit._logger_metrics.error
+
+ metrics_func(log_line, begTime=self._metrics_start_event, \
+ timer=Audit.get_elapsed_time(self._metrics_started), \
+ statusCode=Audit.get_status_code(success), responseCode=response_code.value, \
+ responseDescription=AuditResponseCode.get_human_text(response_code), \
+ **all_kwargs)
+
+ self.metrics_start()
+
+ def audit_done(self, result=None, **kwargs):
+ """debug+audit - the audit=top level of logging"""
+ all_kwargs = self.merge_all_kwargs(**kwargs)
+ response_code = self.get_response_code()
+ success = (response_code.value == AuditResponseCode.SUCCESS.value)
+ log_line = "{0} {1}".format(self.req_message, result or "").strip()
+ audit_func = None
+ if success:
+ log_line = "done: {0}".format(log_line)
+ self.info(log_line, **all_kwargs)
+ audit_func = Audit._logger_audit.info
+ else:
+ log_line = "failed: {0}".format(log_line)
+ self.error(log_line, errorCode=response_code.value, \
+ errorDescription=AuditResponseCode.get_human_text(response_code), **all_kwargs)
+ audit_func = Audit._logger_audit.error
+
+ audit_func(log_line, begTime=self._start_event, \
+ timer=Audit.get_elapsed_time(self._started), \
+ statusCode=Audit.get_status_code(success), responseCode=response_code.value, \
+ responseDescription=AuditResponseCode.get_human_text(response_code), \
+ **all_kwargs)
diff --git a/policyhandler/onap/crypto.py b/policyhandler/onap/crypto.py
new file mode 100644
index 0000000..e2d58db
--- /dev/null
+++ b/policyhandler/onap/crypto.py
@@ -0,0 +1,72 @@
+"""ONAP specific encryption-decryption for passwords"""
+
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import base64
+from Crypto.Cipher import AES
+from Crypto.Protocol.KDF import PBKDF2
+from Crypto import Random
+
+class Cipher(object):
+ """class for AES-256 encryption and decryption of text using the salted password"""
+ KEY_SIZE = 32 # AES-256
+ KDF_ITERATIONS = 16384
+ AES_MODE = AES.MODE_CFB
+
+ @staticmethod
+ def encrypt(password, plain_text):
+ """
+ encrypt the :plain_text: into :cipher_text: using the password
+
+ :cipher_text: formatted as pbkdf2_salt + init_vector + encrypt(:plain_text:)
+ then cipher_text is encoded as base64 to make it textable (non-binary)
+
+ :pbkdf2_salt: has the fixed length of 32 (AES-256)
+ :init_vector: has the fixed length of AES.block_size
+ """
+ pbkdf2_salt = Random.new().read(Cipher.KEY_SIZE)
+ init_vector = Random.new().read(AES.block_size)
+ derived_key = PBKDF2(password, pbkdf2_salt, Cipher.KEY_SIZE, Cipher.KDF_ITERATIONS)
+
+ cipher = AES.new(derived_key, Cipher.AES_MODE, init_vector)
+ cipher_text = base64.b64encode(pbkdf2_salt + init_vector + cipher.encrypt(plain_text))
+ return cipher_text
+
+ @staticmethod
+ def decrypt(password, cipher_text):
+ """
+ decrypt the :cipher_text: into :plain_text: using the password
+
+ :cipher_text: is expected to be encoded as base64 to make it textable (non-binary)
+ inside of that it is expected to be formatted as
+ pbkdf2_salt + init_vector + encrypt(:plain_text:)
+
+ :pbkdf2_salt: has the fixed length of 32 (AES-256)
+ :init_vector: has the fixed length of AES.block_size
+ """
+ cipher_text = base64.b64decode(cipher_text)
+ pbkdf2_salt = cipher_text[: Cipher.KEY_SIZE]
+ init_vector = cipher_text[Cipher.KEY_SIZE : Cipher.KEY_SIZE + AES.block_size]
+ cipher_text = cipher_text[Cipher.KEY_SIZE + AES.block_size :]
+ derived_key = PBKDF2(password, pbkdf2_salt, Cipher.KEY_SIZE, Cipher.KDF_ITERATIONS)
+
+ cipher = AES.new(derived_key, Cipher.AES_MODE, init_vector)
+ plain_text = cipher.decrypt(cipher_text).decode('utf-8')
+ return plain_text
diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py
new file mode 100644
index 0000000..640b724
--- /dev/null
+++ b/policyhandler/policy_consts.py
@@ -0,0 +1,28 @@
+"""contants of policy-handler"""
+
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+POLICY_ID = 'policy_id'
+POLICY_VERSION = "policyVersion"
+POLICY_NAME = "policyName"
+POLICY_BODY = 'policy_body'
+POLICY_CONFIG = 'config'
+
+POLICY_GET_CONFIG = 'getConfig'
diff --git a/policyhandler/policy_engine.py b/policyhandler/policy_engine.py
new file mode 100644
index 0000000..838ccc7
--- /dev/null
+++ b/policyhandler/policy_engine.py
@@ -0,0 +1,100 @@
+"""policy-engine-client communicates with policy-engine thru PolicyEngine client object"""
+
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import logging
+import re
+
+from .config import Config, PolicyEngineConfig
+from .onap.audit import Audit
+from .PolicyEngine import PolicyEngine, NotificationHandler, NotificationScheme
+from .policy_updater import PolicyUpdater
+
+class PolicyNotificationHandler(NotificationHandler):
+ """handler of the policy-engine push notifications"""
+ _logger = logging.getLogger("policy_handler.policy_notification")
+
+ def __init__(self, policy_updater):
+ scope_prefixes = [scope_prefix.replace(".", "[.]") \
+ for scope_prefix in Config.config["scope_prefixes"]]
+ self._policy_scopes = re.compile("(" + "|".join(scope_prefixes) + ")")
+ PolicyNotificationHandler._logger.info("_policy_scopes %s", self._policy_scopes.pattern)
+ self._policy_updater = policy_updater
+ self._policy_updater.start()
+
+ def notificationReceived(self, notification):
+ if not notification or not notification._loadedPolicies:
+ return
+
+ policy_names = [loaded._policyName \
+ for loaded in notification._loadedPolicies \
+ if self._policy_scopes.match(loaded._policyName)]
+
+ if not policy_names:
+ PolicyNotificationHandler._logger.info("no policy updated for scopes %s", \
+ self._policy_scopes.pattern)
+ return
+
+ audit = Audit(req_message="notificationReceived from PDP")
+ audit.retry_get_config = True
+ self._policy_updater.enqueue(audit, policy_names)
+
+class PolicyEngineClient(object):
+ """ policy-engine client"""
+ _logger = logging.getLogger("policy_handler.policy_engine")
+ _policy_updater = None
+ _pdp_notification_handler = None
+ _policy_engine = None
+
+ @staticmethod
+ def shutdown(audit):
+ """Shutdown the notification-handler"""
+ PolicyEngineClient._policy_updater.shutdown(audit)
+
+ @staticmethod
+ def catch_up(audit):
+ """bring the latest policies from policy-engine"""
+ PolicyEngineClient._policy_updater.catch_up(audit)
+
+ @staticmethod
+ def create_policy_engine_properties():
+ """create the policy_engine.properties file from config.json"""
+ pass
+
+ @staticmethod
+ def run():
+ """Using policy-engine client to talk to policy engine"""
+ audit = Audit(req_message="start PDP client")
+ PolicyEngineClient._policy_updater = PolicyUpdater()
+ PolicyEngineClient._pdp_notification_handler = PolicyNotificationHandler(
+ PolicyEngineClient._policy_updater)
+
+ sub_aud = Audit(aud_parent=audit)
+ sub_aud.metrics_start("create client to PDP")
+ PolicyEngineConfig.save_to_file()
+ PolicyEngineClient._policy_engine = PolicyEngine(PolicyEngineConfig.PATH_TO_PROPERTIES, \
+ scheme=NotificationScheme.AUTO_ALL_NOTIFICATIONS.name,\
+ handler=PolicyEngineClient._pdp_notification_handler)
+ sub_aud.metrics("created client to PDP")
+ seed_scope = Config.config["scope_prefixes"][0] + ".*"
+ PolicyEngineClient._policy_engine.getConfig(policyName=seed_scope)
+ sub_aud.metrics("seeded client by PDP.getConfig for policyName={0}".format(seed_scope))
+
+ PolicyEngineClient.catch_up(audit)
diff --git a/policyhandler/policy_handler.py b/policyhandler/policy_handler.py
new file mode 100644
index 0000000..10633cd
--- /dev/null
+++ b/policyhandler/policy_handler.py
@@ -0,0 +1,87 @@
+"""run as server: python -m policyhandler/policy_handler"""
+
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import sys
+import logging
+
+from policyhandler.config import Config
+from policyhandler.onap.audit import Audit
+from policyhandler.web_server import PolicyWeb
+from policyhandler.policy_engine import PolicyEngineClient
+
+class LogWriter(object):
+ """redirect the standard out + err to the logger"""
+ def __init__(self, logger_func):
+ self.logger_func = logger_func
+
+ def write(self, log_line):
+ """actual writer to be used in place of stdout or stderr"""
+ log_line = log_line.rstrip()
+ if log_line:
+ self.logger_func(log_line)
+
+ def flush(self):
+ """no real flushing of the buffer"""
+ pass
+
+def run_policy_handler():
+ """main run function for policy-handler"""
+ Config.load_from_file()
+ Config.discover()
+
+ logger = logging.getLogger("policy_handler")
+ sys.stdout = LogWriter(logger.info)
+ sys.stderr = LogWriter(logger.error)
+
+ logger.info("========== run_policy_handler ==========")
+ Audit.init(Config.get_system_name(), Config.LOGGER_CONFIG_FILE_PATH)
+
+ logger.info("starting policy_handler with config:")
+ logger.info(Audit.log_json_dumps(Config.config))
+
+ PolicyEngineClient.run()
+ PolicyWeb.run()
+
+def upload_config_to_discovery():
+ """read the config from file and upload it to discovery"""
+ logger = logging.getLogger("policy_handler")
+ sys.stdout = LogWriter(logger.info)
+ sys.stderr = LogWriter(logger.error)
+
+ Config.load_from_file()
+
+ if not Config.load_from_file(Config.UPLOAD_CONFIG_FILE_PATH):
+ logger.info("not found config %s", Config.UPLOAD_CONFIG_FILE_PATH)
+ return
+
+ logger.info("========== upload_config_to_discovery ==========")
+ Config.upload_to_discovery()
+
+ logger.info("========== upload_config_to_discovery - get it back ==========")
+ Config.config = None
+ Config.load_from_file()
+ Config.discover()
+ logger.info("========== upload_config_to_discovery - done ==========")
+ return True
+
+if __name__ == "__main__":
+ if not upload_config_to_discovery():
+ run_policy_handler()
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py
new file mode 100644
index 0000000..d49164c
--- /dev/null
+++ b/policyhandler/policy_rest.py
@@ -0,0 +1,342 @@
+"""policy-client communicates with policy-engine thru REST API"""
+
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import logging
+import json
+import copy
+import re
+import time
+from multiprocessing.dummy import Pool as ThreadPool
+import requests
+
+from .config import Config
+from .policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, POLICY_GET_CONFIG, \
+ POLICY_BODY, POLICY_CONFIG
+from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode, AuditResponseCode
+
+class PolicyUtils(object):
+ """policy-client utils"""
+ _logger = logging.getLogger("policy_handler.policy_utils")
+ _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$')
+
+ @staticmethod
+ def safe_json_parse(json_str):
+ """try parsing json without exception - returns the json_str back if fails"""
+ if not json_str:
+ return json_str
+ try:
+ return json.loads(json_str)
+ except ValueError as err:
+ PolicyUtils._logger.warn("unexpected json %s: %s", str(json_str), str(err))
+ return json_str
+
+ @staticmethod
+ def extract_policy_id(policy_name):
+ """ policy_name = policy_id + "." + <version> + "." + <extension>
+ For instance,
+ policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml
+ policy_id = DCAE_alex.Config_alex_policy_number_1
+ policy_scope = DCAE_alex
+ policy_class = Config
+ policy_version = 3
+ type = extension = xml
+ delimiter = "."
+ policy_class_delimiter = "_"
+ policy_name in PAP = DCAE_alex.alex_policy_number_1
+ """
+ if not policy_name:
+ return
+ return PolicyUtils._policy_name_ext.sub('', policy_name)
+
+ @staticmethod
+ def parse_policy_config(policy):
+ """try parsing the config in policy."""
+ if policy and POLICY_BODY in policy and POLICY_CONFIG in policy[POLICY_BODY]:
+ policy[POLICY_BODY][POLICY_CONFIG] = PolicyUtils.safe_json_parse(
+ policy[POLICY_BODY][POLICY_CONFIG])
+ return policy
+
+ @staticmethod
+ def convert_to_policy(policy_config):
+ """wrap policy_config received from policy-engine with policy_id."""
+ if not policy_config or POLICY_NAME not in policy_config \
+ or POLICY_VERSION not in policy_config or not policy_config[POLICY_VERSION]:
+ return
+ policy_id = PolicyUtils.extract_policy_id(policy_config[POLICY_NAME])
+ if not policy_id:
+ return
+ return {POLICY_ID:policy_id, POLICY_BODY:policy_config}
+
+ @staticmethod
+ def select_latest_policy(policy_configs):
+ """For some reason, the policy-engine returns all version of the policy_configs.
+ DCAE-Controller is only interested in the latest version
+ """
+ if not policy_configs:
+ return
+ latest_policy_config = {}
+ for policy_config in policy_configs:
+ if POLICY_VERSION not in policy_config or not policy_config[POLICY_VERSION] \
+ or not policy_config[POLICY_VERSION].isdigit():
+ continue
+ if not latest_policy_config \
+ or int(policy_config[POLICY_VERSION]) \
+ > int(latest_policy_config[POLICY_VERSION]):
+ latest_policy_config = policy_config
+
+ return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_config))
+
+ @staticmethod
+ def select_latest_policies(policy_configs):
+ """For some reason, the policy-engine returns all version of the policy_configs.
+ DCAE-Controller is only interested in the latest versions
+ """
+ if not policy_configs:
+ return {}
+ policies = {}
+ for policy_config in policy_configs:
+ policy = PolicyUtils.convert_to_policy(policy_config)
+ if not policy or POLICY_ID not in policy or POLICY_BODY not in policy:
+ continue
+ if POLICY_VERSION not in policy[POLICY_BODY] \
+ or not policy[POLICY_BODY][POLICY_VERSION] \
+ or not policy[POLICY_BODY][POLICY_VERSION].isdigit():
+ continue
+ if policy[POLICY_ID] not in policies:
+ policies[policy[POLICY_ID]] = policy
+ continue
+ if int(policy[POLICY_BODY][POLICY_VERSION]) \
+ > int(policies[policy[POLICY_ID]][POLICY_BODY][POLICY_VERSION]):
+ policies[policy[POLICY_ID]] = policy
+
+ for policy_id in policies:
+ policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id])
+
+ return policies
+
+class PolicyRest(object):
+ """ policy-engine """
+ _logger = logging.getLogger("policy_handler.policy_rest")
+ _lazy_inited = False
+ _url = None
+ _headers = None
+ _target_entity = None
+ _thread_pool_size = 4
+ _scope_prefixes = None
+ _scope_thread_pool_size = 4
+ _policy_retry_count = 1
+ _policy_retry_sleep = 0
+
+ @staticmethod
+ def _lazy_init():
+ """init static config"""
+ if PolicyRest._lazy_inited:
+ return
+ PolicyRest._lazy_inited = True
+ config = Config.config[Config.FIELD_POLICY_ENGINE]
+ PolicyRest._url = config["url"] + config["path_api"]
+ PolicyRest._headers = config["headers"]
+ PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
+ PolicyRest._thread_pool_size = Config.config.get("thread_pool_size", 4)
+ if PolicyRest._thread_pool_size < 2:
+ PolicyRest._thread_pool_size = 2
+ PolicyRest._scope_prefixes = Config.config["scope_prefixes"]
+ PolicyRest._scope_thread_pool_size = min(PolicyRest._thread_pool_size, \
+ len(PolicyRest._scope_prefixes))
+
+ PolicyRest._policy_retry_count = Config.config.get("policy_retry_count", 1) or 1
+ PolicyRest._policy_retry_sleep = Config.config.get("policy_retry_sleep", 0)
+
+ PolicyRest._logger.info("PolicyClient url(%s) headers(%s) scope-prefixes(%s)", \
+ PolicyRest._url, Audit.log_json_dumps(PolicyRest._headers), \
+ json.dumps(PolicyRest._scope_prefixes))
+
+ @staticmethod
+ def _post(audit, path, json_body):
+ """Communication with the policy-engine"""
+ full_path = PolicyRest._url + path
+ sub_aud = Audit(aud_parent=audit, targetEntity=PolicyRest._target_entity, \
+ targetServiceName=full_path)
+
+ msg = json.dumps(json_body)
+ headers = copy.copy(PolicyRest._headers)
+ headers[REQUEST_X_ECOMP_REQUESTID] = sub_aud.request_id
+ headers_str = Audit.log_json_dumps(headers)
+
+ log_line = "post to PDP {0} msg={1} headers={2}".format(full_path, msg, headers_str)
+ sub_aud.metrics_start(log_line)
+ PolicyRest._logger.info(log_line)
+ res = None
+ try:
+ res = requests.post(full_path, json=json_body, headers=headers)
+ except requests.exceptions.RequestException as ex:
+ error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+ error_msg = "failed to post to PDP {0} {1} msg={2} headers={3}" \
+ .format(full_path, str(ex), msg, headers_str)
+
+ PolicyRest._logger.exception(error_msg)
+ sub_aud.set_http_status_code(error_code)
+ audit.set_http_status_code(error_code)
+ sub_aud.metrics(error_msg)
+ return (error_code, None)
+
+ log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format( \
+ full_path, res.status_code, msg, res.text, \
+ Audit.log_json_dumps(dict(res.request.headers.items())))
+ sub_aud.set_http_status_code(res.status_code)
+ sub_aud.metrics(log_line)
+ PolicyRest._logger.info(log_line)
+
+ if res.status_code == requests.codes.ok:
+ return res.status_code, res.json()
+
+ return res.status_code, None
+
+ @staticmethod
+ def get_latest_policy(aud_policy_name):
+ """Get the latest policy for the policy_name from the policy-engine"""
+ PolicyRest._lazy_init()
+ audit, policy_name = aud_policy_name
+
+ status_code = 0
+ latest_policy = None
+ for retry in xrange(1, PolicyRest._policy_retry_count + 1):
+ PolicyRest._logger.debug("%s", policy_name)
+ status_code, policy_configs = PolicyRest._post(audit, POLICY_GET_CONFIG, \
+ {POLICY_NAME:policy_name})
+ PolicyRest._logger.debug("%s %s policy_configs: %s", status_code, policy_name, \
+ json.dumps(policy_configs or []))
+ latest_policy = PolicyUtils.select_latest_policy(policy_configs)
+ if not latest_policy:
+ audit.error("received unexpected policy data from PDP for policy_name={0}: {1}" \
+ .format(policy_name, json.dumps(policy_configs or [])), \
+ errorCode=AuditResponseCode.DATA_ERROR.value, \
+ errorDescription=AuditResponseCode.get_human_text( \
+ AuditResponseCode.DATA_ERROR))
+
+ if latest_policy or not audit.retry_get_config \
+ or not PolicyRest._policy_retry_sleep \
+ or AuditResponseCode.PERMISSION_ERROR.value \
+ == AuditResponseCode.get_response_code(status_code).value:
+ break
+
+ if retry == PolicyRest._policy_retry_count:
+ audit.warn("gave up retrying {0} from PDP after #{1} for policy_name={2}" \
+ .format(POLICY_GET_CONFIG, retry, policy_name), \
+ errorCode=AuditResponseCode.DATA_ERROR.value, \
+ errorDescription=AuditResponseCode.get_human_text( \
+ AuditResponseCode.DATA_ERROR))
+ break
+
+ audit.warn("retry #{0} {1} from PDP in {2} secs for policy_name={3}" \
+ .format(retry, POLICY_GET_CONFIG, PolicyRest._policy_retry_sleep, policy_name), \
+ errorCode=AuditResponseCode.DATA_ERROR.value, \
+ errorDescription=AuditResponseCode.get_human_text( \
+ AuditResponseCode.DATA_ERROR))
+ time.sleep(PolicyRest._policy_retry_sleep)
+
+ audit.set_http_status_code(status_code)
+ if not latest_policy:
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ return latest_policy
+
+ @staticmethod
+ def get_latest_policies_by_names(aud_policy_names):
+ """Get the latest policies of the list of policy_names from the policy-engine"""
+ PolicyRest._lazy_init()
+ audit, policy_names = aud_policy_names
+ if not policy_names:
+ return
+
+ audit.metrics_start("get_latest_policies_by_names {0} {1}".format( \
+ len(policy_names), json.dumps(policy_names)))
+ PolicyRest._logger.debug("%d %s", len(policy_names), json.dumps(policy_names))
+
+ thread_count = min(PolicyRest._thread_pool_size, len(policy_names))
+ apns = [(audit, policy_name) for policy_name in policy_names]
+ policies = None
+ if thread_count == 1:
+ policies = [PolicyRest.get_latest_policy(apns[0])]
+ else:
+ pool = ThreadPool(thread_count)
+ policies = pool.map(PolicyRest.get_latest_policy, apns)
+ pool.close()
+ pool.join()
+
+ audit.metrics("result get_latest_policies_by_names {0} {1}: {2} {3}".format( \
+ len(policy_names), json.dumps(policy_names), len(policies), json.dumps(policies)), \
+ targetEntity=PolicyRest._target_entity, targetServiceName=POLICY_GET_CONFIG)
+ policies = dict([(policy[POLICY_ID], policy) \
+ for policy in policies if policy and POLICY_ID in policy])
+ PolicyRest._logger.debug("policies %s", json.dumps(policies))
+ if not policies:
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ return policies
+
+ @staticmethod
+ def _get_latest_policies(aud_scope_prefix):
+ """Get the latest policies of the same scope from the policy-engine"""
+ audit, scope_prefix = aud_scope_prefix
+ PolicyRest._logger.debug("%s", scope_prefix)
+ status_code, policy_configs = PolicyRest._post(audit, POLICY_GET_CONFIG, \
+ {POLICY_NAME:scope_prefix + ".*"})
+ audit.set_http_status_code(status_code)
+ PolicyRest._logger.debug("%s policy_configs: %s %s", status_code, \
+ scope_prefix, json.dumps(policy_configs or []))
+ latest_policies = PolicyUtils.select_latest_policies(policy_configs)
+
+ if not latest_policies:
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.error("received unexpected policies data from PDP for scope {0}: {1}".format( \
+ scope_prefix, json.dumps(policy_configs or [])), \
+ errorCode=AuditResponseCode.DATA_ERROR.value, \
+ errorDescription=AuditResponseCode.get_human_text( \
+ AuditResponseCode.DATA_ERROR))
+ return latest_policies
+
+ @staticmethod
+ def get_latest_policies(audit):
+ """Get the latest policies of the same scope from the policy-engine"""
+ PolicyRest._lazy_init()
+ PolicyRest._logger.debug("%s", json.dumps(PolicyRest._scope_prefixes))
+
+ audit.metrics_start("get_latest_policies for scopes {0} {1}".format( \
+ len(PolicyRest._scope_prefixes), json.dumps(PolicyRest._scope_prefixes)))
+ asps = [(audit, scope_prefix) for scope_prefix in PolicyRest._scope_prefixes]
+ latest_policies = None
+ if PolicyRest._scope_thread_pool_size == 1:
+ latest_policies = [PolicyRest._get_latest_policies(asps[0])]
+ else:
+ pool = ThreadPool(PolicyRest._scope_thread_pool_size)
+ latest_policies = pool.map(PolicyRest._get_latest_policies, asps)
+ pool.close()
+ pool.join()
+
+ audit.metrics("total result get_latest_policies for scopes {0} {1}: {2} {3}".format( \
+ len(PolicyRest._scope_prefixes), json.dumps(PolicyRest._scope_prefixes), \
+ len(latest_policies), json.dumps(latest_policies)), \
+ targetEntity=PolicyRest._target_entity, targetServiceName=POLICY_GET_CONFIG)
+
+ latest_policies = dict(pair for lp in latest_policies if lp for pair in lp.items())
+ PolicyRest._logger.debug("latest_policies: %s %s", \
+ json.dumps(PolicyRest._scope_prefixes), json.dumps(latest_policies))
+
+ return latest_policies
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
new file mode 100644
index 0000000..1f1539f
--- /dev/null
+++ b/policyhandler/policy_updater.py
@@ -0,0 +1,124 @@
+"""policy-updater thread"""
+
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import logging
+import json
+from Queue import Queue
+from threading import Thread, Lock
+
+from .policy_rest import PolicyRest
+from .deploy_handler import DeployHandler
+
+class PolicyUpdater(Thread):
+ """queue and handle the policy-updates in a separate thread"""
+ _logger = logging.getLogger("policy_handler.policy_updater")
+
+ def __init__(self):
+ """init static config of PolicyUpdater."""
+ Thread.__init__(self)
+ self.name = "policy_updater"
+ self.daemon = True
+
+ self._req_shutdown = None
+ self._req_catch_up = None
+
+ self._lock = Lock()
+ self._queue = Queue()
+
+ def enqueue(self, audit=None, policy_names=None):
+ """enqueue the policy-names"""
+ policy_names = policy_names or []
+ PolicyUpdater._logger.info("policy_names %s", json.dumps(policy_names))
+ self._queue.put((audit, policy_names))
+
+ def run(self):
+ """wait and run the policy-update in thread"""
+ while True:
+ PolicyUpdater._logger.info("waiting for policy-updates...")
+ audit, policy_names = self._queue.get()
+ PolicyUpdater._logger.info("got policy-updates %s", json.dumps(policy_names))
+ if not self._keep_running():
+ self._queue.task_done()
+ break
+ if self._on_catch_up():
+ continue
+
+ if not policy_names:
+ self._queue.task_done()
+ continue
+
+ updated_policies = PolicyRest.get_latest_policies_by_names((audit, policy_names))
+ PolicyUpdater.policy_update(audit, updated_policies)
+ audit.audit_done()
+ self._queue.task_done()
+
+ PolicyUpdater._logger.info("exit policy-updater")
+
+ def _keep_running(self):
+ """thread-safe check whether to continue running"""
+ self._lock.acquire()
+ keep_running = not self._req_shutdown
+ self._lock.release()
+ if self._req_shutdown:
+ self._req_shutdown.audit_done()
+ return keep_running
+
+ def catch_up(self, audit):
+ """need to bring the latest policies to DCAE-Controller"""
+ self._lock.acquire()
+ self._req_catch_up = audit
+ self._lock.release()
+ self.enqueue()
+
+ def _on_catch_up(self):
+ """Bring the latest policies to DCAE-Controller"""
+ self._lock.acquire()
+ req_catch_up = self._req_catch_up
+ if self._req_catch_up:
+ self._req_catch_up = None
+ self._queue.task_done()
+ self._queue = Queue()
+ self._lock.release()
+ if not req_catch_up:
+ return False
+
+ PolicyUpdater._logger.info("catch_up")
+ latest_policies = PolicyRest.get_latest_policies(req_catch_up)
+ PolicyUpdater.policy_update(req_catch_up, latest_policies)
+ req_catch_up.audit_done()
+ return True
+
+ @staticmethod
+ def policy_update(audit, updated_policies):
+ """Invoke deploy-handler"""
+ if updated_policies:
+ PolicyUpdater._logger.info("updated_policies %s", json.dumps(updated_policies))
+ DeployHandler.policy_update(audit, updated_policies)
+
+ def shutdown(self, audit):
+ """Shutdown the policy-updater"""
+ PolicyUpdater._logger.info("shutdown policy-updater")
+ self._lock.acquire()
+ self._req_shutdown = audit
+ self._lock.release()
+ self.enqueue()
+ if self.is_alive():
+ self.join()
diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py
new file mode 100644
index 0000000..9a5ee19
--- /dev/null
+++ b/policyhandler/web_server.py
@@ -0,0 +1,104 @@
+"""web-service for policy_handler"""
+
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import logging
+import json
+from datetime import datetime
+import cherrypy
+
+from .config import Config
+from .onap.audit import Audit
+from .policy_rest import PolicyRest
+from .policy_engine import PolicyEngineClient
+
+class PolicyWeb(object):
+ """Main static class for REST API of policy-handler"""
+ logger = logging.getLogger("policy_handler.web_cherrypy")
+
+ @staticmethod
+ def run():
+ """run forever the web-server of the policy-handler"""
+ PolicyWeb.logger.info("policy_handler web-service at port(%d)...", \
+ Config.wservice_port)
+ cherrypy.config.update({"server.socket_host": "0.0.0.0", \
+ 'server.socket_port': Config.wservice_port})
+ cherrypy.tree.mount(PolicyLatest(), '/policy_latest')
+ cherrypy.tree.mount(PoliciesLatest(), '/policies_latest')
+ cherrypy.tree.mount(PoliciesCatchUp(), '/catch_up')
+ cherrypy.quickstart(Shutdown(), '/shutdown')
+
+class Shutdown(object):
+ """Shutdown the policy-handler"""
+ @cherrypy.expose
+ def index(self):
+ """shutdown event"""
+ audit = Audit(req_message="get /shutdown", headers=cherrypy.request.headers)
+ PolicyWeb.logger.info("--------- stopping REST API of policy-handler -----------")
+ cherrypy.engine.exit()
+ PolicyEngineClient.shutdown(audit)
+ PolicyWeb.logger.info("--------- the end -----------")
+ res = str(datetime.now())
+ audit.info_requested(res)
+ return "goodbye! shutdown requested {0}".format(res)
+
+class PoliciesLatest(object):
+ """REST API of the policy-hanlder"""
+
+ @cherrypy.expose
+ @cherrypy.tools.json_out()
+ def index(self):
+ """find the latest policy by policy_id or all latest policies"""
+ audit = Audit(req_message="get /policies_latest", headers=cherrypy.request.headers)
+ res = PolicyRest.get_latest_policies(audit) or {}
+ PolicyWeb.logger.info("PoliciesLatest: %s", json.dumps(res))
+ audit.audit_done(result=json.dumps(res))
+ return res
+
+@cherrypy.popargs('policy_id')
+class PolicyLatest(object):
+ """REST API of the policy-hanlder"""
+
+ @cherrypy.expose
+ @cherrypy.tools.json_out()
+ def index(self, policy_id):
+ """find the latest policy by policy_id or all latest policies"""
+ audit = Audit(req_message="get /policy_latest/{0}".format(policy_id or ""), \
+ headers=cherrypy.request.headers)
+ PolicyWeb.logger.info("PolicyLatest policy_id=%s headers=%s", \
+ policy_id, json.dumps(cherrypy.request.headers))
+ res = PolicyRest.get_latest_policy((audit, policy_id)) or {}
+ PolicyWeb.logger.info("PolicyLatest policy_id=%s res=%s", policy_id, json.dumps(res))
+ audit.audit_done(result=json.dumps(res))
+ return res
+
+class PoliciesCatchUp(object):
+ """catch up with all DCAE policies"""
+ @cherrypy.expose
+ @cherrypy.tools.json_out()
+ def index(self):
+ """catch up with all policies"""
+ started = str(datetime.now())
+ audit = Audit(req_message="get /catch_up", headers=cherrypy.request.headers)
+ PolicyEngineClient.catch_up(audit)
+ res = {"catch-up requested": started}
+ PolicyWeb.logger.info("PoliciesCatchUp: %s", json.dumps(res))
+ audit.info_requested(started)
+ return res