diff options
Diffstat (limited to 'policyhandler/policy_updater.py')
-rw-r--r-- | policyhandler/policy_updater.py | 256 |
1 files changed, 153 insertions, 103 deletions
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 5ba7c29..3ae8199 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -18,24 +18,101 @@ """policy-updater thread""" -import copy import json import logging -from queue import Queue -from threading import Lock, Thread +from threading import Event, Lock, Thread from .config import Config -from .deploy_handler import DeployHandler +from .deploy_handler import DeployHandler, PolicyUpdateMessage from .onap.audit import Audit, AuditHttpCode, AuditResponseCode -from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, LATEST_POLICIES, - REMOVED_POLICIES) +from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, POLICY_BODY, POLICY_ID, + POLICY_NAME, POLICY_NAMES, POLICY_VERSION) +from .policy_matcher import PolicyMatcher from .policy_rest import PolicyRest -from .policy_utils import Utils +from .policy_utils import PolicyUtils from .step_timer import StepTimer +class _PolicyUpdate(object): + """Keep and consolidate the policy-updates (audit, policies_updated, policies_removed)""" + _logger = logging.getLogger("policy_handler.policy_update") + + def __init__(self): + """init and reset""" + self._audit = None + self._policies_updated = {} + self._policies_removed = {} + + def reset(self): + """resets the state""" + self.__init__() + + def pop_policy_update(self): + """ + Returns the consolidated (audit, policies_updated, policies_removed) + and resets the state + """ + if not self._audit: + return None, None, None + + audit = self._audit + policies_updated = self._policies_updated + policies_removed = self._policies_removed + + self.reset() + + return audit, policies_updated, policies_removed + + + def push_policy_update(self, policies_updated, policies_removed): + """consolidate the new policies_updated, policies_removed to existing ones""" + for policy_body in policies_updated: + policy_name = policy_body.get(POLICY_NAME) + policy = PolicyUtils.convert_to_policy(policy_body) + if not policy: + continue + policy_id = policy.get(POLICY_ID) + + self._policies_updated[policy_id] = policy + + rm_policy_names = self._policies_removed.get(policy_id, {}).get(POLICY_NAMES) + if rm_policy_names and policy_name in rm_policy_names: + del rm_policy_names[policy_name] + + for policy_body in policies_removed: + policy_name = policy_body.get(POLICY_NAME) + policy = PolicyUtils.convert_to_policy(policy_body) + if not policy: + continue + policy_id = policy.get(POLICY_ID) + + if policy_id in self._policies_removed: + policy = self._policies_removed[policy_id] + + if POLICY_NAMES not in policy: + policy[POLICY_NAMES] = {} + policy[POLICY_NAMES][policy_name] = True + self._policies_removed[policy_id] = policy + + req_message = ("policy-update notification - updated[{0}], removed[{1}]" + .format(len(self._policies_updated), + len(self._policies_removed))) + + if not self._audit: + self._audit = Audit(job_name="policy_update", + req_message=req_message, + retry_get_config=True) + else: + self._audit.req_message = req_message + + self._logger.info( + "pending request_id %s for %s policies_updated %s policies_removed %s", + self._audit.request_id, req_message, + json.dumps(policies_updated), json.dumps(policies_removed)) + + class PolicyUpdater(Thread): - """queue and handle the policy-updates in a separate thread""" + """sequentially handle the policy-updates and catch-ups in its own policy_updater thread""" _logger = logging.getLogger("policy_handler.policy_updater") def __init__(self): @@ -43,33 +120,22 @@ class PolicyUpdater(Thread): Thread.__init__(self, name="policy_updater") self.daemon = True + self._lock = Lock() + self._run = Event() + self._catch_up_timer = None self._aud_shutdown = None self._aud_catch_up = None + self._policy_update = _PolicyUpdate() catch_up_config = Config.settings.get(CATCH_UP, {}) self._catch_up_interval = catch_up_config.get("interval") or 15*60 - self._catch_up_max_skips = catch_up_config.get("max_skips") or 3 - self._catch_up_skips = 0 - self._catch_up_prev_message = None - - self._lock = Lock() - self._queue = Queue() - - def enqueue(self, audit=None, policies_updated=None, policies_removed=None): + def policy_update(self, policies_updated, policies_removed): """enqueue the policy-updates""" - policies_updated = policies_updated or [] - policies_removed = policies_removed or [] - - PolicyUpdater._logger.info( - "enqueue request_id %s policies_updated %s policies_removed %s", - ((audit and audit.request_id) or "none"), - json.dumps(policies_updated), json.dumps(policies_removed)) - with self._lock: - self._queue.put((audit, policies_updated, policies_removed)) - + self._policy_update.push_policy_update(policies_updated, policies_removed) + self._run.set() def catch_up(self, audit=None): """need to bring the latest policies to DCAE-Controller""" @@ -80,30 +146,24 @@ class PolicyUpdater(Thread): "catch_up %s request_id %s", self._aud_catch_up.req_message, self._aud_catch_up.request_id ) - - self.enqueue() - + self._run.set() def run(self): """wait and run the policy-update in thread""" while True: PolicyUpdater._logger.info("waiting for policy-updates...") - queued_audit, policies_updated, policies_removed = self._queue.get() - PolicyUpdater._logger.info( - "got request_id %s policies_updated %s policies_removed %s", - ((queued_audit and queued_audit.request_id) or "none"), - json.dumps(policies_updated), json.dumps(policies_removed)) + self._run.wait() + + with self._lock: + self._run.clear() if not self._keep_running(): break if self._on_catch_up(): - self._reset_queue() - continue - elif not queued_audit: continue - self._on_policies_update(queued_audit, policies_updated, policies_removed) + self._on_policy_update() PolicyUpdater._logger.info("exit policy-updater") @@ -151,44 +211,13 @@ class PolicyUpdater(Thread): self._catch_up_timer = None self._logger.info("stopped catch_up_timer") - def _need_to_send_catch_up(self, aud_catch_up, catch_up_message): - """try not to send the duplicate messages on auto catchup unless hitting the max count""" - if aud_catch_up.req_message != AUTO_CATCH_UP \ - or self._catch_up_skips >= self._catch_up_max_skips \ - or not Utils.are_the_same(catch_up_message, self._catch_up_prev_message): - self._catch_up_skips = 0 - self._catch_up_prev_message = copy.deepcopy(catch_up_message) - log_line = "going to send the catch_up {0}: {1}".format( - aud_catch_up.req_message, - json.dumps(self._catch_up_prev_message) - ) - self._logger.info(log_line) - aud_catch_up.info(log_line) - return True - - self._catch_up_skips += 1 - self._catch_up_prev_message = copy.deepcopy(catch_up_message) - log_line = "skip {0}/{1} sending the same catch_up {2}: {3}".format( - self._catch_up_skips, self._catch_up_max_skips, - aud_catch_up.req_message, json.dumps(self._catch_up_prev_message) - ) - self._logger.info(log_line) - aud_catch_up.info(log_line) - return False - - def _reset_queue(self): - """clear up the queue""" - with self._lock: - if not self._aud_catch_up and not self._aud_shutdown: - with self._queue.mutex: - self._queue.queue.clear() - def _on_catch_up(self): """bring all the latest policies to DCAE-Controller""" with self._lock: aud_catch_up = self._aud_catch_up if self._aud_catch_up: self._aud_catch_up = None + self._policy_update.reset() if not aud_catch_up: return False @@ -196,20 +225,20 @@ class PolicyUpdater(Thread): log_line = "catch_up {0} request_id {1}".format( aud_catch_up.req_message, aud_catch_up.request_id ) + catch_up_result = "" try: PolicyUpdater._logger.info(log_line) self._pause_catch_up_timer() - catch_up_message = PolicyRest.get_latest_policies(aud_catch_up) - catch_up_message[CATCH_UP] = True + _, catch_up_message = PolicyMatcher.get_latest_policies(aud_catch_up) - catch_up_result = "" - if not aud_catch_up.is_success(): + if not catch_up_message or not aud_catch_up.is_success_or_not_found(): catch_up_result = "- not sending catch-up to deployment-handler due to errors" PolicyUpdater._logger.warning(catch_up_result) - elif not self._need_to_send_catch_up(aud_catch_up, catch_up_message): - catch_up_result = "- skipped sending the same policies" + elif catch_up_message.empty(): + catch_up_result = "- not sending empty catch-up to deployment-handler" else: + aud_catch_up.reset_http_status_not_found() DeployHandler.policy_update(aud_catch_up, catch_up_message, rediscover=True) if not aud_catch_up.is_success(): catch_up_result = "- failed to send catch-up to deployment-handler" @@ -229,9 +258,6 @@ class PolicyUpdater(Thread): aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) success = False - if not success: - self._catch_up_prev_message = None - self._run_catch_up_timer() PolicyUpdater._logger.info("policy_handler health: %s", @@ -240,49 +266,73 @@ class PolicyUpdater(Thread): return success - def _on_policies_update(self, queued_audit, policies_updated, policies_removed): - """handle the event of policy-updates from the queue""" - deployment_handler_changed = None + def _on_policy_update(self): + """handle the event of policy-updates""" result = "" + with self._lock: + audit, policies_updated, policies_removed = self._policy_update.pop_policy_update() + + if not audit: + return log_line = "request_id: {} policies_updated: {} policies_removed: {}".format( - ((queued_audit and queued_audit.request_id) or "none"), - json.dumps(policies_updated), json.dumps(policies_removed)) + audit.request_id, json.dumps(policies_updated), json.dumps(policies_removed)) + PolicyUpdater._logger.info(log_line) try: - updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( - (queued_audit, policies_updated, policies_removed)) - - if not queued_audit.is_success(): + (updated_policies, removed_policies, + policy_filter_matches) = PolicyMatcher.match_to_deployed_policies( + audit, policies_updated, policies_removed) + + if updated_policies or removed_policies: + updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( + (audit, + [(policy_id, policy.get(POLICY_BODY, {}).get(POLICY_VERSION)) + for policy_id, policy in updated_policies.items()], + [(policy_id, policy.get(POLICY_NAMES, {})) + for policy_id, policy in removed_policies.items()] + )) + + if not audit.is_success_or_not_found(): result = "- not sending policy-updates to deployment-handler due to errors" PolicyUpdater._logger.warning(result) + elif not updated_policies and not removed_policies: + result = "- not sending empty policy-updates to deployment-handler" + PolicyUpdater._logger.warning(result) else: - message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies} - deployment_handler_changed = DeployHandler.policy_update(queued_audit, message) - if not queued_audit.is_success(): - result = "- failed to send policy-updates to deployment-handler" + message = PolicyUpdateMessage(updated_policies, removed_policies, + policy_filter_matches, False) + log_updates = ("policies-updated[{0}], removed[{1}]" + .format(len(updated_policies), len(removed_policies))) + + audit.reset_http_status_not_found() + DeployHandler.policy_update(audit, message) + + if not audit.is_success(): + result = "- failed to send to deployment-handler {}".format(log_updates) PolicyUpdater._logger.warning(result) else: - result = "- sent policy-updates to deployment-handler" + result = "- sent to deployment-handler {}".format(log_updates) + log_line = "request_id: {} updated_policies: {} removed_policies: {}".format( + audit.request_id, + json.dumps(updated_policies), json.dumps(removed_policies)) - success, _, _ = queued_audit.audit_done(result=result) + audit.audit_done(result=result) + PolicyUpdater._logger.info(log_line + " " + result) except Exception as ex: error_msg = ("{0}: crash {1} {2} at {3}: {4}" - .format(queued_audit.request_id, type(ex).__name__, str(ex), + .format(audit.request_id, type(ex).__name__, str(ex), "on_policies_update", log_line + " " + result)) PolicyUpdater._logger.exception(error_msg) - queued_audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) - queued_audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) - success = False + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) - if deployment_handler_changed: - self._catch_up_prev_message = None + if DeployHandler.server_instance_changed: + DeployHandler.server_instance_changed = False self._pause_catch_up_timer() self.catch_up() - elif not success: - self._catch_up_prev_message = None def shutdown(self, audit): @@ -290,7 +340,7 @@ class PolicyUpdater(Thread): PolicyUpdater._logger.info("shutdown policy-updater") with self._lock: self._aud_shutdown = audit - self.enqueue() + self._run.set() self._stop_catch_up_timer() |