summaryrefslogtreecommitdiffstats
path: root/policyhandler/policy_receiver.py
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2018-08-24 13:15:04 -0400
committerAlex Shatov <alexs@att.com>2018-08-24 13:15:04 -0400
commit1d693376205c66af93283d04e8e9740c947a7d02 (patch)
tree9188af307614661c1afbe50cdaa2fa8a2cdc691c /policyhandler/policy_receiver.py
parent1cddbc70e4799970dc606014ef79e025d6a8e722 (diff)
4.2.0 policy-handler - periodic reconfigure
- reconfigure == periodically retrieve the policy-handler config from consul-kv and compare to previous config and subconfigs. If changed, reconfigure the subunits - selectively change one or any settings for the following = catch_up timer interval = reconfigure timer interval = deployment-handler url and params (thread-safe) = policy-engine url and params (thread-safe) = web-socket url to policy-engine (through a callback) - each subunit has its own Settings that keep track of changes - try-catch and metrics around discovery - consul API - hidden the secrets from logs - froze the web-socket version to 0.49.0 because 0.50.0 and 0.51.0 are broken - looking around for stable alternatives - fixed-adapted the callbacks passed to the web-socket lib that changed its API in 0.49.0 and later - log the stack on the exception occurring in the web-socket lib - unit test refactoring Change-Id: Id53bad59660a197f59d9aeb7c05ab761d1060cd0 Signed-off-by: Alex Shatov <alexs@att.com> Issue-ID: DCAEGEN2-470
Diffstat (limited to 'policyhandler/policy_receiver.py')
-rw-r--r--policyhandler/policy_receiver.py66
1 files changed, 46 insertions, 20 deletions
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py
index bb33cd5..1edb24d 100644
--- a/policyhandler/policy_receiver.py
+++ b/policyhandler/policy_receiver.py
@@ -32,7 +32,7 @@ from threading import Lock, Thread
import websocket
-from .config import Config
+from .config import Config, Settings
from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION
from .policy_updater import PolicyUpdater
@@ -47,25 +47,50 @@ class _PolicyReceiver(Thread):
def __init__(self):
"""web-socket inside the thread to receive policy notifications from PolicyEngine"""
- Thread.__init__(self, name="policy_receiver")
- self.daemon = True
+ Thread.__init__(self, name="policy_receiver", daemon=True)
self._lock = Lock()
self._keep_running = True
+ self._settings = Settings(Config.FIELD_POLICY_ENGINE)
- config = Config.settings[Config.FIELD_POLICY_ENGINE]
- self.web_socket_url = resturl = config["url"] + config["path_pdp"]
-
- if resturl.startswith("https:"):
- self.web_socket_url = resturl.replace("https:", "wss:") + "notifications"
- else:
- self.web_socket_url = resturl.replace("http:", "ws:") + "notifications"
-
+ self._web_socket_url = None
self._web_socket = None
+ self.reconfigure()
- self._policy_updater = PolicyUpdater()
+ self._policy_updater = PolicyUpdater(self.reconfigure)
self._policy_updater.start()
+ def reconfigure(self):
+ """configure and reconfigure the web-socket"""
+ with self._lock:
+ self._settings.set_config(Config.discovered_config)
+ changed, config = self._settings.get_by_key(Config.FIELD_POLICY_ENGINE)
+
+ if not changed:
+ self._settings.commit_change()
+ return False
+
+ prev_web_socket_url = self._web_socket_url
+ resturl = config.get("url", "") + config.get("path_pdp", "")
+
+ if resturl.startswith("https:"):
+ self._web_socket_url = resturl.replace("https:", "wss:") + "notifications"
+ else:
+ self._web_socket_url = resturl.replace("http:", "ws:") + "notifications"
+
+ if self._web_socket_url == prev_web_socket_url:
+ _PolicyReceiver._logger.info("not changed web_socket_url(%s): %s",
+ self._web_socket_url, self._settings)
+ self._settings.commit_change()
+ return False
+
+ _PolicyReceiver._logger.info("changed web_socket_url(%s): %s",
+ self._web_socket_url, self._settings)
+ self._settings.commit_change()
+
+ self._stop_notifications()
+ return True
+
def run(self):
"""listen on web-socket and pass the policy notifications to policy-updater"""
websocket.enableTrace(True)
@@ -80,9 +105,9 @@ class _PolicyReceiver(Thread):
time.sleep(5)
_PolicyReceiver._logger.info(
- "connecting to policy-notifications at: %s", self.web_socket_url)
+ "connecting to policy-notifications at: %s", self._web_socket_url)
self._web_socket = websocket.WebSocketApp(
- self.web_socket_url,
+ self._web_socket_url,
on_message=self._on_pdp_message,
on_close=self._on_ws_close,
on_error=self._on_ws_error
@@ -101,15 +126,16 @@ class _PolicyReceiver(Thread):
return keep_running
def _stop_notifications(self):
- """Shuts down the AutoNotification service if running."""
+ """close the web-socket == stops the notification service if running."""
with self._lock:
if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected:
self._web_socket.close()
_PolicyReceiver._logger.info("Stopped receiving notifications from PDP")
- def _on_pdp_message(self, _, message):
+ def _on_pdp_message(self, *args):
"""received the notification from PDP"""
try:
+ message = args and args[-1]
_PolicyReceiver._logger.info("Received notification message: %s", message)
if not message:
return
@@ -144,13 +170,13 @@ class _PolicyReceiver(Thread):
_PolicyReceiver._logger.exception(error_msg)
- def _on_ws_error(self, _, error):
+ def _on_ws_error(self, error):
"""report an error"""
- _PolicyReceiver._logger.error("policy-notification error: %s", error)
+ _PolicyReceiver._logger.exception("policy-notification error: %s", str(error))
- def _on_ws_close(self, _):
+ def _on_ws_close(self, code, reason):
"""restart web-socket on close"""
- _PolicyReceiver._logger.info("lost connection to PDP - restarting...")
+ _PolicyReceiver._logger.info("lost connection(%s, %s) to PDP - restarting...", code, reason)
def shutdown(self, audit):
"""Shutdown the policy-receiver"""