diff options
Diffstat (limited to 'policyhandler/policy_rest.py')
-rw-r--r-- | policyhandler/policy_rest.py | 95 |
1 files changed, 56 insertions, 39 deletions
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py index 0713b38..85dd914 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/policy_rest.py @@ -22,6 +22,7 @@ import copy import json import logging import time +import urllib.parse from multiprocessing.dummy import Pool as ThreadPool from threading import Lock @@ -51,6 +52,7 @@ class PolicyRest(object): EXPECTED_VERSIONS = "expected_versions" IGNORE_POLICY_NAMES = "ignore_policy_names" + DEFAULT_TIMEOUT_IN_SECS = 60 _lock = Lock() _settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS, @@ -65,6 +67,7 @@ class PolicyRest(object): _thread_pool_size = 4 _policy_retry_count = 1 _policy_retry_sleep = 0 + _timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS @staticmethod def _init(): @@ -85,8 +88,9 @@ class PolicyRest(object): 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)) - PolicyRest._url_get_config = (config.get("url", "") + config.get("path_api", "") - + PolicyRest.POLICY_GET_CONFIG) + get_config_path = urllib.parse.urljoin( + config.get("path_api", "pdp/api").strip("/") + "/", PolicyRest.POLICY_GET_CONFIG) + PolicyRest._url_get_config = urllib.parse.urljoin(config.get("url", ""), get_config_path) PolicyRest._headers = config.get("headers", {}) PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) _, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key( @@ -101,12 +105,16 @@ class PolicyRest(object): tls_ca_mode = config.get(Config.TLS_CA_MODE) PolicyRest._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode) + PolicyRest._timeout_in_secs = config.get(Config.TIMEOUT_IN_SECS) + if not PolicyRest._timeout_in_secs or PolicyRest._timeout_in_secs < 1: + PolicyRest._timeout_in_secs = PolicyRest.DEFAULT_TIMEOUT_IN_SECS - PolicyRest._logger.info("PDP(%s) url(%s) headers(%s) tls_ca_mode(%s) custom_kwargs(%s): %s", - PolicyRest._target_entity, PolicyRest._url_get_config, - Metrics.json_dumps(PolicyRest._headers), - tls_ca_mode, json.dumps(PolicyRest._custom_kwargs), - PolicyRest._settings) + PolicyRest._logger.info( + "PDP(%s) url(%s) headers(%s) tls_ca_mode(%s) timeout_in_secs(%s) custom_kwargs(%s): %s", + PolicyRest._target_entity, PolicyRest._url_get_config, + Metrics.json_dumps(PolicyRest._headers), tls_ca_mode, + PolicyRest._timeout_in_secs, json.dumps(PolicyRest._custom_kwargs), + PolicyRest._settings) PolicyRest._settings.commit_change() PolicyRest._lazy_inited = True @@ -144,6 +152,7 @@ class PolicyRest(object): session = PolicyRest._requests_session target_entity = PolicyRest._target_entity url = PolicyRest._url_get_config + timeout_in_secs = PolicyRest._timeout_in_secs headers = copy.deepcopy(PolicyRest._headers) custom_kwargs = copy.deepcopy(PolicyRest._custom_kwargs) @@ -152,15 +161,17 @@ class PolicyRest(object): headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id log_action = "post to {} at {}".format(target_entity, url) - log_data = "msg={} headers={}, custom_kwargs({})".format( - json.dumps(json_body), Metrics.json_dumps(headers), json.dumps(custom_kwargs)) + log_data = "msg={} headers={}, custom_kwargs({}) timeout_in_secs({})".format( + json.dumps(json_body), Metrics.json_dumps(headers), json.dumps(custom_kwargs), + timeout_in_secs) log_line = log_action + " " + log_data PolicyRest._logger.info(metrics.metrics_start(log_line)) res = None try: - res = session.post(url, json=json_body, headers=headers, **custom_kwargs) + res = session.post(url, json=json_body, headers=headers, timeout=timeout_in_secs, + **custom_kwargs) except Exception as ex: error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value if isinstance(ex, requests.exceptions.RequestException) @@ -195,12 +206,12 @@ class PolicyRest(object): res_data = res.json() if res_data and isinstance(res_data, list) and len(res_data) == 1: - rslt = res_data[0] - if rslt and not rslt.get(POLICY_NAME): + rslt = res_data[0] or {} + if not rslt.get(POLICY_NAME): res_data = None if rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_NO_RESPONSE_RECEIVED: error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value - error_msg = "unexpected {0}".format(log_line) + error_msg = "{} unexpected {}".format(error_code, log_line) PolicyRest._logger.error(error_msg) metrics.set_http_status_code(error_code) @@ -222,8 +233,8 @@ class PolicyRest(object): if (rslt and not rslt.get(POLICY_NAME) and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND): - status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value - info_msg = "not found {0}".format(log_line) + status_code = AuditHttpCode.DATA_NOT_FOUND_OK.value + info_msg = "{} not found {}".format(status_code, log_line) PolicyRest._logger.info(info_msg) metrics.set_http_status_code(status_code) @@ -279,7 +290,8 @@ class PolicyRest(object): expect_policy_removed = (ignore_policy_names and not expected_versions) for retry in range(1, PolicyRest._policy_retry_count + 1): - PolicyRest._logger.debug(str_metrics) + PolicyRest._logger.debug("try(%s) retry_get_config(%s): %s", + retry, retry_get_config, str_metrics) done, latest_policy, status_code = PolicyRest._get_latest_policy_once( audit, policy_id, expected_versions, ignore_policy_names, @@ -289,16 +301,16 @@ class PolicyRest(object): break if retry == PolicyRest._policy_retry_count: - audit.warn("gave up retrying {} from PDP after #{} for policy_id={}" - .format(PolicyRest._url_get_config, retry, policy_id), - error_code=AuditResponseCode.DATA_ERROR) + PolicyRest._logger.error( + audit.error("gave up retrying after #{} for policy_id({}) from PDP {}" + .format(retry, policy_id, PolicyRest._url_get_config), + error_code=AuditResponseCode.DATA_ERROR)) break - audit.warn( - "retry #{} {} from PDP in {} secs for policy_id={}".format( - retry, PolicyRest._url_get_config, - PolicyRest._policy_retry_sleep, policy_id), - error_code=AuditResponseCode.DATA_ERROR) + PolicyRest._logger.warning(audit.warn( + "will retry({}) for policy_id({}) in {} secs from PDP {}".format( + retry, policy_id, PolicyRest._policy_retry_sleep, PolicyRest._url_get_config), + error_code=AuditResponseCode.DATA_ERROR)) time.sleep(PolicyRest._policy_retry_sleep) if (expect_policy_removed and not latest_policy @@ -308,10 +320,10 @@ class PolicyRest(object): audit.set_http_status_code(status_code) if not PolicyRest._validate_policy(latest_policy): - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - audit.error( + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value) + PolicyRest._logger.error(audit.error( "received invalid policy from PDP: {}".format(json.dumps(latest_policy)), - error_code=AuditResponseCode.DATA_ERROR) + error_code=AuditResponseCode.DATA_ERROR)) return latest_policy @@ -331,9 +343,10 @@ class PolicyRest(object): ) if not latest_policy and not expect_policy_removed: - audit.error("received unexpected policy data from PDP for policy_id={}: {}" - .format(policy_id, json.dumps(policy_bodies or [])), - error_code=AuditResponseCode.DATA_ERROR) + PolicyRest._logger.error( + audit.error("received unexpected policy data from PDP for policy_id={}: {}" + .format(policy_id, json.dumps(policy_bodies or [])), + error_code=AuditResponseCode.DATA_ERROR)) done = bool(latest_policy or (expect_policy_removed and not policy_bodies) @@ -411,6 +424,9 @@ class PolicyRest(object): policies = None apns_length = len(apns) + PolicyRest._logger.debug("apns_length(%s) policies_to_find %s", apns_length, + json.dumps(policies_to_find)) + if apns_length == 1: policies = [PolicyRest.get_latest_policy(apns[0])] else: @@ -419,8 +435,9 @@ class PolicyRest(object): pool.close() pool.join() - metrics_total.metrics("result get_latest_updated_policies {0}: {1} {2}" - .format(str_metrics, len(policies), json.dumps(policies))) + metrics_total.metrics("result({}) get_latest_updated_policies {}: {} {}" + .format(apns_length, str_metrics, + len(policies), json.dumps(policies))) updated_policies = dict((policy[POLICY_ID], policy) for policy in policies @@ -438,12 +455,12 @@ class PolicyRest(object): and policy_id not in removed_policies) PolicyRest._logger.debug( - "result updated_policies %s, removed_policies %s, errored_policies %s", - json.dumps(updated_policies), json.dumps(removed_policies), + "result(%s) updated_policies %s, removed_policies %s, errored_policies %s", + apns_length, json.dumps(updated_policies), json.dumps(removed_policies), json.dumps(errored_policies)) if errored_policies: - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.set_http_status_code(AuditHttpCode.DATA_ERROR.value) audit.error( "errored_policies in PDP: {}".format(json.dumps(errored_policies)), error_code=AuditResponseCode.DATA_ERROR) @@ -460,6 +477,7 @@ class PolicyRest(object): PolicyRest._logger.debug("%s", str_policy_filter) status_code, policy_bodies = PolicyRest._pdp_get_config(audit, policy_filter) + audit.set_http_status_code(status_code) PolicyRest._logger.debug("%s policy_bodies: %s %s", status_code, str_policy_filter, json.dumps(policy_bodies or [])) @@ -467,14 +485,13 @@ class PolicyRest(object): latest_policies = PolicyUtils.select_latest_policies(policy_bodies) if not latest_policies: - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - audit.warn( + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value) + PolicyRest._logger.warning(audit.warn( "received no policies from PDP for policy_filter {}: {}" .format(str_policy_filter, json.dumps(policy_bodies or [])), - error_code=AuditResponseCode.DATA_ERROR) + error_code=AuditResponseCode.DATA_ERROR)) return None, latest_policies - audit.set_http_status_code(status_code) valid_policies = {} errored_policies = {} for (policy_id, policy) in latest_policies.items(): |