From 1369bea8b3c24ef063799acefbfc01659878f034 Mon Sep 17 00:00:00 2001 From: Alex Shatov Date: Wed, 10 Jan 2018 11:00:50 -0500 Subject: variable collection of policies per component * new feature variable collection of policies per component in DCAE * massive refactoring * dissolved the external PolicyEngine.py into policy_receiver.py - kept only the web-socket communication to PolicyEngine * new /healthcheck - shows some stats of service running * Unit Test coverage 75% Change-Id: I816b7d5713ae0dd88fa73d3656f272b4f3e7946e Issue-ID: DCAEGEN2-249 Signed-off-by: Alex Shatov --- .coveragerc | 3 +- Dockerfile | 3 - etc/config.json | 2 +- policyhandler/PolicyEngine.py | 1219 -------------------------------------- policyhandler/config.py | 61 +- policyhandler/deploy_handler.py | 35 +- policyhandler/discovery.py | 30 +- policyhandler/onap/audit.py | 142 +++-- policyhandler/onap/health.py | 104 ++++ policyhandler/policy_consts.py | 2 - policyhandler/policy_engine.py | 103 ---- policyhandler/policy_handler.py | 13 +- policyhandler/policy_receiver.py | 195 ++++++ policyhandler/policy_rest.py | 459 ++++++++------ policyhandler/policy_updater.py | 107 ++-- policyhandler/policy_utils.py | 134 +++++ policyhandler/web_server.py | 215 +++++-- pom.xml | 6 +- requirements.txt | 4 +- run_policy.sh | 3 + setup.py | 4 +- tests/test_policyhandler.py | 421 +++++++++++-- tox-local.ini | 1 + version.properties | 4 +- 24 files changed, 1452 insertions(+), 1818 deletions(-) delete mode 100644 policyhandler/PolicyEngine.py create mode 100644 policyhandler/onap/health.py delete mode 100644 policyhandler/policy_engine.py create mode 100644 policyhandler/policy_receiver.py create mode 100644 policyhandler/policy_utils.py diff --git a/.coveragerc b/.coveragerc index f059bcf..b4635f9 100644 --- a/.coveragerc +++ b/.coveragerc @@ -5,8 +5,7 @@ cover_pylib = False include = */policyhandler/*.py omit = # omit client libs - policyhandler/PolicyEngine.py - policyhandler/onap/CommonLogger.py + # policyhandler/onap/CommonLogger.py [report] # Regexes for lines to exclude from consideration diff --git a/Dockerfile b/Dockerfile index 10886f1..9d77db6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,11 +21,8 @@ COPY ./policyhandler/ ./policyhandler/ COPY ./etc/ ./etc/ RUN mkdir -p ${APPDIR}/logs \ - && mkdir -p ${APPDIR}/tmp \ - && mkdir -p ${APPDIR}/etc \ && chown -R ${APPUSER}:${APPUSER} ${APPDIR} \ && chmod a+w ${APPDIR}/logs \ - && chmod 700 ${APPDIR}/tmp \ && chmod 500 ${APPDIR}/etc \ && chmod 500 ${APPDIR}/run_policy.sh \ && ls -la && ls -la ./policyhandler diff --git a/etc/config.json b/etc/config.json index b83cd09..ea51ab9 100644 --- a/etc/config.json +++ b/etc/config.json @@ -1,5 +1,5 @@ { - "version" : "1.0.0", + "version" : "2.0.0", "wservice_port" : 25577, "policy_handler" : { "system" : "policy_handler" diff --git a/policyhandler/PolicyEngine.py b/policyhandler/PolicyEngine.py deleted file mode 100644 index 7a69894..0000000 --- a/policyhandler/PolicyEngine.py +++ /dev/null @@ -1,1219 +0,0 @@ -""" -PolicyEngine API for Python - -@author: Tarun, Mike -@version: 0.9 -@change: - added Enum 'Other' Type and supported it in PolicyConfig class - 1/13 - supporting Remote URL capability to the PolicyEngine as the parameter - 1/13 - Request format has been updated accordingly. No need to send the PDP URLs anymore - 1/26 - Feature where the PolicyEngine chooses available PyPDP among different URLs. 1/26 - Major feature addition required for Notifications 2/17 , Fixed Session issues. 2/25 - Major change in API structure for combining results 3/4. - Added Security support for Notifications and clearNotification Method 3/18 - newMethod for retrieving configuration using policyFileName 3/20 - logging 3/24 - Notification Bug Fixes 7/21 - basic Auth 7/22 - Notification Changes 9/3 - ECOMP Error codes included 10/1 - -2016 - Major Changes to the Policy Engine API 3/29 - DeletePolicy API and Changes to pushPolicy and getConfig 5/27 - ConfigRequestParmeters and Error code Change 7/11 - New Environment Variable and Client Authorizations Change 7/21 - Changes to the Policy Parameters 8/21 - Allow Clients to use their own Password Protection. - Dictionary Types and its Fixes. -""" - -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. - -import json,sys, os, collections, websocket , logging, time, base64, uuid -from websocket import create_connection -from enum import Enum -from xml.etree.ElementTree import XML -try: - # For Python 3.0 and Later - from pip._vendor import requests -except ImportError: - # For backend Support to Python 2's urllib2 - import requests -try: - # For Python 3.0 and Later - from urllib.request import urlopen -except ImportError: - # Fall back for Python 2.* - from urllib2 import urlopen -try: - import thread -except ImportError: - #Fall Back for Python 2.x - import _thread as thread - -PolicyConfigStatus = Enum('PolicyConfigStatus', 'CONFIG_NOT_FOUND CONFIG_RETRIEVED') -PolicyType = Enum('PolicyType', 'JSON XML PROPERTIES OTHER') -PolicyResponseStatus = Enum('PolicyResponseStatus', 'ACTION_ADVISED ACTION_TAKEN NO_ACTION_REQUIRED') -NotificationType = Enum('NotificationType', 'BOTH UPDATE REMOVE') -UpdateType = Enum('UpdateType', 'UPDATE NEW') -NotificationScheme = Enum('NotificationScheme', 'AUTO_ALL_NOTIFICATIONS AUTO_NOTIFICATIONS MANUAL_ALL_NOTIFICATIONS MANUAL_NOTIFICATIONS') -AttributeType = Enum('AttributeType', 'MATCHING MICROSERVICE RULE SETTINGS') -PolicyClass = Enum('PolicyClass', 'Action Config Decision') -PolicyConfigType = Enum('PolicyConfigType', 'Base BRMS_PARAM BRMS_RAW ClosedLoop_Fault ClosedLoop_PM Firewall MicroService Extended') -DeletePolicyCondition = Enum('DeletePolicyCondition', 'ONE ALL') -RuleProvider = Enum('RuleProvider', 'Custom AAF GUARD_YAML') -DictionaryType = Enum('DictionaryType', 'Common Action ClosedLoop Firewall Decision BRMS GOC MicroService DescriptiveScope PolicyScope Enforcer SafePolicy' ) -ImportType = Enum('ImportType', 'MICROSERVICE') - -class PolicyEngine: - """ - PolicyEngine is the Class which needs to be instantiated to call the PDP server. - It needs the *.properties* file path as the parameter to the constructor. - """ - def __init__(self, filename, scheme=None, handler=None, clientKey=None, basic_client_auth=True): - """ - @param filename: String format of the path location of .properties file Could also be A remote URL eg: http://localhost:8080/config.properties - @param scheme: NotificationScheme to select the scheme required for notification updates. - @param handler: NotificationHandler object which will be called when an event is occurred. - @param clientKey: Decoded Client Key to be used by PolicyEngine. - @attention:The .properties file must contain the PYPDP_URL parameter in it. The parameter can have multiple URLs the PolicyEngine chooses the available PyPDP among them. - """ - self.filename = filename - self.urldict = {} - self.matchStore = [] - self.autoURL = None - self.scheme = None - self.handler = None - self.thread = thread - self.autows = None - self.mclose = False - self.restart = False - self.logger = logging.getLogger() - self.resturl= [] - self.encoded= [] - self.clientInfo = None - self.environment = None - self.policyheader = {} - if(filename.startswith("http")): - try: - policy_data = urlopen(filename) - for line in policy_data : - line = line.decode('utf-8') - line = line.rstrip() # removes trailing whitespace and '\n' chars - line = line.replace(" ","") # removing spaces - if "=" not in line: continue #skips blanks and comments w/o = - if line.startswith("#"): continue #skips comments which contain = - key, value = line.split("=",1) - key = key.rstrip().lstrip() - value = value.lstrip() - #print("key= "+key+" Value =" +value ) - self.urldict[key] = value - except: - self.logger.error("PE300 - Data Issue: Error with the Config URL: %s" , filename ) - print("PE300 - Data Issue: Config Properties URL Error") - sys.exit(0) - else : - fileExtension = os.path.splitext(filename) - if(fileExtension[1]!=".properties"): - self.logger.error("PE300 - Data Issue: File is not in properties format: %s", filename) - print("PE300 - Data Issue: Not a .properties file!") - sys.exit(0) - try : - with open(self.filename, 'r') as f: - for line in f: - line = line.rstrip() # removes trailing whitespace and '\n' chars - line = line.replace(" ","") # removing spaces - if "=" not in line: continue #skips blanks and comments w/o = - if line.startswith("#"): continue #skips comments which contain = - key, value = line.split("=",1) - key = key.rstrip().lstrip() - value = value.lstrip() - # self.logger.info("key=%s Value=%s", key, value) - self.urldict[key] = value - except FileNotFoundError: - self.logger.error("PE300 - Data Issue: File Not found: %s", filename) - print("PE300 - Data Issue: File Doesn't exist in the given Location") - sys.exit(0) - #TODO logic for best available PyPDP servers - try: - self.urldict = collections.OrderedDict(sorted(self.urldict.items())) - clientID = self.urldict.get("CLIENT_ID") - # self.logger.info("clientID decoded %s", base64.b64decode(clientID).decode("utf-8")) - # client_parts = base64.b64decode(clientID).split(":") - # client_parts = clientID.split(":") - # self.logger.info("ClientAuth:Basic %s", base64.b64encode(clientID)) - # self.logger.info("CLIENT_ID[0] = %s", client_parts[0]) - # self.logger.info("CLIENT_ID[0] base64 = %s", base64.b64encode(client_parts[0])) - # self.logger.info("CLIENT_KEY base64 = %s", base64.b64encode(client_parts[1])) - if(clientKey is None): - try: - client = base64.b64decode(self.urldict.get("CLIENT_KEY")).decode("utf-8") - except Exception: - self.logger.warn("PE300 - Data Issue: CLIENT_KEY parameter is not in the required encoded Format taking Value as clear Text") - client = self.urldict.get("CLIENT_KEY") - else: - client = clientKey - if(clientID is None or client is None): - self.logger.error("PE300 - Data Issue: No CLIENT_ID and/or CLIENT_KEY parameter found in the properties file: %s ", filename) - print("PE300 - Data Issue: No CLIENT_ID and/or CLIENT_KEY parameter found in the properties file") - sys.exit(0) - else: - uid = clientID.encode('ascii') - password = client.encode('ascii') - self.clientInfo = base64.b64encode(uid+ b':'+password).decode('utf-8') - self.environment = self.urldict.get("ENVIRONMENT") - if(self.environment is None): - self.logger.info("Missing Environment Variable setting to Default Value.") - self.environment = "DEVL" - self.policyheader = { - "Content-type" : "application/json", - "Accept" : "application/json", - "ClientAuth" : ("Basic " if basic_client_auth else "") + self.clientInfo, - "Environment" : self.environment - } - for key in self.urldict.keys(): - if(key.startswith("PYPDP_URL")): - pypdpVal = self.urldict.get(key) - if pypdpVal is None: - self.logger.error("PE300 - Data Issue: No PYPDP_URL Parameter found in the properties file: %s ", filename) - print("PE300 - Data Issue: No PYPDP_URL parameter found in the properties file") - sys.exit(0) - if ";" in pypdpVal: - pdpDefault = pypdpVal.split(";") - if pdpDefault is None: - self.logger.error("PE300 - Data Issue: No PYPDP_URL Parameter found in the properties file: %s ", filename) - print("PE300 - Data Issue: No PYPDP_URL parameter found in the properties file") - sys.exit(0) - else: - for count in range(0, len(pdpDefault)): - self.__pdpParam(pdpDefault[count]) - else: - self.__pdpParam(pypdpVal) - - self.logger.info("PolicyEngine url: %s policyheader: %s urldict: %s", \ - self.resturl, json.dumps(self.policyheader), json.dumps(self.urldict)) - if len(self.resturl)==0: - self.logger.error("PE300 - Data Issue: No PYPDP_URL Parameter found in the properties file: %s ", filename) - print("PE300 - Data Issue: No PYPDP_URL parameter found in the properties file") - sys.exit(0) - except: - self.logger.error("PE300 - Data Issue: missing parameter(s) in the properties file: %s ", filename) - print("PE300 - Data Issue: missing parameter(s) in the properties file") - sys.exit(0) - # Scheme and Handler code to be handled from here. - if handler is not None: - #if type(handler) is NotificationHandler: - self.handler = handler - #else: - # print("handler should be a object of NotificationHandler class") - # sys.exit(0) - if scheme is not None: - if ((scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)): - # setup the Auto settings. - self.scheme = scheme - elif ((scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name)): - # setup the Manual Settings - self.scheme = scheme - else: - self.logger.error("PE300 - Data Issue: Scheme not a type of NotificationScheme: %s", scheme.name) - print("PE300 - Data Issue: scheme must be a Type of NotificationScheme Enumeration ") - sys.exit(0) - - def __pdpParam(self,pdpValue): - """ - Internal Usage for reading PyPDP Parameters - """ - if pdpValue is None: - self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request") - print("PE100 - Permissions Error: No Enough Credentials to send Request") - sys.exit(0) - elif "," in pdpValue: - pdpValues = pdpValue.split(",") - if (len(pdpValues)==3): - # 0 is pypdp URL - self.resturl.append(pdpValues[0]) - # 1 and 2 are user name password - if pdpValues[1] and pdpValues[2]: - uid = pdpValues[1].encode('ascii') - password = pdpValues[2].encode('ascii') - encoded = base64.b64encode(uid+ b':'+password).decode('utf-8') - self.encoded.append(encoded) - else: - self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request") - print("PE100 - Permissions Error: No Enough Credentials to send Request") - sys.exit(0) - else: - self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request") - print("PE100 - Permissions Error: No Enough Credentials to send Request") - sys.exit(0) - else: - self.logger.error("PE100 - Permissions Error: No Enough Credentials to send Request") - print("PE100 - Permissions Error: No Enough Credentials to send Request") - sys.exit(0) - - def getConfigByPolicyName(self, policyName, requestID=None): - """ - @param policyName: String format of the PolicyFile Name whose configuration is required. - @return: Returns a List of PolicyConfig Object(s). - @deprecated: use getConfig instead. - """ - __policyNameURL = "/getConfigByPolicyName" - __headers = self.policyheader - if requestID is not None: - __headers["X-ECOMP-RequestID"] = str(requestID) - else: - __headers["X-ECOMP-RequestID"] = str(uuid.uuid4()) - self.__policyNamejson = {} - self.__policyNamejson['policyName'] = policyName - self.__cpnResponse = self.__callPDP(__policyNameURL, json.dumps(self.__policyNamejson), __headers, "POST") - self.__cpnJSON = self.__cpnResponse.json() - policyConfigs= self.__configResponse(self.__cpnJSON) - return policyConfigs - - def listConfig(self, eCOMPComponentName=None, configName=None, configAttributes=None, policyName=None, unique= False, requestID=None): - """ - listConfig function calls the PDP for the configuration required using the parameters and returns the PDP response. - @param eCOMPComponentName: String of the eCOMPComponentName whose configuration is required. - @param configName: String of the configName whose configuration is required. Not Mandatory field. - @param configAttributes: Dictionary of the config attributes in Key and Value String format. Not mandatory field. - @param policyName: String of the policyName whose configuration is required. - @param unique: Boolean value which can be set to True if Unique results are required. - @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated. - @return: Returns a List of PolicyNames. - """ - __configURL = "/listConfig" - __headers = self.policyheader - if requestID is not None: - __headers["X-ECOMP-RequestID"] = str(requestID) - else: - __headers["X-ECOMP-RequestID"] = str(uuid.uuid4()) - __configjson = self.__configRequestParametersJSON(eCOMPComponentName, configName, configAttributes, policyName, unique) - #self.__configjson['pdp_URL'] = self.pdp_url - __cResponse = self.__callPDP(__configURL, json.dumps(__configjson), __headers, "POST") - return __cResponse.json() - - - def getConfig(self, eCOMPComponentName=None, configName=None, configAttributes=None, policyName=None, unique= False, requestID=None): - """ - getConfig function calls the PDP for the configuration required using the parameters and returns the PDP response. - @param eCOMPComponentName: String of the eCOMPComponentName whose configuration is required. - @param configName: String of the configName whose configuration is required. Not Mandatory field. - @param configAttributes: Dictionary of the config attributes in Key and Value String format. Not mandatory field. - @param policyName: String of the policyName whose configuration is required. - @param unique: Boolean value which can be set to True if Unique results are required. - @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated. - @return: Returns a List of PolicyConfig Object(s). - """ - __configURL = "/getConfig" - __headers = self.policyheader - if requestID is not None: - __headers["X-ECOMP-RequestID"] = str(requestID) - else: - __headers["X-ECOMP-RequestID"] = str(uuid.uuid4()) - self.__configjson = self.__configRequestParametersJSON(eCOMPComponentName, configName, configAttributes, policyName, unique) - #self.__configjson['pdp_URL'] = self.pdp_url - self.__cResponse = self.__callPDP(__configURL, json.dumps(self.__configjson), __headers, "POST") - #self.__configURL = self.resturl+__configURL - #self.__cResponse = requests.post(self.__configURL, data=json.dumps(self.__configjson), headers = __headers) - self.__cJSON = self.__cResponse.json() - policyConfigs= self.__configResponse(self.__cJSON) - # if we have successfully retrieved a policy we will store the match values. - matchFound = False - for policyConfig in policyConfigs: - if policyConfig._policyConfigStatus == PolicyConfigStatus.CONFIG_RETRIEVED.name: - # self.logger.info("Policy has been Retrieved !!") - matchFound = True - if matchFound: - __match = {} - __match["ECOMPName"] = eCOMPComponentName - if configName is not None: - __match["ConfigName"] = configName - if configAttributes is not None: - __match.update(configAttributes) - if not self.matchStore: - self.matchStore.append(__match) - else: - __booMatch = False - for eachDict in self.matchStore: - if eachDict==__match: - __booMatch = True - break - if __booMatch==False: - self.matchStore.append(__match) - return policyConfigs - - def __configRequestParametersJSON(self, eCOMPComponentName=None, configName=None, configAttributes=None, policyName=None, unique= False): - """ Internal Function to set JSON from configRequestParameters - """ - json= {} - if eCOMPComponentName is not None: - json['ecompName'] = eCOMPComponentName - if configName is not None: - json['configName'] = configName - if configAttributes is not None: - json['configAttributes'] = configAttributes - if policyName is not None: - json['policyName'] = policyName - json['unique'] = unique - return json - - def __configResponse(self, cJSON): - """ - Internal function to take the convert JSON to Response Object. - """ - policyConfigs=[] - for configJSON in cJSON: - policyConfig = PolicyConfig() - policyConfig._policyConfigMessage = configJSON['policyConfigMessage'] - policyConfig._policyConfigStatus = configJSON['policyConfigStatus'] - policyConfig._policyType = configJSON['type'] - policyConfig._policyName = configJSON['policyName'] - policyConfig._policyVersion = configJSON['policyVersion'] - policyConfig._matchingConditions = configJSON['matchingConditions'] - policyConfig._responseAttributes = configJSON['responseAttributes'] - if PolicyType.JSON.name == policyConfig._policyType: - policyConfig._json = configJSON['config'] - elif PolicyType.XML.name == policyConfig._policyType: - policyConfig._xml = XML(configJSON['config']) - elif PolicyType.PROPERTIES.name == policyConfig._policyType: - policyConfig._properties = configJSON['property'] - elif PolicyType.OTHER.name == policyConfig._policyType: - policyConfig._other = configJSON['config'] - policyConfigs.append(policyConfig) - return policyConfigs - - def getDecision(self, decisionAttributes, ecompcomponentName, requestID = None): - """ - getDecision function sends the Decision Attributes to the PDP server and gets the response to the client from PDP. - @param decisionAttributes: Dictionary of Decision Attributes in Key and Value String formats. - @param ecompcomponentName: - @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated. - @return: Returns a DecisionResponse Object. - """ - __decisionurl = "/getDecision" - __headers = self.policyheader - if requestID is not None: - __headers["X-ECOMP-RequestID"] = str(requestID) - else: - __headers["X-ECOMP-RequestID"] = str(uuid.uuid4()) - self.__decisionjson={} - self.__decisionjson['decisionAttributes'] = decisionAttributes - self.__decisionjson['ecompcomponentName'] = ecompcomponentName - self.__dResponse = self.__callPDP(__decisionurl, json.dumps(self.__decisionjson), __headers, "POST") - self.__dJSON = self.__dResponse.json() - decisionResponse = DecisionResponse() - decisionResponse._decision = self.__dJSON['decision'] - decisionResponse._details = self.__dJSON['details'] - return decisionResponse - - def sendEvent(self, eventAttributes, requestID=None): - """ - sendEvent function sends the Event to the PDP server and gets the response to the client from the PDP. - @param eventAttributes:Dictonary of the EventAttributes in Key and Value String formats. - @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated. - @return: Returns a List of PolicyResponse Object(s). - """ - __eventurl = "/sendEvent" - __headers = self.policyheader - if requestID is not None: - __headers["X-ECOMP-RequestID"] = str(requestID) - else: - __headers["X-ECOMP-RequestID"] = str(uuid.uuid4()) - self.__eventjson = {} - self.__eventjson['eventAttributes'] = eventAttributes - #self.__eventjson['pdp_URL'] = self.pdp_url - self.__eResponse = self.__callPDP(__eventurl, json.dumps(self.__eventjson), __headers, "POST") - #self.__eventurl = self.resturl+__eventurl - #self.__eResponse = requests.post(self.__eventurl, data=json.dumps(self.__eventjson), headers = __headers) - self.__eJSON = self.__eResponse.json() - policyResponses=[] - for eventJSON in self.__eJSON: - policyResponse = PolicyResponse() - policyResponse._policyResponseMessage = eventJSON['policyResponseMessage'] - policyResponse._policyResponseStatus = eventJSON['policyResponseStatus'] - policyResponse._actionAdvised = eventJSON['actionAdvised'] - policyResponse._actionTaken = eventJSON['actionTaken'] - policyResponse._requestAttributes = eventJSON['requestAttributes'] - policyResponses.append(policyResponse) - return policyResponses - - def createPolicy(self, policyParameters): - """ - 'createPolicy creates Policy using the policyParameters sent' - @param policyParameters: This is an object of PolicyParameters which is required as a parameter to this method. - @return: Returns a PolicyChangeResponse Object - """ - __createurl = "/createPolicy" - __headers = self.policyheader - try: - if policyParameters._requestID is None: - policyParameters._requestID = str(uuid.uuid4()) - self.__createJson = {} - self.__createJson = self.__policyParametersJSON(policyParameters) - self.__createResponse = self.__callPDP(__createurl, json.dumps(self.__createJson), __headers, "PUT") - policyChangeResponse = PolicyChangeResponse() - policyChangeResponse._responseCode = self.__createResponse.status_code - policyChangeResponse._responseMessage = self.__createResponse.text - return policyChangeResponse - except: - self.logger.error("PE300 - Data Issue: Error with the policyParameters Object. It needs to be object of PolicyParameters ") - print("PE300 - Data Issue: policyParamters object Error") - - def updatePolicy(self, policyParameters): - """ - 'updatePolicy updates Policy using the policyParameters sent.' - @param policyParameters: This is an object of PolicyParameters which is required as a parameter to this method. - @return: Returns a PolicyChangeResponse Object - """ - __updateurl = "/updatePolicy" - __headers = self.policyheader - try: - if policyParameters._requestID is None: - policyParameters._requestID = str(uuid.uuid4()) - self.__updateJson = {} - self.__updateJson = self.__policyParametersJSON(policyParameters) - self.__updateResponse = self.__callPDP(__updateurl, json.dumps(self.__updateJson), __headers, "PUT") - policyChangeResponse = PolicyChangeResponse() - policyChangeResponse._responseCode = self.__updateResponse.status_code - policyChangeResponse._responseMessage = self.__updateResponse.text - return policyChangeResponse - except: - self.logger.error("PE300 - Data Issue: Error with the policyParameters Object. It needs to be object of PolicyParameters ") - print("PE300 - Data Issue: policyParamters object Error") - - def __policyParametersJSON(self, policyParameters): - """ Internal Function to set JSON from policyParameters Object - """ - json= {} - if policyParameters._actionAttribute is not None: - json['actionAttribute'] = policyParameters._actionAttribute - if policyParameters._actionPerformer is not None: - json['actionPerformer'] = policyParameters._actionPerformer - if policyParameters._attributes is not None: - json['attributes'] = policyParameters._attributes - if policyParameters._configBody is not None: - json['configBody'] = policyParameters._configBody - if policyParameters._configBodyType is not None: - json['configBodyType'] = policyParameters._configBodyType - if policyParameters._configName is not None: - json['configName'] = policyParameters._configName - if policyParameters._controllerName is not None: - json['controllerName'] = policyParameters._controllerName - if policyParameters._dependencyNames is not None: - json['dependencyNames'] = policyParameters._dependencyNames - if policyParameters._dynamicRuleAlgorithmLabels is not None: - json['dynamicRuleAlgorithmLabels'] = policyParameters._dynamicRuleAlgorithmLabels - if policyParameters._dynamicRuleAlgorithmField1 is not None: - json['dynamicRuleAlgorithmField1'] = policyParameters._dynamicRuleAlgorithmField1 - if policyParameters._dynamicRuleAlgorithmField2 is not None: - json['dynamicRuleAlgorithmField2'] = policyParameters._dynamicRuleAlgorithmField2 - if policyParameters._dynamicRuleAlgorithmFunctions is not None: - json['dynamicRuleAlgorithmFunctions'] = policyParameters._dynamicRuleAlgorithmFunctions - if policyParameters._ecompName is not None: - json['ecompName'] = policyParameters._ecompName - if policyParameters._extendedOption is not None: - json['extendedOption'] = policyParameters._extendedOption - if policyParameters._guard is not None: - json['guard'] = policyParameters._guard - if policyParameters._policyClass is not None: - json['policyClass'] = policyParameters._policyClass - if policyParameters._policyConfigType is not None: - json['policyConfigType'] = policyParameters._policyConfigType - if policyParameters._policyName is not None: - json['policyName'] = policyParameters._policyName - if policyParameters._policyDescription is not None: - json['policyDescription'] = policyParameters._policyDescription - if policyParameters._priority is not None: - json['priority'] = policyParameters._priority - if policyParameters._requestID is not None: - json['requestID'] = policyParameters._requestID - if policyParameters._riskLevel is not None: - json['riskLevel'] = policyParameters._riskLevel - if policyParameters._riskType is not None: - json['riskType'] = policyParameters._riskType - if policyParameters._ruleProvider is not None: - json['ruleProvider'] = policyParameters._ruleProvider - if policyParameters._ttlDate is not None: - json['ttlDate'] = policyParameters._ttlDate - return json - - def pushPolicy(self, pushPolicyParameters, requestID = None): - """ - 'pushPolicy pushes a policy based on the given Push Policy Parameters. ' - @param pushPolicyParameters: This is an object of PushPolicyParameters which is required as a parameter to this method. - @param requestID: unique UUID for the request. Not mandatory field. If not provided, a value will be automatically generated. - @return: Returns a PolicyChangeResponse Object - """ - __pushurl = "/pushPolicy" - __headers = self.policyheader - if requestID is not None: - __headers["X-ECOMP-RequestID"] = str(requestID) - else: - __headers["X-ECOMP-RequestID"] = str(uuid.uuid4()) - try: - self.__pushJson = {} - self.__pushJson['pdpGroup'] = pushPolicyParameters._pdpGroup - self.__pushJson['policyName'] = pushPolicyParameters._policyName - self.__pushJson['policyType'] = pushPolicyParameters._policyType - self.__pushResponse = self.__callPDP(__pushurl, json.dumps(self.__pushJson), __headers, "PUT") - policyChangeResponse = PolicyChangeResponse() - policyChangeResponse._responseCode = self.__pushResponse.status_code - policyChangeResponse._responseMessage = self.__pushResponse.text - return policyChangeResponse - except: - self.logger.error("PE300 - Data Issue: Error with the pushPolicyParameters Object. It needs to be object of PushPolicyParameters ") - print("PE300 - Data Issue: pushPolicyParamters object Error") - - def deletePolicy(self, deletePolicyParameters): - """ - 'deletePolicy Deletes a policy or all its version according to the given deletePolicyParameters' - @param deletePolicyParameters: This is an Object of DeletePolicyParameters which is required as a parameter to this method. - @return: Returns a PolicyChangeResponse Object - """ - __deleteurl = "/deletePolicy" - __createdictionaryurl = "/createDictionaryItem" - __headers = self.policyheader - try: - if deletePolicyParameters._requestID is None: - deletePolicyParameters._requestID = str(uuid.uuid4()) - self.__deleteJson = {} - self.__deleteJson['deleteCondition'] = deletePolicyParameters._deleteCondition - self.__deleteJson['pdpGroup'] = deletePolicyParameters._pdpGroup - self.__deleteJson['policyComponent'] = deletePolicyParameters._policyComponent - self.__deleteJson['policyName'] = deletePolicyParameters._policyName - self.__deleteJson['policyType'] = deletePolicyParameters._policyType - self.__deleteJson['requestID'] = deletePolicyParameters._requestID - self.__deleteResponse = self.__callPDP(__deleteurl, json.dumps(self.__deleteJson), self.policyheader, "DELETE") - policyChangeResponse = PolicyChangeResponse() - policyChangeResponse._responseCode = self.__deleteResponse.status_code - policyChangeResponse._responseMessage = self.__deleteResponse.text - return policyChangeResponse - except: - self.logger.error("PE300 - Data Issue: Error with the deletePolicyParameters Object. It needs to be object of DeletePolicyParameters ") - print("PE300 - Data Issue: deletePolicyParameters object Error") - - def createDictionaryItems(self, dictionaryParameters): - """ - 'createDictionaryItems adds dictionary items to the database for a specific dictionary' - @param dictionaryParameters: This is an Object of DictionaryParameters which is required as a parameter to this method - @return: Returns a DictionaryResponse object - """ - __createdictionaryurl = '/createDictionaryItem' - __headers = self.policyheader - try: - if dictionaryParameters._requestID is None: - dictionaryParameters._requestID = str(uuid.uuid4()) - self.__json={} - self.__json['dictionaryType'] = dictionaryParameters._dictionaryType - self.__json['dictionary'] = dictionaryParameters._dictionary - self.__json['dictionaryJson'] = dictionaryParameters._dictionaryJson - self.__json['requestID'] = dictionaryParameters._requestID - self.__createResponse = self.__callPDP(__createdictionaryurl, json.dumps(self.__json), __headers, "PUT") - dictionaryResponse = DictionaryResponse() - dictionaryResponse._responseCode = self.__createResponse.status_code - dictionaryResponse._responseMessage = self.__createResponse.text - return dictionaryResponse - except: - self.logger.error("PE300 - Data Issue: Error with the dictionaryParameters object. It needs to be object of DictionaryParameters ") - print("PE300 - Data Issue: dictionaryParameters object Error") - - - def updateDictionaryItems(self, dictionaryParameters): - """ - 'updateDictionaryItems edits dictionary items in the database for a specific dictionary' - @param dictionaryParameters: This is an Object of DictionaryParameters which is required as a parameter to this method - @return: Returns a DictionaryResponse object - """ - __updatedictionaryurl = '/updateDictionaryItem' - __headers = self.policyheader - try: - if dictionaryParameters._requestID is None: - dictionaryParameters._requestID = str(uuid.uuid4()) - self.__json={} - self.__json['dictionaryType'] = dictionaryParameters._dictionaryType - self.__json['dictionary'] = dictionaryParameters._dictionary - self.__json['dictionaryJson'] = dictionaryParameters._dictionaryJson - self.__json['requestID'] = dictionaryParameters._requestID - self.__updateResponse = self.__callPDP(__updatedictionaryurl, json.dumps(self.__json), __headers, "PUT") - dictionaryResponse = DictionaryResponse() - dictionaryResponse._responseCode = self.__updateResponse.status_code - dictionaryResponse._responseMessage = self.__updateResponse.text - return dictionaryResponse - except: - self.logger.error("PE300 - Data Issue: Error with the dictionaryParameters object. It needs to be object of DictionaryParameters ") - print("PE300 - Data Issue: dictionaryParameters object Error") - - def getDictionaryItems(self, dictionaryParameters): - """ - 'getDictionaryItems gets all the dictionary items stored in the database for a specified dictionary' - @param dictionaryParameters: This is an Object of DictionaryParameters which is required as a parameter to this method. - @return: Returns a DictionaryResponse object - """ - __retrievedictionaryurl = "/getDictionaryItems" - __headers = self.policyheader - try: - if dictionaryParameters._requestID is None: - dictionaryParameters._requestID = str(uuid.uuid4()) - self.__json = {} - self.__json['dictionaryType'] = dictionaryParameters._dictionaryType - self.__json['dictionary'] = dictionaryParameters._dictionary - self.__json['requestID'] = dictionaryParameters._requestID - self.__getResponse = self.__callPDP(__retrievedictionaryurl, json.dumps(self.__json), __headers, "POST") - dictionaryResponse = DictionaryResponse() - dictionaryResponse._responseCode = self.__getResponse.status_code - dictionaryResponse._responseMessage = self.__getResponse.text - return dictionaryResponse - except: - self.logger.error("PE300 - Data Issue: Error with the dictionaryParameters object. It needs to be object of DictionaryParameters ") - print("PE300 - Data Issue: dictionaryParameters object Error") - - def getNotification(self): - """ - gets the PDPNotification if the appropriate NotificationScheme is selected. - @return: Returns a PDPNotification Object. - """ - if ((self.scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name)or(self.scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name)): - # Manual Client for websocket Code in here. - if(self.resturl[0].startswith("https")): - __man_url = self.resturl[0].replace("https","wss")+"notifications" - else: - __man_url = self.resturl[0].replace("http","ws")+"notifications" - __result = self.__manualRequest(__man_url) - self.logger.debug("Manual Notification with server: %s \n result is: %s" , __man_url , __result) - # TODO convert the result to PDP Notifications. - if (self.scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name): - # need to add all the values to the PDPNotification.. - pDPNotification = PDPNotification() - boo_Remove = False - boo_Update = False - if __result is None: - return None - if __result['removedPolicies']: - removedPolicies = [] - for removed in __result['removedPolicies']: - removedPolicy = RemovedPolicy() - removedPolicy._policyName = removed['policyName'] - removedPolicy._policyVersion = removed['versionNo'] - removedPolicies.append(removedPolicy) - pDPNotification._removedPolicies= removedPolicies - boo_Remove = True - if __result['loadedPolicies']: - updatedPolicies = [] - for updated in __result['loadedPolicies']: - updatedPolicy = LoadedPolicy() - updatedPolicy._policyName = updated['policyName'] - updatedPolicy._policyVersion = updated['versionNo'] - updatedPolicy._matchingConditions = updated['matches'] - updatedPolicy._updateType = updated['updateType'] - updatedPolicies.append(updatedPolicy) - pDPNotification._loadedPolicies= updatedPolicies - boo_Update = True - if (boo_Update and boo_Remove): - pDPNotification._notificationType = NotificationType.BOTH.name - elif boo_Update: - pDPNotification._notificationType = NotificationType.UPDATE.name - elif boo_Remove: - pDPNotification._notificationType = NotificationType.REMOVE.name - return pDPNotification - elif (self.scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name): - return self.__checkNotification(__result) - else: - return None - - def setNotification(self, scheme, handler = None): - """ - setNotification allows changes to the NotificationScheme and the NotificationHandler. - @param scheme: NotificationScheme to select the scheme required for notification updates. - @param handler: NotificationHandler object which will be called when an event is occurred. - """ - if handler is not None: - #if type(handler) is NotificationHandler: - self.handler = handler - #else: - # print("Error: handler should be a object of NotificationHandler class") - if scheme is not None: - if ((scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)): - # setup the Auto settings. - self.scheme = scheme - self.__startAuto() - elif ((scheme == NotificationScheme.MANUAL_ALL_NOTIFICATIONS.name)or(scheme == NotificationScheme.MANUAL_NOTIFICATIONS.name)): - # setup the Manual Settings - self.scheme = scheme - else: - print("PE300 - Data Issue: scheme must be a Type of NotificationScheme Enumeration ") - - def clearNotification(self): - """ - clearNotification ShutsDown the AutoNotification service if running. - """ - if self.scheme is not None: - if((self.scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(self.scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)): - if self.autows.sock is not None: - if(self.autows.sock.connected): - self.mclose = True - self.autows.close() - self.logger.info("Notification Service Stopped.") - print("Notification Service is Stopped!!") - - def __callPDP(self,urlFunction, jsonData, headerData,method, files= None, params = None): - """ - This function call is for internal usage purpose only. - Calls the available PyPDP - """ - connected = False - response = None - errormessage = '' - for count in range(0, len(self.resturl)): - try: - logging.basicConfig(level=logging.DEBUG) - request_url = self.resturl[0]+ urlFunction - self.logger.debug("--- Sending Request to : %s",request_url) - try: - self.logger.debug("Request ID %s :",headerData["X-ECOMP-RequestID"]) - except: - if jsonData is not None: - self.logger.debug("Request ID %s :",json.loads(jsonData)['requestID']) - self.logger.debug("Request Data is: %s" ,jsonData) - headerData["Authorization"]= "Basic " + self.encoded[0] - if(method=="PUT"): - response = requests.put(request_url, data = jsonData, headers = headerData) - elif(method=="DELETE"): - response = requests.delete(request_url, data = jsonData, headers = headerData) - elif(method=="POST"): - if params is not None: - # files = files, params = params, - response = requests.post(request_url, params = params, headers = headerData) - else: - response = requests.post(request_url, data = jsonData, headers = headerData) - # when using self-signed server certificate, comment previous line and uncomment following: - #response = requests.post(request_url, data = jsonData, headers = headerData, verify=False) - self.logger.debug("--- Response is : ---") - self.logger.debug(response.status_code) - self.logger.debug(response.headers) - self.logger.debug(response.text) - if(response.status_code == 200) : - connected = True - self.logger.info("connected to the PyPDP: %s", request_url) - break - elif(response.status_code == 202) : - connected = True - break - elif(response.status_code == 400): - self.logger.debug("PE400 - Schema Issue: Incorrect Params passed: %s %s", self.resturl[0], response.status_code) - errormessage+="\n PE400 - Schema Issue: Incorrect Params passed: "+ self.resturl[0] - self.__rotatePDP() - elif(response.status_code == 401): - self.logger.debug("PE100 - Permissions Error: PyPDP Error: %s %s", self.resturl[0], response.status_code) - errormessage+="\n PE100 - Permissions Error: PyPDP Error: "+ self.resturl[0] - self.__rotatePDP() - elif(response.status_code == 403): - self.logger.debug("PE100 - Permissions Error: PyPDP Error: %s %s", self.resturl[0], response.status_code) - errormessage+="\n PE100 - Permissions Error: PyPDP Error: "+ self.resturl[0] - self.__rotatePDP() - else: - self.logger.debug("PE200 - System Error: PyPDP Error: %s %s", self.resturl[0], response.status_code) - errormessage+="\n PE200 - System Error: PyPDP Error: "+ self.resturl[0] - self.__rotatePDP() - except Exception as e: - print(str(e)); - self.logger.debug("PE200 - System Error: PyPDP Error: %s", self.resturl[0]) - errormessage+="\n PE200 - System Error: PyPDP Error: "+ self.resturl[0] - self.__rotatePDP() - if(connected): - if(self.autoURL==None): - self.__startAuto() - elif(self.autoURL!= self.resturl[0]): - self.__startAuto() - return response - else: - self.logger.error("PE200 - System Error: cannot connect to given PYPDPServer(s) %s", self.resturl) - print(errormessage) - sys.exit(0) - - def __rotatePDP(self): - self.resturl = collections.deque(self.resturl) - self.resturl.rotate(-1) - self.encoded = collections.deque(self.encoded) - self.encoded.rotate(-1) - - def __checkNotification(self, resultJson): - """ - This function call is for Internal usage purpose only. - Checks the Notification JSON compares it with the MatchStore and returns the PDPNotification object. - """ - if not resultJson: - return None - if not self.matchStore: - return None - pDPNotification = PDPNotification() - boo_Remove = False - boo_Update = False - if resultJson['removedPolicies']: - removedPolicies = [] - for removed in resultJson['removedPolicies']: - removedPolicy = RemovedPolicy() - removedPolicy._policyName = removed['policyName'] - removedPolicy._policyVersion = removed['versionNo'] - removedPolicies.append(removedPolicy) - pDPNotification._removedPolicies= removedPolicies - boo_Remove = True - if resultJson['updatedPolicies']: - updatedPolicies = [] - for updated in resultJson['updatedPolicies']: - updatedPolicy = LoadedPolicy() - # check if it has matches then it is a Config Policy and compare it with Match Store. - if updated['matches']: - # compare the matches with our Stored Matches - for eachDict in self.matchStore: - if eachDict==updated['matches']: - updatedPolicy._policyName = updated['policyName'] - updatedPolicy._policyVersion = updated['versionNo'] - updatedPolicy._matchingConditions = updated['matches'] - updatedPolicy._updateType = updated['updateType'] - updatedPolicies.append(updatedPolicy) - boo_Update = True - else: - updatedPolicy._policyName = updated['policyName'] - updatedPolicy._policyVersion = updated['versionNo'] - updatedPolicies.append(updatedPolicy) - boo_Update = True - pDPNotification._loadedPolicies= updatedPolicies - if (boo_Update and boo_Remove): - pDPNotification._notificationType = NotificationType.BOTH.name - elif boo_Update: - pDPNotification._notificationType = NotificationType.UPDATE.name - elif boo_Remove: - pDPNotification._notificationType = NotificationType.REMOVE.name - return pDPNotification - - def __startAuto(self): - """ - Starts the Auto Notification Feature.. - """ - if self.scheme is not None: - if ((self.scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name)or(self.scheme == NotificationScheme.AUTO_NOTIFICATIONS.name)): - if self.handler is None: - if self.autows.sock is not None: - if(self.autows.sock.connected): - self.mclose= True - self.autows.close() - else: - if self.autoURL is None: - self.autoURL = self.resturl[0] - elif self.autoURL != self.resturl[0]: - self.autoURL = self.resturl[0] - if self.autows.sock is not None: - if(self.autows.sock.connected): - self.mclose= True - self.autows.close() - else: - self.autows = None - if self.autows is None: - if(self.autoURL.startswith("https")): - __auto_url = self.autoURL.replace("https","wss")+"notifications" - else: - __auto_url = self.autoURL.replace("http","ws")+"notifications" - def run(*args): - self.__autoRequest(__auto_url) - self.logger.info("Starting AutoNotification Service with : %s" , __auto_url) - self.thread.start_new_thread(run , ()) - elif self.autows.sock is not None: - if not (self.autows.sock.connected): - self.mclose = True - self.autows.close() - self.restart = True - self.__rotatePDP() - if(self.autoURL.startswith("https")): - __auto_url = self.autoURL.replace("https","wss")+"notifications" - else: - __auto_url = self.autoURL.replace("http","ws")+"notifications" - def run(*args): - self.__autoRequest(__auto_url) - self.logger.info("Starting AutoNotification Service with : %s" , __auto_url) - self.thread.start_new_thread(run , ()) - - else: - #stop the Auto Notification Service if it is running. - if self.autows.sock is not None: - if(self.autows.sock.connected): - self.mclose= True - self.autows.close() - - def __onEvent(self, message): - """ - Handles the event Notification received. - """ - message = json.loads(message) - if self.handler is not None: - if (self.scheme == NotificationScheme.AUTO_ALL_NOTIFICATIONS.name): - pDPNotification = PDPNotification() - boo_Remove = False - boo_Update = False - if message['removedPolicies']: - removedPolicies = [] - for removed in message['removedPolicies']: - removedPolicy = RemovedPolicy() - removedPolicy._policyName = removed['policyName'] - removedPolicy._policyVersion = removed['versionNo'] - removedPolicies.append(removedPolicy) - pDPNotification._removedPolicies= removedPolicies - boo_Remove = True - if message['loadedPolicies']: - updatedPolicies = [] - for updated in message['loadedPolicies']: - updatedPolicy = LoadedPolicy() - updatedPolicy._policyName = updated['policyName'] - updatedPolicy._policyVersion = updated['versionNo'] - updatedPolicy._matchingConditions = updated['matches'] - updatedPolicy._updateType = updated['updateType'] - updatedPolicies.append(updatedPolicy) - pDPNotification._loadedPolicies= updatedPolicies - boo_Update = True - if (boo_Update and boo_Remove): - pDPNotification._notificationType = NotificationType.BOTH.name - elif boo_Update: - pDPNotification._notificationType = NotificationType.UPDATE.name - elif boo_Remove: - pDPNotification._notificationType = NotificationType.REMOVE.name - # call the Handler. - self.handler.notificationReceived(pDPNotification) - elif (self.scheme == NotificationScheme.AUTO_NOTIFICATIONS.name): - # call the handler - self.handler(self.__checkNotification(message)) - - def __manualRequest(self,request_url): - """ - Takes the request_URL given and returns the JSON response back to the Caller. - """ - ws = create_connection(request_url) - # when using self-signed server certificate, comment previous line and uncomment following: - #ws = create_connection(request_url, sslopt={"cert_reqs": ssl.CERT_NONE}) - ws.send("Manual") - try: - return json.loads(ws.recv()) - except: - return None - ws.close() - ws.shutdown() - - def __onMessage(self, ws,message): - """Occurs on Event - """ - self.logger.info("Received AutoNotification message: %s" , message) - self.__onEvent(message) - - def __onError(self, ws, error): - """Self Restart the Notification Service on Error - """ - self.logger.error("PE500 - Process Flow Issue: Auto Notification service Error!! : %s" , error) - - def __onclose(self, ws): - """Occurs on Close ? Try to start again in case User didn't do it. - """ - self.logger.debug("Connection has been Closed. ") - if not self.mclose: - self.__startAuto() - self.mclose = False - - def __autoRequest(self, request_url): - """ - Takes the request_URL and invokes the PolicyEngine method on any receiving a Message - """ - websocket.enableTrace(True) - self.autows = websocket.WebSocketApp(request_url, on_message= self.__onMessage, on_close= self.__onclose, on_error= self.__onError) - # wait for to 5 seconds to restart - if self.restart: - time.sleep(5) - self.autows.run_forever() - -class NotificationHandler: - """ - 'Defines the methods which need to run when an Event or a Notification is received.' - """ - def notificationReceived(self, notification): - """ - Will be triggered automatically whenever a Notification is received by the PEP - @param notification: PDPNotification object which has the information of the Policies. - @attention: This method must be implemented by the user for AUTO type NotificationScheme - """ - raise Exception("Unimplemented abstract method: %s" % __functionId(self, 1)) - -def __functionId(obj, nFramesUp): - """ Internal Usage only.. - Create a string naming the function n frames up on the stack. """ - fr = sys._getframe(nFramesUp+1) - co = fr.f_code - return "%s.%s" % (obj.__class__, co.co_name) - -class PolicyConfig: - """ - 'PolicyConfig is the return object resulted by getConfig Call.' - """ - def __init__(self): - self._policyConfigMessage = None - self._policyConfigStatus = None - self._policyName = None - self._policyVersion = None - self._matchingConditions = None - self._responseAttributes = None - self._policyType = None - self._json = None - self._xml = None - self._prop = None - self._other = None - -class PolicyResponse: - """ - 'PolicyResponse is the return object resulted by sendEvent Call.' - """ - def __init__(self): - self._policyResponseStatus = None - self._policyResponseMessage = None - self._requestAttributes = None - self._actionTaken = None - self._actionAdvised= None - -class PDPNotification: - """ - 'Defines the Notification Event sent from the PDP to PEP Client.' - """ - def __init__(self): - self._removedPolicies = None - self._loadedPolicies = None - self._notificationType = None - -class RemovedPolicy: - """ - 'Defines the structure of the Removed Policy' - """ - def __init__(self): - self._policyName = None - self._policyVersion = None - -class LoadedPolicy: - """ - 'Defines the Structure of the Loaded Policy' - """ - def __init__(self): - self._policyName = None - self._policyVersion = None - self._matchingConditions = None - self._updateType = None - -class PolicyParameters: - """ - 'Defines the Structure of the Policy to Create or Update' - """ - def __init__(self): - self._actionPerformer = None - self._actionAttribute = None - self._attributes = None - self._configBody = None - self._configBodyType = None - self._configName = None - self._controllerName = None - self._dependencyNames = None - self._dynamicRuleAlgorithmLabels = None - self._dynamicRuleAlgorithmFunctions = None - self._dynamicRuleAlgorithmField1 = None - self._dynamicRuleAlgorithmField2 = None - self._ecompName = None - self._extendedOption = None - self._guard = None - self._policyClass = None - self._policyConfigType = None - self._policyName = None - self._policyDescription = None - self._priority = None - self._requestID = None - self._riskLevel = None - self._riskType = None - self._ruleProvider = None - self._ttlDate = None - -class PushPolicyParameters: - """ - 'Defines the Structure of the Push Policy Parameters' - """ - def __init__(self): - self._pdpGroup = None - self._policyName = None - self._policyType = None - -class PolicyChangeResponse: - """ - 'Defines the Structure of the policy Changes made from PDP' - """ - def __init__(self): - self._responseMessage = None - self._responseCode = None - -class DeletePolicyParameters: - """ - 'Defines the Structure of the Delete Policy Parameters' - """ - def __init__(self): - self._deleteCondition = None - self._pdpGroup = None - self._policyComponent = None - self._policyName = None - self._policyType = None - self._requestID = None - -class DictionaryParameters: - """ - 'Defines the Structure of the Dictionary Parameters' - """ - def __init__(self): - self._dictionaryType = None - self._dictionary = None - self._dictionaryJson = None - self._requestID = None - -class DictionaryResponse: - """ - 'Defines the Structure of the dictionary response' - """ - def __init__(self): - self._responseMessage = None - self._responseCode = None - self._dictionaryJson = None - self._dictionaryData = None - -class DecisionResponse: - """ - 'Defines the Structure of Decision Response' - """ - def __init__(self): - self._decision = None - self._details = None - -class ImportParameters: - """ - 'Defines the Structure of Policy Model Import' - """ - def __init__(self): - self._serviceName = None - self._description = None - self._requestID = None - self._filePath = None - self._importBody = None - self._version = None - self._importType = None diff --git a/policyhandler/config.py b/policyhandler/config.py index 7033096..a36f032 100644 --- a/policyhandler/config.py +++ b/policyhandler/config.py @@ -22,8 +22,6 @@ import os import json import copy -import re -import base64 import logging import logging.config @@ -65,11 +63,7 @@ class Config(object): """find the name of the policy-handler system to be used as the key in consul-kv for config of policy-handler """ - system_name = None - if Config.config: - system_name = Config.config.get(Config.FIELD_SYSTEM) - - return system_name or Config.SERVICE_NAME_POLICY_HANDLER + return (Config.config or {}).get(Config.FIELD_SYSTEM, Config.SERVICE_NAME_POLICY_HANDLER) @staticmethod def discover(): @@ -87,19 +81,6 @@ class Config(object): Config.merge(new_config.get(Config.SERVICE_NAME_POLICY_HANDLER)) Config._logger.debug("merged config from discovery: %s", json.dumps(Config.config)) - @staticmethod - def upload_to_discovery(): - """upload the current config settings to the discovery service""" - if not Config.config or not isinstance(Config.config, dict): - Config._logger.error("unexpected config: %s", Config.config) - return - - discovery_key = Config.get_system_name() - latest_config = json.dumps({Config.SERVICE_NAME_POLICY_HANDLER:Config.config}) - DiscoveryClient.put_kv(discovery_key, latest_config) - Config._logger.debug("uploaded config to discovery(%s): %s", \ - discovery_key, latest_config) - @staticmethod def load_from_file(file_path=None): """read and store the config from config file""" @@ -123,43 +104,3 @@ class Config(object): Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port) Config.merge(loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER)) return True - -class PolicyEngineConfig(object): - """main config of the application""" - # PATH_TO_PROPERTIES = r'logs/policy_engine.properties' - PATH_TO_PROPERTIES = r'tmp/policy_engine.properties' - PYPDP_URL = "PYPDP_URL = {0}{1}, {2}, {3}\n" - CLIENT_ID = "CLIENT_ID = {0}\n" - CLIENT_KEY = "CLIENT_KEY = {0}\n" - ENVIRONMENT = "ENVIRONMENT = {0}\n" - _logger = logging.getLogger("policy_handler.pe_config") - - @staticmethod - def save_to_file(): - """create the policy_engine.properties for policy-engine client""" - file_path = PolicyEngineConfig.PATH_TO_PROPERTIES - - try: - config = Config.config[Config.FIELD_POLICY_ENGINE] - headers = config["headers"] - remove_basic = re.compile(r"(^Basic )") - client_auth = headers["ClientAuth"] - basic_client_auth = bool(remove_basic.match(client_auth)) - client_parts = base64.b64decode(remove_basic.sub("", client_auth)).split(":") - auth_parts = base64.b64decode(remove_basic.sub("", headers["Authorization"])).split(":") - - props = PolicyEngineConfig.PYPDP_URL.format(config["url"], config["path_pdp"], - auth_parts[0], auth_parts[1]) - props += PolicyEngineConfig.CLIENT_ID.format(client_parts[0]) - props += PolicyEngineConfig.CLIENT_KEY.format(base64.b64encode(client_parts[1])) - props += PolicyEngineConfig.ENVIRONMENT.format(headers["Environment"]) - - with open(file_path, 'w') as prp_file: - prp_file.write(props) - PolicyEngineConfig._logger.info("created %s basic_client_auth %s", - file_path, basic_client_auth) - return basic_client_auth - except IOError: - PolicyEngineConfig._logger.error("failed to save to %s", file_path) - except KeyError: - PolicyEngineConfig._logger.error("unexpected config for %s", Config.FIELD_POLICY_ENGINE) diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index 1d50fc3..a641a95 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -59,14 +59,27 @@ class DeployHandler(object): DeployHandler._target_entity = Config.config["deploy_handler"] DeployHandler._url = DiscoveryClient.get_service_url(DeployHandler._target_entity) - DeployHandler._url_path = DeployHandler._url + '/policy' + DeployHandler._url_path = (DeployHandler._url or "") + '/policy' DeployHandler._logger.info("DeployHandler url(%s)", DeployHandler._url) @staticmethod - def policy_update(audit, latest_policies): - """ post policy_updated message to deploy-handler """ + def policy_update(audit, latest_policies, removed_policies=None, + errored_policies=None, catch_up=False): + """post policy_updated message to deploy-handler""" + if not latest_policies and not removed_policies and not catch_up: + return + + latest_policies = latest_policies or {} + removed_policies = removed_policies or {} + errored_policies = errored_policies or {} + DeployHandler._lazy_init() - msg = {"latest_policies":latest_policies} + msg = { + "catch_up" : catch_up, + "latest_policies" : latest_policies, + "removed_policies" : removed_policies, + "errored_policies" : errored_policies + } sub_aud = Audit(aud_parent=audit, targetEntity=DeployHandler._target_entity, targetServiceName=DeployHandler._url_path) headers = {REQUEST_X_ECOMP_REQUESTID : sub_aud.request_id} @@ -74,10 +87,22 @@ class DeployHandler(object): msg_str = json.dumps(msg) headers_str = json.dumps(headers) + DeployHandler._logger.info( + "catch_up(%s) latest_policies[%s], removed_policies[%s], errored_policies[%s]", + catch_up, len(latest_policies), len(removed_policies), len(errored_policies)) log_line = "post to deployment-handler {0} msg={1} headers={2}".format( DeployHandler._url_path, msg_str, headers_str) - sub_aud.metrics_start(log_line) + DeployHandler._logger.info(log_line) + sub_aud.metrics_start(log_line) + + if not DeployHandler._url: + error_msg = "no url found to {0}".format(log_line) + DeployHandler._logger.error(error_msg) + sub_aud.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) + audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) + sub_aud.metrics(error_msg) + return res = None try: diff --git a/policyhandler/discovery.py b/policyhandler/discovery.py index 7e16b90..33c3265 100644 --- a/policyhandler/discovery.py +++ b/policyhandler/discovery.py @@ -41,6 +41,8 @@ class DiscoveryClient(object): CONSUL_SERVICE_MASK = "http://consul:8500/v1/catalog/service/{0}" CONSUL_KV_MASK = "http://consul:8500/v1/kv/{0}" SERVICE_MASK = "http://{0}:{1}" + SERVICE_ADDRESS = "ServiceAddress" + SERVICE_PORT = "ServicePort" _logger = logging.getLogger("policy_handler.discovery") @@ -51,23 +53,25 @@ class DiscoveryClient(object): DiscoveryClient._logger.info("discover %s", service_path) response = requests.get(service_path) response.raise_for_status() - service = response.json()[0] - return DiscoveryClient.SERVICE_MASK.format( \ - service["ServiceAddress"], service["ServicePort"]) + service = response.json() + if not service: + DiscoveryClient._logger.error("failed discover %s", service_path) + return + service = service[0] + return DiscoveryClient.SERVICE_MASK.format( + service[DiscoveryClient.SERVICE_ADDRESS], service[DiscoveryClient.SERVICE_PORT] + ) @staticmethod def get_value(key): """get the value for the key from consul-kv""" response = requests.get(DiscoveryClient.CONSUL_KV_MASK.format(key)) response.raise_for_status() - data = response.json()[0] - value = base64.b64decode(data["Value"]).decode("utf-8") - DiscoveryClient._logger.info("consul-kv key=%s data=%s value(%s)", \ - key, json.dumps(data), value) + data = response.json() + if not data: + DiscoveryClient._logger.error("failed get_value %s", key) + return + value = base64.b64decode(data[0]["Value"]).decode("utf-8") + DiscoveryClient._logger.info("consul-kv key=%s value(%s) data=%s", + key, value, json.dumps(data)) return json.loads(value) - - @staticmethod - def put_kv(key, value): - """put the value under the key in consul-kv""" - response = requests.put(DiscoveryClient.CONSUL_KV_MASK.format(key), data=value) - response.raise_for_status() diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py index a1df861..c338b76 100644 --- a/policyhandler/onap/audit.py +++ b/policyhandler/onap/audit.py @@ -27,14 +27,18 @@ # ECOMP is a trademark and service mark of AT&T Intellectual Property. import os +import sys import json import uuid import time import copy +from datetime import datetime from threading import Lock from enum import Enum +from pip import utils as pip_utils from .CommonLogger import CommonLogger +from .health import Health REQUEST_X_ECOMP_REQUESTID = "X-ECOMP-RequestID" REQUEST_REMOTE_ADDR = "Remote-Addr" @@ -44,6 +48,7 @@ HOSTNAME = "HOSTNAME" AUDIT_REQUESTID = 'requestID' AUDIT_IPADDRESS = 'IPAddress' AUDIT_SERVER = 'server' +AUDIT_TARGET_ENTITY = 'targetEntity' HEADER_CLIENTAUTH = "clientauth" HEADER_AUTHORIZATION = "authorization" @@ -51,9 +56,10 @@ HEADER_AUTHORIZATION = "authorization" class AuditHttpCode(Enum): """audit http codes""" HTTP_OK = 200 - DATA_NOT_FOUND_ERROR = 400 PERMISSION_UNAUTHORIZED_ERROR = 401 PERMISSION_FORBIDDEN_ERROR = 403 + RESPONSE_ERROR = 400 + DATA_NOT_FOUND_ERROR = 404 SERVER_INTERNAL_ERROR = 500 SERVICE_UNAVAILABLE_ERROR = 503 DATA_ERROR = 1030 @@ -72,23 +78,25 @@ class AuditResponseCode(Enum): @staticmethod def get_response_code(http_status_code): """calculates the response_code from max_http_status_code""" + response_code = AuditResponseCode.UNKNOWN_ERROR if http_status_code <= AuditHttpCode.HTTP_OK.value: - return AuditResponseCode.SUCCESS - - if http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value, \ - AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]: - return AuditResponseCode.PERMISSION_ERROR - if http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value: - return AuditResponseCode.AVAILABILITY_ERROR - if http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value: - return AuditResponseCode.BUSINESS_PROCESS_ERROR - if http_status_code in [AuditHttpCode.DATA_ERROR.value, \ - AuditHttpCode.DATA_NOT_FOUND_ERROR.value]: - return AuditResponseCode.DATA_ERROR - if http_status_code == AuditHttpCode.SCHEMA_ERROR.value: - return AuditResponseCode.SCHEMA_ERROR - - return AuditResponseCode.UNKNOWN_ERROR + response_code = AuditResponseCode.SUCCESS + + elif http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value, + AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]: + response_code = AuditResponseCode.PERMISSION_ERROR + elif http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value: + response_code = AuditResponseCode.AVAILABILITY_ERROR + elif http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value: + response_code = AuditResponseCode.BUSINESS_PROCESS_ERROR + elif http_status_code in [AuditHttpCode.DATA_ERROR.value, + AuditHttpCode.RESPONSE_ERROR.value, + AuditHttpCode.DATA_NOT_FOUND_ERROR.value]: + response_code = AuditResponseCode.DATA_ERROR + elif http_status_code == AuditHttpCode.SCHEMA_ERROR.value: + response_code = AuditResponseCode.SCHEMA_ERROR + + return response_code @staticmethod def get_human_text(response_code): @@ -109,16 +117,23 @@ class Audit(object): :kwargs: - put any request related params into kwargs """ _service_name = "" + _service_version = "" _service_instance_UUID = str(uuid.uuid4()) + _started = datetime.now() _logger_debug = None _logger_error = None _logger_metrics = None _logger_audit = None + _health = Health() + _py_ver = sys.version.replace("\n", "") + _packages = sorted([pckg.project_name + "==" + pckg.version + for pckg in pip_utils.get_installed_distributions()]) @staticmethod - def init(service_name, config_file_path): + def init(service_name, service_version, config_file_path): """init static invariants and loggers""" Audit._service_name = service_name + Audit._service_version = service_version Audit._logger_debug = CommonLogger(config_file_path, "debug", \ instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name) Audit._logger_error = CommonLogger(config_file_path, "error", \ @@ -128,6 +143,22 @@ class Audit(object): Audit._logger_audit = CommonLogger(config_file_path, "audit", \ instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name) + @staticmethod + def health(): + """returns json for health check""" + now = datetime.now() + return { + "service_name" : Audit._service_name, + "service_version" : Audit._service_version, + "service_instance_UUID" : Audit._service_instance_UUID, + "python" : Audit._py_ver, + "started" : str(Audit._started), + "now" : str(now), + "uptime" : str(now - Audit._started), + "stats" : Audit._health.dump(), + "packages" : Audit._packages + } + def __init__(self, request_id=None, req_message=None, aud_parent=None, **kwargs): """create audit object per each request in the system @@ -193,6 +224,13 @@ class Audit(object): self.max_http_status_code = max(http_status_code, self.max_http_status_code) self._lock.release() + def get_max_http_status_code(self): + """returns the highest(worst) http status code""" + self._lock.acquire() + max_http_status_code = self.max_http_status_code + self._lock.release() + return max_http_status_code + @staticmethod def get_status_code(success): """COMPLETE versus ERROR""" @@ -222,12 +260,24 @@ class Audit(object): return json.dumps(Audit.hide_secrets(copy.deepcopy(obj)), **kwargs) - def get_response_code(self): - """calculates the response_code from max_http_status_code""" - self._lock.acquire() - max_http_status_code = self.max_http_status_code - self._lock.release() - return AuditResponseCode.get_response_code(max_http_status_code) + def is_serious_error(self, status_code): + """returns whether the response_code is success and a human text for response code""" + return AuditResponseCode.PERMISSION_ERROR.value \ + == AuditResponseCode.get_response_code(status_code).value \ + or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value + + def _get_response_status(self): + """calculates the response status fields from max_http_status_code""" + max_http_status_code = self.get_max_http_status_code() + response_code = AuditResponseCode.get_response_code(max_http_status_code) + success = (response_code.value == AuditResponseCode.SUCCESS.value) + response_description = AuditResponseCode.get_human_text(response_code) + return success, max_http_status_code, response_code, response_description + + def is_success(self): + """returns whether the response_code is success and a human text for response code""" + success, _, _, _ = self._get_response_status() + return success def debug(self, log_line, **kwargs): """debug - the debug=lowest level of logging""" @@ -275,46 +325,56 @@ class Audit(object): def metrics(self, log_line, **kwargs): """debug+metrics - the metrics=sub-audit level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - response_code = self.get_response_code() - success = (response_code.value == AuditResponseCode.SUCCESS.value) + success, max_http_status_code, response_code, response_description = \ + self._get_response_status() metrics_func = None + timer = Audit.get_elapsed_time(self._metrics_started) if success: log_line = "done: {0}".format(log_line) self.info(log_line, **all_kwargs) metrics_func = Audit._logger_metrics.info + Audit._health.success(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer) else: log_line = "failed: {0}".format(log_line) self.error(log_line, errorCode=response_code.value, \ - errorDescription=AuditResponseCode.get_human_text(response_code), **all_kwargs) + errorDescription=response_description, **all_kwargs) metrics_func = Audit._logger_metrics.error + Audit._health.error(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer) - metrics_func(log_line, begTime=self._metrics_start_event, \ - timer=Audit.get_elapsed_time(self._metrics_started), \ - statusCode=Audit.get_status_code(success), responseCode=response_code.value, \ - responseDescription=AuditResponseCode.get_human_text(response_code), \ - **all_kwargs) + metrics_func(log_line, begTime=self._metrics_start_event, timer=timer, + statusCode=Audit.get_status_code(success), responseCode=response_code.value, + responseDescription=response_description, + **all_kwargs + ) self.metrics_start() + return (success, max_http_status_code, response_description) def audit_done(self, result=None, **kwargs): """debug+audit - the audit=top level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - response_code = self.get_response_code() - success = (response_code.value == AuditResponseCode.SUCCESS.value) + success, max_http_status_code, response_code, response_description = \ + self._get_response_status() log_line = "{0} {1}".format(self.req_message, result or "").strip() audit_func = None + timer = Audit.get_elapsed_time(self._started) if success: log_line = "done: {0}".format(log_line) self.info(log_line, **all_kwargs) audit_func = Audit._logger_audit.info + Audit._health.success(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer) else: log_line = "failed: {0}".format(log_line) - self.error(log_line, errorCode=response_code.value, \ - errorDescription=AuditResponseCode.get_human_text(response_code), **all_kwargs) + self.error(log_line, errorCode=response_code.value, + errorDescription=response_description, **all_kwargs) audit_func = Audit._logger_audit.error + Audit._health.error(all_kwargs.get(AUDIT_TARGET_ENTITY, Audit._service_name), timer) + + audit_func(log_line, begTime=self._start_event, timer=timer, + statusCode=Audit.get_status_code(success), + responseCode=response_code.value, + responseDescription=response_description, + **all_kwargs + ) - audit_func(log_line, begTime=self._start_event, \ - timer=Audit.get_elapsed_time(self._started), \ - statusCode=Audit.get_status_code(success), responseCode=response_code.value, \ - responseDescription=AuditResponseCode.get_human_text(response_code), \ - **all_kwargs) + return (success, max_http_status_code, response_description) diff --git a/policyhandler/onap/health.py b/policyhandler/onap/health.py new file mode 100644 index 0000000..eefa7d2 --- /dev/null +++ b/policyhandler/onap/health.py @@ -0,0 +1,104 @@ +"""generic class to keep track of app health""" + +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +import uuid +from threading import Lock +from datetime import datetime + +class HealthStats(object): + """keep track of stats for calls""" + def __init__(self, name): + """keep track of stats for metrics calls""" + self._name = name or "stats_" + str(uuid.uuid4()) + self._lock = Lock() + self._call_count = 0 + self._error_count = 0 + self._longest_timer = 0 + self._total_timer = 0 + self._last_success = None + self._last_error = None + + def dump(self): + """returns dict of stats""" + dump = None + with self._lock: + dump = { + "call_count" : self._call_count, + "error_count" : self._error_count, + "last_success" : str(self._last_success), + "last_error" : str(self._last_error), + "longest_timer_millisecs" : self._longest_timer, + "ave_timer_millisecs" : (float(self._total_timer)/self._call_count \ + if self._call_count else 0) + } + return dump + + def success(self, timer): + """records the successful execution""" + with self._lock: + self._call_count += 1 + self._last_success = datetime.now() + self._total_timer += timer + if not self._longest_timer or self._longest_timer < timer: + self._longest_timer = timer + + def error(self, timer): + """records the errored execution""" + with self._lock: + self._call_count += 1 + self._error_count += 1 + self._last_error = datetime.now() + self._total_timer += timer + if not self._longest_timer or self._longest_timer < timer: + self._longest_timer = timer + +class Health(object): + """Health stats for multiple requests""" + def __init__(self): + """Health stats for application""" + self._all_stats = {} + self._lock = Lock() + + def _add_or_get_stats(self, stats_name): + """add to or get from the ever growing dict of HealthStats""" + stats = None + with self._lock: + stats = self._all_stats.get(stats_name) + if not stats: + self._all_stats[stats_name] = stats = HealthStats(stats_name) + return stats + + def success(self, stats_name, timer): + """records the successful execution on stats_name""" + stats = self._add_or_get_stats(stats_name) + stats.success(timer) + + def error(self, stats_name, timer): + """records the error execution on stats_name""" + stats = self._add_or_get_stats(stats_name) + stats.error(timer) + + def dump(self): + """returns dict of stats""" + with self._lock: + stats = dict((k, v.dump()) for (k, v) in self._all_stats.iteritems()) + + return stats diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py index 640b724..30fe9b2 100644 --- a/policyhandler/policy_consts.py +++ b/policyhandler/policy_consts.py @@ -24,5 +24,3 @@ POLICY_VERSION = "policyVersion" POLICY_NAME = "policyName" POLICY_BODY = 'policy_body' POLICY_CONFIG = 'config' - -POLICY_GET_CONFIG = 'getConfig' diff --git a/policyhandler/policy_engine.py b/policyhandler/policy_engine.py deleted file mode 100644 index a0ff697..0000000 --- a/policyhandler/policy_engine.py +++ /dev/null @@ -1,103 +0,0 @@ -"""policy-engine-client communicates with policy-engine thru PolicyEngine client object""" - -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. - -import logging -import re - -from .config import Config, PolicyEngineConfig -from .onap.audit import Audit -from .PolicyEngine import PolicyEngine, NotificationHandler, NotificationScheme -from .policy_updater import PolicyUpdater - -class PolicyNotificationHandler(NotificationHandler): - """handler of the policy-engine push notifications""" - _logger = logging.getLogger("policy_handler.policy_notification") - - def __init__(self, policy_updater): - scope_prefixes = [scope_prefix.replace(".", "[.]") - for scope_prefix in Config.config["scope_prefixes"]] - self._policy_scopes = re.compile("(" + "|".join(scope_prefixes) + ")") - PolicyNotificationHandler._logger.info("_policy_scopes %s", self._policy_scopes.pattern) - self._policy_updater = policy_updater - self._policy_updater.start() - - def notificationReceived(self, notification): - if not notification or not notification._loadedPolicies: - return - - policy_names = [loaded._policyName - for loaded in notification._loadedPolicies - if self._policy_scopes.match(loaded._policyName)] - - if not policy_names: - PolicyNotificationHandler._logger.info("no policy updated for scopes %s", - self._policy_scopes.pattern) - return - - audit = Audit(req_message="notificationReceived from PDP") - audit.retry_get_config = True - self._policy_updater.enqueue(audit, policy_names) - -class PolicyEngineClient(object): - """ policy-engine client""" - _logger = logging.getLogger("policy_handler.policy_engine") - _policy_updater = None - _pdp_notification_handler = None - _policy_engine = None - - @staticmethod - def shutdown(audit): - """Shutdown the notification-handler""" - PolicyEngineClient._policy_updater.shutdown(audit) - - @staticmethod - def catch_up(audit): - """bring the latest policies from policy-engine""" - PolicyEngineClient._policy_updater.catch_up(audit) - - @staticmethod - def create_policy_engine_properties(): - """create the policy_engine.properties file from config.json""" - pass - - @staticmethod - def run(): - """Using policy-engine client to talk to policy engine""" - audit = Audit(req_message="start PDP client") - PolicyEngineClient._policy_updater = PolicyUpdater() - PolicyEngineClient._pdp_notification_handler = PolicyNotificationHandler( - PolicyEngineClient._policy_updater) - - sub_aud = Audit(aud_parent=audit) - sub_aud.metrics_start("create client to PDP") - basic_client_auth = PolicyEngineConfig.save_to_file() - PolicyEngineClient._policy_engine = PolicyEngine( - PolicyEngineConfig.PATH_TO_PROPERTIES, - scheme=NotificationScheme.AUTO_ALL_NOTIFICATIONS.name, - handler=PolicyEngineClient._pdp_notification_handler, - basic_client_auth=basic_client_auth - ) - sub_aud.metrics("created client to PDP") - seed_scope = ".*" - PolicyEngineClient._policy_engine.getConfig(policyName=seed_scope) - sub_aud.metrics("seeded client by PDP.getConfig for policyName={0}".format(seed_scope)) - - PolicyEngineClient.catch_up(audit) diff --git a/policyhandler/policy_handler.py b/policyhandler/policy_handler.py index 50d59bc..1cd62db 100644 --- a/policyhandler/policy_handler.py +++ b/policyhandler/policy_handler.py @@ -18,14 +18,14 @@ # ============LICENSE_END========================================================= # # ECOMP is a trademark and service mark of AT&T Intellectual Property. - +import os import sys import logging from policyhandler.config import Config from policyhandler.onap.audit import Audit from policyhandler.web_server import PolicyWeb -from policyhandler.policy_engine import PolicyEngineClient +from policyhandler.policy_receiver import PolicyReceiver class LogWriter(object): """redirect the standard out + err to the logger""" @@ -52,13 +52,16 @@ def run_policy_handler(): sys.stderr = LogWriter(logger.error) logger.info("========== run_policy_handler ==========") - Audit.init(Config.get_system_name(), Config.LOGGER_CONFIG_FILE_PATH) + policy_handler_version = os.getenv("APP_VER") + logger.info("policy_handler_version %s", policy_handler_version) + Audit.init(Config.get_system_name(), policy_handler_version, Config.LOGGER_CONFIG_FILE_PATH) logger.info("starting policy_handler with config:") logger.info(Audit.log_json_dumps(Config.config)) - PolicyEngineClient.run() - PolicyWeb.run() + audit = Audit(req_message="start policy handler") + PolicyReceiver.run(audit) + PolicyWeb.run_forever(audit) if __name__ == "__main__": run_policy_handler() diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py new file mode 100644 index 0000000..ec25987 --- /dev/null +++ b/policyhandler/policy_receiver.py @@ -0,0 +1,195 @@ +""" +policy-receiver communicates with policy-engine +thru web-socket to receive push notifications +on updates and removal of policies. + +on receiving the policy-notifications, the policy-receiver +filters them out by the policy scope(s) provided in policy-handler config +and passes the notifications to policy-updater +""" + +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +import json +import logging +import re +import time +from threading import Lock, Thread + +import websocket + +from .config import Config +from .onap.audit import Audit +from .policy_updater import PolicyUpdater + +LOADED_POLICIES = 'loadedPolicies' +REMOVED_POLICIES = 'removedPolicies' +POLICY_NAME = 'policyName' +POLICY_VER = 'versionNo' + +class _PolicyReceiver(Thread): + """web-socket to PolicyEngine""" + _logger = logging.getLogger("policy_handler.policy_receiver") + + def __init__(self): + """web-socket inside the thread to receive policy notifications from PolicyEngine""" + Thread.__init__(self, name="policy_receiver") + self.daemon = True + + self._lock = Lock() + self._keep_running = True + + config = Config.config[Config.FIELD_POLICY_ENGINE] + self.web_socket_url = resturl = config["url"] + config["path_pdp"] + + if resturl.startswith("https:"): + self.web_socket_url = resturl.replace("https:", "wss:") + "notifications" + else: + self.web_socket_url = resturl.replace("http:", "ws:") + "notifications" + + self._web_socket = None + + scope_prefixes = [scope_prefix.replace(".", "[.]") + for scope_prefix in Config.config["scope_prefixes"]] + self._policy_scopes = re.compile("(" + "|".join(scope_prefixes) + ")") + _PolicyReceiver._logger.info("_policy_scopes %s", self._policy_scopes.pattern) + self._policy_updater = PolicyUpdater() + self._policy_updater.start() + + def run(self): + """listen on web-socket and pass the policy notifications to policy-updater""" + websocket.enableTrace(True) + restarting = False + while True: + if not self._get_keep_running(): + break + + self._stop_notifications() + + if restarting: + time.sleep(5) + + _PolicyReceiver._logger.info( + "connecting to policy-notifications at: %s", self.web_socket_url) + self._web_socket = websocket.WebSocketApp( + self.web_socket_url, + on_message=self._on_pdp_message, + on_close=self._on_ws_close, + on_error=self._on_ws_error + ) + + _PolicyReceiver._logger.info("waiting for policy-notifications...") + self._web_socket.run_forever() + restarting = True + + _PolicyReceiver._logger.info("exit policy-receiver") + + def _get_keep_running(self): + """thread-safe check whether to continue running""" + with self._lock: + keep_running = self._keep_running + return keep_running + + def _stop_notifications(self): + """Shuts down the AutoNotification service if running.""" + with self._lock: + if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected: + self._web_socket.close() + _PolicyReceiver._logger.info("Stopped receiving notifications from PDP") + + def _on_pdp_message(self, _, message): + """received the notification from PDP""" + _PolicyReceiver._logger.info("Received notification message: %s", message) + if not message: + return + message = json.loads(message) + + if not message: + return + + policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) + for policy in message.get(LOADED_POLICIES, []) + if self._policy_scopes.match(policy.get(POLICY_NAME))] + policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) + for policy in message.get(REMOVED_POLICIES, []) + if self._policy_scopes.match(policy.get(POLICY_NAME))] + + if not policies_updated and not policies_removed: + _PolicyReceiver._logger.info( + "no policy updated or removed for scopes %s", self._policy_scopes.pattern + ) + return + + audit = Audit(req_message="policy-notification - updated[{0}], removed[{1}]" \ + .format(len(policies_updated), len(policies_removed))) + audit.retry_get_config = True + self._policy_updater.enqueue(audit, policies_updated, policies_removed) + + def _on_ws_error(self, _, error): + """report an error""" + _PolicyReceiver._logger.error("policy-notification error: %s", error) + + def _on_ws_close(self, _): + """restart web-socket on close""" + _PolicyReceiver._logger.info("lost connection to PDP - restarting...") + + def shutdown(self, audit): + """Shutdown the policy-receiver""" + _PolicyReceiver._logger.info("shutdown policy-receiver") + with self._lock: + self._keep_running = False + + self._stop_notifications() + + if self.is_alive(): + self.join() + + self._policy_updater.shutdown(audit) + + def catch_up(self, audit): + """need to bring the latest policies to DCAE-Controller""" + self._policy_updater.catch_up(audit) + +class PolicyReceiver(object): + """policy-receiver - static singleton wrapper""" + _policy_receiver = None + + @staticmethod + def shutdown(audit): + """Shutdown the notification-handler""" + PolicyReceiver._policy_receiver.shutdown(audit) + + @staticmethod + def catch_up(audit): + """bring the latest policies from policy-engine""" + PolicyReceiver._policy_receiver.catch_up(audit) + + @staticmethod + def run(audit): + """Using policy-engine client to talk to policy engine""" + sub_aud = Audit(aud_parent=audit) + sub_aud.metrics_start("start policy receiver") + + PolicyReceiver._policy_receiver = _PolicyReceiver() + PolicyReceiver._policy_receiver.start() + + sub_aud.metrics("started policy receiver") + + PolicyReceiver.catch_up(audit) diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py index bf8a31d..1e50693 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/policy_rest.py @@ -22,123 +22,30 @@ import logging import json import copy -import re import time from multiprocessing.dummy import Pool as ThreadPool import requests from .config import Config -from .policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, POLICY_GET_CONFIG, \ - POLICY_BODY, POLICY_CONFIG +from .policy_consts import POLICY_ID, POLICY_NAME, POLICY_BODY, POLICY_CONFIG from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode, AuditResponseCode - -class PolicyUtils(object): - """policy-client utils""" - _logger = logging.getLogger("policy_handler.policy_utils") - _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$') - - @staticmethod - def safe_json_parse(json_str): - """try parsing json without exception - returns the json_str back if fails""" - if not json_str: - return json_str - try: - return json.loads(json_str) - except ValueError as err: - PolicyUtils._logger.warn("unexpected json %s: %s", str(json_str), str(err)) - return json_str - - @staticmethod - def extract_policy_id(policy_name): - """ policy_name = policy_id + "." + + "." + - For instance, - policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml - policy_id = DCAE_alex.Config_alex_policy_number_1 - policy_scope = DCAE_alex - policy_class = Config - policy_version = 3 - type = extension = xml - delimiter = "." - policy_class_delimiter = "_" - policy_name in PAP = DCAE_alex.alex_policy_number_1 - """ - if not policy_name: - return - return PolicyUtils._policy_name_ext.sub('', policy_name) - - @staticmethod - def parse_policy_config(policy): - """try parsing the config in policy.""" - if policy and POLICY_BODY in policy and POLICY_CONFIG in policy[POLICY_BODY]: - policy[POLICY_BODY][POLICY_CONFIG] = PolicyUtils.safe_json_parse( - policy[POLICY_BODY][POLICY_CONFIG]) - return policy - - @staticmethod - def convert_to_policy(policy_config): - """wrap policy_config received from policy-engine with policy_id.""" - if not policy_config or POLICY_NAME not in policy_config \ - or POLICY_VERSION not in policy_config or not policy_config[POLICY_VERSION]: - return - policy_id = PolicyUtils.extract_policy_id(policy_config[POLICY_NAME]) - if not policy_id: - return - return {POLICY_ID:policy_id, POLICY_BODY:policy_config} - - @staticmethod - def select_latest_policy(policy_configs): - """For some reason, the policy-engine returns all version of the policy_configs. - DCAE-Controller is only interested in the latest version - """ - if not policy_configs: - return - latest_policy_config = {} - for policy_config in policy_configs: - if POLICY_VERSION not in policy_config or not policy_config[POLICY_VERSION] \ - or not policy_config[POLICY_VERSION].isdigit(): - continue - if not latest_policy_config \ - or int(policy_config[POLICY_VERSION]) \ - > int(latest_policy_config[POLICY_VERSION]): - latest_policy_config = policy_config - - return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_config)) - - @staticmethod - def select_latest_policies(policy_configs): - """For some reason, the policy-engine returns all version of the policy_configs. - DCAE-Controller is only interested in the latest versions - """ - if not policy_configs: - return {} - policies = {} - for policy_config in policy_configs: - policy = PolicyUtils.convert_to_policy(policy_config) - if not policy or POLICY_ID not in policy or POLICY_BODY not in policy: - continue - if POLICY_VERSION not in policy[POLICY_BODY] \ - or not policy[POLICY_BODY][POLICY_VERSION] \ - or not policy[POLICY_BODY][POLICY_VERSION].isdigit(): - continue - if policy[POLICY_ID] not in policies: - policies[policy[POLICY_ID]] = policy - continue - if int(policy[POLICY_BODY][POLICY_VERSION]) \ - > int(policies[policy[POLICY_ID]][POLICY_BODY][POLICY_VERSION]): - policies[policy[POLICY_ID]] = policy - - for policy_id in policies: - policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id]) - - return policies +from .policy_utils import PolicyUtils class PolicyRest(object): """ policy-engine """ _logger = logging.getLogger("policy_handler.policy_rest") _lazy_inited = False + POLICY_GET_CONFIG = 'getConfig' + POLICY_CONFIG_STATUS = "policyConfigStatus" + CONFIG_RETRIEVED = "CONFIG_RETRIEVED" + POLICY_CONFIG_MESSAGE = "policyConfigMessage" + NO_RESPONSE_RECEIVED = "No Response Received" + + MIN_VERSION_EXPECTED = "min_version_expected" + IGNORE_POLICY_NAMES = "ignore_policy_names" _requests_session = None - _url = None + _url_get_config = None _headers = None _target_entity = None _thread_pool_size = 4 @@ -167,7 +74,8 @@ class PolicyRest(object): requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) ) - PolicyRest._url = config["url"] + config["path_api"] + PolicyRest._url_get_config = config["url"] \ + + config["path_api"] + PolicyRest.POLICY_GET_CONFIG PolicyRest._headers = config["headers"] PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) PolicyRest._thread_pool_size = Config.config.get("thread_pool_size", 4) @@ -181,31 +89,32 @@ class PolicyRest(object): PolicyRest._policy_retry_sleep = Config.config.get("policy_retry_sleep", 0) PolicyRest._logger.info("PolicyClient url(%s) headers(%s) scope-prefixes(%s)", \ - PolicyRest._url, Audit.log_json_dumps(PolicyRest._headers), \ + PolicyRest._url_get_config, Audit.log_json_dumps(PolicyRest._headers), \ json.dumps(PolicyRest._scope_prefixes)) @staticmethod - def _post(audit, path, json_body): + def _pdp_get_config(audit, json_body): """Communication with the policy-engine""" - full_path = PolicyRest._url + path sub_aud = Audit(aud_parent=audit, targetEntity=PolicyRest._target_entity, \ - targetServiceName=full_path) + targetServiceName=PolicyRest._url_get_config) msg = json.dumps(json_body) headers = copy.copy(PolicyRest._headers) headers[REQUEST_X_ECOMP_REQUESTID] = sub_aud.request_id headers_str = Audit.log_json_dumps(headers) - log_line = "post to PDP {0} msg={1} headers={2}".format(full_path, msg, headers_str) + log_line = "post to PDP {0} msg={1} headers={2}".format( + PolicyRest._url_get_config, msg, headers_str) sub_aud.metrics_start(log_line) PolicyRest._logger.info(log_line) res = None try: - res = PolicyRest._requests_session.post(full_path, json=json_body, headers=headers) + res = PolicyRest._requests_session.post( + PolicyRest._url_get_config, json=json_body, headers=headers) except requests.exceptions.RequestException as ex: error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value error_msg = "failed to post to PDP {0} {1} msg={2} headers={3}" \ - .format(full_path, str(ex), msg, headers_str) + .format(PolicyRest._url_get_config, str(ex), msg, headers_str) PolicyRest._logger.exception(error_msg) sub_aud.set_http_status_code(error_code) @@ -213,145 +122,301 @@ class PolicyRest(object): sub_aud.metrics(error_msg) return (error_code, None) - log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format( \ - full_path, res.status_code, msg, res.text, \ + log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format( + PolicyRest._url_get_config, res.status_code, msg, res.text, Audit.log_json_dumps(dict(res.request.headers.items()))) + + res_data = None + if res.status_code == requests.codes.ok: + res_data = res.json() + + if res_data and isinstance(res_data, list) and len(res_data) == 1: + result = res_data[0] + if result and not result.get(POLICY_NAME): + res_data = None + if result.get(PolicyRest.POLICY_CONFIG_MESSAGE) == PolicyRest.NO_RESPONSE_RECEIVED: + error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + error_msg = "unexpected {0}".format(log_line) + + PolicyRest._logger.error(error_msg) + sub_aud.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + sub_aud.metrics(error_msg) + return (error_code, None) + sub_aud.set_http_status_code(res.status_code) sub_aud.metrics(log_line) PolicyRest._logger.info(log_line) + return res.status_code, res_data - if res.status_code == requests.codes.ok: - return res.status_code, res.json() + @staticmethod + def validate_policy(policy): + """Validates the config on policy""" + if not policy: + return - return res.status_code, None + policy_body = policy.get(POLICY_BODY) + + return bool( + policy_body + and policy_body.get(PolicyRest.POLICY_CONFIG_STATUS) == PolicyRest.CONFIG_RETRIEVED + and policy_body.get(POLICY_CONFIG) + ) @staticmethod - def get_latest_policy(aud_policy_name): - """Get the latest policy for the policy_name from the policy-engine""" + def validate_policies(policies): + """Validate the config on policies. Returns (valid, errored) tuple""" + if not policies: + return None, policies + + valid_policies = {} + errored_policies = {} + for (policy_id, policy) in policies.iteritems(): + if PolicyRest.validate_policy(policy): + valid_policies[policy_id] = policy + else: + errored_policies[policy_id] = policy + + return valid_policies, errored_policies + + @staticmethod + def get_latest_policy(aud_policy_id): + """Get the latest policy for the policy_id from the policy-engine""" PolicyRest._lazy_init() - audit, policy_name = aud_policy_name + audit, policy_id, min_version_expected, ignore_policy_names = aud_policy_id status_code = 0 + policy_configs = None latest_policy = None + expect_policy_removed = (ignore_policy_names and not min_version_expected) + for retry in xrange(1, PolicyRest._policy_retry_count + 1): - PolicyRest._logger.debug("%s", policy_name) - status_code, policy_configs = PolicyRest._post(audit, POLICY_GET_CONFIG, \ - {POLICY_NAME:policy_name}) - PolicyRest._logger.debug("%s %s policy_configs: %s", status_code, policy_name, \ - json.dumps(policy_configs or [])) - latest_policy = PolicyUtils.select_latest_policy(policy_configs) - if not latest_policy: - audit.error("received unexpected policy data from PDP for policy_name={0}: {1}" \ - .format(policy_name, json.dumps(policy_configs or [])), \ - errorCode=AuditResponseCode.DATA_ERROR.value, \ - errorDescription=AuditResponseCode.get_human_text( \ - AuditResponseCode.DATA_ERROR)) + PolicyRest._logger.debug("%s", policy_id) + + status_code, policy_configs = PolicyRest._pdp_get_config( + audit, {POLICY_NAME:policy_id} + ) + + PolicyRest._logger.debug("%s %s policy_configs: %s", + status_code, policy_id, json.dumps(policy_configs or [])) + + latest_policy = PolicyUtils.select_latest_policy( + policy_configs, min_version_expected, ignore_policy_names + ) + + if not latest_policy and not expect_policy_removed: + audit.error("received unexpected policy data from PDP for policy_id={0}: {1}" + .format(policy_id, json.dumps(policy_configs or [])), + errorCode=AuditResponseCode.DATA_ERROR.value, + errorDescription=AuditResponseCode.get_human_text( + AuditResponseCode.DATA_ERROR)) if latest_policy or not audit.retry_get_config \ + or (expect_policy_removed and not policy_configs) \ or not PolicyRest._policy_retry_sleep \ - or AuditResponseCode.PERMISSION_ERROR.value \ - == AuditResponseCode.get_response_code(status_code).value: + or audit.is_serious_error(status_code): break if retry == PolicyRest._policy_retry_count: - audit.warn("gave up retrying {0} from PDP after #{1} for policy_name={2}" \ - .format(POLICY_GET_CONFIG, retry, policy_name), \ - errorCode=AuditResponseCode.DATA_ERROR.value, \ - errorDescription=AuditResponseCode.get_human_text( \ - AuditResponseCode.DATA_ERROR)) + audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}" + .format(PolicyRest._url_get_config, retry, policy_id), + errorCode=AuditResponseCode.DATA_ERROR.value, + errorDescription=AuditResponseCode.get_human_text( + AuditResponseCode.DATA_ERROR)) break - audit.warn("retry #{0} {1} from PDP in {2} secs for policy_name={3}" \ - .format(retry, POLICY_GET_CONFIG, PolicyRest._policy_retry_sleep, policy_name), \ - errorCode=AuditResponseCode.DATA_ERROR.value, \ - errorDescription=AuditResponseCode.get_human_text( \ - AuditResponseCode.DATA_ERROR)) + audit.warn( + "retry #{0} {1} from PDP in {2} secs for policy_id={3}".format( + retry, PolicyRest._url_get_config, PolicyRest._policy_retry_sleep, policy_id), + errorCode=AuditResponseCode.DATA_ERROR.value, + errorDescription=AuditResponseCode.get_human_text( + AuditResponseCode.DATA_ERROR)) time.sleep(PolicyRest._policy_retry_sleep) + if expect_policy_removed and not latest_policy \ + and AuditHttpCode.RESPONSE_ERROR.value == status_code: + audit.set_http_status_code(AuditHttpCode.HTTP_OK.value) + return None + audit.set_http_status_code(status_code) - if not latest_policy: + if not PolicyRest.validate_policy(latest_policy): audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.error( + "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)), + errorCode=AuditResponseCode.DATA_ERROR.value, + errorDescription=AuditResponseCode.get_human_text(AuditResponseCode.DATA_ERROR) + ) + return latest_policy @staticmethod - def get_latest_policies_by_names(aud_policy_names): + def get_latest_updated_policies(aud_policy_updates): """Get the latest policies of the list of policy_names from the policy-engine""" PolicyRest._lazy_init() - audit, policy_names = aud_policy_names - if not policy_names: + audit, policies_updated, policies_removed = aud_policy_updates + if not policies_updated and not policies_removed: return - audit.metrics_start("get_latest_policies_by_names {0} {1}".format( \ - len(policy_names), json.dumps(policy_names))) - PolicyRest._logger.debug("%d %s", len(policy_names), json.dumps(policy_names)) + str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format( + len(policies_updated), json.dumps(policies_updated), + len(policies_removed), json.dumps(policies_removed)) + audit.metrics_start("get_latest_updated_policies {0}".format(str_metrics)) + PolicyRest._logger.debug(str_metrics) + + policies_to_find = {} + for (policy_name, policy_version) in policies_updated: + policy_id = PolicyUtils.extract_policy_id(policy_name) + if not policy_id or not policy_version.isdigit(): + continue + policy = policies_to_find.get(policy_id) + if not policy: + policies_to_find[policy_id] = { + POLICY_ID: policy_id, + PolicyRest.MIN_VERSION_EXPECTED: int(policy_version), + PolicyRest.IGNORE_POLICY_NAMES: {} + } + continue + if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version): + policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version) + + for (policy_name, _) in policies_removed: + policy_id = PolicyUtils.extract_policy_id(policy_name) + if not policy_id: + continue + policy = policies_to_find.get(policy_id) + if not policy: + policies_to_find[policy_id] = { + POLICY_ID: policy_id, + PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True} + } + continue + policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True + + apns = [(audit, policy_id, + policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED), + policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)) + for (policy_id, policy_to_find) in policies_to_find.iteritems()] - thread_count = min(PolicyRest._thread_pool_size, len(policy_names)) - apns = [(audit, policy_name) for policy_name in policy_names] policies = None - if thread_count == 1: + apns_length = len(apns) + if apns_length == 1: policies = [PolicyRest.get_latest_policy(apns[0])] else: - pool = ThreadPool(thread_count) + pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length)) policies = pool.map(PolicyRest.get_latest_policy, apns) pool.close() pool.join() - audit.metrics("result get_latest_policies_by_names {0} {1}: {2} {3}".format( \ - len(policy_names), json.dumps(policy_names), len(policies), json.dumps(policies)), \ - targetEntity=PolicyRest._target_entity, targetServiceName=POLICY_GET_CONFIG) - policies = dict([(policy[POLICY_ID], policy) \ - for policy in policies if policy and POLICY_ID in policy]) - PolicyRest._logger.debug("policies %s", json.dumps(policies)) - if not policies: + audit.metrics("result get_latest_updated_policies {0}: {1} {2}" + .format(str_metrics, len(policies), json.dumps(policies)), + targetEntity=PolicyRest._target_entity, + targetServiceName=PolicyRest._url_get_config) + + updated_policies = dict((policy[POLICY_ID], policy) + for policy in policies + if policy and policy.get(POLICY_ID)) + + removed_policies = dict((policy_id, True) + for (policy_id, policy_to_find) in policies_to_find.iteritems() + if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED) + and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES) + and policy_id not in updated_policies) + + errored_policies = dict((policy_id, policy_to_find) + for (policy_id, policy_to_find) in policies_to_find.iteritems() + if policy_id not in updated_policies + and policy_id not in removed_policies) + + PolicyRest._logger.debug( + "result updated_policies %s, removed_policies %s, errored_policies %s", + json.dumps(updated_policies), json.dumps(removed_policies), + json.dumps(errored_policies)) + + if errored_policies: audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - return policies + audit.error( + "errored_policies in PDP: {0}".format(json.dumps(errored_policies)), + errorCode=AuditResponseCode.DATA_ERROR.value, + errorDescription=AuditResponseCode.get_human_text(AuditResponseCode.DATA_ERROR) + ) + + return updated_policies, removed_policies @staticmethod - def _get_latest_policies(aud_scope_prefix): - """Get the latest policies of the same scope from the policy-engine""" - audit, scope_prefix = aud_scope_prefix - PolicyRest._logger.debug("%s", scope_prefix) - status_code, policy_configs = PolicyRest._post(audit, POLICY_GET_CONFIG, \ - {POLICY_NAME:scope_prefix + ".*"}) - audit.set_http_status_code(status_code) - PolicyRest._logger.debug("%s policy_configs: %s %s", status_code, \ - scope_prefix, json.dumps(policy_configs or [])) - latest_policies = PolicyUtils.select_latest_policies(policy_configs) + def _get_latest_policies(aud_policy_filter): + """ + get the latest policies by policy_filter + or all the latest policies of the same scope from the policy-engine + """ + audit, policy_filter, error_if_not_found = aud_policy_filter + str_policy_filter = json.dumps(policy_filter) + PolicyRest._logger.debug("%s", str_policy_filter) + + status_code, policy_configs = PolicyRest._pdp_get_config(audit, policy_filter) + + PolicyRest._logger.debug("%s policy_configs: %s %s", status_code, + str_policy_filter, json.dumps(policy_configs or [])) + latest_policies = PolicyUtils.select_latest_policies(policy_configs) if not latest_policies: - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - audit.error("received unexpected policies data from PDP for scope {0}: {1}".format( \ - scope_prefix, json.dumps(policy_configs or [])), \ - errorCode=AuditResponseCode.DATA_ERROR.value, \ - errorDescription=AuditResponseCode.get_human_text( \ - AuditResponseCode.DATA_ERROR)) - return latest_policies + if error_if_not_found: + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.warn( + "received no policies from PDP for policy_filter {0}: {1}" + .format(str_policy_filter, json.dumps(policy_configs or [])), + errorCode=AuditResponseCode.DATA_ERROR.value, + errorDescription=AuditResponseCode.get_human_text( + AuditResponseCode.DATA_ERROR) + ) + return None, latest_policies + + audit.set_http_status_code(status_code) + return PolicyRest.validate_policies(latest_policies) @staticmethod - def get_latest_policies(audit): + def get_latest_policies(audit, policy_filter=None): """Get the latest policies of the same scope from the policy-engine""" PolicyRest._lazy_init() - PolicyRest._logger.debug("%s", json.dumps(PolicyRest._scope_prefixes)) - audit.metrics_start("get_latest_policies for scopes {0} {1}".format( \ - len(PolicyRest._scope_prefixes), json.dumps(PolicyRest._scope_prefixes))) - asps = [(audit, scope_prefix) for scope_prefix in PolicyRest._scope_prefixes] + aud_policy_filters = None + str_metrics = None + str_policy_filters = json.dumps(policy_filter or PolicyRest._scope_prefixes) + if policy_filter is not None: + aud_policy_filters = [(audit, policy_filter, True)] + str_metrics = "get_latest_policies for policy_filter {0}".format( + str_policy_filters) + else: + aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, False) + for scope_prefix in PolicyRest._scope_prefixes] + str_metrics = "get_latest_policies for scopes {0} {1}".format( \ + len(PolicyRest._scope_prefixes), str_policy_filters) + + PolicyRest._logger.debug("%s", str_policy_filters) + audit.metrics_start(str_metrics) + latest_policies = None - if PolicyRest._scope_thread_pool_size == 1: - latest_policies = [PolicyRest._get_latest_policies(asps[0])] + apfs_length = len(aud_policy_filters) + if apfs_length == 1: + latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])] else: - pool = ThreadPool(PolicyRest._scope_thread_pool_size) - latest_policies = pool.map(PolicyRest._get_latest_policies, asps) + pool = ThreadPool(min(PolicyRest._scope_thread_pool_size, apfs_length)) + latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters) pool.close() pool.join() - audit.metrics("total result get_latest_policies for scopes {0} {1}: {2} {3}".format( \ - len(PolicyRest._scope_prefixes), json.dumps(PolicyRest._scope_prefixes), \ - len(latest_policies), json.dumps(latest_policies)), \ - targetEntity=PolicyRest._target_entity, targetServiceName=POLICY_GET_CONFIG) + audit.metrics("total result {0}: {1} {2}".format( + str_metrics, len(latest_policies), json.dumps(latest_policies)), \ + targetEntity=PolicyRest._target_entity, targetServiceName=PolicyRest._url_get_config) + + # latest_policies == [(valid_policies, errored_policies), ...] + valid_policies = dict( + pair for (vps, _) in latest_policies if vps for pair in vps.iteritems()) + + errored_policies = dict( + pair for (_, eps) in latest_policies if eps for pair in eps.iteritems()) - latest_policies = dict(pair for lp in latest_policies if lp for pair in lp.items()) - PolicyRest._logger.debug("latest_policies: %s %s", \ - json.dumps(PolicyRest._scope_prefixes), json.dumps(latest_policies)) + PolicyRest._logger.debug( + "got policies for policy_filters: %s. valid_policies: %s errored_policies: %s", + str_policy_filters, json.dumps(valid_policies), json.dumps(errored_policies)) - return latest_policies + return valid_policies, errored_policies diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 1f1539f..9732f69 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -26,6 +26,7 @@ from threading import Thread, Lock from .policy_rest import PolicyRest from .deploy_handler import DeployHandler +from .onap.audit import Audit class PolicyUpdater(Thread): """queue and handle the policy-updates in a separate thread""" @@ -33,40 +34,45 @@ class PolicyUpdater(Thread): def __init__(self): """init static config of PolicyUpdater.""" - Thread.__init__(self) - self.name = "policy_updater" + Thread.__init__(self, name="policy_updater") self.daemon = True - self._req_shutdown = None - self._req_catch_up = None + self._aud_shutdown = None + self._aud_catch_up = None self._lock = Lock() self._queue = Queue() - def enqueue(self, audit=None, policy_names=None): - """enqueue the policy-names""" - policy_names = policy_names or [] - PolicyUpdater._logger.info("policy_names %s", json.dumps(policy_names)) - self._queue.put((audit, policy_names)) + def enqueue(self, audit=None, policies_updated=None, policies_removed=None): + """enqueue the policy-updates""" + policies_updated = policies_updated or [] + policies_removed = policies_removed or [] + + PolicyUpdater._logger.info( + "policies_updated %s policies_removed %s", + json.dumps(policies_updated), json.dumps(policies_removed)) + self._queue.put((audit, policies_updated, policies_removed)) def run(self): """wait and run the policy-update in thread""" while True: PolicyUpdater._logger.info("waiting for policy-updates...") - audit, policy_names = self._queue.get() - PolicyUpdater._logger.info("got policy-updates %s", json.dumps(policy_names)) + audit, policies_updated, policies_removed = self._queue.get() + PolicyUpdater._logger.info( + "got policies_updated %s policies_removed %s", + json.dumps(policies_updated), json.dumps(policies_removed)) + if not self._keep_running(): self._queue.task_done() break - if self._on_catch_up(): - continue - if not policy_names: - self._queue.task_done() + if self._on_catch_up(audit) or not audit: continue - updated_policies = PolicyRest.get_latest_policies_by_names((audit, policy_names)) - PolicyUpdater.policy_update(audit, updated_policies) + updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( + (audit, policies_updated, policies_removed)) + + DeployHandler.policy_update(audit, updated_policies, removed_policies=removed_policies) audit.audit_done() self._queue.task_done() @@ -74,51 +80,60 @@ class PolicyUpdater(Thread): def _keep_running(self): """thread-safe check whether to continue running""" - self._lock.acquire() - keep_running = not self._req_shutdown - self._lock.release() - if self._req_shutdown: - self._req_shutdown.audit_done() + with self._lock: + keep_running = not self._aud_shutdown + + if self._aud_shutdown: + self._aud_shutdown.audit_done() return keep_running def catch_up(self, audit): """need to bring the latest policies to DCAE-Controller""" - self._lock.acquire() - self._req_catch_up = audit - self._lock.release() + PolicyUpdater._logger.info("catch_up requested") + with self._lock: + self._aud_catch_up = audit + self.enqueue() - def _on_catch_up(self): - """Bring the latest policies to DCAE-Controller""" - self._lock.acquire() - req_catch_up = self._req_catch_up - if self._req_catch_up: - self._req_catch_up = None + def _reset_queue(self): + """clear up the queue""" + with self._lock: + self._aud_catch_up = None self._queue.task_done() self._queue = Queue() + + def _on_catch_up(self, audit): + """Bring the latest policies to DCAE-Controller""" + self._lock.acquire() + aud_catch_up = self._aud_catch_up + if self._aud_catch_up: + self._aud_catch_up = None self._lock.release() - if not req_catch_up: + + if not aud_catch_up: return False PolicyUpdater._logger.info("catch_up") - latest_policies = PolicyRest.get_latest_policies(req_catch_up) - PolicyUpdater.policy_update(req_catch_up, latest_policies) - req_catch_up.audit_done() - return True - - @staticmethod - def policy_update(audit, updated_policies): - """Invoke deploy-handler""" - if updated_policies: - PolicyUpdater._logger.info("updated_policies %s", json.dumps(updated_policies)) - DeployHandler.policy_update(audit, updated_policies) + latest_policies, errored_policies = PolicyRest.get_latest_policies(aud_catch_up) + + if not aud_catch_up.is_success(): + PolicyUpdater._logger.warn("not sending catch-up to deployment-handler due to errors") + if not audit: + self._queue.task_done() + else: + DeployHandler.policy_update( + aud_catch_up, latest_policies, errored_policies=errored_policies, catch_up=True) + self._reset_queue() + success, _, _ = aud_catch_up.audit_done() + PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health())) + + return success def shutdown(self, audit): """Shutdown the policy-updater""" PolicyUpdater._logger.info("shutdown policy-updater") - self._lock.acquire() - self._req_shutdown = audit - self._lock.release() + with self._lock: + self._aud_shutdown = audit self.enqueue() if self.is_alive(): self.join() diff --git a/policyhandler/policy_utils.py b/policyhandler/policy_utils.py new file mode 100644 index 0000000..d664b21 --- /dev/null +++ b/policyhandler/policy_utils.py @@ -0,0 +1,134 @@ +"""policy-client communicates with policy-engine thru REST API""" + +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +import logging +import json +import re + +from .policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, POLICY_BODY, POLICY_CONFIG + +class PolicyUtils(object): + """policy-client utils""" + _logger = logging.getLogger("policy_handler.policy_utils") + _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$') + + @staticmethod + def safe_json_parse(json_str): + """try parsing json without exception - returns the json_str back if fails""" + if not json_str: + return json_str + try: + return json.loads(json_str) + except (ValueError, TypeError) as err: + PolicyUtils._logger.warn("unexpected json %s: %s", str(json_str), str(err)) + return json_str + + @staticmethod + def extract_policy_id(policy_name): + """ policy_name = policy_id + "." + + "." + + For instance, + policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml + policy_id = DCAE_alex.Config_alex_policy_number_1 + policy_scope = DCAE_alex + policy_class = Config + policy_version = 3 + type = extension = xml + delimiter = "." + policy_class_delimiter = "_" + policy_name in PAP = DCAE_alex.alex_policy_number_1 + """ + if not policy_name: + return + return PolicyUtils._policy_name_ext.sub('', policy_name) + + @staticmethod + def parse_policy_config(policy): + """try parsing the config in policy.""" + if not policy: + return policy + config = policy.get(POLICY_BODY, {}).get(POLICY_CONFIG) + if config: + policy[POLICY_BODY][POLICY_CONFIG] = PolicyUtils.safe_json_parse(config) + return policy + + @staticmethod + def convert_to_policy(policy_config): + """wrap policy_config received from policy-engine with policy_id.""" + if not policy_config: + return + policy_name = policy_config.get(POLICY_NAME) + policy_version = policy_config.get(POLICY_VERSION) + if not policy_name or not policy_version: + return + policy_id = PolicyUtils.extract_policy_id(policy_name) + if not policy_id: + return + return {POLICY_ID:policy_id, POLICY_BODY:policy_config} + + @staticmethod + def select_latest_policy(policy_configs, min_version_expected=None, ignore_policy_names=None): + """For some reason, the policy-engine returns all version of the policy_configs. + DCAE-Controller is only interested in the latest version + """ + if not policy_configs: + return + latest_policy_config = {} + for policy_config in policy_configs: + policy_name = policy_config.get(POLICY_NAME) + policy_version = policy_config.get(POLICY_VERSION) + if not policy_name or not policy_version or not policy_version.isdigit(): + continue + policy_version = int(policy_version) + if min_version_expected and policy_version < min_version_expected: + continue + if ignore_policy_names and policy_name in ignore_policy_names: + continue + + if not latest_policy_config \ + or int(latest_policy_config[POLICY_VERSION]) < policy_version: + latest_policy_config = policy_config + + return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_config)) + + @staticmethod + def select_latest_policies(policy_configs): + """For some reason, the policy-engine returns all version of the policy_configs. + DCAE-Controller is only interested in the latest versions + """ + if not policy_configs: + return {} + policies = {} + for policy_config in policy_configs: + policy = PolicyUtils.convert_to_policy(policy_config) + if not policy: + continue + policy_id = policy.get(POLICY_ID) + policy_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION) + if not policy_id or not policy_version or not policy_version.isdigit(): + continue + if policy_id not in policies \ + or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION]): + policies[policy_id] = policy + + for policy_id in policies: + policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id]) + + return policies diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py index 9a5ee19..3d2503a 100644 --- a/policyhandler/web_server.py +++ b/policyhandler/web_server.py @@ -27,78 +27,187 @@ import cherrypy from .config import Config from .onap.audit import Audit from .policy_rest import PolicyRest -from .policy_engine import PolicyEngineClient +from .policy_receiver import PolicyReceiver class PolicyWeb(object): - """Main static class for REST API of policy-handler""" - logger = logging.getLogger("policy_handler.web_cherrypy") + """run REST API of policy-handler""" + logger = logging.getLogger("policy_handler.policy_web") @staticmethod - def run(): - """run forever the web-server of the policy-handler""" - PolicyWeb.logger.info("policy_handler web-service at port(%d)...", \ - Config.wservice_port) - cherrypy.config.update({"server.socket_host": "0.0.0.0", \ - 'server.socket_port': Config.wservice_port}) - cherrypy.tree.mount(PolicyLatest(), '/policy_latest') - cherrypy.tree.mount(PoliciesLatest(), '/policies_latest') - cherrypy.tree.mount(PoliciesCatchUp(), '/catch_up') - cherrypy.quickstart(Shutdown(), '/shutdown') - -class Shutdown(object): - """Shutdown the policy-handler""" - @cherrypy.expose - def index(self): - """shutdown event""" - audit = Audit(req_message="get /shutdown", headers=cherrypy.request.headers) - PolicyWeb.logger.info("--------- stopping REST API of policy-handler -----------") - cherrypy.engine.exit() - PolicyEngineClient.shutdown(audit) - PolicyWeb.logger.info("--------- the end -----------") - res = str(datetime.now()) - audit.info_requested(res) - return "goodbye! shutdown requested {0}".format(res) + def run_forever(audit): + """run the web-server of the policy-handler forever""" + PolicyWeb.logger.info("policy_handler web-service at port(%d)...", Config.wservice_port) + cherrypy.config.update({"server.socket_host": "0.0.0.0", + 'server.socket_port': Config.wservice_port}) + cherrypy.tree.mount(_PolicyWeb(), '/') + audit.info("running policy_handler web-service at port({0})".format(Config.wservice_port)) + cherrypy.engine.start() + +class _PolicyWeb(object): + """REST API of policy-handler""" -class PoliciesLatest(object): - """REST API of the policy-hanlder""" + @staticmethod + def _get_request_info(request): + """returns info about the http request""" + return "{0} {1}{2}".format(request.method, request.script_name, request.path_info) @cherrypy.expose + @cherrypy.popargs('policy_id') @cherrypy.tools.json_out() - def index(self): - """find the latest policy by policy_id or all latest policies""" - audit = Audit(req_message="get /policies_latest", headers=cherrypy.request.headers) - res = PolicyRest.get_latest_policies(audit) or {} - PolicyWeb.logger.info("PoliciesLatest: %s", json.dumps(res)) - audit.audit_done(result=json.dumps(res)) + def policy_latest(self, policy_id): + """retireves the latest policy identified by policy_id""" + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + PolicyWeb.logger.info("%s policy_id=%s headers=%s", \ + req_info, policy_id, json.dumps(cherrypy.request.headers)) + + res = PolicyRest.get_latest_policy((audit, policy_id, None, None)) or {} + + PolicyWeb.logger.info("res %s policy_id=%s res=%s", req_info, policy_id, json.dumps(res)) + + success, http_status_code, response_description = audit.audit_done(result=json.dumps(res)) + if not success: + raise cherrypy.HTTPError(http_status_code, response_description) return res -@cherrypy.popargs('policy_id') -class PolicyLatest(object): - """REST API of the policy-hanlder""" + def _get_all_policies_latest(self): + """retireves all the latest policies on GET /policies_latest""" + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + + PolicyWeb.logger.info("%s", req_info) + + valid_policies, errored_policies = PolicyRest.get_latest_policies(audit) + + res = {"valid_policies": valid_policies, "errored_policies": errored_policies} + PolicyWeb.logger.info("result %s: %s", req_info, json.dumps(res)) + + success, http_status_code, response_description = audit.audit_done(result=json.dumps(res)) + if not success: + raise cherrypy.HTTPError(http_status_code, response_description) + return res @cherrypy.expose @cherrypy.tools.json_out() - def index(self, policy_id): - """find the latest policy by policy_id or all latest policies""" - audit = Audit(req_message="get /policy_latest/{0}".format(policy_id or ""), \ + @cherrypy.tools.json_in() + def policies_latest(self): + """ + on :GET: retrieves all the latest policies from policy-engine that are + in the scope of the policy-handler. + + on :POST: expects to receive the params that mimic the /getConfig of policy-engine + and retrieves the matching policies from policy-engine and picks the latest on each policy. + + sample request - policies filter + + { + "configAttributes": { "key1":"value1" }, + "configName": "alex_config_name", + "ecompName": "DCAE", + "policyName": "DCAE_alex.Config_alex_.*", + "unique": false + } + + sample response + + { + "DCAE_alex.Config_alex_priority": { + "policy_body": { + "policyName": "DCAE_alex.Config_alex_priority.3.xml", + "policyConfigMessage": "Config Retrieved! ", + "responseAttributes": {}, + "policyConfigStatus": "CONFIG_RETRIEVED", + "type": "JSON", + "matchingConditions": { + "priority": "10", + "key1": "value1", + "ECOMPName": "DCAE", + "ConfigName": "alex_config_name" + }, + "property": null, + "config": { + "foo": "bar", + "foo_updated": "2017-10-06T16:54:31.696Z" + }, + "policyVersion": "3" + }, + "policy_id": "DCAE_alex.Config_alex_priority" + } + } + """ + if cherrypy.request.method == "GET": + return self._get_all_policies_latest() + + if cherrypy.request.method != "POST": + raise cherrypy.HTTPError(404, "unexpected method {0}".format(cherrypy.request.method)) + + policy_filter = cherrypy.request.json or {} + str_policy_filter = json.dumps(policy_filter) + + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(req_message="{0}: {1}".format(req_info, str_policy_filter), \ headers=cherrypy.request.headers) - PolicyWeb.logger.info("PolicyLatest policy_id=%s headers=%s", \ - policy_id, json.dumps(cherrypy.request.headers)) - res = PolicyRest.get_latest_policy((audit, policy_id)) or {} - PolicyWeb.logger.info("PolicyLatest policy_id=%s res=%s", policy_id, json.dumps(res)) - audit.audit_done(result=json.dumps(res)) + PolicyWeb.logger.info("%s: policy_filter=%s headers=%s", \ + req_info, str_policy_filter, json.dumps(cherrypy.request.headers)) + + res, _ = PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {} + + PolicyWeb.logger.info("result %s: policy_filter=%s res=%s", \ + req_info, str_policy_filter, json.dumps(res)) + + success, http_status_code, response_description = audit.audit_done(result=json.dumps(res)) + if not success: + raise cherrypy.HTTPError(http_status_code, response_description) return res -class PoliciesCatchUp(object): - """catch up with all DCAE policies""" @cherrypy.expose @cherrypy.tools.json_out() - def index(self): - """catch up with all policies""" + def catch_up(self): + """catch up with all DCAE policies""" started = str(datetime.now()) - audit = Audit(req_message="get /catch_up", headers=cherrypy.request.headers) - PolicyEngineClient.catch_up(audit) + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + + PolicyWeb.logger.info("%s", req_info) + PolicyReceiver.catch_up(audit) + res = {"catch-up requested": started} - PolicyWeb.logger.info("PoliciesCatchUp: %s", json.dumps(res)) + PolicyWeb.logger.info("requested %s: %s", req_info, json.dumps(res)) audit.info_requested(started) return res + + @cherrypy.expose + def shutdown(self): + """Shutdown the policy-handler""" + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + + PolicyWeb.logger.info("%s: --- stopping REST API of policy-handler ---", req_info) + + cherrypy.engine.exit() + + PolicyReceiver.shutdown(audit) + + health = json.dumps(Audit.health()) + audit.info("policy_handler health: {0}".format(health)) + PolicyWeb.logger.info("policy_handler health: %s", health) + PolicyWeb.logger.info("%s: --------- the end -----------", req_info) + res = str(datetime.now()) + audit.info_requested(res) + return "goodbye! shutdown requested {0}".format(res) + + @cherrypy.expose + @cherrypy.tools.json_out() + def healthcheck(self): + """returns the healthcheck results""" + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(req_message=req_info, headers=cherrypy.request.headers) + + PolicyWeb.logger.info("%s", req_info) + + res = Audit.health() + + PolicyWeb.logger.info("healthcheck %s: res=%s", req_info, json.dumps(res)) + + audit.audit_done(result=json.dumps(res)) + return res diff --git a/pom.xml b/pom.xml index be86e0b..e476714 100644 --- a/pom.xml +++ b/pom.xml @@ -30,8 +30,8 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property. org.onap.dcaegen2.platform policy-handler dcaegen2-platform-policy-handler - - 1.2.0-SNAPSHOT + + 2.0.0-SNAPSHOT http://maven.apache.org UTF-8 @@ -113,7 +113,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property. - org.apache.maven.plugins diff --git a/requirements.txt b/requirements.txt index 8afa983..dc79454 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,4 @@ CherryPy>=10.2.2,<11.0 enum34>=1.1.6 -future>=0.16.0 requests>=2.13.0,<3.0.0 -six>=1.10.0 -websocket-client>=0.40.0 +websocket-client>=0.40.0,<1.0.0 diff --git a/run_policy.sh b/run_policy.sh index 8fa23a2..4bace3c 100644 --- a/run_policy.sh +++ b/run_policy.sh @@ -23,6 +23,9 @@ mkdir -p logs LOG_FILE=logs/policy_handler.log echo "---------------------------------------------" >> ${LOG_FILE} 2>&1 +export APP_VER=$(python setup.py --version) +echo "APP_VER=${APP_VER}" | tee -a ${LOG_FILE} + echo "/etc/hosts" | tee -a ${LOG_FILE} cat /etc/hosts | tee -a ${LOG_FILE} python -m policyhandler/policy_handler >> ${LOG_FILE} 2>&1 & diff --git a/setup.py b/setup.py index 85e74ba..63c41a2 100644 --- a/setup.py +++ b/setup.py @@ -24,16 +24,14 @@ from setuptools import setup setup( name='policyhandler', description='DCAE-Controller policy-handler to communicate with policy-engine', - version="1.0.0", + version="2.0.0", author='Alex Shatov', packages=['policyhandler'], zip_safe=False, install_requires=[ "CherryPy>=10.2.2,<11.0", "enum34>=1.1.6", - "future>=0.16.0", "requests>=2.13.0,<3.0.0", - "six>=1.10.0", "websocket-client>=0.40.0" ], keywords='policy dcae controller', diff --git a/tests/test_policyhandler.py b/tests/test_policyhandler.py index 61ca00c..c90e521 100644 --- a/tests/test_policyhandler.py +++ b/tests/test_policyhandler.py @@ -18,44 +18,106 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -import sys +import copy import json -import re import logging +import re +import sys +import time +import uuid from datetime import datetime -# import pytest +import pytest + +import cherrypy +from cherrypy.test.helper import CPWebCase from policyhandler.config import Config +from policyhandler.deploy_handler import DeployHandler +from policyhandler.discovery import DiscoveryClient +from policyhandler.onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit, + AuditHttpCode) +from policyhandler.policy_consts import (POLICY_BODY, POLICY_CONFIG, POLICY_ID, + POLICY_NAME, POLICY_VERSION) from policyhandler.policy_handler import LogWriter -from policyhandler.onap.audit import Audit -from policyhandler.policy_rest import PolicyRest, PolicyUtils -from policyhandler.policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, \ - POLICY_BODY, POLICY_CONFIG +from policyhandler.policy_receiver import (LOADED_POLICIES, POLICY_VER, + REMOVED_POLICIES, PolicyReceiver) +from policyhandler.policy_rest import PolicyRest +from policyhandler.policy_utils import PolicyUtils +from policyhandler.web_server import _PolicyWeb + +POLICY_HANDLER_VERSION = "2.0.0" + +class MonkeyHttpResponse(object): + """Monkey http reposne""" + def __init__(self, headers): + self.headers = headers or {} + +class MonkeyedResponse(object): + """Monkey response""" + def __init__(self, full_path, res_json, json_body=None, headers=None): + self.full_path = full_path + self.req_json = json_body or {} + self.status_code = 200 + self.request = MonkeyHttpResponse(headers) + self.res = res_json + self.text = json.dumps(self.res) + + def json(self): + """returns json of response""" + return self.res + + def raise_for_status(self): + """ignoring""" + pass + +def monkeyed_discovery(full_path): + """monkeypatch for get from consul""" + res_json = {} + if full_path == DiscoveryClient.CONSUL_SERVICE_MASK.format(Config.config["deploy_handler"]): + res_json = [{ + DiscoveryClient.SERVICE_ADDRESS: "1.1.1.1", + DiscoveryClient.SERVICE_PORT: "123" + }] + elif full_path == DiscoveryClient.CONSUL_KV_MASK.format(Config.get_system_name()): + res_json = copy.deepcopy(Settings.dicovered_config) + return MonkeyedResponse(full_path, res_json) + +@pytest.fixture() +def fix_discovery(monkeypatch): + """monkeyed discovery request.get""" + Settings.logger.info("setup fix_discovery") + monkeypatch.setattr('policyhandler.discovery.requests.get', monkeyed_discovery) + yield fix_discovery # provide the fixture value + Settings.logger.info("teardown fix_discovery") class Settings(object): """init all locals""" logger = None RUN_TS = datetime.utcnow().isoformat()[:-3] + 'Z' + dicovered_config = None @staticmethod def init(): """init locals""" Config.load_from_file() + + with open("etc_upload/config.json", 'r') as config_json: + Settings.dicovered_config = json.load(config_json) + Config.load_from_file("etc_upload/config.json") - Settings.logger = logging.getLogger("policy_handler") + Settings.logger = logging.getLogger("policy_handler.unit_test") sys.stdout = LogWriter(Settings.logger.info) sys.stderr = LogWriter(Settings.logger.error) + print "print ========== run_policy_handler ==========" Settings.logger.info("========== run_policy_handler ==========") - Audit.init(Config.get_system_name(), Config.LOGGER_CONFIG_FILE_PATH) + Audit.init(Config.get_system_name(), POLICY_HANDLER_VERSION, Config.LOGGER_CONFIG_FILE_PATH) Settings.logger.info("starting policy_handler with config:") Settings.logger.info(Audit.log_json_dumps(Config.config)) - PolicyRest._lazy_init() - Settings.init() class MonkeyPolicyBody(object): @@ -80,7 +142,7 @@ class MonkeyPolicyBody(object): POLICY_VERSION: this_ver, POLICY_CONFIG: json.dumps(config), "matchingConditions": { - "ECOMPName": "DCAE", + "ONAPName": "DCAE", "ConfigName": "alex_config_name" }, "responseAttributes": {}, @@ -95,19 +157,22 @@ class MonkeyPolicyBody(object): for key in policy_body_1.keys(): if key not in policy_body_2: return False - if isinstance(policy_body_1[key], dict): - return MonkeyPolicyBody.is_the_same_dict( - policy_body_1[key], policy_body_2[key]) - if (policy_body_1[key] is None and policy_body_2[key] is not None) \ - or (policy_body_1[key] is not None and policy_body_2[key] is None) \ - or (policy_body_1[key] != policy_body_2[key]): + + val_1 = policy_body_1[key] + val_2 = policy_body_2[key] + if isinstance(val_1, dict) \ + and not MonkeyPolicyBody.is_the_same_dict(val_1, val_2): + return False + if (val_1 is None and val_2 is not None) \ + or (val_1 is not None and val_2 is None) \ + or (val_1 != val_2): return False return True class MonkeyPolicyEngine(object): """pretend this is the policy-engine""" _scope_prefix = Config.config["scope_prefixes"][0] - LOREM_IPSUM = """Lorem ipsum dolor sit amet consectetur""".split() + LOREM_IPSUM = """Lorem ipsum dolor sit amet consectetur ametist""".split() _policies = [] @staticmethod @@ -115,18 +180,19 @@ class MonkeyPolicyEngine(object): """init static vars""" MonkeyPolicyEngine._policies = [ MonkeyPolicyBody.create_policy_body( - MonkeyPolicyEngine._scope_prefix + policy_id, policy_version) + MonkeyPolicyEngine._scope_prefix + policy_id, policy_index + 1) for policy_id in MonkeyPolicyEngine.LOREM_IPSUM - for policy_version in range(1, 1 + MonkeyPolicyEngine.LOREM_IPSUM.index(policy_id))] + for policy_index in range(1 + MonkeyPolicyEngine.LOREM_IPSUM.index(policy_id))] + Settings.logger.info("MonkeyPolicyEngine._policies: %s", + json.dumps(MonkeyPolicyEngine._policies)) @staticmethod def get_config(policy_name): """find policy the way the policy-engine finds""" if not policy_name: return [] - if policy_name[-2:] == ".*": - policy_name = policy_name[:-2] - return [policy for policy in MonkeyPolicyEngine._policies + return [copy.deepcopy(policy) + for policy in MonkeyPolicyEngine._policies if re.match(policy_name, policy[POLICY_NAME])] @staticmethod @@ -135,49 +201,290 @@ class MonkeyPolicyEngine(object): return MonkeyPolicyEngine._scope_prefix \ + MonkeyPolicyEngine.LOREM_IPSUM[policy_index % len(MonkeyPolicyEngine.LOREM_IPSUM)] -MonkeyPolicyEngine.init() + @staticmethod + def gen_policy_latest(policy_index): + """generate the policy response by policy_index = version - 1""" + policy_id = MonkeyPolicyEngine.get_policy_id(policy_index) + expected_policy = { + POLICY_ID : policy_id, + POLICY_BODY : MonkeyPolicyBody.create_policy_body(policy_id, policy_index + 1) + } + return policy_id, PolicyUtils.parse_policy_config(expected_policy) -class MonkeyHttpResponse(object): - """Monkey http reposne""" - def __init__(self, headers): - self.headers = headers or {} + @staticmethod + def gen_all_policies_latest(): + """generate all latest policies""" + return dict( + MonkeyPolicyEngine.gen_policy_latest(policy_index) + for policy_index in range(len(MonkeyPolicyEngine.LOREM_IPSUM)) + ) -class MonkeyedResponse(object): - """Monkey response""" - def __init__(self, full_path, json_body, headers): - self.full_path = full_path - self.req_json = json_body or {} - self.status_code = 200 - self.request = MonkeyHttpResponse(headers) - self.req_policy_name = self.req_json.get(POLICY_NAME) - self.res = MonkeyPolicyEngine.get_config(self.req_policy_name) - self.text = json.dumps(self.res) + @staticmethod + def gen_policies_latest(match_to_policy_name): + """generate all latest policies""" + return dict( + (k, v) + for k, v in MonkeyPolicyEngine.gen_all_policies_latest().iteritems() + if re.match(match_to_policy_name, k) + ) - def json(self): - """returns json of response""" - return self.res +MonkeyPolicyEngine.init() -def monkeyed_policy_rest_post(full_path, json={}, headers={}): +def monkeyed_policy_rest_post(full_path, json=None, headers=None): """monkeypatch for the POST to policy-engine""" - return MonkeyedResponse(full_path, json, headers) + res_json = MonkeyPolicyEngine.get_config(json.get(POLICY_NAME)) + return MonkeyedResponse(full_path, res_json, json, headers) + +@pytest.fixture() +def fix_pdp_post(monkeypatch): + """monkeyed request /getConfig to PDP""" + Settings.logger.info("setup fix_pdp_post") + PolicyRest._lazy_init() + monkeypatch.setattr('policyhandler.policy_rest.PolicyRest._requests_session.post', + monkeyed_policy_rest_post) + yield fix_pdp_post # provide the fixture value + Settings.logger.info("teardown fix_pdp_post") + +def monkeyed_deploy_handler(full_path, json=None, headers=None): + """monkeypatch for deploy_handler""" + return MonkeyedResponse(full_path, {}, json, headers) -def test_get_policy_latest(monkeypatch): +@pytest.fixture() +def fix_deploy_handler(monkeypatch, fix_discovery): + """monkeyed discovery request.get""" + Settings.logger.info("setup fix_deploy_handler") + DeployHandler._lazy_init() + monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.post', + monkeyed_deploy_handler) + yield fix_deploy_handler # provide the fixture value + Settings.logger.info("teardown fix_deploy_handler") + +def monkeyed_cherrypy_engine_exit(): + """monkeypatch for deploy_handler""" + Settings.logger.info("monkeyed_cherrypy_engine_exit()") + +@pytest.fixture() +def fix_cherrypy_engine_exit(monkeypatch): + """monkeyed cherrypy.engine.exit()""" + Settings.logger.info("setup fix_cherrypy_engine_exit") + monkeypatch.setattr('policyhandler.web_server.cherrypy.engine.exit', + monkeyed_cherrypy_engine_exit) + yield fix_cherrypy_engine_exit # provide the fixture value + Settings.logger.info("teardown fix_cherrypy_engine_exit") + +class MonkeyedWebSocket(object): + """Monkey websocket""" + on_message = None + + @staticmethod + def send_notification(updated_indexes): + """fake notification through the web-socket""" + if not MonkeyedWebSocket.on_message: + return + message = { + LOADED_POLICIES : [ + {POLICY_NAME: "{0}.{1}.xml".format( + MonkeyPolicyEngine.get_policy_id(policy_index), policy_index + 1), + POLICY_VER: str(policy_index + 1)} + for policy_index in updated_indexes or [] + ], + REMOVED_POLICIES : [] + } + message = json.dumps(message) + Settings.logger.info("send_notification: %s", message) + MonkeyedWebSocket.on_message(None, message) + + @staticmethod + def enableTrace(yes_no): + """ignore""" + pass + + class MonkeyedSocket(object): + """Monkey websocket""" + def __init__(self): + self.connected = True + + class WebSocketApp(object): + """Monkeyed WebSocketApp""" + def __init__(self, web_socket_url, on_message=None, on_close=None, on_error=None): + self.web_socket_url = web_socket_url + self.on_message = MonkeyedWebSocket.on_message = on_message + self.on_close = on_close + self.on_error = on_error + self.sock = MonkeyedWebSocket.MonkeyedSocket() + Settings.logger.info("MonkeyedWebSocket for: %s", self.web_socket_url) + + def run_forever(self): + """forever in the loop""" + while self.sock.connected: + Settings.logger.info("MonkeyedWebSocket sleep...") + time.sleep(5) + Settings.logger.info("MonkeyedWebSocket exit") + + def close(self): + """close socket""" + self.sock.connected = False + +@pytest.fixture() +def fix_policy_receiver_websocket(monkeypatch): + """monkeyed websocket for policy_receiver""" + Settings.logger.info("setup fix_policy_receiver_websocket") + monkeypatch.setattr('policyhandler.policy_receiver.websocket', MonkeyedWebSocket) + yield fix_policy_receiver_websocket # provide the fixture value + Settings.logger.info("teardown fix_policy_receiver_websocket") + +def test_get_policy_latest(fix_pdp_post): """test /policy_latest/""" - monkeypatch.setattr('policyhandler.policy_rest.PolicyRest._requests_session.post', \ - monkeyed_policy_rest_post) - policy_index = 3 - policy_id = MonkeyPolicyEngine.get_policy_id(policy_index) - expected_policy = { - POLICY_ID : policy_id, - POLICY_BODY : MonkeyPolicyBody.create_policy_body(policy_id, policy_index) - } - expected_policy = PolicyUtils.parse_policy_config(expected_policy) + policy_id, expected_policy = MonkeyPolicyEngine.gen_policy_latest(3) audit = Audit(req_message="get /policy_latest/{0}".format(policy_id or "")) - policy_latest = PolicyRest.get_latest_policy((audit, policy_id)) or {} + policy_latest = PolicyRest.get_latest_policy((audit, policy_id, None, None)) or {} audit.audit_done(result=json.dumps(policy_latest)) - Settings.logger.info("expected_policy: {0}".format(json.dumps(expected_policy))) - Settings.logger.info("policy_latest: {0}".format(json.dumps(policy_latest))) + Settings.logger.info("expected_policy: %s", json.dumps(expected_policy)) + Settings.logger.info("policy_latest: %s", json.dumps(policy_latest)) assert MonkeyPolicyBody.is_the_same_dict(policy_latest, expected_policy) assert MonkeyPolicyBody.is_the_same_dict(expected_policy, policy_latest) + +def test_healthcheck(): + """test /healthcheck""" + audit = Audit(req_message="get /healthcheck") + audit.metrics_start("test /healthcheck") + time.sleep(0.1) + + audit.metrics("test /healthcheck") + health = Audit.health() or {} + audit.audit_done(result=json.dumps(health)) + + Settings.logger.info("healthcheck: %s", json.dumps(health)) + assert bool(health) + +def test_healthcheck_with_error(): + """test /healthcheck""" + audit = Audit(req_message="get /healthcheck") + audit.metrics_start("test /healthcheck") + time.sleep(0.2) + audit.error("error from test_healthcheck_with_error") + audit.fatal("fatal from test_healthcheck_with_error") + audit.debug("debug from test_healthcheck_with_error") + audit.warn("debug from test_healthcheck_with_error") + audit.info_requested("debug from test_healthcheck_with_error") + if audit.is_success(): + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + audit.metrics("test /healthcheck") + + health = Audit.health() or {} + audit.audit_done(result=json.dumps(health)) + + Settings.logger.info("healthcheck: %s", json.dumps(health)) + assert bool(health) + +@pytest.mark.usefixtures("fix_pdp_post") +class WebServerTest(CPWebCase): + """testing the web-server - runs tests in alphabetical order of method names""" + def setup_server(): + """setup the web-server""" + cherrypy.tree.mount(_PolicyWeb(), '/') + + setup_server = staticmethod(setup_server) + + def test_web_healthcheck(self): + """test /healthcheck""" + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + Settings.logger.info("got healthcheck: %s", self.body) + self.assertStatus('200 OK') + + def test_web_policy_latest(self): + """test /policy_latest/""" + policy_id, expected_policy = MonkeyPolicyEngine.gen_policy_latest(3) + + self.getPage("/policy_latest/{0}".format(policy_id or "")) + self.assertStatus('200 OK') + + policy_latest = json.loads(self.body) + + Settings.logger.info("policy_latest: %s", self.body) + Settings.logger.info("expected_policy: %s", json.dumps(expected_policy)) + assert MonkeyPolicyBody.is_the_same_dict(policy_latest, expected_policy) + assert MonkeyPolicyBody.is_the_same_dict(expected_policy, policy_latest) + + def test_web_all_policies_latest(self): + """test GET /policies_latest""" + expected_policies = MonkeyPolicyEngine.gen_all_policies_latest() + + result = self.getPage("/policies_latest") + Settings.logger.info("result: %s", result) + Settings.logger.info("body: %s", self.body) + self.assertStatus('200 OK') + + policies_latest = json.loads(self.body) + self.assertIn("valid_policies", policies_latest) + policies_latest = policies_latest["valid_policies"] + + Settings.logger.info("policies_latest: %s", json.dumps(policies_latest)) + Settings.logger.info("expected_policies: %s", json.dumps(expected_policies)) + assert MonkeyPolicyBody.is_the_same_dict(policies_latest, expected_policies) + assert MonkeyPolicyBody.is_the_same_dict(expected_policies, policies_latest) + + def test_web_policies_latest(self): + """test POST /policies_latest with policyName""" + match_to_policy_name = Config.config["scope_prefixes"][0] + "amet.*" + expected_policies = MonkeyPolicyEngine.gen_policies_latest(match_to_policy_name) + + body = json.dumps({POLICY_NAME: match_to_policy_name}) + result = self.getPage("/policies_latest", method='POST', + body=body, + headers=[ + (REQUEST_X_ECOMP_REQUESTID, str(uuid.uuid4())), + ("Content-Type", "application/json"), + ('Content-Length', str(len(body))) + ]) + Settings.logger.info("result: %s", result) + Settings.logger.info("body: %s", self.body) + self.assertStatus('200 OK') + + policies_latest = json.loads(self.body) + + Settings.logger.info("policies_latest: %s", json.dumps(policies_latest)) + Settings.logger.info("expected_policies: %s", json.dumps(expected_policies)) + assert MonkeyPolicyBody.is_the_same_dict(policies_latest, expected_policies) + assert MonkeyPolicyBody.is_the_same_dict(expected_policies, policies_latest) + + @pytest.mark.usefixtures( + "fix_deploy_handler", + "fix_policy_receiver_websocket", + "fix_cherrypy_engine_exit") + def test_zzz_run_policy_handler(self): + """test run policy handler""" + Settings.logger.info("start policy handler") + audit = Audit(req_message="start policy handler") + PolicyReceiver.run(audit) + + Settings.logger.info("sleep before send_notification...") + time.sleep(2) + + MonkeyedWebSocket.send_notification([1, 3, 5]) + Settings.logger.info("sleep after send_notification...") + time.sleep(3) + + Settings.logger.info("sleep before shutdown...") + time.sleep(1) + result = self.getPage("/shutdown") + Settings.logger.info("shutdown result: %s", result) + self.assertStatus('200 OK') + Settings.logger.info("got shutdown: %s", self.body) + time.sleep(1) + + # @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") + # def test_zzz_web_catch_up(self): + # """test /catch_up""" + # Settings.logger.info("start policy handler") + # audit = Audit(req_message="start policy handler") + # PolicyReceiver.run(audit) + # time.sleep(5) + # result = self.getPage("/catch_up") + # Settings.logger.info("catch_up result: %s", result) + # self.assertStatus('200 OK') + # Settings.logger.info("got catch_up: %s", self.body) diff --git a/tox-local.ini b/tox-local.ini index 6700caa..f08b661 100644 --- a/tox-local.ini +++ b/tox-local.ini @@ -1,3 +1,4 @@ +# tox -c tox-local.ini [tox] envlist = py27 diff --git a/version.properties b/version.properties index 07578e5..fbd1b27 100644 --- a/version.properties +++ b/version.properties @@ -1,5 +1,5 @@ -major=1 -minor=2 +major=2 +minor=0 patch=0 base_version=${major}.${minor}.${patch} release_version=${base_version} -- cgit 1.2.3-korg