diff options
Diffstat (limited to 'policyhandler/policy_rest.py')
-rw-r--r-- | policyhandler/policy_rest.py | 342 |
1 files changed, 342 insertions, 0 deletions
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py new file mode 100644 index 0000000..d49164c --- /dev/null +++ b/policyhandler/policy_rest.py @@ -0,0 +1,342 @@ +"""policy-client communicates with policy-engine thru REST API""" + +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +import logging +import json +import copy +import re +import time +from multiprocessing.dummy import Pool as ThreadPool +import requests + +from .config import Config +from .policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, POLICY_GET_CONFIG, \ + POLICY_BODY, POLICY_CONFIG +from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode, AuditResponseCode + +class PolicyUtils(object): + """policy-client utils""" + _logger = logging.getLogger("policy_handler.policy_utils") + _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$') + + @staticmethod + def safe_json_parse(json_str): + """try parsing json without exception - returns the json_str back if fails""" + if not json_str: + return json_str + try: + return json.loads(json_str) + except ValueError as err: + PolicyUtils._logger.warn("unexpected json %s: %s", str(json_str), str(err)) + return json_str + + @staticmethod + def extract_policy_id(policy_name): + """ policy_name = policy_id + "." + <version> + "." + <extension> + For instance, + policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml + policy_id = DCAE_alex.Config_alex_policy_number_1 + policy_scope = DCAE_alex + policy_class = Config + policy_version = 3 + type = extension = xml + delimiter = "." + policy_class_delimiter = "_" + policy_name in PAP = DCAE_alex.alex_policy_number_1 + """ + if not policy_name: + return + return PolicyUtils._policy_name_ext.sub('', policy_name) + + @staticmethod + def parse_policy_config(policy): + """try parsing the config in policy.""" + if policy and POLICY_BODY in policy and POLICY_CONFIG in policy[POLICY_BODY]: + policy[POLICY_BODY][POLICY_CONFIG] = PolicyUtils.safe_json_parse( + policy[POLICY_BODY][POLICY_CONFIG]) + return policy + + @staticmethod + def convert_to_policy(policy_config): + """wrap policy_config received from policy-engine with policy_id.""" + if not policy_config or POLICY_NAME not in policy_config \ + or POLICY_VERSION not in policy_config or not policy_config[POLICY_VERSION]: + return + policy_id = PolicyUtils.extract_policy_id(policy_config[POLICY_NAME]) + if not policy_id: + return + return {POLICY_ID:policy_id, POLICY_BODY:policy_config} + + @staticmethod + def select_latest_policy(policy_configs): + """For some reason, the policy-engine returns all version of the policy_configs. + DCAE-Controller is only interested in the latest version + """ + if not policy_configs: + return + latest_policy_config = {} + for policy_config in policy_configs: + if POLICY_VERSION not in policy_config or not policy_config[POLICY_VERSION] \ + or not policy_config[POLICY_VERSION].isdigit(): + continue + if not latest_policy_config \ + or int(policy_config[POLICY_VERSION]) \ + > int(latest_policy_config[POLICY_VERSION]): + latest_policy_config = policy_config + + return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_config)) + + @staticmethod + def select_latest_policies(policy_configs): + """For some reason, the policy-engine returns all version of the policy_configs. + DCAE-Controller is only interested in the latest versions + """ + if not policy_configs: + return {} + policies = {} + for policy_config in policy_configs: + policy = PolicyUtils.convert_to_policy(policy_config) + if not policy or POLICY_ID not in policy or POLICY_BODY not in policy: + continue + if POLICY_VERSION not in policy[POLICY_BODY] \ + or not policy[POLICY_BODY][POLICY_VERSION] \ + or not policy[POLICY_BODY][POLICY_VERSION].isdigit(): + continue + if policy[POLICY_ID] not in policies: + policies[policy[POLICY_ID]] = policy + continue + if int(policy[POLICY_BODY][POLICY_VERSION]) \ + > int(policies[policy[POLICY_ID]][POLICY_BODY][POLICY_VERSION]): + policies[policy[POLICY_ID]] = policy + + for policy_id in policies: + policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id]) + + return policies + +class PolicyRest(object): + """ policy-engine """ + _logger = logging.getLogger("policy_handler.policy_rest") + _lazy_inited = False + _url = None + _headers = None + _target_entity = None + _thread_pool_size = 4 + _scope_prefixes = None + _scope_thread_pool_size = 4 + _policy_retry_count = 1 + _policy_retry_sleep = 0 + + @staticmethod + def _lazy_init(): + """init static config""" + if PolicyRest._lazy_inited: + return + PolicyRest._lazy_inited = True + config = Config.config[Config.FIELD_POLICY_ENGINE] + PolicyRest._url = config["url"] + config["path_api"] + PolicyRest._headers = config["headers"] + PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) + PolicyRest._thread_pool_size = Config.config.get("thread_pool_size", 4) + if PolicyRest._thread_pool_size < 2: + PolicyRest._thread_pool_size = 2 + PolicyRest._scope_prefixes = Config.config["scope_prefixes"] + PolicyRest._scope_thread_pool_size = min(PolicyRest._thread_pool_size, \ + len(PolicyRest._scope_prefixes)) + + PolicyRest._policy_retry_count = Config.config.get("policy_retry_count", 1) or 1 + PolicyRest._policy_retry_sleep = Config.config.get("policy_retry_sleep", 0) + + PolicyRest._logger.info("PolicyClient url(%s) headers(%s) scope-prefixes(%s)", \ + PolicyRest._url, Audit.log_json_dumps(PolicyRest._headers), \ + json.dumps(PolicyRest._scope_prefixes)) + + @staticmethod + def _post(audit, path, json_body): + """Communication with the policy-engine""" + full_path = PolicyRest._url + path + sub_aud = Audit(aud_parent=audit, targetEntity=PolicyRest._target_entity, \ + targetServiceName=full_path) + + msg = json.dumps(json_body) + headers = copy.copy(PolicyRest._headers) + headers[REQUEST_X_ECOMP_REQUESTID] = sub_aud.request_id + headers_str = Audit.log_json_dumps(headers) + + log_line = "post to PDP {0} msg={1} headers={2}".format(full_path, msg, headers_str) + sub_aud.metrics_start(log_line) + PolicyRest._logger.info(log_line) + res = None + try: + res = requests.post(full_path, json=json_body, headers=headers) + except requests.exceptions.RequestException as ex: + error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + error_msg = "failed to post to PDP {0} {1} msg={2} headers={3}" \ + .format(full_path, str(ex), msg, headers_str) + + PolicyRest._logger.exception(error_msg) + sub_aud.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + sub_aud.metrics(error_msg) + return (error_code, None) + + log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format( \ + full_path, res.status_code, msg, res.text, \ + Audit.log_json_dumps(dict(res.request.headers.items()))) + sub_aud.set_http_status_code(res.status_code) + sub_aud.metrics(log_line) + PolicyRest._logger.info(log_line) + + if res.status_code == requests.codes.ok: + return res.status_code, res.json() + + return res.status_code, None + + @staticmethod + def get_latest_policy(aud_policy_name): + """Get the latest policy for the policy_name from the policy-engine""" + PolicyRest._lazy_init() + audit, policy_name = aud_policy_name + + status_code = 0 + latest_policy = None + for retry in xrange(1, PolicyRest._policy_retry_count + 1): + PolicyRest._logger.debug("%s", policy_name) + status_code, policy_configs = PolicyRest._post(audit, POLICY_GET_CONFIG, \ + {POLICY_NAME:policy_name}) + PolicyRest._logger.debug("%s %s policy_configs: %s", status_code, policy_name, \ + json.dumps(policy_configs or [])) + latest_policy = PolicyUtils.select_latest_policy(policy_configs) + if not latest_policy: + audit.error("received unexpected policy data from PDP for policy_name={0}: {1}" \ + .format(policy_name, json.dumps(policy_configs or [])), \ + errorCode=AuditResponseCode.DATA_ERROR.value, \ + errorDescription=AuditResponseCode.get_human_text( \ + AuditResponseCode.DATA_ERROR)) + + if latest_policy or not audit.retry_get_config \ + or not PolicyRest._policy_retry_sleep \ + or AuditResponseCode.PERMISSION_ERROR.value \ + == AuditResponseCode.get_response_code(status_code).value: + break + + if retry == PolicyRest._policy_retry_count: + audit.warn("gave up retrying {0} from PDP after #{1} for policy_name={2}" \ + .format(POLICY_GET_CONFIG, retry, policy_name), \ + errorCode=AuditResponseCode.DATA_ERROR.value, \ + errorDescription=AuditResponseCode.get_human_text( \ + AuditResponseCode.DATA_ERROR)) + break + + audit.warn("retry #{0} {1} from PDP in {2} secs for policy_name={3}" \ + .format(retry, POLICY_GET_CONFIG, PolicyRest._policy_retry_sleep, policy_name), \ + errorCode=AuditResponseCode.DATA_ERROR.value, \ + errorDescription=AuditResponseCode.get_human_text( \ + AuditResponseCode.DATA_ERROR)) + time.sleep(PolicyRest._policy_retry_sleep) + + audit.set_http_status_code(status_code) + if not latest_policy: + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + return latest_policy + + @staticmethod + def get_latest_policies_by_names(aud_policy_names): + """Get the latest policies of the list of policy_names from the policy-engine""" + PolicyRest._lazy_init() + audit, policy_names = aud_policy_names + if not policy_names: + return + + audit.metrics_start("get_latest_policies_by_names {0} {1}".format( \ + len(policy_names), json.dumps(policy_names))) + PolicyRest._logger.debug("%d %s", len(policy_names), json.dumps(policy_names)) + + thread_count = min(PolicyRest._thread_pool_size, len(policy_names)) + apns = [(audit, policy_name) for policy_name in policy_names] + policies = None + if thread_count == 1: + policies = [PolicyRest.get_latest_policy(apns[0])] + else: + pool = ThreadPool(thread_count) + policies = pool.map(PolicyRest.get_latest_policy, apns) + pool.close() + pool.join() + + audit.metrics("result get_latest_policies_by_names {0} {1}: {2} {3}".format( \ + len(policy_names), json.dumps(policy_names), len(policies), json.dumps(policies)), \ + targetEntity=PolicyRest._target_entity, targetServiceName=POLICY_GET_CONFIG) + policies = dict([(policy[POLICY_ID], policy) \ + for policy in policies if policy and POLICY_ID in policy]) + PolicyRest._logger.debug("policies %s", json.dumps(policies)) + if not policies: + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + return policies + + @staticmethod + def _get_latest_policies(aud_scope_prefix): + """Get the latest policies of the same scope from the policy-engine""" + audit, scope_prefix = aud_scope_prefix + PolicyRest._logger.debug("%s", scope_prefix) + status_code, policy_configs = PolicyRest._post(audit, POLICY_GET_CONFIG, \ + {POLICY_NAME:scope_prefix + ".*"}) + audit.set_http_status_code(status_code) + PolicyRest._logger.debug("%s policy_configs: %s %s", status_code, \ + scope_prefix, json.dumps(policy_configs or [])) + latest_policies = PolicyUtils.select_latest_policies(policy_configs) + + if not latest_policies: + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.error("received unexpected policies data from PDP for scope {0}: {1}".format( \ + scope_prefix, json.dumps(policy_configs or [])), \ + errorCode=AuditResponseCode.DATA_ERROR.value, \ + errorDescription=AuditResponseCode.get_human_text( \ + AuditResponseCode.DATA_ERROR)) + return latest_policies + + @staticmethod + def get_latest_policies(audit): + """Get the latest policies of the same scope from the policy-engine""" + PolicyRest._lazy_init() + PolicyRest._logger.debug("%s", json.dumps(PolicyRest._scope_prefixes)) + + audit.metrics_start("get_latest_policies for scopes {0} {1}".format( \ + len(PolicyRest._scope_prefixes), json.dumps(PolicyRest._scope_prefixes))) + asps = [(audit, scope_prefix) for scope_prefix in PolicyRest._scope_prefixes] + latest_policies = None + if PolicyRest._scope_thread_pool_size == 1: + latest_policies = [PolicyRest._get_latest_policies(asps[0])] + else: + pool = ThreadPool(PolicyRest._scope_thread_pool_size) + latest_policies = pool.map(PolicyRest._get_latest_policies, asps) + pool.close() + pool.join() + + audit.metrics("total result get_latest_policies for scopes {0} {1}: {2} {3}".format( \ + len(PolicyRest._scope_prefixes), json.dumps(PolicyRest._scope_prefixes), \ + len(latest_policies), json.dumps(latest_policies)), \ + targetEntity=PolicyRest._target_entity, targetServiceName=POLICY_GET_CONFIG) + + latest_policies = dict(pair for lp in latest_policies if lp for pair in lp.items()) + PolicyRest._logger.debug("latest_policies: %s %s", \ + json.dumps(PolicyRest._scope_prefixes), json.dumps(latest_policies)) + + return latest_policies |