aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/policy_receiver.py
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler/policy_receiver.py')
-rw-r--r--policyhandler/policy_receiver.py131
1 files changed, 109 insertions, 22 deletions
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py
index 3ae25fc..249c1f7 100644
--- a/policyhandler/policy_receiver.py
+++ b/policyhandler/policy_receiver.py
@@ -28,14 +28,16 @@ passes the notifications to policy-updater
import copy
import json
import logging
-import os
import ssl
import time
+import urllib.parse
+from datetime import datetime
from threading import Lock, Thread
import websocket
from .config import Config, Settings
+from .onap.audit import Audit
from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION
from .policy_updater import PolicyUpdater
from .policy_utils import Utils
@@ -48,8 +50,17 @@ POLICY_MATCHES = 'matches'
class _PolicyReceiver(Thread):
"""web-socket to PolicyEngine"""
_logger = logging.getLogger("policy_handler.policy_receiver")
-
- def __init__(self):
+ WS_STARTED = "started"
+ WS_START_COUNT = "start_count"
+ WS_CLOSE_COUNT = "close_count"
+ WS_ERROR_COUNT = "error_count"
+ WS_PONG_COUNT = "pong_count"
+ WS_MESSAGE_COUNT = "message_count"
+ WS_MESSAGE_TIMESTAMP = "message_timestamp"
+ WS_STATUS = "status"
+ WS_PING_INTERVAL_DEFAULT = 180
+
+ def __init__(self, audit):
"""web-socket inside the thread to receive policy notifications from PolicyEngine"""
Thread.__init__(self, name="policy_receiver", daemon=True)
@@ -62,14 +73,26 @@ class _PolicyReceiver(Thread):
self._web_socket_sslopt = None
self._tls_wss_ca_mode = None
self._web_socket = None
- self.reconfigure()
-
- self._policy_updater = PolicyUpdater(self.reconfigure)
- self._policy_updater.start()
-
- def reconfigure(self):
+ self._ws_ping_interval_in_secs = _PolicyReceiver.WS_PING_INTERVAL_DEFAULT
+ self._web_socket_health = {
+ _PolicyReceiver.WS_START_COUNT: 0,
+ _PolicyReceiver.WS_CLOSE_COUNT: 0,
+ _PolicyReceiver.WS_ERROR_COUNT: 0,
+ _PolicyReceiver.WS_PONG_COUNT: 0,
+ _PolicyReceiver.WS_MESSAGE_COUNT: 0,
+ _PolicyReceiver.WS_STATUS: "created"
+ }
+
+ Audit.register_item_health("web_socket_health", self._get_health)
+ self._reconfigure(audit)
+
+ self._policy_updater = PolicyUpdater(self._reconfigure)
+
+ def _reconfigure(self, audit):
"""configure and reconfigure the web-socket"""
with self._lock:
+ _PolicyReceiver._logger.info(audit.info("web_socket_health {}".format(
+ json.dumps(self._get_health(), sort_keys=True))))
self._sleep_before_restarting = 5
self._settings.set_config(Config.discovered_config)
changed, config = self._settings.get_by_key(Config.FIELD_POLICY_ENGINE)
@@ -80,13 +103,19 @@ class _PolicyReceiver(Thread):
prev_web_socket_url = self._web_socket_url
prev_web_socket_sslopt = self._web_socket_sslopt
+ prev_ws_ping_interval_in_secs = self._ws_ping_interval_in_secs
+
self._web_socket_sslopt = None
- resturl = (config.get("url", "").lower()
- + config.get("path_notifications", "/pdp/notifications"))
+ resturl = urllib.parse.urljoin(config.get("url", "").lower().rstrip("/") + "/",
+ config.get("path_notifications", "/pdp/notifications"))
self._tls_wss_ca_mode = config.get(Config.TLS_WSS_CA_MODE)
+ self._ws_ping_interval_in_secs = config.get(Config.WS_PING_INTERVAL_IN_SECS)
+ if not self._ws_ping_interval_in_secs or self._ws_ping_interval_in_secs < 60:
+ self._ws_ping_interval_in_secs = _PolicyReceiver.WS_PING_INTERVAL_DEFAULT
+
if resturl.startswith("https:"):
self._web_socket_url = resturl.replace("https:", "wss:")
@@ -101,17 +130,19 @@ class _PolicyReceiver(Thread):
else:
self._web_socket_url = resturl.replace("http:", "ws:")
+ log_changed = (
+ "changed web_socket_url(%s) or tls_wss_ca_mode(%s)"
+ " or ws_ping_interval_in_secs(%s): %s" %
+ (self._web_socket_url, self._tls_wss_ca_mode, self._ws_ping_interval_in_secs,
+ self._settings))
if (self._web_socket_url == prev_web_socket_url
- and Utils.are_the_same(prev_web_socket_sslopt, self._web_socket_sslopt)):
- _PolicyReceiver._logger.info(
- "not changed web_socket_url(%s) or tls_wss_ca_mode(%s): %s",
- self._web_socket_url, self._tls_wss_ca_mode, self._settings)
+ and Utils.are_the_same(prev_web_socket_sslopt, self._web_socket_sslopt)
+ and prev_ws_ping_interval_in_secs == self._ws_ping_interval_in_secs):
+ _PolicyReceiver._logger.info(audit.info("not {}".format(log_changed)))
self._settings.commit_change()
return False
- _PolicyReceiver._logger.info("changed web_socket_url(%s) or tls_wss_ca_mode(%s): %s",
- self._web_socket_url, self._tls_wss_ca_mode,
- self._settings)
+ _PolicyReceiver._logger.info(audit.info(log_changed))
self._settings.commit_change()
self._stop_notifications()
@@ -119,6 +150,8 @@ class _PolicyReceiver(Thread):
def run(self):
"""listen on web-socket and pass the policy notifications to policy-updater"""
+ self._policy_updater.start()
+ _PolicyReceiver._logger.info("starting policy_receiver...")
websocket.enableTrace(True)
restarting = False
while True:
@@ -142,6 +175,7 @@ class _PolicyReceiver(Thread):
web_socket_url = self._web_socket_url
sslopt = copy.deepcopy(self._web_socket_sslopt)
tls_wss_ca_mode = self._tls_wss_ca_mode
+ ws_ping_interval_in_secs = self._ws_ping_interval_in_secs
_PolicyReceiver._logger.info(
"connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)",
@@ -149,13 +183,15 @@ class _PolicyReceiver(Thread):
self._web_socket = websocket.WebSocketApp(
web_socket_url,
+ on_open=self._on_ws_open,
on_message=self._on_pdp_message,
on_close=self._on_ws_close,
- on_error=self._on_ws_error
+ on_error=self._on_ws_error,
+ on_pong=self._on_ws_pong
)
_PolicyReceiver._logger.info("waiting for policy-notifications...")
- self._web_socket.run_forever(sslopt=sslopt)
+ self._web_socket.run_forever(sslopt=sslopt, ping_interval=ws_ping_interval_in_secs)
restarting = True
_PolicyReceiver._logger.info("exit policy-receiver")
@@ -175,9 +211,13 @@ class _PolicyReceiver(Thread):
def _on_pdp_message(self, *args):
"""received the notification from PDP"""
+ self._web_socket_health[_PolicyReceiver.WS_MESSAGE_COUNT] += 1
+ self._web_socket_health[_PolicyReceiver.WS_MESSAGE_TIMESTAMP] = str(datetime.utcnow())
try:
message = args and args[-1]
_PolicyReceiver._logger.info("Received notification message: %s", message)
+ _PolicyReceiver._logger.info("web_socket_health %s",
+ json.dumps(self._get_health(), sort_keys=True))
if not message:
return
message = json.loads(message)
@@ -216,9 +256,47 @@ class _PolicyReceiver(Thread):
_PolicyReceiver._logger.exception("policy-notification error %s", str(error))
self._sleep_before_restarting = 60 if isinstance(error, ssl.SSLError) else 5
+ self._web_socket_health[_PolicyReceiver.WS_STATUS] = "error"
+ self._web_socket_health[_PolicyReceiver.WS_ERROR_COUNT] += 1
+ self._web_socket_health["last_error"] = {
+ "error": str(error), "timestamp": str(datetime.utcnow())
+ }
+ _PolicyReceiver._logger.info("web_socket_health %s",
+ json.dumps(self._get_health(), sort_keys=True))
+
def _on_ws_close(self, code, reason):
"""restart web-socket on close"""
- _PolicyReceiver._logger.info("lost connection(%s, %s) to PDP - restarting...", code, reason)
+ self._web_socket_health["last_closed"] = str(datetime.utcnow())
+ self._web_socket_health[_PolicyReceiver.WS_STATUS] = "closed"
+ self._web_socket_health[_PolicyReceiver.WS_CLOSE_COUNT] += 1
+ _PolicyReceiver._logger.info(
+ "lost connection(%s, %s) to PDP - restarting... web_socket_health %s",
+ code, reason, json.dumps(self._get_health(), sort_keys=True))
+
+ def _on_ws_open(self):
+ """started web-socket"""
+ self._web_socket_health[_PolicyReceiver.WS_STATUS] = _PolicyReceiver.WS_STARTED
+ self._web_socket_health[_PolicyReceiver.WS_START_COUNT] += 1
+ self._web_socket_health[_PolicyReceiver.WS_STARTED] = datetime.utcnow()
+ _PolicyReceiver._logger.info("opened connection to PDP web_socket_health %s",
+ json.dumps(self._get_health(), sort_keys=True))
+
+ def _on_ws_pong(self, pong):
+ """pong = response to pinging the server of the web-socket"""
+ self._web_socket_health[_PolicyReceiver.WS_PONG_COUNT] += 1
+ _PolicyReceiver._logger.info(
+ "pong(%s) from connection to PDP web_socket_health %s",
+ pong, json.dumps(self._get_health(), sort_keys=True))
+
+ def _get_health(self):
+ """returns the healthcheck of the web-socket as json"""
+ web_socket_health = copy.deepcopy(self._web_socket_health)
+ started = web_socket_health.get(_PolicyReceiver.WS_STARTED)
+ if started:
+ web_socket_health[_PolicyReceiver.WS_STARTED] = str(started)
+ web_socket_health["uptime"] = str(datetime.utcnow() - started)
+ return web_socket_health
+
def shutdown(self, audit):
"""Shutdown the policy-receiver"""
@@ -237,11 +315,20 @@ class _PolicyReceiver(Thread):
"""need to bring the latest policies to DCAE-Controller"""
self._policy_updater.catch_up(audit)
+ def is_running(self):
+ """check whether the policy-receiver and policy-updater are running"""
+ return self.is_alive() and self._policy_updater.is_alive()
+
class PolicyReceiver(object):
"""policy-receiver - static singleton wrapper"""
_policy_receiver = None
@staticmethod
+ def is_running():
+ """check whether the policy-receiver runs"""
+ return PolicyReceiver._policy_receiver and PolicyReceiver._policy_receiver.is_running()
+
+ @staticmethod
def shutdown(audit):
"""Shutdown the notification-handler"""
PolicyReceiver._policy_receiver.shutdown(audit)
@@ -254,7 +341,7 @@ class PolicyReceiver(object):
@staticmethod
def run(audit):
"""Using policy-engine client to talk to policy engine"""
- PolicyReceiver._policy_receiver = _PolicyReceiver()
+ PolicyReceiver._policy_receiver = _PolicyReceiver(audit)
PolicyReceiver._policy_receiver.start()
PolicyReceiver.catch_up(audit)