From ebc1a062328e53e97e4d24ed111534cfc567a809 Mon Sep 17 00:00:00 2001 From: Alex Shatov Date: Thu, 31 Jan 2019 16:07:48 -0500 Subject: 4.6.0 policy-handler - active-passive DCAEGEN2-931: - exposed POST /reconfigure endpoint on the web-server that initiates the reconfigure process right away DCAEGEN2-932: - mode_of_operation: active or passive = active is as before this change = in passive mode the policy-handler * closes the web-socket to PDP * skips the periodic catch_ups * still periodically checks for reconfigure * still allows usig the web-server to retrieve policies from PDP - default is active - when mode_of_operation changes from passive to active, the policy-handler invokes the catch_up right away - config-kv contains the optional override field mode_of_operation = changing the mode_of_operation in config-kv and invoking POST /reconfigure will bring the new value and change the mode of operation of the policy-handler if no service_activator section is provided in consul-kv record - if config-kv contains the service_activator section, = the policy-handler registers with service_activator - untested = and receives the mode_of_operation - untested = service_activator can POST-notify the policy-handler to initiate the /reconfigure - reduced the default web-socket ping interval from 180 to 30 seconds because PDP changed its default timeout on the web-socket from 400 seconds to 50 seconds Change-Id: If7dd21c008d9906aca97939be65dfa9c2f007535 Signed-off-by: Alex Shatov Issue-ID: DCAEGEN2-931 Issue-ID: DCAEGEN2-932 --- LICENSE.txt | 2 +- policyhandler/__main__.py | 7 +- policyhandler/config.py | 4 +- policyhandler/onap/audit.py | 13 ++- policyhandler/policy_consts.py | 3 +- policyhandler/policy_receiver.py | 102 +++++++++++------ policyhandler/policy_updater.py | 62 +++++++++-- policyhandler/service_activator.py | 217 +++++++++++++++++++++++++++++++++++++ policyhandler/web_server.py | 21 +++- pom.xml | 4 +- setup.py | 4 +- tests/mock_settings.py | 5 +- version.properties | 2 +- 13 files changed, 383 insertions(+), 63 deletions(-) create mode 100644 policyhandler/service_activator.py diff --git a/LICENSE.txt b/LICENSE.txt index 14cb17c..c142ce1 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -1,7 +1,7 @@ /* * ============LICENSE_START========================================== * =================================================================== -* Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +* Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. * =================================================================== * * Unless otherwise specified, all software contained herein is licensed diff --git a/policyhandler/__main__.py b/policyhandler/__main__.py index 63dc5da..798a2e1 100644 --- a/policyhandler/__main__.py +++ b/policyhandler/__main__.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ from policyhandler import LogWriter from policyhandler.config import Config from policyhandler.onap.audit import Audit from policyhandler.policy_receiver import PolicyReceiver +from policyhandler.service_activator import ServiceActivator from policyhandler.web_server import PolicyWeb @@ -47,7 +48,9 @@ def run_policy_handler(): audit = Audit(req_message="start policy handler") Config.discover(audit) - logger.info("starting policy_handler with config: %s", Config.discovered_config) + ServiceActivator.determine_mode_of_operation(audit) + logger.info(audit.info( + "starting policy_handler with config: {}".format(Config.discovered_config))) PolicyReceiver.run(audit) PolicyWeb.run_forever(audit) diff --git a/policyhandler/config.py b/policyhandler/config.py index 5184f7f..e6e74cc 100644 --- a/policyhandler/config.py +++ b/policyhandler/config.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -152,6 +152,8 @@ class Config(object): CONSUL_TIMEOUT_IN_SECS = "consul_timeout_in_secs" WS_PING_INTERVAL_IN_SECS = "ws_ping_interval_in_secs" DEFAULT_TIMEOUT_IN_SECS = 60 + SERVICE_ACTIVATOR = "service_activator" + MODE_OF_OPERATION = "mode_of_operation" system_name = SERVICE_NAME_POLICY_HANDLER wservice_port = 25577 diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py index d63d0b2..db4498a 100644 --- a/policyhandler/onap/audit.py +++ b/policyhandler/onap/audit.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -185,12 +185,15 @@ class _Audit(object): @staticmethod - def register_item_health(health_name, health_getter): + def register_item_health(health_name, health_getter=None): """ register the health-checker for the additional item by its health_name and the function health_getter that returns its health status as json """ - _Audit._health_checkers[health_name] = health_getter + if health_getter: + _Audit._health_checkers[health_name] = health_getter + elif health_name in _Audit._health_checkers: + del _Audit._health_checkers[health_name] def health(self, full=False): """returns json for health check""" @@ -401,8 +404,8 @@ class Audit(_Audit): self._started = time.time() self._start_event = Audit._logger_audit.getStartRecordEvent() - self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\ - .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs))) + self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})" + .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs))) def audit_done(self, result=None, **kwargs): diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py index cde4551..8f3ec76 100644 --- a/policyhandler/policy_consts.py +++ b/policyhandler/policy_consts.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ POLICY_CONFIG = 'config' CATCH_UP = "catch_up" AUTO_CATCH_UP = "auto catch_up" +AUTO_RECONFIGURE = "auto reconfigure" LATEST_POLICIES = "latest_policies" REMOVED_POLICIES = "removed_policies" ERRORED_POLICIES = "errored_policies" diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py index 249c1f7..7cf1869 100644 --- a/policyhandler/policy_receiver.py +++ b/policyhandler/policy_receiver.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -41,6 +41,7 @@ from .onap.audit import Audit from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION from .policy_updater import PolicyUpdater from .policy_utils import Utils +from .service_activator import ServiceActivator LOADED_POLICIES = 'loadedPolicies' REMOVED_POLICIES = 'removedPolicies' @@ -58,12 +59,14 @@ class _PolicyReceiver(Thread): WS_MESSAGE_COUNT = "message_count" WS_MESSAGE_TIMESTAMP = "message_timestamp" WS_STATUS = "status" - WS_PING_INTERVAL_DEFAULT = 180 + WS_PING_INTERVAL_DEFAULT = 30 + WEB_SOCKET_HEALTH = "web_socket_health" - def __init__(self, audit): + def __init__(self, audit, policy_updater): """web-socket inside the thread to receive policy notifications from PolicyEngine""" Thread.__init__(self, name="policy_receiver", daemon=True) + self._policy_updater = policy_updater self._lock = Lock() self._keep_running = True self._settings = Settings(Config.FIELD_POLICY_ENGINE) @@ -83,12 +86,10 @@ class _PolicyReceiver(Thread): _PolicyReceiver.WS_STATUS: "created" } - Audit.register_item_health("web_socket_health", self._get_health) - self._reconfigure(audit) + Audit.register_item_health(_PolicyReceiver.WEB_SOCKET_HEALTH, self._get_health) + self.reconfigure(audit) - self._policy_updater = PolicyUpdater(self._reconfigure) - - def _reconfigure(self, audit): + def reconfigure(self, audit): """configure and reconfigure the web-socket""" with self._lock: _PolicyReceiver._logger.info(audit.info("web_socket_health {}".format( @@ -150,7 +151,6 @@ class _PolicyReceiver(Thread): def run(self): """listen on web-socket and pass the policy notifications to policy-updater""" - self._policy_updater.start() _PolicyReceiver._logger.info("starting policy_receiver...") websocket.enableTrace(True) restarting = False @@ -178,8 +178,9 @@ class _PolicyReceiver(Thread): ws_ping_interval_in_secs = self._ws_ping_interval_in_secs _PolicyReceiver._logger.info( - "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)", - web_socket_url, json.dumps(sslopt), tls_wss_ca_mode) + "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)" + " ws_ping_interval_in_secs(%s)", + web_socket_url, json.dumps(sslopt), tls_wss_ca_mode, ws_ping_interval_in_secs) self._web_socket = websocket.WebSocketApp( web_socket_url, @@ -194,6 +195,7 @@ class _PolicyReceiver(Thread): self._web_socket.run_forever(sslopt=sslopt, ping_interval=ws_ping_interval_in_secs) restarting = True + Audit.register_item_health(_PolicyReceiver.WEB_SOCKET_HEALTH) _PolicyReceiver._logger.info("exit policy-receiver") def _get_keep_running(self): @@ -207,7 +209,7 @@ class _PolicyReceiver(Thread): with self._lock: if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected: self._web_socket.close() - _PolicyReceiver._logger.info("Stopped receiving notifications from PDP") + _PolicyReceiver._logger.info("stopped receiving notifications from PDP") def _on_pdp_message(self, *args): """received the notification from PDP""" @@ -270,7 +272,7 @@ class _PolicyReceiver(Thread): self._web_socket_health[_PolicyReceiver.WS_STATUS] = "closed" self._web_socket_health[_PolicyReceiver.WS_CLOSE_COUNT] += 1 _PolicyReceiver._logger.info( - "lost connection(%s, %s) to PDP - restarting... web_socket_health %s", + "lost connection(%s, %s) to PDP web_socket_health %s", code, reason, json.dumps(self._get_health(), sort_keys=True)) def _on_ws_open(self): @@ -291,6 +293,7 @@ class _PolicyReceiver(Thread): def _get_health(self): """returns the healthcheck of the web-socket as json""" web_socket_health = copy.deepcopy(self._web_socket_health) + web_socket_health[Config.WS_PING_INTERVAL_IN_SECS] = self._ws_ping_interval_in_secs started = web_socket_health.get(_PolicyReceiver.WS_STARTED) if started: web_socket_health[_PolicyReceiver.WS_STARTED] = str(started) @@ -300,7 +303,7 @@ class _PolicyReceiver(Thread): def shutdown(self, audit): """Shutdown the policy-receiver""" - _PolicyReceiver._logger.info("shutdown policy-receiver") + _PolicyReceiver._logger.info(audit.info("shutdown policy-receiver")) with self._lock: self._keep_running = False @@ -309,39 +312,74 @@ class _PolicyReceiver(Thread): if self.is_alive(): self.join() - self._policy_updater.shutdown(audit) - - def catch_up(self, audit): - """need to bring the latest policies to DCAE-Controller""" - self._policy_updater.catch_up(audit) - - def is_running(self): - """check whether the policy-receiver and policy-updater are running""" - return self.is_alive() and self._policy_updater.is_alive() class PolicyReceiver(object): - """policy-receiver - static singleton wrapper""" + """ + policy-receiver - static singleton wrapper around two threads + policy_updater - master thread for all scheduled actions + policy_receiver - listens to policy-engine through web-socket + """ + _policy_updater = None _policy_receiver = None @staticmethod def is_running(): """check whether the policy-receiver runs""" - return PolicyReceiver._policy_receiver and PolicyReceiver._policy_receiver.is_running() + return (PolicyReceiver._policy_receiver + and PolicyReceiver._policy_receiver.is_alive() + and PolicyReceiver._policy_updater + and PolicyReceiver._policy_updater.is_alive()) + + @staticmethod + def _close_receiver(audit): + """stop the notification-handler""" + if PolicyReceiver._policy_receiver: + policy_receiver = PolicyReceiver._policy_receiver + PolicyReceiver._policy_receiver = None + policy_receiver.shutdown(audit) @staticmethod def shutdown(audit): - """Shutdown the notification-handler""" - PolicyReceiver._policy_receiver.shutdown(audit) + """shutdown the notification-handler and policy-updater""" + PolicyReceiver._close_receiver(audit) + PolicyReceiver._policy_updater.shutdown(audit) @staticmethod def catch_up(audit): - """bring the latest policies from policy-engine""" - PolicyReceiver._policy_receiver.catch_up(audit) + """request to bring the latest policies to DCAE""" + PolicyReceiver._policy_updater.catch_up(audit) + + @staticmethod + def reconfigure(audit): + """request to reconfigure the updated config for policy-handler""" + PolicyReceiver._policy_updater.reconfigure(audit) + + @staticmethod + def _on_reconfigure(audit): + """act on reconfiguration event""" + active = ServiceActivator.is_active_mode_of_operation(audit) + + if not PolicyReceiver._policy_receiver: + if active: + PolicyReceiver._policy_receiver = _PolicyReceiver(audit, + PolicyReceiver._policy_updater) + PolicyReceiver._policy_receiver.start() + return + + if not active: + PolicyReceiver._close_receiver(audit) + return + + PolicyReceiver._policy_receiver.reconfigure(audit) + @staticmethod def run(audit): - """Using policy-engine client to talk to policy engine""" - PolicyReceiver._policy_receiver = _PolicyReceiver(audit) - PolicyReceiver._policy_receiver.start() + """run policy_updater and policy_receiver""" + PolicyReceiver._policy_updater = PolicyUpdater(PolicyReceiver._on_reconfigure) + + PolicyReceiver._on_reconfigure(audit) + + PolicyReceiver._policy_updater.start() PolicyReceiver.catch_up(audit) diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index fb6c8b6..af1ea4b 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -25,11 +25,13 @@ from threading import Event, Lock, Thread from .config import Config, Settings from .deploy_handler import DeployHandler, PolicyUpdateMessage from .onap.audit import Audit, AuditHttpCode, AuditResponseCode -from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, POLICY_BODY, POLICY_ID, - POLICY_NAME, POLICY_NAMES, POLICY_VERSION) +from .policy_consts import (AUTO_CATCH_UP, AUTO_RECONFIGURE, CATCH_UP, + POLICY_BODY, POLICY_ID, POLICY_NAME, POLICY_NAMES, + POLICY_VERSION) from .policy_matcher import PolicyMatcher from .policy_rest import PolicyRest from .policy_utils import PolicyUtils +from .service_activator import ServiceActivator from .step_timer import StepTimer @@ -172,11 +174,11 @@ class PolicyUpdater(Thread): ) self._run.set() - def _reconfigure(self): + def reconfigure(self, audit=None): """job to check for and bring in the updated config for policy-handler""" with self._lock: if not self._aud_reconfigure: - self._aud_reconfigure = Audit(req_message=Config.RECONFIGURE) + self._aud_reconfigure = audit or Audit(req_message=AUTO_RECONFIGURE) PolicyUpdater._logger.info( "%s request_id %s", self._aud_reconfigure.req_message, self._aud_reconfigure.request_id @@ -251,7 +253,7 @@ class PolicyUpdater(Thread): self._reconfigure_timer = StepTimer( "reconfigure_timer", self._reconfigure_interval, - PolicyUpdater._reconfigure, + PolicyUpdater.reconfigure, PolicyUpdater._logger, self ) @@ -293,30 +295,52 @@ class PolicyUpdater(Thread): log_line = "{}({})".format(aud_reconfigure.req_message, aud_reconfigure.request_id) reconfigure_result = "" try: + need_to_catch_up = False PolicyUpdater._logger.info(log_line) + + active_prev = ServiceActivator.is_active_mode_of_operation(aud_reconfigure) Config.discover(aud_reconfigure) + if not Config.discovered_config.is_changed(): + active = ServiceActivator.determine_mode_of_operation(aud_reconfigure) reconfigure_result = " -- config not changed" else: - reconfigure_result = " -- config changed for:" + changed_configs = [] + + if ServiceActivator.reconfigure(aud_reconfigure): + changed_configs.append(Config.SERVICE_ACTIVATOR) + active = ServiceActivator.determine_mode_of_operation(aud_reconfigure) + if self._set_timer_intervals(): - reconfigure_result += " timer_intervals" + changed_configs.append("timer_intervals") if PolicyRest.reconfigure(): - reconfigure_result += " " + Config.FIELD_POLICY_ENGINE + need_to_catch_up = True + changed_configs.append(Config.FIELD_POLICY_ENGINE) if DeployHandler.reconfigure(aud_reconfigure): - reconfigure_result += " " + Config.DEPLOY_HANDLER + need_to_catch_up = True + changed_configs.append(Config.DEPLOY_HANDLER) if self._reconfigure_receiver(aud_reconfigure): - reconfigure_result += " web-socket" + need_to_catch_up = True + changed_configs.append("web-socket") + + reconfigure_result = " -- config changed on {} changes: {}".format( + json.dumps(changed_configs), Config.discovered_config) - reconfigure_result += " -- change: {}".format(Config.discovered_config) + need_to_catch_up = need_to_catch_up or (active and not active_prev) + if need_to_catch_up: + reconfigure_result += " -- going to catch_up" Config.discovered_config.commit_change() aud_reconfigure.audit_done(result=reconfigure_result) PolicyUpdater._logger.info(log_line + reconfigure_result) + if need_to_catch_up: + self._pause_catch_up_timer() + self.catch_up() + except Exception as ex: error_msg = "crash {} {}{}: {}: {}".format( "_on_reconfigure", log_line, reconfigure_result, type(ex).__name__, str(ex)) @@ -344,6 +368,20 @@ class PolicyUpdater(Thread): if not aud_catch_up: return False + if not ServiceActivator.is_active_mode_of_operation(aud_catch_up): + catch_up_result = "passive -- skip catch_up {0} request_id {1}".format( + aud_catch_up.req_message, aud_catch_up.request_id + ) + self._pause_catch_up_timer() + aud_catch_up.audit_done(result=catch_up_result) + PolicyUpdater._logger.info(catch_up_result) + self._run_catch_up_timer() + + PolicyUpdater._logger.info("policy_handler health: %s", + json.dumps(aud_catch_up.health(full=True))) + PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_catch_up.process_info())) + return False + log_line = "catch_up {0} request_id {1}".format( aud_catch_up.req_message, aud_catch_up.request_id ) diff --git a/policyhandler/service_activator.py b/policyhandler/service_activator.py new file mode 100644 index 0000000..d51d11c --- /dev/null +++ b/policyhandler/service_activator.py @@ -0,0 +1,217 @@ +# ================================================================================ +# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +""" + ask service_activator for the mode_of_operation + that is whether the current site/cluster is active versus passive + + active is the default and expects the polisy-handler + to receive the push notifications from policy-engine + as well as to periodically run the catch_up process + + passive expects the polisy-handler + to stop listening for the policy-updates from the policy-engine + and to stop doing the periodic catch_up +""" + +import json +import logging +from copy import deepcopy +from urllib.parse import urljoin + +import requests + +from .config import Config, Settings +from .onap.audit import REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, Metrics +from .policy_consts import TARGET_ENTITY + + +class ServiceActivator(object): + """calling the service_activator web api to determine the mode_of_operation""" + _logger = logging.getLogger("policy_handler.service_activator") + DEFAULT_TARGET_ENTITY = "service_activator" + DEFAULT_TIMEOUT_IN_SECS = 10 + MODE_OF_OPERATION_ACTIVE = "active" + + _lazy_inited = False + _settings = Settings(Config.MODE_OF_OPERATION, Config.SERVICE_ACTIVATOR) + + _mode_of_operation = None + _url = None + _url_register = None + _post_register = {} + _target_entity = None + _custom_kwargs = {} + _timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS + + + @staticmethod + def _init(audit): + """ + initialize service-activator client config based on discovered config: + + "mode_of_operation" : "active", + "service_activator" : { + "target_entity" : "service_activator", + "url" : "https://service-activator-service:123", + "path_register" : "/register", + "tls_ca_mode" : "cert_directory", + "timeout_in_secs": 20, + "post_register" : { + "component_name" : "policy_handler", + "reconfigure_path" : "/reconfigure", + "http_protocol" : "http" + } + } + """ + ServiceActivator._custom_kwargs = {} + ServiceActivator._url = ServiceActivator._url_register = "" + + try: + _, ServiceActivator._mode_of_operation = ServiceActivator._settings.get_by_key( + Config.MODE_OF_OPERATION, ServiceActivator._mode_of_operation) + + _, config_sa = ServiceActivator._settings.get_by_key(Config.SERVICE_ACTIVATOR) + if config_sa and isinstance(config_sa, dict): + ServiceActivator._target_entity = config_sa.get( + TARGET_ENTITY, ServiceActivator.DEFAULT_TARGET_ENTITY) + ServiceActivator._url = config_sa.get("url", "") + if ServiceActivator._url: + ServiceActivator._url_register = urljoin(ServiceActivator._url, + config_sa.get("path_register", "")) + ServiceActivator._post_register = deepcopy(config_sa.get("post_register", {})) + tls_ca_mode = config_sa.get(Config.TLS_CA_MODE) + ServiceActivator._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode) + + ServiceActivator._logger.info(audit.info( + "dns based routing to %s: url(%s) tls_ca_mode(%s) custom_kwargs(%s)", + ServiceActivator._target_entity, ServiceActivator._url_register, + tls_ca_mode, json.dumps(ServiceActivator._custom_kwargs))) + + ServiceActivator._timeout_in_secs = config_sa.get(Config.TIMEOUT_IN_SECS) + if not ServiceActivator._timeout_in_secs or ServiceActivator._timeout_in_secs < 1: + ServiceActivator._timeout_in_secs = ServiceActivator.DEFAULT_TIMEOUT_IN_SECS + + ServiceActivator._settings.commit_change() + except Exception: + pass + ServiceActivator._lazy_inited = True + + @staticmethod + def reconfigure(audit): + """reconfigure""" + ServiceActivator._settings.set_config(Config.discovered_config) + if not ServiceActivator._settings.is_changed(): + ServiceActivator._settings.commit_change() + return False + + ServiceActivator._lazy_inited = False + ServiceActivator._init(audit) + return True + + @staticmethod + def _lazy_init(audit): + """set config""" + if ServiceActivator._lazy_inited: + return + + ServiceActivator._settings.set_config(Config.discovered_config) + ServiceActivator._init(audit) + + @staticmethod + def is_active_mode_of_operation(audit): + """ + mode_of_operation - whether the service is + active == True or passive == False + based on the current value of the mode_of_operation + """ + active = (ServiceActivator._mode_of_operation is None + or ServiceActivator._mode_of_operation + == ServiceActivator.MODE_OF_OPERATION_ACTIVE) + ServiceActivator._logger.info(audit.info( + "mode_of_operation = {} active = {}".format( + ServiceActivator._mode_of_operation, active))) + return active + + @staticmethod + def determine_mode_of_operation(audit): + """retrieves the mode_of_operation from service_activator""" + try: + ServiceActivator._lazy_init(audit) + + target_entity = ServiceActivator._target_entity + + if not ServiceActivator._url: + ServiceActivator._logger.info(audit.info( + "no url found for {}".format(target_entity))) + return ServiceActivator.is_active_mode_of_operation(audit) + + url = ServiceActivator._url_register + json_body = deepcopy(ServiceActivator._post_register) + timeout_in_secs = ServiceActivator._timeout_in_secs + custom_kwargs = deepcopy(ServiceActivator._custom_kwargs) + + metrics = Metrics(aud_parent=audit, + targetEntity="{} determine_mode_of_operation".format(target_entity), + targetServiceName=url) + headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id} + + log_action = "post to {} at {}".format(target_entity, url) + log_data = "headers={}, json_body={}, timeout_in_secs={}, custom_kwargs({})".format( + json.dumps(headers), json.dumps(json_body), timeout_in_secs, + json.dumps(custom_kwargs)) + log_line = log_action + " " + log_data + + ServiceActivator._logger.info(log_line) + metrics.metrics_start(log_line) + + res = None + try: + res = requests.post(url, json=json_body, headers=headers, timeout=timeout_in_secs, + **custom_kwargs) + except Exception as ex: + error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + if isinstance(ex, requests.exceptions.RequestException) + else AuditHttpCode.SERVER_INTERNAL_ERROR.value) + error_msg = "failed to {} {}: {} {}".format( + log_action, type(ex).__name__, str(ex), log_data) + ServiceActivator._logger.exception(error_msg) + metrics.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + metrics.metrics(error_msg) + return ServiceActivator.is_active_mode_of_operation(audit) + + metrics.set_http_status_code(res.status_code) + audit.set_http_status_code(res.status_code) + + log_line = "response {} from {}: text={} {}".format( + res.status_code, log_action, res.text, log_data) + metrics.metrics(log_line) + + if res.status_code != requests.codes.ok: + ServiceActivator._logger.error(log_line) + return ServiceActivator.is_active_mode_of_operation(audit) + + result = res.json() or {} + + ServiceActivator._mode_of_operation = (result.get(Config.MODE_OF_OPERATION) + or ServiceActivator._mode_of_operation) + return ServiceActivator.is_active_mode_of_operation(audit) + + except Exception as ex: + return ServiceActivator.is_active_mode_of_operation(audit) diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py index 73e7fbc..dc76353 100644 --- a/policyhandler/web_server.py +++ b/policyhandler/web_server.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -208,7 +208,24 @@ class _PolicyWeb(object): PolicyWeb.logger.info("%s", req_info) PolicyReceiver.catch_up(audit) - res = {"catch-up requested": started} + res = {"catch-up requested": started, "request_id": audit.request_id} + PolicyWeb.logger.info("requested %s: %s", req_info, json.dumps(res)) + audit.info_requested(started) + return res + + @cherrypy.expose + @cherrypy.tools.json_out() + def reconfigure(self): + """schedule reconfigure""" + started = str(datetime.utcnow()) + req_info = _PolicyWeb._get_request_info(cherrypy.request) + audit = Audit(job_name="reconfigure", req_message=req_info, + headers=cherrypy.request.headers) + + PolicyWeb.logger.info("%s", req_info) + PolicyReceiver.reconfigure(audit) + + res = {"reconfigure requested": started, "request_id": audit.request_id} PolicyWeb.logger.info("requested %s: %s", req_info, json.dumps(res)) audit.info_requested(started) return res diff --git a/pom.xml b/pom.xml index 97fc4e4..5b7f7a2 100644 --- a/pom.xml +++ b/pom.xml @@ -1,7 +1,7 @@