summaryrefslogtreecommitdiffstats
path: root/policyhandler
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2018-06-21 09:19:07 -0400
committerAlex Shatov <alexs@att.com>2018-06-21 09:19:07 -0400
commitd444b320dea07248dc69c81d0ce9ea5fc353e701 (patch)
treecacd244c5e2424faca5e088dc3ca3baf9afd8511 /policyhandler
parentc9ec231483d905f3a391c3985c2c2762344ed5c1 (diff)
3.0.1 policy-handler - cleaning sonar smells
- no change of functionality or API - removed the unused enum34>=1.1.6 from requirements.txt and setup.py - refactored run_policy.sh to redirect the stdout+stderr only once - refactoring to remove smells+vulnerability reported by sonar -- renamed Config.config to Config.settings -- removed the commented out code in customizer.py -- renamed StepTimer.NEXT to StepTimer.STATE_NEXT to avoid the naming confusion with the method StepTimer.next. Also renamed the related StepTimer.STATE_* constants -- refactored several functions by extracting methods to eliminate 4 out of 5 "brain-overload" smells reported by sonar -- moved the literal string for the socket_host "0.0.0.0" to a constant on the web-server to avoid the reported vulnerability Change-Id: I4c7d47d41c6ecd7cb28f6704f5dad2053c1ca7d6 Signed-off-by: Alex Shatov <alexs@att.com> Issue-ID: DCAEGEN2-515
Diffstat (limited to 'policyhandler')
-rw-r--r--policyhandler/__main__.py2
-rw-r--r--policyhandler/config.py16
-rw-r--r--policyhandler/customize/customizer.py4
-rw-r--r--policyhandler/deploy_handler.py2
-rw-r--r--policyhandler/policy_receiver.py4
-rw-r--r--policyhandler/policy_rest.py391
-rw-r--r--policyhandler/policy_updater.py2
-rw-r--r--policyhandler/step_timer.py75
-rw-r--r--policyhandler/web_server.py3
9 files changed, 269 insertions, 230 deletions
diff --git a/policyhandler/__main__.py b/policyhandler/__main__.py
index 1f17d9d..04ca657 100644
--- a/policyhandler/__main__.py
+++ b/policyhandler/__main__.py
@@ -46,7 +46,7 @@ def run_policy_handler():
Audit.init(Config.get_system_name(), Config.LOGGER_CONFIG_FILE_PATH)
logger.info("starting policy_handler with config:")
- logger.info(Audit.log_json_dumps(Config.config))
+ logger.info(Audit.log_json_dumps(Config.settings))
audit = Audit(req_message="start policy handler")
PolicyReceiver.run(audit)
diff --git a/policyhandler/config.py b/policyhandler/config.py
index 39d528b..8e6edf9 100644
--- a/policyhandler/config.py
+++ b/policyhandler/config.py
@@ -42,7 +42,7 @@ class Config(object):
FIELD_POLICY_ENGINE = "policy_engine"
wservice_port = 25577
_logger = logging.getLogger("policy_handler.config")
- config = None
+ settings = None
@staticmethod
def merge(new_config):
@@ -50,19 +50,19 @@ class Config(object):
if not new_config:
return
- if not Config.config:
- Config.config = new_config
+ if not Config.settings:
+ Config.settings = new_config
return
new_config = copy.deepcopy(new_config)
- Config.config.update(new_config)
+ Config.settings.update(new_config)
@staticmethod
def get_system_name():
"""find the name of the policy-handler system
to be used as the key in consul-kv for config of policy-handler
"""
- return (Config.config or {}).get(Config.FIELD_SYSTEM, Config.SERVICE_NAME_POLICY_HANDLER)
+ return (Config.settings or {}).get(Config.FIELD_SYSTEM, Config.SERVICE_NAME_POLICY_HANDLER)
@staticmethod
def discover():
@@ -76,9 +76,9 @@ class Config(object):
Config._logger.debug("loaded config from discovery(%s): %s", \
discovery_key, json.dumps(new_config))
- Config._logger.debug("config before merge from discovery: %s", json.dumps(Config.config))
+ Config._logger.debug("config before merge from discovery: %s", json.dumps(Config.settings))
Config.merge(new_config.get(Config.SERVICE_NAME_POLICY_HANDLER))
- Config._logger.info("merged config from discovery: %s", json.dumps(Config.config))
+ Config._logger.info("merged config from discovery: %s", json.dumps(Config.settings))
@staticmethod
def load_from_file(file_path=None):
@@ -102,5 +102,5 @@ class Config(object):
Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port)
Config.merge(loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER))
- Config._logger.info("config loaded from file: %s", json.dumps(Config.config))
+ Config._logger.info("config loaded from file: %s", json.dumps(Config.settings))
return True
diff --git a/policyhandler/customize/customizer.py b/policyhandler/customize/customizer.py
index 38928e9..9ab5967 100644
--- a/policyhandler/customize/customizer.py
+++ b/policyhandler/customize/customizer.py
@@ -31,8 +31,4 @@ class Customizer(CustomizerBase):
see README.md for the sample of the customizer.py
"""
- # def __init__(self):
- # """class that contains the customization"""
- # super().__init__()
-
pass
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
index 4ea5ad1..ea703f4 100644
--- a/policyhandler/deploy_handler.py
+++ b/policyhandler/deploy_handler.py
@@ -66,7 +66,7 @@ class DeployHandler(object):
requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE)
)
- config_dh = Config.config.get("deploy_handler")
+ config_dh = Config.settings.get("deploy_handler")
if config_dh and isinstance(config_dh, dict):
# dns based routing to deployment-handler
# config for policy-handler >= 2.4.0
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py
index 280e3c6..e1584a3 100644
--- a/policyhandler/policy_receiver.py
+++ b/policyhandler/policy_receiver.py
@@ -55,7 +55,7 @@ class _PolicyReceiver(Thread):
self._lock = Lock()
self._keep_running = True
- config = Config.config[Config.FIELD_POLICY_ENGINE]
+ config = Config.settings[Config.FIELD_POLICY_ENGINE]
self.web_socket_url = resturl = config["url"] + config["path_pdp"]
if resturl.startswith("https:"):
@@ -66,7 +66,7 @@ class _PolicyReceiver(Thread):
self._web_socket = None
scope_prefixes = [scope_prefix.replace(".", "[.]")
- for scope_prefix in Config.config["scope_prefixes"]]
+ for scope_prefix in Config.settings["scope_prefixes"]]
self._policy_scopes = re.compile("(" + "|".join(scope_prefixes) + ")")
_PolicyReceiver._logger.info("_policy_scopes %s", self._policy_scopes.pattern)
self._policy_updater = PolicyUpdater()
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py
index 16c38bb..c8018f6 100644
--- a/policyhandler/policy_rest.py
+++ b/policyhandler/policy_rest.py
@@ -36,17 +36,17 @@ from .policy_utils import PolicyUtils
class PolicyRest(object):
- """ policy-engine """
+ """using the http API to policy-engine"""
_logger = logging.getLogger("policy_handler.policy_rest")
_lazy_inited = False
POLICY_GET_CONFIG = 'getConfig'
- POLICY_CONFIG_STATUS = "policyConfigStatus"
- CONFIG_RETRIEVED = "CONFIG_RETRIEVED"
- CONFIG_NOT_FOUND = "CONFIG_NOT_FOUND"
- POLICY_CONFIG_MESSAGE = "policyConfigMessage"
- NO_RESPONSE_RECEIVED = "No Response Received"
- POLICY_ENGINE_STATUS_CODE_ERROR = 400
- PE_DATA_NOT_FOUND = "PE300 - Data Issue: Incorrect Params passed: Decision not a Permit."
+ PDP_CONFIG_STATUS = "policyConfigStatus"
+ PDP_CONFIG_RETRIEVED = "CONFIG_RETRIEVED"
+ PDP_CONFIG_NOT_FOUND = "CONFIG_NOT_FOUND"
+ PDP_CONFIG_MESSAGE = "policyConfigMessage"
+ PDP_NO_RESPONSE_RECEIVED = "No Response Received"
+ PDP_STATUS_CODE_ERROR = 400
+ PDP_DATA_NOT_FOUND = "PE300 - Data Issue: Incorrect Params passed: Decision not a Permit."
MIN_VERSION_EXPECTED = "min_version_expected"
IGNORE_POLICY_NAMES = "ignore_policy_names"
@@ -68,7 +68,7 @@ class PolicyRest(object):
return
PolicyRest._lazy_inited = True
- config = Config.config[Config.FIELD_POLICY_ENGINE]
+ config = Config.settings[Config.FIELD_POLICY_ENGINE]
pool_size = config.get("pool_connections", 20)
PolicyRest._requests_session = requests.Session()
@@ -85,15 +85,15 @@ class PolicyRest(object):
+ config["path_api"] + PolicyRest.POLICY_GET_CONFIG
PolicyRest._headers = config["headers"]
PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
- PolicyRest._thread_pool_size = Config.config.get("thread_pool_size", 4)
+ PolicyRest._thread_pool_size = Config.settings.get("thread_pool_size", 4)
if PolicyRest._thread_pool_size < 2:
PolicyRest._thread_pool_size = 2
- PolicyRest._scope_prefixes = Config.config["scope_prefixes"]
+ PolicyRest._scope_prefixes = Config.settings["scope_prefixes"]
PolicyRest._scope_thread_pool_size = min(PolicyRest._thread_pool_size, \
len(PolicyRest._scope_prefixes))
- PolicyRest._policy_retry_count = Config.config.get("policy_retry_count", 1) or 1
- PolicyRest._policy_retry_sleep = Config.config.get("policy_retry_sleep", 0)
+ PolicyRest._policy_retry_count = Config.settings.get("policy_retry_count", 1) or 1
+ PolicyRest._policy_retry_sleep = Config.settings.get("policy_retry_sleep", 0)
PolicyRest._logger.info(
"PolicyClient url(%s) headers(%s) scope-prefixes(%s)",
@@ -137,6 +137,19 @@ class PolicyRest(object):
PolicyRest._url_get_config, res.status_code, msg, res.text,
Metrics.log_json_dumps(dict(res.request.headers.items())))
+ status_code, res_data = PolicyRest._extract_pdp_res_data(audit, metrics, log_line, res)
+
+ if status_code:
+ return status_code, res_data
+
+ metrics.set_http_status_code(res.status_code)
+ metrics.metrics(log_line)
+ PolicyRest._logger.info(log_line)
+ return res.status_code, res_data
+
+ @staticmethod
+ def _extract_pdp_res_data(audit, metrics, log_line, res):
+ """special treatment of pdp response"""
res_data = None
if res.status_code == requests.codes.ok:
res_data = res.json()
@@ -145,7 +158,7 @@ class PolicyRest(object):
rslt = res_data[0]
if rslt and not rslt.get(POLICY_NAME):
res_data = None
- if rslt.get(PolicyRest.POLICY_CONFIG_MESSAGE) == PolicyRest.NO_RESPONSE_RECEIVED:
+ if rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_NO_RESPONSE_RECEIVED:
error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
error_msg = "unexpected {0}".format(log_line)
@@ -153,30 +166,31 @@ class PolicyRest(object):
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
- return (error_code, None)
+ return error_code, None
+ return None, res_data
- elif res.status_code == PolicyRest.POLICY_ENGINE_STATUS_CODE_ERROR:
+ if res.status_code == PolicyRest.PDP_STATUS_CODE_ERROR:
try:
- rslt = res.json()
- if rslt and isinstance(rslt, list) and len(rslt) == 1:
- rslt = rslt[0]
- if rslt and not rslt.get(POLICY_NAME) \
- and rslt.get(PolicyRest.POLICY_CONFIG_STATUS) == PolicyRest.CONFIG_NOT_FOUND \
- and rslt.get(PolicyRest.POLICY_CONFIG_MESSAGE) == PolicyRest.PE_DATA_NOT_FOUND:
- status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value
- info_msg = "not found {0}".format(log_line)
-
- PolicyRest._logger.info(info_msg)
- metrics.set_http_status_code(status_code)
- metrics.metrics(info_msg)
- return (status_code, None)
+ res_data = res.json()
except ValueError:
- pass
+ return None, None
+
+ if not res_data or not isinstance(res_data, list) or len(res_data) != 1:
+ return None, None
+
+ rslt = res_data[0]
+ if (rslt and not rslt.get(POLICY_NAME)
+ and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND
+ and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND):
+ status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value
+ info_msg = "not found {0}".format(log_line)
+
+ PolicyRest._logger.info(info_msg)
+ metrics.set_http_status_code(status_code)
+ metrics.metrics(info_msg)
+ return status_code, None
+ return None, None
- metrics.set_http_status_code(res.status_code)
- metrics.metrics(log_line)
- PolicyRest._logger.info(log_line)
- return res.status_code, res_data
@staticmethod
def _validate_policy(policy):
@@ -188,76 +202,20 @@ class PolicyRest(object):
return bool(
policy_body
- and policy_body.get(PolicyRest.POLICY_CONFIG_STATUS) == PolicyRest.CONFIG_RETRIEVED
+ and policy_body.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_RETRIEVED
and policy_body.get(POLICY_CONFIG)
)
@staticmethod
def get_latest_policy(aud_policy_id):
- """Get the latest policy for the policy_id from the policy-engine"""
+ """safely try retrieving the latest policy for the policy_id from the policy-engine"""
audit, policy_id, min_version_expected, ignore_policy_names = aud_policy_id
str_metrics = "policy_id({0}), min_version_expected({1}) ignore_policy_names({2})".format(
policy_id, min_version_expected, json.dumps(ignore_policy_names))
try:
- PolicyRest._lazy_init()
- status_code = 0
- retry_get_config = audit.kwargs.get("retry_get_config")
- policy_configs = None
- latest_policy = None
- expect_policy_removed = (ignore_policy_names and not min_version_expected)
-
- for retry in range(1, PolicyRest._policy_retry_count + 1):
- PolicyRest._logger.debug(str_metrics)
-
- status_code, policy_configs = PolicyRest._pdp_get_config(
- audit, {POLICY_NAME:policy_id}
- )
-
- PolicyRest._logger.debug("%s %s policy_configs: %s",
- status_code, policy_id, json.dumps(policy_configs or []))
-
- latest_policy = PolicyUtils.select_latest_policy(
- policy_configs, min_version_expected, ignore_policy_names
- )
-
- if not latest_policy and not expect_policy_removed:
- audit.error("received unexpected policy data from PDP for policy_id={0}: {1}"
- .format(policy_id, json.dumps(policy_configs or [])),
- error_code=AuditResponseCode.DATA_ERROR)
-
- if (latest_policy
- or not retry_get_config
- or (expect_policy_removed and not policy_configs)
- or not PolicyRest._policy_retry_sleep
- or audit.is_serious_error(status_code)):
- break
-
- if retry == PolicyRest._policy_retry_count:
- audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}"
- .format(PolicyRest._url_get_config, retry, policy_id),
- error_code=AuditResponseCode.DATA_ERROR)
- break
-
- audit.warn("retry #{0} {1} from PDP in {2} secs for policy_id={3}".format(
- retry, PolicyRest._url_get_config,
- PolicyRest._policy_retry_sleep, policy_id),
- error_code=AuditResponseCode.DATA_ERROR)
- time.sleep(PolicyRest._policy_retry_sleep)
-
- if (expect_policy_removed and not latest_policy
- and AuditHttpCode.RESPONSE_ERROR.value == status_code):
- audit.set_http_status_code(AuditHttpCode.HTTP_OK.value)
- return None
-
- audit.set_http_status_code(status_code)
- if not PolicyRest._validate_policy(latest_policy):
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- audit.error(
- "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)),
- error_code=AuditResponseCode.DATA_ERROR)
-
- return latest_policy
+ return PolicyRest._get_latest_policy(
+ audit, policy_id, min_version_expected, ignore_policy_names, str_metrics)
except Exception as ex:
error_msg = ("{0}: crash {1} {2} at {3}: {4}"
@@ -271,100 +229,92 @@ class PolicyRest(object):
@staticmethod
- def get_latest_updated_policies(aud_policy_updates):
- """Get the latest policies of the list of policy_names from the policy-engine"""
- audit, policies_updated, policies_removed = aud_policy_updates
- if not policies_updated and not policies_removed:
- return None, None
+ def _get_latest_policy(audit, policy_id,
+ min_version_expected, ignore_policy_names, str_metrics):
+ """retry several times getting the latest policy for the policy_id from the policy-engine"""
+ PolicyRest._lazy_init()
+ latest_policy = None
+ status_code = 0
+ retry_get_config = audit.kwargs.get("retry_get_config")
+ expect_policy_removed = (ignore_policy_names and not min_version_expected)
+
+ for retry in range(1, PolicyRest._policy_retry_count + 1):
+ PolicyRest._logger.debug(str_metrics)
- str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format(
- len(policies_updated), json.dumps(policies_updated),
- len(policies_removed), json.dumps(policies_removed))
+ done, latest_policy, status_code = PolicyRest._get_latest_policy_once(
+ audit, policy_id, min_version_expected, ignore_policy_names,
+ expect_policy_removed)
- target_entity = "{0} total get_latest_updated_policies".format(PolicyRest._target_entity)
- try:
- PolicyRest._lazy_init()
- metrics = Metrics(aud_parent=audit,
- targetEntity=target_entity,
- targetServiceName=PolicyRest._url_get_config)
+ if done or not retry_get_config or not PolicyRest._policy_retry_sleep:
+ break
- metrics.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
- PolicyRest._logger.debug(str_metrics)
+ if retry == PolicyRest._policy_retry_count:
+ audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}"
+ .format(PolicyRest._url_get_config, retry, policy_id),
+ error_code=AuditResponseCode.DATA_ERROR)
+ break
- policies_to_find = {}
- for (policy_name, policy_version) in policies_updated:
- policy_id = PolicyUtils.extract_policy_id(policy_name)
- if not policy_id or not policy_version.isdigit():
- continue
- policy = policies_to_find.get(policy_id)
- if not policy:
- policies_to_find[policy_id] = {
- POLICY_ID: policy_id,
- PolicyRest.MIN_VERSION_EXPECTED: int(policy_version),
- PolicyRest.IGNORE_POLICY_NAMES: {}
- }
- continue
- if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version):
- policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version)
-
- for (policy_name, _) in policies_removed:
- policy_id = PolicyUtils.extract_policy_id(policy_name)
- if not policy_id:
- continue
- policy = policies_to_find.get(policy_id)
- if not policy:
- policies_to_find[policy_id] = {
- POLICY_ID: policy_id,
- PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True}
- }
- continue
- policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True
-
- apns = [(audit, policy_id,
- policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED),
- policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES))
- for (policy_id, policy_to_find) in policies_to_find.items()]
-
- policies = None
- apns_length = len(apns)
- if apns_length == 1:
- policies = [PolicyRest.get_latest_policy(apns[0])]
- else:
- pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length))
- policies = pool.map(PolicyRest.get_latest_policy, apns)
- pool.close()
- pool.join()
+ audit.warn(
+ "retry #{0} {1} from PDP in {2} secs for policy_id={3}".format(
+ retry, PolicyRest._url_get_config,
+ PolicyRest._policy_retry_sleep, policy_id),
+ error_code=AuditResponseCode.DATA_ERROR)
+ time.sleep(PolicyRest._policy_retry_sleep)
+
+ if (expect_policy_removed and not latest_policy
+ and AuditHttpCode.RESPONSE_ERROR.value == status_code):
+ audit.set_http_status_code(AuditHttpCode.HTTP_OK.value)
+ return None
+
+ audit.set_http_status_code(status_code)
+ if not PolicyRest._validate_policy(latest_policy):
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.error(
+ "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)),
+ error_code=AuditResponseCode.DATA_ERROR)
+
+ return latest_policy
+
+ @staticmethod
+ def _get_latest_policy_once(audit, policy_id,
+ min_version_expected, ignore_policy_names,
+ expect_policy_removed):
+ """single attempt to get the latest policy for the policy_id from the policy-engine"""
- metrics.metrics("result get_latest_updated_policies {0}: {1} {2}"
- .format(str_metrics, len(policies), json.dumps(policies)))
+ status_code, policy_configs = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id})
- updated_policies = dict((policy[POLICY_ID], policy)
- for policy in policies
- if policy and policy.get(POLICY_ID))
+ PolicyRest._logger.debug("%s %s policy_configs: %s",
+ status_code, policy_id, json.dumps(policy_configs or []))
- removed_policies = dict((policy_id, True)
- for (policy_id, policy_to_find) in policies_to_find.items()
- if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED)
- and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)
- and policy_id not in updated_policies)
+ latest_policy = PolicyUtils.select_latest_policy(
+ policy_configs, min_version_expected, ignore_policy_names
+ )
+
+ if not latest_policy and not expect_policy_removed:
+ audit.error("received unexpected policy data from PDP for policy_id={0}: {1}"
+ .format(policy_id, json.dumps(policy_configs or [])),
+ error_code=AuditResponseCode.DATA_ERROR)
+
+ done = bool(latest_policy
+ or (expect_policy_removed and not policy_configs)
+ or audit.is_serious_error(status_code))
- errored_policies = dict((policy_id, policy_to_find)
- for (policy_id, policy_to_find) in policies_to_find.items()
- if policy_id not in updated_policies
- and policy_id not in removed_policies)
+ return done, latest_policy, status_code
- PolicyRest._logger.debug(
- "result updated_policies %s, removed_policies %s, errored_policies %s",
- json.dumps(updated_policies), json.dumps(removed_policies),
- json.dumps(errored_policies))
+ @staticmethod
+ def get_latest_updated_policies(aud_policy_updates):
+ """safely try retrieving the latest policies for the list of policy_names"""
+ audit, policies_updated, policies_removed = aud_policy_updates
+ if not policies_updated and not policies_removed:
+ return None, None
- if errored_policies:
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- audit.error(
- "errored_policies in PDP: {0}".format(json.dumps(errored_policies)),
- error_code=AuditResponseCode.DATA_ERROR)
+ str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format(
+ len(policies_updated), json.dumps(policies_updated),
+ len(policies_removed), json.dumps(policies_removed))
- return updated_policies, removed_policies
+ try:
+ return PolicyRest._get_latest_updated_policies(
+ audit, str_metrics, policies_updated, policies_removed)
except Exception as ex:
error_msg = ("{0}: crash {1} {2} at {3}: {4}"
@@ -376,6 +326,93 @@ class PolicyRest(object):
audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
return None, None
+ @staticmethod
+ def _get_latest_updated_policies(audit, str_metrics, policies_updated, policies_removed):
+ """Get the latest policies of the list of policy_names from the policy-engine"""
+ PolicyRest._lazy_init()
+ metrics = Metrics(
+ aud_parent=audit,
+ targetEntity="{0} total get_latest_updated_policies".format(PolicyRest._target_entity),
+ targetServiceName=PolicyRest._url_get_config)
+
+ metrics.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
+ PolicyRest._logger.debug(str_metrics)
+
+ policies_to_find = {}
+ for (policy_name, policy_version) in policies_updated:
+ policy_id = PolicyUtils.extract_policy_id(policy_name)
+ if not policy_id or not policy_version.isdigit():
+ continue
+ policy = policies_to_find.get(policy_id)
+ if not policy:
+ policies_to_find[policy_id] = {
+ POLICY_ID: policy_id,
+ PolicyRest.MIN_VERSION_EXPECTED: int(policy_version),
+ PolicyRest.IGNORE_POLICY_NAMES: {}
+ }
+ continue
+ if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version):
+ policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version)
+
+ for (policy_name, _) in policies_removed:
+ policy_id = PolicyUtils.extract_policy_id(policy_name)
+ if not policy_id:
+ continue
+ policy = policies_to_find.get(policy_id)
+ if not policy:
+ policies_to_find[policy_id] = {
+ POLICY_ID: policy_id,
+ PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True}
+ }
+ continue
+ policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True
+
+ apns = [(audit, policy_id,
+ policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED),
+ policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES))
+ for (policy_id, policy_to_find) in policies_to_find.items()]
+
+ policies = None
+ apns_length = len(apns)
+ if apns_length == 1:
+ policies = [PolicyRest.get_latest_policy(apns[0])]
+ else:
+ pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length))
+ policies = pool.map(PolicyRest.get_latest_policy, apns)
+ pool.close()
+ pool.join()
+
+ metrics.metrics("result get_latest_updated_policies {0}: {1} {2}"
+ .format(str_metrics, len(policies), json.dumps(policies)))
+
+ updated_policies = dict((policy[POLICY_ID], policy)
+ for policy in policies
+ if policy and policy.get(POLICY_ID))
+
+ removed_policies = dict((policy_id, True)
+ for (policy_id, policy_to_find) in policies_to_find.items()
+ if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED)
+ and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)
+ and policy_id not in updated_policies)
+
+ errored_policies = dict((policy_id, policy_to_find)
+ for (policy_id, policy_to_find) in policies_to_find.items()
+ if policy_id not in updated_policies
+ and policy_id not in removed_policies)
+
+ PolicyRest._logger.debug(
+ "result updated_policies %s, removed_policies %s, errored_policies %s",
+ json.dumps(updated_policies), json.dumps(removed_policies),
+ json.dumps(errored_policies))
+
+ if errored_policies:
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.error(
+ "errored_policies in PDP: {0}".format(json.dumps(errored_policies)),
+ error_code=AuditResponseCode.DATA_ERROR)
+
+ return updated_policies, removed_policies
+
@staticmethod
def _get_latest_policies(aud_policy_filter):
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index cff7b41..5ba7c29 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -47,7 +47,7 @@ class PolicyUpdater(Thread):
self._aud_shutdown = None
self._aud_catch_up = None
- catch_up_config = Config.config.get(CATCH_UP, {})
+ catch_up_config = Config.settings.get(CATCH_UP, {})
self._catch_up_interval = catch_up_config.get("interval") or 15*60
self._catch_up_max_skips = catch_up_config.get("max_skips") or 3
self._catch_up_skips = 0
diff --git a/policyhandler/step_timer.py b/policyhandler/step_timer.py
index 2a13dd5..768b400 100644
--- a/policyhandler/step_timer.py
+++ b/policyhandler/step_timer.py
@@ -25,12 +25,12 @@ from threading import Event, RLock, Thread
class StepTimer(Thread):
"""call on_time after interval number of seconds, then wait to continue"""
- INIT = "init"
- NEXT = "next"
- STARTED = "started"
- PAUSED = "paused"
- STOPPING = "stopping"
- STOPPED = "stopped"
+ STATE_INIT = "init"
+ STATE_NEXT = "next"
+ STATE_STARTED = "started"
+ STATE_PAUSED = "paused"
+ STATE_STOPPING = "stopping"
+ STATE_STOPPED = "stopped"
def __init__(self, name, interval, on_time, logger, *args, **kwargs):
"""create step timer with controlled start. next step and pause"""
@@ -49,7 +49,7 @@ class StepTimer(Thread):
self._paused = False
self._finished = False
- self._request = StepTimer.INIT
+ self._request = StepTimer.STATE_INIT
self._req_count = 0
self._req_time = 0
self._req_ts = datetime.utcnow()
@@ -80,14 +80,14 @@ class StepTimer(Thread):
self._timeout.set()
else:
self._next.set()
- self._request_to_timer(StepTimer.NEXT)
+ self._request_to_timer(StepTimer.STATE_NEXT)
def pause(self):
"""pause the timer"""
with self._lock:
self._paused = True
self._next.clear()
- self._request_to_timer(StepTimer.PAUSED)
+ self._request_to_timer(StepTimer.STATE_PAUSED)
def stop(self):
"""stop the timer if it hasn't finished yet"""
@@ -95,12 +95,12 @@ class StepTimer(Thread):
self._finished = True
self._timeout.set()
self._next.set()
- self._request_to_timer(StepTimer.STOPPING)
+ self._request_to_timer(StepTimer.STATE_STOPPING)
def _request_to_timer(self, request):
"""set the request on the timer"""
with self._lock:
- if request in [StepTimer.NEXT, StepTimer.STARTED]:
+ if request in [StepTimer.STATE_NEXT, StepTimer.STATE_STARTED]:
self._req_count += 1
prev_req = self._request
@@ -111,8 +111,8 @@ class StepTimer(Thread):
self._logger.info("{0}[{1}] {2}->{3}".format(
self.name, self._req_time, prev_req, self.get_timer_status()))
- def _timer_substep(self, substep):
- """log exe step"""
+ def _log_substep(self, substep):
+ """log timer substep"""
with self._lock:
self._substep = substep
utcnow = datetime.utcnow()
@@ -120,51 +120,56 @@ class StepTimer(Thread):
self._substep_ts = utcnow
self._logger.info("[{0}] {1}".format(self._substep_time, self.get_timer_status()))
+ def _on_time_event(self):
+ """execute the _on_time event"""
+ if self._paused:
+ self._log_substep("paused - skip on_time event")
+ return
+
+ try:
+ self._log_substep("on_time event")
+ self._on_time(*self._args, **self._kwargs)
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: args({4}), kwargs({5})"
+ .format(self.name, type(ex).__name__, str(ex), "_on_time",
+ json.dumps(self._args), json.dumps(self._kwargs)))
+ self._logger.exception(error_msg)
+
def run(self):
"""loop one step a time until stopped=finished"""
- self._request_to_timer(StepTimer.STARTED)
+ self._request_to_timer(StepTimer.STATE_STARTED)
while True:
with self._lock:
self._timeout.clear()
self._waiting_for_timeout = True
- self._timer_substep("waiting for timeout {0}...".format(self._interval))
+ self._log_substep("waiting for timeout {0}...".format(self._interval))
interrupted = self._timeout.wait(self._interval)
with self._lock:
self._waiting_for_timeout = False
- self._timer_substep("woke up after {0}timeout"
- .format((interrupted and "interrupted ") or ""))
+ self._log_substep("woke up after {0}timeout"
+ .format((interrupted and "interrupted ") or ""))
if self._finished:
- self._timer_substep("finished")
+ self._log_substep("finished")
break
if self._next.is_set() and interrupted:
self._next.clear()
- self._timer_substep("restart timer")
+ self._log_substep("restart timer")
continue
- if self._paused:
- self._timer_substep("paused - skip on_time event")
- else:
- try:
- self._timer_substep("on_time event")
- self._on_time(*self._args, **self._kwargs)
- except Exception as ex:
- error_msg = ("{0}: crash {1} {2} at {3}: args({4}), kwargs({5})"
- .format(self.name, type(ex).__name__, str(ex), "_on_time",
- json.dumps(self._args), json.dumps(self._kwargs)))
- self._logger.exception(error_msg)
-
- self._timer_substep("waiting for next...")
+ self._on_time_event()
+
+ self._log_substep("waiting for next...")
self._next.wait()
with self._lock:
self._next.clear()
- self._timer_substep("woke up on next")
+ self._log_substep("woke up on next")
if self._finished:
- self._timer_substep("finished")
+ self._log_substep("finished")
break
- self._request_to_timer(StepTimer.STOPPED)
+ self._request_to_timer(StepTimer.STATE_STOPPED)
diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py
index 041a442..c49536f 100644
--- a/policyhandler/web_server.py
+++ b/policyhandler/web_server.py
@@ -30,13 +30,14 @@ from .policy_receiver import PolicyReceiver
class PolicyWeb(object):
"""run REST API of policy-handler"""
+ SERVER_HOST = "0.0.0.0"
logger = logging.getLogger("policy_handler.policy_web")
@staticmethod
def run_forever(audit):
"""run the web-server of the policy-handler forever"""
PolicyWeb.logger.info("policy_handler web-service at port(%d)...", Config.wservice_port)
- cherrypy.config.update({"server.socket_host": "0.0.0.0",
+ cherrypy.config.update({"server.socket_host": PolicyWeb.SERVER_HOST,
'server.socket_port': Config.wservice_port})
cherrypy.tree.mount(_PolicyWeb(), '/')
audit.info("running policy_handler web-service at port({0})".format(Config.wservice_port))