From 9a4d3c5b8dc9c7697275cab38ee45b014dff9e55 Mon Sep 17 00:00:00 2001
From: Alex Shatov
Date: Mon, 1 Apr 2019 11:32:06 -0400
Subject: 5.0.0 policy-handler - new PDP API or old PDP API

- in R4 Dublin the policy-engine introduced a totally new API
- policy-handler now has a startup option to use either the new PDP API
  or the old PDP API that was created and updated before the end of 2018
- see README.md and README_pdp_api_v0.md for instructions on how to set up
  the policy-handler to run with either the new PDP API
  or the old (pdp_api_v0) PDP API
- this is a massive refactoring that changed almost all the source files,
  but kept the old logic when using the old (pdp_api_v0) PDP API
- all the code related to the PDP API version is split into two subfolders
  = pdp_api/ contains the new PDP API source code
  = pdp_api_v0/ contains the old (2018) PDP API source code
  = pdp_client.py imports from either pdp_api or pdp_api_v0
    (see the first sketch below)
  = the rest of the code is only affected when it needs to branch the logic
- logging to policy_handler.log now shows the path of the source file
  to allow tracing which PDP API is actually used
- when the new PDP API is used, the policy-update flow is disabled
  = passive mode of operation
  = no web-socket
  = no periodic catch_up
  = no policy-filters
  = reduced web-API - only a single /policy_latest endpoint is available
    /policies_latest returns 404
    /catch_up request is accepted, but ignored
- on the new PDP API: http /policy_latest returns the new data from the
  new PDP API with the following fields added by the policy-handler to keep
  the other policy-related parts intact in R4
  (see pdp_api/policy_utils.py and the second sketch below)
  = "policyName" = policy_id + "." + "policyVersion" + ".xml"
  = "policyVersion" = str("metadata"."policy-version")
  = "config" - is the renamed "properties" from the new PDP API response
- unit tests are split into two subfolders as well
  = main/ for the new PDP API testing
  = pdp_api_v0/ for the old (2018) PDP API
- removed the following line from the license text of the changed files
    ECOMP is a trademark and service mark of AT&T Intellectual Property.
- the new PDP API is expected to be extended and redesigned in R5 El Alto
- on retiring the old PDP API, the intention is to be able to remove the
  pdp_api_v0/ subfolder with minimal cleanup of the code that imports it,
  as well as cleanup of config.py, etc.

Change-Id: Ief9a2ae4541300308caaf97377f4ed051535dbe4
Signed-off-by: Alex Shatov
Issue-ID: DCAEGEN2-1128
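
The pdp_client.py dispatch described above comes down to a single conditional
import. A minimal sketch, assuming a boolean startup option (the real option
name lives in the config and is not shown in this patch); the re-exported
names match how pdp_client is used later in the diff:

    # pdp_client.py - sketch only: the rest of the code gets its PDP access
    # through this module and never imports pdp_api/ or pdp_api_v0/ directly.
    # USE_OLD_PDP_API stands in for the real startup option read from config.
    USE_OLD_PDP_API = False

    if USE_OLD_PDP_API:
        from .pdp_api_v0 import PolicyMatcher, PolicyRest, PolicyUpdates
    else:
        from .pdp_api import PolicyMatcher, PolicyRest, PolicyUpdates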
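The field mapping listed above (policyName, policyVersion, config) amounts to
a small conversion step per policy. A minimal sketch of that conversion,
assuming the new PDP API response carries "metadata" and "properties" objects
as described; the function and argument names are illustrative, not the actual
pdp_api/policy_utils.py code:

    def add_legacy_fields(policy_id, pdp_policy):
        """sketch: decorate one policy from the new PDP API response with the
        legacy fields that other policy-related parts still expect in R4"""
        policy_version = str(pdp_policy.get("metadata", {}).get("policy-version", ""))
        pdp_policy["policyVersion"] = policy_version
        # legacy-style name: <policy_id>.<policy_version>.xml
        pdp_policy["policyName"] = "{}.{}.xml".format(policy_id, policy_version)
        # "config" is the renamed "properties" from the new PDP API response
        pdp_policy["config"] = pdp_policy.pop("properties", {})
        return pdp_policy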
---
 policyhandler/policy_updater.py | 214 +++++++++++-----------------------------
 1 file changed, 60 insertions(+), 154 deletions(-)

(limited to 'policyhandler/policy_updater.py')

diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index af1ea4b..3fcde40 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -14,109 +14,26 @@
 # limitations under the License.
 # ============LICENSE_END=========================================================
 #
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
 """policy-updater thread"""
 import json
-import logging
 from threading import Event, Lock, Thread
+from . import pdp_client
 from .config import Config, Settings
 from .deploy_handler import DeployHandler, PolicyUpdateMessage
 from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
-from .policy_consts import (AUTO_CATCH_UP, AUTO_RECONFIGURE, CATCH_UP,
-                            POLICY_BODY, POLICY_ID, POLICY_NAME, POLICY_NAMES,
-                            POLICY_VERSION)
-from .policy_matcher import PolicyMatcher
-from .policy_rest import PolicyRest
-from .policy_utils import PolicyUtils
+from .policy_consts import AUTO_CATCH_UP, AUTO_RECONFIGURE, CATCH_UP
 from .service_activator import ServiceActivator
 from .step_timer import StepTimer
+from .utils import Utils
-class _PolicyUpdate(object):
-    """Keep and consolidate the policy-updates (audit, policies_updated, policies_removed)"""
-    _logger = logging.getLogger("policy_handler.policy_update")
-
-    def __init__(self):
-        """init and reset"""
-        self._audit = None
-        self._policies_updated = {}
-        self._policies_removed = {}
-
-    def reset(self):
-        """resets the state"""
-        self.__init__()
-
-    def pop_policy_update(self):
-        """
-        Returns the consolidated (audit, policies_updated, policies_removed)
-        and resets the state
-        """
-        if not self._audit:
-            return None, None, None
-
-        audit = self._audit
-        policies_updated = self._policies_updated
-        policies_removed = self._policies_removed
-
-        self.reset()
-
-        return audit, policies_updated, policies_removed
-
-
-    def push_policy_update(self, policies_updated, policies_removed):
-        """consolidate the new policies_updated, policies_removed to existing ones"""
-        for policy_body in policies_updated:
-            policy_name = policy_body.get(POLICY_NAME)
-            policy = PolicyUtils.convert_to_policy(policy_body)
-            if not policy:
-                continue
-            policy_id = policy.get(POLICY_ID)
-
-            self._policies_updated[policy_id] = policy
-
-            rm_policy_names = self._policies_removed.get(policy_id, {}).get(POLICY_NAMES)
-            if rm_policy_names and policy_name in rm_policy_names:
-                del rm_policy_names[policy_name]
-
-        for policy_body in policies_removed:
-            policy_name = policy_body.get(POLICY_NAME)
-            policy = PolicyUtils.convert_to_policy(policy_body)
-            if not policy:
-                continue
-            policy_id = policy.get(POLICY_ID)
-
-            if policy_id in self._policies_removed:
-                policy = self._policies_removed[policy_id]
-
-            if POLICY_NAMES not in policy:
-                policy[POLICY_NAMES] = {}
-            policy[POLICY_NAMES][policy_name] = True
-            self._policies_removed[policy_id] = policy
-
-        req_message = ("policy-update notification - updated[{0}], removed[{1}]"
-                       .format(len(self._policies_updated),
-                               len(self._policies_removed)))
-
-        if not self._audit:
-            self._audit = Audit(job_name="policy_update",
-                                req_message=req_message,
-                                retry_get_config=True)
-        else:
-            self._audit.req_message = req_message
-
-        self._logger.info(
-            "pending(%s) for %s policies_updated %s policies_removed %s",
-            self._audit.request_id, req_message,
-            json.dumps(self._policies_updated), json.dumps(self._policies_removed))
-
+_LOGGER = Utils.get_logger(__file__)
 class PolicyUpdater(Thread):
     """sequentially handle the policy-updates and catch-ups in its own policy_updater thread"""
-    _logger = logging.getLogger("policy_handler.policy_updater")
-
     def __init__(self, on_reconfigure_receiver):
         """init static config of PolicyUpdater."""
         Thread.__init__(self, name="policy_updater", daemon=True)
@@ -132,7 +49,7 @@ class PolicyUpdater(Thread):
         self._aud_shutdown = None
         self._aud_catch_up = None
         self._aud_reconfigure = None
-        self._policy_update = _PolicyUpdate()
+        self._policy_updates = pdp_client.PolicyUpdates()
         self._catch_up_interval = None
         self._reconfigure_interval = None
@@ -151,7 +68,7 @@
         _, reconfigure = self._settings.get_by_key(Config.RECONFIGURE, {})
         self._reconfigure_interval = reconfigure.get(Config.TIMER_INTERVAL) or 10*60
-        PolicyUpdater._logger.info(
+        _LOGGER.info(
             "intervals: catch_up(%s) reconfigure(%s): %s",
             self._catch_up_interval, self._reconfigure_interval, self._settings)
         self._settings.commit_change()
@@ -160,7 +77,7 @@
     def policy_update(self, policies_updated, policies_removed):
         """enqueue the policy-updates"""
         with self._lock:
-            self._policy_update.push_policy_update(policies_updated, policies_removed)
+            self._policy_updates.push_policy_updates(policies_updated, policies_removed)
             self._run.set()
     def catch_up(self, audit=None):
@@ -168,7 +85,7 @@
         with self._lock:
             if not self._aud_catch_up:
                 self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP)
-                PolicyUpdater._logger.info(
+                _LOGGER.info(
                     "catch_up %s request_id %s",
                     self._aud_catch_up.req_message, self._aud_catch_up.request_id
                 )
@@ -179,7 +96,7 @@
         with self._lock:
             if not self._aud_reconfigure:
                 self._aud_reconfigure = audit or Audit(req_message=AUTO_RECONFIGURE)
-                PolicyUpdater._logger.info(
+                _LOGGER.info(
                     "%s request_id %s",
                     self._aud_reconfigure.req_message, self._aud_reconfigure.request_id
                 )
@@ -187,10 +104,10 @@
     def run(self):
         """wait and run the policy-update in thread"""
-        PolicyUpdater._logger.info("starting policy_updater...")
+        _LOGGER.info("starting policy_updater...")
         self._run_reconfigure_timer()
         while True:
-            PolicyUpdater._logger.info("waiting for policy-updates...")
+            _LOGGER.info("waiting for policy-updates...")
             self._run.wait()
             with self._lock:
@@ -209,7 +126,7 @@
             self._on_policy_update()
-        PolicyUpdater._logger.info("exit policy-updater")
+        _LOGGER.info("exit policy-updater")
     def _keep_running(self):
         """thread-safe check whether to continue running"""
@@ -226,18 +143,15 @@
             return
         if self._catch_up_timer:
-            self._logger.info("next step catch_up_timer in %s", self._catch_up_interval)
+            _LOGGER.info("next step catch_up_timer in %s", self._catch_up_interval)
             self._catch_up_timer.next(self._catch_up_interval)
             return
         self._catch_up_timer = StepTimer(
-            "catch_up_timer",
-            self._catch_up_interval,
-            PolicyUpdater.catch_up,
-            PolicyUpdater._logger,
-            self
+            "catch_up_timer", self._catch_up_interval,
+            PolicyUpdater.catch_up, self
         )
-        self._logger.info("started catch_up_timer in %s", self._catch_up_interval)
+        _LOGGER.info("started catch_up_timer in %s", self._catch_up_interval)
         self._catch_up_timer.start()
     def _run_reconfigure_timer(self):
@@ -246,41 +160,38 @@
             return
         if self._reconfigure_timer:
-            self._logger.info("next step reconfigure_timer in %s", self._reconfigure_interval)
+            _LOGGER.info("next step reconfigure_timer in %s", self._reconfigure_interval)
             self._reconfigure_timer.next(self._reconfigure_interval)
             return
         self._reconfigure_timer = StepTimer(
-            "reconfigure_timer",
-            self._reconfigure_interval,
-            PolicyUpdater.reconfigure,
-            PolicyUpdater._logger,
-            self
+            "reconfigure_timer", self._reconfigure_interval,
+            PolicyUpdater.reconfigure, self
         )
-        self._logger.info("started reconfigure_timer in %s", self._reconfigure_interval)
+        _LOGGER.info("started reconfigure_timer in %s", self._reconfigure_interval)
         self._reconfigure_timer.start()
     def _pause_catch_up_timer(self):
         """pause catch_up_timer"""
         if self._catch_up_timer:
-            self._logger.info("pause catch_up_timer")
+            _LOGGER.info("pause catch_up_timer")
             self._catch_up_timer.pause()
     def _stop_timers(self):
         """stop and destroy the catch_up and reconfigure timers"""
         if self._catch_up_timer:
-            self._logger.info("stopping catch_up_timer")
+            _LOGGER.info("stopping catch_up_timer")
             self._catch_up_timer.stop()
             self._catch_up_timer.join()
             self._catch_up_timer = None
-            self._logger.info("stopped catch_up_timer")
+            _LOGGER.info("stopped catch_up_timer")
         if self._reconfigure_timer:
-            self._logger.info("stopping reconfigure_timer")
+            _LOGGER.info("stopping reconfigure_timer")
             self._reconfigure_timer.stop()
             self._reconfigure_timer.join()
             self._reconfigure_timer = None
-            self._logger.info("stopped reconfigure_timer")
+            _LOGGER.info("stopped reconfigure_timer")
     def _on_reconfigure(self):
         """bring the latest config and reconfigure"""
@@ -296,7 +207,7 @@
         reconfigure_result = ""
         try:
             need_to_catch_up = False
-            PolicyUpdater._logger.info(log_line)
+            _LOGGER.info(log_line)
             active_prev = ServiceActivator.is_active_mode_of_operation(aud_reconfigure)
             Config.discover(aud_reconfigure)
@@ -314,7 +225,7 @@
             if self._set_timer_intervals():
                 changed_configs.append("timer_intervals")
-            if PolicyRest.reconfigure():
+            if pdp_client.PolicyRest.reconfigure():
                 need_to_catch_up = True
                 changed_configs.append(Config.FIELD_POLICY_ENGINE)
@@ -335,7 +246,7 @@
                 Config.discovered_config.commit_change()
             aud_reconfigure.audit_done(result=reconfigure_result)
-            PolicyUpdater._logger.info(log_line + reconfigure_result)
+            _LOGGER.info(log_line + reconfigure_result)
             if need_to_catch_up:
                 self._pause_catch_up_timer()
@@ -345,16 +256,15 @@
             error_msg = "crash {} {}{}: {}: {}".format(
                 "_on_reconfigure", log_line, reconfigure_result,
                 type(ex).__name__, str(ex))
-            PolicyUpdater._logger.exception(error_msg)
+            _LOGGER.exception(error_msg)
             aud_reconfigure.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
             aud_reconfigure.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
             aud_reconfigure.audit_done(result=error_msg)
         self._run_reconfigure_timer()
-        PolicyUpdater._logger.info("policy_handler health: %s",
-                                   json.dumps(aud_reconfigure.health(full=True)))
-        PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_reconfigure.process_info()))
+        _LOGGER.info("policy_handler health: %s", json.dumps(aud_reconfigure.health(full=True)))
+        _LOGGER.info("process_info: %s", json.dumps(aud_reconfigure.process_info()))
     def _on_catch_up(self):
@@ -363,7 +273,7 @@
             aud_catch_up = self._aud_catch_up
             if self._aud_catch_up:
                 self._aud_catch_up = None
-                self._policy_update.reset()
+                self._policy_updates.reset()
         if not aud_catch_up:
             return False
@@ -374,12 +284,11 @@
             )
             self._pause_catch_up_timer()
             aud_catch_up.audit_done(result=catch_up_result)
-            PolicyUpdater._logger.info(catch_up_result)
+            _LOGGER.info(catch_up_result)
             self._run_catch_up_timer()
-            PolicyUpdater._logger.info("policy_handler health: %s",
-                                       json.dumps(aud_catch_up.health(full=True)))
-            PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_catch_up.process_info()))
+            _LOGGER.info("policy_handler health: %s", json.dumps(aud_catch_up.health(full=True)))
+            _LOGGER.info("process_info: %s", json.dumps(aud_catch_up.process_info()))
             return False
         log_line = "catch_up {0} request_id {1}".format(
@@ -388,25 +297,26 @@
         catch_up_result = ""
         try:
             not_found_ok = None
-            PolicyUpdater._logger.info(log_line)
+            _LOGGER.info(log_line)
             self._pause_catch_up_timer()
-            _, policies, policy_filters = PolicyMatcher.get_deployed_policies(aud_catch_up)
+            (_, policies,
+             policy_filters) = pdp_client.PolicyMatcher.get_deployed_policies(aud_catch_up)
             catch_up_message = None
             if aud_catch_up.is_not_found():
                 not_found_ok = True
             else:
-                _, catch_up_message = PolicyMatcher.build_catch_up_message(
+                _, catch_up_message = pdp_client.PolicyMatcher.build_catch_up_message(
                     aud_catch_up, policies, policy_filters)
             if not_found_ok:
                 catch_up_result = ("- not sending catch-up "
                                    "- no deployed policies or policy-filters")
-                PolicyUpdater._logger.warning(catch_up_result)
+                _LOGGER.warning(catch_up_result)
             elif not (catch_up_message and aud_catch_up.is_success()):
                 catch_up_result = "- not sending catch-up to deployment-handler due to errors"
-                PolicyUpdater._logger.warning(catch_up_result)
+                _LOGGER.warning(catch_up_result)
             elif catch_up_message.empty():
                 catch_up_result = "- not sending empty catch-up to deployment-handler"
             else:
@@ -414,18 +324,18 @@
                 DeployHandler.policy_update(aud_catch_up, catch_up_message)
                 if not aud_catch_up.is_success():
                     catch_up_result = "- failed to send catch-up to deployment-handler"
-                    PolicyUpdater._logger.warning(catch_up_result)
+                    _LOGGER.warning(catch_up_result)
                 else:
                     catch_up_result = "- sent catch-up to deployment-handler"
             success, _, _ = aud_catch_up.audit_done(result=catch_up_result)
-            PolicyUpdater._logger.info(log_line + " " + catch_up_result)
+            _LOGGER.info(log_line + " " + catch_up_result)
         except Exception as ex:
             error_msg = ("{0}: crash {1} {2} at {3}: {4}"
                          .format(aud_catch_up.request_id, type(ex).__name__, str(ex),
                                  "on_catch_up", log_line + " " + catch_up_result))
-            PolicyUpdater._logger.exception(error_msg)
+            _LOGGER.exception(error_msg)
             aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
             aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
             aud_catch_up.audit_done(result=error_msg)
@@ -433,9 +343,8 @@
         self._run_catch_up_timer()
-        PolicyUpdater._logger.info("policy_handler health: %s",
-                                   json.dumps(aud_catch_up.health(full=True)))
-        PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_catch_up.process_info()))
+        _LOGGER.info("policy_handler health: %s", json.dumps(aud_catch_up.health(full=True)))
+        _LOGGER.info("process_info: %s", json.dumps(aud_catch_up.process_info()))
         return success
@@ -443,42 +352,39 @@
         """handle the event of policy-updates"""
         result = ""
         with self._lock:
-            audit, policies_updated, policies_removed = self._policy_update.pop_policy_update()
+            audit, policies_updated, policies_removed = self._policy_updates.pop_policy_updates()
             if not audit:
                 return
         log_line = "request_id: {} policies_updated: {} policies_removed: {}".format(
             audit.request_id, json.dumps(policies_updated), json.dumps(policies_removed))
-        PolicyUpdater._logger.info(log_line)
+        _LOGGER.info(log_line)
         try:
             not_found_ok = None
             (updated_policies, removed_policies,
-             policy_filter_matches) = PolicyMatcher.match_to_deployed_policies(
+             policy_filter_matches) = pdp_client.PolicyMatcher.match_to_deployed_policies(
                  audit, policies_updated, policies_removed)
             if audit.is_not_found():
                 not_found_ok = True
             elif updated_policies or removed_policies:
-                updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
-                    (audit,
-                     [(policy_id, policy.get(POLICY_BODY, {}).get(POLICY_VERSION))
-                      for policy_id, policy in updated_policies.items()],
-                     [(policy_id, policy.get(POLICY_NAMES, {}))
-                      for policy_id, policy in removed_policies.items()]
-                     ))
+                (updated_policies,
+                 removed_policies) = pdp_client.PolicyRest.get_latest_updated_policies(
+                     audit, updated_policies, removed_policies
+                 )
             if not_found_ok:
                 result = ("- not sending policy-updates to deployment-handler "
                           "- no deployed policies or policy-filters")
-                PolicyUpdater._logger.warning(result)
+                _LOGGER.warning(result)
             elif not audit.is_success():
                 result = "- not sending policy-updates to deployment-handler due to errors"
-                PolicyUpdater._logger.warning(result)
+                _LOGGER.warning(result)
             elif not updated_policies and not removed_policies:
                 result = "- not sending empty policy-updates to deployment-handler"
-                PolicyUpdater._logger.info(result)
+                _LOGGER.info(result)
             else:
                 message = PolicyUpdateMessage(updated_policies, removed_policies,
                                               policy_filter_matches, False)
@@ -493,19 +399,19 @@
             log_line = "request_id[{}]: {}".format(audit.request_id, str(message))
             if not audit.is_success():
                 result = "- failed to send to deployment-handler {}".format(log_updates)
-                PolicyUpdater._logger.warning(result)
+                _LOGGER.warning(result)
             else:
                 result = "- sent to deployment-handler {}".format(log_updates)
             audit.audit_done(result=result)
-            PolicyUpdater._logger.info(log_line + " " + result)
+            _LOGGER.info(log_line + " " + result)
         except Exception as ex:
             error_msg = ("{0}: crash {1} {2} at {3}: {4}"
                          .format(audit.request_id, type(ex).__name__, str(ex),
                                  "on_policies_update", log_line + " " + result))
-            PolicyUpdater._logger.exception(error_msg)
+            _LOGGER.exception(error_msg)
             audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
             audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
@@ -517,7 +423,7 @@
     def shutdown(self, audit):
         """Shutdown the policy-updater"""
-        PolicyUpdater._logger.info("shutdown policy-updater")
+        _LOGGER.info("shutdown policy-updater")
         with self._lock:
             self._aud_shutdown = audit
             self._run.set()
--
cgit 1.2.3-korg