diff options
author | Alex Shatov <alexs@att.com> | 2018-08-24 13:15:04 -0400 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2018-08-24 13:15:04 -0400 |
commit | 1d693376205c66af93283d04e8e9740c947a7d02 (patch) | |
tree | 9188af307614661c1afbe50cdaa2fa8a2cdc691c /policyhandler/policy_updater.py | |
parent | 1cddbc70e4799970dc606014ef79e025d6a8e722 (diff) |
4.2.0 policy-handler - periodic reconfigure
- reconfigure == periodically retrieve the policy-handler config
from consul-kv and compare to previous config and subconfigs.
If changed, reconfigure the subunits
- selectively change one or any settings for the following
= catch_up timer interval
= reconfigure timer interval
= deployment-handler url and params (thread-safe)
= policy-engine url and params (thread-safe)
= web-socket url to policy-engine (through a callback)
- each subunit has its own Settings that keep track of changes
- try-catch and metrics around discovery - consul API
- hidden the secrets from logs
- froze the web-socket version to 0.49.0 because 0.50.0
and 0.51.0 are broken - looking around for stable alternatives
- fixed-adapted the callbacks passed to the web-socket lib
that changed its API in 0.49.0 and later
- log the stack on the exception occurring in the web-socket lib
- unit test refactoring
Change-Id: Id53bad59660a197f59d9aeb7c05ab761d1060cd0
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-470
Diffstat (limited to 'policyhandler/policy_updater.py')
-rw-r--r-- | policyhandler/policy_updater.py | 144 |
1 files changed, 133 insertions, 11 deletions
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 7733146..8909cc7 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -22,7 +22,7 @@ import json import logging from threading import Event, Lock, Thread -from .config import Config +from .config import Config, Settings from .deploy_handler import DeployHandler, PolicyUpdateMessage from .onap.audit import Audit, AuditHttpCode, AuditResponseCode from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, POLICY_BODY, POLICY_ID, @@ -115,21 +115,45 @@ class PolicyUpdater(Thread): """sequentially handle the policy-updates and catch-ups in its own policy_updater thread""" _logger = logging.getLogger("policy_handler.policy_updater") - def __init__(self): + def __init__(self, on_reconfigure_receiver): """init static config of PolicyUpdater.""" - Thread.__init__(self, name="policy_updater") - self.daemon = True + Thread.__init__(self, name="policy_updater", daemon=True) + self._reconfigure_receiver = on_reconfigure_receiver self._lock = Lock() self._run = Event() + self._settings = Settings(CATCH_UP, Config.RECONFIGURE) self._catch_up_timer = None + self._reconfigure_timer = None + self._aud_shutdown = None self._aud_catch_up = None + self._aud_reconfigure = None self._policy_update = _PolicyUpdate() - catch_up_config = Config.settings.get(CATCH_UP, {}) - self._catch_up_interval = catch_up_config.get("interval") or 15*60 + self._catch_up_interval = None + self._reconfigure_interval = None + self._set_timer_intervals() + + def _set_timer_intervals(self): + """set intervals on timers""" + self._settings.set_config(Config.discovered_config) + if not self._settings.is_changed(): + self._settings.commit_change() + return False + + _, catch_up = self._settings.get_by_key(CATCH_UP, {}) + self._catch_up_interval = catch_up.get(Config.TIMER_INTERVAL) or 15*60 + + _, reconfigure = self._settings.get_by_key(Config.RECONFIGURE, {}) + self._reconfigure_interval = reconfigure.get(Config.TIMER_INTERVAL) or 10*60 + + PolicyUpdater._logger.info( + "intervals: catch_up(%s) reconfigure(%s): %s", + self._catch_up_interval, self._reconfigure_interval, self._settings) + self._settings.commit_change() + return True def policy_update(self, policies_updated, policies_removed): """enqueue the policy-updates""" @@ -148,8 +172,20 @@ class PolicyUpdater(Thread): ) self._run.set() + def _reconfigure(self): + """job to check for and bring in the updated config for policy-handler""" + with self._lock: + if not self._aud_reconfigure: + self._aud_reconfigure = Audit(req_message=Config.RECONFIGURE) + PolicyUpdater._logger.info( + "reconfigure %s request_id %s", + self._aud_reconfigure.req_message, self._aud_reconfigure.request_id + ) + self._run.set() + def run(self): """wait and run the policy-update in thread""" + self._run_reconfigure_timer() while True: PolicyUpdater._logger.info("waiting for policy-updates...") self._run.wait() @@ -160,6 +196,11 @@ class PolicyUpdater(Thread): if not self._keep_running(): break + self._on_reconfigure() + + if not self._keep_running(): + break + if self._on_catch_up(): continue @@ -183,7 +224,7 @@ class PolicyUpdater(Thread): if self._catch_up_timer: self._logger.info("next step catch_up_timer in %s", self._catch_up_interval) - self._catch_up_timer.next() + self._catch_up_timer.next(self._catch_up_interval) return self._catch_up_timer = StepTimer( @@ -196,14 +237,34 @@ class PolicyUpdater(Thread): self._logger.info("started catch_up_timer in %s", self._catch_up_interval) self._catch_up_timer.start() + def _run_reconfigure_timer(self): + """create and start the reconfigure timer""" + if not self._reconfigure_interval: + return + + if self._reconfigure_timer: + self._logger.info("next step reconfigure_timer in %s", self._reconfigure_interval) + self._reconfigure_timer.next(self._reconfigure_interval) + return + + self._reconfigure_timer = StepTimer( + "reconfigure_timer", + self._reconfigure_interval, + PolicyUpdater._reconfigure, + PolicyUpdater._logger, + self + ) + self._logger.info("started reconfigure_timer in %s", self._reconfigure_interval) + self._reconfigure_timer.start() + def _pause_catch_up_timer(self): """pause catch_up_timer""" if self._catch_up_timer: self._logger.info("pause catch_up_timer") self._catch_up_timer.pause() - def _stop_catch_up_timer(self): - """stop and destroy the catch_up_timer""" + def _stop_timers(self): + """stop and destroy the catch_up and reconfigure timers""" if self._catch_up_timer: self._logger.info("stopping catch_up_timer") self._catch_up_timer.stop() @@ -211,6 +272,66 @@ class PolicyUpdater(Thread): self._catch_up_timer = None self._logger.info("stopped catch_up_timer") + if self._reconfigure_timer: + self._logger.info("stopping reconfigure_timer") + self._reconfigure_timer.stop() + self._reconfigure_timer.join() + self._reconfigure_timer = None + self._logger.info("stopped reconfigure_timer") + + def _on_reconfigure(self): + """bring the latest config and reconfigure""" + with self._lock: + aud_reconfigure = self._aud_reconfigure + if self._aud_reconfigure: + self._aud_reconfigure = None + + if not aud_reconfigure: + return + + log_line = "{}({})".format(aud_reconfigure.req_message, aud_reconfigure.request_id) + reconfigure_result = "" + try: + PolicyUpdater._logger.info(log_line) + Config.discover(aud_reconfigure) + if not Config.discovered_config.is_changed(): + reconfigure_result = " -- config not changed" + else: + reconfigure_result = " -- config changed for:" + if self._set_timer_intervals(): + reconfigure_result += " timer_intervals" + + if PolicyRest.reconfigure(): + reconfigure_result += " " + Config.FIELD_POLICY_ENGINE + + if DeployHandler.reconfigure(aud_reconfigure): + reconfigure_result += " " + Config.DEPLOY_HANDLER + + if self._reconfigure_receiver(): + reconfigure_result += " web-socket" + + reconfigure_result += " -- change: {}".format(Config.discovered_config) + + Config.discovered_config.commit_change() + aud_reconfigure.audit_done(result=reconfigure_result) + PolicyUpdater._logger.info(log_line + reconfigure_result) + + except Exception as ex: + error_msg = "crash {} {}{}: {}: {}".format( + "_on_reconfigure", log_line, reconfigure_result, type(ex).__name__, str(ex)) + + PolicyUpdater._logger.exception(error_msg) + aud_reconfigure.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + aud_reconfigure.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + aud_reconfigure.audit_done(result=error_msg) + + self._run_reconfigure_timer() + + PolicyUpdater._logger.info("policy_handler health: %s", + json.dumps(aud_reconfigure.health(full=True))) + PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_reconfigure.process_info())) + + def _on_catch_up(self): """bring all the latest policies to DCAE-Controller""" with self._lock: @@ -239,7 +360,7 @@ class PolicyUpdater(Thread): catch_up_result = "- not sending empty catch-up to deployment-handler" else: aud_catch_up.reset_http_status_not_found() - DeployHandler.policy_update(aud_catch_up, catch_up_message, rediscover=True) + DeployHandler.policy_update(aud_catch_up, catch_up_message) if not aud_catch_up.is_success(): catch_up_result = "- failed to send catch-up to deployment-handler" PolicyUpdater._logger.warning(catch_up_result) @@ -256,6 +377,7 @@ class PolicyUpdater(Thread): PolicyUpdater._logger.exception(error_msg) aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + aud_catch_up.audit_done(result=error_msg) success = False self._run_catch_up_timer() @@ -342,7 +464,7 @@ class PolicyUpdater(Thread): self._aud_shutdown = audit self._run.set() - self._stop_catch_up_timer() + self._stop_timers() if self.is_alive(): self.join() |