diff options
Diffstat (limited to 'policyhandler/policy_updater.py')
-rw-r--r-- | policyhandler/policy_updater.py | 124 |
1 files changed, 124 insertions, 0 deletions
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py new file mode 100644 index 0000000..1f1539f --- /dev/null +++ b/policyhandler/policy_updater.py @@ -0,0 +1,124 @@ +"""policy-updater thread""" + +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +import logging +import json +from Queue import Queue +from threading import Thread, Lock + +from .policy_rest import PolicyRest +from .deploy_handler import DeployHandler + +class PolicyUpdater(Thread): + """queue and handle the policy-updates in a separate thread""" + _logger = logging.getLogger("policy_handler.policy_updater") + + def __init__(self): + """init static config of PolicyUpdater.""" + Thread.__init__(self) + self.name = "policy_updater" + self.daemon = True + + self._req_shutdown = None + self._req_catch_up = None + + self._lock = Lock() + self._queue = Queue() + + def enqueue(self, audit=None, policy_names=None): + """enqueue the policy-names""" + policy_names = policy_names or [] + PolicyUpdater._logger.info("policy_names %s", json.dumps(policy_names)) + self._queue.put((audit, policy_names)) + + def run(self): + """wait and run the policy-update in thread""" + while True: + PolicyUpdater._logger.info("waiting for policy-updates...") + audit, policy_names = self._queue.get() + PolicyUpdater._logger.info("got policy-updates %s", json.dumps(policy_names)) + if not self._keep_running(): + self._queue.task_done() + break + if self._on_catch_up(): + continue + + if not policy_names: + self._queue.task_done() + continue + + updated_policies = PolicyRest.get_latest_policies_by_names((audit, policy_names)) + PolicyUpdater.policy_update(audit, updated_policies) + audit.audit_done() + self._queue.task_done() + + PolicyUpdater._logger.info("exit policy-updater") + + def _keep_running(self): + """thread-safe check whether to continue running""" + self._lock.acquire() + keep_running = not self._req_shutdown + self._lock.release() + if self._req_shutdown: + self._req_shutdown.audit_done() + return keep_running + + def catch_up(self, audit): + """need to bring the latest policies to DCAE-Controller""" + self._lock.acquire() + self._req_catch_up = audit + self._lock.release() + self.enqueue() + + def _on_catch_up(self): + """Bring the latest policies to DCAE-Controller""" + self._lock.acquire() + req_catch_up = self._req_catch_up + if self._req_catch_up: + self._req_catch_up = None + self._queue.task_done() + self._queue = Queue() + self._lock.release() + if not req_catch_up: + return False + + PolicyUpdater._logger.info("catch_up") + latest_policies = PolicyRest.get_latest_policies(req_catch_up) + PolicyUpdater.policy_update(req_catch_up, latest_policies) + req_catch_up.audit_done() + return True + + @staticmethod + def policy_update(audit, updated_policies): + """Invoke deploy-handler""" + if updated_policies: + PolicyUpdater._logger.info("updated_policies %s", json.dumps(updated_policies)) + DeployHandler.policy_update(audit, updated_policies) + + def shutdown(self, audit): + """Shutdown the policy-updater""" + PolicyUpdater._logger.info("shutdown policy-updater") + self._lock.acquire() + self._req_shutdown = audit + self._lock.release() + self.enqueue() + if self.is_alive(): + self.join() |