diff options
author | Alex Shatov <alexs@att.com> | 2018-12-05 15:23:50 -0500 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2018-12-05 15:23:50 -0500 |
commit | a39f4e82cef0414f510cf20e25864ac04cc8f055 (patch) | |
tree | 759f5a9f51c368456e8d8f3be65dac1951de0bd9 /policyhandler | |
parent | 61fe1b8aa4f6e3c2388cbc573f655f60b4c1b5f3 (diff) |
4.5.0 policy-handler - multi change
DCAEGEN2-853:
- stop reporting the absence of policies or updates
as error - this is an expected result == INFO or WARNING
DCAEGEN2-903: preparation for TLS on the web-server of policy-handler
DCAEGEN2-930:
- configurable timeouts for http requests from policy-handler
- added configurable pinging on the web-socket to PDP
- added healthcheck info on the web-socket
- upgraded websocket-client lib to 0.53.0
DCAEGEN2-1017: fixed a bug on policy-filter matching
by filter_config_name
- refactored and enhanced the unit-tests
Change-Id: I111ddc57bb978554ef376cbf916965b6667dad9b
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-853
Issue-ID: DCAEGEN2-903
Issue-ID: DCAEGEN2-930
Issue-ID: DCAEGEN2-1017
Diffstat (limited to 'policyhandler')
-rw-r--r-- | policyhandler/config.py | 59 | ||||
-rw-r--r-- | policyhandler/deploy_handler.py | 27 | ||||
-rw-r--r-- | policyhandler/discovery.py | 5 | ||||
-rw-r--r-- | policyhandler/onap/audit.py | 81 | ||||
-rw-r--r-- | policyhandler/policy_matcher.py | 40 | ||||
-rw-r--r-- | policyhandler/policy_receiver.py | 131 | ||||
-rw-r--r-- | policyhandler/policy_rest.py | 95 | ||||
-rw-r--r-- | policyhandler/policy_updater.py | 36 | ||||
-rw-r--r-- | policyhandler/web_server.py | 78 |
9 files changed, 376 insertions, 176 deletions
diff --git a/policyhandler/config.py b/policyhandler/config.py index d94ed79..5184f7f 100644 --- a/policyhandler/config.py +++ b/policyhandler/config.py @@ -148,24 +148,43 @@ class Config(object): TLS_CA_MODE = "tls_ca_mode" TLS_WSS_CA_MODE = "tls_wss_ca_mode" TLS_CA_MODE_DO_NOT_VERIFY = "do_not_verify" + TIMEOUT_IN_SECS = "timeout_in_secs" + CONSUL_TIMEOUT_IN_SECS = "consul_timeout_in_secs" + WS_PING_INTERVAL_IN_SECS = "ws_ping_interval_in_secs" + DEFAULT_TIMEOUT_IN_SECS = 60 system_name = SERVICE_NAME_POLICY_HANDLER wservice_port = 25577 consul_url = "http://consul:8500" + consul_timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS tls_cacert_file = None tls_server_cert_file = None tls_private_key_file = None + tls_server_ca_chain_file = None _local_config = Settings() discovered_config = Settings() @staticmethod + def _get_tls_file_path(tls_config, cert_directory, tls_name): + """calc file path and verify its existance""" + file_name = tls_config.get(tls_name) + if not file_name: + return None + tls_file_path = os.path.join(cert_directory, file_name) + if not os.path.isfile(tls_file_path) or not os.access(tls_file_path, os.R_OK): + Config._logger.error("invalid %s: %s", tls_name, tls_file_path) + return None + return tls_file_path + + @staticmethod def _set_tls_config(tls_config): """verify and set tls certs in config""" try: Config.tls_cacert_file = None Config.tls_server_cert_file = None Config.tls_private_key_file = None + Config.tls_server_ca_chain_file = None if not (tls_config and isinstance(tls_config, dict)): Config._logger.info("no tls in config: %s", json.dumps(tls_config)) @@ -174,43 +193,28 @@ class Config(object): cert_directory = tls_config.get("cert_directory") if not (cert_directory and isinstance(cert_directory, str)): - Config._logger.info("unexpected tls.cert_directory: %r", cert_directory) + Config._logger.warning("unexpected tls.cert_directory: %r", cert_directory) return cert_directory = os.path.join( os.path.dirname(os.path.dirname(os.path.realpath(__file__))), cert_directory) if not (cert_directory and os.path.isdir(cert_directory)): - Config._logger.info("ignoring invalid cert_directory: %s", cert_directory) + Config._logger.warning("ignoring invalid cert_directory: %s", cert_directory) return - cacert = tls_config.get("cacert") - if cacert: - tls_cacert_file = os.path.join(cert_directory, cacert) - if not os.path.isfile(tls_cacert_file): - Config._logger.error("invalid tls_cacert_file: %s", tls_cacert_file) - else: - Config.tls_cacert_file = tls_cacert_file - - server_cert = tls_config.get("server_cert") - if server_cert: - tls_server_cert_file = os.path.join(cert_directory, server_cert) - if not os.path.isfile(tls_server_cert_file): - Config._logger.error("invalid tls_server_cert_file: %s", tls_server_cert_file) - else: - Config.tls_server_cert_file = tls_server_cert_file - - private_key = tls_config.get("private_key") - if private_key: - tls_private_key_file = os.path.join(cert_directory, private_key) - if not os.path.isfile(tls_private_key_file): - Config._logger.error("invalid tls_private_key_file: %s", tls_private_key_file) - else: - Config.tls_private_key_file = tls_private_key_file + Config.tls_cacert_file = Config._get_tls_file_path(tls_config, cert_directory, "cacert") + Config.tls_server_cert_file = Config._get_tls_file_path(tls_config, cert_directory, + "server_cert") + Config.tls_private_key_file = Config._get_tls_file_path(tls_config, cert_directory, + "private_key") + Config.tls_server_ca_chain_file = Config._get_tls_file_path(tls_config, cert_directory, + "server_ca_chain") finally: Config._logger.info("tls_cacert_file = %s", Config.tls_cacert_file) Config._logger.info("tls_server_cert_file = %s", Config.tls_server_cert_file) Config._logger.info("tls_private_key_file = %s", Config.tls_private_key_file) + Config._logger.info("tls_server_ca_chain_file = %s", Config.tls_server_ca_chain_file) @staticmethod def init_config(file_path=None): @@ -239,6 +243,9 @@ class Config(object): Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port) Config.consul_url = os.environ.get( "CONSUL_URL", loaded_config.get(Config.FIELD_CONSUL_URL, Config.consul_url)).rstrip("/") + Config.consul_timeout_in_secs = loaded_config.get(Config.CONSUL_TIMEOUT_IN_SECS) + if not Config.consul_timeout_in_secs or Config.consul_timeout_in_secs < 1: + Config.consul_timeout_in_secs = Config.DEFAULT_TIMEOUT_IN_SECS local_config = loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER, {}) Config.system_name = local_config.get(Config.FIELD_SYSTEM, Config.system_name) @@ -250,7 +257,7 @@ class Config(object): @staticmethod def discover(audit): - """bring and merge the config settings from the discovery service""" + """bring the config settings from the discovery service""" discovery_key = Config.system_name from .discovery import DiscoveryClient new_config = DiscoveryClient.get_value(audit, discovery_key) diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index b4b2468..0ffacba 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -145,6 +145,7 @@ class DeployHandler(object): """calling the deployment-handler web apis""" _logger = logging.getLogger("policy_handler.deploy_handler") DEFAULT_TARGET_ENTITY = "deployment_handler" + DEFAULT_TIMEOUT_IN_SECS = 60 _lazy_inited = False _lock = Lock() @@ -158,6 +159,7 @@ class DeployHandler(object): _target_entity = None _custom_kwargs = {} _server_instance_uuid = None + _timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS server_instance_changed = False @staticmethod @@ -188,7 +190,8 @@ class DeployHandler(object): # "query" : { # "cfy_tenant_name" : "default_tenant" # }, - # "tls_ca_mode" : "cert_directory" + # "tls_ca_mode" : "cert_directory", + # "timeout_in_secs": 60 # } DeployHandler._target_entity = config_dh.get(TARGET_ENTITY, DeployHandler.DEFAULT_TARGET_ENTITY) @@ -204,6 +207,10 @@ class DeployHandler(object): DeployHandler._target_entity, DeployHandler._url, tls_ca_mode, json.dumps(DeployHandler._custom_kwargs)) + DeployHandler._timeout_in_secs = config_dh.get(Config.TIMEOUT_IN_SECS) + if not DeployHandler._timeout_in_secs or DeployHandler._timeout_in_secs < 1: + DeployHandler._timeout_in_secs = DeployHandler.DEFAULT_TIMEOUT_IN_SECS + if not DeployHandler._url: # discover routing to deployment-handler at consul-services if not isinstance(config_dh, dict): @@ -290,6 +297,7 @@ class DeployHandler(object): target_entity = DeployHandler._target_entity url = DeployHandler._url_policy params = deepcopy(DeployHandler._query) + timeout_in_secs = DeployHandler._timeout_in_secs custom_kwargs = deepcopy(DeployHandler._custom_kwargs) metrics = Metrics(aud_parent=audit, targetEntity="{} policy_update".format(target_entity), @@ -297,9 +305,9 @@ class DeployHandler(object): headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id} log_action = "put to {} at {}".format(target_entity, url) - log_data = "msg={} headers={}, params={} custom_kwargs({})".format( + log_data = "msg={} headers={}, params={}, timeout_in_secs={}, custom_kwargs({})".format( json.dumps(message), json.dumps(headers), - json.dumps(params), json.dumps(custom_kwargs)) + json.dumps(params), timeout_in_secs, json.dumps(custom_kwargs)) log_line = log_action + " " + log_data DeployHandler._logger.info(log_line) @@ -315,7 +323,8 @@ class DeployHandler(object): res = None try: - res = session.put(url, json=message, headers=headers, params=params, **custom_kwargs) + res = session.put(url, json=message, headers=headers, params=params, + timeout=timeout_in_secs, **custom_kwargs) except Exception as ex: error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value if isinstance(ex, requests.exceptions.RequestException) @@ -357,6 +366,7 @@ class DeployHandler(object): target_entity = DeployHandler._target_entity url = DeployHandler._url_policy params = deepcopy(DeployHandler._query) + timeout_in_secs = DeployHandler._timeout_in_secs custom_kwargs = deepcopy(DeployHandler._custom_kwargs) metrics = Metrics(aud_parent=audit, @@ -365,8 +375,8 @@ class DeployHandler(object): headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id} log_action = "get from {} at {}".format(target_entity, url) - log_data = "headers={}, params={} custom_kwargs({})".format( - json.dumps(headers), json.dumps(params), json.dumps(custom_kwargs)) + log_data = "headers={}, params={}, timeout_in_secs={}, custom_kwargs({})".format( + json.dumps(headers), json.dumps(params), timeout_in_secs, json.dumps(custom_kwargs)) log_line = log_action + " " + log_data DeployHandler._logger.info(log_line) @@ -382,7 +392,8 @@ class DeployHandler(object): res = None try: - res = session.get(url, headers=headers, params=params, **custom_kwargs) + res = session.get(url, headers=headers, params=params, timeout=timeout_in_secs, + **custom_kwargs) except Exception as ex: error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value if isinstance(ex, requests.exceptions.RequestException) @@ -412,7 +423,7 @@ class DeployHandler(object): policies = result.get(POLICIES, {}) policy_filters = result.get(POLICY_FILTERS, {}) if not policies and not policy_filters: - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value) DeployHandler._logger.warning(audit.warn( "found no deployed policies or policy-filters: {}".format(log_line), error_code=AuditResponseCode.DATA_ERROR)) diff --git a/policyhandler/discovery.py b/policyhandler/discovery.py index 5a35525..4c5b64e 100644 --- a/policyhandler/discovery.py +++ b/policyhandler/discovery.py @@ -56,7 +56,7 @@ class DiscoveryClient(object): @staticmethod def _discover_service(audit, service_name, service_path): """find the service record in consul""" - response = requests.get(service_path) + response = requests.get(service_path, timeout=Config.consul_timeout_in_secs) DiscoveryClient._logger.info(audit.info("response {} from {}: {}".format( response.status_code, service_path, response.text))) @@ -113,7 +113,7 @@ class DiscoveryClient(object): @staticmethod def _get_value_from_kv(url): """get the value from consul-kv at discovery url""" - response = requests.get(url) + response = requests.get(url, timeout=Config.consul_timeout_in_secs) response.raise_for_status() data = response.json() value = base64.b64decode(data[0]["Value"]).decode("utf-8") @@ -129,6 +129,7 @@ class DiscoveryClient(object): log_line = "get from {} at {}".format(DiscoveryClient.CONSUL_ENTITY, discovery_url) DiscoveryClient._logger.info(metrics.metrics_start(log_line)) + status_code = None try: status_code, value = DiscoveryClient._get_value_from_kv(discovery_url) except Exception as ex: diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py index 1bee4a7..d63d0b2 100644 --- a/policyhandler/onap/audit.py +++ b/policyhandler/onap/audit.py @@ -64,10 +64,10 @@ ERROR_DESCRIPTION = "errorDescription" class AuditHttpCode(Enum): """audit http codes""" HTTP_OK = 200 + DATA_NOT_FOUND_OK = 204 PERMISSION_UNAUTHORIZED_ERROR = 401 PERMISSION_FORBIDDEN_ERROR = 403 RESPONSE_ERROR = 400 - DATA_NOT_FOUND_ERROR = 404 SERVER_INTERNAL_ERROR = 500 SERVICE_UNAVAILABLE_ERROR = 503 DATA_ERROR = 1030 @@ -88,7 +88,7 @@ class AuditResponseCode(Enum): def get_response_code(http_status_code): """calculates the response_code from max_http_status_code""" response_code = AuditResponseCode.UNKNOWN_ERROR - if http_status_code <= AuditHttpCode.HTTP_OK.value: + if http_status_code <= AuditHttpCode.DATA_NOT_FOUND_OK.value: response_code = AuditResponseCode.SUCCESS elif http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value, @@ -99,8 +99,7 @@ class AuditResponseCode(Enum): elif http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value: response_code = AuditResponseCode.BUSINESS_PROCESS_ERROR elif http_status_code in [AuditHttpCode.DATA_ERROR.value, - AuditHttpCode.RESPONSE_ERROR.value, - AuditHttpCode.DATA_NOT_FOUND_ERROR.value]: + AuditHttpCode.RESPONSE_ERROR.value]: response_code = AuditResponseCode.DATA_ERROR elif http_status_code == AuditHttpCode.SCHEMA_ERROR.value: response_code = AuditResponseCode.SCHEMA_ERROR @@ -138,6 +137,7 @@ class _Audit(object): _hostname = os.environ.get(HOSTNAME) _health = Health() + _health_checkers = {} _py_ver = sys.version.replace("\n", "") _packages = [] @@ -167,6 +167,31 @@ class _Audit(object): pass + def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs): + """create audit object per each request in the system + + :job_name: is the name of the audit job for health stats + :request_id: is the X-ECOMP-RequestID for tracing + :req_message: is the request message string for logging + :kwargs: - put any request related params into kwargs + """ + self.job_name = _Audit._key_format.sub('_', job_name or req_message or _Audit._service_name) + self.request_id = request_id + self.req_message = req_message or "" + self.kwargs = kwargs or {} + + self.max_http_status_code = 0 + self._lock = threading.Lock() + + + @staticmethod + def register_item_health(health_name, health_getter): + """ + register the health-checker for the additional item + by its health_name and the function health_getter that returns its health status as json + """ + _Audit._health_checkers[health_name] = health_getter + def health(self, full=False): """returns json for health check""" utcnow = datetime.utcnow() @@ -186,33 +211,21 @@ class _Audit(object): "process_memory" : ProcessInfo.process_memory() }, "stats" : _Audit._health.dump(), + "items" : dict((health_name, health_getter()) + for health_name, health_getter in _Audit._health_checkers.items()), "soft" : {"python" : _Audit._py_ver, "packages" : _Audit._packages} } - self.info("{} health: {}".format(_Audit._service_name, json.dumps(health))) + self.info("{} health: {}".format(_Audit._service_name, + json.dumps(health, sort_keys=True))) return health + def process_info(self): """get the debug info on all the threads and memory""" process_info = ProcessInfo.get_all() self.info("{} process_info: {}".format(_Audit._service_name, json.dumps(process_info))) return process_info - def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs): - """create audit object per each request in the system - - :job_name: is the name of the audit job for health stats - :request_id: is the X-ECOMP-RequestID for tracing - :req_message: is the request message string for logging - :kwargs: - put any request related params into kwargs - """ - self.job_name = _Audit._key_format.sub('_', job_name or req_message or _Audit._service_name) - self.request_id = request_id - self.req_message = req_message or "" - self.kwargs = kwargs or {} - - self.max_http_status_code = 0 - self._lock = threading.Lock() - def merge_all_kwargs(self, **kwargs): """returns the merge of copy of self.kwargs with the param kwargs""" @@ -230,7 +243,7 @@ class _Audit(object): def reset_http_status_not_found(self): """resets the highest(worst) http status code if data not found""" with self._lock: - if self.max_http_status_code == AuditHttpCode.DATA_NOT_FOUND_ERROR.value: + if self.max_http_status_code == AuditHttpCode.DATA_NOT_FOUND_OK.value: self.max_http_status_code = 0 def get_max_http_status_code(self): @@ -252,25 +265,23 @@ class _Audit(object): == AuditResponseCode.get_response_code(status_code).value or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value) - def _get_response_status(self, not_found_ok=None): + def _get_response_status(self): """calculates the response status fields from max_http_status_code""" max_http_status_code = self.get_max_http_status_code() response_code = AuditResponseCode.get_response_code(max_http_status_code) - success = ((response_code.value == AuditResponseCode.SUCCESS.value) - or (not_found_ok - and max_http_status_code == AuditHttpCode.DATA_NOT_FOUND_ERROR.value)) + success = (response_code.value == AuditResponseCode.SUCCESS.value) response_description = AuditResponseCode.get_human_text(response_code) return success, max_http_status_code, response_code, response_description def is_success(self): - """returns whether the response_code is success""" + """returns whether the response_code is success or 204 - not found""" success, _, _, _ = self._get_response_status() return success - def is_success_or_not_found(self): - """returns whether the response_code is success or 404 - not found""" - success, _, _, _ = self._get_response_status(not_found_ok=True) - return success + def is_not_found(self): + """returns whether the response_code is 204 - not found""" + max_http_status_code = self.get_max_http_status_code() + return max_http_status_code == AuditHttpCode.DATA_NOT_FOUND_OK.value def debug(self, log_line, **kwargs): """debug - the debug=lowest level of logging""" @@ -397,8 +408,8 @@ class Audit(_Audit): def audit_done(self, result=None, **kwargs): """debug+audit - the audit=top level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - success, max_http_status_code, response_code, response_description = \ - self._get_response_status() + (success, max_http_status_code, + response_code, response_description) = self._get_response_status() log_line = "{0} {1}".format(self.req_message, result or "").strip() audit_func = None timer = _Audit.get_elapsed_time(self._started) @@ -461,8 +472,8 @@ class Metrics(_Audit): def metrics(self, log_line, **kwargs): """debug+metrics - the metrics=sub-audit level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - success, max_http_status_code, response_code, response_description = \ - self._get_response_status() + (success, max_http_status_code, + response_code, response_description) = self._get_response_status() metrics_func = None timer = _Audit.get_elapsed_time(self._metrics_started) if success: diff --git a/policyhandler/policy_matcher.py b/policyhandler/policy_matcher.py index 71b5ce8..d0786ba 100644 --- a/policyhandler/policy_matcher.py +++ b/policyhandler/policy_matcher.py @@ -23,6 +23,7 @@ import logging import re from .deploy_handler import DeployHandler, PolicyUpdateMessage +from .onap.audit import AuditHttpCode, AuditResponseCode from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, MATCHING_CONDITIONS, POLICY_BODY, POLICY_FILTER, POLICY_NAME, POLICY_VERSION, POLICY_VERSIONS) @@ -36,15 +37,32 @@ class PolicyMatcher(object): PENDING_UPDATE = "pending_update" @staticmethod - def get_latest_policies(audit): - """ - find the latest policies from policy-engine for the deployed policies and policy-filters - """ + def get_deployed_policies(audit): + """get the deployed policies and policy-filters""" deployed_policies, deployed_policy_filters = DeployHandler.get_deployed_policies(audit) + if audit.is_not_found(): + warning_txt = "got no deployed policies or policy-filters" + PolicyMatcher._logger.warning(warning_txt) + return {"warning": warning_txt}, None, None + if not audit.is_success() or (not deployed_policies and not deployed_policy_filters): error_txt = "failed to retrieve policies from deployment-handler" PolicyMatcher._logger.error(error_txt) + return {"error": error_txt}, None, None + + return None, deployed_policies, deployed_policy_filters + + + @staticmethod + def build_catch_up_message(audit, deployed_policies, deployed_policy_filters): + """ + find the latest policies from policy-engine for the deployed policies and policy-filters + """ + + if not (deployed_policies or deployed_policy_filters): + error_txt = "no deployed policies or policy-filters" + PolicyMatcher._logger.warning(error_txt) return {"error": error_txt}, None coarse_regex_patterns = PolicyMatcher.calc_coarse_patterns( @@ -54,7 +72,9 @@ class PolicyMatcher(object): error_txt = ("failed to construct the coarse_regex_patterns from " + "deployed_policies: {} and deployed_policy_filters: {}" .format(deployed_policies, deployed_policy_filters)) - PolicyMatcher._logger.error(error_txt) + PolicyMatcher._logger.error(audit.error( + error_txt, error_code=AuditResponseCode.DATA_ERROR)) + audit.set_http_status_code(AuditHttpCode.DATA_ERROR.value) return {"error": error_txt}, None pdp_response = PolicyRest.get_latest_policies( @@ -62,9 +82,9 @@ class PolicyMatcher(object): for policy_name_pattern in coarse_regex_patterns] ) - if not audit.is_success_or_not_found(): + if not audit.is_success(): error_txt = "failed to retrieve policies from policy-engine" - PolicyMatcher._logger.error(error_txt) + PolicyMatcher._logger.warning(error_txt) return {"error": error_txt}, None latest_policies = pdp_response.get(LATEST_POLICIES, {}) @@ -90,6 +110,7 @@ class PolicyMatcher(object): removed_policies, policy_filter_matches)) + @staticmethod def calc_coarse_patterns(audit, deployed_policies, deployed_policy_filters): """calculate the coarsed patterns on policy-names in policies and policy-filters""" @@ -109,6 +130,7 @@ class PolicyMatcher(object): coarse_regex.patterns))) return coarse_regex_patterns + @staticmethod def match_to_deployed_policies(audit, policies_updated, policies_removed): """match the policies_updated, policies_removed versus deployed policies""" @@ -125,6 +147,7 @@ class PolicyMatcher(object): return changed_policies, policies_removed, policy_filter_matches + @staticmethod def _match_policies(audit, policies, deployed_policies, deployed_policy_filters): """ @@ -174,6 +197,7 @@ class PolicyMatcher(object): return matching_policies, changed_policies, policy_filter_matches + @staticmethod def _match_policy_to_filter(audit, policy_id, policy, policy_filter_id, policy_filter): """Match the policy to the policy-filter""" @@ -218,7 +242,7 @@ class PolicyMatcher(object): filter_config_name = policy_filter.get("configName") policy_config_name = matching_conditions.get("ConfigName") - if filter_onap_name and filter_config_name != policy_config_name: + if filter_config_name and filter_config_name != policy_config_name: PolicyMatcher._logger.debug( audit.debug("not match by configName: {} != {}: {}" .format(policy_config_name, filter_config_name, log_line))) diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py index 3ae25fc..249c1f7 100644 --- a/policyhandler/policy_receiver.py +++ b/policyhandler/policy_receiver.py @@ -28,14 +28,16 @@ passes the notifications to policy-updater import copy import json import logging -import os import ssl import time +import urllib.parse +from datetime import datetime from threading import Lock, Thread import websocket from .config import Config, Settings +from .onap.audit import Audit from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION from .policy_updater import PolicyUpdater from .policy_utils import Utils @@ -48,8 +50,17 @@ POLICY_MATCHES = 'matches' class _PolicyReceiver(Thread): """web-socket to PolicyEngine""" _logger = logging.getLogger("policy_handler.policy_receiver") - - def __init__(self): + WS_STARTED = "started" + WS_START_COUNT = "start_count" + WS_CLOSE_COUNT = "close_count" + WS_ERROR_COUNT = "error_count" + WS_PONG_COUNT = "pong_count" + WS_MESSAGE_COUNT = "message_count" + WS_MESSAGE_TIMESTAMP = "message_timestamp" + WS_STATUS = "status" + WS_PING_INTERVAL_DEFAULT = 180 + + def __init__(self, audit): """web-socket inside the thread to receive policy notifications from PolicyEngine""" Thread.__init__(self, name="policy_receiver", daemon=True) @@ -62,14 +73,26 @@ class _PolicyReceiver(Thread): self._web_socket_sslopt = None self._tls_wss_ca_mode = None self._web_socket = None - self.reconfigure() - - self._policy_updater = PolicyUpdater(self.reconfigure) - self._policy_updater.start() - - def reconfigure(self): + self._ws_ping_interval_in_secs = _PolicyReceiver.WS_PING_INTERVAL_DEFAULT + self._web_socket_health = { + _PolicyReceiver.WS_START_COUNT: 0, + _PolicyReceiver.WS_CLOSE_COUNT: 0, + _PolicyReceiver.WS_ERROR_COUNT: 0, + _PolicyReceiver.WS_PONG_COUNT: 0, + _PolicyReceiver.WS_MESSAGE_COUNT: 0, + _PolicyReceiver.WS_STATUS: "created" + } + + Audit.register_item_health("web_socket_health", self._get_health) + self._reconfigure(audit) + + self._policy_updater = PolicyUpdater(self._reconfigure) + + def _reconfigure(self, audit): """configure and reconfigure the web-socket""" with self._lock: + _PolicyReceiver._logger.info(audit.info("web_socket_health {}".format( + json.dumps(self._get_health(), sort_keys=True)))) self._sleep_before_restarting = 5 self._settings.set_config(Config.discovered_config) changed, config = self._settings.get_by_key(Config.FIELD_POLICY_ENGINE) @@ -80,13 +103,19 @@ class _PolicyReceiver(Thread): prev_web_socket_url = self._web_socket_url prev_web_socket_sslopt = self._web_socket_sslopt + prev_ws_ping_interval_in_secs = self._ws_ping_interval_in_secs + self._web_socket_sslopt = None - resturl = (config.get("url", "").lower() - + config.get("path_notifications", "/pdp/notifications")) + resturl = urllib.parse.urljoin(config.get("url", "").lower().rstrip("/") + "/", + config.get("path_notifications", "/pdp/notifications")) self._tls_wss_ca_mode = config.get(Config.TLS_WSS_CA_MODE) + self._ws_ping_interval_in_secs = config.get(Config.WS_PING_INTERVAL_IN_SECS) + if not self._ws_ping_interval_in_secs or self._ws_ping_interval_in_secs < 60: + self._ws_ping_interval_in_secs = _PolicyReceiver.WS_PING_INTERVAL_DEFAULT + if resturl.startswith("https:"): self._web_socket_url = resturl.replace("https:", "wss:") @@ -101,17 +130,19 @@ class _PolicyReceiver(Thread): else: self._web_socket_url = resturl.replace("http:", "ws:") + log_changed = ( + "changed web_socket_url(%s) or tls_wss_ca_mode(%s)" + " or ws_ping_interval_in_secs(%s): %s" % + (self._web_socket_url, self._tls_wss_ca_mode, self._ws_ping_interval_in_secs, + self._settings)) if (self._web_socket_url == prev_web_socket_url - and Utils.are_the_same(prev_web_socket_sslopt, self._web_socket_sslopt)): - _PolicyReceiver._logger.info( - "not changed web_socket_url(%s) or tls_wss_ca_mode(%s): %s", - self._web_socket_url, self._tls_wss_ca_mode, self._settings) + and Utils.are_the_same(prev_web_socket_sslopt, self._web_socket_sslopt) + and prev_ws_ping_interval_in_secs == self._ws_ping_interval_in_secs): + _PolicyReceiver._logger.info(audit.info("not {}".format(log_changed))) self._settings.commit_change() return False - _PolicyReceiver._logger.info("changed web_socket_url(%s) or tls_wss_ca_mode(%s): %s", - self._web_socket_url, self._tls_wss_ca_mode, - self._settings) + _PolicyReceiver._logger.info(audit.info(log_changed)) self._settings.commit_change() self._stop_notifications() @@ -119,6 +150,8 @@ class _PolicyReceiver(Thread): def run(self): """listen on web-socket and pass the policy notifications to policy-updater""" + self._policy_updater.start() + _PolicyReceiver._logger.info("starting policy_receiver...") websocket.enableTrace(True) restarting = False while True: @@ -142,6 +175,7 @@ class _PolicyReceiver(Thread): web_socket_url = self._web_socket_url sslopt = copy.deepcopy(self._web_socket_sslopt) tls_wss_ca_mode = self._tls_wss_ca_mode + ws_ping_interval_in_secs = self._ws_ping_interval_in_secs _PolicyReceiver._logger.info( "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)", @@ -149,13 +183,15 @@ class _PolicyReceiver(Thread): self._web_socket = websocket.WebSocketApp( web_socket_url, + on_open=self._on_ws_open, on_message=self._on_pdp_message, on_close=self._on_ws_close, - on_error=self._on_ws_error + on_error=self._on_ws_error, + on_pong=self._on_ws_pong ) _PolicyReceiver._logger.info("waiting for policy-notifications...") - self._web_socket.run_forever(sslopt=sslopt) + self._web_socket.run_forever(sslopt=sslopt, ping_interval=ws_ping_interval_in_secs) restarting = True _PolicyReceiver._logger.info("exit policy-receiver") @@ -175,9 +211,13 @@ class _PolicyReceiver(Thread): def _on_pdp_message(self, *args): """received the notification from PDP""" + self._web_socket_health[_PolicyReceiver.WS_MESSAGE_COUNT] += 1 + self._web_socket_health[_PolicyReceiver.WS_MESSAGE_TIMESTAMP] = str(datetime.utcnow()) try: message = args and args[-1] _PolicyReceiver._logger.info("Received notification message: %s", message) + _PolicyReceiver._logger.info("web_socket_health %s", + json.dumps(self._get_health(), sort_keys=True)) if not message: return message = json.loads(message) @@ -216,9 +256,47 @@ class _PolicyReceiver(Thread): _PolicyReceiver._logger.exception("policy-notification error %s", str(error)) self._sleep_before_restarting = 60 if isinstance(error, ssl.SSLError) else 5 + self._web_socket_health[_PolicyReceiver.WS_STATUS] = "error" + self._web_socket_health[_PolicyReceiver.WS_ERROR_COUNT] += 1 + self._web_socket_health["last_error"] = { + "error": str(error), "timestamp": str(datetime.utcnow()) + } + _PolicyReceiver._logger.info("web_socket_health %s", + json.dumps(self._get_health(), sort_keys=True)) + def _on_ws_close(self, code, reason): """restart web-socket on close""" - _PolicyReceiver._logger.info("lost connection(%s, %s) to PDP - restarting...", code, reason) + self._web_socket_health["last_closed"] = str(datetime.utcnow()) + self._web_socket_health[_PolicyReceiver.WS_STATUS] = "closed" + self._web_socket_health[_PolicyReceiver.WS_CLOSE_COUNT] += 1 + _PolicyReceiver._logger.info( + "lost connection(%s, %s) to PDP - restarting... web_socket_health %s", + code, reason, json.dumps(self._get_health(), sort_keys=True)) + + def _on_ws_open(self): + """started web-socket""" + self._web_socket_health[_PolicyReceiver.WS_STATUS] = _PolicyReceiver.WS_STARTED + self._web_socket_health[_PolicyReceiver.WS_START_COUNT] += 1 + self._web_socket_health[_PolicyReceiver.WS_STARTED] = datetime.utcnow() + _PolicyReceiver._logger.info("opened connection to PDP web_socket_health %s", + json.dumps(self._get_health(), sort_keys=True)) + + def _on_ws_pong(self, pong): + """pong = response to pinging the server of the web-socket""" + self._web_socket_health[_PolicyReceiver.WS_PONG_COUNT] += 1 + _PolicyReceiver._logger.info( + "pong(%s) from connection to PDP web_socket_health %s", + pong, json.dumps(self._get_health(), sort_keys=True)) + + def _get_health(self): + """returns the healthcheck of the web-socket as json""" + web_socket_health = copy.deepcopy(self._web_socket_health) + started = web_socket_health.get(_PolicyReceiver.WS_STARTED) + if started: + web_socket_health[_PolicyReceiver.WS_STARTED] = str(started) + web_socket_health["uptime"] = str(datetime.utcnow() - started) + return web_socket_health + def shutdown(self, audit): """Shutdown the policy-receiver""" @@ -237,11 +315,20 @@ class _PolicyReceiver(Thread): """need to bring the latest policies to DCAE-Controller""" self._policy_updater.catch_up(audit) + def is_running(self): + """check whether the policy-receiver and policy-updater are running""" + return self.is_alive() and self._policy_updater.is_alive() + class PolicyReceiver(object): """policy-receiver - static singleton wrapper""" _policy_receiver = None @staticmethod + def is_running(): + """check whether the policy-receiver runs""" + return PolicyReceiver._policy_receiver and PolicyReceiver._policy_receiver.is_running() + + @staticmethod def shutdown(audit): """Shutdown the notification-handler""" PolicyReceiver._policy_receiver.shutdown(audit) @@ -254,7 +341,7 @@ class PolicyReceiver(object): @staticmethod def run(audit): """Using policy-engine client to talk to policy engine""" - PolicyReceiver._policy_receiver = _PolicyReceiver() + PolicyReceiver._policy_receiver = _PolicyReceiver(audit) PolicyReceiver._policy_receiver.start() PolicyReceiver.catch_up(audit) diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py index 0713b38..85dd914 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/policy_rest.py @@ -22,6 +22,7 @@ import copy import json import logging import time +import urllib.parse from multiprocessing.dummy import Pool as ThreadPool from threading import Lock @@ -51,6 +52,7 @@ class PolicyRest(object): EXPECTED_VERSIONS = "expected_versions" IGNORE_POLICY_NAMES = "ignore_policy_names" + DEFAULT_TIMEOUT_IN_SECS = 60 _lock = Lock() _settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS, @@ -65,6 +67,7 @@ class PolicyRest(object): _thread_pool_size = 4 _policy_retry_count = 1 _policy_retry_sleep = 0 + _timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS @staticmethod def _init(): @@ -85,8 +88,9 @@ class PolicyRest(object): 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)) - PolicyRest._url_get_config = (config.get("url", "") + config.get("path_api", "") - + PolicyRest.POLICY_GET_CONFIG) + get_config_path = urllib.parse.urljoin( + config.get("path_api", "pdp/api").strip("/") + "/", PolicyRest.POLICY_GET_CONFIG) + PolicyRest._url_get_config = urllib.parse.urljoin(config.get("url", ""), get_config_path) PolicyRest._headers = config.get("headers", {}) PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) _, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key( @@ -101,12 +105,16 @@ class PolicyRest(object): tls_ca_mode = config.get(Config.TLS_CA_MODE) PolicyRest._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode) + PolicyRest._timeout_in_secs = config.get(Config.TIMEOUT_IN_SECS) + if not PolicyRest._timeout_in_secs or PolicyRest._timeout_in_secs < 1: + PolicyRest._timeout_in_secs = PolicyRest.DEFAULT_TIMEOUT_IN_SECS - PolicyRest._logger.info("PDP(%s) url(%s) headers(%s) tls_ca_mode(%s) custom_kwargs(%s): %s", - PolicyRest._target_entity, PolicyRest._url_get_config, - Metrics.json_dumps(PolicyRest._headers), - tls_ca_mode, json.dumps(PolicyRest._custom_kwargs), - PolicyRest._settings) + PolicyRest._logger.info( + "PDP(%s) url(%s) headers(%s) tls_ca_mode(%s) timeout_in_secs(%s) custom_kwargs(%s): %s", + PolicyRest._target_entity, PolicyRest._url_get_config, + Metrics.json_dumps(PolicyRest._headers), tls_ca_mode, + PolicyRest._timeout_in_secs, json.dumps(PolicyRest._custom_kwargs), + PolicyRest._settings) PolicyRest._settings.commit_change() PolicyRest._lazy_inited = True @@ -144,6 +152,7 @@ class PolicyRest(object): session = PolicyRest._requests_session target_entity = PolicyRest._target_entity url = PolicyRest._url_get_config + timeout_in_secs = PolicyRest._timeout_in_secs headers = copy.deepcopy(PolicyRest._headers) custom_kwargs = copy.deepcopy(PolicyRest._custom_kwargs) @@ -152,15 +161,17 @@ class PolicyRest(object): headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id log_action = "post to {} at {}".format(target_entity, url) - log_data = "msg={} headers={}, custom_kwargs({})".format( - json.dumps(json_body), Metrics.json_dumps(headers), json.dumps(custom_kwargs)) + log_data = "msg={} headers={}, custom_kwargs({}) timeout_in_secs({})".format( + json.dumps(json_body), Metrics.json_dumps(headers), json.dumps(custom_kwargs), + timeout_in_secs) log_line = log_action + " " + log_data PolicyRest._logger.info(metrics.metrics_start(log_line)) res = None try: - res = session.post(url, json=json_body, headers=headers, **custom_kwargs) + res = session.post(url, json=json_body, headers=headers, timeout=timeout_in_secs, + **custom_kwargs) except Exception as ex: error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value if isinstance(ex, requests.exceptions.RequestException) @@ -195,12 +206,12 @@ class PolicyRest(object): res_data = res.json() if res_data and isinstance(res_data, list) and len(res_data) == 1: - rslt = res_data[0] - if rslt and not rslt.get(POLICY_NAME): + rslt = res_data[0] or {} + if not rslt.get(POLICY_NAME): res_data = None if rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_NO_RESPONSE_RECEIVED: error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value - error_msg = "unexpected {0}".format(log_line) + error_msg = "{} unexpected {}".format(error_code, log_line) PolicyRest._logger.error(error_msg) metrics.set_http_status_code(error_code) @@ -222,8 +233,8 @@ class PolicyRest(object): if (rslt and not rslt.get(POLICY_NAME) and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND): - status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value - info_msg = "not found {0}".format(log_line) + status_code = AuditHttpCode.DATA_NOT_FOUND_OK.value + info_msg = "{} not found {}".format(status_code, log_line) PolicyRest._logger.info(info_msg) metrics.set_http_status_code(status_code) @@ -279,7 +290,8 @@ class PolicyRest(object): expect_policy_removed = (ignore_policy_names and not expected_versions) for retry in range(1, PolicyRest._policy_retry_count + 1): - PolicyRest._logger.debug(str_metrics) + PolicyRest._logger.debug("try(%s) retry_get_config(%s): %s", + retry, retry_get_config, str_metrics) done, latest_policy, status_code = PolicyRest._get_latest_policy_once( audit, policy_id, expected_versions, ignore_policy_names, @@ -289,16 +301,16 @@ class PolicyRest(object): break if retry == PolicyRest._policy_retry_count: - audit.warn("gave up retrying {} from PDP after #{} for policy_id={}" - .format(PolicyRest._url_get_config, retry, policy_id), - error_code=AuditResponseCode.DATA_ERROR) + PolicyRest._logger.error( + audit.error("gave up retrying after #{} for policy_id({}) from PDP {}" + .format(retry, policy_id, PolicyRest._url_get_config), + error_code=AuditResponseCode.DATA_ERROR)) break - audit.warn( - "retry #{} {} from PDP in {} secs for policy_id={}".format( - retry, PolicyRest._url_get_config, - PolicyRest._policy_retry_sleep, policy_id), - error_code=AuditResponseCode.DATA_ERROR) + PolicyRest._logger.warning(audit.warn( + "will retry({}) for policy_id({}) in {} secs from PDP {}".format( + retry, policy_id, PolicyRest._policy_retry_sleep, PolicyRest._url_get_config), + error_code=AuditResponseCode.DATA_ERROR)) time.sleep(PolicyRest._policy_retry_sleep) if (expect_policy_removed and not latest_policy @@ -308,10 +320,10 @@ class PolicyRest(object): audit.set_http_status_code(status_code) if not PolicyRest._validate_policy(latest_policy): - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - audit.error( + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value) + PolicyRest._logger.error(audit.error( "received invalid policy from PDP: {}".format(json.dumps(latest_policy)), - error_code=AuditResponseCode.DATA_ERROR) + error_code=AuditResponseCode.DATA_ERROR)) return latest_policy @@ -331,9 +343,10 @@ class PolicyRest(object): ) if not latest_policy and not expect_policy_removed: - audit.error("received unexpected policy data from PDP for policy_id={}: {}" - .format(policy_id, json.dumps(policy_bodies or [])), - error_code=AuditResponseCode.DATA_ERROR) + PolicyRest._logger.error( + audit.error("received unexpected policy data from PDP for policy_id={}: {}" + .format(policy_id, json.dumps(policy_bodies or [])), + error_code=AuditResponseCode.DATA_ERROR)) done = bool(latest_policy or (expect_policy_removed and not policy_bodies) @@ -411,6 +424,9 @@ class PolicyRest(object): policies = None apns_length = len(apns) + PolicyRest._logger.debug("apns_length(%s) policies_to_find %s", apns_length, + json.dumps(policies_to_find)) + if apns_length == 1: policies = [PolicyRest.get_latest_policy(apns[0])] else: @@ -419,8 +435,9 @@ class PolicyRest(object): pool.close() pool.join() - metrics_total.metrics("result get_latest_updated_policies {0}: {1} {2}" - .format(str_metrics, len(policies), json.dumps(policies))) + metrics_total.metrics("result({}) get_latest_updated_policies {}: {} {}" + .format(apns_length, str_metrics, + len(policies), json.dumps(policies))) updated_policies = dict((policy[POLICY_ID], policy) for policy in policies @@ -438,12 +455,12 @@ class PolicyRest(object): and policy_id not in removed_policies) PolicyRest._logger.debug( - "result updated_policies %s, removed_policies %s, errored_policies %s", - json.dumps(updated_policies), json.dumps(removed_policies), + "result(%s) updated_policies %s, removed_policies %s, errored_policies %s", + apns_length, json.dumps(updated_policies), json.dumps(removed_policies), json.dumps(errored_policies)) if errored_policies: - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.set_http_status_code(AuditHttpCode.DATA_ERROR.value) audit.error( "errored_policies in PDP: {}".format(json.dumps(errored_policies)), error_code=AuditResponseCode.DATA_ERROR) @@ -460,6 +477,7 @@ class PolicyRest(object): PolicyRest._logger.debug("%s", str_policy_filter) status_code, policy_bodies = PolicyRest._pdp_get_config(audit, policy_filter) + audit.set_http_status_code(status_code) PolicyRest._logger.debug("%s policy_bodies: %s %s", status_code, str_policy_filter, json.dumps(policy_bodies or [])) @@ -467,14 +485,13 @@ class PolicyRest(object): latest_policies = PolicyUtils.select_latest_policies(policy_bodies) if not latest_policies: - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - audit.warn( + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value) + PolicyRest._logger.warning(audit.warn( "received no policies from PDP for policy_filter {}: {}" .format(str_policy_filter, json.dumps(policy_bodies or [])), - error_code=AuditResponseCode.DATA_ERROR) + error_code=AuditResponseCode.DATA_ERROR)) return None, latest_policies - audit.set_http_status_code(status_code) valid_policies = {} errored_policies = {} for (policy_id, policy) in latest_policies.items(): diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 235e2b6..fb6c8b6 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -106,9 +106,9 @@ class _PolicyUpdate(object): self._audit.req_message = req_message self._logger.info( - "pending request_id %s for %s policies_updated %s policies_removed %s", + "pending(%s) for %s policies_updated %s policies_removed %s", self._audit.request_id, req_message, - json.dumps(policies_updated), json.dumps(policies_removed)) + json.dumps(self._policies_updated), json.dumps(self._policies_removed)) class PolicyUpdater(Thread): @@ -178,13 +178,14 @@ class PolicyUpdater(Thread): if not self._aud_reconfigure: self._aud_reconfigure = Audit(req_message=Config.RECONFIGURE) PolicyUpdater._logger.info( - "reconfigure %s request_id %s", + "%s request_id %s", self._aud_reconfigure.req_message, self._aud_reconfigure.request_id ) self._run.set() def run(self): """wait and run the policy-update in thread""" + PolicyUpdater._logger.info("starting policy_updater...") self._run_reconfigure_timer() while True: PolicyUpdater._logger.info("waiting for policy-updates...") @@ -307,7 +308,7 @@ class PolicyUpdater(Thread): if DeployHandler.reconfigure(aud_reconfigure): reconfigure_result += " " + Config.DEPLOY_HANDLER - if self._reconfigure_receiver(): + if self._reconfigure_receiver(aud_reconfigure): reconfigure_result += " web-socket" reconfigure_result += " -- change: {}".format(Config.discovered_config) @@ -348,12 +349,24 @@ class PolicyUpdater(Thread): ) catch_up_result = "" try: + not_found_ok = None PolicyUpdater._logger.info(log_line) self._pause_catch_up_timer() - _, catch_up_message = PolicyMatcher.get_latest_policies(aud_catch_up) + _, policies, policy_filters = PolicyMatcher.get_deployed_policies(aud_catch_up) - if not catch_up_message or not aud_catch_up.is_success_or_not_found(): + catch_up_message = None + if aud_catch_up.is_not_found(): + not_found_ok = True + else: + _, catch_up_message = PolicyMatcher.build_catch_up_message( + aud_catch_up, policies, policy_filters) + + if not_found_ok: + catch_up_result = ("- not sending catch-up " + "- no deployed policies or policy-filters") + PolicyUpdater._logger.warning(catch_up_result) + elif not (catch_up_message and aud_catch_up.is_success()): catch_up_result = "- not sending catch-up to deployment-handler due to errors" PolicyUpdater._logger.warning(catch_up_result) elif catch_up_message.empty(): @@ -402,11 +415,14 @@ class PolicyUpdater(Thread): PolicyUpdater._logger.info(log_line) try: + not_found_ok = None (updated_policies, removed_policies, policy_filter_matches) = PolicyMatcher.match_to_deployed_policies( audit, policies_updated, policies_removed) - if updated_policies or removed_policies: + if audit.is_not_found(): + not_found_ok = True + elif updated_policies or removed_policies: updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( (audit, [(policy_id, policy.get(POLICY_BODY, {}).get(POLICY_VERSION)) @@ -415,7 +431,11 @@ class PolicyUpdater(Thread): for policy_id, policy in removed_policies.items()] )) - if not audit.is_success_or_not_found(): + if not_found_ok: + result = ("- not sending policy-updates to deployment-handler " + "- no deployed policies or policy-filters") + PolicyUpdater._logger.warning(result) + elif not audit.is_success(): result = "- not sending policy-updates to deployment-handler due to errors" PolicyUpdater._logger.warning(result) elif not updated_policies and not removed_policies: diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py index 24db468..73e7fbc 100644 --- a/policyhandler/web_server.py +++ b/policyhandler/web_server.py @@ -26,7 +26,7 @@ import cherrypy from .config import Config from .deploy_handler import PolicyUpdateMessage -from .onap.audit import Audit +from .onap.audit import Audit, AuditHttpCode from .policy_matcher import PolicyMatcher from .policy_receiver import PolicyReceiver from .policy_rest import PolicyRest @@ -34,21 +34,35 @@ from .policy_rest import PolicyRest class PolicyWeb(object): """run http API of policy-handler on 0.0.0.0:wservice_port - any incoming address""" + DATA_NOT_FOUND_ERROR = 404 HOST_INADDR_ANY = ".".join("0"*4) logger = logging.getLogger("policy_handler.policy_web") @staticmethod def run_forever(audit): """run the web-server of the policy-handler forever""" - PolicyWeb.logger.info("policy_handler web-server on port(%d)...", Config.wservice_port) cherrypy.config.update({"server.socket_host": PolicyWeb.HOST_INADDR_ANY, "server.socket_port": Config.wservice_port}) + + protocol = "http" + tls_info = "" + # if Config.tls_server_cert_file and Config.tls_private_key_file: + # cherrypy.server.ssl_module = 'builtin' + # cherrypy.server.ssl_certificate = Config.tls_server_cert_file + # cherrypy.server.ssl_private_key = Config.tls_private_key_file + # if Config.tls_server_ca_chain_file: + # cherrypy.server.ssl_certificate_chain = Config.tls_server_ca_chain_file + # protocol = "https" + # tls_info = "cert: {} {} {}".format(Config.tls_server_cert_file, + # Config.tls_private_key_file, + # Config.tls_server_ca_chain_file) + cherrypy.tree.mount(_PolicyWeb(), '/') - audit.info("running policy_handler web-server as {0}:{1}".format( - cherrypy.server.socket_host, cherrypy.server.socket_port)) - PolicyWeb.logger.info("running policy_handler web-server as %s:%d with config: %s", - cherrypy.server.socket_host, cherrypy.server.socket_port, - json.dumps(cherrypy.config)) + + PolicyWeb.logger.info( + "%s with config: %s", audit.info("running policy_handler as {}://{}:{} {}".format( + protocol, cherrypy.server.socket_host, cherrypy.server.socket_port, tls_info)), + json.dumps(cherrypy.config)) cherrypy.engine.start() class _PolicyWeb(object): @@ -67,17 +81,18 @@ class _PolicyWeb(object): req_info = _PolicyWeb._get_request_info(cherrypy.request) audit = Audit(job_name="get_latest_policy", req_message=req_info, headers=cherrypy.request.headers) - PolicyWeb.logger.info("%s policy_id=%s headers=%s", \ - req_info, policy_id, json.dumps(cherrypy.request.headers)) + PolicyWeb.logger.info("%s policy_id=%s headers=%s", + req_info, policy_id, json.dumps(cherrypy.request.headers)) latest_policy = PolicyRest.get_latest_policy((audit, policy_id, None, None)) or {} PolicyWeb.logger.info("res %s policy_id=%s latest_policy=%s", req_info, policy_id, json.dumps(latest_policy)) - success, http_status_code, _ = audit.audit_done(result=json.dumps(latest_policy)) - if not success: - cherrypy.response.status = http_status_code + _, http_status_code, _ = audit.audit_done(result=json.dumps(latest_policy)) + if http_status_code == AuditHttpCode.DATA_NOT_FOUND_OK.value: + http_status_code = PolicyWeb.DATA_NOT_FOUND_ERROR + cherrypy.response.status = http_status_code return latest_policy @@ -89,15 +104,20 @@ class _PolicyWeb(object): PolicyWeb.logger.info("%s", req_info) - result, policy_update = PolicyMatcher.get_latest_policies(audit) - if policy_update and isinstance(policy_update, PolicyUpdateMessage): - result["policy_update"] = policy_update.get_message() + result, policies, policy_filters = PolicyMatcher.get_deployed_policies(audit) + if not result: + result, policy_update = PolicyMatcher.build_catch_up_message( + audit, policies, policy_filters) + if policy_update and isinstance(policy_update, PolicyUpdateMessage): + result["policy_update"] = policy_update.get_message() - PolicyWeb.logger.info("result %s: %s", req_info, json.dumps(result)) + result_str = json.dumps(result, sort_keys=True) + PolicyWeb.logger.info("result %s: %s", req_info, result_str) - success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) - if not success: - cherrypy.response.status = http_status_code + _, http_status_code, _ = audit.audit_done(result=result_str) + if http_status_code == AuditHttpCode.DATA_NOT_FOUND_OK.value: + http_status_code = PolicyWeb.DATA_NOT_FOUND_ERROR + cherrypy.response.status = http_status_code return result @@ -159,19 +179,21 @@ class _PolicyWeb(object): req_info = _PolicyWeb._get_request_info(cherrypy.request) audit = Audit(job_name="get_latest_policies", - req_message="{0}: {1}".format(req_info, str_policy_filter), \ - headers=cherrypy.request.headers) - PolicyWeb.logger.info("%s: policy_filter=%s headers=%s", \ - req_info, str_policy_filter, json.dumps(cherrypy.request.headers)) + req_message="{0}: {1}".format(req_info, str_policy_filter), + headers=cherrypy.request.headers) + PolicyWeb.logger.info("%s: policy_filter=%s headers=%s", + req_info, str_policy_filter, json.dumps(cherrypy.request.headers)) result = PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {} + result_str = json.dumps(result, sort_keys=True) - PolicyWeb.logger.info("result %s: policy_filter=%s result=%s", \ - req_info, str_policy_filter, json.dumps(result)) + PolicyWeb.logger.info("result %s: policy_filter=%s result=%s", + req_info, str_policy_filter, result_str) - success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) - if not success: - cherrypy.response.status = http_status_code + _, http_status_code, _ = audit.audit_done(result=result_str) + if http_status_code == AuditHttpCode.DATA_NOT_FOUND_OK.value: + http_status_code = PolicyWeb.DATA_NOT_FOUND_ERROR + cherrypy.response.status = http_status_code return result |