diff options
Diffstat (limited to 'policyhandler/policy_receiver.py')
-rw-r--r-- | policyhandler/policy_receiver.py | 66 |
1 files changed, 35 insertions, 31 deletions
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py index 751bea8..dd9eea6 100644 --- a/policyhandler/policy_receiver.py +++ b/policyhandler/policy_receiver.py @@ -35,7 +35,7 @@ from threading import Lock, Thread import websocket from .config import Config -from .onap.audit import Audit +from .onap.audit import Audit, AuditHttpCode, AuditResponseCode from .policy_updater import PolicyUpdater LOADED_POLICIES = 'loadedPolicies' @@ -115,32 +115,41 @@ class _PolicyReceiver(Thread): def _on_pdp_message(self, _, message): """received the notification from PDP""" - _PolicyReceiver._logger.info("Received notification message: %s", message) - if not message: - return - message = json.loads(message) - - if not message: - return - - policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) - for policy in message.get(LOADED_POLICIES, []) - if self._policy_scopes.match(policy.get(POLICY_NAME))] - policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) - for policy in message.get(REMOVED_POLICIES, []) - if self._policy_scopes.match(policy.get(POLICY_NAME))] - - if not policies_updated and not policies_removed: - _PolicyReceiver._logger.info( - "no policy updated or removed for scopes %s", self._policy_scopes.pattern - ) - return + try: + _PolicyReceiver._logger.info("Received notification message: %s", message) + if not message: + return + message = json.loads(message) + + if not message or not isinstance(message, dict): + _PolicyReceiver._logger.warn("unexpected message from PDP: %s", json.dumps(message)) + return + + policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) + for policy in message.get(LOADED_POLICIES, []) + if self._policy_scopes.match(policy.get(POLICY_NAME))] + policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) + for policy in message.get(REMOVED_POLICIES, []) + if self._policy_scopes.match(policy.get(POLICY_NAME))] + + if not policies_updated and not policies_removed: + _PolicyReceiver._logger.info("no policy updated or removed for scopes %s", + self._policy_scopes.pattern) + return + + audit = Audit(job_name="policy_update", + req_message="policy-notification - updated[{0}], removed[{1}]" + .format(len(policies_updated), len(policies_removed)), + retry_get_config=True) + self._policy_updater.enqueue(audit, policies_updated, policies_removed) + except Exception as ex: + error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex), + "on_pdp_message", json.dumps(message)) + + _PolicyReceiver._logger.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) - audit = Audit(job_name="policy_update", - req_message="policy-notification - updated[{0}], removed[{1}]" - .format(len(policies_updated), len(policies_removed))) - audit.retry_get_config = True - self._policy_updater.enqueue(audit, policies_updated, policies_removed) def _on_ws_error(self, _, error): """report an error""" @@ -184,12 +193,7 @@ class PolicyReceiver(object): @staticmethod def run(audit): """Using policy-engine client to talk to policy engine""" - sub_aud = Audit(aud_parent=audit) - sub_aud.metrics_start("start policy receiver") - PolicyReceiver._policy_receiver = _PolicyReceiver() PolicyReceiver._policy_receiver.start() - sub_aud.metrics("started policy receiver") - PolicyReceiver.catch_up(audit) |