diff options
author | Alex Shatov <alexs@att.com> | 2020-02-27 12:45:54 -0500 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2020-02-27 12:45:54 -0500 |
commit | 78ff88f9b3a3d32f941b3b9fedc2abfbaba291cb (patch) | |
tree | 5670dddc0e0cd9f793d419420b61ad0559639497 /policyhandler/pdp_api/policy_rest.py | |
parent | 715fc8a36ac1809cd3e36cbb6cfb7107ebb038ea (diff) |
5.1.0 policy-handler - policy-updates from new PDP5.1.0
DCAEGEN2-1851:
- policy-handler now supports the policy-update notification
from the new policy-engine thru DMaaP MR
= no policy-filters - only policy-id values
- see README for discoverable config settings of dmaap_mr client
= DMaaP MR client has the same flexibility as policy_engine
= set the query.timeout to high value like 15000 (default)
- requests to DMaaP MR go through a single blocking connection
- first catch-up only after draining the policy-updates from DMaaP MR
on the first loop
- safe parsing of messages from DMaaP MR
- policy-engine changed the data type for policy-version field
from int to string that is expected to have the semver value
- related change to deployment-handler (DCAEGEN2-2085) has to be
deployed to handle the non-numeric policyVersion
- on new PDP API: http /policy_latest and policy-updates
return the new data from the new PDP API with the following fields
added/renamed by the policy-handler to keep other policy related parts
intact in R4-R6 (see pdp_api/policy_utils.py)
* policyName = policy_id + "." + policyVersion.replace(".","-")
+ ".xml"
* policyVersion = str(metadata["policy-version"])
* "config" - is the renamed "properties" from the new PDP API response
- enabled the /catch_up and the periodic auto-catch-up for the new PDP
API
- enabled GET /policies_latest - returns the latest policies for the
deployed components
- POST /policies_latest - still disabled since no support for the
policy-filters is provided for the new PDP API
- fixed hiding the Authorization value on comparing the configs
- logging of secrets is now sha256 to see whether they changed
- added X-ONAP-RequestID to headers the same way as X-ECOMP-RequestID
- on policy-update process the removal first, then addition
- changed the pool_connections=1 (number of pools) on PDP and DH sides
== only a single destination is expected for each
- log the exception as fatal into error.log
- other minor fixes and refactoring
- unit-test coverage 74%
- integration testing is requested
DCAEGEN2-1976:
- policy-handler is enhanced to get user/password from env vars
for PDP and DMaaP MR clients and overwriting the Authorization field
in https headers received from the discoverable config
= to override the Authorization value on policy_engine,
set the environment vars $PDP_USER and $PDP_PWD in policy-handler
container
= to override the Authorization value on dmaap_mr,
if using https and user-password authentication,
set the environment vars $DMAAP_MR_USER and $DMAAP_MR_PWD in
policy-handler container
Change-Id: Iad8eab9e20e615a0e0d2822f4735dc64c50aa55c
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-1851
Issue-ID: DCAEGEN2-1976
Diffstat (limited to 'policyhandler/pdp_api/policy_rest.py')
-rw-r--r-- | policyhandler/pdp_api/policy_rest.py | 331 |
1 files changed, 297 insertions, 34 deletions
diff --git a/policyhandler/pdp_api/policy_rest.py b/policyhandler/pdp_api/policy_rest.py index 14d9296..0b33ccd 100644 --- a/policyhandler/pdp_api/policy_rest.py +++ b/policyhandler/pdp_api/policy_rest.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,16 +20,19 @@ import copy import json import os +import time import urllib.parse +from multiprocessing.dummy import Pool as ThreadPool from threading import Lock import requests from ..config import Config, Settings -from ..onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, - AuditResponseCode, Metrics) +from ..onap.audit import AuditHttpCode, AuditResponseCode, Metrics +from ..policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY, + POLICY_ID, POLICY_NAMES) from ..utils import Utils -from .pdp_consts import PDP_POLICIES +from .pdp_consts import PDP_POLICIES, POLICY_NAME, POLICY_VERSION from .policy_utils import PolicyUtils _LOGGER = Utils.get_logger(__file__) @@ -37,11 +40,15 @@ _LOGGER = Utils.get_logger(__file__) class PolicyRest(object): """using the http API to policy-engine""" PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__))) - _lazy_inited = False + EXPECTED_VERSIONS = "expected_versions" + IGNORE_POLICY_NAMES = "ignore_policy_names" DEFAULT_TIMEOUT_IN_SECS = 60 + _lazy_inited = False _lock = Lock() - _settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS) + _settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS, + Config.THREAD_POOL_SIZE, + Config.POLICY_RETRY_COUNT, Config.POLICY_RETRY_SLEEP) _target_entity = None _requests_session = None @@ -49,6 +56,9 @@ class PolicyRest(object): _url_pdp_decision = None _headers = None _custom_kwargs = {} + _thread_pool_size = 4 + _policy_retry_count = 1 + _policy_retry_sleep = 0 _timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS @staticmethod @@ -63,10 +73,10 @@ class PolicyRest(object): changed, pool_size = PolicyRest._settings.get_by_key(Config.POOL_CONNECTIONS, 20) if changed: PolicyRest._requests_session.mount( - 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + 'https://', requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=pool_size)) PolicyRest._requests_session.mount( - 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + 'http://', requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=pool_size)) _, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE) @@ -77,6 +87,15 @@ class PolicyRest(object): PolicyRest._url, config.get("path_decision", "/decision/v1/")) PolicyRest._headers = config.get("headers", {}) PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) + _, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key( + Config.THREAD_POOL_SIZE, 4) + if PolicyRest._thread_pool_size < 2: + PolicyRest._thread_pool_size = 2 + + _, PolicyRest._policy_retry_count = PolicyRest._settings.get_by_key( + Config.POLICY_RETRY_COUNT, 1) + _, PolicyRest._policy_retry_sleep = PolicyRest._settings.get_by_key( + Config.POLICY_RETRY_SLEEP, 0) tls_ca_mode = config.get(Config.TLS_CA_MODE) PolicyRest._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode) @@ -121,13 +140,13 @@ class PolicyRest(object): PolicyRest._init() @staticmethod - def _pdp_get_decision(audit, pdp_req): - """Communication with the policy-engine""" + def _pdp_get_decision(audit, policy_ids): + """get policies from the policy-engine by policy-ids""" if not PolicyRest._url: _LOGGER.error( audit.error("no url for PDP", error_code=AuditResponseCode.AVAILABILITY_ERROR)) audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) - return None + return None, None with PolicyRest._lock: session = PolicyRest._requests_session @@ -137,9 +156,11 @@ class PolicyRest(object): headers = copy.deepcopy(PolicyRest._headers) custom_kwargs = copy.deepcopy(PolicyRest._custom_kwargs) + pdp_req = PolicyUtils.gen_req_to_pdp(policy_ids) + metrics = Metrics(aud_parent=audit, targetEntity=target_entity, targetServiceName=url) - headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id + headers = metrics.put_request_id_into_headers(headers) log_action = "post to {} at {}".format(target_entity, url) log_data = "msg={} headers={}, custom_kwargs({}) timeout_in_secs({})".format( @@ -159,11 +180,11 @@ class PolicyRest(object): else AuditHttpCode.SERVER_INTERNAL_ERROR.value) error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line)) - _LOGGER.exception(error_msg) + _LOGGER.exception(metrics.fatal(error_msg)) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) - return None + return (error_code, None) log_line = "response {} from {}: text={} headers={}".format( res.status_code, log_line, res.text, @@ -174,40 +195,282 @@ class PolicyRest(object): audit.set_http_status_code(res.status_code) metrics.metrics(log_line) - policy_bodies = None + latest_policies = None if res.status_code == requests.codes.ok: - policy_bodies = res.json().get(PDP_POLICIES) + policy_bodies = res.json().get(PDP_POLICIES, {}) + latest_policies = dict((policy_id, PolicyUtils.convert_to_policy(policy)) + for (policy_id, policy) in policy_bodies.items()) + + return res.status_code, latest_policies - return policy_bodies @staticmethod def get_latest_policy(aud_policy_id): """safely try retrieving the latest policy for the policy_id from the policy-engine""" - audit, policy_id, _, _ = aud_policy_id + audit, policy_id, expected_versions, ignore_policy_names = aud_policy_id + str_metrics = "policy_id({0}), expected_versions({1}) ignore_policy_names({2})".format( + policy_id, json.dumps(expected_versions), json.dumps(ignore_policy_names)) + try: - PolicyRest._lazy_init() + return PolicyRest._get_latest_policy( + audit, policy_id, expected_versions, ignore_policy_names, str_metrics) + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(audit.request_id, type(ex).__name__, str(ex), + "get_latest_policy", str_metrics)) + + _LOGGER.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None - pdp_req = PolicyUtils.gen_req_to_pdp(policy_id) - policy_bodies = PolicyRest._pdp_get_decision(audit, pdp_req) - log_line = "looking for policy_id({}) in policy_bodies: {}".format( - policy_id, json.dumps(policy_bodies)) - _LOGGER.info(log_line) + @staticmethod + def _get_latest_policy(audit, policy_id, + expected_versions, ignore_policy_names, str_metrics): + """retry several times getting the latest policy for the policy_id from the policy-engine""" + PolicyRest._lazy_init() + latest_policy = None + status_code = 0 + retry_get_config = audit.kwargs.get("retry_get_config") - latest_policy = None - if policy_bodies and policy_id in policy_bodies: - latest_policy = PolicyUtils.convert_to_policy(policy_bodies[policy_id]) + for retry in range(1, PolicyRest._policy_retry_count + 1): + _LOGGER.debug("try(%s) retry_get_config(%s): %s", retry, retry_get_config, str_metrics) + + done, removed, latest_policy, status_code = PolicyRest._get_latest_policy_once( + audit, policy_id, expected_versions, ignore_policy_names) - if not PolicyUtils.validate_policy(latest_policy): + if removed: audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value) - _LOGGER.error(audit.error( - "received invalid policy from PDP: {}".format(json.dumps(latest_policy)), - error_code=AuditResponseCode.DATA_ERROR)) + return None + + if done or not retry_get_config or not PolicyRest._policy_retry_sleep: + break + + if retry == PolicyRest._policy_retry_count: + _LOGGER.error( + audit.error("gave up retrying after #{} for policy_id({}) from PDP {}" + .format(retry, policy_id, PolicyRest._url_pdp_decision), + error_code=AuditResponseCode.DATA_ERROR)) + break + + _LOGGER.warning(audit.warn( + "will retry({}) for policy_id({}) in {} secs from PDP {}".format( + retry, policy_id, PolicyRest._policy_retry_sleep, PolicyRest._url_pdp_decision), + error_code=AuditResponseCode.DATA_ERROR)) + time.sleep(PolicyRest._policy_retry_sleep) + + audit.set_http_status_code(status_code) + if not PolicyUtils.validate_policy(latest_policy): + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value) + _LOGGER.error(audit.error( + "received invalid policy from PDP: {}".format(json.dumps(latest_policy)), + error_code=AuditResponseCode.DATA_ERROR)) + + return latest_policy + + @staticmethod + def _get_latest_policy_once(audit, policy_id, expected_versions, ignore_policy_names): + """single attempt to get the latest policy for the policy_id from the policy-engine""" + + status_code, latest_policies = PolicyRest._pdp_get_decision(audit, policy_id) + + if (ignore_policy_names and not expected_versions and not latest_policies + and AuditHttpCode.HTTP_OK.value == status_code): + return True, True, None, status_code + + log_line = "{} looking for policy_id({}) in latest_policies: {}".format( + status_code, policy_id, json.dumps(latest_policies)) + _LOGGER.info(log_line) + + latest_policy = (latest_policies or {}).get(policy_id) + + log_error = "" + if latest_policy: + policy_body = latest_policy.get(POLICY_BODY, {}) + policy_version = policy_body.get(POLICY_VERSION) + policy_name = policy_body.get(POLICY_NAME) + if expected_versions and policy_version not in expected_versions: + log_error = ("received unexpected policy version({}) instead of ({})" + " from PDP for policy_id={}: {}" + .format(policy_version, json.dumps(expected_versions), + policy_id, json.dumps(latest_policy))) + elif ignore_policy_names and policy_name in ignore_policy_names: + log_error = ("unexpectedly received policy version({}) from PDP" + " for policy_id={}: {}. to ignore-policy-names {}" + .format(policy_version, policy_id, + json.dumps(latest_policy), + json.dumps(ignore_policy_names))) + + if not latest_policy or log_error: + _LOGGER.error(audit.error( + log_error or "received unexpected policy data({}) from PDP for policy_id={}: {}" + .format(json.dumps(latest_policy), policy_id, json.dumps(latest_policies)), + error_code=AuditResponseCode.DATA_ERROR)) + latest_policy = None + + done = bool(latest_policy or audit.is_serious_error(status_code)) + return done, False, latest_policy, status_code + + @staticmethod + def get_latest_updated_policies(audit, updated_policies, removed_policies): + """safely try retrieving the latest policies for the list of policy_names""" + if not updated_policies and not removed_policies: + return None, None + + policies_updated = [(policy_id, policy.get(POLICY_BODY, {}).get(POLICY_VERSION)) + for policy_id, policy in updated_policies.items()] + policies_removed = [(policy_id, policy.get(POLICY_NAMES, {})) + for policy_id, policy in removed_policies.items()] + + if not policies_updated and not policies_removed: + return None, None + + str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format( + len(policies_updated), json.dumps(policies_updated), + len(policies_removed), json.dumps(policies_removed)) + + try: + return PolicyRest._get_latest_updated_policies( + audit, str_metrics, policies_updated, policies_removed) + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(audit.request_id, type(ex).__name__, str(ex), + "get_latest_updated_policies", str_metrics)) + + _LOGGER.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None, None + + @staticmethod + def _get_latest_updated_policies(audit, str_metrics, policies_updated, policies_removed): + """Get the latest policies of the list of policy_names from the policy-engine""" + PolicyRest._lazy_init() + metrics_total = Metrics( + aud_parent=audit, + targetEntity="{0} total get_latest_updated_policies".format(PolicyRest._target_entity), + targetServiceName=PolicyRest._url_pdp_decision) + + metrics_total.metrics_start("get_latest_updated_policies {0}".format(str_metrics)) + _LOGGER.debug(str_metrics) + + policies_to_find = {} + for (policy_id, policy_version) in policies_updated: + if not policy_id or policy_version is None: + continue + policy = policies_to_find.get(policy_id) + if not policy: + policies_to_find[policy_id] = { + POLICY_ID: policy_id, + PolicyRest.EXPECTED_VERSIONS: {policy_version: True}, + PolicyRest.IGNORE_POLICY_NAMES: {} + } + continue + policy[PolicyRest.EXPECTED_VERSIONS][policy_version] = True + + for (policy_id, policy_names) in policies_removed: + if not policy_id: + continue + policy = policies_to_find.get(policy_id) + if not policy: + policies_to_find[policy_id] = { + POLICY_ID: policy_id, + PolicyRest.IGNORE_POLICY_NAMES: policy_names + } + continue + policy[PolicyRest.IGNORE_POLICY_NAMES].update(policy_names) + + apns = [(audit, policy_id, + policy_to_find.get(PolicyRest.EXPECTED_VERSIONS), + policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)) + for (policy_id, policy_to_find) in policies_to_find.items()] + + policies = None + apns_length = len(apns) + _LOGGER.debug("apns_length(%s) policies_to_find %s", apns_length, + json.dumps(policies_to_find)) + + if apns_length == 1: + policies = [PolicyRest.get_latest_policy(apns[0])] + else: + pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length)) + policies = pool.map(PolicyRest.get_latest_policy, apns) + pool.close() + pool.join() + + metrics_total.metrics("result({}) get_latest_updated_policies {}: {} {}" + .format(apns_length, str_metrics, + len(policies), json.dumps(policies))) + + updated_policies = dict((policy[POLICY_ID], policy) + for policy in policies + if policy and policy.get(POLICY_ID)) + + removed_policies = dict((policy_id, True) + for (policy_id, policy_to_find) in policies_to_find.items() + if not policy_to_find.get(PolicyRest.EXPECTED_VERSIONS) + and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES) + and policy_id not in updated_policies) + + errored_policies = dict((policy_id, policy_to_find) + for (policy_id, policy_to_find) in policies_to_find.items() + if policy_id not in updated_policies + and policy_id not in removed_policies) + + _LOGGER.debug( + "result(%s) updated_policies %s, removed_policies %s, errored_policies %s", + apns_length, json.dumps(updated_policies), json.dumps(removed_policies), + json.dumps(errored_policies)) + + if errored_policies: + audit.set_http_status_code(AuditHttpCode.DATA_ERROR.value) + audit.error( + "errored_policies in PDP: {}".format(json.dumps(errored_policies)), + error_code=AuditResponseCode.DATA_ERROR) + + return updated_policies, removed_policies + + + @staticmethod + def get_latest_policies(audit, policy_ids=None): + """Get the latest policies by policy-ids from the policy-engine""" + result = {} + str_policy_ids = json.dumps(policy_ids or []) + + try: + PolicyRest._lazy_init() + if policy_ids: + _, latest_policies = PolicyRest._pdp_get_decision(audit, policy_ids) + + if not latest_policies: + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value) + _LOGGER.warning(audit.warn( + "received no policies from PDP for policy_ids {}" + .format(str_policy_ids), error_code=AuditResponseCode.DATA_ERROR)) + return latest_policies + + valid_policies = {} + errored_policies = {} + for (policy_id, policy) in latest_policies.items(): + if PolicyUtils.validate_policy(policy): + valid_policies[policy_id] = policy + else: + errored_policies[policy_id] = policy + + result[LATEST_POLICIES] = valid_policies + result[ERRORED_POLICIES] = errored_policies + + _LOGGER.debug("got policies for policy_ids: %s. result: %s", + str_policy_ids, json.dumps(result)) + return result - return latest_policy except Exception as ex: - error_msg = ("{}: get_latest_policy({}) crash {}: {}" - .format(audit.request_id, policy_id, type(ex).__name__, str(ex))) + error_msg = ("{}: crash {} {} at {}: {}" + .format(audit.request_id, type(ex).__name__, str(ex), + "get_latest_policies", str_policy_ids)) _LOGGER.exception(error_msg) audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) |