diff options
author | Alex Shatov <alexs@att.com> | 2018-06-21 09:19:07 -0400 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2018-06-21 09:19:07 -0400 |
commit | d444b320dea07248dc69c81d0ce9ea5fc353e701 (patch) | |
tree | cacd244c5e2424faca5e088dc3ca3baf9afd8511 /policyhandler | |
parent | c9ec231483d905f3a391c3985c2c2762344ed5c1 (diff) |
3.0.1 policy-handler - cleaning sonar smells
- no change of functionality or API
- removed the unused enum34>=1.1.6 from requirements.txt and setup.py
- refactored run_policy.sh to redirect the stdout+stderr only once
- refactoring to remove smells+vulnerability reported by sonar
-- renamed Config.config to Config.settings
-- removed the commented out code in customizer.py
-- renamed StepTimer.NEXT to StepTimer.STATE_NEXT to avoid the
naming confusion with the method StepTimer.next.
Also renamed the related StepTimer.STATE_* constants
-- refactored several functions by extracting methods to eliminate
4 out of 5 "brain-overload" smells reported by sonar
-- moved the literal string for the socket_host "0.0.0.0" to a
constant on the web-server to avoid the reported vulnerability
Change-Id: I4c7d47d41c6ecd7cb28f6704f5dad2053c1ca7d6
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-515
Diffstat (limited to 'policyhandler')
-rw-r--r-- | policyhandler/__main__.py | 2 | ||||
-rw-r--r-- | policyhandler/config.py | 16 | ||||
-rw-r--r-- | policyhandler/customize/customizer.py | 4 | ||||
-rw-r--r-- | policyhandler/deploy_handler.py | 2 | ||||
-rw-r--r-- | policyhandler/policy_receiver.py | 4 | ||||
-rw-r--r-- | policyhandler/policy_rest.py | 391 | ||||
-rw-r--r-- | policyhandler/policy_updater.py | 2 | ||||
-rw-r--r-- | policyhandler/step_timer.py | 75 | ||||
-rw-r--r-- | policyhandler/web_server.py | 3 |
9 files changed, 269 insertions, 230 deletions
diff --git a/policyhandler/__main__.py b/policyhandler/__main__.py index 1f17d9d..04ca657 100644 --- a/policyhandler/__main__.py +++ b/policyhandler/__main__.py @@ -46,7 +46,7 @@ def run_policy_handler(): Audit.init(Config.get_system_name(), Config.LOGGER_CONFIG_FILE_PATH) logger.info("starting policy_handler with config:") - logger.info(Audit.log_json_dumps(Config.config)) + logger.info(Audit.log_json_dumps(Config.settings)) audit = Audit(req_message="start policy handler") PolicyReceiver.run(audit) diff --git a/policyhandler/config.py b/policyhandler/config.py index 39d528b..8e6edf9 100644 --- a/policyhandler/config.py +++ b/policyhandler/config.py @@ -42,7 +42,7 @@ class Config(object): FIELD_POLICY_ENGINE = "policy_engine" wservice_port = 25577 _logger = logging.getLogger("policy_handler.config") - config = None + settings = None @staticmethod def merge(new_config): @@ -50,19 +50,19 @@ class Config(object): if not new_config: return - if not Config.config: - Config.config = new_config + if not Config.settings: + Config.settings = new_config return new_config = copy.deepcopy(new_config) - Config.config.update(new_config) + Config.settings.update(new_config) @staticmethod def get_system_name(): """find the name of the policy-handler system to be used as the key in consul-kv for config of policy-handler """ - return (Config.config or {}).get(Config.FIELD_SYSTEM, Config.SERVICE_NAME_POLICY_HANDLER) + return (Config.settings or {}).get(Config.FIELD_SYSTEM, Config.SERVICE_NAME_POLICY_HANDLER) @staticmethod def discover(): @@ -76,9 +76,9 @@ class Config(object): Config._logger.debug("loaded config from discovery(%s): %s", \ discovery_key, json.dumps(new_config)) - Config._logger.debug("config before merge from discovery: %s", json.dumps(Config.config)) + Config._logger.debug("config before merge from discovery: %s", json.dumps(Config.settings)) Config.merge(new_config.get(Config.SERVICE_NAME_POLICY_HANDLER)) - Config._logger.info("merged config from discovery: %s", json.dumps(Config.config)) + Config._logger.info("merged config from discovery: %s", json.dumps(Config.settings)) @staticmethod def load_from_file(file_path=None): @@ -102,5 +102,5 @@ class Config(object): Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port) Config.merge(loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER)) - Config._logger.info("config loaded from file: %s", json.dumps(Config.config)) + Config._logger.info("config loaded from file: %s", json.dumps(Config.settings)) return True diff --git a/policyhandler/customize/customizer.py b/policyhandler/customize/customizer.py index 38928e9..9ab5967 100644 --- a/policyhandler/customize/customizer.py +++ b/policyhandler/customize/customizer.py @@ -31,8 +31,4 @@ class Customizer(CustomizerBase): see README.md for the sample of the customizer.py """ - # def __init__(self): - # """class that contains the customization""" - # super().__init__() - pass diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index 4ea5ad1..ea703f4 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -66,7 +66,7 @@ class DeployHandler(object): requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE) ) - config_dh = Config.config.get("deploy_handler") + config_dh = Config.settings.get("deploy_handler") if config_dh and isinstance(config_dh, dict): # dns based routing to deployment-handler # config for policy-handler >= 2.4.0 diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py index 280e3c6..e1584a3 100644 --- a/policyhandler/policy_receiver.py +++ b/policyhandler/policy_receiver.py @@ -55,7 +55,7 @@ class _PolicyReceiver(Thread): self._lock = Lock() self._keep_running = True - config = Config.config[Config.FIELD_POLICY_ENGINE] + config = Config.settings[Config.FIELD_POLICY_ENGINE] self.web_socket_url = resturl = config["url"] + config["path_pdp"] if resturl.startswith("https:"): @@ -66,7 +66,7 @@ class _PolicyReceiver(Thread): self._web_socket = None scope_prefixes = [scope_prefix.replace(".", "[.]") - for scope_prefix in Config.config["scope_prefixes"]] + for scope_prefix in Config.settings["scope_prefixes"]] self._policy_scopes = re.compile("(" + "|".join(scope_prefixes) + ")") _PolicyReceiver._logger.info("_policy_scopes %s", self._policy_scopes.pattern) self._policy_updater = PolicyUpdater() diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py index 16c38bb..c8018f6 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/policy_rest.py @@ -36,17 +36,17 @@ from .policy_utils import PolicyUtils class PolicyRest(object): - """ policy-engine """ + """using the http API to policy-engine""" _logger = logging.getLogger("policy_handler.policy_rest") _lazy_inited = False POLICY_GET_CONFIG = 'getConfig' - POLICY_CONFIG_STATUS = "policyConfigStatus" - CONFIG_RETRIEVED = "CONFIG_RETRIEVED" - CONFIG_NOT_FOUND = "CONFIG_NOT_FOUND" - POLICY_CONFIG_MESSAGE = "policyConfigMessage" - NO_RESPONSE_RECEIVED = "No Response Received" - POLICY_ENGINE_STATUS_CODE_ERROR = 400 - PE_DATA_NOT_FOUND = "PE300 - Data Issue: Incorrect Params passed: Decision not a Permit." + PDP_CONFIG_STATUS = "policyConfigStatus" + PDP_CONFIG_RETRIEVED = "CONFIG_RETRIEVED" + PDP_CONFIG_NOT_FOUND = "CONFIG_NOT_FOUND" + PDP_CONFIG_MESSAGE = "policyConfigMessage" + PDP_NO_RESPONSE_RECEIVED = "No Response Received" + PDP_STATUS_CODE_ERROR = 400 + PDP_DATA_NOT_FOUND = "PE300 - Data Issue: Incorrect Params passed: Decision not a Permit." MIN_VERSION_EXPECTED = "min_version_expected" IGNORE_POLICY_NAMES = "ignore_policy_names" @@ -68,7 +68,7 @@ class PolicyRest(object): return PolicyRest._lazy_inited = True - config = Config.config[Config.FIELD_POLICY_ENGINE] + config = Config.settings[Config.FIELD_POLICY_ENGINE] pool_size = config.get("pool_connections", 20) PolicyRest._requests_session = requests.Session() @@ -85,15 +85,15 @@ class PolicyRest(object): + config["path_api"] + PolicyRest.POLICY_GET_CONFIG PolicyRest._headers = config["headers"] PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) - PolicyRest._thread_pool_size = Config.config.get("thread_pool_size", 4) + PolicyRest._thread_pool_size = Config.settings.get("thread_pool_size", 4) if PolicyRest._thread_pool_size < 2: PolicyRest._thread_pool_size = 2 - PolicyRest._scope_prefixes = Config.config["scope_prefixes"] + PolicyRest._scope_prefixes = Config.settings["scope_prefixes"] PolicyRest._scope_thread_pool_size = min(PolicyRest._thread_pool_size, \ len(PolicyRest._scope_prefixes)) - PolicyRest._policy_retry_count = Config.config.get("policy_retry_count", 1) or 1 - PolicyRest._policy_retry_sleep = Config.config.get("policy_retry_sleep", 0) + PolicyRest._policy_retry_count = Config.settings.get("policy_retry_count", 1) or 1 + PolicyRest._policy_retry_sleep = Config.settings.get("policy_retry_sleep", 0) PolicyRest._logger.info( "PolicyClient url(%s) headers(%s) scope-prefixes(%s)", @@ -137,6 +137,19 @@ class PolicyRest(object): PolicyRest._url_get_config, res.status_code, msg, res.text, Metrics.log_json_dumps(dict(res.request.headers.items()))) + status_code, res_data = PolicyRest._extract_pdp_res_data(audit, metrics, log_line, res) + + if status_code: + return status_code, res_data + + metrics.set_http_status_code(res.status_code) + metrics.metrics(log_line) + PolicyRest._logger.info(log_line) + return res.status_code, res_data + + @staticmethod + def _extract_pdp_res_data(audit, metrics, log_line, res): + """special treatment of pdp response""" res_data = None if res.status_code == requests.codes.ok: res_data = res.json() @@ -145,7 +158,7 @@ class PolicyRest(object): rslt = res_data[0] if rslt and not rslt.get(POLICY_NAME): res_data = None - if rslt.get(PolicyRest.POLICY_CONFIG_MESSAGE) == PolicyRest.NO_RESPONSE_RECEIVED: + if rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_NO_RESPONSE_RECEIVED: error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value error_msg = "unexpected {0}".format(log_line) @@ -153,30 +166,31 @@ class PolicyRest(object): metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) - return (error_code, None) + return error_code, None + return None, res_data - elif res.status_code == PolicyRest.POLICY_ENGINE_STATUS_CODE_ERROR: + if res.status_code == PolicyRest.PDP_STATUS_CODE_ERROR: try: - rslt = res.json() - if rslt and isinstance(rslt, list) and len(rslt) == 1: - rslt = rslt[0] - if rslt and not rslt.get(POLICY_NAME) \ - and rslt.get(PolicyRest.POLICY_CONFIG_STATUS) == PolicyRest.CONFIG_NOT_FOUND \ - and rslt.get(PolicyRest.POLICY_CONFIG_MESSAGE) == PolicyRest.PE_DATA_NOT_FOUND: - status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value - info_msg = "not found {0}".format(log_line) - - PolicyRest._logger.info(info_msg) - metrics.set_http_status_code(status_code) - metrics.metrics(info_msg) - return (status_code, None) + res_data = res.json() except ValueError: - pass + return None, None + + if not res_data or not isinstance(res_data, list) or len(res_data) != 1: + return None, None + + rslt = res_data[0] + if (rslt and not rslt.get(POLICY_NAME) + and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND + and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND): + status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value + info_msg = "not found {0}".format(log_line) + + PolicyRest._logger.info(info_msg) + metrics.set_http_status_code(status_code) + metrics.metrics(info_msg) + return status_code, None + return None, None - metrics.set_http_status_code(res.status_code) - metrics.metrics(log_line) - PolicyRest._logger.info(log_line) - return res.status_code, res_data @staticmethod def _validate_policy(policy): @@ -188,76 +202,20 @@ class PolicyRest(object): return bool( policy_body - and policy_body.get(PolicyRest.POLICY_CONFIG_STATUS) == PolicyRest.CONFIG_RETRIEVED + and policy_body.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_RETRIEVED and policy_body.get(POLICY_CONFIG) ) @staticmethod def get_latest_policy(aud_policy_id): - """Get the latest policy for the policy_id from the policy-engine""" + """safely try retrieving the latest policy for the policy_id from the policy-engine""" audit, policy_id, min_version_expected, ignore_policy_names = aud_policy_id str_metrics = "policy_id({0}), min_version_expected({1}) ignore_policy_names({2})".format( policy_id, min_version_expected, json.dumps(ignore_policy_names)) try: - PolicyRest._lazy_init() - status_code = 0 - retry_get_config = audit.kwargs.get("retry_get_config") - policy_configs = None - latest_policy = None - expect_policy_removed = (ignore_policy_names and not min_version_expected) - - for retry in range(1, PolicyRest._policy_retry_count + 1): - PolicyRest._logger.debug(str_metrics) - - status_code, policy_configs = PolicyRest._pdp_get_config( - audit, {POLICY_NAME:policy_id} - ) - - PolicyRest._logger.debug("%s %s policy_configs: %s", - status_code, policy_id, json.dumps(policy_configs or [])) - - latest_policy = PolicyUtils.select_latest_policy( - policy_configs, min_version_expected, ignore_policy_names - ) - - if not latest_policy and not expect_policy_removed: - audit.error("received unexpected policy data from PDP for policy_id={0}: {1}" - .format(policy_id, json.dumps(policy_configs or [])), - error_code=AuditResponseCode.DATA_ERROR) - - if (latest_policy - or not retry_get_config - or (expect_policy_removed and not policy_configs) - or not PolicyRest._policy_retry_sleep - or audit.is_serious_error(status_code)): - break - - if retry == PolicyRest._policy_retry_count: - audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}" - .format(PolicyRest._url_get_config, retry, policy_id), - error_code=AuditResponseCode.DATA_ERROR) - break - - audit.warn("retry #{0} {1} from PDP in {2} secs for policy_id={3}".format( - retry, PolicyRest._url_get_config, - PolicyRest._policy_retry_sleep, policy_id), - error_code=AuditResponseCode.DATA_ERROR) - time.sleep(PolicyRest._policy_retry_sleep) - - if (expect_policy_removed and not latest_policy - and AuditHttpCode.RESPONSE_ERROR.value == status_code): - audit.set_http_status_code(AuditHttpCode.HTTP_OK.value) - return None - - audit.set_http_status_code(status_code) - if not PolicyRest._validate_policy(latest_policy): - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - audit.error( - "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)), - error_code=AuditResponseCode.DATA_ERROR) - - return latest_policy + return PolicyRest._get_latest_policy( + audit, policy_id, min_version_expected, ignore_policy_names, str_metrics) except Exception as ex: error_msg = ("{0}: crash {1} {2} at {3}: {4}" @@ -271,100 +229,92 @@ class PolicyRest(object): @staticmethod - def get_latest_updated_policies(aud_policy_updates): - """Get the latest policies of the list of policy_names from the policy-engine""" - audit, policies_updated, policies_removed = aud_policy_updates - if not policies_updated and not policies_removed: - return None, None + def _get_latest_policy(audit, policy_id, + min_version_expected, ignore_policy_names, str_metrics): + """retry several times getting the latest policy for the policy_id from the policy-engine""" + PolicyRest._lazy_init() + latest_policy = None + status_code = 0 + retry_get_config = audit.kwargs.get("retry_get_config") + expect_policy_removed = (ignore_policy_names and not min_version_expected) + + for retry in range(1, PolicyRest._policy_retry_count + 1): + PolicyRest._logger.debug(str_metrics) - str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format( - len(policies_updated), json.dumps(policies_updated), - len(policies_removed), json.dumps(policies_removed)) + done, latest_policy, status_code = PolicyRest._get_latest_policy_once( + audit, policy_id, min_version_expected, ignore_policy_names, + expect_policy_removed) - target_entity = "{0} total get_latest_updated_policies".format(PolicyRest._target_entity) - try: - PolicyRest._lazy_init() - metrics = Metrics(aud_parent=audit, - targetEntity=target_entity, - targetServiceName=PolicyRest._url_get_config) + if done or not retry_get_config or not PolicyRest._policy_retry_sleep: + break - metrics.metrics_start("get_latest_updated_policies {0}".format(str_metrics)) - PolicyRest._logger.debug(str_metrics) + if retry == PolicyRest._policy_retry_count: + audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}" + .format(PolicyRest._url_get_config, retry, policy_id), + error_code=AuditResponseCode.DATA_ERROR) + break - policies_to_find = {} - for (policy_name, policy_version) in policies_updated: - policy_id = PolicyUtils.extract_policy_id(policy_name) - if not policy_id or not policy_version.isdigit(): - continue - policy = policies_to_find.get(policy_id) - if not policy: - policies_to_find[policy_id] = { - POLICY_ID: policy_id, - PolicyRest.MIN_VERSION_EXPECTED: int(policy_version), - PolicyRest.IGNORE_POLICY_NAMES: {} - } - continue - if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version): - policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version) - - for (policy_name, _) in policies_removed: - policy_id = PolicyUtils.extract_policy_id(policy_name) - if not policy_id: - continue - policy = policies_to_find.get(policy_id) - if not policy: - policies_to_find[policy_id] = { - POLICY_ID: policy_id, - PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True} - } - continue - policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True - - apns = [(audit, policy_id, - policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED), - policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)) - for (policy_id, policy_to_find) in policies_to_find.items()] - - policies = None - apns_length = len(apns) - if apns_length == 1: - policies = [PolicyRest.get_latest_policy(apns[0])] - else: - pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length)) - policies = pool.map(PolicyRest.get_latest_policy, apns) - pool.close() - pool.join() + audit.warn( + "retry #{0} {1} from PDP in {2} secs for policy_id={3}".format( + retry, PolicyRest._url_get_config, + PolicyRest._policy_retry_sleep, policy_id), + error_code=AuditResponseCode.DATA_ERROR) + time.sleep(PolicyRest._policy_retry_sleep) + + if (expect_policy_removed and not latest_policy + and AuditHttpCode.RESPONSE_ERROR.value == status_code): + audit.set_http_status_code(AuditHttpCode.HTTP_OK.value) + return None + + audit.set_http_status_code(status_code) + if not PolicyRest._validate_policy(latest_policy): + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.error( + "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)), + error_code=AuditResponseCode.DATA_ERROR) + + return latest_policy + + @staticmethod + def _get_latest_policy_once(audit, policy_id, + min_version_expected, ignore_policy_names, + expect_policy_removed): + """single attempt to get the latest policy for the policy_id from the policy-engine""" - metrics.metrics("result get_latest_updated_policies {0}: {1} {2}" - .format(str_metrics, len(policies), json.dumps(policies))) + status_code, policy_configs = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id}) - updated_policies = dict((policy[POLICY_ID], policy) - for policy in policies - if policy and policy.get(POLICY_ID)) + PolicyRest._logger.debug("%s %s policy_configs: %s", + status_code, policy_id, json.dumps(policy_configs or [])) - removed_policies = dict((policy_id, True) - for (policy_id, policy_to_find) in policies_to_find.items() - if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED) - and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES) - and policy_id not in updated_policies) + latest_policy = PolicyUtils.select_latest_policy( + policy_configs, min_version_expected, ignore_policy_names + ) + + if not latest_policy and not expect_policy_removed: + audit.error("received unexpected policy data from PDP for policy_id={0}: {1}" + .format(policy_id, json.dumps(policy_configs or [])), + error_code=AuditResponseCode.DATA_ERROR) + + done = bool(latest_policy + or (expect_policy_removed and not policy_configs) + or audit.is_serious_error(status_code)) - errored_policies = dict((policy_id, policy_to_find) - for (policy_id, policy_to_find) in policies_to_find.items() - if policy_id not in updated_policies - and policy_id not in removed_policies) + return done, latest_policy, status_code - PolicyRest._logger.debug( - "result updated_policies %s, removed_policies %s, errored_policies %s", - json.dumps(updated_policies), json.dumps(removed_policies), - json.dumps(errored_policies)) + @staticmethod + def get_latest_updated_policies(aud_policy_updates): + """safely try retrieving the latest policies for the list of policy_names""" + audit, policies_updated, policies_removed = aud_policy_updates + if not policies_updated and not policies_removed: + return None, None - if errored_policies: - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - audit.error( - "errored_policies in PDP: {0}".format(json.dumps(errored_policies)), - error_code=AuditResponseCode.DATA_ERROR) + str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format( + len(policies_updated), json.dumps(policies_updated), + len(policies_removed), json.dumps(policies_removed)) - return updated_policies, removed_policies + try: + return PolicyRest._get_latest_updated_policies( + audit, str_metrics, policies_updated, policies_removed) except Exception as ex: error_msg = ("{0}: crash {1} {2} at {3}: {4}" @@ -376,6 +326,93 @@ class PolicyRest(object): audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) return None, None + @staticmethod + def _get_latest_updated_policies(audit, str_metrics, policies_updated, policies_removed): + """Get the latest policies of the list of policy_names from the policy-engine""" + PolicyRest._lazy_init() + metrics = Metrics( + aud_parent=audit, + targetEntity="{0} total get_latest_updated_policies".format(PolicyRest._target_entity), + targetServiceName=PolicyRest._url_get_config) + + metrics.metrics_start("get_latest_updated_policies {0}".format(str_metrics)) + PolicyRest._logger.debug(str_metrics) + + policies_to_find = {} + for (policy_name, policy_version) in policies_updated: + policy_id = PolicyUtils.extract_policy_id(policy_name) + if not policy_id or not policy_version.isdigit(): + continue + policy = policies_to_find.get(policy_id) + if not policy: + policies_to_find[policy_id] = { + POLICY_ID: policy_id, + PolicyRest.MIN_VERSION_EXPECTED: int(policy_version), + PolicyRest.IGNORE_POLICY_NAMES: {} + } + continue + if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version): + policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version) + + for (policy_name, _) in policies_removed: + policy_id = PolicyUtils.extract_policy_id(policy_name) + if not policy_id: + continue + policy = policies_to_find.get(policy_id) + if not policy: + policies_to_find[policy_id] = { + POLICY_ID: policy_id, + PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True} + } + continue + policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True + + apns = [(audit, policy_id, + policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED), + policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)) + for (policy_id, policy_to_find) in policies_to_find.items()] + + policies = None + apns_length = len(apns) + if apns_length == 1: + policies = [PolicyRest.get_latest_policy(apns[0])] + else: + pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length)) + policies = pool.map(PolicyRest.get_latest_policy, apns) + pool.close() + pool.join() + + metrics.metrics("result get_latest_updated_policies {0}: {1} {2}" + .format(str_metrics, len(policies), json.dumps(policies))) + + updated_policies = dict((policy[POLICY_ID], policy) + for policy in policies + if policy and policy.get(POLICY_ID)) + + removed_policies = dict((policy_id, True) + for (policy_id, policy_to_find) in policies_to_find.items() + if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED) + and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES) + and policy_id not in updated_policies) + + errored_policies = dict((policy_id, policy_to_find) + for (policy_id, policy_to_find) in policies_to_find.items() + if policy_id not in updated_policies + and policy_id not in removed_policies) + + PolicyRest._logger.debug( + "result updated_policies %s, removed_policies %s, errored_policies %s", + json.dumps(updated_policies), json.dumps(removed_policies), + json.dumps(errored_policies)) + + if errored_policies: + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.error( + "errored_policies in PDP: {0}".format(json.dumps(errored_policies)), + error_code=AuditResponseCode.DATA_ERROR) + + return updated_policies, removed_policies + @staticmethod def _get_latest_policies(aud_policy_filter): diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index cff7b41..5ba7c29 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -47,7 +47,7 @@ class PolicyUpdater(Thread): self._aud_shutdown = None self._aud_catch_up = None - catch_up_config = Config.config.get(CATCH_UP, {}) + catch_up_config = Config.settings.get(CATCH_UP, {}) self._catch_up_interval = catch_up_config.get("interval") or 15*60 self._catch_up_max_skips = catch_up_config.get("max_skips") or 3 self._catch_up_skips = 0 diff --git a/policyhandler/step_timer.py b/policyhandler/step_timer.py index 2a13dd5..768b400 100644 --- a/policyhandler/step_timer.py +++ b/policyhandler/step_timer.py @@ -25,12 +25,12 @@ from threading import Event, RLock, Thread class StepTimer(Thread): """call on_time after interval number of seconds, then wait to continue""" - INIT = "init" - NEXT = "next" - STARTED = "started" - PAUSED = "paused" - STOPPING = "stopping" - STOPPED = "stopped" + STATE_INIT = "init" + STATE_NEXT = "next" + STATE_STARTED = "started" + STATE_PAUSED = "paused" + STATE_STOPPING = "stopping" + STATE_STOPPED = "stopped" def __init__(self, name, interval, on_time, logger, *args, **kwargs): """create step timer with controlled start. next step and pause""" @@ -49,7 +49,7 @@ class StepTimer(Thread): self._paused = False self._finished = False - self._request = StepTimer.INIT + self._request = StepTimer.STATE_INIT self._req_count = 0 self._req_time = 0 self._req_ts = datetime.utcnow() @@ -80,14 +80,14 @@ class StepTimer(Thread): self._timeout.set() else: self._next.set() - self._request_to_timer(StepTimer.NEXT) + self._request_to_timer(StepTimer.STATE_NEXT) def pause(self): """pause the timer""" with self._lock: self._paused = True self._next.clear() - self._request_to_timer(StepTimer.PAUSED) + self._request_to_timer(StepTimer.STATE_PAUSED) def stop(self): """stop the timer if it hasn't finished yet""" @@ -95,12 +95,12 @@ class StepTimer(Thread): self._finished = True self._timeout.set() self._next.set() - self._request_to_timer(StepTimer.STOPPING) + self._request_to_timer(StepTimer.STATE_STOPPING) def _request_to_timer(self, request): """set the request on the timer""" with self._lock: - if request in [StepTimer.NEXT, StepTimer.STARTED]: + if request in [StepTimer.STATE_NEXT, StepTimer.STATE_STARTED]: self._req_count += 1 prev_req = self._request @@ -111,8 +111,8 @@ class StepTimer(Thread): self._logger.info("{0}[{1}] {2}->{3}".format( self.name, self._req_time, prev_req, self.get_timer_status())) - def _timer_substep(self, substep): - """log exe step""" + def _log_substep(self, substep): + """log timer substep""" with self._lock: self._substep = substep utcnow = datetime.utcnow() @@ -120,51 +120,56 @@ class StepTimer(Thread): self._substep_ts = utcnow self._logger.info("[{0}] {1}".format(self._substep_time, self.get_timer_status())) + def _on_time_event(self): + """execute the _on_time event""" + if self._paused: + self._log_substep("paused - skip on_time event") + return + + try: + self._log_substep("on_time event") + self._on_time(*self._args, **self._kwargs) + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: args({4}), kwargs({5})" + .format(self.name, type(ex).__name__, str(ex), "_on_time", + json.dumps(self._args), json.dumps(self._kwargs))) + self._logger.exception(error_msg) + def run(self): """loop one step a time until stopped=finished""" - self._request_to_timer(StepTimer.STARTED) + self._request_to_timer(StepTimer.STATE_STARTED) while True: with self._lock: self._timeout.clear() self._waiting_for_timeout = True - self._timer_substep("waiting for timeout {0}...".format(self._interval)) + self._log_substep("waiting for timeout {0}...".format(self._interval)) interrupted = self._timeout.wait(self._interval) with self._lock: self._waiting_for_timeout = False - self._timer_substep("woke up after {0}timeout" - .format((interrupted and "interrupted ") or "")) + self._log_substep("woke up after {0}timeout" + .format((interrupted and "interrupted ") or "")) if self._finished: - self._timer_substep("finished") + self._log_substep("finished") break if self._next.is_set() and interrupted: self._next.clear() - self._timer_substep("restart timer") + self._log_substep("restart timer") continue - if self._paused: - self._timer_substep("paused - skip on_time event") - else: - try: - self._timer_substep("on_time event") - self._on_time(*self._args, **self._kwargs) - except Exception as ex: - error_msg = ("{0}: crash {1} {2} at {3}: args({4}), kwargs({5})" - .format(self.name, type(ex).__name__, str(ex), "_on_time", - json.dumps(self._args), json.dumps(self._kwargs))) - self._logger.exception(error_msg) - - self._timer_substep("waiting for next...") + self._on_time_event() + + self._log_substep("waiting for next...") self._next.wait() with self._lock: self._next.clear() - self._timer_substep("woke up on next") + self._log_substep("woke up on next") if self._finished: - self._timer_substep("finished") + self._log_substep("finished") break - self._request_to_timer(StepTimer.STOPPED) + self._request_to_timer(StepTimer.STATE_STOPPED) diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py index 041a442..c49536f 100644 --- a/policyhandler/web_server.py +++ b/policyhandler/web_server.py @@ -30,13 +30,14 @@ from .policy_receiver import PolicyReceiver class PolicyWeb(object): """run REST API of policy-handler""" + SERVER_HOST = "0.0.0.0" logger = logging.getLogger("policy_handler.policy_web") @staticmethod def run_forever(audit): """run the web-server of the policy-handler forever""" PolicyWeb.logger.info("policy_handler web-service at port(%d)...", Config.wservice_port) - cherrypy.config.update({"server.socket_host": "0.0.0.0", + cherrypy.config.update({"server.socket_host": PolicyWeb.SERVER_HOST, 'server.socket_port': Config.wservice_port}) cherrypy.tree.mount(_PolicyWeb(), '/') audit.info("running policy_handler web-service at port({0})".format(Config.wservice_port)) |