diff options
author | Alex Shatov <alexs@att.com> | 2018-08-24 13:15:04 -0400 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2018-08-24 13:15:04 -0400 |
commit | 1d693376205c66af93283d04e8e9740c947a7d02 (patch) | |
tree | 9188af307614661c1afbe50cdaa2fa8a2cdc691c /policyhandler/policy_receiver.py | |
parent | 1cddbc70e4799970dc606014ef79e025d6a8e722 (diff) |
4.2.0 policy-handler - periodic reconfigure
- reconfigure == periodically retrieve the policy-handler config
from consul-kv and compare to previous config and subconfigs.
If changed, reconfigure the subunits
- selectively change one or any settings for the following
= catch_up timer interval
= reconfigure timer interval
= deployment-handler url and params (thread-safe)
= policy-engine url and params (thread-safe)
= web-socket url to policy-engine (through a callback)
- each subunit has its own Settings that keep track of changes
- try-catch and metrics around discovery - consul API
- hidden the secrets from logs
- froze the web-socket version to 0.49.0 because 0.50.0
and 0.51.0 are broken - looking around for stable alternatives
- fixed-adapted the callbacks passed to the web-socket lib
that changed its API in 0.49.0 and later
- log the stack on the exception occurring in the web-socket lib
- unit test refactoring
Change-Id: Id53bad59660a197f59d9aeb7c05ab761d1060cd0
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-470
Diffstat (limited to 'policyhandler/policy_receiver.py')
-rw-r--r-- | policyhandler/policy_receiver.py | 66 |
1 files changed, 46 insertions, 20 deletions
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py index bb33cd5..1edb24d 100644 --- a/policyhandler/policy_receiver.py +++ b/policyhandler/policy_receiver.py @@ -32,7 +32,7 @@ from threading import Lock, Thread import websocket -from .config import Config +from .config import Config, Settings from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION from .policy_updater import PolicyUpdater @@ -47,25 +47,50 @@ class _PolicyReceiver(Thread): def __init__(self): """web-socket inside the thread to receive policy notifications from PolicyEngine""" - Thread.__init__(self, name="policy_receiver") - self.daemon = True + Thread.__init__(self, name="policy_receiver", daemon=True) self._lock = Lock() self._keep_running = True + self._settings = Settings(Config.FIELD_POLICY_ENGINE) - config = Config.settings[Config.FIELD_POLICY_ENGINE] - self.web_socket_url = resturl = config["url"] + config["path_pdp"] - - if resturl.startswith("https:"): - self.web_socket_url = resturl.replace("https:", "wss:") + "notifications" - else: - self.web_socket_url = resturl.replace("http:", "ws:") + "notifications" - + self._web_socket_url = None self._web_socket = None + self.reconfigure() - self._policy_updater = PolicyUpdater() + self._policy_updater = PolicyUpdater(self.reconfigure) self._policy_updater.start() + def reconfigure(self): + """configure and reconfigure the web-socket""" + with self._lock: + self._settings.set_config(Config.discovered_config) + changed, config = self._settings.get_by_key(Config.FIELD_POLICY_ENGINE) + + if not changed: + self._settings.commit_change() + return False + + prev_web_socket_url = self._web_socket_url + resturl = config.get("url", "") + config.get("path_pdp", "") + + if resturl.startswith("https:"): + self._web_socket_url = resturl.replace("https:", "wss:") + "notifications" + else: + self._web_socket_url = resturl.replace("http:", "ws:") + "notifications" + + if self._web_socket_url == prev_web_socket_url: + _PolicyReceiver._logger.info("not changed web_socket_url(%s): %s", + self._web_socket_url, self._settings) + self._settings.commit_change() + return False + + _PolicyReceiver._logger.info("changed web_socket_url(%s): %s", + self._web_socket_url, self._settings) + self._settings.commit_change() + + self._stop_notifications() + return True + def run(self): """listen on web-socket and pass the policy notifications to policy-updater""" websocket.enableTrace(True) @@ -80,9 +105,9 @@ class _PolicyReceiver(Thread): time.sleep(5) _PolicyReceiver._logger.info( - "connecting to policy-notifications at: %s", self.web_socket_url) + "connecting to policy-notifications at: %s", self._web_socket_url) self._web_socket = websocket.WebSocketApp( - self.web_socket_url, + self._web_socket_url, on_message=self._on_pdp_message, on_close=self._on_ws_close, on_error=self._on_ws_error @@ -101,15 +126,16 @@ class _PolicyReceiver(Thread): return keep_running def _stop_notifications(self): - """Shuts down the AutoNotification service if running.""" + """close the web-socket == stops the notification service if running.""" with self._lock: if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected: self._web_socket.close() _PolicyReceiver._logger.info("Stopped receiving notifications from PDP") - def _on_pdp_message(self, _, message): + def _on_pdp_message(self, *args): """received the notification from PDP""" try: + message = args and args[-1] _PolicyReceiver._logger.info("Received notification message: %s", message) if not message: return @@ -144,13 +170,13 @@ class _PolicyReceiver(Thread): _PolicyReceiver._logger.exception(error_msg) - def _on_ws_error(self, _, error): + def _on_ws_error(self, error): """report an error""" - _PolicyReceiver._logger.error("policy-notification error: %s", error) + _PolicyReceiver._logger.exception("policy-notification error: %s", str(error)) - def _on_ws_close(self, _): + def _on_ws_close(self, code, reason): """restart web-socket on close""" - _PolicyReceiver._logger.info("lost connection to PDP - restarting...") + _PolicyReceiver._logger.info("lost connection(%s, %s) to PDP - restarting...", code, reason) def shutdown(self, audit): """Shutdown the policy-receiver""" |