aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/policy_rest.py
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2018-01-10 11:00:50 -0500
committerAlex Shatov <alexs@att.com>2018-01-10 11:07:30 -0500
commit1369bea8b3c24ef063799acefbfc01659878f034 (patch)
tree95fa3e5580f62be9c1e1d630ed0c6496b9fb03a2 /policyhandler/policy_rest.py
parentdc5da5bf63ae4a4ac11b4b5c46407e58da16fbfe (diff)
variable collection of policies per component
* new feature variable collection of policies per component in DCAE * massive refactoring * dissolved the external PolicyEngine.py into policy_receiver.py - kept only the web-socket communication to PolicyEngine * new /healthcheck - shows some stats of service running * Unit Test coverage 75% Change-Id: I816b7d5713ae0dd88fa73d3656f272b4f3e7946e Issue-ID: DCAEGEN2-249 Signed-off-by: Alex Shatov <alexs@att.com>
Diffstat (limited to 'policyhandler/policy_rest.py')
-rw-r--r--policyhandler/policy_rest.py459
1 files changed, 262 insertions, 197 deletions
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py
index bf8a31d..1e50693 100644
--- a/policyhandler/policy_rest.py
+++ b/policyhandler/policy_rest.py
@@ -22,123 +22,30 @@
import logging
import json
import copy
-import re
import time
from multiprocessing.dummy import Pool as ThreadPool
import requests
from .config import Config
-from .policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, POLICY_GET_CONFIG, \
- POLICY_BODY, POLICY_CONFIG
+from .policy_consts import POLICY_ID, POLICY_NAME, POLICY_BODY, POLICY_CONFIG
from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode, AuditResponseCode
-
-class PolicyUtils(object):
- """policy-client utils"""
- _logger = logging.getLogger("policy_handler.policy_utils")
- _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$')
-
- @staticmethod
- def safe_json_parse(json_str):
- """try parsing json without exception - returns the json_str back if fails"""
- if not json_str:
- return json_str
- try:
- return json.loads(json_str)
- except ValueError as err:
- PolicyUtils._logger.warn("unexpected json %s: %s", str(json_str), str(err))
- return json_str
-
- @staticmethod
- def extract_policy_id(policy_name):
- """ policy_name = policy_id + "." + <version> + "." + <extension>
- For instance,
- policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml
- policy_id = DCAE_alex.Config_alex_policy_number_1
- policy_scope = DCAE_alex
- policy_class = Config
- policy_version = 3
- type = extension = xml
- delimiter = "."
- policy_class_delimiter = "_"
- policy_name in PAP = DCAE_alex.alex_policy_number_1
- """
- if not policy_name:
- return
- return PolicyUtils._policy_name_ext.sub('', policy_name)
-
- @staticmethod
- def parse_policy_config(policy):
- """try parsing the config in policy."""
- if policy and POLICY_BODY in policy and POLICY_CONFIG in policy[POLICY_BODY]:
- policy[POLICY_BODY][POLICY_CONFIG] = PolicyUtils.safe_json_parse(
- policy[POLICY_BODY][POLICY_CONFIG])
- return policy
-
- @staticmethod
- def convert_to_policy(policy_config):
- """wrap policy_config received from policy-engine with policy_id."""
- if not policy_config or POLICY_NAME not in policy_config \
- or POLICY_VERSION not in policy_config or not policy_config[POLICY_VERSION]:
- return
- policy_id = PolicyUtils.extract_policy_id(policy_config[POLICY_NAME])
- if not policy_id:
- return
- return {POLICY_ID:policy_id, POLICY_BODY:policy_config}
-
- @staticmethod
- def select_latest_policy(policy_configs):
- """For some reason, the policy-engine returns all version of the policy_configs.
- DCAE-Controller is only interested in the latest version
- """
- if not policy_configs:
- return
- latest_policy_config = {}
- for policy_config in policy_configs:
- if POLICY_VERSION not in policy_config or not policy_config[POLICY_VERSION] \
- or not policy_config[POLICY_VERSION].isdigit():
- continue
- if not latest_policy_config \
- or int(policy_config[POLICY_VERSION]) \
- > int(latest_policy_config[POLICY_VERSION]):
- latest_policy_config = policy_config
-
- return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_config))
-
- @staticmethod
- def select_latest_policies(policy_configs):
- """For some reason, the policy-engine returns all version of the policy_configs.
- DCAE-Controller is only interested in the latest versions
- """
- if not policy_configs:
- return {}
- policies = {}
- for policy_config in policy_configs:
- policy = PolicyUtils.convert_to_policy(policy_config)
- if not policy or POLICY_ID not in policy or POLICY_BODY not in policy:
- continue
- if POLICY_VERSION not in policy[POLICY_BODY] \
- or not policy[POLICY_BODY][POLICY_VERSION] \
- or not policy[POLICY_BODY][POLICY_VERSION].isdigit():
- continue
- if policy[POLICY_ID] not in policies:
- policies[policy[POLICY_ID]] = policy
- continue
- if int(policy[POLICY_BODY][POLICY_VERSION]) \
- > int(policies[policy[POLICY_ID]][POLICY_BODY][POLICY_VERSION]):
- policies[policy[POLICY_ID]] = policy
-
- for policy_id in policies:
- policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id])
-
- return policies
+from .policy_utils import PolicyUtils
class PolicyRest(object):
""" policy-engine """
_logger = logging.getLogger("policy_handler.policy_rest")
_lazy_inited = False
+ POLICY_GET_CONFIG = 'getConfig'
+ POLICY_CONFIG_STATUS = "policyConfigStatus"
+ CONFIG_RETRIEVED = "CONFIG_RETRIEVED"
+ POLICY_CONFIG_MESSAGE = "policyConfigMessage"
+ NO_RESPONSE_RECEIVED = "No Response Received"
+
+ MIN_VERSION_EXPECTED = "min_version_expected"
+ IGNORE_POLICY_NAMES = "ignore_policy_names"
_requests_session = None
- _url = None
+ _url_get_config = None
_headers = None
_target_entity = None
_thread_pool_size = 4
@@ -167,7 +74,8 @@ class PolicyRest(object):
requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
)
- PolicyRest._url = config["url"] + config["path_api"]
+ PolicyRest._url_get_config = config["url"] \
+ + config["path_api"] + PolicyRest.POLICY_GET_CONFIG
PolicyRest._headers = config["headers"]
PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
PolicyRest._thread_pool_size = Config.config.get("thread_pool_size", 4)
@@ -181,31 +89,32 @@ class PolicyRest(object):
PolicyRest._policy_retry_sleep = Config.config.get("policy_retry_sleep", 0)
PolicyRest._logger.info("PolicyClient url(%s) headers(%s) scope-prefixes(%s)", \
- PolicyRest._url, Audit.log_json_dumps(PolicyRest._headers), \
+ PolicyRest._url_get_config, Audit.log_json_dumps(PolicyRest._headers), \
json.dumps(PolicyRest._scope_prefixes))
@staticmethod
- def _post(audit, path, json_body):
+ def _pdp_get_config(audit, json_body):
"""Communication with the policy-engine"""
- full_path = PolicyRest._url + path
sub_aud = Audit(aud_parent=audit, targetEntity=PolicyRest._target_entity, \
- targetServiceName=full_path)
+ targetServiceName=PolicyRest._url_get_config)
msg = json.dumps(json_body)
headers = copy.copy(PolicyRest._headers)
headers[REQUEST_X_ECOMP_REQUESTID] = sub_aud.request_id
headers_str = Audit.log_json_dumps(headers)
- log_line = "post to PDP {0} msg={1} headers={2}".format(full_path, msg, headers_str)
+ log_line = "post to PDP {0} msg={1} headers={2}".format(
+ PolicyRest._url_get_config, msg, headers_str)
sub_aud.metrics_start(log_line)
PolicyRest._logger.info(log_line)
res = None
try:
- res = PolicyRest._requests_session.post(full_path, json=json_body, headers=headers)
+ res = PolicyRest._requests_session.post(
+ PolicyRest._url_get_config, json=json_body, headers=headers)
except requests.exceptions.RequestException as ex:
error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
error_msg = "failed to post to PDP {0} {1} msg={2} headers={3}" \
- .format(full_path, str(ex), msg, headers_str)
+ .format(PolicyRest._url_get_config, str(ex), msg, headers_str)
PolicyRest._logger.exception(error_msg)
sub_aud.set_http_status_code(error_code)
@@ -213,145 +122,301 @@ class PolicyRest(object):
sub_aud.metrics(error_msg)
return (error_code, None)
- log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format( \
- full_path, res.status_code, msg, res.text, \
+ log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format(
+ PolicyRest._url_get_config, res.status_code, msg, res.text,
Audit.log_json_dumps(dict(res.request.headers.items())))
+
+ res_data = None
+ if res.status_code == requests.codes.ok:
+ res_data = res.json()
+
+ if res_data and isinstance(res_data, list) and len(res_data) == 1:
+ result = res_data[0]
+ if result and not result.get(POLICY_NAME):
+ res_data = None
+ if result.get(PolicyRest.POLICY_CONFIG_MESSAGE) == PolicyRest.NO_RESPONSE_RECEIVED:
+ error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+ error_msg = "unexpected {0}".format(log_line)
+
+ PolicyRest._logger.error(error_msg)
+ sub_aud.set_http_status_code(error_code)
+ audit.set_http_status_code(error_code)
+ sub_aud.metrics(error_msg)
+ return (error_code, None)
+
sub_aud.set_http_status_code(res.status_code)
sub_aud.metrics(log_line)
PolicyRest._logger.info(log_line)
+ return res.status_code, res_data
- if res.status_code == requests.codes.ok:
- return res.status_code, res.json()
+ @staticmethod
+ def validate_policy(policy):
+ """Validates the config on policy"""
+ if not policy:
+ return
- return res.status_code, None
+ policy_body = policy.get(POLICY_BODY)
+
+ return bool(
+ policy_body
+ and policy_body.get(PolicyRest.POLICY_CONFIG_STATUS) == PolicyRest.CONFIG_RETRIEVED
+ and policy_body.get(POLICY_CONFIG)
+ )
@staticmethod
- def get_latest_policy(aud_policy_name):
- """Get the latest policy for the policy_name from the policy-engine"""
+ def validate_policies(policies):
+ """Validate the config on policies. Returns (valid, errored) tuple"""
+ if not policies:
+ return None, policies
+
+ valid_policies = {}
+ errored_policies = {}
+ for (policy_id, policy) in policies.iteritems():
+ if PolicyRest.validate_policy(policy):
+ valid_policies[policy_id] = policy
+ else:
+ errored_policies[policy_id] = policy
+
+ return valid_policies, errored_policies
+
+ @staticmethod
+ def get_latest_policy(aud_policy_id):
+ """Get the latest policy for the policy_id from the policy-engine"""
PolicyRest._lazy_init()
- audit, policy_name = aud_policy_name
+ audit, policy_id, min_version_expected, ignore_policy_names = aud_policy_id
status_code = 0
+ policy_configs = None
latest_policy = None
+ expect_policy_removed = (ignore_policy_names and not min_version_expected)
+
for retry in xrange(1, PolicyRest._policy_retry_count + 1):
- PolicyRest._logger.debug("%s", policy_name)
- status_code, policy_configs = PolicyRest._post(audit, POLICY_GET_CONFIG, \
- {POLICY_NAME:policy_name})
- PolicyRest._logger.debug("%s %s policy_configs: %s", status_code, policy_name, \
- json.dumps(policy_configs or []))
- latest_policy = PolicyUtils.select_latest_policy(policy_configs)
- if not latest_policy:
- audit.error("received unexpected policy data from PDP for policy_name={0}: {1}" \
- .format(policy_name, json.dumps(policy_configs or [])), \
- errorCode=AuditResponseCode.DATA_ERROR.value, \
- errorDescription=AuditResponseCode.get_human_text( \
- AuditResponseCode.DATA_ERROR))
+ PolicyRest._logger.debug("%s", policy_id)
+
+ status_code, policy_configs = PolicyRest._pdp_get_config(
+ audit, {POLICY_NAME:policy_id}
+ )
+
+ PolicyRest._logger.debug("%s %s policy_configs: %s",
+ status_code, policy_id, json.dumps(policy_configs or []))
+
+ latest_policy = PolicyUtils.select_latest_policy(
+ policy_configs, min_version_expected, ignore_policy_names
+ )
+
+ if not latest_policy and not expect_policy_removed:
+ audit.error("received unexpected policy data from PDP for policy_id={0}: {1}"
+ .format(policy_id, json.dumps(policy_configs or [])),
+ errorCode=AuditResponseCode.DATA_ERROR.value,
+ errorDescription=AuditResponseCode.get_human_text(
+ AuditResponseCode.DATA_ERROR))
if latest_policy or not audit.retry_get_config \
+ or (expect_policy_removed and not policy_configs) \
or not PolicyRest._policy_retry_sleep \
- or AuditResponseCode.PERMISSION_ERROR.value \
- == AuditResponseCode.get_response_code(status_code).value:
+ or audit.is_serious_error(status_code):
break
if retry == PolicyRest._policy_retry_count:
- audit.warn("gave up retrying {0} from PDP after #{1} for policy_name={2}" \
- .format(POLICY_GET_CONFIG, retry, policy_name), \
- errorCode=AuditResponseCode.DATA_ERROR.value, \
- errorDescription=AuditResponseCode.get_human_text( \
- AuditResponseCode.DATA_ERROR))
+ audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}"
+ .format(PolicyRest._url_get_config, retry, policy_id),
+ errorCode=AuditResponseCode.DATA_ERROR.value,
+ errorDescription=AuditResponseCode.get_human_text(
+ AuditResponseCode.DATA_ERROR))
break
- audit.warn("retry #{0} {1} from PDP in {2} secs for policy_name={3}" \
- .format(retry, POLICY_GET_CONFIG, PolicyRest._policy_retry_sleep, policy_name), \
- errorCode=AuditResponseCode.DATA_ERROR.value, \
- errorDescription=AuditResponseCode.get_human_text( \
- AuditResponseCode.DATA_ERROR))
+ audit.warn(
+ "retry #{0} {1} from PDP in {2} secs for policy_id={3}".format(
+ retry, PolicyRest._url_get_config, PolicyRest._policy_retry_sleep, policy_id),
+ errorCode=AuditResponseCode.DATA_ERROR.value,
+ errorDescription=AuditResponseCode.get_human_text(
+ AuditResponseCode.DATA_ERROR))
time.sleep(PolicyRest._policy_retry_sleep)
+ if expect_policy_removed and not latest_policy \
+ and AuditHttpCode.RESPONSE_ERROR.value == status_code:
+ audit.set_http_status_code(AuditHttpCode.HTTP_OK.value)
+ return None
+
audit.set_http_status_code(status_code)
- if not latest_policy:
+ if not PolicyRest.validate_policy(latest_policy):
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.error(
+ "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)),
+ errorCode=AuditResponseCode.DATA_ERROR.value,
+ errorDescription=AuditResponseCode.get_human_text(AuditResponseCode.DATA_ERROR)
+ )
+
return latest_policy
@staticmethod
- def get_latest_policies_by_names(aud_policy_names):
+ def get_latest_updated_policies(aud_policy_updates):
"""Get the latest policies of the list of policy_names from the policy-engine"""
PolicyRest._lazy_init()
- audit, policy_names = aud_policy_names
- if not policy_names:
+ audit, policies_updated, policies_removed = aud_policy_updates
+ if not policies_updated and not policies_removed:
return
- audit.metrics_start("get_latest_policies_by_names {0} {1}".format( \
- len(policy_names), json.dumps(policy_names)))
- PolicyRest._logger.debug("%d %s", len(policy_names), json.dumps(policy_names))
+ str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format(
+ len(policies_updated), json.dumps(policies_updated),
+ len(policies_removed), json.dumps(policies_removed))
+ audit.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
+ PolicyRest._logger.debug(str_metrics)
+
+ policies_to_find = {}
+ for (policy_name, policy_version) in policies_updated:
+ policy_id = PolicyUtils.extract_policy_id(policy_name)
+ if not policy_id or not policy_version.isdigit():
+ continue
+ policy = policies_to_find.get(policy_id)
+ if not policy:
+ policies_to_find[policy_id] = {
+ POLICY_ID: policy_id,
+ PolicyRest.MIN_VERSION_EXPECTED: int(policy_version),
+ PolicyRest.IGNORE_POLICY_NAMES: {}
+ }
+ continue
+ if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version):
+ policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version)
+
+ for (policy_name, _) in policies_removed:
+ policy_id = PolicyUtils.extract_policy_id(policy_name)
+ if not policy_id:
+ continue
+ policy = policies_to_find.get(policy_id)
+ if not policy:
+ policies_to_find[policy_id] = {
+ POLICY_ID: policy_id,
+ PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True}
+ }
+ continue
+ policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True
+
+ apns = [(audit, policy_id,
+ policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED),
+ policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES))
+ for (policy_id, policy_to_find) in policies_to_find.iteritems()]
- thread_count = min(PolicyRest._thread_pool_size, len(policy_names))
- apns = [(audit, policy_name) for policy_name in policy_names]
policies = None
- if thread_count == 1:
+ apns_length = len(apns)
+ if apns_length == 1:
policies = [PolicyRest.get_latest_policy(apns[0])]
else:
- pool = ThreadPool(thread_count)
+ pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length))
policies = pool.map(PolicyRest.get_latest_policy, apns)
pool.close()
pool.join()
- audit.metrics("result get_latest_policies_by_names {0} {1}: {2} {3}".format( \
- len(policy_names), json.dumps(policy_names), len(policies), json.dumps(policies)), \
- targetEntity=PolicyRest._target_entity, targetServiceName=POLICY_GET_CONFIG)
- policies = dict([(policy[POLICY_ID], policy) \
- for policy in policies if policy and POLICY_ID in policy])
- PolicyRest._logger.debug("policies %s", json.dumps(policies))
- if not policies:
+ audit.metrics("result get_latest_updated_policies {0}: {1} {2}"
+ .format(str_metrics, len(policies), json.dumps(policies)),
+ targetEntity=PolicyRest._target_entity,
+ targetServiceName=PolicyRest._url_get_config)
+
+ updated_policies = dict((policy[POLICY_ID], policy)
+ for policy in policies
+ if policy and policy.get(POLICY_ID))
+
+ removed_policies = dict((policy_id, True)
+ for (policy_id, policy_to_find) in policies_to_find.iteritems()
+ if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED)
+ and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)
+ and policy_id not in updated_policies)
+
+ errored_policies = dict((policy_id, policy_to_find)
+ for (policy_id, policy_to_find) in policies_to_find.iteritems()
+ if policy_id not in updated_policies
+ and policy_id not in removed_policies)
+
+ PolicyRest._logger.debug(
+ "result updated_policies %s, removed_policies %s, errored_policies %s",
+ json.dumps(updated_policies), json.dumps(removed_policies),
+ json.dumps(errored_policies))
+
+ if errored_policies:
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- return policies
+ audit.error(
+ "errored_policies in PDP: {0}".format(json.dumps(errored_policies)),
+ errorCode=AuditResponseCode.DATA_ERROR.value,
+ errorDescription=AuditResponseCode.get_human_text(AuditResponseCode.DATA_ERROR)
+ )
+
+ return updated_policies, removed_policies
@staticmethod
- def _get_latest_policies(aud_scope_prefix):
- """Get the latest policies of the same scope from the policy-engine"""
- audit, scope_prefix = aud_scope_prefix
- PolicyRest._logger.debug("%s", scope_prefix)
- status_code, policy_configs = PolicyRest._post(audit, POLICY_GET_CONFIG, \
- {POLICY_NAME:scope_prefix + ".*"})
- audit.set_http_status_code(status_code)
- PolicyRest._logger.debug("%s policy_configs: %s %s", status_code, \
- scope_prefix, json.dumps(policy_configs or []))
- latest_policies = PolicyUtils.select_latest_policies(policy_configs)
+ def _get_latest_policies(aud_policy_filter):
+ """
+ get the latest policies by policy_filter
+ or all the latest policies of the same scope from the policy-engine
+ """
+ audit, policy_filter, error_if_not_found = aud_policy_filter
+ str_policy_filter = json.dumps(policy_filter)
+ PolicyRest._logger.debug("%s", str_policy_filter)
+
+ status_code, policy_configs = PolicyRest._pdp_get_config(audit, policy_filter)
+
+ PolicyRest._logger.debug("%s policy_configs: %s %s", status_code,
+ str_policy_filter, json.dumps(policy_configs or []))
+ latest_policies = PolicyUtils.select_latest_policies(policy_configs)
if not latest_policies:
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- audit.error("received unexpected policies data from PDP for scope {0}: {1}".format( \
- scope_prefix, json.dumps(policy_configs or [])), \
- errorCode=AuditResponseCode.DATA_ERROR.value, \
- errorDescription=AuditResponseCode.get_human_text( \
- AuditResponseCode.DATA_ERROR))
- return latest_policies
+ if error_if_not_found:
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.warn(
+ "received no policies from PDP for policy_filter {0}: {1}"
+ .format(str_policy_filter, json.dumps(policy_configs or [])),
+ errorCode=AuditResponseCode.DATA_ERROR.value,
+ errorDescription=AuditResponseCode.get_human_text(
+ AuditResponseCode.DATA_ERROR)
+ )
+ return None, latest_policies
+
+ audit.set_http_status_code(status_code)
+ return PolicyRest.validate_policies(latest_policies)
@staticmethod
- def get_latest_policies(audit):
+ def get_latest_policies(audit, policy_filter=None):
"""Get the latest policies of the same scope from the policy-engine"""
PolicyRest._lazy_init()
- PolicyRest._logger.debug("%s", json.dumps(PolicyRest._scope_prefixes))
- audit.metrics_start("get_latest_policies for scopes {0} {1}".format( \
- len(PolicyRest._scope_prefixes), json.dumps(PolicyRest._scope_prefixes)))
- asps = [(audit, scope_prefix) for scope_prefix in PolicyRest._scope_prefixes]
+ aud_policy_filters = None
+ str_metrics = None
+ str_policy_filters = json.dumps(policy_filter or PolicyRest._scope_prefixes)
+ if policy_filter is not None:
+ aud_policy_filters = [(audit, policy_filter, True)]
+ str_metrics = "get_latest_policies for policy_filter {0}".format(
+ str_policy_filters)
+ else:
+ aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, False)
+ for scope_prefix in PolicyRest._scope_prefixes]
+ str_metrics = "get_latest_policies for scopes {0} {1}".format( \
+ len(PolicyRest._scope_prefixes), str_policy_filters)
+
+ PolicyRest._logger.debug("%s", str_policy_filters)
+ audit.metrics_start(str_metrics)
+
latest_policies = None
- if PolicyRest._scope_thread_pool_size == 1:
- latest_policies = [PolicyRest._get_latest_policies(asps[0])]
+ apfs_length = len(aud_policy_filters)
+ if apfs_length == 1:
+ latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])]
else:
- pool = ThreadPool(PolicyRest._scope_thread_pool_size)
- latest_policies = pool.map(PolicyRest._get_latest_policies, asps)
+ pool = ThreadPool(min(PolicyRest._scope_thread_pool_size, apfs_length))
+ latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters)
pool.close()
pool.join()
- audit.metrics("total result get_latest_policies for scopes {0} {1}: {2} {3}".format( \
- len(PolicyRest._scope_prefixes), json.dumps(PolicyRest._scope_prefixes), \
- len(latest_policies), json.dumps(latest_policies)), \
- targetEntity=PolicyRest._target_entity, targetServiceName=POLICY_GET_CONFIG)
+ audit.metrics("total result {0}: {1} {2}".format(
+ str_metrics, len(latest_policies), json.dumps(latest_policies)), \
+ targetEntity=PolicyRest._target_entity, targetServiceName=PolicyRest._url_get_config)
+
+ # latest_policies == [(valid_policies, errored_policies), ...]
+ valid_policies = dict(
+ pair for (vps, _) in latest_policies if vps for pair in vps.iteritems())
+
+ errored_policies = dict(
+ pair for (_, eps) in latest_policies if eps for pair in eps.iteritems())
- latest_policies = dict(pair for lp in latest_policies if lp for pair in lp.items())
- PolicyRest._logger.debug("latest_policies: %s %s", \
- json.dumps(PolicyRest._scope_prefixes), json.dumps(latest_policies))
+ PolicyRest._logger.debug(
+ "got policies for policy_filters: %s. valid_policies: %s errored_policies: %s",
+ str_policy_filters, json.dumps(valid_policies), json.dumps(errored_policies))
- return latest_policies
+ return valid_policies, errored_policies