aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/policy_rest.py
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler/policy_rest.py')
-rw-r--r--policyhandler/policy_rest.py391
1 files changed, 214 insertions, 177 deletions
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py
index 16c38bb..c8018f6 100644
--- a/policyhandler/policy_rest.py
+++ b/policyhandler/policy_rest.py
@@ -36,17 +36,17 @@ from .policy_utils import PolicyUtils
class PolicyRest(object):
- """ policy-engine """
+ """using the http API to policy-engine"""
_logger = logging.getLogger("policy_handler.policy_rest")
_lazy_inited = False
POLICY_GET_CONFIG = 'getConfig'
- POLICY_CONFIG_STATUS = "policyConfigStatus"
- CONFIG_RETRIEVED = "CONFIG_RETRIEVED"
- CONFIG_NOT_FOUND = "CONFIG_NOT_FOUND"
- POLICY_CONFIG_MESSAGE = "policyConfigMessage"
- NO_RESPONSE_RECEIVED = "No Response Received"
- POLICY_ENGINE_STATUS_CODE_ERROR = 400
- PE_DATA_NOT_FOUND = "PE300 - Data Issue: Incorrect Params passed: Decision not a Permit."
+ PDP_CONFIG_STATUS = "policyConfigStatus"
+ PDP_CONFIG_RETRIEVED = "CONFIG_RETRIEVED"
+ PDP_CONFIG_NOT_FOUND = "CONFIG_NOT_FOUND"
+ PDP_CONFIG_MESSAGE = "policyConfigMessage"
+ PDP_NO_RESPONSE_RECEIVED = "No Response Received"
+ PDP_STATUS_CODE_ERROR = 400
+ PDP_DATA_NOT_FOUND = "PE300 - Data Issue: Incorrect Params passed: Decision not a Permit."
MIN_VERSION_EXPECTED = "min_version_expected"
IGNORE_POLICY_NAMES = "ignore_policy_names"
@@ -68,7 +68,7 @@ class PolicyRest(object):
return
PolicyRest._lazy_inited = True
- config = Config.config[Config.FIELD_POLICY_ENGINE]
+ config = Config.settings[Config.FIELD_POLICY_ENGINE]
pool_size = config.get("pool_connections", 20)
PolicyRest._requests_session = requests.Session()
@@ -85,15 +85,15 @@ class PolicyRest(object):
+ config["path_api"] + PolicyRest.POLICY_GET_CONFIG
PolicyRest._headers = config["headers"]
PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
- PolicyRest._thread_pool_size = Config.config.get("thread_pool_size", 4)
+ PolicyRest._thread_pool_size = Config.settings.get("thread_pool_size", 4)
if PolicyRest._thread_pool_size < 2:
PolicyRest._thread_pool_size = 2
- PolicyRest._scope_prefixes = Config.config["scope_prefixes"]
+ PolicyRest._scope_prefixes = Config.settings["scope_prefixes"]
PolicyRest._scope_thread_pool_size = min(PolicyRest._thread_pool_size, \
len(PolicyRest._scope_prefixes))
- PolicyRest._policy_retry_count = Config.config.get("policy_retry_count", 1) or 1
- PolicyRest._policy_retry_sleep = Config.config.get("policy_retry_sleep", 0)
+ PolicyRest._policy_retry_count = Config.settings.get("policy_retry_count", 1) or 1
+ PolicyRest._policy_retry_sleep = Config.settings.get("policy_retry_sleep", 0)
PolicyRest._logger.info(
"PolicyClient url(%s) headers(%s) scope-prefixes(%s)",
@@ -137,6 +137,19 @@ class PolicyRest(object):
PolicyRest._url_get_config, res.status_code, msg, res.text,
Metrics.log_json_dumps(dict(res.request.headers.items())))
+ status_code, res_data = PolicyRest._extract_pdp_res_data(audit, metrics, log_line, res)
+
+ if status_code:
+ return status_code, res_data
+
+ metrics.set_http_status_code(res.status_code)
+ metrics.metrics(log_line)
+ PolicyRest._logger.info(log_line)
+ return res.status_code, res_data
+
+ @staticmethod
+ def _extract_pdp_res_data(audit, metrics, log_line, res):
+ """special treatment of pdp response"""
res_data = None
if res.status_code == requests.codes.ok:
res_data = res.json()
@@ -145,7 +158,7 @@ class PolicyRest(object):
rslt = res_data[0]
if rslt and not rslt.get(POLICY_NAME):
res_data = None
- if rslt.get(PolicyRest.POLICY_CONFIG_MESSAGE) == PolicyRest.NO_RESPONSE_RECEIVED:
+ if rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_NO_RESPONSE_RECEIVED:
error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
error_msg = "unexpected {0}".format(log_line)
@@ -153,30 +166,31 @@ class PolicyRest(object):
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
- return (error_code, None)
+ return error_code, None
+ return None, res_data
- elif res.status_code == PolicyRest.POLICY_ENGINE_STATUS_CODE_ERROR:
+ if res.status_code == PolicyRest.PDP_STATUS_CODE_ERROR:
try:
- rslt = res.json()
- if rslt and isinstance(rslt, list) and len(rslt) == 1:
- rslt = rslt[0]
- if rslt and not rslt.get(POLICY_NAME) \
- and rslt.get(PolicyRest.POLICY_CONFIG_STATUS) == PolicyRest.CONFIG_NOT_FOUND \
- and rslt.get(PolicyRest.POLICY_CONFIG_MESSAGE) == PolicyRest.PE_DATA_NOT_FOUND:
- status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value
- info_msg = "not found {0}".format(log_line)
-
- PolicyRest._logger.info(info_msg)
- metrics.set_http_status_code(status_code)
- metrics.metrics(info_msg)
- return (status_code, None)
+ res_data = res.json()
except ValueError:
- pass
+ return None, None
+
+ if not res_data or not isinstance(res_data, list) or len(res_data) != 1:
+ return None, None
+
+ rslt = res_data[0]
+ if (rslt and not rslt.get(POLICY_NAME)
+ and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND
+ and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND):
+ status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value
+ info_msg = "not found {0}".format(log_line)
+
+ PolicyRest._logger.info(info_msg)
+ metrics.set_http_status_code(status_code)
+ metrics.metrics(info_msg)
+ return status_code, None
+ return None, None
- metrics.set_http_status_code(res.status_code)
- metrics.metrics(log_line)
- PolicyRest._logger.info(log_line)
- return res.status_code, res_data
@staticmethod
def _validate_policy(policy):
@@ -188,76 +202,20 @@ class PolicyRest(object):
return bool(
policy_body
- and policy_body.get(PolicyRest.POLICY_CONFIG_STATUS) == PolicyRest.CONFIG_RETRIEVED
+ and policy_body.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_RETRIEVED
and policy_body.get(POLICY_CONFIG)
)
@staticmethod
def get_latest_policy(aud_policy_id):
- """Get the latest policy for the policy_id from the policy-engine"""
+ """safely try retrieving the latest policy for the policy_id from the policy-engine"""
audit, policy_id, min_version_expected, ignore_policy_names = aud_policy_id
str_metrics = "policy_id({0}), min_version_expected({1}) ignore_policy_names({2})".format(
policy_id, min_version_expected, json.dumps(ignore_policy_names))
try:
- PolicyRest._lazy_init()
- status_code = 0
- retry_get_config = audit.kwargs.get("retry_get_config")
- policy_configs = None
- latest_policy = None
- expect_policy_removed = (ignore_policy_names and not min_version_expected)
-
- for retry in range(1, PolicyRest._policy_retry_count + 1):
- PolicyRest._logger.debug(str_metrics)
-
- status_code, policy_configs = PolicyRest._pdp_get_config(
- audit, {POLICY_NAME:policy_id}
- )
-
- PolicyRest._logger.debug("%s %s policy_configs: %s",
- status_code, policy_id, json.dumps(policy_configs or []))
-
- latest_policy = PolicyUtils.select_latest_policy(
- policy_configs, min_version_expected, ignore_policy_names
- )
-
- if not latest_policy and not expect_policy_removed:
- audit.error("received unexpected policy data from PDP for policy_id={0}: {1}"
- .format(policy_id, json.dumps(policy_configs or [])),
- error_code=AuditResponseCode.DATA_ERROR)
-
- if (latest_policy
- or not retry_get_config
- or (expect_policy_removed and not policy_configs)
- or not PolicyRest._policy_retry_sleep
- or audit.is_serious_error(status_code)):
- break
-
- if retry == PolicyRest._policy_retry_count:
- audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}"
- .format(PolicyRest._url_get_config, retry, policy_id),
- error_code=AuditResponseCode.DATA_ERROR)
- break
-
- audit.warn("retry #{0} {1} from PDP in {2} secs for policy_id={3}".format(
- retry, PolicyRest._url_get_config,
- PolicyRest._policy_retry_sleep, policy_id),
- error_code=AuditResponseCode.DATA_ERROR)
- time.sleep(PolicyRest._policy_retry_sleep)
-
- if (expect_policy_removed and not latest_policy
- and AuditHttpCode.RESPONSE_ERROR.value == status_code):
- audit.set_http_status_code(AuditHttpCode.HTTP_OK.value)
- return None
-
- audit.set_http_status_code(status_code)
- if not PolicyRest._validate_policy(latest_policy):
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- audit.error(
- "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)),
- error_code=AuditResponseCode.DATA_ERROR)
-
- return latest_policy
+ return PolicyRest._get_latest_policy(
+ audit, policy_id, min_version_expected, ignore_policy_names, str_metrics)
except Exception as ex:
error_msg = ("{0}: crash {1} {2} at {3}: {4}"
@@ -271,100 +229,92 @@ class PolicyRest(object):
@staticmethod
- def get_latest_updated_policies(aud_policy_updates):
- """Get the latest policies of the list of policy_names from the policy-engine"""
- audit, policies_updated, policies_removed = aud_policy_updates
- if not policies_updated and not policies_removed:
- return None, None
+ def _get_latest_policy(audit, policy_id,
+ min_version_expected, ignore_policy_names, str_metrics):
+ """retry several times getting the latest policy for the policy_id from the policy-engine"""
+ PolicyRest._lazy_init()
+ latest_policy = None
+ status_code = 0
+ retry_get_config = audit.kwargs.get("retry_get_config")
+ expect_policy_removed = (ignore_policy_names and not min_version_expected)
+
+ for retry in range(1, PolicyRest._policy_retry_count + 1):
+ PolicyRest._logger.debug(str_metrics)
- str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format(
- len(policies_updated), json.dumps(policies_updated),
- len(policies_removed), json.dumps(policies_removed))
+ done, latest_policy, status_code = PolicyRest._get_latest_policy_once(
+ audit, policy_id, min_version_expected, ignore_policy_names,
+ expect_policy_removed)
- target_entity = "{0} total get_latest_updated_policies".format(PolicyRest._target_entity)
- try:
- PolicyRest._lazy_init()
- metrics = Metrics(aud_parent=audit,
- targetEntity=target_entity,
- targetServiceName=PolicyRest._url_get_config)
+ if done or not retry_get_config or not PolicyRest._policy_retry_sleep:
+ break
- metrics.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
- PolicyRest._logger.debug(str_metrics)
+ if retry == PolicyRest._policy_retry_count:
+ audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}"
+ .format(PolicyRest._url_get_config, retry, policy_id),
+ error_code=AuditResponseCode.DATA_ERROR)
+ break
- policies_to_find = {}
- for (policy_name, policy_version) in policies_updated:
- policy_id = PolicyUtils.extract_policy_id(policy_name)
- if not policy_id or not policy_version.isdigit():
- continue
- policy = policies_to_find.get(policy_id)
- if not policy:
- policies_to_find[policy_id] = {
- POLICY_ID: policy_id,
- PolicyRest.MIN_VERSION_EXPECTED: int(policy_version),
- PolicyRest.IGNORE_POLICY_NAMES: {}
- }
- continue
- if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version):
- policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version)
-
- for (policy_name, _) in policies_removed:
- policy_id = PolicyUtils.extract_policy_id(policy_name)
- if not policy_id:
- continue
- policy = policies_to_find.get(policy_id)
- if not policy:
- policies_to_find[policy_id] = {
- POLICY_ID: policy_id,
- PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True}
- }
- continue
- policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True
-
- apns = [(audit, policy_id,
- policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED),
- policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES))
- for (policy_id, policy_to_find) in policies_to_find.items()]
-
- policies = None
- apns_length = len(apns)
- if apns_length == 1:
- policies = [PolicyRest.get_latest_policy(apns[0])]
- else:
- pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length))
- policies = pool.map(PolicyRest.get_latest_policy, apns)
- pool.close()
- pool.join()
+ audit.warn(
+ "retry #{0} {1} from PDP in {2} secs for policy_id={3}".format(
+ retry, PolicyRest._url_get_config,
+ PolicyRest._policy_retry_sleep, policy_id),
+ error_code=AuditResponseCode.DATA_ERROR)
+ time.sleep(PolicyRest._policy_retry_sleep)
+
+ if (expect_policy_removed and not latest_policy
+ and AuditHttpCode.RESPONSE_ERROR.value == status_code):
+ audit.set_http_status_code(AuditHttpCode.HTTP_OK.value)
+ return None
+
+ audit.set_http_status_code(status_code)
+ if not PolicyRest._validate_policy(latest_policy):
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.error(
+ "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)),
+ error_code=AuditResponseCode.DATA_ERROR)
+
+ return latest_policy
+
+ @staticmethod
+ def _get_latest_policy_once(audit, policy_id,
+ min_version_expected, ignore_policy_names,
+ expect_policy_removed):
+ """single attempt to get the latest policy for the policy_id from the policy-engine"""
- metrics.metrics("result get_latest_updated_policies {0}: {1} {2}"
- .format(str_metrics, len(policies), json.dumps(policies)))
+ status_code, policy_configs = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id})
- updated_policies = dict((policy[POLICY_ID], policy)
- for policy in policies
- if policy and policy.get(POLICY_ID))
+ PolicyRest._logger.debug("%s %s policy_configs: %s",
+ status_code, policy_id, json.dumps(policy_configs or []))
- removed_policies = dict((policy_id, True)
- for (policy_id, policy_to_find) in policies_to_find.items()
- if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED)
- and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)
- and policy_id not in updated_policies)
+ latest_policy = PolicyUtils.select_latest_policy(
+ policy_configs, min_version_expected, ignore_policy_names
+ )
+
+ if not latest_policy and not expect_policy_removed:
+ audit.error("received unexpected policy data from PDP for policy_id={0}: {1}"
+ .format(policy_id, json.dumps(policy_configs or [])),
+ error_code=AuditResponseCode.DATA_ERROR)
+
+ done = bool(latest_policy
+ or (expect_policy_removed and not policy_configs)
+ or audit.is_serious_error(status_code))
- errored_policies = dict((policy_id, policy_to_find)
- for (policy_id, policy_to_find) in policies_to_find.items()
- if policy_id not in updated_policies
- and policy_id not in removed_policies)
+ return done, latest_policy, status_code
- PolicyRest._logger.debug(
- "result updated_policies %s, removed_policies %s, errored_policies %s",
- json.dumps(updated_policies), json.dumps(removed_policies),
- json.dumps(errored_policies))
+ @staticmethod
+ def get_latest_updated_policies(aud_policy_updates):
+ """safely try retrieving the latest policies for the list of policy_names"""
+ audit, policies_updated, policies_removed = aud_policy_updates
+ if not policies_updated and not policies_removed:
+ return None, None
- if errored_policies:
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- audit.error(
- "errored_policies in PDP: {0}".format(json.dumps(errored_policies)),
- error_code=AuditResponseCode.DATA_ERROR)
+ str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format(
+ len(policies_updated), json.dumps(policies_updated),
+ len(policies_removed), json.dumps(policies_removed))
- return updated_policies, removed_policies
+ try:
+ return PolicyRest._get_latest_updated_policies(
+ audit, str_metrics, policies_updated, policies_removed)
except Exception as ex:
error_msg = ("{0}: crash {1} {2} at {3}: {4}"
@@ -376,6 +326,93 @@ class PolicyRest(object):
audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
return None, None
+ @staticmethod
+ def _get_latest_updated_policies(audit, str_metrics, policies_updated, policies_removed):
+ """Get the latest policies of the list of policy_names from the policy-engine"""
+ PolicyRest._lazy_init()
+ metrics = Metrics(
+ aud_parent=audit,
+ targetEntity="{0} total get_latest_updated_policies".format(PolicyRest._target_entity),
+ targetServiceName=PolicyRest._url_get_config)
+
+ metrics.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
+ PolicyRest._logger.debug(str_metrics)
+
+ policies_to_find = {}
+ for (policy_name, policy_version) in policies_updated:
+ policy_id = PolicyUtils.extract_policy_id(policy_name)
+ if not policy_id or not policy_version.isdigit():
+ continue
+ policy = policies_to_find.get(policy_id)
+ if not policy:
+ policies_to_find[policy_id] = {
+ POLICY_ID: policy_id,
+ PolicyRest.MIN_VERSION_EXPECTED: int(policy_version),
+ PolicyRest.IGNORE_POLICY_NAMES: {}
+ }
+ continue
+ if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version):
+ policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version)
+
+ for (policy_name, _) in policies_removed:
+ policy_id = PolicyUtils.extract_policy_id(policy_name)
+ if not policy_id:
+ continue
+ policy = policies_to_find.get(policy_id)
+ if not policy:
+ policies_to_find[policy_id] = {
+ POLICY_ID: policy_id,
+ PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True}
+ }
+ continue
+ policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True
+
+ apns = [(audit, policy_id,
+ policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED),
+ policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES))
+ for (policy_id, policy_to_find) in policies_to_find.items()]
+
+ policies = None
+ apns_length = len(apns)
+ if apns_length == 1:
+ policies = [PolicyRest.get_latest_policy(apns[0])]
+ else:
+ pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length))
+ policies = pool.map(PolicyRest.get_latest_policy, apns)
+ pool.close()
+ pool.join()
+
+ metrics.metrics("result get_latest_updated_policies {0}: {1} {2}"
+ .format(str_metrics, len(policies), json.dumps(policies)))
+
+ updated_policies = dict((policy[POLICY_ID], policy)
+ for policy in policies
+ if policy and policy.get(POLICY_ID))
+
+ removed_policies = dict((policy_id, True)
+ for (policy_id, policy_to_find) in policies_to_find.items()
+ if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED)
+ and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)
+ and policy_id not in updated_policies)
+
+ errored_policies = dict((policy_id, policy_to_find)
+ for (policy_id, policy_to_find) in policies_to_find.items()
+ if policy_id not in updated_policies
+ and policy_id not in removed_policies)
+
+ PolicyRest._logger.debug(
+ "result updated_policies %s, removed_policies %s, errored_policies %s",
+ json.dumps(updated_policies), json.dumps(removed_policies),
+ json.dumps(errored_policies))
+
+ if errored_policies:
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.error(
+ "errored_policies in PDP: {0}".format(json.dumps(errored_policies)),
+ error_code=AuditResponseCode.DATA_ERROR)
+
+ return updated_policies, removed_policies
+
@staticmethod
def _get_latest_policies(aud_policy_filter):