diff options
Diffstat (limited to 'policyhandler/policy_receiver.py')
-rw-r--r-- | policyhandler/policy_receiver.py | 102 |
1 files changed, 70 insertions, 32 deletions
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py index 249c1f7..7cf1869 100644 --- a/policyhandler/policy_receiver.py +++ b/policyhandler/policy_receiver.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -41,6 +41,7 @@ from .onap.audit import Audit from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION from .policy_updater import PolicyUpdater from .policy_utils import Utils +from .service_activator import ServiceActivator LOADED_POLICIES = 'loadedPolicies' REMOVED_POLICIES = 'removedPolicies' @@ -58,12 +59,14 @@ class _PolicyReceiver(Thread): WS_MESSAGE_COUNT = "message_count" WS_MESSAGE_TIMESTAMP = "message_timestamp" WS_STATUS = "status" - WS_PING_INTERVAL_DEFAULT = 180 + WS_PING_INTERVAL_DEFAULT = 30 + WEB_SOCKET_HEALTH = "web_socket_health" - def __init__(self, audit): + def __init__(self, audit, policy_updater): """web-socket inside the thread to receive policy notifications from PolicyEngine""" Thread.__init__(self, name="policy_receiver", daemon=True) + self._policy_updater = policy_updater self._lock = Lock() self._keep_running = True self._settings = Settings(Config.FIELD_POLICY_ENGINE) @@ -83,12 +86,10 @@ class _PolicyReceiver(Thread): _PolicyReceiver.WS_STATUS: "created" } - Audit.register_item_health("web_socket_health", self._get_health) - self._reconfigure(audit) + Audit.register_item_health(_PolicyReceiver.WEB_SOCKET_HEALTH, self._get_health) + self.reconfigure(audit) - self._policy_updater = PolicyUpdater(self._reconfigure) - - def _reconfigure(self, audit): + def reconfigure(self, audit): """configure and reconfigure the web-socket""" with self._lock: _PolicyReceiver._logger.info(audit.info("web_socket_health {}".format( @@ -150,7 +151,6 @@ class _PolicyReceiver(Thread): def run(self): """listen on web-socket and pass the policy notifications to policy-updater""" - self._policy_updater.start() _PolicyReceiver._logger.info("starting policy_receiver...") websocket.enableTrace(True) restarting = False @@ -178,8 +178,9 @@ class _PolicyReceiver(Thread): ws_ping_interval_in_secs = self._ws_ping_interval_in_secs _PolicyReceiver._logger.info( - "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)", - web_socket_url, json.dumps(sslopt), tls_wss_ca_mode) + "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)" + " ws_ping_interval_in_secs(%s)", + web_socket_url, json.dumps(sslopt), tls_wss_ca_mode, ws_ping_interval_in_secs) self._web_socket = websocket.WebSocketApp( web_socket_url, @@ -194,6 +195,7 @@ class _PolicyReceiver(Thread): self._web_socket.run_forever(sslopt=sslopt, ping_interval=ws_ping_interval_in_secs) restarting = True + Audit.register_item_health(_PolicyReceiver.WEB_SOCKET_HEALTH) _PolicyReceiver._logger.info("exit policy-receiver") def _get_keep_running(self): @@ -207,7 +209,7 @@ class _PolicyReceiver(Thread): with self._lock: if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected: self._web_socket.close() - _PolicyReceiver._logger.info("Stopped receiving notifications from PDP") + _PolicyReceiver._logger.info("stopped receiving notifications from PDP") def _on_pdp_message(self, *args): """received the notification from PDP""" @@ -270,7 +272,7 @@ class _PolicyReceiver(Thread): self._web_socket_health[_PolicyReceiver.WS_STATUS] = "closed" self._web_socket_health[_PolicyReceiver.WS_CLOSE_COUNT] += 1 _PolicyReceiver._logger.info( - "lost connection(%s, %s) to PDP - restarting... web_socket_health %s", + "lost connection(%s, %s) to PDP web_socket_health %s", code, reason, json.dumps(self._get_health(), sort_keys=True)) def _on_ws_open(self): @@ -291,6 +293,7 @@ class _PolicyReceiver(Thread): def _get_health(self): """returns the healthcheck of the web-socket as json""" web_socket_health = copy.deepcopy(self._web_socket_health) + web_socket_health[Config.WS_PING_INTERVAL_IN_SECS] = self._ws_ping_interval_in_secs started = web_socket_health.get(_PolicyReceiver.WS_STARTED) if started: web_socket_health[_PolicyReceiver.WS_STARTED] = str(started) @@ -300,7 +303,7 @@ class _PolicyReceiver(Thread): def shutdown(self, audit): """Shutdown the policy-receiver""" - _PolicyReceiver._logger.info("shutdown policy-receiver") + _PolicyReceiver._logger.info(audit.info("shutdown policy-receiver")) with self._lock: self._keep_running = False @@ -309,39 +312,74 @@ class _PolicyReceiver(Thread): if self.is_alive(): self.join() - self._policy_updater.shutdown(audit) - - def catch_up(self, audit): - """need to bring the latest policies to DCAE-Controller""" - self._policy_updater.catch_up(audit) - - def is_running(self): - """check whether the policy-receiver and policy-updater are running""" - return self.is_alive() and self._policy_updater.is_alive() class PolicyReceiver(object): - """policy-receiver - static singleton wrapper""" + """ + policy-receiver - static singleton wrapper around two threads + policy_updater - master thread for all scheduled actions + policy_receiver - listens to policy-engine through web-socket + """ + _policy_updater = None _policy_receiver = None @staticmethod def is_running(): """check whether the policy-receiver runs""" - return PolicyReceiver._policy_receiver and PolicyReceiver._policy_receiver.is_running() + return (PolicyReceiver._policy_receiver + and PolicyReceiver._policy_receiver.is_alive() + and PolicyReceiver._policy_updater + and PolicyReceiver._policy_updater.is_alive()) + + @staticmethod + def _close_receiver(audit): + """stop the notification-handler""" + if PolicyReceiver._policy_receiver: + policy_receiver = PolicyReceiver._policy_receiver + PolicyReceiver._policy_receiver = None + policy_receiver.shutdown(audit) @staticmethod def shutdown(audit): - """Shutdown the notification-handler""" - PolicyReceiver._policy_receiver.shutdown(audit) + """shutdown the notification-handler and policy-updater""" + PolicyReceiver._close_receiver(audit) + PolicyReceiver._policy_updater.shutdown(audit) @staticmethod def catch_up(audit): - """bring the latest policies from policy-engine""" - PolicyReceiver._policy_receiver.catch_up(audit) + """request to bring the latest policies to DCAE""" + PolicyReceiver._policy_updater.catch_up(audit) + + @staticmethod + def reconfigure(audit): + """request to reconfigure the updated config for policy-handler""" + PolicyReceiver._policy_updater.reconfigure(audit) + + @staticmethod + def _on_reconfigure(audit): + """act on reconfiguration event""" + active = ServiceActivator.is_active_mode_of_operation(audit) + + if not PolicyReceiver._policy_receiver: + if active: + PolicyReceiver._policy_receiver = _PolicyReceiver(audit, + PolicyReceiver._policy_updater) + PolicyReceiver._policy_receiver.start() + return + + if not active: + PolicyReceiver._close_receiver(audit) + return + + PolicyReceiver._policy_receiver.reconfigure(audit) + @staticmethod def run(audit): - """Using policy-engine client to talk to policy engine""" - PolicyReceiver._policy_receiver = _PolicyReceiver(audit) - PolicyReceiver._policy_receiver.start() + """run policy_updater and policy_receiver""" + PolicyReceiver._policy_updater = PolicyUpdater(PolicyReceiver._on_reconfigure) + + PolicyReceiver._on_reconfigure(audit) + + PolicyReceiver._policy_updater.start() PolicyReceiver.catch_up(audit) |