"""policy-updater thread""" # org.onap.dcae # ================================================================================ # Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= # # ECOMP is a trademark and service mark of AT&T Intellectual Property. import logging import json from Queue import Queue from threading import Thread, Lock from .policy_rest import PolicyRest from .deploy_handler import DeployHandler class PolicyUpdater(Thread): """queue and handle the policy-updates in a separate thread""" _logger = logging.getLogger("policy_handler.policy_updater") def __init__(self): """init static config of PolicyUpdater.""" Thread.__init__(self) self.name = "policy_updater" self.daemon = True self._req_shutdown = None self._req_catch_up = None self._lock = Lock() self._queue = Queue() def enqueue(self, audit=None, policy_names=None): """enqueue the policy-names""" policy_names = policy_names or [] PolicyUpdater._logger.info("policy_names %s", json.dumps(policy_names)) self._queue.put((audit, policy_names)) def run(self): """wait and run the policy-update in thread""" while True: PolicyUpdater._logger.info("waiting for policy-updates...") audit, policy_names = self._queue.get() PolicyUpdater._logger.info("got policy-updates %s", json.dumps(policy_names)) if not self._keep_running(): self._queue.task_done() break if self._on_catch_up(): continue if not policy_names: self._queue.task_done() continue updated_policies = PolicyRest.get_latest_policies_by_names((audit, policy_names)) PolicyUpdater.policy_update(audit, updated_policies) audit.audit_done() self._queue.task_done() PolicyUpdater._logger.info("exit policy-updater") def _keep_running(self): """thread-safe check whether to continue running""" self._lock.acquire() keep_running = not self._req_shutdown self._lock.release() if self._req_shutdown: self._req_shutdown.audit_done() return keep_running def catch_up(self, audit): """need to bring the latest policies to DCAE-Controller""" self._lock.acquire() self._req_catch_up = audit self._lock.release() self.enqueue() def _on_catch_up(self): """Bring the latest policies to DCAE-Controller""" self._lock.acquire() req_catch_up = self._req_catch_up if self._req_catch_up: self._req_catch_up = None self._queue.task_done() self._queue = Queue() self._lock.release() if not req_catch_up: return False PolicyUpdater._logger.info("catch_up") latest_policies = PolicyRest.get_latest_policies(req_catch_up) PolicyUpdater.policy_update(req_catch_up, latest_policies) req_catch_up.audit_done() return True @staticmethod def policy_update(audit, updated_policies): """Invoke deploy-handler""" if updated_policies: PolicyUpdater._logger.info("updated_policies %s", json.dumps(updated_policies)) DeployHandler.policy_update(audit, updated_policies) def shutdown(self, audit): """Shutdown the policy-updater""" PolicyUpdater._logger.info("shutdown policy-updater") self._lock.acquire() self._req_shutdown = audit self._lock.release() self.enqueue() if self.is_alive(): self.join()