From 1d693376205c66af93283d04e8e9740c947a7d02 Mon Sep 17 00:00:00 2001 From: Alex Shatov Date: Fri, 24 Aug 2018 13:15:04 -0400 Subject: 4.2.0 policy-handler - periodic reconfigure - reconfigure == periodically retrieve the policy-handler config from consul-kv and compare to previous config and subconfigs. If changed, reconfigure the subunits - selectively change one or any settings for the following = catch_up timer interval = reconfigure timer interval = deployment-handler url and params (thread-safe) = policy-engine url and params (thread-safe) = web-socket url to policy-engine (through a callback) - each subunit has its own Settings that keep track of changes - try-catch and metrics around discovery - consul API - hidden the secrets from logs - froze the web-socket version to 0.49.0 because 0.50.0 and 0.51.0 are broken - looking around for stable alternatives - fixed-adapted the callbacks passed to the web-socket lib that changed its API in 0.49.0 and later - log the stack on the exception occurring in the web-socket lib - unit test refactoring Change-Id: Id53bad59660a197f59d9aeb7c05ab761d1060cd0 Signed-off-by: Alex Shatov Issue-ID: DCAEGEN2-470 --- etc_upload/config.json | 33 -------- policyhandler/__main__.py | 12 +-- policyhandler/config.py | 174 +++++++++++++++++++++++++++++---------- policyhandler/deploy_handler.py | 106 ++++++++++++++++-------- policyhandler/discovery.py | 119 ++++++++++++++++++-------- policyhandler/onap/audit.py | 3 +- policyhandler/policy_consts.py | 1 + policyhandler/policy_receiver.py | 66 ++++++++++----- policyhandler/policy_rest.py | 111 ++++++++++++++++--------- policyhandler/policy_updater.py | 144 +++++++++++++++++++++++++++++--- policyhandler/policy_utils.py | 12 +-- policyhandler/step_timer.py | 4 +- pom.xml | 2 +- requirements.txt | 2 +- run_policy.sh | 1 + setup.py | 4 +- tests/mock_config.json | 36 ++++++++ tests/mock_settings.py | 79 +++++++++++++++--- tests/test_policy_utils.py | 2 +- tests/test_policyhandler.py | 105 ++++++++++------------- tests/test_step_timer.py | 2 +- version.properties | 2 +- 22 files changed, 711 insertions(+), 309 deletions(-) delete mode 100644 etc_upload/config.json create mode 100644 tests/mock_config.json diff --git a/etc_upload/config.json b/etc_upload/config.json deleted file mode 100644 index 2e3b412..0000000 --- a/etc_upload/config.json +++ /dev/null @@ -1,33 +0,0 @@ -{ - "policy_handler" : { - "system" : "policy_handler", - "thread_pool_size" : 4, - "pool_connections" : 20, - "policy_retry_count" : 5, - "policy_retry_sleep" : 5, - "catch_up" : { - "interval" : 1200 - }, - "policy_engine" : { - "url" : "https://policy_engine:8081", - "path_pdp" : "/pdp/", - "path_api" : "/pdp/api/", - "headers" : { - "Accept" : "application/json", - "Content-Type" : "application/json", - "ClientAuth" : "Basic blah", - "Authorization" : "Basic blah", - "Environment" : "TEST" - }, - "target_entity" : "policy_engine" - }, - "deploy_handler" : { - "target_entity" : "deployment_handler", - "url" : "http://deployment_handler:8188", - "max_msg_length_mb" : 5, - "query" : { - "cfy_tenant_name" : "default_tenant" - } - } - } -} diff --git a/policyhandler/__main__.py b/policyhandler/__main__.py index 04ca657..63dc5da 100644 --- a/policyhandler/__main__.py +++ b/policyhandler/__main__.py @@ -35,20 +35,20 @@ from policyhandler.web_server import PolicyWeb def run_policy_handler(): """main run function for policy-handler""" - Config.load_from_file() - Config.discover() + Config.init_config() logger = logging.getLogger("policy_handler") sys.stdout = LogWriter(logger.info) sys.stderr = LogWriter(logger.error) logger.info("========== run_policy_handler ========== %s", __package__) - Audit.init(Config.get_system_name(), Config.LOGGER_CONFIG_FILE_PATH) - - logger.info("starting policy_handler with config:") - logger.info(Audit.log_json_dumps(Config.settings)) + Audit.init(Config.system_name, Config.LOGGER_CONFIG_FILE_PATH) audit = Audit(req_message="start policy handler") + + Config.discover(audit) + logger.info("starting policy_handler with config: %s", Config.discovered_config) + PolicyReceiver.run(audit) PolicyWeb.run_forever(audit) diff --git a/policyhandler/config.py b/policyhandler/config.py index 8e6edf9..3d68235 100644 --- a/policyhandler/config.py +++ b/policyhandler/config.py @@ -25,64 +25,131 @@ import logging.config import os from .discovery import DiscoveryClient +from .onap.audit import Audit +from .policy_utils import Utils + +LOGS_DIR = 'logs' + +try: + os.makedirs(LOGS_DIR, mode=0o770, exist_ok=True) +except Exception: + pass logging.basicConfig( - filename='logs/policy_handler.log', \ - format='%(asctime)s.%(msecs)03d %(levelname)+8s ' + \ - '%(threadName)s %(name)s.%(funcName)s: %(message)s', \ + filename=os.path.join(LOGS_DIR, 'policy_handler.log'), + format=('%(asctime)s.%(msecs)03d %(levelname)+8s ' + + '%(threadName)s %(name)s.%(funcName)s: %(message)s'), datefmt='%Y%m%d_%H%M%S', level=logging.DEBUG) +class Settings(object): + """settings of module or an application + that is the config filtered by the collection of config-keys. + + keeps track of changes versus the previous set_config unless committed + """ + def __init__(self, *config_keys): + """provide the collection of top level keys in config to limit the config""" + self._config_keys = config_keys + self._changed = False + self._config = None + self._prev_config = None + + def __str__(self): + """get str of the config""" + return Audit.json_dumps({ + "config_keys": self._config_keys, + "changed": self._changed, + "config": self._config, + "prev_config": self._prev_config + }) + + def is_loaded(self): + """whether loaded already""" + return bool(self._config) + + def commit_change(self): + """set the prev config to the latest config""" + self._prev_config = copy.deepcopy(self._config) + self._changed = False + + def _set_changed(self): + """determine whether the config changed""" + self._changed = not (self._prev_config + and Utils.are_the_same(self._prev_config, self._config, + Audit.json_dumps)) + + def set_config(self, config, auto_commit=False): + """update the config""" + self.commit_change() + + if isinstance(config, Settings): + config = config._config + + if not isinstance(config, dict): + config = {} + + self._config = copy.deepcopy(dict((k, v) for (k, v) in config.items() + if not self._config_keys or k in self._config_keys)) + + if auto_commit: + self.commit_change() + else: + self._set_changed() + + def is_changed(self): + """whether the config has changed""" + return self._changed + + def get_by_key(self, config_key, default=None): + """get the latest sub config by config_key and whether it has changed""" + if not config_key or not isinstance(config_key, str): + return False, default + + value = copy.deepcopy(self._config.get(config_key, default)) + if not self._prev_config: + return True, value + prev_value = self._prev_config.get(config_key, default) + return self._changed and not Utils.are_the_same(prev_value, value, Audit.json_dumps), value + + def update(self, config_key, value=None): + """set the latest sub config by config_key and determine whether the config has changed""" + if not config_key: + return + + self._config[config_key] = copy.deepcopy(value) + self._set_changed() + + class Config(object): """main config of the application""" + _logger = logging.getLogger("policy_handler.config") CONFIG_FILE_PATH = "etc/config.json" LOGGER_CONFIG_FILE_PATH = "etc/common_logger.config" SERVICE_NAME_POLICY_HANDLER = "policy_handler" + FIELD_SYSTEM = "system" FIELD_WSERVICE_PORT = "wservice_port" FIELD_POLICY_ENGINE = "policy_engine" + POOL_CONNECTIONS = "pool_connections" + DEPLOY_HANDLER = "deploy_handler" + THREAD_POOL_SIZE = "thread_pool_size" + POLICY_RETRY_COUNT = "policy_retry_count" + POLICY_RETRY_SLEEP = "policy_retry_sleep" + RECONFIGURE = "reconfigure" + TIMER_INTERVAL = "interval" + + system_name = SERVICE_NAME_POLICY_HANDLER wservice_port = 25577 - _logger = logging.getLogger("policy_handler.config") - settings = None + _local_config = Settings() + discovered_config = Settings() @staticmethod - def merge(new_config): - """merge the new_config into current config - override the values""" - if not new_config: - return - - if not Config.settings: - Config.settings = new_config - return - - new_config = copy.deepcopy(new_config) - Config.settings.update(new_config) - - @staticmethod - def get_system_name(): - """find the name of the policy-handler system - to be used as the key in consul-kv for config of policy-handler - """ - return (Config.settings or {}).get(Config.FIELD_SYSTEM, Config.SERVICE_NAME_POLICY_HANDLER) - - @staticmethod - def discover(): - """bring and merge the config settings from the discovery service""" - discovery_key = Config.get_system_name() - new_config = DiscoveryClient.get_value(discovery_key) - - if not new_config or not isinstance(new_config, dict): - Config._logger.warning("unexpected config from discovery: %s", new_config) + def init_config(file_path=None): + """read and store the config from config file""" + if Config._local_config.is_loaded(): + Config._logger.info("config already inited: %s", Config._local_config) return - Config._logger.debug("loaded config from discovery(%s): %s", \ - discovery_key, json.dumps(new_config)) - Config._logger.debug("config before merge from discovery: %s", json.dumps(Config.settings)) - Config.merge(new_config.get(Config.SERVICE_NAME_POLICY_HANDLER)) - Config._logger.info("merged config from discovery: %s", json.dumps(Config.settings)) - - @staticmethod - def load_from_file(file_path=None): - """read and store the config from config file""" if not file_path: file_path = Config.CONFIG_FILE_PATH @@ -101,6 +168,25 @@ class Config(object): logging.config.dictConfig(logging_config) Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port) - Config.merge(loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER)) - Config._logger.info("config loaded from file: %s", json.dumps(Config.settings)) - return True + + local_config = loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER) + Config.system_name = local_config.get(Config.FIELD_SYSTEM, Config.system_name) + + Config._local_config.set_config(local_config, auto_commit=True) + Config._logger.info("config loaded from file(%s): %s", file_path, Config._local_config) + + @staticmethod + def discover(audit): + """bring and merge the config settings from the discovery service""" + discovery_key = Config.system_name + new_config = DiscoveryClient.get_value(audit, discovery_key) + + if not new_config or not isinstance(new_config, dict): + Config._logger.warning("unexpected config from discovery: %s", new_config) + return + + Config._logger.debug("loaded config from discovery(%s): %s", + discovery_key, Audit.json_dumps(new_config)) + + Config.discovered_config.set_config(new_config.get(Config.SERVICE_NAME_POLICY_HANDLER)) + Config._logger.info("config from discovery: %s", Config.discovered_config) diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index 6d258f2..e308c1a 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -21,17 +21,18 @@ import json import logging from copy import copy, deepcopy +from threading import Lock import requests -from .config import Config +from .config import Config, Settings from .customize import CustomizerUser from .discovery import DiscoveryClient from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, AuditResponseCode, Metrics) from .policy_consts import (CATCH_UP, LATEST_POLICIES, POLICIES, POLICY_FILTER_MATCHES, POLICY_FILTERS, - REMOVED_POLICIES) + REMOVED_POLICIES, TARGET_ENTITY) class PolicyUpdateMessage(object): @@ -144,7 +145,11 @@ class PolicyUpdateMessage(object): class DeployHandler(object): """calling the deployment-handler web apis""" _logger = logging.getLogger("policy_handler.deploy_handler") + DEFAULT_TARGET_ENTITY = "deployment_handler" + _lazy_inited = False + _lock = Lock() + _settings = Settings(Config.POOL_CONNECTIONS, Config.DEPLOY_HANDLER) _requests_session = None _url = None @@ -157,11 +162,8 @@ class DeployHandler(object): server_instance_changed = False @staticmethod - def _lazy_init(audit, rediscover=False): - """ set static properties """ - if DeployHandler._lazy_inited and not rediscover: - return - + def _init(audit): + """set config""" DeployHandler._custom_kwargs = (CustomizerUser.get_customizer() .get_deploy_handler_kwargs(audit)) if (not DeployHandler._custom_kwargs @@ -169,18 +171,18 @@ class DeployHandler(object): DeployHandler._custom_kwargs = {} if not DeployHandler._requests_session: - pool_size = Config.settings.get("pool_connections", 20) DeployHandler._requests_session = requests.Session() + + changed, pool_size = DeployHandler._settings.get_by_key(Config.POOL_CONNECTIONS, 10) + if changed: DeployHandler._requests_session.mount( - 'https://', - requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) - ) + 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + pool_maxsize=pool_size)) DeployHandler._requests_session.mount( - 'http://', - requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) - ) + 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + pool_maxsize=pool_size)) - config_dh = Config.settings.get("deploy_handler") + _, config_dh = DeployHandler._settings.get_by_key(Config.DEPLOY_HANDLER) if config_dh and isinstance(config_dh, dict): # dns based routing to deployment-handler # config for policy-handler >= 2.4.0 @@ -192,7 +194,8 @@ class DeployHandler(object): # "cfy_tenant_name" : "default_tenant" # } # } - DeployHandler._target_entity = config_dh.get("target_entity", "deployment_handler") + DeployHandler._target_entity = config_dh.get(TARGET_ENTITY, + DeployHandler.DEFAULT_TARGET_ENTITY) DeployHandler._url = config_dh.get("url") DeployHandler._max_msg_length_mb = config_dh.get("max_msg_length_mb", DeployHandler._max_msg_length_mb) @@ -205,19 +208,45 @@ class DeployHandler(object): if not isinstance(config_dh, dict): # config for policy-handler <= 2.3.1 # "deploy_handler" : "deployment_handler" - DeployHandler._target_entity = str(config_dh or "deployment_handler") + DeployHandler._target_entity = str(config_dh or DeployHandler.DEFAULT_TARGET_ENTITY) DeployHandler._url = DiscoveryClient.get_service_url(audit, DeployHandler._target_entity) DeployHandler._url_policy = str(DeployHandler._url or "") + '/policy' - DeployHandler._logger.info( - "got %s policy url(%s)", DeployHandler._target_entity, DeployHandler._url_policy) + DeployHandler._logger.info("got %s policy url(%s): %s", DeployHandler._target_entity, + DeployHandler._url_policy, DeployHandler._settings) + DeployHandler._settings.commit_change() DeployHandler._lazy_inited = bool(DeployHandler._url) + @staticmethod + def reconfigure(audit): + """reconfigure""" + with DeployHandler._lock: + DeployHandler._settings.set_config(Config.discovered_config) + if not DeployHandler._settings.is_changed(): + DeployHandler._settings.commit_change() + return False + + DeployHandler._lazy_inited = False + DeployHandler._init(audit) + return True @staticmethod - def policy_update(audit, policy_update_message, rediscover=False): + def _lazy_init(audit): + """set config""" + if DeployHandler._lazy_inited: + return + + with DeployHandler._lock: + if DeployHandler._lazy_inited: + return + + DeployHandler._settings.set_config(Config.discovered_config) + DeployHandler._init(audit) + + @staticmethod + def policy_update(audit, policy_update_message): """ segments the big policy_update_message limited by size and sequatially sends each segment as put to deployment-handler at /policy. @@ -227,7 +256,7 @@ class DeployHandler(object): if not policy_update_message or policy_update_message.empty(): return - DeployHandler._lazy_init(audit, rediscover) + DeployHandler._lazy_init(audit) str_metrics = "policy_update {0}".format(str(policy_update_message)) @@ -277,11 +306,14 @@ class DeployHandler(object): res = None try: - res = DeployHandler._requests_session.put( - DeployHandler._url_policy, json=message, - headers=headers, params=DeployHandler._query, - **DeployHandler._custom_kwargs - ) + with DeployHandler._lock: + session = DeployHandler._requests_session + url = DeployHandler._url_policy + params = deepcopy(DeployHandler._query) + custom_kwargs = deepcopy(DeployHandler._custom_kwargs) + + res = session.put(url, json=message, + headers=headers, params=params, **custom_kwargs) except Exception as ex: error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value if isinstance(ex, requests.exceptions.RequestException) @@ -311,12 +343,12 @@ class DeployHandler(object): @staticmethod - def get_deployed_policies(audit, rediscover=False): + def get_deployed_policies(audit): """ Retrieves policies and policy-filters from components that were deployed by deployment-handler """ - DeployHandler._lazy_init(audit, rediscover) + DeployHandler._lazy_init(audit) metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity, targetServiceName=DeployHandler._url_policy) headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id} @@ -338,10 +370,13 @@ class DeployHandler(object): res = None try: - res = DeployHandler._requests_session.get( - DeployHandler._url_policy, headers=headers, params=DeployHandler._query, - **DeployHandler._custom_kwargs - ) + with DeployHandler._lock: + session = DeployHandler._requests_session + url = DeployHandler._url_policy + params = deepcopy(DeployHandler._query) + custom_kwargs = deepcopy(DeployHandler._custom_kwargs) + + res = session.get(url, headers=headers, params=params, **custom_kwargs) except Exception as ex: error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value if isinstance(ex, requests.exceptions.RequestException) @@ -390,7 +425,6 @@ class DeployHandler(object): and prev_server_instance_uuid != DeployHandler._server_instance_uuid): DeployHandler.server_instance_changed = True - log_line = ("deployment_handler_changed: {1} != {0}" - .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid)) - metrics.info(log_line) - DeployHandler._logger.info(log_line) + DeployHandler._logger.info(metrics.info( + "deployment_handler_changed: {1} != {0}" + .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid))) diff --git a/policyhandler/discovery.py b/policyhandler/discovery.py index ce24c3d..4e6bc3d 100644 --- a/policyhandler/discovery.py +++ b/policyhandler/discovery.py @@ -25,6 +25,8 @@ import logging import requests from .customize import CustomizerUser +from .onap.audit import AuditHttpCode, Metrics + class DiscoveryClient(object): """talking to consul at http://consul:8500 @@ -40,56 +42,103 @@ class DiscoveryClient(object): -p : ${APPNAME}:latest """ + CONSUL_ENTITY = "consul" CONSUL_SERVICE_MASK = "http://consul:8500/v1/catalog/service/{0}" CONSUL_KV_MASK = "http://consul:8500/v1/kv/{0}" _logger = logging.getLogger("policy_handler.discovery") @staticmethod - def get_service_url(audit, service_name): + def _discover_service(audit, service_name, service_path): """find the service record in consul""" - service_path = DiscoveryClient.CONSUL_SERVICE_MASK.format(service_name) - log_line = "discover {0}".format(service_path) - DiscoveryClient._logger.info(log_line) - audit.info(log_line) response = requests.get(service_path) - - log_line = "response {0} for {1}: {2}".format( - response.status_code, service_path, response.text) - DiscoveryClient._logger.info(log_line) - audit.info(log_line) + DiscoveryClient._logger.info(audit.info("response {} from {}: {}".format( + response.status_code, service_path, response.text))) response.raise_for_status() + status_code = response.status_code + service = response.json()[0] + return (status_code, + CustomizerUser.get_customizer().get_service_url(audit, service_name, service)) + + @staticmethod + def get_service_url(audit, service_name): + """find the service record in consul""" + service_path = DiscoveryClient.CONSUL_SERVICE_MASK.format(service_name) + metrics = Metrics(aud_parent=audit, targetEntity=DiscoveryClient.CONSUL_ENTITY, + targetServiceName=service_path) + + log_line = "get from {} at {}".format(DiscoveryClient.CONSUL_ENTITY, service_path) - service = response.json() - if not service: - log_line = "failed discover {0}".format(service_path) - DiscoveryClient._logger.error(log_line) - audit.error(log_line) - return - service = service[0] + DiscoveryClient._logger.info(metrics.metrics_start(log_line)) + status_code = None + try: + (status_code, + service_url) = DiscoveryClient._discover_service(audit, service_name, service_path) + except Exception as ex: + error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + if isinstance(ex, requests.exceptions.RequestException) + else AuditHttpCode.SERVER_INTERNAL_ERROR.value) + error_msg = ("failed {}/{} to {} {}: {}".format(status_code, error_code, log_line, + type(ex).__name__, str(ex))) + DiscoveryClient._logger.exception(error_msg) + metrics.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + metrics.metrics(error_msg) + return None - service_url = CustomizerUser.get_customizer().get_service_url(audit, service_name, service) if not service_url: - log_line = "failed to get service_url for {0}".format(service_name) - DiscoveryClient._logger.error(log_line) - audit.error(log_line) - return - - log_line = "got service_url: {0} for {1}".format(service_url, service_name) - DiscoveryClient._logger.info(log_line) - audit.info(log_line) + error_code = AuditHttpCode.DATA_ERROR.value + error_msg = "failed {}/{} to {}".format(status_code, error_code, log_line) + DiscoveryClient._logger.error(audit.error(error_msg)) + metrics.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + metrics.metrics(error_msg) + return None + + log_line = "response {} {}".format(status_code, log_line) + DiscoveryClient._logger.info(audit.info("got service_url: {} after {}" + .format(service_url, log_line))) + + metrics.set_http_status_code(status_code) + audit.set_http_status_code(status_code) + metrics.metrics(log_line) return service_url @staticmethod - def get_value(key): - """get the value for the key from consul-kv""" - response = requests.get(DiscoveryClient.CONSUL_KV_MASK.format(key)) + def _get_value_from_kv(url): + """get the value from consul-kv at discovery url""" + response = requests.get(url) response.raise_for_status() data = response.json() - if not data: - DiscoveryClient._logger.error("failed get_value %s", key) - return value = base64.b64decode(data[0]["Value"]).decode("utf-8") - DiscoveryClient._logger.info("consul-kv key=%s value(%s) data=%s", - key, value, json.dumps(data)) - return json.loads(value) + return response.status_code, json.loads(value) + + @staticmethod + def get_value(audit, key): + """get the value for the key from consul-kv""" + discovery_url = DiscoveryClient.CONSUL_KV_MASK.format(key) + metrics = Metrics(aud_parent=audit, targetEntity=DiscoveryClient.CONSUL_ENTITY, + targetServiceName=discovery_url) + + log_line = "get from {} at {}".format(DiscoveryClient.CONSUL_ENTITY, discovery_url) + + DiscoveryClient._logger.info(metrics.metrics_start(log_line)) + try: + status_code, value = DiscoveryClient._get_value_from_kv(discovery_url) + except Exception as ex: + error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + if isinstance(ex, requests.exceptions.RequestException) + else AuditHttpCode.SERVER_INTERNAL_ERROR.value) + error_msg = ("failed {}/{} to {} {}: {}".format(status_code, error_code, log_line, + type(ex).__name__, str(ex))) + DiscoveryClient._logger.exception(error_msg) + metrics.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + metrics.metrics(error_msg) + return None + + log_line = "response {} {}".format(status_code, log_line) + metrics.set_http_status_code(status_code) + audit.set_http_status_code(status_code) + metrics.metrics(log_line) + return value diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py index db85c18..1bee4a7 100644 --- a/policyhandler/onap/audit.py +++ b/policyhandler/onap/audit.py @@ -339,7 +339,7 @@ class _Audit(object): return obj @staticmethod - def log_json_dumps(obj, **kwargs): + def json_dumps(obj, **kwargs): """hide the known secret field values of the dictionary and return json.dumps""" if not isinstance(obj, dict): return json.dumps(obj, **kwargs) @@ -455,6 +455,7 @@ class Metrics(_Audit): self.info(log_line, **self.merge_all_kwargs(**kwargs)) _Audit._health.start(self._metrics_name, self.request_id) _Audit._health.start(METRICS_TOTAL_STATS, self.request_id) + return log_line def metrics(self, log_line, **kwargs): diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py index 51ac173..cde4551 100644 --- a/policyhandler/policy_consts.py +++ b/policyhandler/policy_consts.py @@ -36,3 +36,4 @@ POLICY_VERSIONS = "policy_versions" MATCHING_CONDITIONS = "matchingConditions" POLICY_NAMES = "policy_names" POLICY_FILTER_MATCHES = "policy_filter_matches" +TARGET_ENTITY = "target_entity" diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py index bb33cd5..1edb24d 100644 --- a/policyhandler/policy_receiver.py +++ b/policyhandler/policy_receiver.py @@ -32,7 +32,7 @@ from threading import Lock, Thread import websocket -from .config import Config +from .config import Config, Settings from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION from .policy_updater import PolicyUpdater @@ -47,25 +47,50 @@ class _PolicyReceiver(Thread): def __init__(self): """web-socket inside the thread to receive policy notifications from PolicyEngine""" - Thread.__init__(self, name="policy_receiver") - self.daemon = True + Thread.__init__(self, name="policy_receiver", daemon=True) self._lock = Lock() self._keep_running = True + self._settings = Settings(Config.FIELD_POLICY_ENGINE) - config = Config.settings[Config.FIELD_POLICY_ENGINE] - self.web_socket_url = resturl = config["url"] + config["path_pdp"] - - if resturl.startswith("https:"): - self.web_socket_url = resturl.replace("https:", "wss:") + "notifications" - else: - self.web_socket_url = resturl.replace("http:", "ws:") + "notifications" - + self._web_socket_url = None self._web_socket = None + self.reconfigure() - self._policy_updater = PolicyUpdater() + self._policy_updater = PolicyUpdater(self.reconfigure) self._policy_updater.start() + def reconfigure(self): + """configure and reconfigure the web-socket""" + with self._lock: + self._settings.set_config(Config.discovered_config) + changed, config = self._settings.get_by_key(Config.FIELD_POLICY_ENGINE) + + if not changed: + self._settings.commit_change() + return False + + prev_web_socket_url = self._web_socket_url + resturl = config.get("url", "") + config.get("path_pdp", "") + + if resturl.startswith("https:"): + self._web_socket_url = resturl.replace("https:", "wss:") + "notifications" + else: + self._web_socket_url = resturl.replace("http:", "ws:") + "notifications" + + if self._web_socket_url == prev_web_socket_url: + _PolicyReceiver._logger.info("not changed web_socket_url(%s): %s", + self._web_socket_url, self._settings) + self._settings.commit_change() + return False + + _PolicyReceiver._logger.info("changed web_socket_url(%s): %s", + self._web_socket_url, self._settings) + self._settings.commit_change() + + self._stop_notifications() + return True + def run(self): """listen on web-socket and pass the policy notifications to policy-updater""" websocket.enableTrace(True) @@ -80,9 +105,9 @@ class _PolicyReceiver(Thread): time.sleep(5) _PolicyReceiver._logger.info( - "connecting to policy-notifications at: %s", self.web_socket_url) + "connecting to policy-notifications at: %s", self._web_socket_url) self._web_socket = websocket.WebSocketApp( - self.web_socket_url, + self._web_socket_url, on_message=self._on_pdp_message, on_close=self._on_ws_close, on_error=self._on_ws_error @@ -101,15 +126,16 @@ class _PolicyReceiver(Thread): return keep_running def _stop_notifications(self): - """Shuts down the AutoNotification service if running.""" + """close the web-socket == stops the notification service if running.""" with self._lock: if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected: self._web_socket.close() _PolicyReceiver._logger.info("Stopped receiving notifications from PDP") - def _on_pdp_message(self, _, message): + def _on_pdp_message(self, *args): """received the notification from PDP""" try: + message = args and args[-1] _PolicyReceiver._logger.info("Received notification message: %s", message) if not message: return @@ -144,13 +170,13 @@ class _PolicyReceiver(Thread): _PolicyReceiver._logger.exception(error_msg) - def _on_ws_error(self, _, error): + def _on_ws_error(self, error): """report an error""" - _PolicyReceiver._logger.error("policy-notification error: %s", error) + _PolicyReceiver._logger.exception("policy-notification error: %s", str(error)) - def _on_ws_close(self, _): + def _on_ws_close(self, code, reason): """restart web-socket on close""" - _PolicyReceiver._logger.info("lost connection to PDP - restarting...") + _PolicyReceiver._logger.info("lost connection(%s, %s) to PDP - restarting...", code, reason) def shutdown(self, audit): """Shutdown the policy-receiver""" diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py index 6ec982a..d10f4bf 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/policy_rest.py @@ -23,10 +23,11 @@ import json import logging import time from multiprocessing.dummy import Pool as ThreadPool +from threading import Lock import requests -from .config import Config +from .config import Config, Settings from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, AuditResponseCode, Metrics) from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY, @@ -51,6 +52,11 @@ class PolicyRest(object): EXPECTED_VERSIONS = "expected_versions" IGNORE_POLICY_NAMES = "ignore_policy_names" + _lock = Lock() + _settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS, + Config.THREAD_POOL_SIZE, + Config.POLICY_RETRY_COUNT, Config.POLICY_RETRY_SLEEP) + _requests_session = None _url_get_config = None _headers = None @@ -60,39 +66,69 @@ class PolicyRest(object): _policy_retry_sleep = 0 @staticmethod - def _lazy_init(): + def _init(): """init static config""" - if PolicyRest._lazy_inited: - return - PolicyRest._lazy_inited = True + _, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE) - config = Config.settings[Config.FIELD_POLICY_ENGINE] + if not PolicyRest._requests_session: + PolicyRest._requests_session = requests.Session() - pool_size = Config.settings.get("pool_connections", 20) - PolicyRest._requests_session = requests.Session() - PolicyRest._requests_session.mount( - 'https://', - requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) - ) - PolicyRest._requests_session.mount( - 'http://', - requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) - ) + changed, pool_size = PolicyRest._settings.get_by_key(Config.POOL_CONNECTIONS, 20) + if changed: + PolicyRest._requests_session.mount( + 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + pool_maxsize=pool_size)) + PolicyRest._requests_session.mount( + 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + pool_maxsize=pool_size)) PolicyRest._url_get_config = (config["url"] + config["path_api"] + PolicyRest.POLICY_GET_CONFIG) PolicyRest._headers = config["headers"] PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) - PolicyRest._thread_pool_size = Config.settings.get("thread_pool_size", 4) + _, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key( + Config.THREAD_POOL_SIZE, 4) if PolicyRest._thread_pool_size < 2: PolicyRest._thread_pool_size = 2 - PolicyRest._policy_retry_count = Config.settings.get("policy_retry_count", 1) or 1 - PolicyRest._policy_retry_sleep = Config.settings.get("policy_retry_sleep", 0) + _, PolicyRest._policy_retry_count = PolicyRest._settings.get_by_key( + Config.POLICY_RETRY_COUNT, 1) + _, PolicyRest._policy_retry_sleep = PolicyRest._settings.get_by_key( + Config.POLICY_RETRY_SLEEP, 0) - PolicyRest._logger.info( - "PolicyClient url(%s) headers(%s)", - PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers)) + PolicyRest._logger.info("PDP url(%s) headers(%s): %s", + PolicyRest._url_get_config, + Metrics.json_dumps(PolicyRest._headers), + PolicyRest._settings) + + PolicyRest._settings.commit_change() + PolicyRest._lazy_inited = True + + @staticmethod + def reconfigure(): + """reconfigure""" + with PolicyRest._lock: + PolicyRest._settings.set_config(Config.discovered_config) + if not PolicyRest._settings.is_changed(): + PolicyRest._settings.commit_change() + return False + + PolicyRest._lazy_inited = False + PolicyRest._init() + return True + + @staticmethod + def _lazy_init(): + """init static config""" + if PolicyRest._lazy_inited: + return + + with PolicyRest._lock: + if PolicyRest._lazy_inited: + return + + PolicyRest._settings.set_config(Config.discovered_config) + PolicyRest._init() @staticmethod def _pdp_get_config(audit, json_body): @@ -100,26 +136,27 @@ class PolicyRest(object): metrics = Metrics(aud_parent=audit, targetEntity=PolicyRest._target_entity, targetServiceName=PolicyRest._url_get_config) - msg = json.dumps(json_body) - headers = copy.copy(PolicyRest._headers) + with PolicyRest._lock: + session = PolicyRest._requests_session + url = PolicyRest._url_get_config + headers = copy.deepcopy(PolicyRest._headers) + headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id - headers_str = Metrics.log_json_dumps(headers) + headers_str = Metrics.json_dumps(headers) + + msg = json.dumps(json_body) + log_line = "post to PDP {} msg={} headers={}".format(url, msg, headers_str) + + PolicyRest._logger.info(metrics.metrics_start(log_line)) - log_line = "post to PDP {0} msg={1} headers={2}".format( - PolicyRest._url_get_config, msg, headers_str) - metrics.metrics_start(log_line) - PolicyRest._logger.info(log_line) res = None try: - res = PolicyRest._requests_session.post( - PolicyRest._url_get_config, json=json_body, headers=headers) + res = session.post(url, json=json_body, headers=headers) except Exception as ex: error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value if isinstance(ex, requests.exceptions.RequestException) else AuditHttpCode.SERVER_INTERNAL_ERROR.value) - error_msg = ( - "failed to post to PDP {0} {1}: {2} msg={3} headers={4}" - .format(PolicyRest._url_get_config, type(ex).__name__, str(ex), msg, headers_str)) + error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line)) PolicyRest._logger.exception(error_msg) metrics.set_http_status_code(error_code) @@ -127,9 +164,9 @@ class PolicyRest(object): metrics.metrics(error_msg) return (error_code, None) - log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format( - PolicyRest._url_get_config, res.status_code, msg, res.text, - Metrics.log_json_dumps(dict(res.request.headers.items()))) + log_line = "response {} from post to PDP {}: msg={} text={} headers={}".format( + res.status_code, url, msg, res.text, + Metrics.json_dumps(dict(res.request.headers.items()))) status_code, res_data = PolicyRest._extract_pdp_res_data(audit, metrics, log_line, res) diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 7733146..8909cc7 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -22,7 +22,7 @@ import json import logging from threading import Event, Lock, Thread -from .config import Config +from .config import Config, Settings from .deploy_handler import DeployHandler, PolicyUpdateMessage from .onap.audit import Audit, AuditHttpCode, AuditResponseCode from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, POLICY_BODY, POLICY_ID, @@ -115,21 +115,45 @@ class PolicyUpdater(Thread): """sequentially handle the policy-updates and catch-ups in its own policy_updater thread""" _logger = logging.getLogger("policy_handler.policy_updater") - def __init__(self): + def __init__(self, on_reconfigure_receiver): """init static config of PolicyUpdater.""" - Thread.__init__(self, name="policy_updater") - self.daemon = True + Thread.__init__(self, name="policy_updater", daemon=True) + self._reconfigure_receiver = on_reconfigure_receiver self._lock = Lock() self._run = Event() + self._settings = Settings(CATCH_UP, Config.RECONFIGURE) self._catch_up_timer = None + self._reconfigure_timer = None + self._aud_shutdown = None self._aud_catch_up = None + self._aud_reconfigure = None self._policy_update = _PolicyUpdate() - catch_up_config = Config.settings.get(CATCH_UP, {}) - self._catch_up_interval = catch_up_config.get("interval") or 15*60 + self._catch_up_interval = None + self._reconfigure_interval = None + self._set_timer_intervals() + + def _set_timer_intervals(self): + """set intervals on timers""" + self._settings.set_config(Config.discovered_config) + if not self._settings.is_changed(): + self._settings.commit_change() + return False + + _, catch_up = self._settings.get_by_key(CATCH_UP, {}) + self._catch_up_interval = catch_up.get(Config.TIMER_INTERVAL) or 15*60 + + _, reconfigure = self._settings.get_by_key(Config.RECONFIGURE, {}) + self._reconfigure_interval = reconfigure.get(Config.TIMER_INTERVAL) or 10*60 + + PolicyUpdater._logger.info( + "intervals: catch_up(%s) reconfigure(%s): %s", + self._catch_up_interval, self._reconfigure_interval, self._settings) + self._settings.commit_change() + return True def policy_update(self, policies_updated, policies_removed): """enqueue the policy-updates""" @@ -148,8 +172,20 @@ class PolicyUpdater(Thread): ) self._run.set() + def _reconfigure(self): + """job to check for and bring in the updated config for policy-handler""" + with self._lock: + if not self._aud_reconfigure: + self._aud_reconfigure = Audit(req_message=Config.RECONFIGURE) + PolicyUpdater._logger.info( + "reconfigure %s request_id %s", + self._aud_reconfigure.req_message, self._aud_reconfigure.request_id + ) + self._run.set() + def run(self): """wait and run the policy-update in thread""" + self._run_reconfigure_timer() while True: PolicyUpdater._logger.info("waiting for policy-updates...") self._run.wait() @@ -157,6 +193,11 @@ class PolicyUpdater(Thread): with self._lock: self._run.clear() + if not self._keep_running(): + break + + self._on_reconfigure() + if not self._keep_running(): break @@ -183,7 +224,7 @@ class PolicyUpdater(Thread): if self._catch_up_timer: self._logger.info("next step catch_up_timer in %s", self._catch_up_interval) - self._catch_up_timer.next() + self._catch_up_timer.next(self._catch_up_interval) return self._catch_up_timer = StepTimer( @@ -196,14 +237,34 @@ class PolicyUpdater(Thread): self._logger.info("started catch_up_timer in %s", self._catch_up_interval) self._catch_up_timer.start() + def _run_reconfigure_timer(self): + """create and start the reconfigure timer""" + if not self._reconfigure_interval: + return + + if self._reconfigure_timer: + self._logger.info("next step reconfigure_timer in %s", self._reconfigure_interval) + self._reconfigure_timer.next(self._reconfigure_interval) + return + + self._reconfigure_timer = StepTimer( + "reconfigure_timer", + self._reconfigure_interval, + PolicyUpdater._reconfigure, + PolicyUpdater._logger, + self + ) + self._logger.info("started reconfigure_timer in %s", self._reconfigure_interval) + self._reconfigure_timer.start() + def _pause_catch_up_timer(self): """pause catch_up_timer""" if self._catch_up_timer: self._logger.info("pause catch_up_timer") self._catch_up_timer.pause() - def _stop_catch_up_timer(self): - """stop and destroy the catch_up_timer""" + def _stop_timers(self): + """stop and destroy the catch_up and reconfigure timers""" if self._catch_up_timer: self._logger.info("stopping catch_up_timer") self._catch_up_timer.stop() @@ -211,6 +272,66 @@ class PolicyUpdater(Thread): self._catch_up_timer = None self._logger.info("stopped catch_up_timer") + if self._reconfigure_timer: + self._logger.info("stopping reconfigure_timer") + self._reconfigure_timer.stop() + self._reconfigure_timer.join() + self._reconfigure_timer = None + self._logger.info("stopped reconfigure_timer") + + def _on_reconfigure(self): + """bring the latest config and reconfigure""" + with self._lock: + aud_reconfigure = self._aud_reconfigure + if self._aud_reconfigure: + self._aud_reconfigure = None + + if not aud_reconfigure: + return + + log_line = "{}({})".format(aud_reconfigure.req_message, aud_reconfigure.request_id) + reconfigure_result = "" + try: + PolicyUpdater._logger.info(log_line) + Config.discover(aud_reconfigure) + if not Config.discovered_config.is_changed(): + reconfigure_result = " -- config not changed" + else: + reconfigure_result = " -- config changed for:" + if self._set_timer_intervals(): + reconfigure_result += " timer_intervals" + + if PolicyRest.reconfigure(): + reconfigure_result += " " + Config.FIELD_POLICY_ENGINE + + if DeployHandler.reconfigure(aud_reconfigure): + reconfigure_result += " " + Config.DEPLOY_HANDLER + + if self._reconfigure_receiver(): + reconfigure_result += " web-socket" + + reconfigure_result += " -- change: {}".format(Config.discovered_config) + + Config.discovered_config.commit_change() + aud_reconfigure.audit_done(result=reconfigure_result) + PolicyUpdater._logger.info(log_line + reconfigure_result) + + except Exception as ex: + error_msg = "crash {} {}{}: {}: {}".format( + "_on_reconfigure", log_line, reconfigure_result, type(ex).__name__, str(ex)) + + PolicyUpdater._logger.exception(error_msg) + aud_reconfigure.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + aud_reconfigure.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + aud_reconfigure.audit_done(result=error_msg) + + self._run_reconfigure_timer() + + PolicyUpdater._logger.info("policy_handler health: %s", + json.dumps(aud_reconfigure.health(full=True))) + PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_reconfigure.process_info())) + + def _on_catch_up(self): """bring all the latest policies to DCAE-Controller""" with self._lock: @@ -239,7 +360,7 @@ class PolicyUpdater(Thread): catch_up_result = "- not sending empty catch-up to deployment-handler" else: aud_catch_up.reset_http_status_not_found() - DeployHandler.policy_update(aud_catch_up, catch_up_message, rediscover=True) + DeployHandler.policy_update(aud_catch_up, catch_up_message) if not aud_catch_up.is_success(): catch_up_result = "- failed to send catch-up to deployment-handler" PolicyUpdater._logger.warning(catch_up_result) @@ -256,6 +377,7 @@ class PolicyUpdater(Thread): PolicyUpdater._logger.exception(error_msg) aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + aud_catch_up.audit_done(result=error_msg) success = False self._run_catch_up_timer() @@ -342,7 +464,7 @@ class PolicyUpdater(Thread): self._aud_shutdown = audit self._run.set() - self._stop_catch_up_timer() + self._stop_timers() if self.is_alive(): self.join() diff --git a/policyhandler/policy_utils.py b/policyhandler/policy_utils.py index da83935..08d26f0 100644 --- a/policyhandler/policy_utils.py +++ b/policyhandler/policy_utils.py @@ -141,8 +141,10 @@ class Utils(object): return json_str @staticmethod - def are_the_same(body_1, body_2): + def are_the_same(body_1, body_2, json_dumps=None): """check whether both objects are the same""" + if not json_dumps: + json_dumps = json.dumps if (body_1 and not body_2) or (not body_1 and body_2): Utils._logger.debug("only one is empty %s != %s", body_1, body_2) return False @@ -152,21 +154,21 @@ class Utils(object): if isinstance(body_1, list) and isinstance(body_2, list): if len(body_1) != len(body_2): - Utils._logger.debug("len %s != %s", json.dumps(body_1), json.dumps(body_2)) + Utils._logger.debug("len %s != %s", json_dumps(body_1), json_dumps(body_2)) return False for val_1, val_2 in zip(body_1, body_2): - if not Utils.are_the_same(val_1, val_2): + if not Utils.are_the_same(val_1, val_2, json_dumps): return False return True if isinstance(body_1, dict) and isinstance(body_2, dict): if body_1.keys() ^ body_2.keys(): - Utils._logger.debug("keys %s != %s", json.dumps(body_1), json.dumps(body_2)) + Utils._logger.debug("keys %s != %s", json_dumps(body_1), json_dumps(body_2)) return False for key, val_1 in body_1.items(): - if not Utils.are_the_same(val_1, body_2[key]): + if not Utils.are_the_same(val_1, body_2[key], json_dumps): return False return True diff --git a/policyhandler/step_timer.py b/policyhandler/step_timer.py index 768b400..0f4f8e4 100644 --- a/policyhandler/step_timer.py +++ b/policyhandler/step_timer.py @@ -71,9 +71,11 @@ class StepTimer(Thread): self._finished, ) - def next(self): + def next(self, interval=None): """continue with the next timeout""" with self._lock: + if interval: + self._interval = interval self._paused = False if self._waiting_for_timeout: self._next.set() diff --git a/pom.xml b/pom.xml index 6230822..54b234c 100644 --- a/pom.xml +++ b/pom.xml @@ -30,7 +30,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property. org.onap.dcaegen2.platform policy-handler dcaegen2-platform-policy-handler - 4.1.0-SNAPSHOT + 4.2.0-SNAPSHOT http://maven.apache.org UTF-8 diff --git a/requirements.txt b/requirements.txt index 72c36de..df9b56d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ CherryPy>=15.0.0,<16.0.0 psutil>=5.4.5,<6.0.0 requests>=2.18.4,<3.0.0 -websocket-client>=0.48.0,<1.0.0 +websocket-client==0.49.0 diff --git a/run_policy.sh b/run_policy.sh index a7ddde4..1d411b2 100644 --- a/run_policy.sh +++ b/run_policy.sh @@ -25,6 +25,7 @@ echo "---------------------------------------------" STARTED=$(date +%Y-%m-%d_%T.%N) echo "${STARTED}: running ${BASH_SOURCE[0]}" echo "APP_VER =" $(python setup.py --version) +echo "HOSTNAME =" ${HOSTNAME} (uname -a; echo "/etc/hosts"; cat /etc/hosts; pwd) python -m policyhandler & diff --git a/setup.py b/setup.py index f8c4aeb..c620e2b 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ from setuptools import setup setup( name='policyhandler', description='DCAE-Controller policy-handler to communicate with policy-engine', - version="4.1.0", + version="4.2.0", author='Alex Shatov', packages=['policyhandler'], zip_safe=False, @@ -31,7 +31,7 @@ setup( "CherryPy>=15.0.0,<16.0.0", "psutil>=5.4.5,<6.0.0", "requests>=2.18.4,<3.0.0", - "websocket-client>=0.48.0,<1.0.0" + "websocket-client==0.49.0" ], keywords='policy dcae controller', classifiers=[ diff --git a/tests/mock_config.json b/tests/mock_config.json new file mode 100644 index 0000000..98b0d19 --- /dev/null +++ b/tests/mock_config.json @@ -0,0 +1,36 @@ +{ + "policy_handler" : { + "system" : "policy_handler", + "thread_pool_size" : 4, + "pool_connections" : 20, + "policy_retry_count" : 5, + "policy_retry_sleep" : 5, + "catch_up" : { + "interval" : 10 + }, + "reconfigure" : { + "interval" : 10 + }, + "policy_engine" : { + "url" : "https://pdp-server:8081", + "path_pdp" : "/pdp/", + "path_api" : "/pdp/api/", + "headers" : { + "Accept" : "application/json", + "Content-Type" : "application/json", + "ClientAuth" : "Basic user", + "Authorization" : "Basic auth", + "Environment" : "TEST" + }, + "target_entity" : "policy_engine" + }, + "deploy_handler" : { + "target_entity" : "deployment_handler", + "url" : "http://deployment_handler:8188", + "max_msg_length_mb" : 5, + "query" : { + "cfy_tenant_name" : "default_tenant" + } + } + } +} diff --git a/tests/mock_settings.py b/tests/mock_settings.py index 017ad7e..7e05ecf 100644 --- a/tests/mock_settings.py +++ b/tests/mock_settings.py @@ -17,43 +17,100 @@ # ECOMP is a trademark and service mark of AT&T Intellectual Property. """settings that are general to all tests""" +import copy import json import logging import sys import uuid from datetime import datetime +from functools import wraps from policyhandler import LogWriter from policyhandler.config import Config +from policyhandler.discovery import DiscoveryClient from policyhandler.onap.audit import Audit +class MonkeyHttpResponse(object): + """Monkey http reposne""" + def __init__(self, headers): + self.headers = headers or {} + + +class MonkeyedResponse(object): + """Monkey response""" + def __init__(self, full_path, res_json, json_body=None, headers=None): + self.full_path = full_path + self.req_json = json_body or {} + self.status_code = 200 + self.request = MonkeyHttpResponse(headers) + self.res = res_json + self.text = json.dumps(self.res) + + def json(self): + """returns json of response""" + return self.res + + def raise_for_status(self): + """ignoring""" + pass + + +def _fix_discover_config(func): + """the decorator""" + if not func: + return None + + def mocked_discover_get_value(*_): + """monkeypatch for get from consul""" + return copy.deepcopy(Settings.mock_config) + + @wraps(func) + def wrapper(*args, **kwargs): + """override the DiscoveryClient.get_value to fake discovering the config""" + + old_get_value = DiscoveryClient.get_value + DiscoveryClient.get_value = mocked_discover_get_value + + func_result = func(*args, **kwargs) + + DiscoveryClient.get_value = old_get_value + + return func_result + return wrapper + class Settings(object): """init all locals""" + _loaded = False logger = None RUN_TS = datetime.utcnow().isoformat()[:-3] + 'Z' - dicovered_config = None + mock_config = None deploy_handler_instance_uuid = str(uuid.uuid4()) @staticmethod + @_fix_discover_config def init(): """init configs""" - Config.load_from_file() - - with open("etc_upload/config.json", 'r') as config_json: - Settings.dicovered_config = json.load(config_json) + if Settings._loaded: + return + Settings._loaded = True - Config.load_from_file("etc_upload/config.json") + Config.init_config() - Config.settings["catch_up"] = {"interval": 10} + with open("tests/mock_config.json", 'r') as config_json: + Settings.mock_config = json.load(config_json) Settings.logger = logging.getLogger("policy_handler.unit_test") sys.stdout = LogWriter(Settings.logger.info) sys.stderr = LogWriter(Settings.logger.error) - print("print ========== run_policy_handler ==========") + print("print is expected to be in the log") Settings.logger.info("========== run_policy_handler ==========") - Audit.init(Config.get_system_name(), Config.LOGGER_CONFIG_FILE_PATH) + Audit.init(Config.system_name, Config.LOGGER_CONFIG_FILE_PATH) + audit = Audit(req_message="start testing policy handler") + + Config.discover(audit) + + Settings.logger.info("testing policy_handler with config: %s", Config.discovered_config) - Settings.logger.info("starting policy_handler with config:") - Settings.logger.info(Audit.log_json_dumps(Config.settings)) + audit.audit_done(" -- started") diff --git a/tests/test_policy_utils.py b/tests/test_policy_utils.py index b88f1ea..dcf6ccb 100644 --- a/tests/test_policy_utils.py +++ b/tests/test_policy_utils.py @@ -25,7 +25,7 @@ import re from policyhandler.config import Config from policyhandler.policy_utils import RegexCoarser -Config.load_from_file() +Config.init_config() LOGGER = logging.getLogger("policy_handler.unit_test_policy_utils") diff --git a/tests/test_policyhandler.py b/tests/test_policyhandler.py index dec760a..d14aeea 100644 --- a/tests/test_policyhandler.py +++ b/tests/test_policyhandler.py @@ -18,16 +18,18 @@ """test of the package for policy-handler of DCAE-Controller""" +import base64 import copy import json import re import time import uuid -import pytest import cherrypy from cherrypy.test.helper import CPWebCase +import pytest + from policyhandler.config import Config from policyhandler.deploy_handler import DeployHandler from policyhandler.discovery import DiscoveryClient @@ -42,57 +44,10 @@ from policyhandler.policy_rest import PolicyRest from policyhandler.policy_utils import PolicyUtils, Utils from policyhandler.web_server import _PolicyWeb -from .mock_settings import Settings +from .mock_settings import MonkeyedResponse, Settings Settings.init() -class MonkeyHttpResponse(object): - """Monkey http reposne""" - def __init__(self, headers): - self.headers = headers or {} - - -class MonkeyedResponse(object): - """Monkey response""" - def __init__(self, full_path, res_json, json_body=None, headers=None): - self.full_path = full_path - self.req_json = json_body or {} - self.status_code = 200 - self.request = MonkeyHttpResponse(headers) - self.res = res_json - self.text = json.dumps(self.res) - - def json(self): - """returns json of response""" - return self.res - - def raise_for_status(self): - """ignoring""" - pass - - -def monkeyed_discovery(full_path): - """monkeypatch for get from consul""" - res_json = {} - if full_path == DiscoveryClient.CONSUL_SERVICE_MASK.format(Config.settings["deploy_handler"]): - res_json = [{ - "ServiceAddress": "1.1.1.1", - "ServicePort": "123" - }] - elif full_path == DiscoveryClient.CONSUL_KV_MASK.format(Config.get_system_name()): - res_json = copy.deepcopy(Settings.dicovered_config) - return MonkeyedResponse(full_path, res_json) - - -@pytest.fixture() -def fix_discovery(monkeypatch): - """monkeyed discovery request.get""" - Settings.logger.info("setup fix_discovery") - monkeypatch.setattr('policyhandler.discovery.requests.get', monkeyed_discovery) - yield fix_discovery # provide the fixture value - Settings.logger.info("teardown fix_discovery") - - class MonkeyPolicyBody(object): """policy body that policy-engine returns""" @staticmethod @@ -190,7 +145,6 @@ class MockPolicyEngine(object): for k, v in MockPolicyEngine.gen_all_policies_latest().items() if re.match(match_to_policy_name, k)) - MockPolicyEngine.init() @@ -289,9 +243,30 @@ def fix_select_latest_policies_boom(monkeypatch): yield fix_select_latest_policies_boom Settings.logger.info("teardown fix_select_latest_policies_boom") +@pytest.fixture() +def fix_discovery(monkeypatch): + """monkeyed discovery request.get""" + def monkeyed_discovery(full_path): + """monkeypatch for get from consul""" + res_json = {} + if full_path == DiscoveryClient.CONSUL_SERVICE_MASK.format( + Config.discovered_config.get_by_key(Config.DEPLOY_HANDLER)): + res_json = [{ + "ServiceAddress": "1.1.1.1", + "ServicePort": "123" + }] + elif full_path == DiscoveryClient.CONSUL_KV_MASK.format(Config.system_name): + res_json = [{"Value": base64.b64encode( + json.dumps(Settings.mock_config).encode()).decode("utf-8")}] + return MonkeyedResponse(full_path, res_json) + + Settings.logger.info("setup fix_discovery") + monkeypatch.setattr('policyhandler.discovery.requests.get', monkeyed_discovery) + yield fix_discovery # provide the fixture value + Settings.logger.info("teardown fix_discovery") @pytest.fixture() -def fix_deploy_handler(monkeypatch, fix_discovery): +def fix_deploy_handler(monkeypatch): """monkeyed requests to deployment-handler""" def monkeyed_deploy_handler_put(full_path, json=None, headers=None, params=None): """monkeypatch for policy-update request.put to deploy_handler""" @@ -313,11 +288,12 @@ def fix_deploy_handler(monkeypatch, fix_discovery): monkeyed_deploy_handler_get) yield fix_deploy_handler # provide the fixture value + audit.audit_done("teardown") Settings.logger.info("teardown fix_deploy_handler") @pytest.fixture() -def fix_deploy_handler_fail(monkeypatch, fix_discovery): +def fix_deploy_handler_fail(monkeypatch): """monkeyed failed discovery request.get""" def monkeyed_deploy_handler_put(full_path, json=None, headers=None, params=None): """monkeypatch for deploy_handler""" @@ -335,16 +311,16 @@ def fix_deploy_handler_fail(monkeypatch, fix_discovery): None, headers) @staticmethod - def monkeyed_deploy_handler_init(audit_ignore, rediscover=False): + def monkeyed_deploy_handler_init(audit_ignore): """monkeypatch for deploy_handler init""" DeployHandler._url = None Settings.logger.info("setup fix_deploy_handler_fail") - config_catch_up = Config.settings["catch_up"] - Config.settings["catch_up"] = {"interval": 1} + config_catch_up = Config.discovered_config.get_by_key("catch_up") + Config.discovered_config.update("catch_up", {"interval": 1}) audit = Audit(req_message="fix_deploy_handler_fail") - DeployHandler._lazy_init(audit, rediscover=True) + DeployHandler._lazy_init(audit) monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._lazy_init', monkeyed_deploy_handler_init) @@ -354,8 +330,9 @@ def fix_deploy_handler_fail(monkeypatch, fix_discovery): monkeyed_deploy_handler_get) yield fix_deploy_handler_fail + audit.audit_done("teardown") Settings.logger.info("teardown fix_deploy_handler_fail") - Config.settings["catch_up"] = config_catch_up + Config.discovered_config.update("catch_up", config_catch_up) @pytest.fixture() @@ -438,7 +415,7 @@ def fix_policy_receiver_websocket(monkeypatch): Settings.logger.info("teardown fix_policy_receiver_websocket") -def test_get_policy_latest(fix_pdp_post): +def test_get_policy_latest(fix_pdp_post, fix_discovery): """test /policy_latest/""" policy_id, expected_policy = MockPolicyEngine.gen_policy_latest(3) @@ -453,7 +430,7 @@ def test_get_policy_latest(fix_pdp_post): -@pytest.mark.usefixtures("fix_pdp_post") +@pytest.mark.usefixtures("fix_pdp_post", "fix_discovery") class WebServerTest(CPWebCase): """testing the web-server - runs tests in alphabetical order of method names""" def setup_server(): @@ -630,6 +607,7 @@ class WebServerTest(CPWebCase): WebServerTest.do_gc_test = False Settings.logger.info("shutdown...") + audit.audit_done("shutdown") result = self.getPage("/shutdown") Settings.logger.info("shutdown result: %s", result) self.assertStatus('200 OK') @@ -637,7 +615,7 @@ class WebServerTest(CPWebCase): time.sleep(1) -@pytest.mark.usefixtures("fix_pdp_post_boom") +@pytest.mark.usefixtures("fix_pdp_post_boom", "fix_discovery") class WebServerPDPBoomTest(CPWebCase): """testing the web-server - runs tests in alphabetical order of method names""" def setup_server(): @@ -791,6 +769,7 @@ class WebServerPDPBoomTest(CPWebCase): WebServerPDPBoomTest.do_gc_test = False Settings.logger.info("shutdown...") + audit.audit_done("shutdown") result = self.getPage("/shutdown") Settings.logger.info("shutdown result: %s", result) self.assertStatus('200 OK') @@ -798,7 +777,7 @@ class WebServerPDPBoomTest(CPWebCase): time.sleep(1) -@pytest.mark.usefixtures("fix_pdp_post", "fix_select_latest_policies_boom") +@pytest.mark.usefixtures("fix_pdp_post", "fix_select_latest_policies_boom", "fix_discovery") class WebServerInternalBoomTest(CPWebCase): """testing the web-server - runs tests in alphabetical order of method names""" def setup_server(): @@ -952,6 +931,7 @@ class WebServerInternalBoomTest(CPWebCase): WebServerInternalBoomTest.do_gc_test = False Settings.logger.info("shutdown...") + audit.audit_done("shutdown") result = self.getPage("/shutdown") Settings.logger.info("shutdown result: %s", result) self.assertStatus('200 OK') @@ -962,7 +942,8 @@ class WebServerInternalBoomTest(CPWebCase): @pytest.mark.usefixtures( "fix_pdp_post_big", "fix_deploy_handler_fail", - "fix_policy_receiver_websocket" + "fix_policy_receiver_websocket", + "fix_discovery" ) def test_catch_ups_failed_dh(): """test run policy handler with catchups and failed deployment-handler""" diff --git a/tests/test_step_timer.py b/tests/test_step_timer.py index ff89388..d4a0df1 100644 --- a/tests/test_step_timer.py +++ b/tests/test_step_timer.py @@ -26,7 +26,7 @@ from datetime import datetime from policyhandler.config import Config from policyhandler.step_timer import StepTimer -Config.load_from_file() +Config.init_config() class MockTimerController(object): diff --git a/version.properties b/version.properties index 03b71bc..29d7138 100644 --- a/version.properties +++ b/version.properties @@ -1,5 +1,5 @@ major=4 -minor=1 +minor=2 patch=0 base_version=${major}.${minor}.${patch} release_version=${base_version} -- cgit 1.2.3-korg