diff options
author | Alex Shatov <alexs@att.com> | 2018-05-10 09:23:16 -0400 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2018-05-10 09:23:16 -0400 |
commit | f2d7bef13705812c1bf147c2fb65162fbf385c6b (patch) | |
tree | e6efb8e25287576a48952942aacdb3cf84a825ff /policyhandler/policy_updater.py | |
parent | 50bed534083c96cbf1f8fa4e220cb2b00dff9621 (diff) |
2.4.3 policy-handler - try-catch top Exceptions
- added try-except for top level Exception into all threads
of policy-handler to avoid losing the thread and tracking
the unexpected crashes
- rediscover the deployment-handler if not found before
and after each catchup
- refactored audit - separated metrics from audit
- added more stats and runtime info to healthcheck
= gc counts and garbage info if any detected
= memory usage - to detect the potential memory leaks
= request_id to all stats
= stats of active requests
- avoid reallocating the whole Queue of policy-updates after catchup
= clear of the internal queue under proper lock
Change-Id: I3fabcaac70419a68bd070ff7d591a75942f37663
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-483
Diffstat (limited to 'policyhandler/policy_updater.py')
-rw-r--r-- | policyhandler/policy_updater.py | 154 |
1 files changed, 100 insertions, 54 deletions
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 70823fa..38ce93a 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -26,7 +26,7 @@ from threading import Lock, Thread from .config import Config from .deploy_handler import DeployHandler -from .onap.audit import Audit +from .onap.audit import Audit, AuditHttpCode, AuditResponseCode from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, LATEST_POLICIES, REMOVED_POLICIES) from .policy_rest import PolicyRest @@ -56,6 +56,7 @@ class PolicyUpdater(Thread): self._lock = Lock() self._queue = Queue() + def enqueue(self, audit=None, policies_updated=None, policies_removed=None): """enqueue the policy-updates""" policies_updated = policies_updated or [] @@ -65,7 +66,23 @@ class PolicyUpdater(Thread): "enqueue request_id %s policies_updated %s policies_removed %s", ((audit and audit.request_id) or "none"), json.dumps(policies_updated), json.dumps(policies_removed)) - self._queue.put((audit, policies_updated, policies_removed)) + + with self._lock: + self._queue.put((audit, policies_updated, policies_removed)) + + + def catch_up(self, audit=None): + """need to bring the latest policies to DCAE-Controller""" + with self._lock: + if not self._aud_catch_up: + self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP) + PolicyUpdater._logger.info( + "catch_up %s request_id %s", + self._aud_catch_up.req_message, self._aud_catch_up.request_id + ) + + self.enqueue() + def run(self): """wait and run the policy-update in thread""" @@ -78,32 +95,15 @@ class PolicyUpdater(Thread): json.dumps(policies_updated), json.dumps(policies_removed)) if not self._keep_running(): - self._queue.task_done() break if self._on_catch_up(): self._reset_queue() continue elif not queued_audit: - self._queue.task_done() continue - updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( - (queued_audit, policies_updated, policies_removed)) - - message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies} - - deployment_handler_changed = DeployHandler.policy_update(queued_audit, message) - - self._queue.task_done() - queued_audit.audit_done() - - if deployment_handler_changed: - self._catch_up_prev_message = None - self._pause_catch_up_timer() - self.catch_up() - elif not queued_audit.is_success(): - self._catch_up_prev_message = None + self._on_policies_update(queued_audit, policies_updated, policies_removed) PolicyUpdater._logger.info("exit policy-updater") @@ -116,17 +116,6 @@ class PolicyUpdater(Thread): self._aud_shutdown.audit_done() return keep_running - def catch_up(self, audit=None): - """need to bring the latest policies to DCAE-Controller""" - with self._lock: - self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP) - PolicyUpdater._logger.info( - "catch_up %s request_id %s", - self._aud_catch_up.req_message, self._aud_catch_up.request_id - ) - - self.enqueue() - def _run_catch_up_timer(self): """create and start the catch_up timer""" if not self._catch_up_interval: @@ -190,9 +179,9 @@ class PolicyUpdater(Thread): def _reset_queue(self): """clear up the queue""" with self._lock: - self._aud_catch_up = None - self._queue.task_done() - self._queue = Queue() + if not self._aud_catch_up and not self._aud_shutdown: + with self._queue.mutex: + self._queue.queue.clear() def _on_catch_up(self): """bring all the latest policies to DCAE-Controller""" @@ -207,37 +196,94 @@ class PolicyUpdater(Thread): log_line = "catch_up {0} request_id {1}".format( aud_catch_up.req_message, aud_catch_up.request_id ) + try: + PolicyUpdater._logger.info(log_line) + self._pause_catch_up_timer() - PolicyUpdater._logger.info(log_line) - self._pause_catch_up_timer() + catch_up_message = PolicyRest.get_latest_policies(aud_catch_up) + catch_up_message[CATCH_UP] = True - catch_up_message = PolicyRest.get_latest_policies(aud_catch_up) - catch_up_message[CATCH_UP] = True - - catch_up_result = "" - if not aud_catch_up.is_success(): - catch_up_result = "- not sending catch-up to deployment-handler due to errors" - PolicyUpdater._logger.warn(catch_up_result) - elif self._need_to_send_catch_up(aud_catch_up, catch_up_message): - DeployHandler.policy_update(aud_catch_up, catch_up_message) - if aud_catch_up.is_success(): - catch_up_result = "- sent catch-up to deployment-handler" - else: - catch_up_result = "- failed to send catch-up to deployment-handler" + catch_up_result = "" + if not aud_catch_up.is_success(): + catch_up_result = "- not sending catch-up to deployment-handler due to errors" PolicyUpdater._logger.warn(catch_up_result) - else: - catch_up_result = "- skipped sending the same policies" - success, _, _ = aud_catch_up.audit_done(result=catch_up_result) - PolicyUpdater._logger.info(log_line + " " + catch_up_result) - PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health())) + elif not self._need_to_send_catch_up(aud_catch_up, catch_up_message): + catch_up_result = "- skipped sending the same policies" + else: + DeployHandler.policy_update(aud_catch_up, catch_up_message, rediscover=True) + if not aud_catch_up.is_success(): + catch_up_result = "- failed to send catch-up to deployment-handler" + PolicyUpdater._logger.warn(catch_up_result) + else: + catch_up_result = "- sent catch-up to deployment-handler" + success, _, _ = aud_catch_up.audit_done(result=catch_up_result) + PolicyUpdater._logger.info(log_line + " " + catch_up_result) + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(aud_catch_up.request_id, type(ex).__name__, str(ex), + "on_catch_up", log_line + " " + catch_up_result)) + + PolicyUpdater._logger.exception(error_msg) + aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + success = False if not success: self._catch_up_prev_message = None self._run_catch_up_timer() + PolicyUpdater._logger.info("policy_handler health: %s", + json.dumps(aud_catch_up.health(full=True))) return success + + def _on_policies_update(self, queued_audit, policies_updated, policies_removed): + """handle the event of policy-updates from the queue""" + deployment_handler_changed = None + result = "" + + log_line = "request_id: {} policies_updated: {} policies_removed: {}".format( + ((queued_audit and queued_audit.request_id) or "none"), + json.dumps(policies_updated), json.dumps(policies_removed)) + + try: + updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( + (queued_audit, policies_updated, policies_removed)) + + if not queued_audit.is_success(): + result = "- not sending policy-updates to deployment-handler due to errors" + PolicyUpdater._logger.warn(result) + else: + message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies} + deployment_handler_changed = DeployHandler.policy_update(queued_audit, message) + if not queued_audit.is_success(): + result = "- failed to send policy-updates to deployment-handler" + PolicyUpdater._logger.warn(result) + else: + result = "- sent policy-updates to deployment-handler" + + success, _, _ = queued_audit.audit_done(result=result) + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(queued_audit.request_id, type(ex).__name__, str(ex), + "on_policies_update", log_line + " " + result)) + + PolicyUpdater._logger.exception(error_msg) + queued_audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + queued_audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + success = False + + if deployment_handler_changed: + self._catch_up_prev_message = None + self._pause_catch_up_timer() + self.catch_up() + elif not success: + self._catch_up_prev_message = None + + def shutdown(self, audit): """Shutdown the policy-updater""" PolicyUpdater._logger.info("shutdown policy-updater") |