diff options
Diffstat (limited to 'policyhandler/policy_receiver.py')
-rw-r--r-- | policyhandler/policy_receiver.py | 66 |
1 files changed, 46 insertions, 20 deletions
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py index bb33cd5..1edb24d 100644 --- a/policyhandler/policy_receiver.py +++ b/policyhandler/policy_receiver.py @@ -32,7 +32,7 @@ from threading import Lock, Thread import websocket -from .config import Config +from .config import Config, Settings from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION from .policy_updater import PolicyUpdater @@ -47,25 +47,50 @@ class _PolicyReceiver(Thread): def __init__(self): """web-socket inside the thread to receive policy notifications from PolicyEngine""" - Thread.__init__(self, name="policy_receiver") - self.daemon = True + Thread.__init__(self, name="policy_receiver", daemon=True) self._lock = Lock() self._keep_running = True + self._settings = Settings(Config.FIELD_POLICY_ENGINE) - config = Config.settings[Config.FIELD_POLICY_ENGINE] - self.web_socket_url = resturl = config["url"] + config["path_pdp"] - - if resturl.startswith("https:"): - self.web_socket_url = resturl.replace("https:", "wss:") + "notifications" - else: - self.web_socket_url = resturl.replace("http:", "ws:") + "notifications" - + self._web_socket_url = None self._web_socket = None + self.reconfigure() - self._policy_updater = PolicyUpdater() + self._policy_updater = PolicyUpdater(self.reconfigure) self._policy_updater.start() + def reconfigure(self): + """configure and reconfigure the web-socket""" + with self._lock: + self._settings.set_config(Config.discovered_config) + changed, config = self._settings.get_by_key(Config.FIELD_POLICY_ENGINE) + + if not changed: + self._settings.commit_change() + return False + + prev_web_socket_url = self._web_socket_url + resturl = config.get("url", "") + config.get("path_pdp", "") + + if resturl.startswith("https:"): + self._web_socket_url = resturl.replace("https:", "wss:") + "notifications" + else: + self._web_socket_url = resturl.replace("http:", "ws:") + "notifications" + + if self._web_socket_url == prev_web_socket_url: + _PolicyReceiver._logger.info("not changed web_socket_url(%s): %s", + self._web_socket_url, self._settings) + self._settings.commit_change() + return False + + _PolicyReceiver._logger.info("changed web_socket_url(%s): %s", + self._web_socket_url, self._settings) + self._settings.commit_change() + + self._stop_notifications() + return True + def run(self): """listen on web-socket and pass the policy notifications to policy-updater""" websocket.enableTrace(True) @@ -80,9 +105,9 @@ class _PolicyReceiver(Thread): time.sleep(5) _PolicyReceiver._logger.info( - "connecting to policy-notifications at: %s", self.web_socket_url) + "connecting to policy-notifications at: %s", self._web_socket_url) self._web_socket = websocket.WebSocketApp( - self.web_socket_url, + self._web_socket_url, on_message=self._on_pdp_message, on_close=self._on_ws_close, on_error=self._on_ws_error @@ -101,15 +126,16 @@ class _PolicyReceiver(Thread): return keep_running def _stop_notifications(self): - """Shuts down the AutoNotification service if running.""" + """close the web-socket == stops the notification service if running.""" with self._lock: if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected: self._web_socket.close() _PolicyReceiver._logger.info("Stopped receiving notifications from PDP") - def _on_pdp_message(self, _, message): + def _on_pdp_message(self, *args): """received the notification from PDP""" try: + message = args and args[-1] _PolicyReceiver._logger.info("Received notification message: %s", message) if not message: return @@ -144,13 +170,13 @@ class _PolicyReceiver(Thread): _PolicyReceiver._logger.exception(error_msg) - def _on_ws_error(self, _, error): + def _on_ws_error(self, error): """report an error""" - _PolicyReceiver._logger.error("policy-notification error: %s", error) + _PolicyReceiver._logger.exception("policy-notification error: %s", str(error)) - def _on_ws_close(self, _): + def _on_ws_close(self, code, reason): """restart web-socket on close""" - _PolicyReceiver._logger.info("lost connection to PDP - restarting...") + _PolicyReceiver._logger.info("lost connection(%s, %s) to PDP - restarting...", code, reason) def shutdown(self, audit): """Shutdown the policy-receiver""" |