aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/policy_updater.py
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler/policy_updater.py')
-rw-r--r--policyhandler/policy_updater.py107
1 files changed, 61 insertions, 46 deletions
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index 1f1539f..9732f69 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -26,6 +26,7 @@ from threading import Thread, Lock
from .policy_rest import PolicyRest
from .deploy_handler import DeployHandler
+from .onap.audit import Audit
class PolicyUpdater(Thread):
"""queue and handle the policy-updates in a separate thread"""
@@ -33,40 +34,45 @@ class PolicyUpdater(Thread):
def __init__(self):
"""init static config of PolicyUpdater."""
- Thread.__init__(self)
- self.name = "policy_updater"
+ Thread.__init__(self, name="policy_updater")
self.daemon = True
- self._req_shutdown = None
- self._req_catch_up = None
+ self._aud_shutdown = None
+ self._aud_catch_up = None
self._lock = Lock()
self._queue = Queue()
- def enqueue(self, audit=None, policy_names=None):
- """enqueue the policy-names"""
- policy_names = policy_names or []
- PolicyUpdater._logger.info("policy_names %s", json.dumps(policy_names))
- self._queue.put((audit, policy_names))
+ def enqueue(self, audit=None, policies_updated=None, policies_removed=None):
+ """enqueue the policy-updates"""
+ policies_updated = policies_updated or []
+ policies_removed = policies_removed or []
+
+ PolicyUpdater._logger.info(
+ "policies_updated %s policies_removed %s",
+ json.dumps(policies_updated), json.dumps(policies_removed))
+ self._queue.put((audit, policies_updated, policies_removed))
def run(self):
"""wait and run the policy-update in thread"""
while True:
PolicyUpdater._logger.info("waiting for policy-updates...")
- audit, policy_names = self._queue.get()
- PolicyUpdater._logger.info("got policy-updates %s", json.dumps(policy_names))
+ audit, policies_updated, policies_removed = self._queue.get()
+ PolicyUpdater._logger.info(
+ "got policies_updated %s policies_removed %s",
+ json.dumps(policies_updated), json.dumps(policies_removed))
+
if not self._keep_running():
self._queue.task_done()
break
- if self._on_catch_up():
- continue
- if not policy_names:
- self._queue.task_done()
+ if self._on_catch_up(audit) or not audit:
continue
- updated_policies = PolicyRest.get_latest_policies_by_names((audit, policy_names))
- PolicyUpdater.policy_update(audit, updated_policies)
+ updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
+ (audit, policies_updated, policies_removed))
+
+ DeployHandler.policy_update(audit, updated_policies, removed_policies=removed_policies)
audit.audit_done()
self._queue.task_done()
@@ -74,51 +80,60 @@ class PolicyUpdater(Thread):
def _keep_running(self):
"""thread-safe check whether to continue running"""
- self._lock.acquire()
- keep_running = not self._req_shutdown
- self._lock.release()
- if self._req_shutdown:
- self._req_shutdown.audit_done()
+ with self._lock:
+ keep_running = not self._aud_shutdown
+
+ if self._aud_shutdown:
+ self._aud_shutdown.audit_done()
return keep_running
def catch_up(self, audit):
"""need to bring the latest policies to DCAE-Controller"""
- self._lock.acquire()
- self._req_catch_up = audit
- self._lock.release()
+ PolicyUpdater._logger.info("catch_up requested")
+ with self._lock:
+ self._aud_catch_up = audit
+
self.enqueue()
- def _on_catch_up(self):
- """Bring the latest policies to DCAE-Controller"""
- self._lock.acquire()
- req_catch_up = self._req_catch_up
- if self._req_catch_up:
- self._req_catch_up = None
+ def _reset_queue(self):
+ """clear up the queue"""
+ with self._lock:
+ self._aud_catch_up = None
self._queue.task_done()
self._queue = Queue()
+
+ def _on_catch_up(self, audit):
+ """Bring the latest policies to DCAE-Controller"""
+ self._lock.acquire()
+ aud_catch_up = self._aud_catch_up
+ if self._aud_catch_up:
+ self._aud_catch_up = None
self._lock.release()
- if not req_catch_up:
+
+ if not aud_catch_up:
return False
PolicyUpdater._logger.info("catch_up")
- latest_policies = PolicyRest.get_latest_policies(req_catch_up)
- PolicyUpdater.policy_update(req_catch_up, latest_policies)
- req_catch_up.audit_done()
- return True
-
- @staticmethod
- def policy_update(audit, updated_policies):
- """Invoke deploy-handler"""
- if updated_policies:
- PolicyUpdater._logger.info("updated_policies %s", json.dumps(updated_policies))
- DeployHandler.policy_update(audit, updated_policies)
+ latest_policies, errored_policies = PolicyRest.get_latest_policies(aud_catch_up)
+
+ if not aud_catch_up.is_success():
+ PolicyUpdater._logger.warn("not sending catch-up to deployment-handler due to errors")
+ if not audit:
+ self._queue.task_done()
+ else:
+ DeployHandler.policy_update(
+ aud_catch_up, latest_policies, errored_policies=errored_policies, catch_up=True)
+ self._reset_queue()
+ success, _, _ = aud_catch_up.audit_done()
+ PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health()))
+
+ return success
def shutdown(self, audit):
"""Shutdown the policy-updater"""
PolicyUpdater._logger.info("shutdown policy-updater")
- self._lock.acquire()
- self._req_shutdown = audit
- self._lock.release()
+ with self._lock:
+ self._aud_shutdown = audit
self.enqueue()
if self.is_alive():
self.join()