aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/policy_receiver.py
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler/policy_receiver.py')
-rw-r--r--policyhandler/policy_receiver.py102
1 files changed, 70 insertions, 32 deletions
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py
index 249c1f7..7cf1869 100644
--- a/policyhandler/policy_receiver.py
+++ b/policyhandler/policy_receiver.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -41,6 +41,7 @@ from .onap.audit import Audit
from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION
from .policy_updater import PolicyUpdater
from .policy_utils import Utils
+from .service_activator import ServiceActivator
LOADED_POLICIES = 'loadedPolicies'
REMOVED_POLICIES = 'removedPolicies'
@@ -58,12 +59,14 @@ class _PolicyReceiver(Thread):
WS_MESSAGE_COUNT = "message_count"
WS_MESSAGE_TIMESTAMP = "message_timestamp"
WS_STATUS = "status"
- WS_PING_INTERVAL_DEFAULT = 180
+ WS_PING_INTERVAL_DEFAULT = 30
+ WEB_SOCKET_HEALTH = "web_socket_health"
- def __init__(self, audit):
+ def __init__(self, audit, policy_updater):
"""web-socket inside the thread to receive policy notifications from PolicyEngine"""
Thread.__init__(self, name="policy_receiver", daemon=True)
+ self._policy_updater = policy_updater
self._lock = Lock()
self._keep_running = True
self._settings = Settings(Config.FIELD_POLICY_ENGINE)
@@ -83,12 +86,10 @@ class _PolicyReceiver(Thread):
_PolicyReceiver.WS_STATUS: "created"
}
- Audit.register_item_health("web_socket_health", self._get_health)
- self._reconfigure(audit)
+ Audit.register_item_health(_PolicyReceiver.WEB_SOCKET_HEALTH, self._get_health)
+ self.reconfigure(audit)
- self._policy_updater = PolicyUpdater(self._reconfigure)
-
- def _reconfigure(self, audit):
+ def reconfigure(self, audit):
"""configure and reconfigure the web-socket"""
with self._lock:
_PolicyReceiver._logger.info(audit.info("web_socket_health {}".format(
@@ -150,7 +151,6 @@ class _PolicyReceiver(Thread):
def run(self):
"""listen on web-socket and pass the policy notifications to policy-updater"""
- self._policy_updater.start()
_PolicyReceiver._logger.info("starting policy_receiver...")
websocket.enableTrace(True)
restarting = False
@@ -178,8 +178,9 @@ class _PolicyReceiver(Thread):
ws_ping_interval_in_secs = self._ws_ping_interval_in_secs
_PolicyReceiver._logger.info(
- "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)",
- web_socket_url, json.dumps(sslopt), tls_wss_ca_mode)
+ "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)"
+ " ws_ping_interval_in_secs(%s)",
+ web_socket_url, json.dumps(sslopt), tls_wss_ca_mode, ws_ping_interval_in_secs)
self._web_socket = websocket.WebSocketApp(
web_socket_url,
@@ -194,6 +195,7 @@ class _PolicyReceiver(Thread):
self._web_socket.run_forever(sslopt=sslopt, ping_interval=ws_ping_interval_in_secs)
restarting = True
+ Audit.register_item_health(_PolicyReceiver.WEB_SOCKET_HEALTH)
_PolicyReceiver._logger.info("exit policy-receiver")
def _get_keep_running(self):
@@ -207,7 +209,7 @@ class _PolicyReceiver(Thread):
with self._lock:
if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected:
self._web_socket.close()
- _PolicyReceiver._logger.info("Stopped receiving notifications from PDP")
+ _PolicyReceiver._logger.info("stopped receiving notifications from PDP")
def _on_pdp_message(self, *args):
"""received the notification from PDP"""
@@ -270,7 +272,7 @@ class _PolicyReceiver(Thread):
self._web_socket_health[_PolicyReceiver.WS_STATUS] = "closed"
self._web_socket_health[_PolicyReceiver.WS_CLOSE_COUNT] += 1
_PolicyReceiver._logger.info(
- "lost connection(%s, %s) to PDP - restarting... web_socket_health %s",
+ "lost connection(%s, %s) to PDP web_socket_health %s",
code, reason, json.dumps(self._get_health(), sort_keys=True))
def _on_ws_open(self):
@@ -291,6 +293,7 @@ class _PolicyReceiver(Thread):
def _get_health(self):
"""returns the healthcheck of the web-socket as json"""
web_socket_health = copy.deepcopy(self._web_socket_health)
+ web_socket_health[Config.WS_PING_INTERVAL_IN_SECS] = self._ws_ping_interval_in_secs
started = web_socket_health.get(_PolicyReceiver.WS_STARTED)
if started:
web_socket_health[_PolicyReceiver.WS_STARTED] = str(started)
@@ -300,7 +303,7 @@ class _PolicyReceiver(Thread):
def shutdown(self, audit):
"""Shutdown the policy-receiver"""
- _PolicyReceiver._logger.info("shutdown policy-receiver")
+ _PolicyReceiver._logger.info(audit.info("shutdown policy-receiver"))
with self._lock:
self._keep_running = False
@@ -309,39 +312,74 @@ class _PolicyReceiver(Thread):
if self.is_alive():
self.join()
- self._policy_updater.shutdown(audit)
-
- def catch_up(self, audit):
- """need to bring the latest policies to DCAE-Controller"""
- self._policy_updater.catch_up(audit)
-
- def is_running(self):
- """check whether the policy-receiver and policy-updater are running"""
- return self.is_alive() and self._policy_updater.is_alive()
class PolicyReceiver(object):
- """policy-receiver - static singleton wrapper"""
+ """
+ policy-receiver - static singleton wrapper around two threads
+ policy_updater - master thread for all scheduled actions
+ policy_receiver - listens to policy-engine through web-socket
+ """
+ _policy_updater = None
_policy_receiver = None
@staticmethod
def is_running():
"""check whether the policy-receiver runs"""
- return PolicyReceiver._policy_receiver and PolicyReceiver._policy_receiver.is_running()
+ return (PolicyReceiver._policy_receiver
+ and PolicyReceiver._policy_receiver.is_alive()
+ and PolicyReceiver._policy_updater
+ and PolicyReceiver._policy_updater.is_alive())
+
+ @staticmethod
+ def _close_receiver(audit):
+ """stop the notification-handler"""
+ if PolicyReceiver._policy_receiver:
+ policy_receiver = PolicyReceiver._policy_receiver
+ PolicyReceiver._policy_receiver = None
+ policy_receiver.shutdown(audit)
@staticmethod
def shutdown(audit):
- """Shutdown the notification-handler"""
- PolicyReceiver._policy_receiver.shutdown(audit)
+ """shutdown the notification-handler and policy-updater"""
+ PolicyReceiver._close_receiver(audit)
+ PolicyReceiver._policy_updater.shutdown(audit)
@staticmethod
def catch_up(audit):
- """bring the latest policies from policy-engine"""
- PolicyReceiver._policy_receiver.catch_up(audit)
+ """request to bring the latest policies to DCAE"""
+ PolicyReceiver._policy_updater.catch_up(audit)
+
+ @staticmethod
+ def reconfigure(audit):
+ """request to reconfigure the updated config for policy-handler"""
+ PolicyReceiver._policy_updater.reconfigure(audit)
+
+ @staticmethod
+ def _on_reconfigure(audit):
+ """act on reconfiguration event"""
+ active = ServiceActivator.is_active_mode_of_operation(audit)
+
+ if not PolicyReceiver._policy_receiver:
+ if active:
+ PolicyReceiver._policy_receiver = _PolicyReceiver(audit,
+ PolicyReceiver._policy_updater)
+ PolicyReceiver._policy_receiver.start()
+ return
+
+ if not active:
+ PolicyReceiver._close_receiver(audit)
+ return
+
+ PolicyReceiver._policy_receiver.reconfigure(audit)
+
@staticmethod
def run(audit):
- """Using policy-engine client to talk to policy engine"""
- PolicyReceiver._policy_receiver = _PolicyReceiver(audit)
- PolicyReceiver._policy_receiver.start()
+ """run policy_updater and policy_receiver"""
+ PolicyReceiver._policy_updater = PolicyUpdater(PolicyReceiver._on_reconfigure)
+
+ PolicyReceiver._on_reconfigure(audit)
+
+ PolicyReceiver._policy_updater.start()
PolicyReceiver.catch_up(audit)