diff options
Diffstat (limited to 'policyhandler/policy_rest.py')
-rw-r--r-- | policyhandler/policy_rest.py | 111 |
1 files changed, 74 insertions, 37 deletions
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py index 6ec982a..d10f4bf 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/policy_rest.py @@ -23,10 +23,11 @@ import json import logging import time from multiprocessing.dummy import Pool as ThreadPool +from threading import Lock import requests -from .config import Config +from .config import Config, Settings from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, AuditResponseCode, Metrics) from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY, @@ -51,6 +52,11 @@ class PolicyRest(object): EXPECTED_VERSIONS = "expected_versions" IGNORE_POLICY_NAMES = "ignore_policy_names" + _lock = Lock() + _settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS, + Config.THREAD_POOL_SIZE, + Config.POLICY_RETRY_COUNT, Config.POLICY_RETRY_SLEEP) + _requests_session = None _url_get_config = None _headers = None @@ -60,39 +66,69 @@ class PolicyRest(object): _policy_retry_sleep = 0 @staticmethod - def _lazy_init(): + def _init(): """init static config""" - if PolicyRest._lazy_inited: - return - PolicyRest._lazy_inited = True + _, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE) - config = Config.settings[Config.FIELD_POLICY_ENGINE] + if not PolicyRest._requests_session: + PolicyRest._requests_session = requests.Session() - pool_size = Config.settings.get("pool_connections", 20) - PolicyRest._requests_session = requests.Session() - PolicyRest._requests_session.mount( - 'https://', - requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) - ) - PolicyRest._requests_session.mount( - 'http://', - requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) - ) + changed, pool_size = PolicyRest._settings.get_by_key(Config.POOL_CONNECTIONS, 20) + if changed: + PolicyRest._requests_session.mount( + 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + pool_maxsize=pool_size)) + PolicyRest._requests_session.mount( + 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + pool_maxsize=pool_size)) PolicyRest._url_get_config = (config["url"] + config["path_api"] + PolicyRest.POLICY_GET_CONFIG) PolicyRest._headers = config["headers"] PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) - PolicyRest._thread_pool_size = Config.settings.get("thread_pool_size", 4) + _, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key( + Config.THREAD_POOL_SIZE, 4) if PolicyRest._thread_pool_size < 2: PolicyRest._thread_pool_size = 2 - PolicyRest._policy_retry_count = Config.settings.get("policy_retry_count", 1) or 1 - PolicyRest._policy_retry_sleep = Config.settings.get("policy_retry_sleep", 0) + _, PolicyRest._policy_retry_count = PolicyRest._settings.get_by_key( + Config.POLICY_RETRY_COUNT, 1) + _, PolicyRest._policy_retry_sleep = PolicyRest._settings.get_by_key( + Config.POLICY_RETRY_SLEEP, 0) - PolicyRest._logger.info( - "PolicyClient url(%s) headers(%s)", - PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers)) + PolicyRest._logger.info("PDP url(%s) headers(%s): %s", + PolicyRest._url_get_config, + Metrics.json_dumps(PolicyRest._headers), + PolicyRest._settings) + + PolicyRest._settings.commit_change() + PolicyRest._lazy_inited = True + + @staticmethod + def reconfigure(): + """reconfigure""" + with PolicyRest._lock: + PolicyRest._settings.set_config(Config.discovered_config) + if not PolicyRest._settings.is_changed(): + PolicyRest._settings.commit_change() + return False + + PolicyRest._lazy_inited = False + PolicyRest._init() + return True + + @staticmethod + def _lazy_init(): + """init static config""" + if PolicyRest._lazy_inited: + return + + with PolicyRest._lock: + if PolicyRest._lazy_inited: + return + + PolicyRest._settings.set_config(Config.discovered_config) + PolicyRest._init() @staticmethod def _pdp_get_config(audit, json_body): @@ -100,26 +136,27 @@ class PolicyRest(object): metrics = Metrics(aud_parent=audit, targetEntity=PolicyRest._target_entity, targetServiceName=PolicyRest._url_get_config) - msg = json.dumps(json_body) - headers = copy.copy(PolicyRest._headers) + with PolicyRest._lock: + session = PolicyRest._requests_session + url = PolicyRest._url_get_config + headers = copy.deepcopy(PolicyRest._headers) + headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id - headers_str = Metrics.log_json_dumps(headers) + headers_str = Metrics.json_dumps(headers) + + msg = json.dumps(json_body) + log_line = "post to PDP {} msg={} headers={}".format(url, msg, headers_str) + + PolicyRest._logger.info(metrics.metrics_start(log_line)) - log_line = "post to PDP {0} msg={1} headers={2}".format( - PolicyRest._url_get_config, msg, headers_str) - metrics.metrics_start(log_line) - PolicyRest._logger.info(log_line) res = None try: - res = PolicyRest._requests_session.post( - PolicyRest._url_get_config, json=json_body, headers=headers) + res = session.post(url, json=json_body, headers=headers) except Exception as ex: error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value if isinstance(ex, requests.exceptions.RequestException) else AuditHttpCode.SERVER_INTERNAL_ERROR.value) - error_msg = ( - "failed to post to PDP {0} {1}: {2} msg={3} headers={4}" - .format(PolicyRest._url_get_config, type(ex).__name__, str(ex), msg, headers_str)) + error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line)) PolicyRest._logger.exception(error_msg) metrics.set_http_status_code(error_code) @@ -127,9 +164,9 @@ class PolicyRest(object): metrics.metrics(error_msg) return (error_code, None) - log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format( - PolicyRest._url_get_config, res.status_code, msg, res.text, - Metrics.log_json_dumps(dict(res.request.headers.items()))) + log_line = "response {} from post to PDP {}: msg={} text={} headers={}".format( + res.status_code, url, msg, res.text, + Metrics.json_dumps(dict(res.request.headers.items()))) status_code, res_data = PolicyRest._extract_pdp_res_data(audit, metrics, log_line, res) |