aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/policy_rest.py
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler/policy_rest.py')
-rw-r--r--policyhandler/policy_rest.py111
1 files changed, 74 insertions, 37 deletions
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py
index 6ec982a..d10f4bf 100644
--- a/policyhandler/policy_rest.py
+++ b/policyhandler/policy_rest.py
@@ -23,10 +23,11 @@ import json
import logging
import time
from multiprocessing.dummy import Pool as ThreadPool
+from threading import Lock
import requests
-from .config import Config
+from .config import Config, Settings
from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
AuditResponseCode, Metrics)
from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY,
@@ -51,6 +52,11 @@ class PolicyRest(object):
EXPECTED_VERSIONS = "expected_versions"
IGNORE_POLICY_NAMES = "ignore_policy_names"
+ _lock = Lock()
+ _settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS,
+ Config.THREAD_POOL_SIZE,
+ Config.POLICY_RETRY_COUNT, Config.POLICY_RETRY_SLEEP)
+
_requests_session = None
_url_get_config = None
_headers = None
@@ -60,39 +66,69 @@ class PolicyRest(object):
_policy_retry_sleep = 0
@staticmethod
- def _lazy_init():
+ def _init():
"""init static config"""
- if PolicyRest._lazy_inited:
- return
- PolicyRest._lazy_inited = True
+ _, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE)
- config = Config.settings[Config.FIELD_POLICY_ENGINE]
+ if not PolicyRest._requests_session:
+ PolicyRest._requests_session = requests.Session()
- pool_size = Config.settings.get("pool_connections", 20)
- PolicyRest._requests_session = requests.Session()
- PolicyRest._requests_session.mount(
- 'https://',
- requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
- )
- PolicyRest._requests_session.mount(
- 'http://',
- requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
- )
+ changed, pool_size = PolicyRest._settings.get_by_key(Config.POOL_CONNECTIONS, 20)
+ if changed:
+ PolicyRest._requests_session.mount(
+ 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
+ pool_maxsize=pool_size))
+ PolicyRest._requests_session.mount(
+ 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
+ pool_maxsize=pool_size))
PolicyRest._url_get_config = (config["url"] + config["path_api"]
+ PolicyRest.POLICY_GET_CONFIG)
PolicyRest._headers = config["headers"]
PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
- PolicyRest._thread_pool_size = Config.settings.get("thread_pool_size", 4)
+ _, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key(
+ Config.THREAD_POOL_SIZE, 4)
if PolicyRest._thread_pool_size < 2:
PolicyRest._thread_pool_size = 2
- PolicyRest._policy_retry_count = Config.settings.get("policy_retry_count", 1) or 1
- PolicyRest._policy_retry_sleep = Config.settings.get("policy_retry_sleep", 0)
+ _, PolicyRest._policy_retry_count = PolicyRest._settings.get_by_key(
+ Config.POLICY_RETRY_COUNT, 1)
+ _, PolicyRest._policy_retry_sleep = PolicyRest._settings.get_by_key(
+ Config.POLICY_RETRY_SLEEP, 0)
- PolicyRest._logger.info(
- "PolicyClient url(%s) headers(%s)",
- PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers))
+ PolicyRest._logger.info("PDP url(%s) headers(%s): %s",
+ PolicyRest._url_get_config,
+ Metrics.json_dumps(PolicyRest._headers),
+ PolicyRest._settings)
+
+ PolicyRest._settings.commit_change()
+ PolicyRest._lazy_inited = True
+
+ @staticmethod
+ def reconfigure():
+ """reconfigure"""
+ with PolicyRest._lock:
+ PolicyRest._settings.set_config(Config.discovered_config)
+ if not PolicyRest._settings.is_changed():
+ PolicyRest._settings.commit_change()
+ return False
+
+ PolicyRest._lazy_inited = False
+ PolicyRest._init()
+ return True
+
+ @staticmethod
+ def _lazy_init():
+ """init static config"""
+ if PolicyRest._lazy_inited:
+ return
+
+ with PolicyRest._lock:
+ if PolicyRest._lazy_inited:
+ return
+
+ PolicyRest._settings.set_config(Config.discovered_config)
+ PolicyRest._init()
@staticmethod
def _pdp_get_config(audit, json_body):
@@ -100,26 +136,27 @@ class PolicyRest(object):
metrics = Metrics(aud_parent=audit, targetEntity=PolicyRest._target_entity,
targetServiceName=PolicyRest._url_get_config)
- msg = json.dumps(json_body)
- headers = copy.copy(PolicyRest._headers)
+ with PolicyRest._lock:
+ session = PolicyRest._requests_session
+ url = PolicyRest._url_get_config
+ headers = copy.deepcopy(PolicyRest._headers)
+
headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id
- headers_str = Metrics.log_json_dumps(headers)
+ headers_str = Metrics.json_dumps(headers)
+
+ msg = json.dumps(json_body)
+ log_line = "post to PDP {} msg={} headers={}".format(url, msg, headers_str)
+
+ PolicyRest._logger.info(metrics.metrics_start(log_line))
- log_line = "post to PDP {0} msg={1} headers={2}".format(
- PolicyRest._url_get_config, msg, headers_str)
- metrics.metrics_start(log_line)
- PolicyRest._logger.info(log_line)
res = None
try:
- res = PolicyRest._requests_session.post(
- PolicyRest._url_get_config, json=json_body, headers=headers)
+ res = session.post(url, json=json_body, headers=headers)
except Exception as ex:
error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
if isinstance(ex, requests.exceptions.RequestException)
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
- error_msg = (
- "failed to post to PDP {0} {1}: {2} msg={3} headers={4}"
- .format(PolicyRest._url_get_config, type(ex).__name__, str(ex), msg, headers_str))
+ error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line))
PolicyRest._logger.exception(error_msg)
metrics.set_http_status_code(error_code)
@@ -127,9 +164,9 @@ class PolicyRest(object):
metrics.metrics(error_msg)
return (error_code, None)
- log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format(
- PolicyRest._url_get_config, res.status_code, msg, res.text,
- Metrics.log_json_dumps(dict(res.request.headers.items())))
+ log_line = "response {} from post to PDP {}: msg={} text={} headers={}".format(
+ res.status_code, url, msg, res.text,
+ Metrics.json_dumps(dict(res.request.headers.items())))
status_code, res_data = PolicyRest._extract_pdp_res_data(audit, metrics, log_line, res)