diff options
author | Alex Shatov <alexs@att.com> | 2018-08-07 12:11:35 -0400 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2018-08-07 12:11:35 -0400 |
commit | d7f34d4b71ec4d86547628cda351d20bff4d017f (patch) | |
tree | 101c7669fb5508a103894e262964da0d0c8319bc /policyhandler/policy_rest.py | |
parent | a29f70823b18f492417629f56c86f61f94b96af8 (diff) |
4.0.0 new dataflow on policy-update and catchup
- changed API and functionality - new dataflow
- new dataflow between policy-handler and deployment-handler
on policy-update and catchup
= GETting policy_ids+versions and policy-filters from
deployment-handler
= PUTting policy-update and catchup in the new message format
= data segmenting the policy-update/catchup messages to
deployment-handler to avoid 413 on deployment-handler side
= matching policies from policy-engine to policies
and policy-filters from deployment-handler
= coarsening the policyName filter received from deployment-handler
to reduce the number messages passed to policy-engine on catchup
= consolidating sequential policy-updates into a single request
when the policy-update is busy
- removed policy scope-prefixes from config and logic -
it is not needed anymore because
= the policy matching happens directly to policies and
policy-filters received from deployment-handler
= on catchup - the policy scope-prefix equivalents are calculated
based on the data received from deployment-handler
- API - GET /policies_latest now returns the info on deployed
policy_ids+versions and policy-filters, rather than policies
of the scope-prefixes previously found in config (obsolete)
- not sending an empty catch_up message to deployment-handler
when nothing changed
- send policy-removed to deployment-handler when getting
404-not found from PDP on removal of policy
- config change: removed catch_up.max_skips - obsolete
- brought the latest CommonLogger.py
- minor refactoring - improved naming of variables
Change-Id: I36b3412eefd439088cb693703a6e5f18f4238b00
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-492
Diffstat (limited to 'policyhandler/policy_rest.py')
-rw-r--r-- | policyhandler/policy_rest.py | 146 |
1 files changed, 65 insertions, 81 deletions
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py index c8018f6..39c26a5 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/policy_rest.py @@ -29,9 +29,9 @@ import requests from .config import Config from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, AuditResponseCode, Metrics) -from .policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, LATEST_POLICIES, - POLICY_BODY, POLICY_CONFIG, POLICY_FILTER, - POLICY_ID, POLICY_NAME, SCOPE_PREFIXES) +from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY, + POLICY_CONFIG, POLICY_FILTER, POLICY_FILTERS, + POLICY_ID, POLICY_NAME) from .policy_utils import PolicyUtils @@ -56,8 +56,6 @@ class PolicyRest(object): _headers = None _target_entity = None _thread_pool_size = 4 - _scope_prefixes = None - _scope_thread_pool_size = 4 _policy_retry_count = 1 _policy_retry_sleep = 0 @@ -70,7 +68,7 @@ class PolicyRest(object): config = Config.settings[Config.FIELD_POLICY_ENGINE] - pool_size = config.get("pool_connections", 20) + pool_size = Config.settings.get("pool_connections", 20) PolicyRest._requests_session = requests.Session() PolicyRest._requests_session.mount( 'https://', @@ -81,24 +79,20 @@ class PolicyRest(object): requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) ) - PolicyRest._url_get_config = config["url"] \ - + config["path_api"] + PolicyRest.POLICY_GET_CONFIG + PolicyRest._url_get_config = (config["url"] + config["path_api"] + + PolicyRest.POLICY_GET_CONFIG) PolicyRest._headers = config["headers"] PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) PolicyRest._thread_pool_size = Config.settings.get("thread_pool_size", 4) if PolicyRest._thread_pool_size < 2: PolicyRest._thread_pool_size = 2 - PolicyRest._scope_prefixes = Config.settings["scope_prefixes"] - PolicyRest._scope_thread_pool_size = min(PolicyRest._thread_pool_size, \ - len(PolicyRest._scope_prefixes)) PolicyRest._policy_retry_count = Config.settings.get("policy_retry_count", 1) or 1 PolicyRest._policy_retry_sleep = Config.settings.get("policy_retry_sleep", 0) PolicyRest._logger.info( - "PolicyClient url(%s) headers(%s) scope-prefixes(%s)", - PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers), - json.dumps(PolicyRest._scope_prefixes)) + "PolicyClient url(%s) headers(%s)", + PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers)) @staticmethod def _pdp_get_config(audit, json_body): @@ -180,8 +174,8 @@ class PolicyRest(object): rslt = res_data[0] if (rslt and not rslt.get(POLICY_NAME) - and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND - and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND): + and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND + and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND): status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value info_msg = "not found {0}".format(log_line) @@ -281,22 +275,22 @@ class PolicyRest(object): expect_policy_removed): """single attempt to get the latest policy for the policy_id from the policy-engine""" - status_code, policy_configs = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id}) + status_code, policy_bodies = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id}) - PolicyRest._logger.debug("%s %s policy_configs: %s", - status_code, policy_id, json.dumps(policy_configs or [])) + PolicyRest._logger.debug("%s %s policy_bodies: %s", + status_code, policy_id, json.dumps(policy_bodies or [])) latest_policy = PolicyUtils.select_latest_policy( - policy_configs, min_version_expected, ignore_policy_names + policy_bodies, min_version_expected, ignore_policy_names ) if not latest_policy and not expect_policy_removed: audit.error("received unexpected policy data from PDP for policy_id={0}: {1}" - .format(policy_id, json.dumps(policy_configs or [])), + .format(policy_id, json.dumps(policy_bodies or [])), error_code=AuditResponseCode.DATA_ERROR) done = bool(latest_policy - or (expect_policy_removed and not policy_configs) + or (expect_policy_removed and not policy_bodies) or audit.is_serious_error(status_code)) return done, latest_policy, status_code @@ -330,18 +324,17 @@ class PolicyRest(object): def _get_latest_updated_policies(audit, str_metrics, policies_updated, policies_removed): """Get the latest policies of the list of policy_names from the policy-engine""" PolicyRest._lazy_init() - metrics = Metrics( + metrics_total = Metrics( aud_parent=audit, targetEntity="{0} total get_latest_updated_policies".format(PolicyRest._target_entity), targetServiceName=PolicyRest._url_get_config) - metrics.metrics_start("get_latest_updated_policies {0}".format(str_metrics)) + metrics_total.metrics_start("get_latest_updated_policies {0}".format(str_metrics)) PolicyRest._logger.debug(str_metrics) policies_to_find = {} - for (policy_name, policy_version) in policies_updated: - policy_id = PolicyUtils.extract_policy_id(policy_name) - if not policy_id or not policy_version.isdigit(): + for (policy_id, policy_version) in policies_updated: + if not policy_id or not policy_version or not policy_version.isdigit(): continue policy = policies_to_find.get(policy_id) if not policy: @@ -354,18 +347,17 @@ class PolicyRest(object): if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version): policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version) - for (policy_name, _) in policies_removed: - policy_id = PolicyUtils.extract_policy_id(policy_name) + for (policy_id, policy_names) in policies_removed: if not policy_id: continue policy = policies_to_find.get(policy_id) if not policy: policies_to_find[policy_id] = { POLICY_ID: policy_id, - PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True} + PolicyRest.IGNORE_POLICY_NAMES: policy_names } continue - policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True + policy[PolicyRest.IGNORE_POLICY_NAMES].update(policy_names) apns = [(audit, policy_id, policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED), @@ -382,8 +374,8 @@ class PolicyRest(object): pool.close() pool.join() - metrics.metrics("result get_latest_updated_policies {0}: {1} {2}" - .format(str_metrics, len(policies), json.dumps(policies))) + metrics_total.metrics("result get_latest_updated_policies {0}: {1} {2}" + .format(str_metrics, len(policies), json.dumps(policies))) updated_policies = dict((policy[POLICY_ID], policy) for policy in policies @@ -416,36 +408,26 @@ class PolicyRest(object): @staticmethod def _get_latest_policies(aud_policy_filter): - """ - get the latest policies by policy_filter - or all the latest policies of the same scope from the policy-engine - """ - audit, policy_filter, scope_prefix = aud_policy_filter + """get the latest policies by policy_filter from the policy-engine""" + audit, policy_filter = aud_policy_filter try: str_policy_filter = json.dumps(policy_filter) PolicyRest._logger.debug("%s", str_policy_filter) - status_code, policy_configs = PolicyRest._pdp_get_config(audit, policy_filter) + status_code, policy_bodies = PolicyRest._pdp_get_config(audit, policy_filter) - PolicyRest._logger.debug("%s policy_configs: %s %s", status_code, - str_policy_filter, json.dumps(policy_configs or [])) + PolicyRest._logger.debug("%s policy_bodies: %s %s", status_code, + str_policy_filter, json.dumps(policy_bodies or [])) - latest_policies = PolicyUtils.select_latest_policies(policy_configs) - - if (scope_prefix and not policy_configs - and status_code != AuditHttpCode.DATA_NOT_FOUND_ERROR.value): - audit.warn("PDP error {0} on scope_prefix {1}".format(status_code, scope_prefix), - error_code=AuditResponseCode.DATA_ERROR) - return None, latest_policies, scope_prefix + latest_policies = PolicyUtils.select_latest_policies(policy_bodies) if not latest_policies: - if not scope_prefix: - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - audit.warn( - "received no policies from PDP for policy_filter {0}: {1}" - .format(str_policy_filter, json.dumps(policy_configs or [])), - error_code=AuditResponseCode.DATA_ERROR) - return None, latest_policies, None + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.warn( + "received no policies from PDP for policy_filter {0}: {1}" + .format(str_policy_filter, json.dumps(policy_bodies or [])), + error_code=AuditResponseCode.DATA_ERROR) + return None, latest_policies audit.set_http_status_code(status_code) valid_policies = {} @@ -455,22 +437,22 @@ class PolicyRest(object): valid_policies[policy_id] = policy else: errored_policies[policy_id] = policy - return valid_policies, errored_policies, None + return valid_policies, errored_policies except Exception as ex: - error_msg = ("{0}: crash {1} {2} at {3}: policy_filter({4}), scope_prefix({5})" + error_msg = ("{0}: crash {1} {2} at {3}: policy_filter({4})" .format(audit.request_id, type(ex).__name__, str(ex), - "_get_latest_policies", json.dumps(policy_filter), scope_prefix)) + "_get_latest_policies", json.dumps(policy_filter))) PolicyRest._logger.exception(error_msg) audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) - return None, None, scope_prefix + return None, None @staticmethod - def get_latest_policies(audit, policy_filter=None): - """Get the latest policies of the same scope from the policy-engine""" + def get_latest_policies(audit, policy_filter=None, policy_filters=None): + """Get the latest policies by policy-filter(s) from the policy-engine""" result = {} aud_policy_filters = None str_policy_filters = None @@ -479,51 +461,53 @@ class PolicyRest(object): try: PolicyRest._lazy_init() - if policy_filter is not None: - aud_policy_filters = [(audit, policy_filter, None)] + if policy_filter: + aud_policy_filters = [(audit, policy_filter)] str_policy_filters = json.dumps(policy_filter) str_metrics = "get_latest_policies for policy_filter {0}".format( str_policy_filters) target_entity = ("{0} total get_latest_policies by policy_filter" .format(PolicyRest._target_entity)) result[POLICY_FILTER] = copy.deepcopy(policy_filter) - else: - aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, scope_prefix) - for scope_prefix in PolicyRest._scope_prefixes] - str_policy_filters = json.dumps(PolicyRest._scope_prefixes) - str_metrics = "get_latest_policies for scopes {0} {1}".format( \ - len(PolicyRest._scope_prefixes), str_policy_filters) - target_entity = ("{0} total get_latest_policies by scope_prefixes" + elif policy_filters: + aud_policy_filters = [ + (audit, policy_filter) + for policy_filter in policy_filters + ] + str_policy_filters = json.dumps(policy_filters) + str_metrics = "get_latest_policies for policy_filters {0}".format( + str_policy_filters) + target_entity = ("{0} total get_latest_policies by policy_filters" .format(PolicyRest._target_entity)) - result[SCOPE_PREFIXES] = copy.deepcopy(PolicyRest._scope_prefixes) + result[POLICY_FILTERS] = copy.deepcopy(policy_filters) + else: + return result PolicyRest._logger.debug("%s", str_policy_filters) - metrics = Metrics(aud_parent=audit, targetEntity=target_entity, - targetServiceName=PolicyRest._url_get_config) + metrics_total = Metrics(aud_parent=audit, targetEntity=target_entity, + targetServiceName=PolicyRest._url_get_config) - metrics.metrics_start(str_metrics) + metrics_total.metrics_start(str_metrics) latest_policies = None apfs_length = len(aud_policy_filters) if apfs_length == 1: latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])] else: - pool = ThreadPool(min(PolicyRest._scope_thread_pool_size, apfs_length)) + pool = ThreadPool(min(PolicyRest._thread_pool_size, apfs_length)) latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters) pool.close() pool.join() - metrics.metrics("total result {0}: {1} {2}".format( + metrics_total.metrics("total result {0}: {1} {2}".format( str_metrics, len(latest_policies), json.dumps(latest_policies))) - # latest_policies == [(valid_policies, errored_policies, errored_scope_prefix), ...] + # latest_policies == [(valid_policies, errored_policies), ...] result[LATEST_POLICIES] = dict( - pair for (vps, _, _) in latest_policies if vps for pair in vps.items()) + pair for (vps, _) in latest_policies if vps for pair in vps.items()) result[ERRORED_POLICIES] = dict( - pair for (_, eps, _) in latest_policies if eps for pair in eps.items()) - - result[ERRORED_SCOPES] = sorted([esp for (_, _, esp) in latest_policies if esp]) + pair for (_, eps) in latest_policies if eps for pair in eps.items()) PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s", str_policy_filters, json.dumps(result)) |