aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2018-12-05 15:23:50 -0500
committerAlex Shatov <alexs@att.com>2018-12-05 15:23:50 -0500
commita39f4e82cef0414f510cf20e25864ac04cc8f055 (patch)
tree759f5a9f51c368456e8d8f3be65dac1951de0bd9 /policyhandler
parent61fe1b8aa4f6e3c2388cbc573f655f60b4c1b5f3 (diff)
4.5.0 policy-handler - multi change
DCAEGEN2-853: - stop reporting the absence of policies or updates as error - this is an expected result == INFO or WARNING DCAEGEN2-903: preparation for TLS on the web-server of policy-handler DCAEGEN2-930: - configurable timeouts for http requests from policy-handler - added configurable pinging on the web-socket to PDP - added healthcheck info on the web-socket - upgraded websocket-client lib to 0.53.0 DCAEGEN2-1017: fixed a bug on policy-filter matching by filter_config_name - refactored and enhanced the unit-tests Change-Id: I111ddc57bb978554ef376cbf916965b6667dad9b Signed-off-by: Alex Shatov <alexs@att.com> Issue-ID: DCAEGEN2-853 Issue-ID: DCAEGEN2-903 Issue-ID: DCAEGEN2-930 Issue-ID: DCAEGEN2-1017
Diffstat (limited to 'policyhandler')
-rw-r--r--policyhandler/config.py59
-rw-r--r--policyhandler/deploy_handler.py27
-rw-r--r--policyhandler/discovery.py5
-rw-r--r--policyhandler/onap/audit.py81
-rw-r--r--policyhandler/policy_matcher.py40
-rw-r--r--policyhandler/policy_receiver.py131
-rw-r--r--policyhandler/policy_rest.py95
-rw-r--r--policyhandler/policy_updater.py36
-rw-r--r--policyhandler/web_server.py78
9 files changed, 376 insertions, 176 deletions
diff --git a/policyhandler/config.py b/policyhandler/config.py
index d94ed79..5184f7f 100644
--- a/policyhandler/config.py
+++ b/policyhandler/config.py
@@ -148,24 +148,43 @@ class Config(object):
TLS_CA_MODE = "tls_ca_mode"
TLS_WSS_CA_MODE = "tls_wss_ca_mode"
TLS_CA_MODE_DO_NOT_VERIFY = "do_not_verify"
+ TIMEOUT_IN_SECS = "timeout_in_secs"
+ CONSUL_TIMEOUT_IN_SECS = "consul_timeout_in_secs"
+ WS_PING_INTERVAL_IN_SECS = "ws_ping_interval_in_secs"
+ DEFAULT_TIMEOUT_IN_SECS = 60
system_name = SERVICE_NAME_POLICY_HANDLER
wservice_port = 25577
consul_url = "http://consul:8500"
+ consul_timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS
tls_cacert_file = None
tls_server_cert_file = None
tls_private_key_file = None
+ tls_server_ca_chain_file = None
_local_config = Settings()
discovered_config = Settings()
@staticmethod
+ def _get_tls_file_path(tls_config, cert_directory, tls_name):
+ """calc file path and verify its existance"""
+ file_name = tls_config.get(tls_name)
+ if not file_name:
+ return None
+ tls_file_path = os.path.join(cert_directory, file_name)
+ if not os.path.isfile(tls_file_path) or not os.access(tls_file_path, os.R_OK):
+ Config._logger.error("invalid %s: %s", tls_name, tls_file_path)
+ return None
+ return tls_file_path
+
+ @staticmethod
def _set_tls_config(tls_config):
"""verify and set tls certs in config"""
try:
Config.tls_cacert_file = None
Config.tls_server_cert_file = None
Config.tls_private_key_file = None
+ Config.tls_server_ca_chain_file = None
if not (tls_config and isinstance(tls_config, dict)):
Config._logger.info("no tls in config: %s", json.dumps(tls_config))
@@ -174,43 +193,28 @@ class Config(object):
cert_directory = tls_config.get("cert_directory")
if not (cert_directory and isinstance(cert_directory, str)):
- Config._logger.info("unexpected tls.cert_directory: %r", cert_directory)
+ Config._logger.warning("unexpected tls.cert_directory: %r", cert_directory)
return
cert_directory = os.path.join(
os.path.dirname(os.path.dirname(os.path.realpath(__file__))), cert_directory)
if not (cert_directory and os.path.isdir(cert_directory)):
- Config._logger.info("ignoring invalid cert_directory: %s", cert_directory)
+ Config._logger.warning("ignoring invalid cert_directory: %s", cert_directory)
return
- cacert = tls_config.get("cacert")
- if cacert:
- tls_cacert_file = os.path.join(cert_directory, cacert)
- if not os.path.isfile(tls_cacert_file):
- Config._logger.error("invalid tls_cacert_file: %s", tls_cacert_file)
- else:
- Config.tls_cacert_file = tls_cacert_file
-
- server_cert = tls_config.get("server_cert")
- if server_cert:
- tls_server_cert_file = os.path.join(cert_directory, server_cert)
- if not os.path.isfile(tls_server_cert_file):
- Config._logger.error("invalid tls_server_cert_file: %s", tls_server_cert_file)
- else:
- Config.tls_server_cert_file = tls_server_cert_file
-
- private_key = tls_config.get("private_key")
- if private_key:
- tls_private_key_file = os.path.join(cert_directory, private_key)
- if not os.path.isfile(tls_private_key_file):
- Config._logger.error("invalid tls_private_key_file: %s", tls_private_key_file)
- else:
- Config.tls_private_key_file = tls_private_key_file
+ Config.tls_cacert_file = Config._get_tls_file_path(tls_config, cert_directory, "cacert")
+ Config.tls_server_cert_file = Config._get_tls_file_path(tls_config, cert_directory,
+ "server_cert")
+ Config.tls_private_key_file = Config._get_tls_file_path(tls_config, cert_directory,
+ "private_key")
+ Config.tls_server_ca_chain_file = Config._get_tls_file_path(tls_config, cert_directory,
+ "server_ca_chain")
finally:
Config._logger.info("tls_cacert_file = %s", Config.tls_cacert_file)
Config._logger.info("tls_server_cert_file = %s", Config.tls_server_cert_file)
Config._logger.info("tls_private_key_file = %s", Config.tls_private_key_file)
+ Config._logger.info("tls_server_ca_chain_file = %s", Config.tls_server_ca_chain_file)
@staticmethod
def init_config(file_path=None):
@@ -239,6 +243,9 @@ class Config(object):
Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port)
Config.consul_url = os.environ.get(
"CONSUL_URL", loaded_config.get(Config.FIELD_CONSUL_URL, Config.consul_url)).rstrip("/")
+ Config.consul_timeout_in_secs = loaded_config.get(Config.CONSUL_TIMEOUT_IN_SECS)
+ if not Config.consul_timeout_in_secs or Config.consul_timeout_in_secs < 1:
+ Config.consul_timeout_in_secs = Config.DEFAULT_TIMEOUT_IN_SECS
local_config = loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER, {})
Config.system_name = local_config.get(Config.FIELD_SYSTEM, Config.system_name)
@@ -250,7 +257,7 @@ class Config(object):
@staticmethod
def discover(audit):
- """bring and merge the config settings from the discovery service"""
+ """bring the config settings from the discovery service"""
discovery_key = Config.system_name
from .discovery import DiscoveryClient
new_config = DiscoveryClient.get_value(audit, discovery_key)
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
index b4b2468..0ffacba 100644
--- a/policyhandler/deploy_handler.py
+++ b/policyhandler/deploy_handler.py
@@ -145,6 +145,7 @@ class DeployHandler(object):
"""calling the deployment-handler web apis"""
_logger = logging.getLogger("policy_handler.deploy_handler")
DEFAULT_TARGET_ENTITY = "deployment_handler"
+ DEFAULT_TIMEOUT_IN_SECS = 60
_lazy_inited = False
_lock = Lock()
@@ -158,6 +159,7 @@ class DeployHandler(object):
_target_entity = None
_custom_kwargs = {}
_server_instance_uuid = None
+ _timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS
server_instance_changed = False
@staticmethod
@@ -188,7 +190,8 @@ class DeployHandler(object):
# "query" : {
# "cfy_tenant_name" : "default_tenant"
# },
- # "tls_ca_mode" : "cert_directory"
+ # "tls_ca_mode" : "cert_directory",
+ # "timeout_in_secs": 60
# }
DeployHandler._target_entity = config_dh.get(TARGET_ENTITY,
DeployHandler.DEFAULT_TARGET_ENTITY)
@@ -204,6 +207,10 @@ class DeployHandler(object):
DeployHandler._target_entity, DeployHandler._url,
tls_ca_mode, json.dumps(DeployHandler._custom_kwargs))
+ DeployHandler._timeout_in_secs = config_dh.get(Config.TIMEOUT_IN_SECS)
+ if not DeployHandler._timeout_in_secs or DeployHandler._timeout_in_secs < 1:
+ DeployHandler._timeout_in_secs = DeployHandler.DEFAULT_TIMEOUT_IN_SECS
+
if not DeployHandler._url:
# discover routing to deployment-handler at consul-services
if not isinstance(config_dh, dict):
@@ -290,6 +297,7 @@ class DeployHandler(object):
target_entity = DeployHandler._target_entity
url = DeployHandler._url_policy
params = deepcopy(DeployHandler._query)
+ timeout_in_secs = DeployHandler._timeout_in_secs
custom_kwargs = deepcopy(DeployHandler._custom_kwargs)
metrics = Metrics(aud_parent=audit, targetEntity="{} policy_update".format(target_entity),
@@ -297,9 +305,9 @@ class DeployHandler(object):
headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
log_action = "put to {} at {}".format(target_entity, url)
- log_data = "msg={} headers={}, params={} custom_kwargs({})".format(
+ log_data = "msg={} headers={}, params={}, timeout_in_secs={}, custom_kwargs({})".format(
json.dumps(message), json.dumps(headers),
- json.dumps(params), json.dumps(custom_kwargs))
+ json.dumps(params), timeout_in_secs, json.dumps(custom_kwargs))
log_line = log_action + " " + log_data
DeployHandler._logger.info(log_line)
@@ -315,7 +323,8 @@ class DeployHandler(object):
res = None
try:
- res = session.put(url, json=message, headers=headers, params=params, **custom_kwargs)
+ res = session.put(url, json=message, headers=headers, params=params,
+ timeout=timeout_in_secs, **custom_kwargs)
except Exception as ex:
error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
if isinstance(ex, requests.exceptions.RequestException)
@@ -357,6 +366,7 @@ class DeployHandler(object):
target_entity = DeployHandler._target_entity
url = DeployHandler._url_policy
params = deepcopy(DeployHandler._query)
+ timeout_in_secs = DeployHandler._timeout_in_secs
custom_kwargs = deepcopy(DeployHandler._custom_kwargs)
metrics = Metrics(aud_parent=audit,
@@ -365,8 +375,8 @@ class DeployHandler(object):
headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
log_action = "get from {} at {}".format(target_entity, url)
- log_data = "headers={}, params={} custom_kwargs({})".format(
- json.dumps(headers), json.dumps(params), json.dumps(custom_kwargs))
+ log_data = "headers={}, params={}, timeout_in_secs={}, custom_kwargs({})".format(
+ json.dumps(headers), json.dumps(params), timeout_in_secs, json.dumps(custom_kwargs))
log_line = log_action + " " + log_data
DeployHandler._logger.info(log_line)
@@ -382,7 +392,8 @@ class DeployHandler(object):
res = None
try:
- res = session.get(url, headers=headers, params=params, **custom_kwargs)
+ res = session.get(url, headers=headers, params=params, timeout=timeout_in_secs,
+ **custom_kwargs)
except Exception as ex:
error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
if isinstance(ex, requests.exceptions.RequestException)
@@ -412,7 +423,7 @@ class DeployHandler(object):
policies = result.get(POLICIES, {})
policy_filters = result.get(POLICY_FILTERS, {})
if not policies and not policy_filters:
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value)
DeployHandler._logger.warning(audit.warn(
"found no deployed policies or policy-filters: {}".format(log_line),
error_code=AuditResponseCode.DATA_ERROR))
diff --git a/policyhandler/discovery.py b/policyhandler/discovery.py
index 5a35525..4c5b64e 100644
--- a/policyhandler/discovery.py
+++ b/policyhandler/discovery.py
@@ -56,7 +56,7 @@ class DiscoveryClient(object):
@staticmethod
def _discover_service(audit, service_name, service_path):
"""find the service record in consul"""
- response = requests.get(service_path)
+ response = requests.get(service_path, timeout=Config.consul_timeout_in_secs)
DiscoveryClient._logger.info(audit.info("response {} from {}: {}".format(
response.status_code, service_path, response.text)))
@@ -113,7 +113,7 @@ class DiscoveryClient(object):
@staticmethod
def _get_value_from_kv(url):
"""get the value from consul-kv at discovery url"""
- response = requests.get(url)
+ response = requests.get(url, timeout=Config.consul_timeout_in_secs)
response.raise_for_status()
data = response.json()
value = base64.b64decode(data[0]["Value"]).decode("utf-8")
@@ -129,6 +129,7 @@ class DiscoveryClient(object):
log_line = "get from {} at {}".format(DiscoveryClient.CONSUL_ENTITY, discovery_url)
DiscoveryClient._logger.info(metrics.metrics_start(log_line))
+ status_code = None
try:
status_code, value = DiscoveryClient._get_value_from_kv(discovery_url)
except Exception as ex:
diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py
index 1bee4a7..d63d0b2 100644
--- a/policyhandler/onap/audit.py
+++ b/policyhandler/onap/audit.py
@@ -64,10 +64,10 @@ ERROR_DESCRIPTION = "errorDescription"
class AuditHttpCode(Enum):
"""audit http codes"""
HTTP_OK = 200
+ DATA_NOT_FOUND_OK = 204
PERMISSION_UNAUTHORIZED_ERROR = 401
PERMISSION_FORBIDDEN_ERROR = 403
RESPONSE_ERROR = 400
- DATA_NOT_FOUND_ERROR = 404
SERVER_INTERNAL_ERROR = 500
SERVICE_UNAVAILABLE_ERROR = 503
DATA_ERROR = 1030
@@ -88,7 +88,7 @@ class AuditResponseCode(Enum):
def get_response_code(http_status_code):
"""calculates the response_code from max_http_status_code"""
response_code = AuditResponseCode.UNKNOWN_ERROR
- if http_status_code <= AuditHttpCode.HTTP_OK.value:
+ if http_status_code <= AuditHttpCode.DATA_NOT_FOUND_OK.value:
response_code = AuditResponseCode.SUCCESS
elif http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value,
@@ -99,8 +99,7 @@ class AuditResponseCode(Enum):
elif http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value:
response_code = AuditResponseCode.BUSINESS_PROCESS_ERROR
elif http_status_code in [AuditHttpCode.DATA_ERROR.value,
- AuditHttpCode.RESPONSE_ERROR.value,
- AuditHttpCode.DATA_NOT_FOUND_ERROR.value]:
+ AuditHttpCode.RESPONSE_ERROR.value]:
response_code = AuditResponseCode.DATA_ERROR
elif http_status_code == AuditHttpCode.SCHEMA_ERROR.value:
response_code = AuditResponseCode.SCHEMA_ERROR
@@ -138,6 +137,7 @@ class _Audit(object):
_hostname = os.environ.get(HOSTNAME)
_health = Health()
+ _health_checkers = {}
_py_ver = sys.version.replace("\n", "")
_packages = []
@@ -167,6 +167,31 @@ class _Audit(object):
pass
+ def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs):
+ """create audit object per each request in the system
+
+ :job_name: is the name of the audit job for health stats
+ :request_id: is the X-ECOMP-RequestID for tracing
+ :req_message: is the request message string for logging
+ :kwargs: - put any request related params into kwargs
+ """
+ self.job_name = _Audit._key_format.sub('_', job_name or req_message or _Audit._service_name)
+ self.request_id = request_id
+ self.req_message = req_message or ""
+ self.kwargs = kwargs or {}
+
+ self.max_http_status_code = 0
+ self._lock = threading.Lock()
+
+
+ @staticmethod
+ def register_item_health(health_name, health_getter):
+ """
+ register the health-checker for the additional item
+ by its health_name and the function health_getter that returns its health status as json
+ """
+ _Audit._health_checkers[health_name] = health_getter
+
def health(self, full=False):
"""returns json for health check"""
utcnow = datetime.utcnow()
@@ -186,33 +211,21 @@ class _Audit(object):
"process_memory" : ProcessInfo.process_memory()
},
"stats" : _Audit._health.dump(),
+ "items" : dict((health_name, health_getter())
+ for health_name, health_getter in _Audit._health_checkers.items()),
"soft" : {"python" : _Audit._py_ver, "packages" : _Audit._packages}
}
- self.info("{} health: {}".format(_Audit._service_name, json.dumps(health)))
+ self.info("{} health: {}".format(_Audit._service_name,
+ json.dumps(health, sort_keys=True)))
return health
+
def process_info(self):
"""get the debug info on all the threads and memory"""
process_info = ProcessInfo.get_all()
self.info("{} process_info: {}".format(_Audit._service_name, json.dumps(process_info)))
return process_info
- def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs):
- """create audit object per each request in the system
-
- :job_name: is the name of the audit job for health stats
- :request_id: is the X-ECOMP-RequestID for tracing
- :req_message: is the request message string for logging
- :kwargs: - put any request related params into kwargs
- """
- self.job_name = _Audit._key_format.sub('_', job_name or req_message or _Audit._service_name)
- self.request_id = request_id
- self.req_message = req_message or ""
- self.kwargs = kwargs or {}
-
- self.max_http_status_code = 0
- self._lock = threading.Lock()
-
def merge_all_kwargs(self, **kwargs):
"""returns the merge of copy of self.kwargs with the param kwargs"""
@@ -230,7 +243,7 @@ class _Audit(object):
def reset_http_status_not_found(self):
"""resets the highest(worst) http status code if data not found"""
with self._lock:
- if self.max_http_status_code == AuditHttpCode.DATA_NOT_FOUND_ERROR.value:
+ if self.max_http_status_code == AuditHttpCode.DATA_NOT_FOUND_OK.value:
self.max_http_status_code = 0
def get_max_http_status_code(self):
@@ -252,25 +265,23 @@ class _Audit(object):
== AuditResponseCode.get_response_code(status_code).value
or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value)
- def _get_response_status(self, not_found_ok=None):
+ def _get_response_status(self):
"""calculates the response status fields from max_http_status_code"""
max_http_status_code = self.get_max_http_status_code()
response_code = AuditResponseCode.get_response_code(max_http_status_code)
- success = ((response_code.value == AuditResponseCode.SUCCESS.value)
- or (not_found_ok
- and max_http_status_code == AuditHttpCode.DATA_NOT_FOUND_ERROR.value))
+ success = (response_code.value == AuditResponseCode.SUCCESS.value)
response_description = AuditResponseCode.get_human_text(response_code)
return success, max_http_status_code, response_code, response_description
def is_success(self):
- """returns whether the response_code is success"""
+ """returns whether the response_code is success or 204 - not found"""
success, _, _, _ = self._get_response_status()
return success
- def is_success_or_not_found(self):
- """returns whether the response_code is success or 404 - not found"""
- success, _, _, _ = self._get_response_status(not_found_ok=True)
- return success
+ def is_not_found(self):
+ """returns whether the response_code is 204 - not found"""
+ max_http_status_code = self.get_max_http_status_code()
+ return max_http_status_code == AuditHttpCode.DATA_NOT_FOUND_OK.value
def debug(self, log_line, **kwargs):
"""debug - the debug=lowest level of logging"""
@@ -397,8 +408,8 @@ class Audit(_Audit):
def audit_done(self, result=None, **kwargs):
"""debug+audit - the audit=top level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
- success, max_http_status_code, response_code, response_description = \
- self._get_response_status()
+ (success, max_http_status_code,
+ response_code, response_description) = self._get_response_status()
log_line = "{0} {1}".format(self.req_message, result or "").strip()
audit_func = None
timer = _Audit.get_elapsed_time(self._started)
@@ -461,8 +472,8 @@ class Metrics(_Audit):
def metrics(self, log_line, **kwargs):
"""debug+metrics - the metrics=sub-audit level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
- success, max_http_status_code, response_code, response_description = \
- self._get_response_status()
+ (success, max_http_status_code,
+ response_code, response_description) = self._get_response_status()
metrics_func = None
timer = _Audit.get_elapsed_time(self._metrics_started)
if success:
diff --git a/policyhandler/policy_matcher.py b/policyhandler/policy_matcher.py
index 71b5ce8..d0786ba 100644
--- a/policyhandler/policy_matcher.py
+++ b/policyhandler/policy_matcher.py
@@ -23,6 +23,7 @@ import logging
import re
from .deploy_handler import DeployHandler, PolicyUpdateMessage
+from .onap.audit import AuditHttpCode, AuditResponseCode
from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES,
MATCHING_CONDITIONS, POLICY_BODY, POLICY_FILTER,
POLICY_NAME, POLICY_VERSION, POLICY_VERSIONS)
@@ -36,15 +37,32 @@ class PolicyMatcher(object):
PENDING_UPDATE = "pending_update"
@staticmethod
- def get_latest_policies(audit):
- """
- find the latest policies from policy-engine for the deployed policies and policy-filters
- """
+ def get_deployed_policies(audit):
+ """get the deployed policies and policy-filters"""
deployed_policies, deployed_policy_filters = DeployHandler.get_deployed_policies(audit)
+ if audit.is_not_found():
+ warning_txt = "got no deployed policies or policy-filters"
+ PolicyMatcher._logger.warning(warning_txt)
+ return {"warning": warning_txt}, None, None
+
if not audit.is_success() or (not deployed_policies and not deployed_policy_filters):
error_txt = "failed to retrieve policies from deployment-handler"
PolicyMatcher._logger.error(error_txt)
+ return {"error": error_txt}, None, None
+
+ return None, deployed_policies, deployed_policy_filters
+
+
+ @staticmethod
+ def build_catch_up_message(audit, deployed_policies, deployed_policy_filters):
+ """
+ find the latest policies from policy-engine for the deployed policies and policy-filters
+ """
+
+ if not (deployed_policies or deployed_policy_filters):
+ error_txt = "no deployed policies or policy-filters"
+ PolicyMatcher._logger.warning(error_txt)
return {"error": error_txt}, None
coarse_regex_patterns = PolicyMatcher.calc_coarse_patterns(
@@ -54,7 +72,9 @@ class PolicyMatcher(object):
error_txt = ("failed to construct the coarse_regex_patterns from " +
"deployed_policies: {} and deployed_policy_filters: {}"
.format(deployed_policies, deployed_policy_filters))
- PolicyMatcher._logger.error(error_txt)
+ PolicyMatcher._logger.error(audit.error(
+ error_txt, error_code=AuditResponseCode.DATA_ERROR))
+ audit.set_http_status_code(AuditHttpCode.DATA_ERROR.value)
return {"error": error_txt}, None
pdp_response = PolicyRest.get_latest_policies(
@@ -62,9 +82,9 @@ class PolicyMatcher(object):
for policy_name_pattern in coarse_regex_patterns]
)
- if not audit.is_success_or_not_found():
+ if not audit.is_success():
error_txt = "failed to retrieve policies from policy-engine"
- PolicyMatcher._logger.error(error_txt)
+ PolicyMatcher._logger.warning(error_txt)
return {"error": error_txt}, None
latest_policies = pdp_response.get(LATEST_POLICIES, {})
@@ -90,6 +110,7 @@ class PolicyMatcher(object):
removed_policies,
policy_filter_matches))
+
@staticmethod
def calc_coarse_patterns(audit, deployed_policies, deployed_policy_filters):
"""calculate the coarsed patterns on policy-names in policies and policy-filters"""
@@ -109,6 +130,7 @@ class PolicyMatcher(object):
coarse_regex.patterns)))
return coarse_regex_patterns
+
@staticmethod
def match_to_deployed_policies(audit, policies_updated, policies_removed):
"""match the policies_updated, policies_removed versus deployed policies"""
@@ -125,6 +147,7 @@ class PolicyMatcher(object):
return changed_policies, policies_removed, policy_filter_matches
+
@staticmethod
def _match_policies(audit, policies, deployed_policies, deployed_policy_filters):
"""
@@ -174,6 +197,7 @@ class PolicyMatcher(object):
return matching_policies, changed_policies, policy_filter_matches
+
@staticmethod
def _match_policy_to_filter(audit, policy_id, policy, policy_filter_id, policy_filter):
"""Match the policy to the policy-filter"""
@@ -218,7 +242,7 @@ class PolicyMatcher(object):
filter_config_name = policy_filter.get("configName")
policy_config_name = matching_conditions.get("ConfigName")
- if filter_onap_name and filter_config_name != policy_config_name:
+ if filter_config_name and filter_config_name != policy_config_name:
PolicyMatcher._logger.debug(
audit.debug("not match by configName: {} != {}: {}"
.format(policy_config_name, filter_config_name, log_line)))
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py
index 3ae25fc..249c1f7 100644
--- a/policyhandler/policy_receiver.py
+++ b/policyhandler/policy_receiver.py
@@ -28,14 +28,16 @@ passes the notifications to policy-updater
import copy
import json
import logging
-import os
import ssl
import time
+import urllib.parse
+from datetime import datetime
from threading import Lock, Thread
import websocket
from .config import Config, Settings
+from .onap.audit import Audit
from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION
from .policy_updater import PolicyUpdater
from .policy_utils import Utils
@@ -48,8 +50,17 @@ POLICY_MATCHES = 'matches'
class _PolicyReceiver(Thread):
"""web-socket to PolicyEngine"""
_logger = logging.getLogger("policy_handler.policy_receiver")
-
- def __init__(self):
+ WS_STARTED = "started"
+ WS_START_COUNT = "start_count"
+ WS_CLOSE_COUNT = "close_count"
+ WS_ERROR_COUNT = "error_count"
+ WS_PONG_COUNT = "pong_count"
+ WS_MESSAGE_COUNT = "message_count"
+ WS_MESSAGE_TIMESTAMP = "message_timestamp"
+ WS_STATUS = "status"
+ WS_PING_INTERVAL_DEFAULT = 180
+
+ def __init__(self, audit):
"""web-socket inside the thread to receive policy notifications from PolicyEngine"""
Thread.__init__(self, name="policy_receiver", daemon=True)
@@ -62,14 +73,26 @@ class _PolicyReceiver(Thread):
self._web_socket_sslopt = None
self._tls_wss_ca_mode = None
self._web_socket = None
- self.reconfigure()
-
- self._policy_updater = PolicyUpdater(self.reconfigure)
- self._policy_updater.start()
-
- def reconfigure(self):
+ self._ws_ping_interval_in_secs = _PolicyReceiver.WS_PING_INTERVAL_DEFAULT
+ self._web_socket_health = {
+ _PolicyReceiver.WS_START_COUNT: 0,
+ _PolicyReceiver.WS_CLOSE_COUNT: 0,
+ _PolicyReceiver.WS_ERROR_COUNT: 0,
+ _PolicyReceiver.WS_PONG_COUNT: 0,
+ _PolicyReceiver.WS_MESSAGE_COUNT: 0,
+ _PolicyReceiver.WS_STATUS: "created"
+ }
+
+ Audit.register_item_health("web_socket_health", self._get_health)
+ self._reconfigure(audit)
+
+ self._policy_updater = PolicyUpdater(self._reconfigure)
+
+ def _reconfigure(self, audit):
"""configure and reconfigure the web-socket"""
with self._lock:
+ _PolicyReceiver._logger.info(audit.info("web_socket_health {}".format(
+ json.dumps(self._get_health(), sort_keys=True))))
self._sleep_before_restarting = 5
self._settings.set_config(Config.discovered_config)
changed, config = self._settings.get_by_key(Config.FIELD_POLICY_ENGINE)
@@ -80,13 +103,19 @@ class _PolicyReceiver(Thread):
prev_web_socket_url = self._web_socket_url
prev_web_socket_sslopt = self._web_socket_sslopt
+ prev_ws_ping_interval_in_secs = self._ws_ping_interval_in_secs
+
self._web_socket_sslopt = None
- resturl = (config.get("url", "").lower()
- + config.get("path_notifications", "/pdp/notifications"))
+ resturl = urllib.parse.urljoin(config.get("url", "").lower().rstrip("/") + "/",
+ config.get("path_notifications", "/pdp/notifications"))
self._tls_wss_ca_mode = config.get(Config.TLS_WSS_CA_MODE)
+ self._ws_ping_interval_in_secs = config.get(Config.WS_PING_INTERVAL_IN_SECS)
+ if not self._ws_ping_interval_in_secs or self._ws_ping_interval_in_secs < 60:
+ self._ws_ping_interval_in_secs = _PolicyReceiver.WS_PING_INTERVAL_DEFAULT
+
if resturl.startswith("https:"):
self._web_socket_url = resturl.replace("https:", "wss:")
@@ -101,17 +130,19 @@ class _PolicyReceiver(Thread):
else:
self._web_socket_url = resturl.replace("http:", "ws:")
+ log_changed = (
+ "changed web_socket_url(%s) or tls_wss_ca_mode(%s)"
+ " or ws_ping_interval_in_secs(%s): %s" %
+ (self._web_socket_url, self._tls_wss_ca_mode, self._ws_ping_interval_in_secs,
+ self._settings))
if (self._web_socket_url == prev_web_socket_url
- and Utils.are_the_same(prev_web_socket_sslopt, self._web_socket_sslopt)):
- _PolicyReceiver._logger.info(
- "not changed web_socket_url(%s) or tls_wss_ca_mode(%s): %s",
- self._web_socket_url, self._tls_wss_ca_mode, self._settings)
+ and Utils.are_the_same(prev_web_socket_sslopt, self._web_socket_sslopt)
+ and prev_ws_ping_interval_in_secs == self._ws_ping_interval_in_secs):
+ _PolicyReceiver._logger.info(audit.info("not {}".format(log_changed)))
self._settings.commit_change()
return False
- _PolicyReceiver._logger.info("changed web_socket_url(%s) or tls_wss_ca_mode(%s): %s",
- self._web_socket_url, self._tls_wss_ca_mode,
- self._settings)
+ _PolicyReceiver._logger.info(audit.info(log_changed))
self._settings.commit_change()
self._stop_notifications()
@@ -119,6 +150,8 @@ class _PolicyReceiver(Thread):
def run(self):
"""listen on web-socket and pass the policy notifications to policy-updater"""
+ self._policy_updater.start()
+ _PolicyReceiver._logger.info("starting policy_receiver...")
websocket.enableTrace(True)
restarting = False
while True:
@@ -142,6 +175,7 @@ class _PolicyReceiver(Thread):
web_socket_url = self._web_socket_url
sslopt = copy.deepcopy(self._web_socket_sslopt)
tls_wss_ca_mode = self._tls_wss_ca_mode
+ ws_ping_interval_in_secs = self._ws_ping_interval_in_secs
_PolicyReceiver._logger.info(
"connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)",
@@ -149,13 +183,15 @@ class _PolicyReceiver(Thread):
self._web_socket = websocket.WebSocketApp(
web_socket_url,
+ on_open=self._on_ws_open,
on_message=self._on_pdp_message,
on_close=self._on_ws_close,
- on_error=self._on_ws_error
+ on_error=self._on_ws_error,
+ on_pong=self._on_ws_pong
)
_PolicyReceiver._logger.info("waiting for policy-notifications...")
- self._web_socket.run_forever(sslopt=sslopt)
+ self._web_socket.run_forever(sslopt=sslopt, ping_interval=ws_ping_interval_in_secs)
restarting = True
_PolicyReceiver._logger.info("exit policy-receiver")
@@ -175,9 +211,13 @@ class _PolicyReceiver(Thread):
def _on_pdp_message(self, *args):
"""received the notification from PDP"""
+ self._web_socket_health[_PolicyReceiver.WS_MESSAGE_COUNT] += 1
+ self._web_socket_health[_PolicyReceiver.WS_MESSAGE_TIMESTAMP] = str(datetime.utcnow())
try:
message = args and args[-1]
_PolicyReceiver._logger.info("Received notification message: %s", message)
+ _PolicyReceiver._logger.info("web_socket_health %s",
+ json.dumps(self._get_health(), sort_keys=True))
if not message:
return
message = json.loads(message)
@@ -216,9 +256,47 @@ class _PolicyReceiver(Thread):
_PolicyReceiver._logger.exception("policy-notification error %s", str(error))
self._sleep_before_restarting = 60 if isinstance(error, ssl.SSLError) else 5
+ self._web_socket_health[_PolicyReceiver.WS_STATUS] = "error"
+ self._web_socket_health[_PolicyReceiver.WS_ERROR_COUNT] += 1
+ self._web_socket_health["last_error"] = {
+ "error": str(error), "timestamp": str(datetime.utcnow())
+ }
+ _PolicyReceiver._logger.info("web_socket_health %s",
+ json.dumps(self._get_health(), sort_keys=True))
+
def _on_ws_close(self, code, reason):
"""restart web-socket on close"""
- _PolicyReceiver._logger.info("lost connection(%s, %s) to PDP - restarting...", code, reason)
+ self._web_socket_health["last_closed"] = str(datetime.utcnow())
+ self._web_socket_health[_PolicyReceiver.WS_STATUS] = "closed"
+ self._web_socket_health[_PolicyReceiver.WS_CLOSE_COUNT] += 1
+ _PolicyReceiver._logger.info(
+ "lost connection(%s, %s) to PDP - restarting... web_socket_health %s",
+ code, reason, json.dumps(self._get_health(), sort_keys=True))
+
+ def _on_ws_open(self):
+ """started web-socket"""
+ self._web_socket_health[_PolicyReceiver.WS_STATUS] = _PolicyReceiver.WS_STARTED
+ self._web_socket_health[_PolicyReceiver.WS_START_COUNT] += 1
+ self._web_socket_health[_PolicyReceiver.WS_STARTED] = datetime.utcnow()
+ _PolicyReceiver._logger.info("opened connection to PDP web_socket_health %s",
+ json.dumps(self._get_health(), sort_keys=True))
+
+ def _on_ws_pong(self, pong):
+ """pong = response to pinging the server of the web-socket"""
+ self._web_socket_health[_PolicyReceiver.WS_PONG_COUNT] += 1
+ _PolicyReceiver._logger.info(
+ "pong(%s) from connection to PDP web_socket_health %s",
+ pong, json.dumps(self._get_health(), sort_keys=True))
+
+ def _get_health(self):
+ """returns the healthcheck of the web-socket as json"""
+ web_socket_health = copy.deepcopy(self._web_socket_health)
+ started = web_socket_health.get(_PolicyReceiver.WS_STARTED)
+ if started:
+ web_socket_health[_PolicyReceiver.WS_STARTED] = str(started)
+ web_socket_health["uptime"] = str(datetime.utcnow() - started)
+ return web_socket_health
+
def shutdown(self, audit):
"""Shutdown the policy-receiver"""
@@ -237,11 +315,20 @@ class _PolicyReceiver(Thread):
"""need to bring the latest policies to DCAE-Controller"""
self._policy_updater.catch_up(audit)
+ def is_running(self):
+ """check whether the policy-receiver and policy-updater are running"""
+ return self.is_alive() and self._policy_updater.is_alive()
+
class PolicyReceiver(object):
"""policy-receiver - static singleton wrapper"""
_policy_receiver = None
@staticmethod
+ def is_running():
+ """check whether the policy-receiver runs"""
+ return PolicyReceiver._policy_receiver and PolicyReceiver._policy_receiver.is_running()
+
+ @staticmethod
def shutdown(audit):
"""Shutdown the notification-handler"""
PolicyReceiver._policy_receiver.shutdown(audit)
@@ -254,7 +341,7 @@ class PolicyReceiver(object):
@staticmethod
def run(audit):
"""Using policy-engine client to talk to policy engine"""
- PolicyReceiver._policy_receiver = _PolicyReceiver()
+ PolicyReceiver._policy_receiver = _PolicyReceiver(audit)
PolicyReceiver._policy_receiver.start()
PolicyReceiver.catch_up(audit)
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py
index 0713b38..85dd914 100644
--- a/policyhandler/policy_rest.py
+++ b/policyhandler/policy_rest.py
@@ -22,6 +22,7 @@ import copy
import json
import logging
import time
+import urllib.parse
from multiprocessing.dummy import Pool as ThreadPool
from threading import Lock
@@ -51,6 +52,7 @@ class PolicyRest(object):
EXPECTED_VERSIONS = "expected_versions"
IGNORE_POLICY_NAMES = "ignore_policy_names"
+ DEFAULT_TIMEOUT_IN_SECS = 60
_lock = Lock()
_settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS,
@@ -65,6 +67,7 @@ class PolicyRest(object):
_thread_pool_size = 4
_policy_retry_count = 1
_policy_retry_sleep = 0
+ _timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS
@staticmethod
def _init():
@@ -85,8 +88,9 @@ class PolicyRest(object):
'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
pool_maxsize=pool_size))
- PolicyRest._url_get_config = (config.get("url", "") + config.get("path_api", "")
- + PolicyRest.POLICY_GET_CONFIG)
+ get_config_path = urllib.parse.urljoin(
+ config.get("path_api", "pdp/api").strip("/") + "/", PolicyRest.POLICY_GET_CONFIG)
+ PolicyRest._url_get_config = urllib.parse.urljoin(config.get("url", ""), get_config_path)
PolicyRest._headers = config.get("headers", {})
PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
_, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key(
@@ -101,12 +105,16 @@ class PolicyRest(object):
tls_ca_mode = config.get(Config.TLS_CA_MODE)
PolicyRest._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode)
+ PolicyRest._timeout_in_secs = config.get(Config.TIMEOUT_IN_SECS)
+ if not PolicyRest._timeout_in_secs or PolicyRest._timeout_in_secs < 1:
+ PolicyRest._timeout_in_secs = PolicyRest.DEFAULT_TIMEOUT_IN_SECS
- PolicyRest._logger.info("PDP(%s) url(%s) headers(%s) tls_ca_mode(%s) custom_kwargs(%s): %s",
- PolicyRest._target_entity, PolicyRest._url_get_config,
- Metrics.json_dumps(PolicyRest._headers),
- tls_ca_mode, json.dumps(PolicyRest._custom_kwargs),
- PolicyRest._settings)
+ PolicyRest._logger.info(
+ "PDP(%s) url(%s) headers(%s) tls_ca_mode(%s) timeout_in_secs(%s) custom_kwargs(%s): %s",
+ PolicyRest._target_entity, PolicyRest._url_get_config,
+ Metrics.json_dumps(PolicyRest._headers), tls_ca_mode,
+ PolicyRest._timeout_in_secs, json.dumps(PolicyRest._custom_kwargs),
+ PolicyRest._settings)
PolicyRest._settings.commit_change()
PolicyRest._lazy_inited = True
@@ -144,6 +152,7 @@ class PolicyRest(object):
session = PolicyRest._requests_session
target_entity = PolicyRest._target_entity
url = PolicyRest._url_get_config
+ timeout_in_secs = PolicyRest._timeout_in_secs
headers = copy.deepcopy(PolicyRest._headers)
custom_kwargs = copy.deepcopy(PolicyRest._custom_kwargs)
@@ -152,15 +161,17 @@ class PolicyRest(object):
headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id
log_action = "post to {} at {}".format(target_entity, url)
- log_data = "msg={} headers={}, custom_kwargs({})".format(
- json.dumps(json_body), Metrics.json_dumps(headers), json.dumps(custom_kwargs))
+ log_data = "msg={} headers={}, custom_kwargs({}) timeout_in_secs({})".format(
+ json.dumps(json_body), Metrics.json_dumps(headers), json.dumps(custom_kwargs),
+ timeout_in_secs)
log_line = log_action + " " + log_data
PolicyRest._logger.info(metrics.metrics_start(log_line))
res = None
try:
- res = session.post(url, json=json_body, headers=headers, **custom_kwargs)
+ res = session.post(url, json=json_body, headers=headers, timeout=timeout_in_secs,
+ **custom_kwargs)
except Exception as ex:
error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
if isinstance(ex, requests.exceptions.RequestException)
@@ -195,12 +206,12 @@ class PolicyRest(object):
res_data = res.json()
if res_data and isinstance(res_data, list) and len(res_data) == 1:
- rslt = res_data[0]
- if rslt and not rslt.get(POLICY_NAME):
+ rslt = res_data[0] or {}
+ if not rslt.get(POLICY_NAME):
res_data = None
if rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_NO_RESPONSE_RECEIVED:
error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
- error_msg = "unexpected {0}".format(log_line)
+ error_msg = "{} unexpected {}".format(error_code, log_line)
PolicyRest._logger.error(error_msg)
metrics.set_http_status_code(error_code)
@@ -222,8 +233,8 @@ class PolicyRest(object):
if (rslt and not rslt.get(POLICY_NAME)
and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND
and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND):
- status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value
- info_msg = "not found {0}".format(log_line)
+ status_code = AuditHttpCode.DATA_NOT_FOUND_OK.value
+ info_msg = "{} not found {}".format(status_code, log_line)
PolicyRest._logger.info(info_msg)
metrics.set_http_status_code(status_code)
@@ -279,7 +290,8 @@ class PolicyRest(object):
expect_policy_removed = (ignore_policy_names and not expected_versions)
for retry in range(1, PolicyRest._policy_retry_count + 1):
- PolicyRest._logger.debug(str_metrics)
+ PolicyRest._logger.debug("try(%s) retry_get_config(%s): %s",
+ retry, retry_get_config, str_metrics)
done, latest_policy, status_code = PolicyRest._get_latest_policy_once(
audit, policy_id, expected_versions, ignore_policy_names,
@@ -289,16 +301,16 @@ class PolicyRest(object):
break
if retry == PolicyRest._policy_retry_count:
- audit.warn("gave up retrying {} from PDP after #{} for policy_id={}"
- .format(PolicyRest._url_get_config, retry, policy_id),
- error_code=AuditResponseCode.DATA_ERROR)
+ PolicyRest._logger.error(
+ audit.error("gave up retrying after #{} for policy_id({}) from PDP {}"
+ .format(retry, policy_id, PolicyRest._url_get_config),
+ error_code=AuditResponseCode.DATA_ERROR))
break
- audit.warn(
- "retry #{} {} from PDP in {} secs for policy_id={}".format(
- retry, PolicyRest._url_get_config,
- PolicyRest._policy_retry_sleep, policy_id),
- error_code=AuditResponseCode.DATA_ERROR)
+ PolicyRest._logger.warning(audit.warn(
+ "will retry({}) for policy_id({}) in {} secs from PDP {}".format(
+ retry, policy_id, PolicyRest._policy_retry_sleep, PolicyRest._url_get_config),
+ error_code=AuditResponseCode.DATA_ERROR))
time.sleep(PolicyRest._policy_retry_sleep)
if (expect_policy_removed and not latest_policy
@@ -308,10 +320,10 @@ class PolicyRest(object):
audit.set_http_status_code(status_code)
if not PolicyRest._validate_policy(latest_policy):
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- audit.error(
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value)
+ PolicyRest._logger.error(audit.error(
"received invalid policy from PDP: {}".format(json.dumps(latest_policy)),
- error_code=AuditResponseCode.DATA_ERROR)
+ error_code=AuditResponseCode.DATA_ERROR))
return latest_policy
@@ -331,9 +343,10 @@ class PolicyRest(object):
)
if not latest_policy and not expect_policy_removed:
- audit.error("received unexpected policy data from PDP for policy_id={}: {}"
- .format(policy_id, json.dumps(policy_bodies or [])),
- error_code=AuditResponseCode.DATA_ERROR)
+ PolicyRest._logger.error(
+ audit.error("received unexpected policy data from PDP for policy_id={}: {}"
+ .format(policy_id, json.dumps(policy_bodies or [])),
+ error_code=AuditResponseCode.DATA_ERROR))
done = bool(latest_policy
or (expect_policy_removed and not policy_bodies)
@@ -411,6 +424,9 @@ class PolicyRest(object):
policies = None
apns_length = len(apns)
+ PolicyRest._logger.debug("apns_length(%s) policies_to_find %s", apns_length,
+ json.dumps(policies_to_find))
+
if apns_length == 1:
policies = [PolicyRest.get_latest_policy(apns[0])]
else:
@@ -419,8 +435,9 @@ class PolicyRest(object):
pool.close()
pool.join()
- metrics_total.metrics("result get_latest_updated_policies {0}: {1} {2}"
- .format(str_metrics, len(policies), json.dumps(policies)))
+ metrics_total.metrics("result({}) get_latest_updated_policies {}: {} {}"
+ .format(apns_length, str_metrics,
+ len(policies), json.dumps(policies)))
updated_policies = dict((policy[POLICY_ID], policy)
for policy in policies
@@ -438,12 +455,12 @@ class PolicyRest(object):
and policy_id not in removed_policies)
PolicyRest._logger.debug(
- "result updated_policies %s, removed_policies %s, errored_policies %s",
- json.dumps(updated_policies), json.dumps(removed_policies),
+ "result(%s) updated_policies %s, removed_policies %s, errored_policies %s",
+ apns_length, json.dumps(updated_policies), json.dumps(removed_policies),
json.dumps(errored_policies))
if errored_policies:
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.set_http_status_code(AuditHttpCode.DATA_ERROR.value)
audit.error(
"errored_policies in PDP: {}".format(json.dumps(errored_policies)),
error_code=AuditResponseCode.DATA_ERROR)
@@ -460,6 +477,7 @@ class PolicyRest(object):
PolicyRest._logger.debug("%s", str_policy_filter)
status_code, policy_bodies = PolicyRest._pdp_get_config(audit, policy_filter)
+ audit.set_http_status_code(status_code)
PolicyRest._logger.debug("%s policy_bodies: %s %s", status_code,
str_policy_filter, json.dumps(policy_bodies or []))
@@ -467,14 +485,13 @@ class PolicyRest(object):
latest_policies = PolicyUtils.select_latest_policies(policy_bodies)
if not latest_policies:
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- audit.warn(
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value)
+ PolicyRest._logger.warning(audit.warn(
"received no policies from PDP for policy_filter {}: {}"
.format(str_policy_filter, json.dumps(policy_bodies or [])),
- error_code=AuditResponseCode.DATA_ERROR)
+ error_code=AuditResponseCode.DATA_ERROR))
return None, latest_policies
- audit.set_http_status_code(status_code)
valid_policies = {}
errored_policies = {}
for (policy_id, policy) in latest_policies.items():
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index 235e2b6..fb6c8b6 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -106,9 +106,9 @@ class _PolicyUpdate(object):
self._audit.req_message = req_message
self._logger.info(
- "pending request_id %s for %s policies_updated %s policies_removed %s",
+ "pending(%s) for %s policies_updated %s policies_removed %s",
self._audit.request_id, req_message,
- json.dumps(policies_updated), json.dumps(policies_removed))
+ json.dumps(self._policies_updated), json.dumps(self._policies_removed))
class PolicyUpdater(Thread):
@@ -178,13 +178,14 @@ class PolicyUpdater(Thread):
if not self._aud_reconfigure:
self._aud_reconfigure = Audit(req_message=Config.RECONFIGURE)
PolicyUpdater._logger.info(
- "reconfigure %s request_id %s",
+ "%s request_id %s",
self._aud_reconfigure.req_message, self._aud_reconfigure.request_id
)
self._run.set()
def run(self):
"""wait and run the policy-update in thread"""
+ PolicyUpdater._logger.info("starting policy_updater...")
self._run_reconfigure_timer()
while True:
PolicyUpdater._logger.info("waiting for policy-updates...")
@@ -307,7 +308,7 @@ class PolicyUpdater(Thread):
if DeployHandler.reconfigure(aud_reconfigure):
reconfigure_result += " " + Config.DEPLOY_HANDLER
- if self._reconfigure_receiver():
+ if self._reconfigure_receiver(aud_reconfigure):
reconfigure_result += " web-socket"
reconfigure_result += " -- change: {}".format(Config.discovered_config)
@@ -348,12 +349,24 @@ class PolicyUpdater(Thread):
)
catch_up_result = ""
try:
+ not_found_ok = None
PolicyUpdater._logger.info(log_line)
self._pause_catch_up_timer()
- _, catch_up_message = PolicyMatcher.get_latest_policies(aud_catch_up)
+ _, policies, policy_filters = PolicyMatcher.get_deployed_policies(aud_catch_up)
- if not catch_up_message or not aud_catch_up.is_success_or_not_found():
+ catch_up_message = None
+ if aud_catch_up.is_not_found():
+ not_found_ok = True
+ else:
+ _, catch_up_message = PolicyMatcher.build_catch_up_message(
+ aud_catch_up, policies, policy_filters)
+
+ if not_found_ok:
+ catch_up_result = ("- not sending catch-up "
+ "- no deployed policies or policy-filters")
+ PolicyUpdater._logger.warning(catch_up_result)
+ elif not (catch_up_message and aud_catch_up.is_success()):
catch_up_result = "- not sending catch-up to deployment-handler due to errors"
PolicyUpdater._logger.warning(catch_up_result)
elif catch_up_message.empty():
@@ -402,11 +415,14 @@ class PolicyUpdater(Thread):
PolicyUpdater._logger.info(log_line)
try:
+ not_found_ok = None
(updated_policies, removed_policies,
policy_filter_matches) = PolicyMatcher.match_to_deployed_policies(
audit, policies_updated, policies_removed)
- if updated_policies or removed_policies:
+ if audit.is_not_found():
+ not_found_ok = True
+ elif updated_policies or removed_policies:
updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
(audit,
[(policy_id, policy.get(POLICY_BODY, {}).get(POLICY_VERSION))
@@ -415,7 +431,11 @@ class PolicyUpdater(Thread):
for policy_id, policy in removed_policies.items()]
))
- if not audit.is_success_or_not_found():
+ if not_found_ok:
+ result = ("- not sending policy-updates to deployment-handler "
+ "- no deployed policies or policy-filters")
+ PolicyUpdater._logger.warning(result)
+ elif not audit.is_success():
result = "- not sending policy-updates to deployment-handler due to errors"
PolicyUpdater._logger.warning(result)
elif not updated_policies and not removed_policies:
diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py
index 24db468..73e7fbc 100644
--- a/policyhandler/web_server.py
+++ b/policyhandler/web_server.py
@@ -26,7 +26,7 @@ import cherrypy
from .config import Config
from .deploy_handler import PolicyUpdateMessage
-from .onap.audit import Audit
+from .onap.audit import Audit, AuditHttpCode
from .policy_matcher import PolicyMatcher
from .policy_receiver import PolicyReceiver
from .policy_rest import PolicyRest
@@ -34,21 +34,35 @@ from .policy_rest import PolicyRest
class PolicyWeb(object):
"""run http API of policy-handler on 0.0.0.0:wservice_port - any incoming address"""
+ DATA_NOT_FOUND_ERROR = 404
HOST_INADDR_ANY = ".".join("0"*4)
logger = logging.getLogger("policy_handler.policy_web")
@staticmethod
def run_forever(audit):
"""run the web-server of the policy-handler forever"""
- PolicyWeb.logger.info("policy_handler web-server on port(%d)...", Config.wservice_port)
cherrypy.config.update({"server.socket_host": PolicyWeb.HOST_INADDR_ANY,
"server.socket_port": Config.wservice_port})
+
+ protocol = "http"
+ tls_info = ""
+ # if Config.tls_server_cert_file and Config.tls_private_key_file:
+ # cherrypy.server.ssl_module = 'builtin'
+ # cherrypy.server.ssl_certificate = Config.tls_server_cert_file
+ # cherrypy.server.ssl_private_key = Config.tls_private_key_file
+ # if Config.tls_server_ca_chain_file:
+ # cherrypy.server.ssl_certificate_chain = Config.tls_server_ca_chain_file
+ # protocol = "https"
+ # tls_info = "cert: {} {} {}".format(Config.tls_server_cert_file,
+ # Config.tls_private_key_file,
+ # Config.tls_server_ca_chain_file)
+
cherrypy.tree.mount(_PolicyWeb(), '/')
- audit.info("running policy_handler web-server as {0}:{1}".format(
- cherrypy.server.socket_host, cherrypy.server.socket_port))
- PolicyWeb.logger.info("running policy_handler web-server as %s:%d with config: %s",
- cherrypy.server.socket_host, cherrypy.server.socket_port,
- json.dumps(cherrypy.config))
+
+ PolicyWeb.logger.info(
+ "%s with config: %s", audit.info("running policy_handler as {}://{}:{} {}".format(
+ protocol, cherrypy.server.socket_host, cherrypy.server.socket_port, tls_info)),
+ json.dumps(cherrypy.config))
cherrypy.engine.start()
class _PolicyWeb(object):
@@ -67,17 +81,18 @@ class _PolicyWeb(object):
req_info = _PolicyWeb._get_request_info(cherrypy.request)
audit = Audit(job_name="get_latest_policy",
req_message=req_info, headers=cherrypy.request.headers)
- PolicyWeb.logger.info("%s policy_id=%s headers=%s", \
- req_info, policy_id, json.dumps(cherrypy.request.headers))
+ PolicyWeb.logger.info("%s policy_id=%s headers=%s",
+ req_info, policy_id, json.dumps(cherrypy.request.headers))
latest_policy = PolicyRest.get_latest_policy((audit, policy_id, None, None)) or {}
PolicyWeb.logger.info("res %s policy_id=%s latest_policy=%s",
req_info, policy_id, json.dumps(latest_policy))
- success, http_status_code, _ = audit.audit_done(result=json.dumps(latest_policy))
- if not success:
- cherrypy.response.status = http_status_code
+ _, http_status_code, _ = audit.audit_done(result=json.dumps(latest_policy))
+ if http_status_code == AuditHttpCode.DATA_NOT_FOUND_OK.value:
+ http_status_code = PolicyWeb.DATA_NOT_FOUND_ERROR
+ cherrypy.response.status = http_status_code
return latest_policy
@@ -89,15 +104,20 @@ class _PolicyWeb(object):
PolicyWeb.logger.info("%s", req_info)
- result, policy_update = PolicyMatcher.get_latest_policies(audit)
- if policy_update and isinstance(policy_update, PolicyUpdateMessage):
- result["policy_update"] = policy_update.get_message()
+ result, policies, policy_filters = PolicyMatcher.get_deployed_policies(audit)
+ if not result:
+ result, policy_update = PolicyMatcher.build_catch_up_message(
+ audit, policies, policy_filters)
+ if policy_update and isinstance(policy_update, PolicyUpdateMessage):
+ result["policy_update"] = policy_update.get_message()
- PolicyWeb.logger.info("result %s: %s", req_info, json.dumps(result))
+ result_str = json.dumps(result, sort_keys=True)
+ PolicyWeb.logger.info("result %s: %s", req_info, result_str)
- success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
- if not success:
- cherrypy.response.status = http_status_code
+ _, http_status_code, _ = audit.audit_done(result=result_str)
+ if http_status_code == AuditHttpCode.DATA_NOT_FOUND_OK.value:
+ http_status_code = PolicyWeb.DATA_NOT_FOUND_ERROR
+ cherrypy.response.status = http_status_code
return result
@@ -159,19 +179,21 @@ class _PolicyWeb(object):
req_info = _PolicyWeb._get_request_info(cherrypy.request)
audit = Audit(job_name="get_latest_policies",
- req_message="{0}: {1}".format(req_info, str_policy_filter), \
- headers=cherrypy.request.headers)
- PolicyWeb.logger.info("%s: policy_filter=%s headers=%s", \
- req_info, str_policy_filter, json.dumps(cherrypy.request.headers))
+ req_message="{0}: {1}".format(req_info, str_policy_filter),
+ headers=cherrypy.request.headers)
+ PolicyWeb.logger.info("%s: policy_filter=%s headers=%s",
+ req_info, str_policy_filter, json.dumps(cherrypy.request.headers))
result = PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {}
+ result_str = json.dumps(result, sort_keys=True)
- PolicyWeb.logger.info("result %s: policy_filter=%s result=%s", \
- req_info, str_policy_filter, json.dumps(result))
+ PolicyWeb.logger.info("result %s: policy_filter=%s result=%s",
+ req_info, str_policy_filter, result_str)
- success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
- if not success:
- cherrypy.response.status = http_status_code
+ _, http_status_code, _ = audit.audit_done(result=result_str)
+ if http_status_code == AuditHttpCode.DATA_NOT_FOUND_OK.value:
+ http_status_code = PolicyWeb.DATA_NOT_FOUND_ERROR
+ cherrypy.response.status = http_status_code
return result