summaryrefslogtreecommitdiffstats
path: root/policyhandler/policy_rest.py
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler/policy_rest.py')
-rw-r--r--policyhandler/policy_rest.py95
1 files changed, 56 insertions, 39 deletions
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py
index 0713b38..85dd914 100644
--- a/policyhandler/policy_rest.py
+++ b/policyhandler/policy_rest.py
@@ -22,6 +22,7 @@ import copy
import json
import logging
import time
+import urllib.parse
from multiprocessing.dummy import Pool as ThreadPool
from threading import Lock
@@ -51,6 +52,7 @@ class PolicyRest(object):
EXPECTED_VERSIONS = "expected_versions"
IGNORE_POLICY_NAMES = "ignore_policy_names"
+ DEFAULT_TIMEOUT_IN_SECS = 60
_lock = Lock()
_settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS,
@@ -65,6 +67,7 @@ class PolicyRest(object):
_thread_pool_size = 4
_policy_retry_count = 1
_policy_retry_sleep = 0
+ _timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS
@staticmethod
def _init():
@@ -85,8 +88,9 @@ class PolicyRest(object):
'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
pool_maxsize=pool_size))
- PolicyRest._url_get_config = (config.get("url", "") + config.get("path_api", "")
- + PolicyRest.POLICY_GET_CONFIG)
+ get_config_path = urllib.parse.urljoin(
+ config.get("path_api", "pdp/api").strip("/") + "/", PolicyRest.POLICY_GET_CONFIG)
+ PolicyRest._url_get_config = urllib.parse.urljoin(config.get("url", ""), get_config_path)
PolicyRest._headers = config.get("headers", {})
PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
_, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key(
@@ -101,12 +105,16 @@ class PolicyRest(object):
tls_ca_mode = config.get(Config.TLS_CA_MODE)
PolicyRest._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode)
+ PolicyRest._timeout_in_secs = config.get(Config.TIMEOUT_IN_SECS)
+ if not PolicyRest._timeout_in_secs or PolicyRest._timeout_in_secs < 1:
+ PolicyRest._timeout_in_secs = PolicyRest.DEFAULT_TIMEOUT_IN_SECS
- PolicyRest._logger.info("PDP(%s) url(%s) headers(%s) tls_ca_mode(%s) custom_kwargs(%s): %s",
- PolicyRest._target_entity, PolicyRest._url_get_config,
- Metrics.json_dumps(PolicyRest._headers),
- tls_ca_mode, json.dumps(PolicyRest._custom_kwargs),
- PolicyRest._settings)
+ PolicyRest._logger.info(
+ "PDP(%s) url(%s) headers(%s) tls_ca_mode(%s) timeout_in_secs(%s) custom_kwargs(%s): %s",
+ PolicyRest._target_entity, PolicyRest._url_get_config,
+ Metrics.json_dumps(PolicyRest._headers), tls_ca_mode,
+ PolicyRest._timeout_in_secs, json.dumps(PolicyRest._custom_kwargs),
+ PolicyRest._settings)
PolicyRest._settings.commit_change()
PolicyRest._lazy_inited = True
@@ -144,6 +152,7 @@ class PolicyRest(object):
session = PolicyRest._requests_session
target_entity = PolicyRest._target_entity
url = PolicyRest._url_get_config
+ timeout_in_secs = PolicyRest._timeout_in_secs
headers = copy.deepcopy(PolicyRest._headers)
custom_kwargs = copy.deepcopy(PolicyRest._custom_kwargs)
@@ -152,15 +161,17 @@ class PolicyRest(object):
headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id
log_action = "post to {} at {}".format(target_entity, url)
- log_data = "msg={} headers={}, custom_kwargs({})".format(
- json.dumps(json_body), Metrics.json_dumps(headers), json.dumps(custom_kwargs))
+ log_data = "msg={} headers={}, custom_kwargs({}) timeout_in_secs({})".format(
+ json.dumps(json_body), Metrics.json_dumps(headers), json.dumps(custom_kwargs),
+ timeout_in_secs)
log_line = log_action + " " + log_data
PolicyRest._logger.info(metrics.metrics_start(log_line))
res = None
try:
- res = session.post(url, json=json_body, headers=headers, **custom_kwargs)
+ res = session.post(url, json=json_body, headers=headers, timeout=timeout_in_secs,
+ **custom_kwargs)
except Exception as ex:
error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
if isinstance(ex, requests.exceptions.RequestException)
@@ -195,12 +206,12 @@ class PolicyRest(object):
res_data = res.json()
if res_data and isinstance(res_data, list) and len(res_data) == 1:
- rslt = res_data[0]
- if rslt and not rslt.get(POLICY_NAME):
+ rslt = res_data[0] or {}
+ if not rslt.get(POLICY_NAME):
res_data = None
if rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_NO_RESPONSE_RECEIVED:
error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
- error_msg = "unexpected {0}".format(log_line)
+ error_msg = "{} unexpected {}".format(error_code, log_line)
PolicyRest._logger.error(error_msg)
metrics.set_http_status_code(error_code)
@@ -222,8 +233,8 @@ class PolicyRest(object):
if (rslt and not rslt.get(POLICY_NAME)
and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND
and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND):
- status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value
- info_msg = "not found {0}".format(log_line)
+ status_code = AuditHttpCode.DATA_NOT_FOUND_OK.value
+ info_msg = "{} not found {}".format(status_code, log_line)
PolicyRest._logger.info(info_msg)
metrics.set_http_status_code(status_code)
@@ -279,7 +290,8 @@ class PolicyRest(object):
expect_policy_removed = (ignore_policy_names and not expected_versions)
for retry in range(1, PolicyRest._policy_retry_count + 1):
- PolicyRest._logger.debug(str_metrics)
+ PolicyRest._logger.debug("try(%s) retry_get_config(%s): %s",
+ retry, retry_get_config, str_metrics)
done, latest_policy, status_code = PolicyRest._get_latest_policy_once(
audit, policy_id, expected_versions, ignore_policy_names,
@@ -289,16 +301,16 @@ class PolicyRest(object):
break
if retry == PolicyRest._policy_retry_count:
- audit.warn("gave up retrying {} from PDP after #{} for policy_id={}"
- .format(PolicyRest._url_get_config, retry, policy_id),
- error_code=AuditResponseCode.DATA_ERROR)
+ PolicyRest._logger.error(
+ audit.error("gave up retrying after #{} for policy_id({}) from PDP {}"
+ .format(retry, policy_id, PolicyRest._url_get_config),
+ error_code=AuditResponseCode.DATA_ERROR))
break
- audit.warn(
- "retry #{} {} from PDP in {} secs for policy_id={}".format(
- retry, PolicyRest._url_get_config,
- PolicyRest._policy_retry_sleep, policy_id),
- error_code=AuditResponseCode.DATA_ERROR)
+ PolicyRest._logger.warning(audit.warn(
+ "will retry({}) for policy_id({}) in {} secs from PDP {}".format(
+ retry, policy_id, PolicyRest._policy_retry_sleep, PolicyRest._url_get_config),
+ error_code=AuditResponseCode.DATA_ERROR))
time.sleep(PolicyRest._policy_retry_sleep)
if (expect_policy_removed and not latest_policy
@@ -308,10 +320,10 @@ class PolicyRest(object):
audit.set_http_status_code(status_code)
if not PolicyRest._validate_policy(latest_policy):
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- audit.error(
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value)
+ PolicyRest._logger.error(audit.error(
"received invalid policy from PDP: {}".format(json.dumps(latest_policy)),
- error_code=AuditResponseCode.DATA_ERROR)
+ error_code=AuditResponseCode.DATA_ERROR))
return latest_policy
@@ -331,9 +343,10 @@ class PolicyRest(object):
)
if not latest_policy and not expect_policy_removed:
- audit.error("received unexpected policy data from PDP for policy_id={}: {}"
- .format(policy_id, json.dumps(policy_bodies or [])),
- error_code=AuditResponseCode.DATA_ERROR)
+ PolicyRest._logger.error(
+ audit.error("received unexpected policy data from PDP for policy_id={}: {}"
+ .format(policy_id, json.dumps(policy_bodies or [])),
+ error_code=AuditResponseCode.DATA_ERROR))
done = bool(latest_policy
or (expect_policy_removed and not policy_bodies)
@@ -411,6 +424,9 @@ class PolicyRest(object):
policies = None
apns_length = len(apns)
+ PolicyRest._logger.debug("apns_length(%s) policies_to_find %s", apns_length,
+ json.dumps(policies_to_find))
+
if apns_length == 1:
policies = [PolicyRest.get_latest_policy(apns[0])]
else:
@@ -419,8 +435,9 @@ class PolicyRest(object):
pool.close()
pool.join()
- metrics_total.metrics("result get_latest_updated_policies {0}: {1} {2}"
- .format(str_metrics, len(policies), json.dumps(policies)))
+ metrics_total.metrics("result({}) get_latest_updated_policies {}: {} {}"
+ .format(apns_length, str_metrics,
+ len(policies), json.dumps(policies)))
updated_policies = dict((policy[POLICY_ID], policy)
for policy in policies
@@ -438,12 +455,12 @@ class PolicyRest(object):
and policy_id not in removed_policies)
PolicyRest._logger.debug(
- "result updated_policies %s, removed_policies %s, errored_policies %s",
- json.dumps(updated_policies), json.dumps(removed_policies),
+ "result(%s) updated_policies %s, removed_policies %s, errored_policies %s",
+ apns_length, json.dumps(updated_policies), json.dumps(removed_policies),
json.dumps(errored_policies))
if errored_policies:
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.set_http_status_code(AuditHttpCode.DATA_ERROR.value)
audit.error(
"errored_policies in PDP: {}".format(json.dumps(errored_policies)),
error_code=AuditResponseCode.DATA_ERROR)
@@ -460,6 +477,7 @@ class PolicyRest(object):
PolicyRest._logger.debug("%s", str_policy_filter)
status_code, policy_bodies = PolicyRest._pdp_get_config(audit, policy_filter)
+ audit.set_http_status_code(status_code)
PolicyRest._logger.debug("%s policy_bodies: %s %s", status_code,
str_policy_filter, json.dumps(policy_bodies or []))
@@ -467,14 +485,13 @@ class PolicyRest(object):
latest_policies = PolicyUtils.select_latest_policies(policy_bodies)
if not latest_policies:
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- audit.warn(
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value)
+ PolicyRest._logger.warning(audit.warn(
"received no policies from PDP for policy_filter {}: {}"
.format(str_policy_filter, json.dumps(policy_bodies or [])),
- error_code=AuditResponseCode.DATA_ERROR)
+ error_code=AuditResponseCode.DATA_ERROR))
return None, latest_policies
- audit.set_http_status_code(status_code)
valid_policies = {}
errored_policies = {}
for (policy_id, policy) in latest_policies.items():