diff options
author | Alex Shatov <alexs@att.com> | 2018-01-10 11:00:50 -0500 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2018-01-10 11:07:30 -0500 |
commit | 1369bea8b3c24ef063799acefbfc01659878f034 (patch) | |
tree | 95fa3e5580f62be9c1e1d630ed0c6496b9fb03a2 /policyhandler/policy_updater.py | |
parent | dc5da5bf63ae4a4ac11b4b5c46407e58da16fbfe (diff) |
variable collection of policies per component
* new feature variable collection of policies per component in DCAE
* massive refactoring
* dissolved the external PolicyEngine.py into policy_receiver.py
- kept only the web-socket communication to PolicyEngine
* new /healthcheck - shows some stats of service running
* Unit Test coverage 75%
Change-Id: I816b7d5713ae0dd88fa73d3656f272b4f3e7946e
Issue-ID: DCAEGEN2-249
Signed-off-by: Alex Shatov <alexs@att.com>
Diffstat (limited to 'policyhandler/policy_updater.py')
-rw-r--r-- | policyhandler/policy_updater.py | 107 |
1 files changed, 61 insertions, 46 deletions
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 1f1539f..9732f69 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -26,6 +26,7 @@ from threading import Thread, Lock from .policy_rest import PolicyRest from .deploy_handler import DeployHandler +from .onap.audit import Audit class PolicyUpdater(Thread): """queue and handle the policy-updates in a separate thread""" @@ -33,40 +34,45 @@ class PolicyUpdater(Thread): def __init__(self): """init static config of PolicyUpdater.""" - Thread.__init__(self) - self.name = "policy_updater" + Thread.__init__(self, name="policy_updater") self.daemon = True - self._req_shutdown = None - self._req_catch_up = None + self._aud_shutdown = None + self._aud_catch_up = None self._lock = Lock() self._queue = Queue() - def enqueue(self, audit=None, policy_names=None): - """enqueue the policy-names""" - policy_names = policy_names or [] - PolicyUpdater._logger.info("policy_names %s", json.dumps(policy_names)) - self._queue.put((audit, policy_names)) + def enqueue(self, audit=None, policies_updated=None, policies_removed=None): + """enqueue the policy-updates""" + policies_updated = policies_updated or [] + policies_removed = policies_removed or [] + + PolicyUpdater._logger.info( + "policies_updated %s policies_removed %s", + json.dumps(policies_updated), json.dumps(policies_removed)) + self._queue.put((audit, policies_updated, policies_removed)) def run(self): """wait and run the policy-update in thread""" while True: PolicyUpdater._logger.info("waiting for policy-updates...") - audit, policy_names = self._queue.get() - PolicyUpdater._logger.info("got policy-updates %s", json.dumps(policy_names)) + audit, policies_updated, policies_removed = self._queue.get() + PolicyUpdater._logger.info( + "got policies_updated %s policies_removed %s", + json.dumps(policies_updated), json.dumps(policies_removed)) + if not self._keep_running(): self._queue.task_done() break - if self._on_catch_up(): - continue - if not policy_names: - self._queue.task_done() + if self._on_catch_up(audit) or not audit: continue - updated_policies = PolicyRest.get_latest_policies_by_names((audit, policy_names)) - PolicyUpdater.policy_update(audit, updated_policies) + updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( + (audit, policies_updated, policies_removed)) + + DeployHandler.policy_update(audit, updated_policies, removed_policies=removed_policies) audit.audit_done() self._queue.task_done() @@ -74,51 +80,60 @@ class PolicyUpdater(Thread): def _keep_running(self): """thread-safe check whether to continue running""" - self._lock.acquire() - keep_running = not self._req_shutdown - self._lock.release() - if self._req_shutdown: - self._req_shutdown.audit_done() + with self._lock: + keep_running = not self._aud_shutdown + + if self._aud_shutdown: + self._aud_shutdown.audit_done() return keep_running def catch_up(self, audit): """need to bring the latest policies to DCAE-Controller""" - self._lock.acquire() - self._req_catch_up = audit - self._lock.release() + PolicyUpdater._logger.info("catch_up requested") + with self._lock: + self._aud_catch_up = audit + self.enqueue() - def _on_catch_up(self): - """Bring the latest policies to DCAE-Controller""" - self._lock.acquire() - req_catch_up = self._req_catch_up - if self._req_catch_up: - self._req_catch_up = None + def _reset_queue(self): + """clear up the queue""" + with self._lock: + self._aud_catch_up = None self._queue.task_done() self._queue = Queue() + + def _on_catch_up(self, audit): + """Bring the latest policies to DCAE-Controller""" + self._lock.acquire() + aud_catch_up = self._aud_catch_up + if self._aud_catch_up: + self._aud_catch_up = None self._lock.release() - if not req_catch_up: + + if not aud_catch_up: return False PolicyUpdater._logger.info("catch_up") - latest_policies = PolicyRest.get_latest_policies(req_catch_up) - PolicyUpdater.policy_update(req_catch_up, latest_policies) - req_catch_up.audit_done() - return True - - @staticmethod - def policy_update(audit, updated_policies): - """Invoke deploy-handler""" - if updated_policies: - PolicyUpdater._logger.info("updated_policies %s", json.dumps(updated_policies)) - DeployHandler.policy_update(audit, updated_policies) + latest_policies, errored_policies = PolicyRest.get_latest_policies(aud_catch_up) + + if not aud_catch_up.is_success(): + PolicyUpdater._logger.warn("not sending catch-up to deployment-handler due to errors") + if not audit: + self._queue.task_done() + else: + DeployHandler.policy_update( + aud_catch_up, latest_policies, errored_policies=errored_policies, catch_up=True) + self._reset_queue() + success, _, _ = aud_catch_up.audit_done() + PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health())) + + return success def shutdown(self, audit): """Shutdown the policy-updater""" PolicyUpdater._logger.info("shutdown policy-updater") - self._lock.acquire() - self._req_shutdown = audit - self._lock.release() + with self._lock: + self._aud_shutdown = audit self.enqueue() if self.is_alive(): self.join() |