aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/policy_updater.py
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler/policy_updater.py')
-rw-r--r--policyhandler/policy_updater.py69
1 files changed, 49 insertions, 20 deletions
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index e12af88..2c2f729 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -52,7 +52,6 @@ class PolicyUpdater(Thread):
self._catch_up_max_skips = catch_up_config.get("max_skips") or 3
self._catch_up_skips = 0
self._catch_up_prev_message = None
- self._need_to_catch_up = False
self._lock = Lock()
self._queue = Queue()
@@ -63,7 +62,8 @@ class PolicyUpdater(Thread):
policies_removed = policies_removed or []
PolicyUpdater._logger.info(
- "policies_updated %s policies_removed %s",
+ "enqueue request_id %s policies_updated %s policies_removed %s",
+ ((audit and audit.request_id) or "none"),
json.dumps(policies_updated), json.dumps(policies_removed))
self._queue.put((audit, policies_updated, policies_removed))
@@ -73,28 +73,37 @@ class PolicyUpdater(Thread):
PolicyUpdater._logger.info("waiting for policy-updates...")
queued_audit, policies_updated, policies_removed = self._queue.get()
PolicyUpdater._logger.info(
- "got policies_updated %s policies_removed %s",
+ "got request_id %s policies_updated %s policies_removed %s",
+ ((queued_audit and queued_audit.request_id) or "none"),
json.dumps(policies_updated), json.dumps(policies_removed))
if not self._keep_running():
self._queue.task_done()
break
- if self._on_catch_up(queued_audit) or not queued_audit:
+ if self._on_catch_up():
+ self._reset_queue()
+ continue
+ elif not queued_audit:
+ self._queue.task_done()
continue
updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
(queued_audit, policies_updated, policies_removed))
message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies}
- self._need_to_catch_up = DeployHandler.policy_update(queued_audit, message)
- queued_audit.audit_done()
+ deployment_handler_changed = DeployHandler.policy_update(queued_audit, message)
+
self._queue.task_done()
+ queued_audit.audit_done()
- if self._need_to_catch_up:
+ if deployment_handler_changed:
+ self._catch_up_prev_message = None
self._pause_catch_up_timer()
self.catch_up()
+ elif not queued_audit.is_success():
+ self._catch_up_prev_message = None
PolicyUpdater._logger.info("exit policy-updater")
@@ -109,9 +118,10 @@ class PolicyUpdater(Thread):
def catch_up(self, audit=None):
"""need to bring the latest policies to DCAE-Controller"""
- PolicyUpdater._logger.info("catch_up requested")
with self._lock:
self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP)
+ PolicyUpdater._logger.info("catch_up %s request_id %s",
+ self._aud_catch_up.req_message, self._aud_catch_up.request_id)
self.enqueue()
@@ -151,19 +161,24 @@ class PolicyUpdater(Thread):
def _need_to_send_catch_up(self, aud_catch_up, catch_up_message):
"""try not to send the duplicate messages on auto catchup unless hitting the max count"""
- if self._need_to_catch_up \
- or aud_catch_up.req_message != AUTO_CATCH_UP \
+ if aud_catch_up.req_message != AUTO_CATCH_UP \
or self._catch_up_skips >= self._catch_up_max_skips \
or not Utils.are_the_same(catch_up_message, self._catch_up_prev_message):
self._catch_up_skips = 0
- self._need_to_catch_up = False
self._catch_up_prev_message = copy.deepcopy(catch_up_message)
+ log_line = "going to send the catch_up {0}: {1}".format(
+ aud_catch_up.req_message,
+ json.dumps(self._catch_up_prev_message)
+ )
+ self._logger.info(log_line)
+ aud_catch_up.info(log_line)
return True
self._catch_up_skips += 1
self._catch_up_prev_message = copy.deepcopy(catch_up_message)
- log_line = "skip {0} sending the same catch_up: {1}".format(
- self._catch_up_skips, self._catch_up_prev_message
+ log_line = "skip {0}/{1} sending the same catch_up {2}: {3}".format(
+ self._catch_up_skips, self._catch_up_max_skips,
+ aud_catch_up.req_message, json.dumps(self._catch_up_prev_message)
)
self._logger.info(log_line)
aud_catch_up.info(log_line)
@@ -176,7 +191,7 @@ class PolicyUpdater(Thread):
self._queue.task_done()
self._queue = Queue()
- def _on_catch_up(self, queued_audit):
+ def _on_catch_up(self):
"""bring all the latest policies to DCAE-Controller"""
with self._lock:
aud_catch_up = self._aud_catch_up
@@ -186,22 +201,36 @@ class PolicyUpdater(Thread):
if not aud_catch_up:
return False
- PolicyUpdater._logger.info("catch_up")
+ log_line = "catch_up {0} request_id {1}".format(
+ aud_catch_up.req_message, aud_catch_up.request_id
+ )
+
+ PolicyUpdater._logger.info(log_line)
self._pause_catch_up_timer()
catch_up_message = PolicyRest.get_latest_policies(aud_catch_up)
catch_up_message[CATCH_UP] = True
+ catch_up_result = ""
if not aud_catch_up.is_success():
- PolicyUpdater._logger.warn("not sending catch-up to deployment-handler due to errors")
- if not queued_audit:
- self._queue.task_done()
+ catch_up_result = "- not sending catch-up to deployment-handler due to errors"
+ PolicyUpdater._logger.warn(catch_up_result)
elif self._need_to_send_catch_up(aud_catch_up, catch_up_message):
DeployHandler.policy_update(aud_catch_up, catch_up_message)
- self._reset_queue()
- success, _, _ = aud_catch_up.audit_done()
+ if aud_catch_up.is_success():
+ catch_up_result = "- sent catch-up to deployment-handler"
+ else:
+ catch_up_result = "- failed to send catch-up to deployment-handler"
+ PolicyUpdater._logger.warn(catch_up_result)
+ else:
+ catch_up_result = "- skipped sending the same policies"
+ success, _, _ = aud_catch_up.audit_done(result=catch_up_result)
+ PolicyUpdater._logger.info(log_line + " " + catch_up_result)
PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health()))
+ if not success:
+ self._catch_up_prev_message = None
+
self._run_catch_up_timer()
return success