diff options
Diffstat (limited to 'policyhandler/policy_rest.py')
-rw-r--r-- | policyhandler/policy_rest.py | 655 |
1 files changed, 419 insertions, 236 deletions
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py index bf8a31d..c8018f6 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/policy_rest.py @@ -1,8 +1,5 @@ -"""policy-client communicates with policy-engine thru REST API""" - -# org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,126 +16,43 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -import logging -import json +"""policy-client communicates with policy-engine thru REST API""" + import copy -import re +import json +import logging import time from multiprocessing.dummy import Pool as ThreadPool + import requests from .config import Config -from .policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, POLICY_GET_CONFIG, \ - POLICY_BODY, POLICY_CONFIG -from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode, AuditResponseCode - -class PolicyUtils(object): - """policy-client utils""" - _logger = logging.getLogger("policy_handler.policy_utils") - _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$') - - @staticmethod - def safe_json_parse(json_str): - """try parsing json without exception - returns the json_str back if fails""" - if not json_str: - return json_str - try: - return json.loads(json_str) - except ValueError as err: - PolicyUtils._logger.warn("unexpected json %s: %s", str(json_str), str(err)) - return json_str - - @staticmethod - def extract_policy_id(policy_name): - """ policy_name = policy_id + "." + <version> + "." + <extension> - For instance, - policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml - policy_id = DCAE_alex.Config_alex_policy_number_1 - policy_scope = DCAE_alex - policy_class = Config - policy_version = 3 - type = extension = xml - delimiter = "." - policy_class_delimiter = "_" - policy_name in PAP = DCAE_alex.alex_policy_number_1 - """ - if not policy_name: - return - return PolicyUtils._policy_name_ext.sub('', policy_name) - - @staticmethod - def parse_policy_config(policy): - """try parsing the config in policy.""" - if policy and POLICY_BODY in policy and POLICY_CONFIG in policy[POLICY_BODY]: - policy[POLICY_BODY][POLICY_CONFIG] = PolicyUtils.safe_json_parse( - policy[POLICY_BODY][POLICY_CONFIG]) - return policy - - @staticmethod - def convert_to_policy(policy_config): - """wrap policy_config received from policy-engine with policy_id.""" - if not policy_config or POLICY_NAME not in policy_config \ - or POLICY_VERSION not in policy_config or not policy_config[POLICY_VERSION]: - return - policy_id = PolicyUtils.extract_policy_id(policy_config[POLICY_NAME]) - if not policy_id: - return - return {POLICY_ID:policy_id, POLICY_BODY:policy_config} - - @staticmethod - def select_latest_policy(policy_configs): - """For some reason, the policy-engine returns all version of the policy_configs. - DCAE-Controller is only interested in the latest version - """ - if not policy_configs: - return - latest_policy_config = {} - for policy_config in policy_configs: - if POLICY_VERSION not in policy_config or not policy_config[POLICY_VERSION] \ - or not policy_config[POLICY_VERSION].isdigit(): - continue - if not latest_policy_config \ - or int(policy_config[POLICY_VERSION]) \ - > int(latest_policy_config[POLICY_VERSION]): - latest_policy_config = policy_config - - return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_config)) - - @staticmethod - def select_latest_policies(policy_configs): - """For some reason, the policy-engine returns all version of the policy_configs. - DCAE-Controller is only interested in the latest versions - """ - if not policy_configs: - return {} - policies = {} - for policy_config in policy_configs: - policy = PolicyUtils.convert_to_policy(policy_config) - if not policy or POLICY_ID not in policy or POLICY_BODY not in policy: - continue - if POLICY_VERSION not in policy[POLICY_BODY] \ - or not policy[POLICY_BODY][POLICY_VERSION] \ - or not policy[POLICY_BODY][POLICY_VERSION].isdigit(): - continue - if policy[POLICY_ID] not in policies: - policies[policy[POLICY_ID]] = policy - continue - if int(policy[POLICY_BODY][POLICY_VERSION]) \ - > int(policies[policy[POLICY_ID]][POLICY_BODY][POLICY_VERSION]): - policies[policy[POLICY_ID]] = policy - - for policy_id in policies: - policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id]) +from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, + AuditResponseCode, Metrics) +from .policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, LATEST_POLICIES, + POLICY_BODY, POLICY_CONFIG, POLICY_FILTER, + POLICY_ID, POLICY_NAME, SCOPE_PREFIXES) +from .policy_utils import PolicyUtils - return policies class PolicyRest(object): - """ policy-engine """ + """using the http API to policy-engine""" _logger = logging.getLogger("policy_handler.policy_rest") _lazy_inited = False + POLICY_GET_CONFIG = 'getConfig' + PDP_CONFIG_STATUS = "policyConfigStatus" + PDP_CONFIG_RETRIEVED = "CONFIG_RETRIEVED" + PDP_CONFIG_NOT_FOUND = "CONFIG_NOT_FOUND" + PDP_CONFIG_MESSAGE = "policyConfigMessage" + PDP_NO_RESPONSE_RECEIVED = "No Response Received" + PDP_STATUS_CODE_ERROR = 400 + PDP_DATA_NOT_FOUND = "PE300 - Data Issue: Incorrect Params passed: Decision not a Permit." + + MIN_VERSION_EXPECTED = "min_version_expected" + IGNORE_POLICY_NAMES = "ignore_policy_names" _requests_session = None - _url = None + _url_get_config = None _headers = None _target_entity = None _thread_pool_size = 4 @@ -154,7 +68,7 @@ class PolicyRest(object): return PolicyRest._lazy_inited = True - config = Config.config[Config.FIELD_POLICY_ENGINE] + config = Config.settings[Config.FIELD_POLICY_ENGINE] pool_size = config.get("pool_connections", 20) PolicyRest._requests_session = requests.Session() @@ -167,191 +81,460 @@ class PolicyRest(object): requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) ) - PolicyRest._url = config["url"] + config["path_api"] + PolicyRest._url_get_config = config["url"] \ + + config["path_api"] + PolicyRest.POLICY_GET_CONFIG PolicyRest._headers = config["headers"] PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) - PolicyRest._thread_pool_size = Config.config.get("thread_pool_size", 4) + PolicyRest._thread_pool_size = Config.settings.get("thread_pool_size", 4) if PolicyRest._thread_pool_size < 2: PolicyRest._thread_pool_size = 2 - PolicyRest._scope_prefixes = Config.config["scope_prefixes"] + PolicyRest._scope_prefixes = Config.settings["scope_prefixes"] PolicyRest._scope_thread_pool_size = min(PolicyRest._thread_pool_size, \ len(PolicyRest._scope_prefixes)) - PolicyRest._policy_retry_count = Config.config.get("policy_retry_count", 1) or 1 - PolicyRest._policy_retry_sleep = Config.config.get("policy_retry_sleep", 0) + PolicyRest._policy_retry_count = Config.settings.get("policy_retry_count", 1) or 1 + PolicyRest._policy_retry_sleep = Config.settings.get("policy_retry_sleep", 0) - PolicyRest._logger.info("PolicyClient url(%s) headers(%s) scope-prefixes(%s)", \ - PolicyRest._url, Audit.log_json_dumps(PolicyRest._headers), \ + PolicyRest._logger.info( + "PolicyClient url(%s) headers(%s) scope-prefixes(%s)", + PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers), json.dumps(PolicyRest._scope_prefixes)) @staticmethod - def _post(audit, path, json_body): + def _pdp_get_config(audit, json_body): """Communication with the policy-engine""" - full_path = PolicyRest._url + path - sub_aud = Audit(aud_parent=audit, targetEntity=PolicyRest._target_entity, \ - targetServiceName=full_path) + metrics = Metrics(aud_parent=audit, targetEntity=PolicyRest._target_entity, + targetServiceName=PolicyRest._url_get_config) msg = json.dumps(json_body) headers = copy.copy(PolicyRest._headers) - headers[REQUEST_X_ECOMP_REQUESTID] = sub_aud.request_id - headers_str = Audit.log_json_dumps(headers) + headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id + headers_str = Metrics.log_json_dumps(headers) - log_line = "post to PDP {0} msg={1} headers={2}".format(full_path, msg, headers_str) - sub_aud.metrics_start(log_line) + log_line = "post to PDP {0} msg={1} headers={2}".format( + PolicyRest._url_get_config, msg, headers_str) + metrics.metrics_start(log_line) PolicyRest._logger.info(log_line) res = None try: - res = PolicyRest._requests_session.post(full_path, json=json_body, headers=headers) - except requests.exceptions.RequestException as ex: - error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value - error_msg = "failed to post to PDP {0} {1} msg={2} headers={3}" \ - .format(full_path, str(ex), msg, headers_str) + res = PolicyRest._requests_session.post( + PolicyRest._url_get_config, json=json_body, headers=headers) + except Exception as ex: + error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + if isinstance(ex, requests.exceptions.RequestException) + else AuditHttpCode.SERVER_INTERNAL_ERROR.value) + error_msg = ( + "failed to post to PDP {0} {1}: {2} msg={3} headers={4}" + .format(PolicyRest._url_get_config, type(ex).__name__, str(ex), msg, headers_str)) PolicyRest._logger.exception(error_msg) - sub_aud.set_http_status_code(error_code) + metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) - sub_aud.metrics(error_msg) + metrics.metrics(error_msg) return (error_code, None) - log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format( \ - full_path, res.status_code, msg, res.text, \ - Audit.log_json_dumps(dict(res.request.headers.items()))) - sub_aud.set_http_status_code(res.status_code) - sub_aud.metrics(log_line) + log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format( + PolicyRest._url_get_config, res.status_code, msg, res.text, + Metrics.log_json_dumps(dict(res.request.headers.items()))) + + status_code, res_data = PolicyRest._extract_pdp_res_data(audit, metrics, log_line, res) + + if status_code: + return status_code, res_data + + metrics.set_http_status_code(res.status_code) + metrics.metrics(log_line) PolicyRest._logger.info(log_line) + return res.status_code, res_data + @staticmethod + def _extract_pdp_res_data(audit, metrics, log_line, res): + """special treatment of pdp response""" + res_data = None if res.status_code == requests.codes.ok: - return res.status_code, res.json() + res_data = res.json() + + if res_data and isinstance(res_data, list) and len(res_data) == 1: + rslt = res_data[0] + if rslt and not rslt.get(POLICY_NAME): + res_data = None + if rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_NO_RESPONSE_RECEIVED: + error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + error_msg = "unexpected {0}".format(log_line) + + PolicyRest._logger.error(error_msg) + metrics.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + metrics.metrics(error_msg) + return error_code, None + return None, res_data + + if res.status_code == PolicyRest.PDP_STATUS_CODE_ERROR: + try: + res_data = res.json() + except ValueError: + return None, None + + if not res_data or not isinstance(res_data, list) or len(res_data) != 1: + return None, None + + rslt = res_data[0] + if (rslt and not rslt.get(POLICY_NAME) + and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND + and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND): + status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value + info_msg = "not found {0}".format(log_line) + + PolicyRest._logger.info(info_msg) + metrics.set_http_status_code(status_code) + metrics.metrics(info_msg) + return status_code, None + return None, None - return res.status_code, None @staticmethod - def get_latest_policy(aud_policy_name): - """Get the latest policy for the policy_name from the policy-engine""" - PolicyRest._lazy_init() - audit, policy_name = aud_policy_name + def _validate_policy(policy): + """Validates the config on policy""" + if not policy: + return - status_code = 0 + policy_body = policy.get(POLICY_BODY) + + return bool( + policy_body + and policy_body.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_RETRIEVED + and policy_body.get(POLICY_CONFIG) + ) + + @staticmethod + def get_latest_policy(aud_policy_id): + """safely try retrieving the latest policy for the policy_id from the policy-engine""" + audit, policy_id, min_version_expected, ignore_policy_names = aud_policy_id + str_metrics = "policy_id({0}), min_version_expected({1}) ignore_policy_names({2})".format( + policy_id, min_version_expected, json.dumps(ignore_policy_names)) + + try: + return PolicyRest._get_latest_policy( + audit, policy_id, min_version_expected, ignore_policy_names, str_metrics) + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(audit.request_id, type(ex).__name__, str(ex), + "get_latest_policy", str_metrics)) + + PolicyRest._logger.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None + + + @staticmethod + def _get_latest_policy(audit, policy_id, + min_version_expected, ignore_policy_names, str_metrics): + """retry several times getting the latest policy for the policy_id from the policy-engine""" + PolicyRest._lazy_init() latest_policy = None - for retry in xrange(1, PolicyRest._policy_retry_count + 1): - PolicyRest._logger.debug("%s", policy_name) - status_code, policy_configs = PolicyRest._post(audit, POLICY_GET_CONFIG, \ - {POLICY_NAME:policy_name}) - PolicyRest._logger.debug("%s %s policy_configs: %s", status_code, policy_name, \ - json.dumps(policy_configs or [])) - latest_policy = PolicyUtils.select_latest_policy(policy_configs) - if not latest_policy: - audit.error("received unexpected policy data from PDP for policy_name={0}: {1}" \ - .format(policy_name, json.dumps(policy_configs or [])), \ - errorCode=AuditResponseCode.DATA_ERROR.value, \ - errorDescription=AuditResponseCode.get_human_text( \ - AuditResponseCode.DATA_ERROR)) - - if latest_policy or not audit.retry_get_config \ - or not PolicyRest._policy_retry_sleep \ - or AuditResponseCode.PERMISSION_ERROR.value \ - == AuditResponseCode.get_response_code(status_code).value: + status_code = 0 + retry_get_config = audit.kwargs.get("retry_get_config") + expect_policy_removed = (ignore_policy_names and not min_version_expected) + + for retry in range(1, PolicyRest._policy_retry_count + 1): + PolicyRest._logger.debug(str_metrics) + + done, latest_policy, status_code = PolicyRest._get_latest_policy_once( + audit, policy_id, min_version_expected, ignore_policy_names, + expect_policy_removed) + + if done or not retry_get_config or not PolicyRest._policy_retry_sleep: break if retry == PolicyRest._policy_retry_count: - audit.warn("gave up retrying {0} from PDP after #{1} for policy_name={2}" \ - .format(POLICY_GET_CONFIG, retry, policy_name), \ - errorCode=AuditResponseCode.DATA_ERROR.value, \ - errorDescription=AuditResponseCode.get_human_text( \ - AuditResponseCode.DATA_ERROR)) + audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}" + .format(PolicyRest._url_get_config, retry, policy_id), + error_code=AuditResponseCode.DATA_ERROR) break - audit.warn("retry #{0} {1} from PDP in {2} secs for policy_name={3}" \ - .format(retry, POLICY_GET_CONFIG, PolicyRest._policy_retry_sleep, policy_name), \ - errorCode=AuditResponseCode.DATA_ERROR.value, \ - errorDescription=AuditResponseCode.get_human_text( \ - AuditResponseCode.DATA_ERROR)) + audit.warn( + "retry #{0} {1} from PDP in {2} secs for policy_id={3}".format( + retry, PolicyRest._url_get_config, + PolicyRest._policy_retry_sleep, policy_id), + error_code=AuditResponseCode.DATA_ERROR) time.sleep(PolicyRest._policy_retry_sleep) + if (expect_policy_removed and not latest_policy + and AuditHttpCode.RESPONSE_ERROR.value == status_code): + audit.set_http_status_code(AuditHttpCode.HTTP_OK.value) + return None + audit.set_http_status_code(status_code) - if not latest_policy: + if not PolicyRest._validate_policy(latest_policy): audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.error( + "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)), + error_code=AuditResponseCode.DATA_ERROR) + return latest_policy @staticmethod - def get_latest_policies_by_names(aud_policy_names): + def _get_latest_policy_once(audit, policy_id, + min_version_expected, ignore_policy_names, + expect_policy_removed): + """single attempt to get the latest policy for the policy_id from the policy-engine""" + + status_code, policy_configs = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id}) + + PolicyRest._logger.debug("%s %s policy_configs: %s", + status_code, policy_id, json.dumps(policy_configs or [])) + + latest_policy = PolicyUtils.select_latest_policy( + policy_configs, min_version_expected, ignore_policy_names + ) + + if not latest_policy and not expect_policy_removed: + audit.error("received unexpected policy data from PDP for policy_id={0}: {1}" + .format(policy_id, json.dumps(policy_configs or [])), + error_code=AuditResponseCode.DATA_ERROR) + + done = bool(latest_policy + or (expect_policy_removed and not policy_configs) + or audit.is_serious_error(status_code)) + + return done, latest_policy, status_code + + @staticmethod + def get_latest_updated_policies(aud_policy_updates): + """safely try retrieving the latest policies for the list of policy_names""" + audit, policies_updated, policies_removed = aud_policy_updates + if not policies_updated and not policies_removed: + return None, None + + str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format( + len(policies_updated), json.dumps(policies_updated), + len(policies_removed), json.dumps(policies_removed)) + + try: + return PolicyRest._get_latest_updated_policies( + audit, str_metrics, policies_updated, policies_removed) + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(audit.request_id, type(ex).__name__, str(ex), + "get_latest_updated_policies", str_metrics)) + + PolicyRest._logger.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None, None + + @staticmethod + def _get_latest_updated_policies(audit, str_metrics, policies_updated, policies_removed): """Get the latest policies of the list of policy_names from the policy-engine""" PolicyRest._lazy_init() - audit, policy_names = aud_policy_names - if not policy_names: - return + metrics = Metrics( + aud_parent=audit, + targetEntity="{0} total get_latest_updated_policies".format(PolicyRest._target_entity), + targetServiceName=PolicyRest._url_get_config) + + metrics.metrics_start("get_latest_updated_policies {0}".format(str_metrics)) + PolicyRest._logger.debug(str_metrics) + + policies_to_find = {} + for (policy_name, policy_version) in policies_updated: + policy_id = PolicyUtils.extract_policy_id(policy_name) + if not policy_id or not policy_version.isdigit(): + continue + policy = policies_to_find.get(policy_id) + if not policy: + policies_to_find[policy_id] = { + POLICY_ID: policy_id, + PolicyRest.MIN_VERSION_EXPECTED: int(policy_version), + PolicyRest.IGNORE_POLICY_NAMES: {} + } + continue + if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version): + policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version) + + for (policy_name, _) in policies_removed: + policy_id = PolicyUtils.extract_policy_id(policy_name) + if not policy_id: + continue + policy = policies_to_find.get(policy_id) + if not policy: + policies_to_find[policy_id] = { + POLICY_ID: policy_id, + PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True} + } + continue + policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True - audit.metrics_start("get_latest_policies_by_names {0} {1}".format( \ - len(policy_names), json.dumps(policy_names))) - PolicyRest._logger.debug("%d %s", len(policy_names), json.dumps(policy_names)) + apns = [(audit, policy_id, + policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED), + policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)) + for (policy_id, policy_to_find) in policies_to_find.items()] - thread_count = min(PolicyRest._thread_pool_size, len(policy_names)) - apns = [(audit, policy_name) for policy_name in policy_names] policies = None - if thread_count == 1: + apns_length = len(apns) + if apns_length == 1: policies = [PolicyRest.get_latest_policy(apns[0])] else: - pool = ThreadPool(thread_count) + pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length)) policies = pool.map(PolicyRest.get_latest_policy, apns) pool.close() pool.join() - audit.metrics("result get_latest_policies_by_names {0} {1}: {2} {3}".format( \ - len(policy_names), json.dumps(policy_names), len(policies), json.dumps(policies)), \ - targetEntity=PolicyRest._target_entity, targetServiceName=POLICY_GET_CONFIG) - policies = dict([(policy[POLICY_ID], policy) \ - for policy in policies if policy and POLICY_ID in policy]) - PolicyRest._logger.debug("policies %s", json.dumps(policies)) - if not policies: + metrics.metrics("result get_latest_updated_policies {0}: {1} {2}" + .format(str_metrics, len(policies), json.dumps(policies))) + + updated_policies = dict((policy[POLICY_ID], policy) + for policy in policies + if policy and policy.get(POLICY_ID)) + + removed_policies = dict((policy_id, True) + for (policy_id, policy_to_find) in policies_to_find.items() + if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED) + and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES) + and policy_id not in updated_policies) + + errored_policies = dict((policy_id, policy_to_find) + for (policy_id, policy_to_find) in policies_to_find.items() + if policy_id not in updated_policies + and policy_id not in removed_policies) + + PolicyRest._logger.debug( + "result updated_policies %s, removed_policies %s, errored_policies %s", + json.dumps(updated_policies), json.dumps(removed_policies), + json.dumps(errored_policies)) + + if errored_policies: audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - return policies + audit.error( + "errored_policies in PDP: {0}".format(json.dumps(errored_policies)), + error_code=AuditResponseCode.DATA_ERROR) + + return updated_policies, removed_policies + @staticmethod - def _get_latest_policies(aud_scope_prefix): - """Get the latest policies of the same scope from the policy-engine""" - audit, scope_prefix = aud_scope_prefix - PolicyRest._logger.debug("%s", scope_prefix) - status_code, policy_configs = PolicyRest._post(audit, POLICY_GET_CONFIG, \ - {POLICY_NAME:scope_prefix + ".*"}) - audit.set_http_status_code(status_code) - PolicyRest._logger.debug("%s policy_configs: %s %s", status_code, \ - scope_prefix, json.dumps(policy_configs or [])) - latest_policies = PolicyUtils.select_latest_policies(policy_configs) + def _get_latest_policies(aud_policy_filter): + """ + get the latest policies by policy_filter + or all the latest policies of the same scope from the policy-engine + """ + audit, policy_filter, scope_prefix = aud_policy_filter + try: + str_policy_filter = json.dumps(policy_filter) + PolicyRest._logger.debug("%s", str_policy_filter) + + status_code, policy_configs = PolicyRest._pdp_get_config(audit, policy_filter) + + PolicyRest._logger.debug("%s policy_configs: %s %s", status_code, + str_policy_filter, json.dumps(policy_configs or [])) + + latest_policies = PolicyUtils.select_latest_policies(policy_configs) + + if (scope_prefix and not policy_configs + and status_code != AuditHttpCode.DATA_NOT_FOUND_ERROR.value): + audit.warn("PDP error {0} on scope_prefix {1}".format(status_code, scope_prefix), + error_code=AuditResponseCode.DATA_ERROR) + return None, latest_policies, scope_prefix + + if not latest_policies: + if not scope_prefix: + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.warn( + "received no policies from PDP for policy_filter {0}: {1}" + .format(str_policy_filter, json.dumps(policy_configs or [])), + error_code=AuditResponseCode.DATA_ERROR) + return None, latest_policies, None + + audit.set_http_status_code(status_code) + valid_policies = {} + errored_policies = {} + for (policy_id, policy) in latest_policies.items(): + if PolicyRest._validate_policy(policy): + valid_policies[policy_id] = policy + else: + errored_policies[policy_id] = policy + return valid_policies, errored_policies, None + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: policy_filter({4}), scope_prefix({5})" + .format(audit.request_id, type(ex).__name__, str(ex), + "_get_latest_policies", json.dumps(policy_filter), scope_prefix)) + + PolicyRest._logger.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None, None, scope_prefix - if not latest_policies: - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - audit.error("received unexpected policies data from PDP for scope {0}: {1}".format( \ - scope_prefix, json.dumps(policy_configs or [])), \ - errorCode=AuditResponseCode.DATA_ERROR.value, \ - errorDescription=AuditResponseCode.get_human_text( \ - AuditResponseCode.DATA_ERROR)) - return latest_policies @staticmethod - def get_latest_policies(audit): + def get_latest_policies(audit, policy_filter=None): """Get the latest policies of the same scope from the policy-engine""" - PolicyRest._lazy_init() - PolicyRest._logger.debug("%s", json.dumps(PolicyRest._scope_prefixes)) - - audit.metrics_start("get_latest_policies for scopes {0} {1}".format( \ - len(PolicyRest._scope_prefixes), json.dumps(PolicyRest._scope_prefixes))) - asps = [(audit, scope_prefix) for scope_prefix in PolicyRest._scope_prefixes] - latest_policies = None - if PolicyRest._scope_thread_pool_size == 1: - latest_policies = [PolicyRest._get_latest_policies(asps[0])] - else: - pool = ThreadPool(PolicyRest._scope_thread_pool_size) - latest_policies = pool.map(PolicyRest._get_latest_policies, asps) - pool.close() - pool.join() + result = {} + aud_policy_filters = None + str_policy_filters = None + str_metrics = None + target_entity = None - audit.metrics("total result get_latest_policies for scopes {0} {1}: {2} {3}".format( \ - len(PolicyRest._scope_prefixes), json.dumps(PolicyRest._scope_prefixes), \ - len(latest_policies), json.dumps(latest_policies)), \ - targetEntity=PolicyRest._target_entity, targetServiceName=POLICY_GET_CONFIG) - - latest_policies = dict(pair for lp in latest_policies if lp for pair in lp.items()) - PolicyRest._logger.debug("latest_policies: %s %s", \ - json.dumps(PolicyRest._scope_prefixes), json.dumps(latest_policies)) + try: + PolicyRest._lazy_init() + if policy_filter is not None: + aud_policy_filters = [(audit, policy_filter, None)] + str_policy_filters = json.dumps(policy_filter) + str_metrics = "get_latest_policies for policy_filter {0}".format( + str_policy_filters) + target_entity = ("{0} total get_latest_policies by policy_filter" + .format(PolicyRest._target_entity)) + result[POLICY_FILTER] = copy.deepcopy(policy_filter) + else: + aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, scope_prefix) + for scope_prefix in PolicyRest._scope_prefixes] + str_policy_filters = json.dumps(PolicyRest._scope_prefixes) + str_metrics = "get_latest_policies for scopes {0} {1}".format( \ + len(PolicyRest._scope_prefixes), str_policy_filters) + target_entity = ("{0} total get_latest_policies by scope_prefixes" + .format(PolicyRest._target_entity)) + result[SCOPE_PREFIXES] = copy.deepcopy(PolicyRest._scope_prefixes) + + PolicyRest._logger.debug("%s", str_policy_filters) + metrics = Metrics(aud_parent=audit, targetEntity=target_entity, + targetServiceName=PolicyRest._url_get_config) + + metrics.metrics_start(str_metrics) + + latest_policies = None + apfs_length = len(aud_policy_filters) + if apfs_length == 1: + latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])] + else: + pool = ThreadPool(min(PolicyRest._scope_thread_pool_size, apfs_length)) + latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters) + pool.close() + pool.join() + + metrics.metrics("total result {0}: {1} {2}".format( + str_metrics, len(latest_policies), json.dumps(latest_policies))) + + # latest_policies == [(valid_policies, errored_policies, errored_scope_prefix), ...] + result[LATEST_POLICIES] = dict( + pair for (vps, _, _) in latest_policies if vps for pair in vps.items()) + + result[ERRORED_POLICIES] = dict( + pair for (_, eps, _) in latest_policies if eps for pair in eps.items()) + + result[ERRORED_SCOPES] = sorted([esp for (_, _, esp) in latest_policies if esp]) + + PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s", + str_policy_filters, json.dumps(result)) + return result + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(audit.request_id, type(ex).__name__, str(ex), + "get_latest_policies", str_metrics)) - return latest_policies + PolicyRest._logger.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None |