aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/pdp_api_v0/policy_listener.py
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler/pdp_api_v0/policy_listener.py')
-rw-r--r--policyhandler/pdp_api_v0/policy_listener.py309
1 files changed, 309 insertions, 0 deletions
diff --git a/policyhandler/pdp_api_v0/policy_listener.py b/policyhandler/pdp_api_v0/policy_listener.py
new file mode 100644
index 0000000..67e4c49
--- /dev/null
+++ b/policyhandler/pdp_api_v0/policy_listener.py
@@ -0,0 +1,309 @@
+# ================================================================================
+# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""
+policy-listener communicates with policy-engine
+thru web-socket to receive push notifications
+on updates and removal of policies.
+
+on receiving the policy-notifications, the policy-receiver
+passes the notifications to policy-updater
+"""
+
+import copy
+import json
+import os
+import ssl
+import time
+import urllib.parse
+from datetime import datetime
+from threading import Lock, Thread
+
+import websocket
+
+from ..config import Config, Settings
+from ..onap.audit import Audit
+from ..utils import Utils
+from .pdp_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION
+
+LOADED_POLICIES = 'loadedPolicies'
+REMOVED_POLICIES = 'removedPolicies'
+POLICY_VER = 'versionNo'
+POLICY_MATCHES = 'matches'
+
+_LOGGER = Utils.get_logger(__file__)
+
+class PolicyListener(Thread):
+ """web-socket to PolicyEngine"""
+ PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
+ WS_STARTED = "started"
+ WS_START_COUNT = "start_count"
+ WS_CLOSE_COUNT = "close_count"
+ WS_ERROR_COUNT = "error_count"
+ WS_PONG_COUNT = "pong_count"
+ WS_MESSAGE_COUNT = "message_count"
+ WS_MESSAGE_TIMESTAMP = "message_timestamp"
+ WS_STATUS = "status"
+ WS_PING_INTERVAL_DEFAULT = 30
+ WEB_SOCKET_HEALTH = "web_socket_health"
+
+ def __init__(self, audit, policy_updater):
+ """web-socket inside the thread to receive policy notifications from PolicyEngine"""
+ Thread.__init__(self, name="policy_receiver", daemon=True)
+
+ self._policy_updater = policy_updater
+ self._lock = Lock()
+ self._keep_running = True
+ self._settings = Settings(Config.FIELD_POLICY_ENGINE)
+
+ self._sleep_before_restarting = 5
+ self._web_socket_url = None
+ self._web_socket_sslopt = None
+ self._tls_wss_ca_mode = None
+ self._web_socket = None
+ self._ws_ping_interval_in_secs = PolicyListener.WS_PING_INTERVAL_DEFAULT
+ self._web_socket_health = {
+ PolicyListener.WS_START_COUNT: 0,
+ PolicyListener.WS_CLOSE_COUNT: 0,
+ PolicyListener.WS_ERROR_COUNT: 0,
+ PolicyListener.WS_PONG_COUNT: 0,
+ PolicyListener.WS_MESSAGE_COUNT: 0,
+ PolicyListener.WS_STATUS: "created"
+ }
+
+ Audit.register_item_health(PolicyListener.WEB_SOCKET_HEALTH, self._get_health)
+ self.reconfigure(audit)
+
+ def reconfigure(self, audit):
+ """configure and reconfigure the web-socket"""
+ with self._lock:
+ _LOGGER.info(audit.info("web_socket_health {}".format(
+ json.dumps(self._get_health(), sort_keys=True))))
+ self._sleep_before_restarting = 5
+ self._settings.set_config(Config.discovered_config)
+ changed, config = self._settings.get_by_key(Config.FIELD_POLICY_ENGINE)
+
+ if not changed:
+ self._settings.commit_change()
+ return False
+
+ prev_web_socket_url = self._web_socket_url
+ prev_web_socket_sslopt = self._web_socket_sslopt
+ prev_ws_ping_interval_in_secs = self._ws_ping_interval_in_secs
+
+ self._web_socket_sslopt = None
+
+ resturl = urllib.parse.urljoin(config.get("url", "").lower().rstrip("/") + "/",
+ config.get("path_notifications", "/pdp/notifications"))
+
+ self._tls_wss_ca_mode = config.get(Config.TLS_WSS_CA_MODE)
+
+ self._ws_ping_interval_in_secs = config.get(Config.WS_PING_INTERVAL_IN_SECS)
+ if not self._ws_ping_interval_in_secs or self._ws_ping_interval_in_secs < 60:
+ self._ws_ping_interval_in_secs = PolicyListener.WS_PING_INTERVAL_DEFAULT
+
+ if resturl.startswith("https:"):
+ self._web_socket_url = resturl.replace("https:", "wss:")
+
+ verify = Config.get_tls_verify(self._tls_wss_ca_mode)
+ if verify is False:
+ self._web_socket_sslopt = {'cert_reqs': ssl.CERT_NONE}
+ elif verify is True:
+ pass
+ else:
+ self._web_socket_sslopt = {'ca_certs': verify}
+
+ else:
+ self._web_socket_url = resturl.replace("http:", "ws:")
+
+ log_changed = (
+ "changed web_socket_url(%s) or tls_wss_ca_mode(%s)"
+ " or ws_ping_interval_in_secs(%s): %s" %
+ (self._web_socket_url, self._tls_wss_ca_mode, self._ws_ping_interval_in_secs,
+ self._settings))
+ if (self._web_socket_url == prev_web_socket_url
+ and Utils.are_the_same(prev_web_socket_sslopt, self._web_socket_sslopt)
+ and prev_ws_ping_interval_in_secs == self._ws_ping_interval_in_secs):
+ _LOGGER.info(audit.info("not {}".format(log_changed)))
+ self._settings.commit_change()
+ return False
+
+ _LOGGER.info(audit.info(log_changed))
+ self._settings.commit_change()
+
+ self._stop_notifications()
+ return True
+
+ def run(self):
+ """listen on web-socket and pass the policy notifications to policy-updater"""
+ _LOGGER.info("starting policy_receiver...")
+ websocket.enableTrace(True)
+ restarting = False
+ while True:
+ if not self._get_keep_running():
+ break
+
+ self._stop_notifications()
+
+ if restarting:
+ with self._lock:
+ sleep_before_restarting = self._sleep_before_restarting
+ _LOGGER.info(
+ "going to sleep for %s secs before restarting policy-notifications",
+ sleep_before_restarting)
+
+ time.sleep(sleep_before_restarting)
+ if not self._get_keep_running():
+ break
+
+ with self._lock:
+ web_socket_url = self._web_socket_url
+ sslopt = copy.deepcopy(self._web_socket_sslopt)
+ tls_wss_ca_mode = self._tls_wss_ca_mode
+ ws_ping_interval_in_secs = self._ws_ping_interval_in_secs
+
+ _LOGGER.info(
+ "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)"
+ " ws_ping_interval_in_secs(%s)",
+ web_socket_url, json.dumps(sslopt), tls_wss_ca_mode, ws_ping_interval_in_secs)
+
+ self._web_socket = websocket.WebSocketApp(
+ web_socket_url,
+ on_open=self._on_ws_open,
+ on_message=self._on_pdp_message,
+ on_close=self._on_ws_close,
+ on_error=self._on_ws_error,
+ on_pong=self._on_ws_pong
+ )
+
+ _LOGGER.info("waiting for policy-notifications...")
+ self._web_socket.run_forever(sslopt=sslopt, ping_interval=ws_ping_interval_in_secs)
+ restarting = True
+
+ Audit.register_item_health(PolicyListener.WEB_SOCKET_HEALTH)
+ _LOGGER.info("exit policy-receiver")
+
+ def _get_keep_running(self):
+ """thread-safe check whether to continue running"""
+ with self._lock:
+ keep_running = self._keep_running
+ return keep_running
+
+ def _stop_notifications(self):
+ """close the web-socket == stops the notification service if running."""
+ with self._lock:
+ if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected:
+ self._web_socket.close()
+ _LOGGER.info("stopped receiving notifications from PDP")
+
+ def _on_pdp_message(self, *args):
+ """received the notification from PDP"""
+ self._web_socket_health[PolicyListener.WS_MESSAGE_COUNT] += 1
+ self._web_socket_health[PolicyListener.WS_MESSAGE_TIMESTAMP] = str(datetime.utcnow())
+ try:
+ message = args and args[-1]
+ _LOGGER.info("Received notification message: %s", message)
+ _LOGGER.info("web_socket_health %s", json.dumps(self._get_health(), sort_keys=True))
+ if not message:
+ return
+ message = json.loads(message)
+
+ if not message or not isinstance(message, dict):
+ _LOGGER.warning("unexpected message from PDP: %s", json.dumps(message))
+ return
+
+ policies_updated = [
+ {POLICY_NAME: policy.get(POLICY_NAME),
+ POLICY_VERSION: policy.get(POLICY_VER),
+ MATCHING_CONDITIONS: policy.get(POLICY_MATCHES, {})}
+ for policy in message.get(LOADED_POLICIES, [])
+ ]
+
+ policies_removed = [
+ {POLICY_NAME: removed_policy.get(POLICY_NAME),
+ POLICY_VERSION: removed_policy.get(POLICY_VER)}
+ for removed_policy in message.get(REMOVED_POLICIES, [])
+ ]
+
+ if not policies_updated and not policies_removed:
+ _LOGGER.info("no policy updated or removed")
+ return
+
+ self._policy_updater.policy_update(policies_updated, policies_removed)
+ except Exception as ex:
+ error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex),
+ "on_pdp_message", json.dumps(message))
+
+ _LOGGER.exception(error_msg)
+
+ def _on_ws_error(self, error):
+ """report an error"""
+ _LOGGER.exception("policy-notification error %s", str(error))
+ self._sleep_before_restarting = 60 if isinstance(error, ssl.SSLError) else 5
+
+ self._web_socket_health[PolicyListener.WS_STATUS] = "error"
+ self._web_socket_health[PolicyListener.WS_ERROR_COUNT] += 1
+ self._web_socket_health["last_error"] = {
+ "error": str(error), "timestamp": str(datetime.utcnow())
+ }
+ _LOGGER.info("web_socket_health %s", json.dumps(self._get_health(), sort_keys=True))
+
+ def _on_ws_close(self, code, reason):
+ """restart web-socket on close"""
+ self._web_socket_health["last_closed"] = str(datetime.utcnow())
+ self._web_socket_health[PolicyListener.WS_STATUS] = "closed"
+ self._web_socket_health[PolicyListener.WS_CLOSE_COUNT] += 1
+ _LOGGER.info(
+ "lost connection(%s, %s) to PDP web_socket_health %s",
+ code, reason, json.dumps(self._get_health(), sort_keys=True))
+
+ def _on_ws_open(self):
+ """started web-socket"""
+ self._web_socket_health[PolicyListener.WS_STATUS] = PolicyListener.WS_STARTED
+ self._web_socket_health[PolicyListener.WS_START_COUNT] += 1
+ self._web_socket_health[PolicyListener.WS_STARTED] = datetime.utcnow()
+ _LOGGER.info("opened connection to PDP web_socket_health %s",
+ json.dumps(self._get_health(), sort_keys=True))
+
+ def _on_ws_pong(self, pong):
+ """pong = response to pinging the server of the web-socket"""
+ self._web_socket_health[PolicyListener.WS_PONG_COUNT] += 1
+ _LOGGER.info(
+ "pong(%s) from connection to PDP web_socket_health %s",
+ pong, json.dumps(self._get_health(), sort_keys=True))
+
+ def _get_health(self):
+ """returns the healthcheck of the web-socket as json"""
+ web_socket_health = copy.deepcopy(self._web_socket_health)
+ web_socket_health[Config.WS_PING_INTERVAL_IN_SECS] = self._ws_ping_interval_in_secs
+ started = web_socket_health.get(PolicyListener.WS_STARTED)
+ if started:
+ web_socket_health[PolicyListener.WS_STARTED] = str(started)
+ web_socket_health["uptime"] = str(datetime.utcnow() - started)
+ return web_socket_health
+
+
+ def shutdown(self, audit):
+ """Shutdown the policy-listener"""
+ _LOGGER.info(audit.info("shutdown policy-listener"))
+ with self._lock:
+ self._keep_running = False
+
+ self._stop_notifications()
+
+ if self.is_alive():
+ self.join()