diff options
author | Alex Shatov <alexs@att.com> | 2018-08-24 13:15:04 -0400 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2018-08-24 13:15:04 -0400 |
commit | 1d693376205c66af93283d04e8e9740c947a7d02 (patch) | |
tree | 9188af307614661c1afbe50cdaa2fa8a2cdc691c /policyhandler | |
parent | 1cddbc70e4799970dc606014ef79e025d6a8e722 (diff) |
4.2.0 policy-handler - periodic reconfigure
- reconfigure == periodically retrieve the policy-handler config
from consul-kv and compare to previous config and subconfigs.
If changed, reconfigure the subunits
- selectively change one or any settings for the following
= catch_up timer interval
= reconfigure timer interval
= deployment-handler url and params (thread-safe)
= policy-engine url and params (thread-safe)
= web-socket url to policy-engine (through a callback)
- each subunit has its own Settings that keep track of changes
- try-catch and metrics around discovery - consul API
- hidden the secrets from logs
- froze the web-socket version to 0.49.0 because 0.50.0
and 0.51.0 are broken - looking around for stable alternatives
- fixed-adapted the callbacks passed to the web-socket lib
that changed its API in 0.49.0 and later
- log the stack on the exception occurring in the web-socket lib
- unit test refactoring
Change-Id: Id53bad59660a197f59d9aeb7c05ab761d1060cd0
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-470
Diffstat (limited to 'policyhandler')
-rw-r--r-- | policyhandler/__main__.py | 12 | ||||
-rw-r--r-- | policyhandler/config.py | 174 | ||||
-rw-r--r-- | policyhandler/deploy_handler.py | 106 | ||||
-rw-r--r-- | policyhandler/discovery.py | 119 | ||||
-rw-r--r-- | policyhandler/onap/audit.py | 3 | ||||
-rw-r--r-- | policyhandler/policy_consts.py | 1 | ||||
-rw-r--r-- | policyhandler/policy_receiver.py | 66 | ||||
-rw-r--r-- | policyhandler/policy_rest.py | 111 | ||||
-rw-r--r-- | policyhandler/policy_updater.py | 144 | ||||
-rw-r--r-- | policyhandler/policy_utils.py | 12 | ||||
-rw-r--r-- | policyhandler/step_timer.py | 4 |
11 files changed, 556 insertions, 196 deletions
diff --git a/policyhandler/__main__.py b/policyhandler/__main__.py index 04ca657..63dc5da 100644 --- a/policyhandler/__main__.py +++ b/policyhandler/__main__.py @@ -35,20 +35,20 @@ from policyhandler.web_server import PolicyWeb def run_policy_handler(): """main run function for policy-handler""" - Config.load_from_file() - Config.discover() + Config.init_config() logger = logging.getLogger("policy_handler") sys.stdout = LogWriter(logger.info) sys.stderr = LogWriter(logger.error) logger.info("========== run_policy_handler ========== %s", __package__) - Audit.init(Config.get_system_name(), Config.LOGGER_CONFIG_FILE_PATH) - - logger.info("starting policy_handler with config:") - logger.info(Audit.log_json_dumps(Config.settings)) + Audit.init(Config.system_name, Config.LOGGER_CONFIG_FILE_PATH) audit = Audit(req_message="start policy handler") + + Config.discover(audit) + logger.info("starting policy_handler with config: %s", Config.discovered_config) + PolicyReceiver.run(audit) PolicyWeb.run_forever(audit) diff --git a/policyhandler/config.py b/policyhandler/config.py index 8e6edf9..3d68235 100644 --- a/policyhandler/config.py +++ b/policyhandler/config.py @@ -25,64 +25,131 @@ import logging.config import os from .discovery import DiscoveryClient +from .onap.audit import Audit +from .policy_utils import Utils + +LOGS_DIR = 'logs' + +try: + os.makedirs(LOGS_DIR, mode=0o770, exist_ok=True) +except Exception: + pass logging.basicConfig( - filename='logs/policy_handler.log', \ - format='%(asctime)s.%(msecs)03d %(levelname)+8s ' + \ - '%(threadName)s %(name)s.%(funcName)s: %(message)s', \ + filename=os.path.join(LOGS_DIR, 'policy_handler.log'), + format=('%(asctime)s.%(msecs)03d %(levelname)+8s ' + + '%(threadName)s %(name)s.%(funcName)s: %(message)s'), datefmt='%Y%m%d_%H%M%S', level=logging.DEBUG) +class Settings(object): + """settings of module or an application + that is the config filtered by the collection of config-keys. + + keeps track of changes versus the previous set_config unless committed + """ + def __init__(self, *config_keys): + """provide the collection of top level keys in config to limit the config""" + self._config_keys = config_keys + self._changed = False + self._config = None + self._prev_config = None + + def __str__(self): + """get str of the config""" + return Audit.json_dumps({ + "config_keys": self._config_keys, + "changed": self._changed, + "config": self._config, + "prev_config": self._prev_config + }) + + def is_loaded(self): + """whether loaded already""" + return bool(self._config) + + def commit_change(self): + """set the prev config to the latest config""" + self._prev_config = copy.deepcopy(self._config) + self._changed = False + + def _set_changed(self): + """determine whether the config changed""" + self._changed = not (self._prev_config + and Utils.are_the_same(self._prev_config, self._config, + Audit.json_dumps)) + + def set_config(self, config, auto_commit=False): + """update the config""" + self.commit_change() + + if isinstance(config, Settings): + config = config._config + + if not isinstance(config, dict): + config = {} + + self._config = copy.deepcopy(dict((k, v) for (k, v) in config.items() + if not self._config_keys or k in self._config_keys)) + + if auto_commit: + self.commit_change() + else: + self._set_changed() + + def is_changed(self): + """whether the config has changed""" + return self._changed + + def get_by_key(self, config_key, default=None): + """get the latest sub config by config_key and whether it has changed""" + if not config_key or not isinstance(config_key, str): + return False, default + + value = copy.deepcopy(self._config.get(config_key, default)) + if not self._prev_config: + return True, value + prev_value = self._prev_config.get(config_key, default) + return self._changed and not Utils.are_the_same(prev_value, value, Audit.json_dumps), value + + def update(self, config_key, value=None): + """set the latest sub config by config_key and determine whether the config has changed""" + if not config_key: + return + + self._config[config_key] = copy.deepcopy(value) + self._set_changed() + + class Config(object): """main config of the application""" + _logger = logging.getLogger("policy_handler.config") CONFIG_FILE_PATH = "etc/config.json" LOGGER_CONFIG_FILE_PATH = "etc/common_logger.config" SERVICE_NAME_POLICY_HANDLER = "policy_handler" + FIELD_SYSTEM = "system" FIELD_WSERVICE_PORT = "wservice_port" FIELD_POLICY_ENGINE = "policy_engine" + POOL_CONNECTIONS = "pool_connections" + DEPLOY_HANDLER = "deploy_handler" + THREAD_POOL_SIZE = "thread_pool_size" + POLICY_RETRY_COUNT = "policy_retry_count" + POLICY_RETRY_SLEEP = "policy_retry_sleep" + RECONFIGURE = "reconfigure" + TIMER_INTERVAL = "interval" + + system_name = SERVICE_NAME_POLICY_HANDLER wservice_port = 25577 - _logger = logging.getLogger("policy_handler.config") - settings = None + _local_config = Settings() + discovered_config = Settings() @staticmethod - def merge(new_config): - """merge the new_config into current config - override the values""" - if not new_config: - return - - if not Config.settings: - Config.settings = new_config - return - - new_config = copy.deepcopy(new_config) - Config.settings.update(new_config) - - @staticmethod - def get_system_name(): - """find the name of the policy-handler system - to be used as the key in consul-kv for config of policy-handler - """ - return (Config.settings or {}).get(Config.FIELD_SYSTEM, Config.SERVICE_NAME_POLICY_HANDLER) - - @staticmethod - def discover(): - """bring and merge the config settings from the discovery service""" - discovery_key = Config.get_system_name() - new_config = DiscoveryClient.get_value(discovery_key) - - if not new_config or not isinstance(new_config, dict): - Config._logger.warning("unexpected config from discovery: %s", new_config) + def init_config(file_path=None): + """read and store the config from config file""" + if Config._local_config.is_loaded(): + Config._logger.info("config already inited: %s", Config._local_config) return - Config._logger.debug("loaded config from discovery(%s): %s", \ - discovery_key, json.dumps(new_config)) - Config._logger.debug("config before merge from discovery: %s", json.dumps(Config.settings)) - Config.merge(new_config.get(Config.SERVICE_NAME_POLICY_HANDLER)) - Config._logger.info("merged config from discovery: %s", json.dumps(Config.settings)) - - @staticmethod - def load_from_file(file_path=None): - """read and store the config from config file""" if not file_path: file_path = Config.CONFIG_FILE_PATH @@ -101,6 +168,25 @@ class Config(object): logging.config.dictConfig(logging_config) Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port) - Config.merge(loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER)) - Config._logger.info("config loaded from file: %s", json.dumps(Config.settings)) - return True + + local_config = loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER) + Config.system_name = local_config.get(Config.FIELD_SYSTEM, Config.system_name) + + Config._local_config.set_config(local_config, auto_commit=True) + Config._logger.info("config loaded from file(%s): %s", file_path, Config._local_config) + + @staticmethod + def discover(audit): + """bring and merge the config settings from the discovery service""" + discovery_key = Config.system_name + new_config = DiscoveryClient.get_value(audit, discovery_key) + + if not new_config or not isinstance(new_config, dict): + Config._logger.warning("unexpected config from discovery: %s", new_config) + return + + Config._logger.debug("loaded config from discovery(%s): %s", + discovery_key, Audit.json_dumps(new_config)) + + Config.discovered_config.set_config(new_config.get(Config.SERVICE_NAME_POLICY_HANDLER)) + Config._logger.info("config from discovery: %s", Config.discovered_config) diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index 6d258f2..e308c1a 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -21,17 +21,18 @@ import json import logging from copy import copy, deepcopy +from threading import Lock import requests -from .config import Config +from .config import Config, Settings from .customize import CustomizerUser from .discovery import DiscoveryClient from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, AuditResponseCode, Metrics) from .policy_consts import (CATCH_UP, LATEST_POLICIES, POLICIES, POLICY_FILTER_MATCHES, POLICY_FILTERS, - REMOVED_POLICIES) + REMOVED_POLICIES, TARGET_ENTITY) class PolicyUpdateMessage(object): @@ -144,7 +145,11 @@ class PolicyUpdateMessage(object): class DeployHandler(object): """calling the deployment-handler web apis""" _logger = logging.getLogger("policy_handler.deploy_handler") + DEFAULT_TARGET_ENTITY = "deployment_handler" + _lazy_inited = False + _lock = Lock() + _settings = Settings(Config.POOL_CONNECTIONS, Config.DEPLOY_HANDLER) _requests_session = None _url = None @@ -157,11 +162,8 @@ class DeployHandler(object): server_instance_changed = False @staticmethod - def _lazy_init(audit, rediscover=False): - """ set static properties """ - if DeployHandler._lazy_inited and not rediscover: - return - + def _init(audit): + """set config""" DeployHandler._custom_kwargs = (CustomizerUser.get_customizer() .get_deploy_handler_kwargs(audit)) if (not DeployHandler._custom_kwargs @@ -169,18 +171,18 @@ class DeployHandler(object): DeployHandler._custom_kwargs = {} if not DeployHandler._requests_session: - pool_size = Config.settings.get("pool_connections", 20) DeployHandler._requests_session = requests.Session() + + changed, pool_size = DeployHandler._settings.get_by_key(Config.POOL_CONNECTIONS, 10) + if changed: DeployHandler._requests_session.mount( - 'https://', - requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) - ) + 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + pool_maxsize=pool_size)) DeployHandler._requests_session.mount( - 'http://', - requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) - ) + 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + pool_maxsize=pool_size)) - config_dh = Config.settings.get("deploy_handler") + _, config_dh = DeployHandler._settings.get_by_key(Config.DEPLOY_HANDLER) if config_dh and isinstance(config_dh, dict): # dns based routing to deployment-handler # config for policy-handler >= 2.4.0 @@ -192,7 +194,8 @@ class DeployHandler(object): # "cfy_tenant_name" : "default_tenant" # } # } - DeployHandler._target_entity = config_dh.get("target_entity", "deployment_handler") + DeployHandler._target_entity = config_dh.get(TARGET_ENTITY, + DeployHandler.DEFAULT_TARGET_ENTITY) DeployHandler._url = config_dh.get("url") DeployHandler._max_msg_length_mb = config_dh.get("max_msg_length_mb", DeployHandler._max_msg_length_mb) @@ -205,19 +208,45 @@ class DeployHandler(object): if not isinstance(config_dh, dict): # config for policy-handler <= 2.3.1 # "deploy_handler" : "deployment_handler" - DeployHandler._target_entity = str(config_dh or "deployment_handler") + DeployHandler._target_entity = str(config_dh or DeployHandler.DEFAULT_TARGET_ENTITY) DeployHandler._url = DiscoveryClient.get_service_url(audit, DeployHandler._target_entity) DeployHandler._url_policy = str(DeployHandler._url or "") + '/policy' - DeployHandler._logger.info( - "got %s policy url(%s)", DeployHandler._target_entity, DeployHandler._url_policy) + DeployHandler._logger.info("got %s policy url(%s): %s", DeployHandler._target_entity, + DeployHandler._url_policy, DeployHandler._settings) + DeployHandler._settings.commit_change() DeployHandler._lazy_inited = bool(DeployHandler._url) + @staticmethod + def reconfigure(audit): + """reconfigure""" + with DeployHandler._lock: + DeployHandler._settings.set_config(Config.discovered_config) + if not DeployHandler._settings.is_changed(): + DeployHandler._settings.commit_change() + return False + + DeployHandler._lazy_inited = False + DeployHandler._init(audit) + return True @staticmethod - def policy_update(audit, policy_update_message, rediscover=False): + def _lazy_init(audit): + """set config""" + if DeployHandler._lazy_inited: + return + + with DeployHandler._lock: + if DeployHandler._lazy_inited: + return + + DeployHandler._settings.set_config(Config.discovered_config) + DeployHandler._init(audit) + + @staticmethod + def policy_update(audit, policy_update_message): """ segments the big policy_update_message limited by size and sequatially sends each segment as put to deployment-handler at /policy. @@ -227,7 +256,7 @@ class DeployHandler(object): if not policy_update_message or policy_update_message.empty(): return - DeployHandler._lazy_init(audit, rediscover) + DeployHandler._lazy_init(audit) str_metrics = "policy_update {0}".format(str(policy_update_message)) @@ -277,11 +306,14 @@ class DeployHandler(object): res = None try: - res = DeployHandler._requests_session.put( - DeployHandler._url_policy, json=message, - headers=headers, params=DeployHandler._query, - **DeployHandler._custom_kwargs - ) + with DeployHandler._lock: + session = DeployHandler._requests_session + url = DeployHandler._url_policy + params = deepcopy(DeployHandler._query) + custom_kwargs = deepcopy(DeployHandler._custom_kwargs) + + res = session.put(url, json=message, + headers=headers, params=params, **custom_kwargs) except Exception as ex: error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value if isinstance(ex, requests.exceptions.RequestException) @@ -311,12 +343,12 @@ class DeployHandler(object): @staticmethod - def get_deployed_policies(audit, rediscover=False): + def get_deployed_policies(audit): """ Retrieves policies and policy-filters from components that were deployed by deployment-handler """ - DeployHandler._lazy_init(audit, rediscover) + DeployHandler._lazy_init(audit) metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity, targetServiceName=DeployHandler._url_policy) headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id} @@ -338,10 +370,13 @@ class DeployHandler(object): res = None try: - res = DeployHandler._requests_session.get( - DeployHandler._url_policy, headers=headers, params=DeployHandler._query, - **DeployHandler._custom_kwargs - ) + with DeployHandler._lock: + session = DeployHandler._requests_session + url = DeployHandler._url_policy + params = deepcopy(DeployHandler._query) + custom_kwargs = deepcopy(DeployHandler._custom_kwargs) + + res = session.get(url, headers=headers, params=params, **custom_kwargs) except Exception as ex: error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value if isinstance(ex, requests.exceptions.RequestException) @@ -390,7 +425,6 @@ class DeployHandler(object): and prev_server_instance_uuid != DeployHandler._server_instance_uuid): DeployHandler.server_instance_changed = True - log_line = ("deployment_handler_changed: {1} != {0}" - .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid)) - metrics.info(log_line) - DeployHandler._logger.info(log_line) + DeployHandler._logger.info(metrics.info( + "deployment_handler_changed: {1} != {0}" + .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid))) diff --git a/policyhandler/discovery.py b/policyhandler/discovery.py index ce24c3d..4e6bc3d 100644 --- a/policyhandler/discovery.py +++ b/policyhandler/discovery.py @@ -25,6 +25,8 @@ import logging import requests from .customize import CustomizerUser +from .onap.audit import AuditHttpCode, Metrics + class DiscoveryClient(object): """talking to consul at http://consul:8500 @@ -40,56 +42,103 @@ class DiscoveryClient(object): -p <outport>:<innerport> ${APPNAME}:latest """ + CONSUL_ENTITY = "consul" CONSUL_SERVICE_MASK = "http://consul:8500/v1/catalog/service/{0}" CONSUL_KV_MASK = "http://consul:8500/v1/kv/{0}" _logger = logging.getLogger("policy_handler.discovery") @staticmethod - def get_service_url(audit, service_name): + def _discover_service(audit, service_name, service_path): """find the service record in consul""" - service_path = DiscoveryClient.CONSUL_SERVICE_MASK.format(service_name) - log_line = "discover {0}".format(service_path) - DiscoveryClient._logger.info(log_line) - audit.info(log_line) response = requests.get(service_path) - - log_line = "response {0} for {1}: {2}".format( - response.status_code, service_path, response.text) - DiscoveryClient._logger.info(log_line) - audit.info(log_line) + DiscoveryClient._logger.info(audit.info("response {} from {}: {}".format( + response.status_code, service_path, response.text))) response.raise_for_status() + status_code = response.status_code + service = response.json()[0] + return (status_code, + CustomizerUser.get_customizer().get_service_url(audit, service_name, service)) + + @staticmethod + def get_service_url(audit, service_name): + """find the service record in consul""" + service_path = DiscoveryClient.CONSUL_SERVICE_MASK.format(service_name) + metrics = Metrics(aud_parent=audit, targetEntity=DiscoveryClient.CONSUL_ENTITY, + targetServiceName=service_path) + + log_line = "get from {} at {}".format(DiscoveryClient.CONSUL_ENTITY, service_path) - service = response.json() - if not service: - log_line = "failed discover {0}".format(service_path) - DiscoveryClient._logger.error(log_line) - audit.error(log_line) - return - service = service[0] + DiscoveryClient._logger.info(metrics.metrics_start(log_line)) + status_code = None + try: + (status_code, + service_url) = DiscoveryClient._discover_service(audit, service_name, service_path) + except Exception as ex: + error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + if isinstance(ex, requests.exceptions.RequestException) + else AuditHttpCode.SERVER_INTERNAL_ERROR.value) + error_msg = ("failed {}/{} to {} {}: {}".format(status_code, error_code, log_line, + type(ex).__name__, str(ex))) + DiscoveryClient._logger.exception(error_msg) + metrics.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + metrics.metrics(error_msg) + return None - service_url = CustomizerUser.get_customizer().get_service_url(audit, service_name, service) if not service_url: - log_line = "failed to get service_url for {0}".format(service_name) - DiscoveryClient._logger.error(log_line) - audit.error(log_line) - return - - log_line = "got service_url: {0} for {1}".format(service_url, service_name) - DiscoveryClient._logger.info(log_line) - audit.info(log_line) + error_code = AuditHttpCode.DATA_ERROR.value + error_msg = "failed {}/{} to {}".format(status_code, error_code, log_line) + DiscoveryClient._logger.error(audit.error(error_msg)) + metrics.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + metrics.metrics(error_msg) + return None + + log_line = "response {} {}".format(status_code, log_line) + DiscoveryClient._logger.info(audit.info("got service_url: {} after {}" + .format(service_url, log_line))) + + metrics.set_http_status_code(status_code) + audit.set_http_status_code(status_code) + metrics.metrics(log_line) return service_url @staticmethod - def get_value(key): - """get the value for the key from consul-kv""" - response = requests.get(DiscoveryClient.CONSUL_KV_MASK.format(key)) + def _get_value_from_kv(url): + """get the value from consul-kv at discovery url""" + response = requests.get(url) response.raise_for_status() data = response.json() - if not data: - DiscoveryClient._logger.error("failed get_value %s", key) - return value = base64.b64decode(data[0]["Value"]).decode("utf-8") - DiscoveryClient._logger.info("consul-kv key=%s value(%s) data=%s", - key, value, json.dumps(data)) - return json.loads(value) + return response.status_code, json.loads(value) + + @staticmethod + def get_value(audit, key): + """get the value for the key from consul-kv""" + discovery_url = DiscoveryClient.CONSUL_KV_MASK.format(key) + metrics = Metrics(aud_parent=audit, targetEntity=DiscoveryClient.CONSUL_ENTITY, + targetServiceName=discovery_url) + + log_line = "get from {} at {}".format(DiscoveryClient.CONSUL_ENTITY, discovery_url) + + DiscoveryClient._logger.info(metrics.metrics_start(log_line)) + try: + status_code, value = DiscoveryClient._get_value_from_kv(discovery_url) + except Exception as ex: + error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + if isinstance(ex, requests.exceptions.RequestException) + else AuditHttpCode.SERVER_INTERNAL_ERROR.value) + error_msg = ("failed {}/{} to {} {}: {}".format(status_code, error_code, log_line, + type(ex).__name__, str(ex))) + DiscoveryClient._logger.exception(error_msg) + metrics.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + metrics.metrics(error_msg) + return None + + log_line = "response {} {}".format(status_code, log_line) + metrics.set_http_status_code(status_code) + audit.set_http_status_code(status_code) + metrics.metrics(log_line) + return value diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py index db85c18..1bee4a7 100644 --- a/policyhandler/onap/audit.py +++ b/policyhandler/onap/audit.py @@ -339,7 +339,7 @@ class _Audit(object): return obj @staticmethod - def log_json_dumps(obj, **kwargs): + def json_dumps(obj, **kwargs): """hide the known secret field values of the dictionary and return json.dumps""" if not isinstance(obj, dict): return json.dumps(obj, **kwargs) @@ -455,6 +455,7 @@ class Metrics(_Audit): self.info(log_line, **self.merge_all_kwargs(**kwargs)) _Audit._health.start(self._metrics_name, self.request_id) _Audit._health.start(METRICS_TOTAL_STATS, self.request_id) + return log_line def metrics(self, log_line, **kwargs): diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py index 51ac173..cde4551 100644 --- a/policyhandler/policy_consts.py +++ b/policyhandler/policy_consts.py @@ -36,3 +36,4 @@ POLICY_VERSIONS = "policy_versions" MATCHING_CONDITIONS = "matchingConditions" POLICY_NAMES = "policy_names" POLICY_FILTER_MATCHES = "policy_filter_matches" +TARGET_ENTITY = "target_entity" diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py index bb33cd5..1edb24d 100644 --- a/policyhandler/policy_receiver.py +++ b/policyhandler/policy_receiver.py @@ -32,7 +32,7 @@ from threading import Lock, Thread import websocket -from .config import Config +from .config import Config, Settings from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION from .policy_updater import PolicyUpdater @@ -47,25 +47,50 @@ class _PolicyReceiver(Thread): def __init__(self): """web-socket inside the thread to receive policy notifications from PolicyEngine""" - Thread.__init__(self, name="policy_receiver") - self.daemon = True + Thread.__init__(self, name="policy_receiver", daemon=True) self._lock = Lock() self._keep_running = True + self._settings = Settings(Config.FIELD_POLICY_ENGINE) - config = Config.settings[Config.FIELD_POLICY_ENGINE] - self.web_socket_url = resturl = config["url"] + config["path_pdp"] - - if resturl.startswith("https:"): - self.web_socket_url = resturl.replace("https:", "wss:") + "notifications" - else: - self.web_socket_url = resturl.replace("http:", "ws:") + "notifications" - + self._web_socket_url = None self._web_socket = None + self.reconfigure() - self._policy_updater = PolicyUpdater() + self._policy_updater = PolicyUpdater(self.reconfigure) self._policy_updater.start() + def reconfigure(self): + """configure and reconfigure the web-socket""" + with self._lock: + self._settings.set_config(Config.discovered_config) + changed, config = self._settings.get_by_key(Config.FIELD_POLICY_ENGINE) + + if not changed: + self._settings.commit_change() + return False + + prev_web_socket_url = self._web_socket_url + resturl = config.get("url", "") + config.get("path_pdp", "") + + if resturl.startswith("https:"): + self._web_socket_url = resturl.replace("https:", "wss:") + "notifications" + else: + self._web_socket_url = resturl.replace("http:", "ws:") + "notifications" + + if self._web_socket_url == prev_web_socket_url: + _PolicyReceiver._logger.info("not changed web_socket_url(%s): %s", + self._web_socket_url, self._settings) + self._settings.commit_change() + return False + + _PolicyReceiver._logger.info("changed web_socket_url(%s): %s", + self._web_socket_url, self._settings) + self._settings.commit_change() + + self._stop_notifications() + return True + def run(self): """listen on web-socket and pass the policy notifications to policy-updater""" websocket.enableTrace(True) @@ -80,9 +105,9 @@ class _PolicyReceiver(Thread): time.sleep(5) _PolicyReceiver._logger.info( - "connecting to policy-notifications at: %s", self.web_socket_url) + "connecting to policy-notifications at: %s", self._web_socket_url) self._web_socket = websocket.WebSocketApp( - self.web_socket_url, + self._web_socket_url, on_message=self._on_pdp_message, on_close=self._on_ws_close, on_error=self._on_ws_error @@ -101,15 +126,16 @@ class _PolicyReceiver(Thread): return keep_running def _stop_notifications(self): - """Shuts down the AutoNotification service if running.""" + """close the web-socket == stops the notification service if running.""" with self._lock: if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected: self._web_socket.close() _PolicyReceiver._logger.info("Stopped receiving notifications from PDP") - def _on_pdp_message(self, _, message): + def _on_pdp_message(self, *args): """received the notification from PDP""" try: + message = args and args[-1] _PolicyReceiver._logger.info("Received notification message: %s", message) if not message: return @@ -144,13 +170,13 @@ class _PolicyReceiver(Thread): _PolicyReceiver._logger.exception(error_msg) - def _on_ws_error(self, _, error): + def _on_ws_error(self, error): """report an error""" - _PolicyReceiver._logger.error("policy-notification error: %s", error) + _PolicyReceiver._logger.exception("policy-notification error: %s", str(error)) - def _on_ws_close(self, _): + def _on_ws_close(self, code, reason): """restart web-socket on close""" - _PolicyReceiver._logger.info("lost connection to PDP - restarting...") + _PolicyReceiver._logger.info("lost connection(%s, %s) to PDP - restarting...", code, reason) def shutdown(self, audit): """Shutdown the policy-receiver""" diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py index 6ec982a..d10f4bf 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/policy_rest.py @@ -23,10 +23,11 @@ import json import logging import time from multiprocessing.dummy import Pool as ThreadPool +from threading import Lock import requests -from .config import Config +from .config import Config, Settings from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, AuditResponseCode, Metrics) from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY, @@ -51,6 +52,11 @@ class PolicyRest(object): EXPECTED_VERSIONS = "expected_versions" IGNORE_POLICY_NAMES = "ignore_policy_names" + _lock = Lock() + _settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS, + Config.THREAD_POOL_SIZE, + Config.POLICY_RETRY_COUNT, Config.POLICY_RETRY_SLEEP) + _requests_session = None _url_get_config = None _headers = None @@ -60,39 +66,69 @@ class PolicyRest(object): _policy_retry_sleep = 0 @staticmethod - def _lazy_init(): + def _init(): """init static config""" - if PolicyRest._lazy_inited: - return - PolicyRest._lazy_inited = True + _, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE) - config = Config.settings[Config.FIELD_POLICY_ENGINE] + if not PolicyRest._requests_session: + PolicyRest._requests_session = requests.Session() - pool_size = Config.settings.get("pool_connections", 20) - PolicyRest._requests_session = requests.Session() - PolicyRest._requests_session.mount( - 'https://', - requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) - ) - PolicyRest._requests_session.mount( - 'http://', - requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) - ) + changed, pool_size = PolicyRest._settings.get_by_key(Config.POOL_CONNECTIONS, 20) + if changed: + PolicyRest._requests_session.mount( + 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + pool_maxsize=pool_size)) + PolicyRest._requests_session.mount( + 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + pool_maxsize=pool_size)) PolicyRest._url_get_config = (config["url"] + config["path_api"] + PolicyRest.POLICY_GET_CONFIG) PolicyRest._headers = config["headers"] PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) - PolicyRest._thread_pool_size = Config.settings.get("thread_pool_size", 4) + _, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key( + Config.THREAD_POOL_SIZE, 4) if PolicyRest._thread_pool_size < 2: PolicyRest._thread_pool_size = 2 - PolicyRest._policy_retry_count = Config.settings.get("policy_retry_count", 1) or 1 - PolicyRest._policy_retry_sleep = Config.settings.get("policy_retry_sleep", 0) + _, PolicyRest._policy_retry_count = PolicyRest._settings.get_by_key( + Config.POLICY_RETRY_COUNT, 1) + _, PolicyRest._policy_retry_sleep = PolicyRest._settings.get_by_key( + Config.POLICY_RETRY_SLEEP, 0) - PolicyRest._logger.info( - "PolicyClient url(%s) headers(%s)", - PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers)) + PolicyRest._logger.info("PDP url(%s) headers(%s): %s", + PolicyRest._url_get_config, + Metrics.json_dumps(PolicyRest._headers), + PolicyRest._settings) + + PolicyRest._settings.commit_change() + PolicyRest._lazy_inited = True + + @staticmethod + def reconfigure(): + """reconfigure""" + with PolicyRest._lock: + PolicyRest._settings.set_config(Config.discovered_config) + if not PolicyRest._settings.is_changed(): + PolicyRest._settings.commit_change() + return False + + PolicyRest._lazy_inited = False + PolicyRest._init() + return True + + @staticmethod + def _lazy_init(): + """init static config""" + if PolicyRest._lazy_inited: + return + + with PolicyRest._lock: + if PolicyRest._lazy_inited: + return + + PolicyRest._settings.set_config(Config.discovered_config) + PolicyRest._init() @staticmethod def _pdp_get_config(audit, json_body): @@ -100,26 +136,27 @@ class PolicyRest(object): metrics = Metrics(aud_parent=audit, targetEntity=PolicyRest._target_entity, targetServiceName=PolicyRest._url_get_config) - msg = json.dumps(json_body) - headers = copy.copy(PolicyRest._headers) + with PolicyRest._lock: + session = PolicyRest._requests_session + url = PolicyRest._url_get_config + headers = copy.deepcopy(PolicyRest._headers) + headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id - headers_str = Metrics.log_json_dumps(headers) + headers_str = Metrics.json_dumps(headers) + + msg = json.dumps(json_body) + log_line = "post to PDP {} msg={} headers={}".format(url, msg, headers_str) + + PolicyRest._logger.info(metrics.metrics_start(log_line)) - log_line = "post to PDP {0} msg={1} headers={2}".format( - PolicyRest._url_get_config, msg, headers_str) - metrics.metrics_start(log_line) - PolicyRest._logger.info(log_line) res = None try: - res = PolicyRest._requests_session.post( - PolicyRest._url_get_config, json=json_body, headers=headers) + res = session.post(url, json=json_body, headers=headers) except Exception as ex: error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value if isinstance(ex, requests.exceptions.RequestException) else AuditHttpCode.SERVER_INTERNAL_ERROR.value) - error_msg = ( - "failed to post to PDP {0} {1}: {2} msg={3} headers={4}" - .format(PolicyRest._url_get_config, type(ex).__name__, str(ex), msg, headers_str)) + error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line)) PolicyRest._logger.exception(error_msg) metrics.set_http_status_code(error_code) @@ -127,9 +164,9 @@ class PolicyRest(object): metrics.metrics(error_msg) return (error_code, None) - log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format( - PolicyRest._url_get_config, res.status_code, msg, res.text, - Metrics.log_json_dumps(dict(res.request.headers.items()))) + log_line = "response {} from post to PDP {}: msg={} text={} headers={}".format( + res.status_code, url, msg, res.text, + Metrics.json_dumps(dict(res.request.headers.items()))) status_code, res_data = PolicyRest._extract_pdp_res_data(audit, metrics, log_line, res) diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 7733146..8909cc7 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -22,7 +22,7 @@ import json import logging from threading import Event, Lock, Thread -from .config import Config +from .config import Config, Settings from .deploy_handler import DeployHandler, PolicyUpdateMessage from .onap.audit import Audit, AuditHttpCode, AuditResponseCode from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, POLICY_BODY, POLICY_ID, @@ -115,21 +115,45 @@ class PolicyUpdater(Thread): """sequentially handle the policy-updates and catch-ups in its own policy_updater thread""" _logger = logging.getLogger("policy_handler.policy_updater") - def __init__(self): + def __init__(self, on_reconfigure_receiver): """init static config of PolicyUpdater.""" - Thread.__init__(self, name="policy_updater") - self.daemon = True + Thread.__init__(self, name="policy_updater", daemon=True) + self._reconfigure_receiver = on_reconfigure_receiver self._lock = Lock() self._run = Event() + self._settings = Settings(CATCH_UP, Config.RECONFIGURE) self._catch_up_timer = None + self._reconfigure_timer = None + self._aud_shutdown = None self._aud_catch_up = None + self._aud_reconfigure = None self._policy_update = _PolicyUpdate() - catch_up_config = Config.settings.get(CATCH_UP, {}) - self._catch_up_interval = catch_up_config.get("interval") or 15*60 + self._catch_up_interval = None + self._reconfigure_interval = None + self._set_timer_intervals() + + def _set_timer_intervals(self): + """set intervals on timers""" + self._settings.set_config(Config.discovered_config) + if not self._settings.is_changed(): + self._settings.commit_change() + return False + + _, catch_up = self._settings.get_by_key(CATCH_UP, {}) + self._catch_up_interval = catch_up.get(Config.TIMER_INTERVAL) or 15*60 + + _, reconfigure = self._settings.get_by_key(Config.RECONFIGURE, {}) + self._reconfigure_interval = reconfigure.get(Config.TIMER_INTERVAL) or 10*60 + + PolicyUpdater._logger.info( + "intervals: catch_up(%s) reconfigure(%s): %s", + self._catch_up_interval, self._reconfigure_interval, self._settings) + self._settings.commit_change() + return True def policy_update(self, policies_updated, policies_removed): """enqueue the policy-updates""" @@ -148,8 +172,20 @@ class PolicyUpdater(Thread): ) self._run.set() + def _reconfigure(self): + """job to check for and bring in the updated config for policy-handler""" + with self._lock: + if not self._aud_reconfigure: + self._aud_reconfigure = Audit(req_message=Config.RECONFIGURE) + PolicyUpdater._logger.info( + "reconfigure %s request_id %s", + self._aud_reconfigure.req_message, self._aud_reconfigure.request_id + ) + self._run.set() + def run(self): """wait and run the policy-update in thread""" + self._run_reconfigure_timer() while True: PolicyUpdater._logger.info("waiting for policy-updates...") self._run.wait() @@ -160,6 +196,11 @@ class PolicyUpdater(Thread): if not self._keep_running(): break + self._on_reconfigure() + + if not self._keep_running(): + break + if self._on_catch_up(): continue @@ -183,7 +224,7 @@ class PolicyUpdater(Thread): if self._catch_up_timer: self._logger.info("next step catch_up_timer in %s", self._catch_up_interval) - self._catch_up_timer.next() + self._catch_up_timer.next(self._catch_up_interval) return self._catch_up_timer = StepTimer( @@ -196,14 +237,34 @@ class PolicyUpdater(Thread): self._logger.info("started catch_up_timer in %s", self._catch_up_interval) self._catch_up_timer.start() + def _run_reconfigure_timer(self): + """create and start the reconfigure timer""" + if not self._reconfigure_interval: + return + + if self._reconfigure_timer: + self._logger.info("next step reconfigure_timer in %s", self._reconfigure_interval) + self._reconfigure_timer.next(self._reconfigure_interval) + return + + self._reconfigure_timer = StepTimer( + "reconfigure_timer", + self._reconfigure_interval, + PolicyUpdater._reconfigure, + PolicyUpdater._logger, + self + ) + self._logger.info("started reconfigure_timer in %s", self._reconfigure_interval) + self._reconfigure_timer.start() + def _pause_catch_up_timer(self): """pause catch_up_timer""" if self._catch_up_timer: self._logger.info("pause catch_up_timer") self._catch_up_timer.pause() - def _stop_catch_up_timer(self): - """stop and destroy the catch_up_timer""" + def _stop_timers(self): + """stop and destroy the catch_up and reconfigure timers""" if self._catch_up_timer: self._logger.info("stopping catch_up_timer") self._catch_up_timer.stop() @@ -211,6 +272,66 @@ class PolicyUpdater(Thread): self._catch_up_timer = None self._logger.info("stopped catch_up_timer") + if self._reconfigure_timer: + self._logger.info("stopping reconfigure_timer") + self._reconfigure_timer.stop() + self._reconfigure_timer.join() + self._reconfigure_timer = None + self._logger.info("stopped reconfigure_timer") + + def _on_reconfigure(self): + """bring the latest config and reconfigure""" + with self._lock: + aud_reconfigure = self._aud_reconfigure + if self._aud_reconfigure: + self._aud_reconfigure = None + + if not aud_reconfigure: + return + + log_line = "{}({})".format(aud_reconfigure.req_message, aud_reconfigure.request_id) + reconfigure_result = "" + try: + PolicyUpdater._logger.info(log_line) + Config.discover(aud_reconfigure) + if not Config.discovered_config.is_changed(): + reconfigure_result = " -- config not changed" + else: + reconfigure_result = " -- config changed for:" + if self._set_timer_intervals(): + reconfigure_result += " timer_intervals" + + if PolicyRest.reconfigure(): + reconfigure_result += " " + Config.FIELD_POLICY_ENGINE + + if DeployHandler.reconfigure(aud_reconfigure): + reconfigure_result += " " + Config.DEPLOY_HANDLER + + if self._reconfigure_receiver(): + reconfigure_result += " web-socket" + + reconfigure_result += " -- change: {}".format(Config.discovered_config) + + Config.discovered_config.commit_change() + aud_reconfigure.audit_done(result=reconfigure_result) + PolicyUpdater._logger.info(log_line + reconfigure_result) + + except Exception as ex: + error_msg = "crash {} {}{}: {}: {}".format( + "_on_reconfigure", log_line, reconfigure_result, type(ex).__name__, str(ex)) + + PolicyUpdater._logger.exception(error_msg) + aud_reconfigure.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + aud_reconfigure.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + aud_reconfigure.audit_done(result=error_msg) + + self._run_reconfigure_timer() + + PolicyUpdater._logger.info("policy_handler health: %s", + json.dumps(aud_reconfigure.health(full=True))) + PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_reconfigure.process_info())) + + def _on_catch_up(self): """bring all the latest policies to DCAE-Controller""" with self._lock: @@ -239,7 +360,7 @@ class PolicyUpdater(Thread): catch_up_result = "- not sending empty catch-up to deployment-handler" else: aud_catch_up.reset_http_status_not_found() - DeployHandler.policy_update(aud_catch_up, catch_up_message, rediscover=True) + DeployHandler.policy_update(aud_catch_up, catch_up_message) if not aud_catch_up.is_success(): catch_up_result = "- failed to send catch-up to deployment-handler" PolicyUpdater._logger.warning(catch_up_result) @@ -256,6 +377,7 @@ class PolicyUpdater(Thread): PolicyUpdater._logger.exception(error_msg) aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + aud_catch_up.audit_done(result=error_msg) success = False self._run_catch_up_timer() @@ -342,7 +464,7 @@ class PolicyUpdater(Thread): self._aud_shutdown = audit self._run.set() - self._stop_catch_up_timer() + self._stop_timers() if self.is_alive(): self.join() diff --git a/policyhandler/policy_utils.py b/policyhandler/policy_utils.py index da83935..08d26f0 100644 --- a/policyhandler/policy_utils.py +++ b/policyhandler/policy_utils.py @@ -141,8 +141,10 @@ class Utils(object): return json_str @staticmethod - def are_the_same(body_1, body_2): + def are_the_same(body_1, body_2, json_dumps=None): """check whether both objects are the same""" + if not json_dumps: + json_dumps = json.dumps if (body_1 and not body_2) or (not body_1 and body_2): Utils._logger.debug("only one is empty %s != %s", body_1, body_2) return False @@ -152,21 +154,21 @@ class Utils(object): if isinstance(body_1, list) and isinstance(body_2, list): if len(body_1) != len(body_2): - Utils._logger.debug("len %s != %s", json.dumps(body_1), json.dumps(body_2)) + Utils._logger.debug("len %s != %s", json_dumps(body_1), json_dumps(body_2)) return False for val_1, val_2 in zip(body_1, body_2): - if not Utils.are_the_same(val_1, val_2): + if not Utils.are_the_same(val_1, val_2, json_dumps): return False return True if isinstance(body_1, dict) and isinstance(body_2, dict): if body_1.keys() ^ body_2.keys(): - Utils._logger.debug("keys %s != %s", json.dumps(body_1), json.dumps(body_2)) + Utils._logger.debug("keys %s != %s", json_dumps(body_1), json_dumps(body_2)) return False for key, val_1 in body_1.items(): - if not Utils.are_the_same(val_1, body_2[key]): + if not Utils.are_the_same(val_1, body_2[key], json_dumps): return False return True diff --git a/policyhandler/step_timer.py b/policyhandler/step_timer.py index 768b400..0f4f8e4 100644 --- a/policyhandler/step_timer.py +++ b/policyhandler/step_timer.py @@ -71,9 +71,11 @@ class StepTimer(Thread): self._finished, ) - def next(self): + def next(self, interval=None): """continue with the next timeout""" with self._lock: + if interval: + self._interval = interval self._paused = False if self._waiting_for_timeout: self._next.set() |