diff options
Diffstat (limited to 'policyhandler/policy_receiver.py')
-rw-r--r-- | policyhandler/policy_receiver.py | 320 |
1 files changed, 18 insertions, 302 deletions
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py index 7cf1869..d949c4b 100644 --- a/policyhandler/policy_receiver.py +++ b/policyhandler/policy_receiver.py @@ -14,7 +14,6 @@ # limitations under the License. # ============LICENSE_END========================================================= # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. """ policy-receiver communicates with policy-engine @@ -25,323 +24,37 @@ on receiving the policy-notifications, the policy-receiver passes the notifications to policy-updater """ -import copy -import json -import logging -import ssl -import time -import urllib.parse -from datetime import datetime -from threading import Lock, Thread - -import websocket - -from .config import Config, Settings -from .onap.audit import Audit -from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION -from .policy_updater import PolicyUpdater -from .policy_utils import Utils from .service_activator import ServiceActivator -LOADED_POLICIES = 'loadedPolicies' -REMOVED_POLICIES = 'removedPolicies' -POLICY_VER = 'versionNo' -POLICY_MATCHES = 'matches' - -class _PolicyReceiver(Thread): - """web-socket to PolicyEngine""" - _logger = logging.getLogger("policy_handler.policy_receiver") - WS_STARTED = "started" - WS_START_COUNT = "start_count" - WS_CLOSE_COUNT = "close_count" - WS_ERROR_COUNT = "error_count" - WS_PONG_COUNT = "pong_count" - WS_MESSAGE_COUNT = "message_count" - WS_MESSAGE_TIMESTAMP = "message_timestamp" - WS_STATUS = "status" - WS_PING_INTERVAL_DEFAULT = 30 - WEB_SOCKET_HEALTH = "web_socket_health" - - def __init__(self, audit, policy_updater): - """web-socket inside the thread to receive policy notifications from PolicyEngine""" - Thread.__init__(self, name="policy_receiver", daemon=True) - - self._policy_updater = policy_updater - self._lock = Lock() - self._keep_running = True - self._settings = Settings(Config.FIELD_POLICY_ENGINE) - - self._sleep_before_restarting = 5 - self._web_socket_url = None - self._web_socket_sslopt = None - self._tls_wss_ca_mode = None - self._web_socket = None - self._ws_ping_interval_in_secs = _PolicyReceiver.WS_PING_INTERVAL_DEFAULT - self._web_socket_health = { - _PolicyReceiver.WS_START_COUNT: 0, - _PolicyReceiver.WS_CLOSE_COUNT: 0, - _PolicyReceiver.WS_ERROR_COUNT: 0, - _PolicyReceiver.WS_PONG_COUNT: 0, - _PolicyReceiver.WS_MESSAGE_COUNT: 0, - _PolicyReceiver.WS_STATUS: "created" - } - - Audit.register_item_health(_PolicyReceiver.WEB_SOCKET_HEALTH, self._get_health) - self.reconfigure(audit) - - def reconfigure(self, audit): - """configure and reconfigure the web-socket""" - with self._lock: - _PolicyReceiver._logger.info(audit.info("web_socket_health {}".format( - json.dumps(self._get_health(), sort_keys=True)))) - self._sleep_before_restarting = 5 - self._settings.set_config(Config.discovered_config) - changed, config = self._settings.get_by_key(Config.FIELD_POLICY_ENGINE) - - if not changed: - self._settings.commit_change() - return False - - prev_web_socket_url = self._web_socket_url - prev_web_socket_sslopt = self._web_socket_sslopt - prev_ws_ping_interval_in_secs = self._ws_ping_interval_in_secs - - self._web_socket_sslopt = None - - resturl = urllib.parse.urljoin(config.get("url", "").lower().rstrip("/") + "/", - config.get("path_notifications", "/pdp/notifications")) - - self._tls_wss_ca_mode = config.get(Config.TLS_WSS_CA_MODE) - - self._ws_ping_interval_in_secs = config.get(Config.WS_PING_INTERVAL_IN_SECS) - if not self._ws_ping_interval_in_secs or self._ws_ping_interval_in_secs < 60: - self._ws_ping_interval_in_secs = _PolicyReceiver.WS_PING_INTERVAL_DEFAULT - - if resturl.startswith("https:"): - self._web_socket_url = resturl.replace("https:", "wss:") - - verify = Config.get_tls_verify(self._tls_wss_ca_mode) - if verify is False: - self._web_socket_sslopt = {'cert_reqs': ssl.CERT_NONE} - elif verify is True: - pass - else: - self._web_socket_sslopt = {'ca_certs': verify} - - else: - self._web_socket_url = resturl.replace("http:", "ws:") - - log_changed = ( - "changed web_socket_url(%s) or tls_wss_ca_mode(%s)" - " or ws_ping_interval_in_secs(%s): %s" % - (self._web_socket_url, self._tls_wss_ca_mode, self._ws_ping_interval_in_secs, - self._settings)) - if (self._web_socket_url == prev_web_socket_url - and Utils.are_the_same(prev_web_socket_sslopt, self._web_socket_sslopt) - and prev_ws_ping_interval_in_secs == self._ws_ping_interval_in_secs): - _PolicyReceiver._logger.info(audit.info("not {}".format(log_changed))) - self._settings.commit_change() - return False - - _PolicyReceiver._logger.info(audit.info(log_changed)) - self._settings.commit_change() - - self._stop_notifications() - return True - - def run(self): - """listen on web-socket and pass the policy notifications to policy-updater""" - _PolicyReceiver._logger.info("starting policy_receiver...") - websocket.enableTrace(True) - restarting = False - while True: - if not self._get_keep_running(): - break - - self._stop_notifications() - - if restarting: - with self._lock: - sleep_before_restarting = self._sleep_before_restarting - _PolicyReceiver._logger.info( - "going to sleep for %s secs before restarting policy-notifications", - sleep_before_restarting) - - time.sleep(sleep_before_restarting) - if not self._get_keep_running(): - break - - with self._lock: - web_socket_url = self._web_socket_url - sslopt = copy.deepcopy(self._web_socket_sslopt) - tls_wss_ca_mode = self._tls_wss_ca_mode - ws_ping_interval_in_secs = self._ws_ping_interval_in_secs - - _PolicyReceiver._logger.info( - "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)" - " ws_ping_interval_in_secs(%s)", - web_socket_url, json.dumps(sslopt), tls_wss_ca_mode, ws_ping_interval_in_secs) - - self._web_socket = websocket.WebSocketApp( - web_socket_url, - on_open=self._on_ws_open, - on_message=self._on_pdp_message, - on_close=self._on_ws_close, - on_error=self._on_ws_error, - on_pong=self._on_ws_pong - ) - - _PolicyReceiver._logger.info("waiting for policy-notifications...") - self._web_socket.run_forever(sslopt=sslopt, ping_interval=ws_ping_interval_in_secs) - restarting = True - - Audit.register_item_health(_PolicyReceiver.WEB_SOCKET_HEALTH) - _PolicyReceiver._logger.info("exit policy-receiver") - - def _get_keep_running(self): - """thread-safe check whether to continue running""" - with self._lock: - keep_running = self._keep_running - return keep_running - - def _stop_notifications(self): - """close the web-socket == stops the notification service if running.""" - with self._lock: - if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected: - self._web_socket.close() - _PolicyReceiver._logger.info("stopped receiving notifications from PDP") - - def _on_pdp_message(self, *args): - """received the notification from PDP""" - self._web_socket_health[_PolicyReceiver.WS_MESSAGE_COUNT] += 1 - self._web_socket_health[_PolicyReceiver.WS_MESSAGE_TIMESTAMP] = str(datetime.utcnow()) - try: - message = args and args[-1] - _PolicyReceiver._logger.info("Received notification message: %s", message) - _PolicyReceiver._logger.info("web_socket_health %s", - json.dumps(self._get_health(), sort_keys=True)) - if not message: - return - message = json.loads(message) - - if not message or not isinstance(message, dict): - _PolicyReceiver._logger.warning("unexpected message from PDP: %s", - json.dumps(message)) - return - - policies_updated = [ - {POLICY_NAME: policy.get(POLICY_NAME), - POLICY_VERSION: policy.get(POLICY_VER), - MATCHING_CONDITIONS: policy.get(POLICY_MATCHES, {})} - for policy in message.get(LOADED_POLICIES, []) - ] - - policies_removed = [ - {POLICY_NAME: removed_policy.get(POLICY_NAME), - POLICY_VERSION: removed_policy.get(POLICY_VER)} - for removed_policy in message.get(REMOVED_POLICIES, []) - ] - - if not policies_updated and not policies_removed: - _PolicyReceiver._logger.info("no policy updated or removed") - return - - self._policy_updater.policy_update(policies_updated, policies_removed) - except Exception as ex: - error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex), - "on_pdp_message", json.dumps(message)) - - _PolicyReceiver._logger.exception(error_msg) - - def _on_ws_error(self, error): - """report an error""" - _PolicyReceiver._logger.exception("policy-notification error %s", str(error)) - self._sleep_before_restarting = 60 if isinstance(error, ssl.SSLError) else 5 - - self._web_socket_health[_PolicyReceiver.WS_STATUS] = "error" - self._web_socket_health[_PolicyReceiver.WS_ERROR_COUNT] += 1 - self._web_socket_health["last_error"] = { - "error": str(error), "timestamp": str(datetime.utcnow()) - } - _PolicyReceiver._logger.info("web_socket_health %s", - json.dumps(self._get_health(), sort_keys=True)) - - def _on_ws_close(self, code, reason): - """restart web-socket on close""" - self._web_socket_health["last_closed"] = str(datetime.utcnow()) - self._web_socket_health[_PolicyReceiver.WS_STATUS] = "closed" - self._web_socket_health[_PolicyReceiver.WS_CLOSE_COUNT] += 1 - _PolicyReceiver._logger.info( - "lost connection(%s, %s) to PDP web_socket_health %s", - code, reason, json.dumps(self._get_health(), sort_keys=True)) - - def _on_ws_open(self): - """started web-socket""" - self._web_socket_health[_PolicyReceiver.WS_STATUS] = _PolicyReceiver.WS_STARTED - self._web_socket_health[_PolicyReceiver.WS_START_COUNT] += 1 - self._web_socket_health[_PolicyReceiver.WS_STARTED] = datetime.utcnow() - _PolicyReceiver._logger.info("opened connection to PDP web_socket_health %s", - json.dumps(self._get_health(), sort_keys=True)) - - def _on_ws_pong(self, pong): - """pong = response to pinging the server of the web-socket""" - self._web_socket_health[_PolicyReceiver.WS_PONG_COUNT] += 1 - _PolicyReceiver._logger.info( - "pong(%s) from connection to PDP web_socket_health %s", - pong, json.dumps(self._get_health(), sort_keys=True)) - - def _get_health(self): - """returns the healthcheck of the web-socket as json""" - web_socket_health = copy.deepcopy(self._web_socket_health) - web_socket_health[Config.WS_PING_INTERVAL_IN_SECS] = self._ws_ping_interval_in_secs - started = web_socket_health.get(_PolicyReceiver.WS_STARTED) - if started: - web_socket_health[_PolicyReceiver.WS_STARTED] = str(started) - web_socket_health["uptime"] = str(datetime.utcnow() - started) - return web_socket_health - - - def shutdown(self, audit): - """Shutdown the policy-receiver""" - _PolicyReceiver._logger.info(audit.info("shutdown policy-receiver")) - with self._lock: - self._keep_running = False - - self._stop_notifications() - - if self.is_alive(): - self.join() - - class PolicyReceiver(object): """ policy-receiver - static singleton wrapper around two threads policy_updater - master thread for all scheduled actions - policy_receiver - listens to policy-engine through web-socket + policy_listener - listens to policy-engine through web-socket """ _policy_updater = None - _policy_receiver = None + _policy_listener = None @staticmethod def is_running(): """check whether the policy-receiver runs""" - return (PolicyReceiver._policy_receiver - and PolicyReceiver._policy_receiver.is_alive() + return (PolicyReceiver._policy_listener + and PolicyReceiver._policy_listener.is_alive() and PolicyReceiver._policy_updater and PolicyReceiver._policy_updater.is_alive()) @staticmethod - def _close_receiver(audit): + def _close_listener(audit): """stop the notification-handler""" - if PolicyReceiver._policy_receiver: - policy_receiver = PolicyReceiver._policy_receiver - PolicyReceiver._policy_receiver = None + if PolicyReceiver._policy_listener: + policy_receiver = PolicyReceiver._policy_listener + PolicyReceiver._policy_listener = None policy_receiver.shutdown(audit) @staticmethod def shutdown(audit): """shutdown the notification-handler and policy-updater""" - PolicyReceiver._close_receiver(audit) + PolicyReceiver._close_listener(audit) PolicyReceiver._policy_updater.shutdown(audit) @staticmethod @@ -359,23 +72,26 @@ class PolicyReceiver(object): """act on reconfiguration event""" active = ServiceActivator.is_active_mode_of_operation(audit) - if not PolicyReceiver._policy_receiver: + if not PolicyReceiver._policy_listener: if active: - PolicyReceiver._policy_receiver = _PolicyReceiver(audit, - PolicyReceiver._policy_updater) - PolicyReceiver._policy_receiver.start() + from . import pdp_client + PolicyReceiver._policy_listener = pdp_client.PolicyListener( + audit, PolicyReceiver._policy_updater + ) + PolicyReceiver._policy_listener.start() return if not active: - PolicyReceiver._close_receiver(audit) + PolicyReceiver._close_listener(audit) return - PolicyReceiver._policy_receiver.reconfigure(audit) + PolicyReceiver._policy_listener.reconfigure(audit) @staticmethod def run(audit): """run policy_updater and policy_receiver""" + from .policy_updater import PolicyUpdater PolicyReceiver._policy_updater = PolicyUpdater(PolicyReceiver._on_reconfigure) PolicyReceiver._on_reconfigure(audit) |