diff options
Diffstat (limited to 'policyhandler/pdp_api_v0/policy_listener.py')
-rw-r--r-- | policyhandler/pdp_api_v0/policy_listener.py | 309 |
1 files changed, 309 insertions, 0 deletions
diff --git a/policyhandler/pdp_api_v0/policy_listener.py b/policyhandler/pdp_api_v0/policy_listener.py new file mode 100644 index 0000000..67e4c49 --- /dev/null +++ b/policyhandler/pdp_api_v0/policy_listener.py @@ -0,0 +1,309 @@ +# ================================================================================ +# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +""" +policy-listener communicates with policy-engine +thru web-socket to receive push notifications +on updates and removal of policies. + +on receiving the policy-notifications, the policy-receiver +passes the notifications to policy-updater +""" + +import copy +import json +import os +import ssl +import time +import urllib.parse +from datetime import datetime +from threading import Lock, Thread + +import websocket + +from ..config import Config, Settings +from ..onap.audit import Audit +from ..utils import Utils +from .pdp_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION + +LOADED_POLICIES = 'loadedPolicies' +REMOVED_POLICIES = 'removedPolicies' +POLICY_VER = 'versionNo' +POLICY_MATCHES = 'matches' + +_LOGGER = Utils.get_logger(__file__) + +class PolicyListener(Thread): + """web-socket to PolicyEngine""" + PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__))) + WS_STARTED = "started" + WS_START_COUNT = "start_count" + WS_CLOSE_COUNT = "close_count" + WS_ERROR_COUNT = "error_count" + WS_PONG_COUNT = "pong_count" + WS_MESSAGE_COUNT = "message_count" + WS_MESSAGE_TIMESTAMP = "message_timestamp" + WS_STATUS = "status" + WS_PING_INTERVAL_DEFAULT = 30 + WEB_SOCKET_HEALTH = "web_socket_health" + + def __init__(self, audit, policy_updater): + """web-socket inside the thread to receive policy notifications from PolicyEngine""" + Thread.__init__(self, name="policy_receiver", daemon=True) + + self._policy_updater = policy_updater + self._lock = Lock() + self._keep_running = True + self._settings = Settings(Config.FIELD_POLICY_ENGINE) + + self._sleep_before_restarting = 5 + self._web_socket_url = None + self._web_socket_sslopt = None + self._tls_wss_ca_mode = None + self._web_socket = None + self._ws_ping_interval_in_secs = PolicyListener.WS_PING_INTERVAL_DEFAULT + self._web_socket_health = { + PolicyListener.WS_START_COUNT: 0, + PolicyListener.WS_CLOSE_COUNT: 0, + PolicyListener.WS_ERROR_COUNT: 0, + PolicyListener.WS_PONG_COUNT: 0, + PolicyListener.WS_MESSAGE_COUNT: 0, + PolicyListener.WS_STATUS: "created" + } + + Audit.register_item_health(PolicyListener.WEB_SOCKET_HEALTH, self._get_health) + self.reconfigure(audit) + + def reconfigure(self, audit): + """configure and reconfigure the web-socket""" + with self._lock: + _LOGGER.info(audit.info("web_socket_health {}".format( + json.dumps(self._get_health(), sort_keys=True)))) + self._sleep_before_restarting = 5 + self._settings.set_config(Config.discovered_config) + changed, config = self._settings.get_by_key(Config.FIELD_POLICY_ENGINE) + + if not changed: + self._settings.commit_change() + return False + + prev_web_socket_url = self._web_socket_url + prev_web_socket_sslopt = self._web_socket_sslopt + prev_ws_ping_interval_in_secs = self._ws_ping_interval_in_secs + + self._web_socket_sslopt = None + + resturl = urllib.parse.urljoin(config.get("url", "").lower().rstrip("/") + "/", + config.get("path_notifications", "/pdp/notifications")) + + self._tls_wss_ca_mode = config.get(Config.TLS_WSS_CA_MODE) + + self._ws_ping_interval_in_secs = config.get(Config.WS_PING_INTERVAL_IN_SECS) + if not self._ws_ping_interval_in_secs or self._ws_ping_interval_in_secs < 60: + self._ws_ping_interval_in_secs = PolicyListener.WS_PING_INTERVAL_DEFAULT + + if resturl.startswith("https:"): + self._web_socket_url = resturl.replace("https:", "wss:") + + verify = Config.get_tls_verify(self._tls_wss_ca_mode) + if verify is False: + self._web_socket_sslopt = {'cert_reqs': ssl.CERT_NONE} + elif verify is True: + pass + else: + self._web_socket_sslopt = {'ca_certs': verify} + + else: + self._web_socket_url = resturl.replace("http:", "ws:") + + log_changed = ( + "changed web_socket_url(%s) or tls_wss_ca_mode(%s)" + " or ws_ping_interval_in_secs(%s): %s" % + (self._web_socket_url, self._tls_wss_ca_mode, self._ws_ping_interval_in_secs, + self._settings)) + if (self._web_socket_url == prev_web_socket_url + and Utils.are_the_same(prev_web_socket_sslopt, self._web_socket_sslopt) + and prev_ws_ping_interval_in_secs == self._ws_ping_interval_in_secs): + _LOGGER.info(audit.info("not {}".format(log_changed))) + self._settings.commit_change() + return False + + _LOGGER.info(audit.info(log_changed)) + self._settings.commit_change() + + self._stop_notifications() + return True + + def run(self): + """listen on web-socket and pass the policy notifications to policy-updater""" + _LOGGER.info("starting policy_receiver...") + websocket.enableTrace(True) + restarting = False + while True: + if not self._get_keep_running(): + break + + self._stop_notifications() + + if restarting: + with self._lock: + sleep_before_restarting = self._sleep_before_restarting + _LOGGER.info( + "going to sleep for %s secs before restarting policy-notifications", + sleep_before_restarting) + + time.sleep(sleep_before_restarting) + if not self._get_keep_running(): + break + + with self._lock: + web_socket_url = self._web_socket_url + sslopt = copy.deepcopy(self._web_socket_sslopt) + tls_wss_ca_mode = self._tls_wss_ca_mode + ws_ping_interval_in_secs = self._ws_ping_interval_in_secs + + _LOGGER.info( + "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)" + " ws_ping_interval_in_secs(%s)", + web_socket_url, json.dumps(sslopt), tls_wss_ca_mode, ws_ping_interval_in_secs) + + self._web_socket = websocket.WebSocketApp( + web_socket_url, + on_open=self._on_ws_open, + on_message=self._on_pdp_message, + on_close=self._on_ws_close, + on_error=self._on_ws_error, + on_pong=self._on_ws_pong + ) + + _LOGGER.info("waiting for policy-notifications...") + self._web_socket.run_forever(sslopt=sslopt, ping_interval=ws_ping_interval_in_secs) + restarting = True + + Audit.register_item_health(PolicyListener.WEB_SOCKET_HEALTH) + _LOGGER.info("exit policy-receiver") + + def _get_keep_running(self): + """thread-safe check whether to continue running""" + with self._lock: + keep_running = self._keep_running + return keep_running + + def _stop_notifications(self): + """close the web-socket == stops the notification service if running.""" + with self._lock: + if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected: + self._web_socket.close() + _LOGGER.info("stopped receiving notifications from PDP") + + def _on_pdp_message(self, *args): + """received the notification from PDP""" + self._web_socket_health[PolicyListener.WS_MESSAGE_COUNT] += 1 + self._web_socket_health[PolicyListener.WS_MESSAGE_TIMESTAMP] = str(datetime.utcnow()) + try: + message = args and args[-1] + _LOGGER.info("Received notification message: %s", message) + _LOGGER.info("web_socket_health %s", json.dumps(self._get_health(), sort_keys=True)) + if not message: + return + message = json.loads(message) + + if not message or not isinstance(message, dict): + _LOGGER.warning("unexpected message from PDP: %s", json.dumps(message)) + return + + policies_updated = [ + {POLICY_NAME: policy.get(POLICY_NAME), + POLICY_VERSION: policy.get(POLICY_VER), + MATCHING_CONDITIONS: policy.get(POLICY_MATCHES, {})} + for policy in message.get(LOADED_POLICIES, []) + ] + + policies_removed = [ + {POLICY_NAME: removed_policy.get(POLICY_NAME), + POLICY_VERSION: removed_policy.get(POLICY_VER)} + for removed_policy in message.get(REMOVED_POLICIES, []) + ] + + if not policies_updated and not policies_removed: + _LOGGER.info("no policy updated or removed") + return + + self._policy_updater.policy_update(policies_updated, policies_removed) + except Exception as ex: + error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex), + "on_pdp_message", json.dumps(message)) + + _LOGGER.exception(error_msg) + + def _on_ws_error(self, error): + """report an error""" + _LOGGER.exception("policy-notification error %s", str(error)) + self._sleep_before_restarting = 60 if isinstance(error, ssl.SSLError) else 5 + + self._web_socket_health[PolicyListener.WS_STATUS] = "error" + self._web_socket_health[PolicyListener.WS_ERROR_COUNT] += 1 + self._web_socket_health["last_error"] = { + "error": str(error), "timestamp": str(datetime.utcnow()) + } + _LOGGER.info("web_socket_health %s", json.dumps(self._get_health(), sort_keys=True)) + + def _on_ws_close(self, code, reason): + """restart web-socket on close""" + self._web_socket_health["last_closed"] = str(datetime.utcnow()) + self._web_socket_health[PolicyListener.WS_STATUS] = "closed" + self._web_socket_health[PolicyListener.WS_CLOSE_COUNT] += 1 + _LOGGER.info( + "lost connection(%s, %s) to PDP web_socket_health %s", + code, reason, json.dumps(self._get_health(), sort_keys=True)) + + def _on_ws_open(self): + """started web-socket""" + self._web_socket_health[PolicyListener.WS_STATUS] = PolicyListener.WS_STARTED + self._web_socket_health[PolicyListener.WS_START_COUNT] += 1 + self._web_socket_health[PolicyListener.WS_STARTED] = datetime.utcnow() + _LOGGER.info("opened connection to PDP web_socket_health %s", + json.dumps(self._get_health(), sort_keys=True)) + + def _on_ws_pong(self, pong): + """pong = response to pinging the server of the web-socket""" + self._web_socket_health[PolicyListener.WS_PONG_COUNT] += 1 + _LOGGER.info( + "pong(%s) from connection to PDP web_socket_health %s", + pong, json.dumps(self._get_health(), sort_keys=True)) + + def _get_health(self): + """returns the healthcheck of the web-socket as json""" + web_socket_health = copy.deepcopy(self._web_socket_health) + web_socket_health[Config.WS_PING_INTERVAL_IN_SECS] = self._ws_ping_interval_in_secs + started = web_socket_health.get(PolicyListener.WS_STARTED) + if started: + web_socket_health[PolicyListener.WS_STARTED] = str(started) + web_socket_health["uptime"] = str(datetime.utcnow() - started) + return web_socket_health + + + def shutdown(self, audit): + """Shutdown the policy-listener""" + _LOGGER.info(audit.info("shutdown policy-listener")) + with self._lock: + self._keep_running = False + + self._stop_notifications() + + if self.is_alive(): + self.join() |