diff options
author | Alex Shatov <alexs@att.com> | 2018-05-10 09:23:16 -0400 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2018-05-10 09:23:16 -0400 |
commit | f2d7bef13705812c1bf147c2fb65162fbf385c6b (patch) | |
tree | e6efb8e25287576a48952942aacdb3cf84a825ff /policyhandler/policy_receiver.py | |
parent | 50bed534083c96cbf1f8fa4e220cb2b00dff9621 (diff) |
2.4.3 policy-handler - try-catch top Exceptions
- added try-except for top level Exception into all threads
of policy-handler to avoid losing the thread and tracking
the unexpected crashes
- rediscover the deployment-handler if not found before
and after each catchup
- refactored audit - separated metrics from audit
- added more stats and runtime info to healthcheck
= gc counts and garbage info if any detected
= memory usage - to detect the potential memory leaks
= request_id to all stats
= stats of active requests
- avoid reallocating the whole Queue of policy-updates after catchup
= clear of the internal queue under proper lock
Change-Id: I3fabcaac70419a68bd070ff7d591a75942f37663
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-483
Diffstat (limited to 'policyhandler/policy_receiver.py')
-rw-r--r-- | policyhandler/policy_receiver.py | 66 |
1 files changed, 35 insertions, 31 deletions
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py index 751bea8..dd9eea6 100644 --- a/policyhandler/policy_receiver.py +++ b/policyhandler/policy_receiver.py @@ -35,7 +35,7 @@ from threading import Lock, Thread import websocket from .config import Config -from .onap.audit import Audit +from .onap.audit import Audit, AuditHttpCode, AuditResponseCode from .policy_updater import PolicyUpdater LOADED_POLICIES = 'loadedPolicies' @@ -115,32 +115,41 @@ class _PolicyReceiver(Thread): def _on_pdp_message(self, _, message): """received the notification from PDP""" - _PolicyReceiver._logger.info("Received notification message: %s", message) - if not message: - return - message = json.loads(message) - - if not message: - return - - policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) - for policy in message.get(LOADED_POLICIES, []) - if self._policy_scopes.match(policy.get(POLICY_NAME))] - policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) - for policy in message.get(REMOVED_POLICIES, []) - if self._policy_scopes.match(policy.get(POLICY_NAME))] - - if not policies_updated and not policies_removed: - _PolicyReceiver._logger.info( - "no policy updated or removed for scopes %s", self._policy_scopes.pattern - ) - return + try: + _PolicyReceiver._logger.info("Received notification message: %s", message) + if not message: + return + message = json.loads(message) + + if not message or not isinstance(message, dict): + _PolicyReceiver._logger.warn("unexpected message from PDP: %s", json.dumps(message)) + return + + policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) + for policy in message.get(LOADED_POLICIES, []) + if self._policy_scopes.match(policy.get(POLICY_NAME))] + policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) + for policy in message.get(REMOVED_POLICIES, []) + if self._policy_scopes.match(policy.get(POLICY_NAME))] + + if not policies_updated and not policies_removed: + _PolicyReceiver._logger.info("no policy updated or removed for scopes %s", + self._policy_scopes.pattern) + return + + audit = Audit(job_name="policy_update", + req_message="policy-notification - updated[{0}], removed[{1}]" + .format(len(policies_updated), len(policies_removed)), + retry_get_config=True) + self._policy_updater.enqueue(audit, policies_updated, policies_removed) + except Exception as ex: + error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex), + "on_pdp_message", json.dumps(message)) + + _PolicyReceiver._logger.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) - audit = Audit(job_name="policy_update", - req_message="policy-notification - updated[{0}], removed[{1}]" - .format(len(policies_updated), len(policies_removed))) - audit.retry_get_config = True - self._policy_updater.enqueue(audit, policies_updated, policies_removed) def _on_ws_error(self, _, error): """report an error""" @@ -184,12 +193,7 @@ class PolicyReceiver(object): @staticmethod def run(audit): """Using policy-engine client to talk to policy engine""" - sub_aud = Audit(aud_parent=audit) - sub_aud.metrics_start("start policy receiver") - PolicyReceiver._policy_receiver = _PolicyReceiver() PolicyReceiver._policy_receiver.start() - sub_aud.metrics("started policy receiver") - PolicyReceiver.catch_up(audit) |