aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/policy_updater.py
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler/policy_updater.py')
-rw-r--r--policyhandler/policy_updater.py124
1 files changed, 124 insertions, 0 deletions
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
new file mode 100644
index 0000000..1f1539f
--- /dev/null
+++ b/policyhandler/policy_updater.py
@@ -0,0 +1,124 @@
+"""policy-updater thread"""
+
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import logging
+import json
+from Queue import Queue
+from threading import Thread, Lock
+
+from .policy_rest import PolicyRest
+from .deploy_handler import DeployHandler
+
+class PolicyUpdater(Thread):
+ """queue and handle the policy-updates in a separate thread"""
+ _logger = logging.getLogger("policy_handler.policy_updater")
+
+ def __init__(self):
+ """init static config of PolicyUpdater."""
+ Thread.__init__(self)
+ self.name = "policy_updater"
+ self.daemon = True
+
+ self._req_shutdown = None
+ self._req_catch_up = None
+
+ self._lock = Lock()
+ self._queue = Queue()
+
+ def enqueue(self, audit=None, policy_names=None):
+ """enqueue the policy-names"""
+ policy_names = policy_names or []
+ PolicyUpdater._logger.info("policy_names %s", json.dumps(policy_names))
+ self._queue.put((audit, policy_names))
+
+ def run(self):
+ """wait and run the policy-update in thread"""
+ while True:
+ PolicyUpdater._logger.info("waiting for policy-updates...")
+ audit, policy_names = self._queue.get()
+ PolicyUpdater._logger.info("got policy-updates %s", json.dumps(policy_names))
+ if not self._keep_running():
+ self._queue.task_done()
+ break
+ if self._on_catch_up():
+ continue
+
+ if not policy_names:
+ self._queue.task_done()
+ continue
+
+ updated_policies = PolicyRest.get_latest_policies_by_names((audit, policy_names))
+ PolicyUpdater.policy_update(audit, updated_policies)
+ audit.audit_done()
+ self._queue.task_done()
+
+ PolicyUpdater._logger.info("exit policy-updater")
+
+ def _keep_running(self):
+ """thread-safe check whether to continue running"""
+ self._lock.acquire()
+ keep_running = not self._req_shutdown
+ self._lock.release()
+ if self._req_shutdown:
+ self._req_shutdown.audit_done()
+ return keep_running
+
+ def catch_up(self, audit):
+ """need to bring the latest policies to DCAE-Controller"""
+ self._lock.acquire()
+ self._req_catch_up = audit
+ self._lock.release()
+ self.enqueue()
+
+ def _on_catch_up(self):
+ """Bring the latest policies to DCAE-Controller"""
+ self._lock.acquire()
+ req_catch_up = self._req_catch_up
+ if self._req_catch_up:
+ self._req_catch_up = None
+ self._queue.task_done()
+ self._queue = Queue()
+ self._lock.release()
+ if not req_catch_up:
+ return False
+
+ PolicyUpdater._logger.info("catch_up")
+ latest_policies = PolicyRest.get_latest_policies(req_catch_up)
+ PolicyUpdater.policy_update(req_catch_up, latest_policies)
+ req_catch_up.audit_done()
+ return True
+
+ @staticmethod
+ def policy_update(audit, updated_policies):
+ """Invoke deploy-handler"""
+ if updated_policies:
+ PolicyUpdater._logger.info("updated_policies %s", json.dumps(updated_policies))
+ DeployHandler.policy_update(audit, updated_policies)
+
+ def shutdown(self, audit):
+ """Shutdown the policy-updater"""
+ PolicyUpdater._logger.info("shutdown policy-updater")
+ self._lock.acquire()
+ self._req_shutdown = audit
+ self._lock.release()
+ self.enqueue()
+ if self.is_alive():
+ self.join()