diff options
author | Alex Shatov <alexs@att.com> | 2019-04-01 11:32:06 -0400 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2019-04-01 11:32:06 -0400 |
commit | 9a4d3c5b8dc9c7697275cab38ee45b014dff9e55 (patch) | |
tree | d4d55bcc8bc237ee3199d0e6a13f5e7cd95fadea /policyhandler/policy_rest.py | |
parent | ebc1a062328e53e97e4d24ed111534cfc567a809 (diff) |
5.0.0 policy-handler - new PDP API or old PDP API4.0.0-ONAPdublin
- in R4 Dublin the policy-engine introduced a totally new API
- policy-handler now has a startup option to either use the new PDP API
or the old PDP API that was created-updated before the end of 2018
- see README.md and README_pdp_api_v0.md for instructions on how to
setup the policy-handler running either with the new PDP API
or the old (pdp_api_v0) PDP API
- this is a massive refactoring that changed almost all the source files,
but kept the old logic when using the old (pdp_api_v0) PDP API
- all the code related to PDP API version is split into two subfolders
= pdp_api/ contains the new PDP API source code
= pdp_api_v0/ contains the old (2018) PDP API source code
= pdp_client.py imports from either pdp_api or pdp_api_v0
= the rest of the code is only affected when it needs to branch
the logic
- logging to policy_handler.log now shows the path of the source file to
allow tracing which PDP API is actually used
- when the new PDP API is used, the policy-update flow is disabled
= passive mode of operation
= no web-socket
= no periodic catch_up
= no policy-filters
= reduced web-API - only a single /policy_latest endpoint is available
/policies_latest returns 404
/catch_up request is accepted, but ignored
- on new PDP API: http /policy_latest returns the new data from the
new PDP API with the following fields added by the policy-handler
to keep other policy related parts intact in R4
(see pdp_api/policy_utils.py)
= "policyName" = policy_id + "." + "policyVersion" + ".xml"
= "policyVersion" = str("metadata"."policy-version")
= "config" - is the renamed "properties" from the new PDP API response
- unit tests are split into two subfolders as well
= main/ for the new PDP API testing
= pdp_api_v0/ for the old (2018) PDP API
- removed the following line from the license text of changed files
ECOMP is a trademark and service mark of AT&T Intellectual Property.
- the new PDP API is expected to be extended and redesigned in R5 El Alto
- on retiring the old PDP API - the intention is to be able to remove
the pdp_api_v0/ subfolder and minimal related cleanup of the code
that imports that as well as the cleanup of the config.py, etc.
Change-Id: Ief9a2ae4541300308caaf97377f4ed051535dbe4
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-1128
Diffstat (limited to 'policyhandler/policy_rest.py')
-rw-r--r-- | policyhandler/policy_rest.py | 586 |
1 files changed, 0 insertions, 586 deletions
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py deleted file mode 100644 index 85dd914..0000000 --- a/policyhandler/policy_rest.py +++ /dev/null @@ -1,586 +0,0 @@ -# ================================================================================ -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. - -"""policy-client communicates with policy-engine thru REST API""" - -import copy -import json -import logging -import time -import urllib.parse -from multiprocessing.dummy import Pool as ThreadPool -from threading import Lock - -import requests - -from .config import Config, Settings -from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, - AuditResponseCode, Metrics) -from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY, - POLICY_CONFIG, POLICY_FILTER, POLICY_FILTERS, - POLICY_ID, POLICY_NAME) -from .policy_utils import PolicyUtils - - -class PolicyRest(object): - """using the http API to policy-engine""" - _logger = logging.getLogger("policy_handler.policy_rest") - _lazy_inited = False - POLICY_GET_CONFIG = 'getConfig' - PDP_CONFIG_STATUS = "policyConfigStatus" - PDP_CONFIG_RETRIEVED = "CONFIG_RETRIEVED" - PDP_CONFIG_NOT_FOUND = "CONFIG_NOT_FOUND" - PDP_CONFIG_MESSAGE = "policyConfigMessage" - PDP_NO_RESPONSE_RECEIVED = "No Response Received" - PDP_STATUS_CODE_ERROR = 400 - PDP_DATA_NOT_FOUND = "PE300 - Data Issue: Incorrect Params passed: Decision not a Permit." - - EXPECTED_VERSIONS = "expected_versions" - IGNORE_POLICY_NAMES = "ignore_policy_names" - DEFAULT_TIMEOUT_IN_SECS = 60 - - _lock = Lock() - _settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS, - Config.THREAD_POOL_SIZE, - Config.POLICY_RETRY_COUNT, Config.POLICY_RETRY_SLEEP) - - _requests_session = None - _url_get_config = None - _headers = None - _target_entity = None - _custom_kwargs = {} - _thread_pool_size = 4 - _policy_retry_count = 1 - _policy_retry_sleep = 0 - _timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS - - @staticmethod - def _init(): - """init static config""" - PolicyRest._custom_kwargs = {} - - _, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE) - - if not PolicyRest._requests_session: - PolicyRest._requests_session = requests.Session() - - changed, pool_size = PolicyRest._settings.get_by_key(Config.POOL_CONNECTIONS, 20) - if changed: - PolicyRest._requests_session.mount( - 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size, - pool_maxsize=pool_size)) - PolicyRest._requests_session.mount( - 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size, - pool_maxsize=pool_size)) - - get_config_path = urllib.parse.urljoin( - config.get("path_api", "pdp/api").strip("/") + "/", PolicyRest.POLICY_GET_CONFIG) - PolicyRest._url_get_config = urllib.parse.urljoin(config.get("url", ""), get_config_path) - PolicyRest._headers = config.get("headers", {}) - PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) - _, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key( - Config.THREAD_POOL_SIZE, 4) - if PolicyRest._thread_pool_size < 2: - PolicyRest._thread_pool_size = 2 - - _, PolicyRest._policy_retry_count = PolicyRest._settings.get_by_key( - Config.POLICY_RETRY_COUNT, 1) - _, PolicyRest._policy_retry_sleep = PolicyRest._settings.get_by_key( - Config.POLICY_RETRY_SLEEP, 0) - - tls_ca_mode = config.get(Config.TLS_CA_MODE) - PolicyRest._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode) - PolicyRest._timeout_in_secs = config.get(Config.TIMEOUT_IN_SECS) - if not PolicyRest._timeout_in_secs or PolicyRest._timeout_in_secs < 1: - PolicyRest._timeout_in_secs = PolicyRest.DEFAULT_TIMEOUT_IN_SECS - - PolicyRest._logger.info( - "PDP(%s) url(%s) headers(%s) tls_ca_mode(%s) timeout_in_secs(%s) custom_kwargs(%s): %s", - PolicyRest._target_entity, PolicyRest._url_get_config, - Metrics.json_dumps(PolicyRest._headers), tls_ca_mode, - PolicyRest._timeout_in_secs, json.dumps(PolicyRest._custom_kwargs), - PolicyRest._settings) - - PolicyRest._settings.commit_change() - PolicyRest._lazy_inited = True - - @staticmethod - def reconfigure(): - """reconfigure""" - with PolicyRest._lock: - PolicyRest._settings.set_config(Config.discovered_config) - if not PolicyRest._settings.is_changed(): - PolicyRest._settings.commit_change() - return False - - PolicyRest._lazy_inited = False - PolicyRest._init() - return True - - @staticmethod - def _lazy_init(): - """init static config""" - if PolicyRest._lazy_inited: - return - - with PolicyRest._lock: - if PolicyRest._lazy_inited: - return - - PolicyRest._settings.set_config(Config.discovered_config) - PolicyRest._init() - - @staticmethod - def _pdp_get_config(audit, json_body): - """Communication with the policy-engine""" - with PolicyRest._lock: - session = PolicyRest._requests_session - target_entity = PolicyRest._target_entity - url = PolicyRest._url_get_config - timeout_in_secs = PolicyRest._timeout_in_secs - headers = copy.deepcopy(PolicyRest._headers) - custom_kwargs = copy.deepcopy(PolicyRest._custom_kwargs) - - metrics = Metrics(aud_parent=audit, targetEntity=target_entity, targetServiceName=url) - - headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id - - log_action = "post to {} at {}".format(target_entity, url) - log_data = "msg={} headers={}, custom_kwargs({}) timeout_in_secs({})".format( - json.dumps(json_body), Metrics.json_dumps(headers), json.dumps(custom_kwargs), - timeout_in_secs) - log_line = log_action + " " + log_data - - PolicyRest._logger.info(metrics.metrics_start(log_line)) - - res = None - try: - res = session.post(url, json=json_body, headers=headers, timeout=timeout_in_secs, - **custom_kwargs) - except Exception as ex: - error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value - if isinstance(ex, requests.exceptions.RequestException) - else AuditHttpCode.SERVER_INTERNAL_ERROR.value) - error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line)) - - PolicyRest._logger.exception(error_msg) - metrics.set_http_status_code(error_code) - audit.set_http_status_code(error_code) - metrics.metrics(error_msg) - return (error_code, None) - - log_line = "response {} from {}: text={} headers={}".format( - res.status_code, log_line, res.text, - Metrics.json_dumps(dict(res.request.headers.items()))) - - status_code, res_data = PolicyRest._extract_pdp_res_data(audit, metrics, log_line, res) - - if status_code: - return status_code, res_data - - metrics.set_http_status_code(res.status_code) - metrics.metrics(log_line) - PolicyRest._logger.info(log_line) - return res.status_code, res_data - - @staticmethod - def _extract_pdp_res_data(audit, metrics, log_line, res): - """special treatment of pdp response""" - res_data = None - if res.status_code == requests.codes.ok: - res_data = res.json() - - if res_data and isinstance(res_data, list) and len(res_data) == 1: - rslt = res_data[0] or {} - if not rslt.get(POLICY_NAME): - res_data = None - if rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_NO_RESPONSE_RECEIVED: - error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value - error_msg = "{} unexpected {}".format(error_code, log_line) - - PolicyRest._logger.error(error_msg) - metrics.set_http_status_code(error_code) - audit.set_http_status_code(error_code) - metrics.metrics(error_msg) - return error_code, None - return None, res_data - - if res.status_code == PolicyRest.PDP_STATUS_CODE_ERROR: - try: - res_data = res.json() - except ValueError: - return None, None - - if not res_data or not isinstance(res_data, list) or len(res_data) != 1: - return None, None - - rslt = res_data[0] - if (rslt and not rslt.get(POLICY_NAME) - and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND - and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND): - status_code = AuditHttpCode.DATA_NOT_FOUND_OK.value - info_msg = "{} not found {}".format(status_code, log_line) - - PolicyRest._logger.info(info_msg) - metrics.set_http_status_code(status_code) - metrics.metrics(info_msg) - return status_code, None - return None, None - - - @staticmethod - def _validate_policy(policy): - """Validates the config on policy""" - if not policy: - return - - policy_body = policy.get(POLICY_BODY) - - return bool( - policy_body - and policy_body.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_RETRIEVED - and policy_body.get(POLICY_CONFIG) - ) - - @staticmethod - def get_latest_policy(aud_policy_id): - """safely try retrieving the latest policy for the policy_id from the policy-engine""" - audit, policy_id, expected_versions, ignore_policy_names = aud_policy_id - str_metrics = "policy_id({0}), expected_versions({1}) ignore_policy_names({2})".format( - policy_id, json.dumps(expected_versions), json.dumps(ignore_policy_names)) - - try: - return PolicyRest._get_latest_policy( - audit, policy_id, expected_versions, ignore_policy_names, str_metrics) - - except Exception as ex: - error_msg = ("{0}: crash {1} {2} at {3}: {4}" - .format(audit.request_id, type(ex).__name__, str(ex), - "get_latest_policy", str_metrics)) - - PolicyRest._logger.exception(error_msg) - audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) - audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) - return None - - - @staticmethod - def _get_latest_policy(audit, policy_id, - expected_versions, ignore_policy_names, str_metrics): - """retry several times getting the latest policy for the policy_id from the policy-engine""" - PolicyRest._lazy_init() - latest_policy = None - status_code = 0 - retry_get_config = audit.kwargs.get("retry_get_config") - expect_policy_removed = (ignore_policy_names and not expected_versions) - - for retry in range(1, PolicyRest._policy_retry_count + 1): - PolicyRest._logger.debug("try(%s) retry_get_config(%s): %s", - retry, retry_get_config, str_metrics) - - done, latest_policy, status_code = PolicyRest._get_latest_policy_once( - audit, policy_id, expected_versions, ignore_policy_names, - expect_policy_removed) - - if done or not retry_get_config or not PolicyRest._policy_retry_sleep: - break - - if retry == PolicyRest._policy_retry_count: - PolicyRest._logger.error( - audit.error("gave up retrying after #{} for policy_id({}) from PDP {}" - .format(retry, policy_id, PolicyRest._url_get_config), - error_code=AuditResponseCode.DATA_ERROR)) - break - - PolicyRest._logger.warning(audit.warn( - "will retry({}) for policy_id({}) in {} secs from PDP {}".format( - retry, policy_id, PolicyRest._policy_retry_sleep, PolicyRest._url_get_config), - error_code=AuditResponseCode.DATA_ERROR)) - time.sleep(PolicyRest._policy_retry_sleep) - - if (expect_policy_removed and not latest_policy - and AuditHttpCode.RESPONSE_ERROR.value == status_code): - audit.set_http_status_code(AuditHttpCode.HTTP_OK.value) - return None - - audit.set_http_status_code(status_code) - if not PolicyRest._validate_policy(latest_policy): - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value) - PolicyRest._logger.error(audit.error( - "received invalid policy from PDP: {}".format(json.dumps(latest_policy)), - error_code=AuditResponseCode.DATA_ERROR)) - - return latest_policy - - @staticmethod - def _get_latest_policy_once(audit, policy_id, - expected_versions, ignore_policy_names, - expect_policy_removed): - """single attempt to get the latest policy for the policy_id from the policy-engine""" - - status_code, policy_bodies = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id}) - - PolicyRest._logger.debug("%s %s policy_bodies: %s", - status_code, policy_id, json.dumps(policy_bodies or [])) - - latest_policy = PolicyUtils.select_latest_policy( - policy_bodies, expected_versions, ignore_policy_names - ) - - if not latest_policy and not expect_policy_removed: - PolicyRest._logger.error( - audit.error("received unexpected policy data from PDP for policy_id={}: {}" - .format(policy_id, json.dumps(policy_bodies or [])), - error_code=AuditResponseCode.DATA_ERROR)) - - done = bool(latest_policy - or (expect_policy_removed and not policy_bodies) - or audit.is_serious_error(status_code)) - - return done, latest_policy, status_code - - @staticmethod - def get_latest_updated_policies(aud_policy_updates): - """safely try retrieving the latest policies for the list of policy_names""" - audit, policies_updated, policies_removed = aud_policy_updates - if not policies_updated and not policies_removed: - return None, None - - str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format( - len(policies_updated), json.dumps(policies_updated), - len(policies_removed), json.dumps(policies_removed)) - - try: - return PolicyRest._get_latest_updated_policies( - audit, str_metrics, policies_updated, policies_removed) - - except Exception as ex: - error_msg = ("{0}: crash {1} {2} at {3}: {4}" - .format(audit.request_id, type(ex).__name__, str(ex), - "get_latest_updated_policies", str_metrics)) - - PolicyRest._logger.exception(error_msg) - audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) - audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) - return None, None - - @staticmethod - def _get_latest_updated_policies(audit, str_metrics, policies_updated, policies_removed): - """Get the latest policies of the list of policy_names from the policy-engine""" - PolicyRest._lazy_init() - metrics_total = Metrics( - aud_parent=audit, - targetEntity="{0} total get_latest_updated_policies".format(PolicyRest._target_entity), - targetServiceName=PolicyRest._url_get_config) - - metrics_total.metrics_start("get_latest_updated_policies {0}".format(str_metrics)) - PolicyRest._logger.debug(str_metrics) - - policies_to_find = {} - for (policy_id, policy_version) in policies_updated: - if not policy_id or not policy_version or not policy_version.isdigit(): - continue - policy = policies_to_find.get(policy_id) - if not policy: - policies_to_find[policy_id] = { - POLICY_ID: policy_id, - PolicyRest.EXPECTED_VERSIONS: {policy_version: True}, - PolicyRest.IGNORE_POLICY_NAMES: {} - } - continue - policy[PolicyRest.EXPECTED_VERSIONS][policy_version] = True - - for (policy_id, policy_names) in policies_removed: - if not policy_id: - continue - policy = policies_to_find.get(policy_id) - if not policy: - policies_to_find[policy_id] = { - POLICY_ID: policy_id, - PolicyRest.IGNORE_POLICY_NAMES: policy_names - } - continue - policy[PolicyRest.IGNORE_POLICY_NAMES].update(policy_names) - - apns = [(audit, policy_id, - policy_to_find.get(PolicyRest.EXPECTED_VERSIONS), - policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)) - for (policy_id, policy_to_find) in policies_to_find.items()] - - policies = None - apns_length = len(apns) - PolicyRest._logger.debug("apns_length(%s) policies_to_find %s", apns_length, - json.dumps(policies_to_find)) - - if apns_length == 1: - policies = [PolicyRest.get_latest_policy(apns[0])] - else: - pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length)) - policies = pool.map(PolicyRest.get_latest_policy, apns) - pool.close() - pool.join() - - metrics_total.metrics("result({}) get_latest_updated_policies {}: {} {}" - .format(apns_length, str_metrics, - len(policies), json.dumps(policies))) - - updated_policies = dict((policy[POLICY_ID], policy) - for policy in policies - if policy and policy.get(POLICY_ID)) - - removed_policies = dict((policy_id, True) - for (policy_id, policy_to_find) in policies_to_find.items() - if not policy_to_find.get(PolicyRest.EXPECTED_VERSIONS) - and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES) - and policy_id not in updated_policies) - - errored_policies = dict((policy_id, policy_to_find) - for (policy_id, policy_to_find) in policies_to_find.items() - if policy_id not in updated_policies - and policy_id not in removed_policies) - - PolicyRest._logger.debug( - "result(%s) updated_policies %s, removed_policies %s, errored_policies %s", - apns_length, json.dumps(updated_policies), json.dumps(removed_policies), - json.dumps(errored_policies)) - - if errored_policies: - audit.set_http_status_code(AuditHttpCode.DATA_ERROR.value) - audit.error( - "errored_policies in PDP: {}".format(json.dumps(errored_policies)), - error_code=AuditResponseCode.DATA_ERROR) - - return updated_policies, removed_policies - - - @staticmethod - def _get_latest_policies(aud_policy_filter): - """get the latest policies by policy_filter from the policy-engine""" - audit, policy_filter = aud_policy_filter - try: - str_policy_filter = json.dumps(policy_filter) - PolicyRest._logger.debug("%s", str_policy_filter) - - status_code, policy_bodies = PolicyRest._pdp_get_config(audit, policy_filter) - audit.set_http_status_code(status_code) - - PolicyRest._logger.debug("%s policy_bodies: %s %s", status_code, - str_policy_filter, json.dumps(policy_bodies or [])) - - latest_policies = PolicyUtils.select_latest_policies(policy_bodies) - - if not latest_policies: - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value) - PolicyRest._logger.warning(audit.warn( - "received no policies from PDP for policy_filter {}: {}" - .format(str_policy_filter, json.dumps(policy_bodies or [])), - error_code=AuditResponseCode.DATA_ERROR)) - return None, latest_policies - - valid_policies = {} - errored_policies = {} - for (policy_id, policy) in latest_policies.items(): - if PolicyRest._validate_policy(policy): - valid_policies[policy_id] = policy - else: - errored_policies[policy_id] = policy - return valid_policies, errored_policies - - except Exception as ex: - error_msg = ("{0}: crash {1} {2} at {3}: policy_filter({4})" - .format(audit.request_id, type(ex).__name__, str(ex), - "_get_latest_policies", json.dumps(policy_filter))) - - PolicyRest._logger.exception(error_msg) - audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) - audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) - return None, None - - - @staticmethod - def get_latest_policies(audit, policy_filter=None, policy_filters=None): - """Get the latest policies by policy-filter(s) from the policy-engine""" - result = {} - aud_policy_filters = None - str_policy_filters = None - str_metrics = None - target_entity = None - - try: - PolicyRest._lazy_init() - if policy_filter: - aud_policy_filters = [(audit, policy_filter)] - str_policy_filters = json.dumps(policy_filter) - str_metrics = "get_latest_policies for policy_filter {0}".format( - str_policy_filters) - target_entity = ("{0} total get_latest_policies by policy_filter" - .format(PolicyRest._target_entity)) - result[POLICY_FILTER] = copy.deepcopy(policy_filter) - elif policy_filters: - aud_policy_filters = [ - (audit, policy_filter) - for policy_filter in policy_filters - ] - str_policy_filters = json.dumps(policy_filters) - str_metrics = "get_latest_policies for policy_filters {0}".format( - str_policy_filters) - target_entity = ("{0} total get_latest_policies by policy_filters" - .format(PolicyRest._target_entity)) - result[POLICY_FILTERS] = copy.deepcopy(policy_filters) - else: - return result - - PolicyRest._logger.debug("%s", str_policy_filters) - metrics_total = Metrics(aud_parent=audit, targetEntity=target_entity, - targetServiceName=PolicyRest._url_get_config) - - metrics_total.metrics_start(str_metrics) - - latest_policies = None - apfs_length = len(aud_policy_filters) - if apfs_length == 1: - latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])] - else: - pool = ThreadPool(min(PolicyRest._thread_pool_size, apfs_length)) - latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters) - pool.close() - pool.join() - - metrics_total.metrics("total result {0}: {1} {2}".format( - str_metrics, len(latest_policies), json.dumps(latest_policies))) - - # latest_policies == [(valid_policies, errored_policies), ...] - result[LATEST_POLICIES] = dict( - pair for (vps, _) in latest_policies if vps for pair in vps.items()) - - result[ERRORED_POLICIES] = dict( - pair for (_, eps) in latest_policies if eps for pair in eps.items()) - - PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s", - str_policy_filters, json.dumps(result)) - return result - - except Exception as ex: - error_msg = ("{0}: crash {1} {2} at {3}: {4}" - .format(audit.request_id, type(ex).__name__, str(ex), - "get_latest_policies", str_metrics)) - - PolicyRest._logger.exception(error_msg) - audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) - audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) - return None |