aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/policy_updater.py
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler/policy_updater.py')
-rw-r--r--policyhandler/policy_updater.py154
1 files changed, 100 insertions, 54 deletions
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index 70823fa..38ce93a 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -26,7 +26,7 @@ from threading import Lock, Thread
from .config import Config
from .deploy_handler import DeployHandler
-from .onap.audit import Audit
+from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, LATEST_POLICIES,
REMOVED_POLICIES)
from .policy_rest import PolicyRest
@@ -56,6 +56,7 @@ class PolicyUpdater(Thread):
self._lock = Lock()
self._queue = Queue()
+
def enqueue(self, audit=None, policies_updated=None, policies_removed=None):
"""enqueue the policy-updates"""
policies_updated = policies_updated or []
@@ -65,7 +66,23 @@ class PolicyUpdater(Thread):
"enqueue request_id %s policies_updated %s policies_removed %s",
((audit and audit.request_id) or "none"),
json.dumps(policies_updated), json.dumps(policies_removed))
- self._queue.put((audit, policies_updated, policies_removed))
+
+ with self._lock:
+ self._queue.put((audit, policies_updated, policies_removed))
+
+
+ def catch_up(self, audit=None):
+ """need to bring the latest policies to DCAE-Controller"""
+ with self._lock:
+ if not self._aud_catch_up:
+ self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP)
+ PolicyUpdater._logger.info(
+ "catch_up %s request_id %s",
+ self._aud_catch_up.req_message, self._aud_catch_up.request_id
+ )
+
+ self.enqueue()
+
def run(self):
"""wait and run the policy-update in thread"""
@@ -78,32 +95,15 @@ class PolicyUpdater(Thread):
json.dumps(policies_updated), json.dumps(policies_removed))
if not self._keep_running():
- self._queue.task_done()
break
if self._on_catch_up():
self._reset_queue()
continue
elif not queued_audit:
- self._queue.task_done()
continue
- updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
- (queued_audit, policies_updated, policies_removed))
-
- message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies}
-
- deployment_handler_changed = DeployHandler.policy_update(queued_audit, message)
-
- self._queue.task_done()
- queued_audit.audit_done()
-
- if deployment_handler_changed:
- self._catch_up_prev_message = None
- self._pause_catch_up_timer()
- self.catch_up()
- elif not queued_audit.is_success():
- self._catch_up_prev_message = None
+ self._on_policies_update(queued_audit, policies_updated, policies_removed)
PolicyUpdater._logger.info("exit policy-updater")
@@ -116,17 +116,6 @@ class PolicyUpdater(Thread):
self._aud_shutdown.audit_done()
return keep_running
- def catch_up(self, audit=None):
- """need to bring the latest policies to DCAE-Controller"""
- with self._lock:
- self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP)
- PolicyUpdater._logger.info(
- "catch_up %s request_id %s",
- self._aud_catch_up.req_message, self._aud_catch_up.request_id
- )
-
- self.enqueue()
-
def _run_catch_up_timer(self):
"""create and start the catch_up timer"""
if not self._catch_up_interval:
@@ -190,9 +179,9 @@ class PolicyUpdater(Thread):
def _reset_queue(self):
"""clear up the queue"""
with self._lock:
- self._aud_catch_up = None
- self._queue.task_done()
- self._queue = Queue()
+ if not self._aud_catch_up and not self._aud_shutdown:
+ with self._queue.mutex:
+ self._queue.queue.clear()
def _on_catch_up(self):
"""bring all the latest policies to DCAE-Controller"""
@@ -207,37 +196,94 @@ class PolicyUpdater(Thread):
log_line = "catch_up {0} request_id {1}".format(
aud_catch_up.req_message, aud_catch_up.request_id
)
+ try:
+ PolicyUpdater._logger.info(log_line)
+ self._pause_catch_up_timer()
- PolicyUpdater._logger.info(log_line)
- self._pause_catch_up_timer()
+ catch_up_message = PolicyRest.get_latest_policies(aud_catch_up)
+ catch_up_message[CATCH_UP] = True
- catch_up_message = PolicyRest.get_latest_policies(aud_catch_up)
- catch_up_message[CATCH_UP] = True
-
- catch_up_result = ""
- if not aud_catch_up.is_success():
- catch_up_result = "- not sending catch-up to deployment-handler due to errors"
- PolicyUpdater._logger.warn(catch_up_result)
- elif self._need_to_send_catch_up(aud_catch_up, catch_up_message):
- DeployHandler.policy_update(aud_catch_up, catch_up_message)
- if aud_catch_up.is_success():
- catch_up_result = "- sent catch-up to deployment-handler"
- else:
- catch_up_result = "- failed to send catch-up to deployment-handler"
+ catch_up_result = ""
+ if not aud_catch_up.is_success():
+ catch_up_result = "- not sending catch-up to deployment-handler due to errors"
PolicyUpdater._logger.warn(catch_up_result)
- else:
- catch_up_result = "- skipped sending the same policies"
- success, _, _ = aud_catch_up.audit_done(result=catch_up_result)
- PolicyUpdater._logger.info(log_line + " " + catch_up_result)
- PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health()))
+ elif not self._need_to_send_catch_up(aud_catch_up, catch_up_message):
+ catch_up_result = "- skipped sending the same policies"
+ else:
+ DeployHandler.policy_update(aud_catch_up, catch_up_message, rediscover=True)
+ if not aud_catch_up.is_success():
+ catch_up_result = "- failed to send catch-up to deployment-handler"
+ PolicyUpdater._logger.warn(catch_up_result)
+ else:
+ catch_up_result = "- sent catch-up to deployment-handler"
+ success, _, _ = aud_catch_up.audit_done(result=catch_up_result)
+ PolicyUpdater._logger.info(log_line + " " + catch_up_result)
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(aud_catch_up.request_id, type(ex).__name__, str(ex),
+ "on_catch_up", log_line + " " + catch_up_result))
+
+ PolicyUpdater._logger.exception(error_msg)
+ aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ success = False
if not success:
self._catch_up_prev_message = None
self._run_catch_up_timer()
+ PolicyUpdater._logger.info("policy_handler health: %s",
+ json.dumps(aud_catch_up.health(full=True)))
return success
+
+ def _on_policies_update(self, queued_audit, policies_updated, policies_removed):
+ """handle the event of policy-updates from the queue"""
+ deployment_handler_changed = None
+ result = ""
+
+ log_line = "request_id: {} policies_updated: {} policies_removed: {}".format(
+ ((queued_audit and queued_audit.request_id) or "none"),
+ json.dumps(policies_updated), json.dumps(policies_removed))
+
+ try:
+ updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
+ (queued_audit, policies_updated, policies_removed))
+
+ if not queued_audit.is_success():
+ result = "- not sending policy-updates to deployment-handler due to errors"
+ PolicyUpdater._logger.warn(result)
+ else:
+ message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies}
+ deployment_handler_changed = DeployHandler.policy_update(queued_audit, message)
+ if not queued_audit.is_success():
+ result = "- failed to send policy-updates to deployment-handler"
+ PolicyUpdater._logger.warn(result)
+ else:
+ result = "- sent policy-updates to deployment-handler"
+
+ success, _, _ = queued_audit.audit_done(result=result)
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(queued_audit.request_id, type(ex).__name__, str(ex),
+ "on_policies_update", log_line + " " + result))
+
+ PolicyUpdater._logger.exception(error_msg)
+ queued_audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ queued_audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ success = False
+
+ if deployment_handler_changed:
+ self._catch_up_prev_message = None
+ self._pause_catch_up_timer()
+ self.catch_up()
+ elif not success:
+ self._catch_up_prev_message = None
+
+
def shutdown(self, audit):
"""Shutdown the policy-updater"""
PolicyUpdater._logger.info("shutdown policy-updater")