aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/policy_receiver.py
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2018-05-10 09:23:16 -0400
committerAlex Shatov <alexs@att.com>2018-05-10 09:23:16 -0400
commitf2d7bef13705812c1bf147c2fb65162fbf385c6b (patch)
treee6efb8e25287576a48952942aacdb3cf84a825ff /policyhandler/policy_receiver.py
parent50bed534083c96cbf1f8fa4e220cb2b00dff9621 (diff)
2.4.3 policy-handler - try-catch top Exceptions
- added try-except for top level Exception into all threads of policy-handler to avoid losing the thread and tracking the unexpected crashes - rediscover the deployment-handler if not found before and after each catchup - refactored audit - separated metrics from audit - added more stats and runtime info to healthcheck = gc counts and garbage info if any detected = memory usage - to detect the potential memory leaks = request_id to all stats = stats of active requests - avoid reallocating the whole Queue of policy-updates after catchup = clear of the internal queue under proper lock Change-Id: I3fabcaac70419a68bd070ff7d591a75942f37663 Signed-off-by: Alex Shatov <alexs@att.com> Issue-ID: DCAEGEN2-483
Diffstat (limited to 'policyhandler/policy_receiver.py')
-rw-r--r--policyhandler/policy_receiver.py66
1 files changed, 35 insertions, 31 deletions
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py
index 751bea8..dd9eea6 100644
--- a/policyhandler/policy_receiver.py
+++ b/policyhandler/policy_receiver.py
@@ -35,7 +35,7 @@ from threading import Lock, Thread
import websocket
from .config import Config
-from .onap.audit import Audit
+from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
from .policy_updater import PolicyUpdater
LOADED_POLICIES = 'loadedPolicies'
@@ -115,32 +115,41 @@ class _PolicyReceiver(Thread):
def _on_pdp_message(self, _, message):
"""received the notification from PDP"""
- _PolicyReceiver._logger.info("Received notification message: %s", message)
- if not message:
- return
- message = json.loads(message)
-
- if not message:
- return
-
- policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
- for policy in message.get(LOADED_POLICIES, [])
- if self._policy_scopes.match(policy.get(POLICY_NAME))]
- policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
- for policy in message.get(REMOVED_POLICIES, [])
- if self._policy_scopes.match(policy.get(POLICY_NAME))]
-
- if not policies_updated and not policies_removed:
- _PolicyReceiver._logger.info(
- "no policy updated or removed for scopes %s", self._policy_scopes.pattern
- )
- return
+ try:
+ _PolicyReceiver._logger.info("Received notification message: %s", message)
+ if not message:
+ return
+ message = json.loads(message)
+
+ if not message or not isinstance(message, dict):
+ _PolicyReceiver._logger.warn("unexpected message from PDP: %s", json.dumps(message))
+ return
+
+ policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
+ for policy in message.get(LOADED_POLICIES, [])
+ if self._policy_scopes.match(policy.get(POLICY_NAME))]
+ policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
+ for policy in message.get(REMOVED_POLICIES, [])
+ if self._policy_scopes.match(policy.get(POLICY_NAME))]
+
+ if not policies_updated and not policies_removed:
+ _PolicyReceiver._logger.info("no policy updated or removed for scopes %s",
+ self._policy_scopes.pattern)
+ return
+
+ audit = Audit(job_name="policy_update",
+ req_message="policy-notification - updated[{0}], removed[{1}]"
+ .format(len(policies_updated), len(policies_removed)),
+ retry_get_config=True)
+ self._policy_updater.enqueue(audit, policies_updated, policies_removed)
+ except Exception as ex:
+ error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex),
+ "on_pdp_message", json.dumps(message))
+
+ _PolicyReceiver._logger.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
- audit = Audit(job_name="policy_update",
- req_message="policy-notification - updated[{0}], removed[{1}]"
- .format(len(policies_updated), len(policies_removed)))
- audit.retry_get_config = True
- self._policy_updater.enqueue(audit, policies_updated, policies_removed)
def _on_ws_error(self, _, error):
"""report an error"""
@@ -184,12 +193,7 @@ class PolicyReceiver(object):
@staticmethod
def run(audit):
"""Using policy-engine client to talk to policy engine"""
- sub_aud = Audit(aud_parent=audit)
- sub_aud.metrics_start("start policy receiver")
-
PolicyReceiver._policy_receiver = _PolicyReceiver()
PolicyReceiver._policy_receiver.start()
- sub_aud.metrics("started policy receiver")
-
PolicyReceiver.catch_up(audit)