From d444b320dea07248dc69c81d0ce9ea5fc353e701 Mon Sep 17 00:00:00 2001 From: Alex Shatov Date: Thu, 21 Jun 2018 09:19:07 -0400 Subject: 3.0.1 policy-handler - cleaning sonar smells - no change of functionality or API - removed the unused enum34>=1.1.6 from requirements.txt and setup.py - refactored run_policy.sh to redirect the stdout+stderr only once - refactoring to remove smells+vulnerability reported by sonar -- renamed Config.config to Config.settings -- removed the commented out code in customizer.py -- renamed StepTimer.NEXT to StepTimer.STATE_NEXT to avoid the naming confusion with the method StepTimer.next. Also renamed the related StepTimer.STATE_* constants -- refactored several functions by extracting methods to eliminate 4 out of 5 "brain-overload" smells reported by sonar -- moved the literal string for the socket_host "0.0.0.0" to a constant on the web-server to avoid the reported vulnerability Change-Id: I4c7d47d41c6ecd7cb28f6704f5dad2053c1ca7d6 Signed-off-by: Alex Shatov Issue-ID: DCAEGEN2-515 --- policyhandler/policy_rest.py | 391 +++++++++++++++++++++++-------------------- 1 file changed, 214 insertions(+), 177 deletions(-) (limited to 'policyhandler/policy_rest.py') diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py index 16c38bb..c8018f6 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/policy_rest.py @@ -36,17 +36,17 @@ from .policy_utils import PolicyUtils class PolicyRest(object): - """ policy-engine """ + """using the http API to policy-engine""" _logger = logging.getLogger("policy_handler.policy_rest") _lazy_inited = False POLICY_GET_CONFIG = 'getConfig' - POLICY_CONFIG_STATUS = "policyConfigStatus" - CONFIG_RETRIEVED = "CONFIG_RETRIEVED" - CONFIG_NOT_FOUND = "CONFIG_NOT_FOUND" - POLICY_CONFIG_MESSAGE = "policyConfigMessage" - NO_RESPONSE_RECEIVED = "No Response Received" - POLICY_ENGINE_STATUS_CODE_ERROR = 400 - PE_DATA_NOT_FOUND = "PE300 - Data Issue: Incorrect Params passed: Decision not a Permit." + PDP_CONFIG_STATUS = "policyConfigStatus" + PDP_CONFIG_RETRIEVED = "CONFIG_RETRIEVED" + PDP_CONFIG_NOT_FOUND = "CONFIG_NOT_FOUND" + PDP_CONFIG_MESSAGE = "policyConfigMessage" + PDP_NO_RESPONSE_RECEIVED = "No Response Received" + PDP_STATUS_CODE_ERROR = 400 + PDP_DATA_NOT_FOUND = "PE300 - Data Issue: Incorrect Params passed: Decision not a Permit." MIN_VERSION_EXPECTED = "min_version_expected" IGNORE_POLICY_NAMES = "ignore_policy_names" @@ -68,7 +68,7 @@ class PolicyRest(object): return PolicyRest._lazy_inited = True - config = Config.config[Config.FIELD_POLICY_ENGINE] + config = Config.settings[Config.FIELD_POLICY_ENGINE] pool_size = config.get("pool_connections", 20) PolicyRest._requests_session = requests.Session() @@ -85,15 +85,15 @@ class PolicyRest(object): + config["path_api"] + PolicyRest.POLICY_GET_CONFIG PolicyRest._headers = config["headers"] PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) - PolicyRest._thread_pool_size = Config.config.get("thread_pool_size", 4) + PolicyRest._thread_pool_size = Config.settings.get("thread_pool_size", 4) if PolicyRest._thread_pool_size < 2: PolicyRest._thread_pool_size = 2 - PolicyRest._scope_prefixes = Config.config["scope_prefixes"] + PolicyRest._scope_prefixes = Config.settings["scope_prefixes"] PolicyRest._scope_thread_pool_size = min(PolicyRest._thread_pool_size, \ len(PolicyRest._scope_prefixes)) - PolicyRest._policy_retry_count = Config.config.get("policy_retry_count", 1) or 1 - PolicyRest._policy_retry_sleep = Config.config.get("policy_retry_sleep", 0) + PolicyRest._policy_retry_count = Config.settings.get("policy_retry_count", 1) or 1 + PolicyRest._policy_retry_sleep = Config.settings.get("policy_retry_sleep", 0) PolicyRest._logger.info( "PolicyClient url(%s) headers(%s) scope-prefixes(%s)", @@ -137,6 +137,19 @@ class PolicyRest(object): PolicyRest._url_get_config, res.status_code, msg, res.text, Metrics.log_json_dumps(dict(res.request.headers.items()))) + status_code, res_data = PolicyRest._extract_pdp_res_data(audit, metrics, log_line, res) + + if status_code: + return status_code, res_data + + metrics.set_http_status_code(res.status_code) + metrics.metrics(log_line) + PolicyRest._logger.info(log_line) + return res.status_code, res_data + + @staticmethod + def _extract_pdp_res_data(audit, metrics, log_line, res): + """special treatment of pdp response""" res_data = None if res.status_code == requests.codes.ok: res_data = res.json() @@ -145,7 +158,7 @@ class PolicyRest(object): rslt = res_data[0] if rslt and not rslt.get(POLICY_NAME): res_data = None - if rslt.get(PolicyRest.POLICY_CONFIG_MESSAGE) == PolicyRest.NO_RESPONSE_RECEIVED: + if rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_NO_RESPONSE_RECEIVED: error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value error_msg = "unexpected {0}".format(log_line) @@ -153,30 +166,31 @@ class PolicyRest(object): metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) - return (error_code, None) + return error_code, None + return None, res_data - elif res.status_code == PolicyRest.POLICY_ENGINE_STATUS_CODE_ERROR: + if res.status_code == PolicyRest.PDP_STATUS_CODE_ERROR: try: - rslt = res.json() - if rslt and isinstance(rslt, list) and len(rslt) == 1: - rslt = rslt[0] - if rslt and not rslt.get(POLICY_NAME) \ - and rslt.get(PolicyRest.POLICY_CONFIG_STATUS) == PolicyRest.CONFIG_NOT_FOUND \ - and rslt.get(PolicyRest.POLICY_CONFIG_MESSAGE) == PolicyRest.PE_DATA_NOT_FOUND: - status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value - info_msg = "not found {0}".format(log_line) - - PolicyRest._logger.info(info_msg) - metrics.set_http_status_code(status_code) - metrics.metrics(info_msg) - return (status_code, None) + res_data = res.json() except ValueError: - pass + return None, None + + if not res_data or not isinstance(res_data, list) or len(res_data) != 1: + return None, None + + rslt = res_data[0] + if (rslt and not rslt.get(POLICY_NAME) + and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND + and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND): + status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value + info_msg = "not found {0}".format(log_line) + + PolicyRest._logger.info(info_msg) + metrics.set_http_status_code(status_code) + metrics.metrics(info_msg) + return status_code, None + return None, None - metrics.set_http_status_code(res.status_code) - metrics.metrics(log_line) - PolicyRest._logger.info(log_line) - return res.status_code, res_data @staticmethod def _validate_policy(policy): @@ -188,76 +202,20 @@ class PolicyRest(object): return bool( policy_body - and policy_body.get(PolicyRest.POLICY_CONFIG_STATUS) == PolicyRest.CONFIG_RETRIEVED + and policy_body.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_RETRIEVED and policy_body.get(POLICY_CONFIG) ) @staticmethod def get_latest_policy(aud_policy_id): - """Get the latest policy for the policy_id from the policy-engine""" + """safely try retrieving the latest policy for the policy_id from the policy-engine""" audit, policy_id, min_version_expected, ignore_policy_names = aud_policy_id str_metrics = "policy_id({0}), min_version_expected({1}) ignore_policy_names({2})".format( policy_id, min_version_expected, json.dumps(ignore_policy_names)) try: - PolicyRest._lazy_init() - status_code = 0 - retry_get_config = audit.kwargs.get("retry_get_config") - policy_configs = None - latest_policy = None - expect_policy_removed = (ignore_policy_names and not min_version_expected) - - for retry in range(1, PolicyRest._policy_retry_count + 1): - PolicyRest._logger.debug(str_metrics) - - status_code, policy_configs = PolicyRest._pdp_get_config( - audit, {POLICY_NAME:policy_id} - ) - - PolicyRest._logger.debug("%s %s policy_configs: %s", - status_code, policy_id, json.dumps(policy_configs or [])) - - latest_policy = PolicyUtils.select_latest_policy( - policy_configs, min_version_expected, ignore_policy_names - ) - - if not latest_policy and not expect_policy_removed: - audit.error("received unexpected policy data from PDP for policy_id={0}: {1}" - .format(policy_id, json.dumps(policy_configs or [])), - error_code=AuditResponseCode.DATA_ERROR) - - if (latest_policy - or not retry_get_config - or (expect_policy_removed and not policy_configs) - or not PolicyRest._policy_retry_sleep - or audit.is_serious_error(status_code)): - break - - if retry == PolicyRest._policy_retry_count: - audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}" - .format(PolicyRest._url_get_config, retry, policy_id), - error_code=AuditResponseCode.DATA_ERROR) - break - - audit.warn("retry #{0} {1} from PDP in {2} secs for policy_id={3}".format( - retry, PolicyRest._url_get_config, - PolicyRest._policy_retry_sleep, policy_id), - error_code=AuditResponseCode.DATA_ERROR) - time.sleep(PolicyRest._policy_retry_sleep) - - if (expect_policy_removed and not latest_policy - and AuditHttpCode.RESPONSE_ERROR.value == status_code): - audit.set_http_status_code(AuditHttpCode.HTTP_OK.value) - return None - - audit.set_http_status_code(status_code) - if not PolicyRest._validate_policy(latest_policy): - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - audit.error( - "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)), - error_code=AuditResponseCode.DATA_ERROR) - - return latest_policy + return PolicyRest._get_latest_policy( + audit, policy_id, min_version_expected, ignore_policy_names, str_metrics) except Exception as ex: error_msg = ("{0}: crash {1} {2} at {3}: {4}" @@ -271,100 +229,92 @@ class PolicyRest(object): @staticmethod - def get_latest_updated_policies(aud_policy_updates): - """Get the latest policies of the list of policy_names from the policy-engine""" - audit, policies_updated, policies_removed = aud_policy_updates - if not policies_updated and not policies_removed: - return None, None + def _get_latest_policy(audit, policy_id, + min_version_expected, ignore_policy_names, str_metrics): + """retry several times getting the latest policy for the policy_id from the policy-engine""" + PolicyRest._lazy_init() + latest_policy = None + status_code = 0 + retry_get_config = audit.kwargs.get("retry_get_config") + expect_policy_removed = (ignore_policy_names and not min_version_expected) + + for retry in range(1, PolicyRest._policy_retry_count + 1): + PolicyRest._logger.debug(str_metrics) - str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format( - len(policies_updated), json.dumps(policies_updated), - len(policies_removed), json.dumps(policies_removed)) + done, latest_policy, status_code = PolicyRest._get_latest_policy_once( + audit, policy_id, min_version_expected, ignore_policy_names, + expect_policy_removed) - target_entity = "{0} total get_latest_updated_policies".format(PolicyRest._target_entity) - try: - PolicyRest._lazy_init() - metrics = Metrics(aud_parent=audit, - targetEntity=target_entity, - targetServiceName=PolicyRest._url_get_config) + if done or not retry_get_config or not PolicyRest._policy_retry_sleep: + break - metrics.metrics_start("get_latest_updated_policies {0}".format(str_metrics)) - PolicyRest._logger.debug(str_metrics) + if retry == PolicyRest._policy_retry_count: + audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}" + .format(PolicyRest._url_get_config, retry, policy_id), + error_code=AuditResponseCode.DATA_ERROR) + break - policies_to_find = {} - for (policy_name, policy_version) in policies_updated: - policy_id = PolicyUtils.extract_policy_id(policy_name) - if not policy_id or not policy_version.isdigit(): - continue - policy = policies_to_find.get(policy_id) - if not policy: - policies_to_find[policy_id] = { - POLICY_ID: policy_id, - PolicyRest.MIN_VERSION_EXPECTED: int(policy_version), - PolicyRest.IGNORE_POLICY_NAMES: {} - } - continue - if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version): - policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version) - - for (policy_name, _) in policies_removed: - policy_id = PolicyUtils.extract_policy_id(policy_name) - if not policy_id: - continue - policy = policies_to_find.get(policy_id) - if not policy: - policies_to_find[policy_id] = { - POLICY_ID: policy_id, - PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True} - } - continue - policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True - - apns = [(audit, policy_id, - policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED), - policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)) - for (policy_id, policy_to_find) in policies_to_find.items()] - - policies = None - apns_length = len(apns) - if apns_length == 1: - policies = [PolicyRest.get_latest_policy(apns[0])] - else: - pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length)) - policies = pool.map(PolicyRest.get_latest_policy, apns) - pool.close() - pool.join() + audit.warn( + "retry #{0} {1} from PDP in {2} secs for policy_id={3}".format( + retry, PolicyRest._url_get_config, + PolicyRest._policy_retry_sleep, policy_id), + error_code=AuditResponseCode.DATA_ERROR) + time.sleep(PolicyRest._policy_retry_sleep) + + if (expect_policy_removed and not latest_policy + and AuditHttpCode.RESPONSE_ERROR.value == status_code): + audit.set_http_status_code(AuditHttpCode.HTTP_OK.value) + return None + + audit.set_http_status_code(status_code) + if not PolicyRest._validate_policy(latest_policy): + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.error( + "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)), + error_code=AuditResponseCode.DATA_ERROR) + + return latest_policy + + @staticmethod + def _get_latest_policy_once(audit, policy_id, + min_version_expected, ignore_policy_names, + expect_policy_removed): + """single attempt to get the latest policy for the policy_id from the policy-engine""" - metrics.metrics("result get_latest_updated_policies {0}: {1} {2}" - .format(str_metrics, len(policies), json.dumps(policies))) + status_code, policy_configs = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id}) - updated_policies = dict((policy[POLICY_ID], policy) - for policy in policies - if policy and policy.get(POLICY_ID)) + PolicyRest._logger.debug("%s %s policy_configs: %s", + status_code, policy_id, json.dumps(policy_configs or [])) - removed_policies = dict((policy_id, True) - for (policy_id, policy_to_find) in policies_to_find.items() - if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED) - and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES) - and policy_id not in updated_policies) + latest_policy = PolicyUtils.select_latest_policy( + policy_configs, min_version_expected, ignore_policy_names + ) + + if not latest_policy and not expect_policy_removed: + audit.error("received unexpected policy data from PDP for policy_id={0}: {1}" + .format(policy_id, json.dumps(policy_configs or [])), + error_code=AuditResponseCode.DATA_ERROR) + + done = bool(latest_policy + or (expect_policy_removed and not policy_configs) + or audit.is_serious_error(status_code)) - errored_policies = dict((policy_id, policy_to_find) - for (policy_id, policy_to_find) in policies_to_find.items() - if policy_id not in updated_policies - and policy_id not in removed_policies) + return done, latest_policy, status_code - PolicyRest._logger.debug( - "result updated_policies %s, removed_policies %s, errored_policies %s", - json.dumps(updated_policies), json.dumps(removed_policies), - json.dumps(errored_policies)) + @staticmethod + def get_latest_updated_policies(aud_policy_updates): + """safely try retrieving the latest policies for the list of policy_names""" + audit, policies_updated, policies_removed = aud_policy_updates + if not policies_updated and not policies_removed: + return None, None - if errored_policies: - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - audit.error( - "errored_policies in PDP: {0}".format(json.dumps(errored_policies)), - error_code=AuditResponseCode.DATA_ERROR) + str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format( + len(policies_updated), json.dumps(policies_updated), + len(policies_removed), json.dumps(policies_removed)) - return updated_policies, removed_policies + try: + return PolicyRest._get_latest_updated_policies( + audit, str_metrics, policies_updated, policies_removed) except Exception as ex: error_msg = ("{0}: crash {1} {2} at {3}: {4}" @@ -376,6 +326,93 @@ class PolicyRest(object): audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) return None, None + @staticmethod + def _get_latest_updated_policies(audit, str_metrics, policies_updated, policies_removed): + """Get the latest policies of the list of policy_names from the policy-engine""" + PolicyRest._lazy_init() + metrics = Metrics( + aud_parent=audit, + targetEntity="{0} total get_latest_updated_policies".format(PolicyRest._target_entity), + targetServiceName=PolicyRest._url_get_config) + + metrics.metrics_start("get_latest_updated_policies {0}".format(str_metrics)) + PolicyRest._logger.debug(str_metrics) + + policies_to_find = {} + for (policy_name, policy_version) in policies_updated: + policy_id = PolicyUtils.extract_policy_id(policy_name) + if not policy_id or not policy_version.isdigit(): + continue + policy = policies_to_find.get(policy_id) + if not policy: + policies_to_find[policy_id] = { + POLICY_ID: policy_id, + PolicyRest.MIN_VERSION_EXPECTED: int(policy_version), + PolicyRest.IGNORE_POLICY_NAMES: {} + } + continue + if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version): + policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version) + + for (policy_name, _) in policies_removed: + policy_id = PolicyUtils.extract_policy_id(policy_name) + if not policy_id: + continue + policy = policies_to_find.get(policy_id) + if not policy: + policies_to_find[policy_id] = { + POLICY_ID: policy_id, + PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True} + } + continue + policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True + + apns = [(audit, policy_id, + policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED), + policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)) + for (policy_id, policy_to_find) in policies_to_find.items()] + + policies = None + apns_length = len(apns) + if apns_length == 1: + policies = [PolicyRest.get_latest_policy(apns[0])] + else: + pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length)) + policies = pool.map(PolicyRest.get_latest_policy, apns) + pool.close() + pool.join() + + metrics.metrics("result get_latest_updated_policies {0}: {1} {2}" + .format(str_metrics, len(policies), json.dumps(policies))) + + updated_policies = dict((policy[POLICY_ID], policy) + for policy in policies + if policy and policy.get(POLICY_ID)) + + removed_policies = dict((policy_id, True) + for (policy_id, policy_to_find) in policies_to_find.items() + if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED) + and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES) + and policy_id not in updated_policies) + + errored_policies = dict((policy_id, policy_to_find) + for (policy_id, policy_to_find) in policies_to_find.items() + if policy_id not in updated_policies + and policy_id not in removed_policies) + + PolicyRest._logger.debug( + "result updated_policies %s, removed_policies %s, errored_policies %s", + json.dumps(updated_policies), json.dumps(removed_policies), + json.dumps(errored_policies)) + + if errored_policies: + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.error( + "errored_policies in PDP: {0}".format(json.dumps(errored_policies)), + error_code=AuditResponseCode.DATA_ERROR) + + return updated_policies, removed_policies + @staticmethod def _get_latest_policies(aud_policy_filter): -- cgit 1.2.3-korg