aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/policy_rest.py
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2018-08-07 12:11:35 -0400
committerAlex Shatov <alexs@att.com>2018-08-07 12:11:35 -0400
commitd7f34d4b71ec4d86547628cda351d20bff4d017f (patch)
tree101c7669fb5508a103894e262964da0d0c8319bc /policyhandler/policy_rest.py
parenta29f70823b18f492417629f56c86f61f94b96af8 (diff)
4.0.0 new dataflow on policy-update and catchup
- changed API and functionality - new dataflow - new dataflow between policy-handler and deployment-handler on policy-update and catchup = GETting policy_ids+versions and policy-filters from deployment-handler = PUTting policy-update and catchup in the new message format = data segmenting the policy-update/catchup messages to deployment-handler to avoid 413 on deployment-handler side = matching policies from policy-engine to policies and policy-filters from deployment-handler = coarsening the policyName filter received from deployment-handler to reduce the number messages passed to policy-engine on catchup = consolidating sequential policy-updates into a single request when the policy-update is busy - removed policy scope-prefixes from config and logic - it is not needed anymore because = the policy matching happens directly to policies and policy-filters received from deployment-handler = on catchup - the policy scope-prefix equivalents are calculated based on the data received from deployment-handler - API - GET /policies_latest now returns the info on deployed policy_ids+versions and policy-filters, rather than policies of the scope-prefixes previously found in config (obsolete) - not sending an empty catch_up message to deployment-handler when nothing changed - send policy-removed to deployment-handler when getting 404-not found from PDP on removal of policy - config change: removed catch_up.max_skips - obsolete - brought the latest CommonLogger.py - minor refactoring - improved naming of variables Change-Id: I36b3412eefd439088cb693703a6e5f18f4238b00 Signed-off-by: Alex Shatov <alexs@att.com> Issue-ID: DCAEGEN2-492
Diffstat (limited to 'policyhandler/policy_rest.py')
-rw-r--r--policyhandler/policy_rest.py146
1 files changed, 65 insertions, 81 deletions
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py
index c8018f6..39c26a5 100644
--- a/policyhandler/policy_rest.py
+++ b/policyhandler/policy_rest.py
@@ -29,9 +29,9 @@ import requests
from .config import Config
from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
AuditResponseCode, Metrics)
-from .policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, LATEST_POLICIES,
- POLICY_BODY, POLICY_CONFIG, POLICY_FILTER,
- POLICY_ID, POLICY_NAME, SCOPE_PREFIXES)
+from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY,
+ POLICY_CONFIG, POLICY_FILTER, POLICY_FILTERS,
+ POLICY_ID, POLICY_NAME)
from .policy_utils import PolicyUtils
@@ -56,8 +56,6 @@ class PolicyRest(object):
_headers = None
_target_entity = None
_thread_pool_size = 4
- _scope_prefixes = None
- _scope_thread_pool_size = 4
_policy_retry_count = 1
_policy_retry_sleep = 0
@@ -70,7 +68,7 @@ class PolicyRest(object):
config = Config.settings[Config.FIELD_POLICY_ENGINE]
- pool_size = config.get("pool_connections", 20)
+ pool_size = Config.settings.get("pool_connections", 20)
PolicyRest._requests_session = requests.Session()
PolicyRest._requests_session.mount(
'https://',
@@ -81,24 +79,20 @@ class PolicyRest(object):
requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
)
- PolicyRest._url_get_config = config["url"] \
- + config["path_api"] + PolicyRest.POLICY_GET_CONFIG
+ PolicyRest._url_get_config = (config["url"] + config["path_api"]
+ + PolicyRest.POLICY_GET_CONFIG)
PolicyRest._headers = config["headers"]
PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
PolicyRest._thread_pool_size = Config.settings.get("thread_pool_size", 4)
if PolicyRest._thread_pool_size < 2:
PolicyRest._thread_pool_size = 2
- PolicyRest._scope_prefixes = Config.settings["scope_prefixes"]
- PolicyRest._scope_thread_pool_size = min(PolicyRest._thread_pool_size, \
- len(PolicyRest._scope_prefixes))
PolicyRest._policy_retry_count = Config.settings.get("policy_retry_count", 1) or 1
PolicyRest._policy_retry_sleep = Config.settings.get("policy_retry_sleep", 0)
PolicyRest._logger.info(
- "PolicyClient url(%s) headers(%s) scope-prefixes(%s)",
- PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers),
- json.dumps(PolicyRest._scope_prefixes))
+ "PolicyClient url(%s) headers(%s)",
+ PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers))
@staticmethod
def _pdp_get_config(audit, json_body):
@@ -180,8 +174,8 @@ class PolicyRest(object):
rslt = res_data[0]
if (rslt and not rslt.get(POLICY_NAME)
- and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND
- and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND):
+ and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND
+ and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND):
status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value
info_msg = "not found {0}".format(log_line)
@@ -281,22 +275,22 @@ class PolicyRest(object):
expect_policy_removed):
"""single attempt to get the latest policy for the policy_id from the policy-engine"""
- status_code, policy_configs = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id})
+ status_code, policy_bodies = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id})
- PolicyRest._logger.debug("%s %s policy_configs: %s",
- status_code, policy_id, json.dumps(policy_configs or []))
+ PolicyRest._logger.debug("%s %s policy_bodies: %s",
+ status_code, policy_id, json.dumps(policy_bodies or []))
latest_policy = PolicyUtils.select_latest_policy(
- policy_configs, min_version_expected, ignore_policy_names
+ policy_bodies, min_version_expected, ignore_policy_names
)
if not latest_policy and not expect_policy_removed:
audit.error("received unexpected policy data from PDP for policy_id={0}: {1}"
- .format(policy_id, json.dumps(policy_configs or [])),
+ .format(policy_id, json.dumps(policy_bodies or [])),
error_code=AuditResponseCode.DATA_ERROR)
done = bool(latest_policy
- or (expect_policy_removed and not policy_configs)
+ or (expect_policy_removed and not policy_bodies)
or audit.is_serious_error(status_code))
return done, latest_policy, status_code
@@ -330,18 +324,17 @@ class PolicyRest(object):
def _get_latest_updated_policies(audit, str_metrics, policies_updated, policies_removed):
"""Get the latest policies of the list of policy_names from the policy-engine"""
PolicyRest._lazy_init()
- metrics = Metrics(
+ metrics_total = Metrics(
aud_parent=audit,
targetEntity="{0} total get_latest_updated_policies".format(PolicyRest._target_entity),
targetServiceName=PolicyRest._url_get_config)
- metrics.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
+ metrics_total.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
PolicyRest._logger.debug(str_metrics)
policies_to_find = {}
- for (policy_name, policy_version) in policies_updated:
- policy_id = PolicyUtils.extract_policy_id(policy_name)
- if not policy_id or not policy_version.isdigit():
+ for (policy_id, policy_version) in policies_updated:
+ if not policy_id or not policy_version or not policy_version.isdigit():
continue
policy = policies_to_find.get(policy_id)
if not policy:
@@ -354,18 +347,17 @@ class PolicyRest(object):
if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version):
policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version)
- for (policy_name, _) in policies_removed:
- policy_id = PolicyUtils.extract_policy_id(policy_name)
+ for (policy_id, policy_names) in policies_removed:
if not policy_id:
continue
policy = policies_to_find.get(policy_id)
if not policy:
policies_to_find[policy_id] = {
POLICY_ID: policy_id,
- PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True}
+ PolicyRest.IGNORE_POLICY_NAMES: policy_names
}
continue
- policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True
+ policy[PolicyRest.IGNORE_POLICY_NAMES].update(policy_names)
apns = [(audit, policy_id,
policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED),
@@ -382,8 +374,8 @@ class PolicyRest(object):
pool.close()
pool.join()
- metrics.metrics("result get_latest_updated_policies {0}: {1} {2}"
- .format(str_metrics, len(policies), json.dumps(policies)))
+ metrics_total.metrics("result get_latest_updated_policies {0}: {1} {2}"
+ .format(str_metrics, len(policies), json.dumps(policies)))
updated_policies = dict((policy[POLICY_ID], policy)
for policy in policies
@@ -416,36 +408,26 @@ class PolicyRest(object):
@staticmethod
def _get_latest_policies(aud_policy_filter):
- """
- get the latest policies by policy_filter
- or all the latest policies of the same scope from the policy-engine
- """
- audit, policy_filter, scope_prefix = aud_policy_filter
+ """get the latest policies by policy_filter from the policy-engine"""
+ audit, policy_filter = aud_policy_filter
try:
str_policy_filter = json.dumps(policy_filter)
PolicyRest._logger.debug("%s", str_policy_filter)
- status_code, policy_configs = PolicyRest._pdp_get_config(audit, policy_filter)
+ status_code, policy_bodies = PolicyRest._pdp_get_config(audit, policy_filter)
- PolicyRest._logger.debug("%s policy_configs: %s %s", status_code,
- str_policy_filter, json.dumps(policy_configs or []))
+ PolicyRest._logger.debug("%s policy_bodies: %s %s", status_code,
+ str_policy_filter, json.dumps(policy_bodies or []))
- latest_policies = PolicyUtils.select_latest_policies(policy_configs)
-
- if (scope_prefix and not policy_configs
- and status_code != AuditHttpCode.DATA_NOT_FOUND_ERROR.value):
- audit.warn("PDP error {0} on scope_prefix {1}".format(status_code, scope_prefix),
- error_code=AuditResponseCode.DATA_ERROR)
- return None, latest_policies, scope_prefix
+ latest_policies = PolicyUtils.select_latest_policies(policy_bodies)
if not latest_policies:
- if not scope_prefix:
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- audit.warn(
- "received no policies from PDP for policy_filter {0}: {1}"
- .format(str_policy_filter, json.dumps(policy_configs or [])),
- error_code=AuditResponseCode.DATA_ERROR)
- return None, latest_policies, None
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.warn(
+ "received no policies from PDP for policy_filter {0}: {1}"
+ .format(str_policy_filter, json.dumps(policy_bodies or [])),
+ error_code=AuditResponseCode.DATA_ERROR)
+ return None, latest_policies
audit.set_http_status_code(status_code)
valid_policies = {}
@@ -455,22 +437,22 @@ class PolicyRest(object):
valid_policies[policy_id] = policy
else:
errored_policies[policy_id] = policy
- return valid_policies, errored_policies, None
+ return valid_policies, errored_policies
except Exception as ex:
- error_msg = ("{0}: crash {1} {2} at {3}: policy_filter({4}), scope_prefix({5})"
+ error_msg = ("{0}: crash {1} {2} at {3}: policy_filter({4})"
.format(audit.request_id, type(ex).__name__, str(ex),
- "_get_latest_policies", json.dumps(policy_filter), scope_prefix))
+ "_get_latest_policies", json.dumps(policy_filter)))
PolicyRest._logger.exception(error_msg)
audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
- return None, None, scope_prefix
+ return None, None
@staticmethod
- def get_latest_policies(audit, policy_filter=None):
- """Get the latest policies of the same scope from the policy-engine"""
+ def get_latest_policies(audit, policy_filter=None, policy_filters=None):
+ """Get the latest policies by policy-filter(s) from the policy-engine"""
result = {}
aud_policy_filters = None
str_policy_filters = None
@@ -479,51 +461,53 @@ class PolicyRest(object):
try:
PolicyRest._lazy_init()
- if policy_filter is not None:
- aud_policy_filters = [(audit, policy_filter, None)]
+ if policy_filter:
+ aud_policy_filters = [(audit, policy_filter)]
str_policy_filters = json.dumps(policy_filter)
str_metrics = "get_latest_policies for policy_filter {0}".format(
str_policy_filters)
target_entity = ("{0} total get_latest_policies by policy_filter"
.format(PolicyRest._target_entity))
result[POLICY_FILTER] = copy.deepcopy(policy_filter)
- else:
- aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, scope_prefix)
- for scope_prefix in PolicyRest._scope_prefixes]
- str_policy_filters = json.dumps(PolicyRest._scope_prefixes)
- str_metrics = "get_latest_policies for scopes {0} {1}".format( \
- len(PolicyRest._scope_prefixes), str_policy_filters)
- target_entity = ("{0} total get_latest_policies by scope_prefixes"
+ elif policy_filters:
+ aud_policy_filters = [
+ (audit, policy_filter)
+ for policy_filter in policy_filters
+ ]
+ str_policy_filters = json.dumps(policy_filters)
+ str_metrics = "get_latest_policies for policy_filters {0}".format(
+ str_policy_filters)
+ target_entity = ("{0} total get_latest_policies by policy_filters"
.format(PolicyRest._target_entity))
- result[SCOPE_PREFIXES] = copy.deepcopy(PolicyRest._scope_prefixes)
+ result[POLICY_FILTERS] = copy.deepcopy(policy_filters)
+ else:
+ return result
PolicyRest._logger.debug("%s", str_policy_filters)
- metrics = Metrics(aud_parent=audit, targetEntity=target_entity,
- targetServiceName=PolicyRest._url_get_config)
+ metrics_total = Metrics(aud_parent=audit, targetEntity=target_entity,
+ targetServiceName=PolicyRest._url_get_config)
- metrics.metrics_start(str_metrics)
+ metrics_total.metrics_start(str_metrics)
latest_policies = None
apfs_length = len(aud_policy_filters)
if apfs_length == 1:
latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])]
else:
- pool = ThreadPool(min(PolicyRest._scope_thread_pool_size, apfs_length))
+ pool = ThreadPool(min(PolicyRest._thread_pool_size, apfs_length))
latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters)
pool.close()
pool.join()
- metrics.metrics("total result {0}: {1} {2}".format(
+ metrics_total.metrics("total result {0}: {1} {2}".format(
str_metrics, len(latest_policies), json.dumps(latest_policies)))
- # latest_policies == [(valid_policies, errored_policies, errored_scope_prefix), ...]
+ # latest_policies == [(valid_policies, errored_policies), ...]
result[LATEST_POLICIES] = dict(
- pair for (vps, _, _) in latest_policies if vps for pair in vps.items())
+ pair for (vps, _) in latest_policies if vps for pair in vps.items())
result[ERRORED_POLICIES] = dict(
- pair for (_, eps, _) in latest_policies if eps for pair in eps.items())
-
- result[ERRORED_SCOPES] = sorted([esp for (_, _, esp) in latest_policies if esp])
+ pair for (_, eps) in latest_policies if eps for pair in eps.items())
PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s",
str_policy_filters, json.dumps(result))