aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/policy_receiver.py
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler/policy_receiver.py')
-rw-r--r--policyhandler/policy_receiver.py66
1 files changed, 35 insertions, 31 deletions
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py
index 751bea8..dd9eea6 100644
--- a/policyhandler/policy_receiver.py
+++ b/policyhandler/policy_receiver.py
@@ -35,7 +35,7 @@ from threading import Lock, Thread
import websocket
from .config import Config
-from .onap.audit import Audit
+from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
from .policy_updater import PolicyUpdater
LOADED_POLICIES = 'loadedPolicies'
@@ -115,32 +115,41 @@ class _PolicyReceiver(Thread):
def _on_pdp_message(self, _, message):
"""received the notification from PDP"""
- _PolicyReceiver._logger.info("Received notification message: %s", message)
- if not message:
- return
- message = json.loads(message)
-
- if not message:
- return
-
- policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
- for policy in message.get(LOADED_POLICIES, [])
- if self._policy_scopes.match(policy.get(POLICY_NAME))]
- policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
- for policy in message.get(REMOVED_POLICIES, [])
- if self._policy_scopes.match(policy.get(POLICY_NAME))]
-
- if not policies_updated and not policies_removed:
- _PolicyReceiver._logger.info(
- "no policy updated or removed for scopes %s", self._policy_scopes.pattern
- )
- return
+ try:
+ _PolicyReceiver._logger.info("Received notification message: %s", message)
+ if not message:
+ return
+ message = json.loads(message)
+
+ if not message or not isinstance(message, dict):
+ _PolicyReceiver._logger.warn("unexpected message from PDP: %s", json.dumps(message))
+ return
+
+ policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
+ for policy in message.get(LOADED_POLICIES, [])
+ if self._policy_scopes.match(policy.get(POLICY_NAME))]
+ policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
+ for policy in message.get(REMOVED_POLICIES, [])
+ if self._policy_scopes.match(policy.get(POLICY_NAME))]
+
+ if not policies_updated and not policies_removed:
+ _PolicyReceiver._logger.info("no policy updated or removed for scopes %s",
+ self._policy_scopes.pattern)
+ return
+
+ audit = Audit(job_name="policy_update",
+ req_message="policy-notification - updated[{0}], removed[{1}]"
+ .format(len(policies_updated), len(policies_removed)),
+ retry_get_config=True)
+ self._policy_updater.enqueue(audit, policies_updated, policies_removed)
+ except Exception as ex:
+ error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex),
+ "on_pdp_message", json.dumps(message))
+
+ _PolicyReceiver._logger.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
- audit = Audit(job_name="policy_update",
- req_message="policy-notification - updated[{0}], removed[{1}]"
- .format(len(policies_updated), len(policies_removed)))
- audit.retry_get_config = True
- self._policy_updater.enqueue(audit, policies_updated, policies_removed)
def _on_ws_error(self, _, error):
"""report an error"""
@@ -184,12 +193,7 @@ class PolicyReceiver(object):
@staticmethod
def run(audit):
"""Using policy-engine client to talk to policy engine"""
- sub_aud = Audit(aud_parent=audit)
- sub_aud.metrics_start("start policy receiver")
-
PolicyReceiver._policy_receiver = _PolicyReceiver()
PolicyReceiver._policy_receiver.start()
- sub_aud.metrics("started policy receiver")
-
PolicyReceiver.catch_up(audit)