From a2b262994a62286e1c98fd9939115c6e64ed27ee Mon Sep 17 00:00:00 2001 From: Alex Shatov Date: Tue, 13 Mar 2018 17:50:51 -0400 Subject: 2.3.0 policy-handler - periodically catch_up - periodically catchup - interval is configurable = max_skips defines the number of times the catch_up message that is identical to previous one can be skipped - do not catchup more often than the interval even between the manual catchup and auto catchup - do not send the same catchup message twice in a row to the deployment-handler but not exceed a hard limit on catchup max_skips - catchup if the deployment-handler instance is changed Change-Id: I9a3fcc941e8a9e553abb3952dd882c37e0f5fdfe Signed-off-by: Alex Shatov Issue-ID: DCAEGEN2-389 --- policyhandler/policy_updater.py | 118 +++++++++++++++++++++++++++++++++------- 1 file changed, 98 insertions(+), 20 deletions(-) (limited to 'policyhandler/policy_updater.py') diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 01be550..e12af88 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -18,15 +18,21 @@ """policy-updater thread""" +import copy import json import logging from Queue import Queue from threading import Lock, Thread +from .config import Config from .deploy_handler import DeployHandler from .onap.audit import Audit -from .policy_consts import CATCH_UP, LATEST_POLICIES, REMOVED_POLICIES +from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, LATEST_POLICIES, + REMOVED_POLICIES) from .policy_rest import PolicyRest +from .policy_utils import Utils +from .step_timer import StepTimer + class PolicyUpdater(Thread): """queue and handle the policy-updates in a separate thread""" @@ -37,9 +43,17 @@ class PolicyUpdater(Thread): Thread.__init__(self, name="policy_updater") self.daemon = True + self._catch_up_timer = None self._aud_shutdown = None self._aud_catch_up = None + catch_up_config = Config.config.get("catch_up", {}) + self._catch_up_interval = catch_up_config.get("interval") or 15*60 + self._catch_up_max_skips = catch_up_config.get("max_skips") or 3 + self._catch_up_skips = 0 + self._catch_up_prev_message = None + self._need_to_catch_up = False + self._lock = Lock() self._queue = Queue() @@ -57,7 +71,7 @@ class PolicyUpdater(Thread): """wait and run the policy-update in thread""" while True: PolicyUpdater._logger.info("waiting for policy-updates...") - audit, policies_updated, policies_removed = self._queue.get() + queued_audit, policies_updated, policies_removed = self._queue.get() PolicyUpdater._logger.info( "got policies_updated %s policies_removed %s", json.dumps(policies_updated), json.dumps(policies_removed)) @@ -66,17 +80,22 @@ class PolicyUpdater(Thread): self._queue.task_done() break - if self._on_catch_up(audit) or not audit: + if self._on_catch_up(queued_audit) or not queued_audit: continue updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( - (audit, policies_updated, policies_removed)) + (queued_audit, policies_updated, policies_removed)) message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies} - DeployHandler.policy_update(audit, message) - audit.audit_done() + self._need_to_catch_up = DeployHandler.policy_update(queued_audit, message) + + queued_audit.audit_done() self._queue.task_done() + if self._need_to_catch_up: + self._pause_catch_up_timer() + self.catch_up() + PolicyUpdater._logger.info("exit policy-updater") def _keep_running(self): @@ -88,14 +107,68 @@ class PolicyUpdater(Thread): self._aud_shutdown.audit_done() return keep_running - def catch_up(self, audit): + def catch_up(self, audit=None): """need to bring the latest policies to DCAE-Controller""" PolicyUpdater._logger.info("catch_up requested") with self._lock: - self._aud_catch_up = audit + self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP) self.enqueue() + def _run_catch_up_timer(self): + """create and start the catch_up timer""" + if not self._catch_up_interval: + return + + if self._catch_up_timer: + self._catch_up_timer.next() + self._logger.info("next step catch_up_timer in %s", self._catch_up_interval) + return + + self._catch_up_timer = StepTimer( + "catch_up_timer", + self._catch_up_interval, + PolicyUpdater.catch_up, + self + ) + self._catch_up_timer.start() + self._logger.info("started catch_up_timer in %s", self._catch_up_interval) + + def _pause_catch_up_timer(self): + """stop catch_up_timer""" + if self._catch_up_timer: + self._logger.info("pause catch_up_timer") + self._catch_up_timer.pause() + + def _stop_catch_up_timer(self): + """stop catch_up_timer""" + if self._catch_up_timer: + self._logger.info("stopping catch_up_timer") + self._catch_up_timer.stop() + self._catch_up_timer.join() + self._catch_up_timer = None + self._logger.info("stopped catch_up_timer") + + def _need_to_send_catch_up(self, aud_catch_up, catch_up_message): + """try not to send the duplicate messages on auto catchup unless hitting the max count""" + if self._need_to_catch_up \ + or aud_catch_up.req_message != AUTO_CATCH_UP \ + or self._catch_up_skips >= self._catch_up_max_skips \ + or not Utils.are_the_same(catch_up_message, self._catch_up_prev_message): + self._catch_up_skips = 0 + self._need_to_catch_up = False + self._catch_up_prev_message = copy.deepcopy(catch_up_message) + return True + + self._catch_up_skips += 1 + self._catch_up_prev_message = copy.deepcopy(catch_up_message) + log_line = "skip {0} sending the same catch_up: {1}".format( + self._catch_up_skips, self._catch_up_prev_message + ) + self._logger.info(log_line) + aud_catch_up.info(log_line) + return False + def _reset_queue(self): """clear up the queue""" with self._lock: @@ -103,32 +176,34 @@ class PolicyUpdater(Thread): self._queue.task_done() self._queue = Queue() - def _on_catch_up(self, audit): - """Bring the latest policies to DCAE-Controller""" - self._lock.acquire() - aud_catch_up = self._aud_catch_up - if self._aud_catch_up: - self._aud_catch_up = None - self._lock.release() + def _on_catch_up(self, queued_audit): + """bring all the latest policies to DCAE-Controller""" + with self._lock: + aud_catch_up = self._aud_catch_up + if self._aud_catch_up: + self._aud_catch_up = None if not aud_catch_up: return False PolicyUpdater._logger.info("catch_up") + self._pause_catch_up_timer() - result = PolicyRest.get_latest_policies(aud_catch_up) - result[CATCH_UP] = True + catch_up_message = PolicyRest.get_latest_policies(aud_catch_up) + catch_up_message[CATCH_UP] = True if not aud_catch_up.is_success(): PolicyUpdater._logger.warn("not sending catch-up to deployment-handler due to errors") - if not audit: + if not queued_audit: self._queue.task_done() - else: - DeployHandler.policy_update(aud_catch_up, result) + elif self._need_to_send_catch_up(aud_catch_up, catch_up_message): + DeployHandler.policy_update(aud_catch_up, catch_up_message) self._reset_queue() success, _, _ = aud_catch_up.audit_done() PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health())) + self._run_catch_up_timer() + return success def shutdown(self, audit): @@ -137,5 +212,8 @@ class PolicyUpdater(Thread): with self._lock: self._aud_shutdown = audit self.enqueue() + + self._stop_catch_up_timer() + if self.is_alive(): self.join() -- cgit 1.2.3-korg