From 1d693376205c66af93283d04e8e9740c947a7d02 Mon Sep 17 00:00:00 2001 From: Alex Shatov Date: Fri, 24 Aug 2018 13:15:04 -0400 Subject: 4.2.0 policy-handler - periodic reconfigure - reconfigure == periodically retrieve the policy-handler config from consul-kv and compare to previous config and subconfigs. If changed, reconfigure the subunits - selectively change one or any settings for the following = catch_up timer interval = reconfigure timer interval = deployment-handler url and params (thread-safe) = policy-engine url and params (thread-safe) = web-socket url to policy-engine (through a callback) - each subunit has its own Settings that keep track of changes - try-catch and metrics around discovery - consul API - hidden the secrets from logs - froze the web-socket version to 0.49.0 because 0.50.0 and 0.51.0 are broken - looking around for stable alternatives - fixed-adapted the callbacks passed to the web-socket lib that changed its API in 0.49.0 and later - log the stack on the exception occurring in the web-socket lib - unit test refactoring Change-Id: Id53bad59660a197f59d9aeb7c05ab761d1060cd0 Signed-off-by: Alex Shatov Issue-ID: DCAEGEN2-470 --- policyhandler/policy_updater.py | 144 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 133 insertions(+), 11 deletions(-) (limited to 'policyhandler/policy_updater.py') diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 7733146..8909cc7 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -22,7 +22,7 @@ import json import logging from threading import Event, Lock, Thread -from .config import Config +from .config import Config, Settings from .deploy_handler import DeployHandler, PolicyUpdateMessage from .onap.audit import Audit, AuditHttpCode, AuditResponseCode from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, POLICY_BODY, POLICY_ID, @@ -115,21 +115,45 @@ class PolicyUpdater(Thread): """sequentially handle the policy-updates and catch-ups in its own policy_updater thread""" _logger = logging.getLogger("policy_handler.policy_updater") - def __init__(self): + def __init__(self, on_reconfigure_receiver): """init static config of PolicyUpdater.""" - Thread.__init__(self, name="policy_updater") - self.daemon = True + Thread.__init__(self, name="policy_updater", daemon=True) + self._reconfigure_receiver = on_reconfigure_receiver self._lock = Lock() self._run = Event() + self._settings = Settings(CATCH_UP, Config.RECONFIGURE) self._catch_up_timer = None + self._reconfigure_timer = None + self._aud_shutdown = None self._aud_catch_up = None + self._aud_reconfigure = None self._policy_update = _PolicyUpdate() - catch_up_config = Config.settings.get(CATCH_UP, {}) - self._catch_up_interval = catch_up_config.get("interval") or 15*60 + self._catch_up_interval = None + self._reconfigure_interval = None + self._set_timer_intervals() + + def _set_timer_intervals(self): + """set intervals on timers""" + self._settings.set_config(Config.discovered_config) + if not self._settings.is_changed(): + self._settings.commit_change() + return False + + _, catch_up = self._settings.get_by_key(CATCH_UP, {}) + self._catch_up_interval = catch_up.get(Config.TIMER_INTERVAL) or 15*60 + + _, reconfigure = self._settings.get_by_key(Config.RECONFIGURE, {}) + self._reconfigure_interval = reconfigure.get(Config.TIMER_INTERVAL) or 10*60 + + PolicyUpdater._logger.info( + "intervals: catch_up(%s) reconfigure(%s): %s", + self._catch_up_interval, self._reconfigure_interval, self._settings) + self._settings.commit_change() + return True def policy_update(self, policies_updated, policies_removed): """enqueue the policy-updates""" @@ -148,8 +172,20 @@ class PolicyUpdater(Thread): ) self._run.set() + def _reconfigure(self): + """job to check for and bring in the updated config for policy-handler""" + with self._lock: + if not self._aud_reconfigure: + self._aud_reconfigure = Audit(req_message=Config.RECONFIGURE) + PolicyUpdater._logger.info( + "reconfigure %s request_id %s", + self._aud_reconfigure.req_message, self._aud_reconfigure.request_id + ) + self._run.set() + def run(self): """wait and run the policy-update in thread""" + self._run_reconfigure_timer() while True: PolicyUpdater._logger.info("waiting for policy-updates...") self._run.wait() @@ -157,6 +193,11 @@ class PolicyUpdater(Thread): with self._lock: self._run.clear() + if not self._keep_running(): + break + + self._on_reconfigure() + if not self._keep_running(): break @@ -183,7 +224,7 @@ class PolicyUpdater(Thread): if self._catch_up_timer: self._logger.info("next step catch_up_timer in %s", self._catch_up_interval) - self._catch_up_timer.next() + self._catch_up_timer.next(self._catch_up_interval) return self._catch_up_timer = StepTimer( @@ -196,14 +237,34 @@ class PolicyUpdater(Thread): self._logger.info("started catch_up_timer in %s", self._catch_up_interval) self._catch_up_timer.start() + def _run_reconfigure_timer(self): + """create and start the reconfigure timer""" + if not self._reconfigure_interval: + return + + if self._reconfigure_timer: + self._logger.info("next step reconfigure_timer in %s", self._reconfigure_interval) + self._reconfigure_timer.next(self._reconfigure_interval) + return + + self._reconfigure_timer = StepTimer( + "reconfigure_timer", + self._reconfigure_interval, + PolicyUpdater._reconfigure, + PolicyUpdater._logger, + self + ) + self._logger.info("started reconfigure_timer in %s", self._reconfigure_interval) + self._reconfigure_timer.start() + def _pause_catch_up_timer(self): """pause catch_up_timer""" if self._catch_up_timer: self._logger.info("pause catch_up_timer") self._catch_up_timer.pause() - def _stop_catch_up_timer(self): - """stop and destroy the catch_up_timer""" + def _stop_timers(self): + """stop and destroy the catch_up and reconfigure timers""" if self._catch_up_timer: self._logger.info("stopping catch_up_timer") self._catch_up_timer.stop() @@ -211,6 +272,66 @@ class PolicyUpdater(Thread): self._catch_up_timer = None self._logger.info("stopped catch_up_timer") + if self._reconfigure_timer: + self._logger.info("stopping reconfigure_timer") + self._reconfigure_timer.stop() + self._reconfigure_timer.join() + self._reconfigure_timer = None + self._logger.info("stopped reconfigure_timer") + + def _on_reconfigure(self): + """bring the latest config and reconfigure""" + with self._lock: + aud_reconfigure = self._aud_reconfigure + if self._aud_reconfigure: + self._aud_reconfigure = None + + if not aud_reconfigure: + return + + log_line = "{}({})".format(aud_reconfigure.req_message, aud_reconfigure.request_id) + reconfigure_result = "" + try: + PolicyUpdater._logger.info(log_line) + Config.discover(aud_reconfigure) + if not Config.discovered_config.is_changed(): + reconfigure_result = " -- config not changed" + else: + reconfigure_result = " -- config changed for:" + if self._set_timer_intervals(): + reconfigure_result += " timer_intervals" + + if PolicyRest.reconfigure(): + reconfigure_result += " " + Config.FIELD_POLICY_ENGINE + + if DeployHandler.reconfigure(aud_reconfigure): + reconfigure_result += " " + Config.DEPLOY_HANDLER + + if self._reconfigure_receiver(): + reconfigure_result += " web-socket" + + reconfigure_result += " -- change: {}".format(Config.discovered_config) + + Config.discovered_config.commit_change() + aud_reconfigure.audit_done(result=reconfigure_result) + PolicyUpdater._logger.info(log_line + reconfigure_result) + + except Exception as ex: + error_msg = "crash {} {}{}: {}: {}".format( + "_on_reconfigure", log_line, reconfigure_result, type(ex).__name__, str(ex)) + + PolicyUpdater._logger.exception(error_msg) + aud_reconfigure.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + aud_reconfigure.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + aud_reconfigure.audit_done(result=error_msg) + + self._run_reconfigure_timer() + + PolicyUpdater._logger.info("policy_handler health: %s", + json.dumps(aud_reconfigure.health(full=True))) + PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_reconfigure.process_info())) + + def _on_catch_up(self): """bring all the latest policies to DCAE-Controller""" with self._lock: @@ -239,7 +360,7 @@ class PolicyUpdater(Thread): catch_up_result = "- not sending empty catch-up to deployment-handler" else: aud_catch_up.reset_http_status_not_found() - DeployHandler.policy_update(aud_catch_up, catch_up_message, rediscover=True) + DeployHandler.policy_update(aud_catch_up, catch_up_message) if not aud_catch_up.is_success(): catch_up_result = "- failed to send catch-up to deployment-handler" PolicyUpdater._logger.warning(catch_up_result) @@ -256,6 +377,7 @@ class PolicyUpdater(Thread): PolicyUpdater._logger.exception(error_msg) aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + aud_catch_up.audit_done(result=error_msg) success = False self._run_catch_up_timer() @@ -342,7 +464,7 @@ class PolicyUpdater(Thread): self._aud_shutdown = audit self._run.set() - self._stop_catch_up_timer() + self._stop_timers() if self.is_alive(): self.join() -- cgit 1.2.3-korg