diff options
author | Alex Shatov <alexs@att.com> | 2018-05-10 09:23:16 -0400 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2018-05-10 09:23:16 -0400 |
commit | f2d7bef13705812c1bf147c2fb65162fbf385c6b (patch) | |
tree | e6efb8e25287576a48952942aacdb3cf84a825ff /policyhandler | |
parent | 50bed534083c96cbf1f8fa4e220cb2b00dff9621 (diff) |
2.4.3 policy-handler - try-catch top Exceptions
- added try-except for top level Exception into all threads
of policy-handler to avoid losing the thread and tracking
the unexpected crashes
- rediscover the deployment-handler if not found before
and after each catchup
- refactored audit - separated metrics from audit
- added more stats and runtime info to healthcheck
= gc counts and garbage info if any detected
= memory usage - to detect the potential memory leaks
= request_id to all stats
= stats of active requests
- avoid reallocating the whole Queue of policy-updates after catchup
= clear of the internal queue under proper lock
Change-Id: I3fabcaac70419a68bd070ff7d591a75942f37663
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-483
Diffstat (limited to 'policyhandler')
-rw-r--r-- | policyhandler/deploy_handler.py | 85 | ||||
-rw-r--r-- | policyhandler/onap/audit.py | 443 | ||||
-rw-r--r-- | policyhandler/onap/health.py | 93 | ||||
-rw-r--r-- | policyhandler/policy_receiver.py | 66 | ||||
-rw-r--r-- | policyhandler/policy_rest.py | 560 | ||||
-rw-r--r-- | policyhandler/policy_updater.py | 154 | ||||
-rw-r--r-- | policyhandler/step_timer.py | 27 | ||||
-rw-r--r-- | policyhandler/web_server.py | 11 |
8 files changed, 863 insertions, 576 deletions
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index 139e660..4ea5ad1 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -26,7 +26,7 @@ import requests from .config import Config from .customize import CustomizerUser from .discovery import DiscoveryClient -from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode +from .onap.audit import REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, Metrics POOL_SIZE = 1 @@ -44,27 +44,27 @@ class DeployHandler(object): _server_instance_uuid = None @staticmethod - def _lazy_init(audit): + def _lazy_init(audit, rediscover=False): """ set static properties """ - if DeployHandler._lazy_inited: + if DeployHandler._lazy_inited and not rediscover: return - DeployHandler._lazy_inited = True - DeployHandler._custom_kwargs = CustomizerUser.get_customizer() \ - .get_deploy_handler_kwargs(audit) - if not DeployHandler._custom_kwargs \ - or not isinstance(DeployHandler._custom_kwargs, dict): + DeployHandler._custom_kwargs = (CustomizerUser.get_customizer() + .get_deploy_handler_kwargs(audit)) + if (not DeployHandler._custom_kwargs + or not isinstance(DeployHandler._custom_kwargs, dict)): DeployHandler._custom_kwargs = {} - DeployHandler._requests_session = requests.Session() - DeployHandler._requests_session.mount( - 'https://', - requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE) - ) - DeployHandler._requests_session.mount( - 'http://', - requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE) - ) + if not DeployHandler._requests_session: + DeployHandler._requests_session = requests.Session() + DeployHandler._requests_session.mount( + 'https://', + requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE) + ) + DeployHandler._requests_session.mount( + 'http://', + requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE) + ) config_dh = Config.config.get("deploy_handler") if config_dh and isinstance(config_dh, dict): @@ -77,7 +77,7 @@ class DeployHandler(object): DeployHandler._target_entity = config_dh.get("target_entity", "deployment_handler") DeployHandler._url = config_dh.get("url") DeployHandler._logger.info("dns based routing to %s: url(%s)", - DeployHandler._target_entity, DeployHandler._url) + DeployHandler._target_entity, DeployHandler._url) if not DeployHandler._url: # discover routing to deployment-handler at consul-services @@ -85,14 +85,18 @@ class DeployHandler(object): # config for policy-handler <= 2.3.1 # "deploy_handler" : "deployment_handler" DeployHandler._target_entity = str(config_dh or "deployment_handler") - DeployHandler._url = DiscoveryClient.get_service_url(audit, DeployHandler._target_entity) + DeployHandler._url = DiscoveryClient.get_service_url(audit, + DeployHandler._target_entity) DeployHandler._url_policy = str(DeployHandler._url or "") + '/policy' DeployHandler._logger.info( "got %s policy url(%s)", DeployHandler._target_entity, DeployHandler._url_policy) + DeployHandler._lazy_inited = bool(DeployHandler._url) + + @staticmethod - def policy_update(audit, message): + def policy_update(audit, message, rediscover=False): """ post policy_updated message to deploy-handler @@ -101,10 +105,10 @@ class DeployHandler(object): if not message: return - DeployHandler._lazy_init(audit) - sub_aud = Audit(aud_parent=audit, targetEntity=DeployHandler._target_entity, - targetServiceName=DeployHandler._url_policy) - headers = {REQUEST_X_ECOMP_REQUESTID : sub_aud.request_id} + DeployHandler._lazy_init(audit, rediscover) + metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity, + targetServiceName=DeployHandler._url_policy) + headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id} msg_str = json.dumps(message) headers_str = json.dumps(headers) @@ -114,14 +118,14 @@ class DeployHandler(object): log_data = " msg={0} headers={1}".format(msg_str, headers_str) log_line = log_action + log_data DeployHandler._logger.info(log_line) - sub_aud.metrics_start(log_line) + metrics.metrics_start(log_line) if not DeployHandler._url: error_msg = "no url found to {0}".format(log_line) DeployHandler._logger.error(error_msg) - sub_aud.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) + metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) - sub_aud.metrics(error_msg) + metrics.metrics(error_msg) return res = None @@ -130,25 +134,30 @@ class DeployHandler(object): DeployHandler._url_policy, json=message, headers=headers, **DeployHandler._custom_kwargs ) - except requests.exceptions.RequestException as ex: - error_msg = "failed to {0}: {1}{2}".format(log_action, str(ex), log_data) + except Exception as ex: + error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + if isinstance(ex, requests.exceptions.RequestException) + else AuditHttpCode.SERVER_INTERNAL_ERROR.value) + error_msg = ("failed to {0} {1}: {2}{3}" + .format(log_action, type(ex).__name__, str(ex), log_data)) DeployHandler._logger.exception(error_msg) - sub_aud.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) - audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) - sub_aud.metrics(error_msg) + metrics.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + metrics.metrics(error_msg) return - sub_aud.set_http_status_code(res.status_code) + metrics.set_http_status_code(res.status_code) audit.set_http_status_code(res.status_code) log_line = "response {0} from {1}: text={2}{3}" \ .format(res.status_code, log_action, res.text, log_data) - sub_aud.metrics(log_line) - DeployHandler._logger.info(log_line) + metrics.metrics(log_line) if res.status_code != requests.codes.ok: + DeployHandler._logger.error(log_line) return + DeployHandler._logger.info(log_line) result = res.json() or {} prev_server_instance_uuid = DeployHandler._server_instance_uuid DeployHandler._server_instance_uuid = result.get("server_instance_uuid") @@ -156,9 +165,9 @@ class DeployHandler(object): deployment_handler_changed = (prev_server_instance_uuid and prev_server_instance_uuid != DeployHandler._server_instance_uuid) if deployment_handler_changed: - log_line = "deployment_handler_changed: {1} != {0}" \ - .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid) - sub_aud.info(log_line) + log_line = ("deployment_handler_changed: {1} != {0}" + .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid)) + metrics.info(log_line) DeployHandler._logger.info(log_line) return deployment_handler_changed diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py index 08dcd37..48988fe 100644 --- a/policyhandler/onap/audit.py +++ b/policyhandler/onap/audit.py @@ -26,16 +26,19 @@ """ import copy +import gc import json import os import re import subprocess import sys +import threading import time import uuid from datetime import datetime from enum import Enum -from threading import Lock + +import psutil from .CommonLogger import CommonLogger from .health import Health @@ -51,10 +54,15 @@ AUDIT_SERVER = 'server' AUDIT_TARGET_ENTITY = 'targetEntity' AUDIT_METRICS = 'metrics' AUDIT_TOTAL_STATS = 'audit_total_stats' +METRICS_TOTAL_STATS = 'metrics_total_stats' HEADER_CLIENTAUTH = "clientauth" HEADER_AUTHORIZATION = "authorization" +ERROR_CODE = "errorCode" +ERROR_DESCRIPTION = "errorDescription" + + class AuditHttpCode(Enum): """audit http codes""" HTTP_OK = 200 @@ -67,6 +75,7 @@ class AuditHttpCode(Enum): DATA_ERROR = 1030 SCHEMA_ERROR = 1040 + class AuditResponseCode(Enum): """audit response codes""" SUCCESS = 0 @@ -107,7 +116,60 @@ class AuditResponseCode(Enum): return "unknown" return response_code.name.lower().replace("_", " ") -class Audit(object): + +class ProcessInfo(object): + """static class to calculate process info""" + _KILO_SYMBOLS = ('KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB') + _KILO_POWERS = {} + + @staticmethod + def init(): + """init static constants""" + for i, kilo_symbol in enumerate(ProcessInfo._KILO_SYMBOLS): + ProcessInfo._KILO_POWERS[kilo_symbol] = 1 << (i + 1) * 10 + ProcessInfo._KILO_SYMBOLS = list(reversed(ProcessInfo._KILO_SYMBOLS)) + + @staticmethod + def bytes_to_human(byte_count): + """converts byte count to human value in kilo-powers""" + for kilo_symbol in ProcessInfo._KILO_SYMBOLS: + kilo_power = ProcessInfo._KILO_POWERS[kilo_symbol] + if byte_count >= kilo_power: + value = float(byte_count) / kilo_power + return '%.1f%s' % (value, kilo_symbol) + return "%sB" % byte_count + + @staticmethod + def mem_info(): + """calculates the memory usage of the current process""" + process = psutil.Process() + with process.oneshot(): + mem = process.memory_full_info() + return { + "uss" : ProcessInfo.bytes_to_human(mem.uss), + "rss" : ProcessInfo.bytes_to_human(mem.rss), + "swap" : ProcessInfo.bytes_to_human(getattr(mem, "swap", 0)), + "pss" : ProcessInfo.bytes_to_human(getattr(mem, "pss", 0)) + } + + + @staticmethod + def gc_info(full=False): + """gets info from garbage collector""" + gc_info = { + "gc_count" : str(gc.get_count()), + "gc_threshold" : str(gc.get_threshold()) + } + try: + if gc.garbage: + gc_info["gc_garbage"] = ([repr(stuck) for stuck in gc.garbage] + if full else len(gc.garbage)) + except Exception: + pass + return gc_info + + +class _Audit(object): """put the audit object on stack per each initiating request in the system :request_id: is the X-ECOMP-RequestID for tracing @@ -121,7 +183,7 @@ class Audit(object): _service_name = "" _service_version = "" _service_instance_uuid = str(uuid.uuid4()) - _started = datetime.now() + _started = datetime.utcnow() _key_format = re.compile(r"\W") _logger_debug = None _logger_error = None @@ -129,95 +191,69 @@ class Audit(object): _logger_audit = None _health = Health() _py_ver = sys.version.replace("\n", "") - try: - _packages = filter(None, subprocess.check_output(["pip", "freeze"]).splitlines()) - except subprocess.CalledProcessError: - _packages = [] + _packages = [] @staticmethod def init(service_name, service_version, config_file_path): """init static invariants and loggers""" - Audit._service_name = service_name - Audit._service_version = service_version - Audit._logger_debug = CommonLogger(config_file_path, "debug", \ - instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) - Audit._logger_error = CommonLogger(config_file_path, "error", \ - instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) - Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \ - instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) - Audit._logger_audit = CommonLogger(config_file_path, "audit", \ - instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) - - @staticmethod - def health(): + _Audit._service_name = service_name + _Audit._service_version = service_version + _Audit._logger_debug = CommonLogger(config_file_path, "debug", \ + instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + _Audit._logger_error = CommonLogger(config_file_path, "error", \ + instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + _Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \ + instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + _Audit._logger_audit = CommonLogger(config_file_path, "audit", \ + instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + ProcessInfo.init() + try: + _Audit._packages = filter(None, subprocess.check_output(["pip", "freeze"]).splitlines()) + except subprocess.CalledProcessError: + pass + + + def health(self, full=False): """returns json for health check""" - now = datetime.now() - return { - "service_name" : Audit._service_name, - "service_version" : Audit._service_version, - "service_instance_uuid" : Audit._service_instance_uuid, - "python" : Audit._py_ver, - "started" : str(Audit._started), - "now" : str(now), - "uptime" : str(now - Audit._started), - "stats" : Audit._health.dump(), - "packages" : Audit._packages + utcnow = datetime.utcnow() + health = { + "server" : { + "service_name" : _Audit._service_name, + "service_version" : _Audit._service_version, + "service_instance_uuid" : _Audit._service_instance_uuid + }, + "runtime" : { + "started" : str(_Audit._started), + "utcnow" : str(utcnow), + "uptime" : str(utcnow - _Audit._started), + "active_threads" : sorted([thr.name for thr in threading.enumerate()]), + "gc" : ProcessInfo.gc_info(full), + "mem_info" : ProcessInfo.mem_info() + }, + "stats" : _Audit._health.dump(), + "soft" : {"python" : _Audit._py_ver, "packages" : _Audit._packages} } + health_txt = "{} health: {}".format(_Audit._service_name, json.dumps(health)) + self.info(health_txt) + return health + - def __init__(self, job_name=None, request_id=None, req_message=None, aud_parent=None, **kwargs): + def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs): """create audit object per each request in the system :job_name: is the name of the audit job for health stats :request_id: is the X-ECOMP-RequestID for tracing :req_message: is the request message string for logging - :aud_parent: is the parent Audit - used for sub-query metrics to other systems :kwargs: - put any request related params into kwargs """ - self.job_name = Audit._key_format.sub('_', job_name or req_message or Audit._service_name) + self.job_name = _Audit._key_format.sub('_', job_name or req_message or _Audit._service_name) self.request_id = request_id self.req_message = req_message or "" - self.aud_parent = aud_parent self.kwargs = kwargs or {} - self.retry_get_config = False self.max_http_status_code = 0 - self._lock = Lock() - - if self.aud_parent: - self.job_name = Audit._key_format.sub( - '_', job_name or self.aud_parent.job_name or Audit._service_name) - if not self.request_id: - self.request_id = self.aud_parent.request_id - if not self.req_message: - self.req_message = self.aud_parent.req_message - self.kwargs = self.aud_parent.merge_all_kwargs(**self.kwargs) - else: - headers = self.kwargs.get("headers", {}) - if headers: - if not self.request_id: - self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID) - if AUDIT_IPADDRESS not in self.kwargs: - self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR) - if AUDIT_SERVER not in self.kwargs: - self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST) - - if AUDIT_SERVER not in self.kwargs: - self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME) - - created_req = "" - if not self.request_id: - created_req = " with new" - self.request_id = str(uuid.uuid4()) - - self.kwargs[AUDIT_REQUESTID] = self.request_id + self._lock = threading.Lock() - self._started = time.time() - self._start_event = Audit._logger_audit.getStartRecordEvent() - self.metrics_start() - - if not self.aud_parent: - self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\ - .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs))) def merge_all_kwargs(self, **kwargs): """returns the merge of copy of self.kwargs with the param kwargs""" @@ -228,16 +264,14 @@ class Audit(object): def set_http_status_code(self, http_status_code): """accumulate the highest(worst) http status code""" - self._lock.acquire() - if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value: - self.max_http_status_code = max(http_status_code, self.max_http_status_code) - self._lock.release() + with self._lock: + if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value: + self.max_http_status_code = max(http_status_code, self.max_http_status_code) def get_max_http_status_code(self): """returns the highest(worst) http status code""" - self._lock.acquire() - max_http_status_code = self.max_http_status_code - self._lock.release() + with self._lock: + max_http_status_code = self.max_http_status_code return max_http_status_code @staticmethod @@ -247,33 +281,11 @@ class Audit(object): return 'COMPLETE' return 'ERROR' - @staticmethod - def hide_secrets(obj): - """hides the known secret field values of the dictionary""" - if not isinstance(obj, dict): - return obj - - for key in obj: - if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]: - obj[key] = "*" - elif isinstance(obj[key], dict): - obj[key] = Audit.hide_secrets(obj[key]) - - return obj - - @staticmethod - def log_json_dumps(obj, **kwargs): - """hide the known secret field values of the dictionary and return json.dumps""" - if not isinstance(obj, dict): - return json.dumps(obj, **kwargs) - - return json.dumps(Audit.hide_secrets(copy.deepcopy(obj)), **kwargs) - def is_serious_error(self, status_code): """returns whether the response_code is success and a human text for response code""" - return AuditResponseCode.PERMISSION_ERROR.value \ - == AuditResponseCode.get_response_code(status_code).value \ - or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value + return (AuditResponseCode.PERMISSION_ERROR.value + == AuditResponseCode.get_response_code(status_code).value + or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value) def _get_response_status(self): """calculates the response status fields from max_http_status_code""" @@ -290,104 +302,215 @@ class Audit(object): def debug(self, log_line, **kwargs): """debug - the debug=lowest level of logging""" - Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs)) + _Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs)) def info(self, log_line, **kwargs): """debug - the info level of logging""" - Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs)) + _Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs)) def info_requested(self, result=None, **kwargs): """info "requested ..." - the info level of logging""" self.info("requested {0} {1}".format(self.req_message, result or ""), \ **self.merge_all_kwargs(**kwargs)) - def warn(self, log_line, **kwargs): + def warn(self, log_line, error_code=None, **kwargs): """debug+error - the warn level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - Audit._logger_debug.warn(log_line, **all_kwargs) - Audit._logger_error.warn(log_line, **all_kwargs) - def error(self, log_line, **kwargs): + if error_code and isinstance(error_code, AuditResponseCode): + all_kwargs[ERROR_CODE] = error_code.value + all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code) + + _Audit._logger_debug.warn(log_line, **all_kwargs) + _Audit._logger_error.warn(log_line, **all_kwargs) + + def error(self, log_line, error_code=None, **kwargs): """debug+error - the error level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - Audit._logger_debug.error(log_line, **all_kwargs) - Audit._logger_error.error(log_line, **all_kwargs) - def fatal(self, log_line, **kwargs): + if error_code and isinstance(error_code, AuditResponseCode): + all_kwargs[ERROR_CODE] = error_code.value + all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code) + + _Audit._logger_debug.error(log_line, **all_kwargs) + _Audit._logger_error.error(log_line, **all_kwargs) + + def fatal(self, log_line, error_code=None, **kwargs): """debug+error - the fatal level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - Audit._logger_debug.fatal(log_line, **all_kwargs) - Audit._logger_error.fatal(log_line, **all_kwargs) + + if error_code and isinstance(error_code, AuditResponseCode): + all_kwargs[ERROR_CODE] = error_code.value + all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code) + + _Audit._logger_debug.fatal(log_line, **all_kwargs) + _Audit._logger_error.fatal(log_line, **all_kwargs) + + @staticmethod + def hide_secrets(obj): + """hides the known secret field values of the dictionary""" + if not isinstance(obj, dict): + return obj + + for key in obj: + if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]: + obj[key] = "*" + elif isinstance(obj[key], dict): + obj[key] = _Audit.hide_secrets(obj[key]) + + return obj + + @staticmethod + def log_json_dumps(obj, **kwargs): + """hide the known secret field values of the dictionary and return json.dumps""" + if not isinstance(obj, dict): + return json.dumps(obj, **kwargs) + + return json.dumps(_Audit.hide_secrets(copy.deepcopy(obj)), **kwargs) @staticmethod def get_elapsed_time(started): """returns the elapsed time since started in milliseconds""" - return int(round(1000 * (time.time() - started))) + return int(round(1000 * (time.time() - (started or 0)))) - def metrics_start(self, log_line=None, **kwargs): - """reset metrics timing""" - self._metrics_started = time.time() - self._metrics_start_event = Audit._logger_metrics.getStartRecordEvent() - if log_line: - self.info(log_line, **self.merge_all_kwargs(**kwargs)) - def metrics(self, log_line, **kwargs): - """debug+metrics - the metrics=sub-audit level of logging""" +class Audit(_Audit): + """Audit class to track the high level operations""" + + def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs): + """create audit object per each request in the system + + :job_name: is the name of the audit job for health stats + :request_id: is the X-ECOMP-RequestID for tracing + :req_message: is the request message string for logging + :aud_parent: is the parent Audit - used for sub-query metrics to other systems + :kwargs: - put any request related params into kwargs + """ + super(Audit, self).__init__(job_name=job_name, + request_id=request_id, + req_message=req_message, + **kwargs) + + headers = self.kwargs.get("headers", {}) + if headers: + if not self.request_id: + self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID) + if AUDIT_IPADDRESS not in self.kwargs: + self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR) + if AUDIT_SERVER not in self.kwargs: + self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST) + + created_req = "" + if not self.request_id: + created_req = " with new" + self.request_id = str(uuid.uuid4()) + + if AUDIT_SERVER not in self.kwargs: + self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME) + + self.kwargs[AUDIT_REQUESTID] = self.request_id + + _Audit._health.start(self.job_name, self.request_id) + _Audit._health.start(AUDIT_TOTAL_STATS, self.request_id) + + self._started = time.time() + self._start_event = Audit._logger_audit.getStartRecordEvent() + + self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\ + .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs))) + + + def audit_done(self, result=None, **kwargs): + """debug+audit - the audit=top level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) success, max_http_status_code, response_code, response_description = \ self._get_response_status() - metrics_func = None - timer = Audit.get_elapsed_time(self._metrics_started) - metrics_job = Audit._key_format.sub( - '_', all_kwargs.get(AUDIT_TARGET_ENTITY, AUDIT_METRICS + "_" + self.job_name)) + log_line = "{0} {1}".format(self.req_message, result or "").strip() + audit_func = None + timer = _Audit.get_elapsed_time(self._started) if success: log_line = "done: {0}".format(log_line) self.info(log_line, **all_kwargs) - metrics_func = Audit._logger_metrics.info - Audit._health.success(metrics_job, timer) + audit_func = _Audit._logger_audit.info + _Audit._health.success(self.job_name, timer, self.request_id) + _Audit._health.success(AUDIT_TOTAL_STATS, timer, self.request_id) else: log_line = "failed: {0}".format(log_line) - self.error(log_line, errorCode=response_code.value, \ - errorDescription=response_description, **all_kwargs) - metrics_func = Audit._logger_metrics.error - Audit._health.error(metrics_job, timer) - - metrics_func(log_line, begTime=self._metrics_start_event, timer=timer, - statusCode=Audit.get_status_code(success), responseCode=response_code.value, - responseDescription=response_description, - **all_kwargs - ) - - self.metrics_start() + self.error(log_line, errorCode=response_code.value, + errorDescription=response_description, **all_kwargs) + audit_func = _Audit._logger_audit.error + _Audit._health.error(self.job_name, timer, self.request_id) + _Audit._health.error(AUDIT_TOTAL_STATS, timer, self.request_id) + + audit_func(log_line, begTime=self._start_event, timer=timer, + statusCode=_Audit.get_status_code(success), + responseCode=response_code.value, + responseDescription=response_description, + **all_kwargs) + return (success, max_http_status_code, response_description) - def audit_done(self, result=None, **kwargs): - """debug+audit - the audit=top level of logging""" + +class Metrics(_Audit): + """Metrics class to track the calls to outside systems""" + + def __init__(self, aud_parent, **kwargs): + """create audit object per each request in the system + + :aud_parent: is the parent Audit - used for sub-query metrics to other systems + :kwargs: - put any request related params into kwargs + """ + super(Metrics, self).__init__(job_name=aud_parent.job_name, + request_id=aud_parent.request_id, + req_message=aud_parent.req_message, + **aud_parent.merge_all_kwargs(**kwargs)) + self.aud_parent = aud_parent + self._metrics_name = _Audit._key_format.sub( + '_', AUDIT_METRICS + "_" + self.kwargs.get(AUDIT_TARGET_ENTITY, self.job_name)) + + self._metrics_started = None + self._metrics_start_event = None + + + def metrics_start(self, log_line=None, **kwargs): + """reset metrics timing""" + self.merge_all_kwargs(**kwargs) + self._metrics_started = time.time() + self._metrics_start_event = _Audit._logger_metrics.getStartRecordEvent() + if log_line: + self.info(log_line, **self.merge_all_kwargs(**kwargs)) + _Audit._health.start(self._metrics_name, self.request_id) + _Audit._health.start(METRICS_TOTAL_STATS, self.request_id) + + + def metrics(self, log_line, **kwargs): + """debug+metrics - the metrics=sub-audit level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) success, max_http_status_code, response_code, response_description = \ self._get_response_status() - log_line = "{0} {1}".format(self.req_message, result or "").strip() - audit_func = None - timer = Audit.get_elapsed_time(self._started) + metrics_func = None + timer = _Audit.get_elapsed_time(self._metrics_started) if success: log_line = "done: {0}".format(log_line) self.info(log_line, **all_kwargs) - audit_func = Audit._logger_audit.info - Audit._health.success(self.job_name, timer) - Audit._health.success(AUDIT_TOTAL_STATS, timer) + metrics_func = _Audit._logger_metrics.info + _Audit._health.success(self._metrics_name, timer, self.request_id) + _Audit._health.success(METRICS_TOTAL_STATS, timer, self.request_id) else: log_line = "failed: {0}".format(log_line) self.error(log_line, errorCode=response_code.value, errorDescription=response_description, **all_kwargs) - audit_func = Audit._logger_audit.error - Audit._health.error(self.job_name, timer) - Audit._health.error(AUDIT_TOTAL_STATS, timer) - - audit_func(log_line, begTime=self._start_event, timer=timer, - statusCode=Audit.get_status_code(success), - responseCode=response_code.value, - responseDescription=response_description, - **all_kwargs - ) + metrics_func = _Audit._logger_metrics.error + _Audit._health.error(self._metrics_name, timer, self.request_id) + _Audit._health.error(METRICS_TOTAL_STATS, timer, self.request_id) + + metrics_func( + log_line, + begTime=(self._metrics_start_event or _Audit._logger_metrics.getStartRecordEvent()), + timer=timer, + statusCode=_Audit.get_status_code(success), + responseCode=response_code.value, + responseDescription=response_description, + **all_kwargs) return (success, max_http_status_code, response_description) diff --git a/policyhandler/onap/health.py b/policyhandler/onap/health.py index 485f422..e6a6f69 100644 --- a/policyhandler/onap/health.py +++ b/policyhandler/onap/health.py @@ -28,46 +28,95 @@ class HealthStats(object): """keep track of stats for metrics calls""" self._name = name or "stats_" + str(uuid.uuid4()) self._lock = Lock() + self._call_count = 0 self._error_count = 0 + self._active_count = 0 + self._longest_timer = 0 self._total_timer = 0 + self._last_success = None self._last_error = None + self._last_start = None + self._longest_end_ts = None + + self._last_success_request_id = None + self._last_error_request_id = None + self._last_started_request_id = None + self._longest_request_id = None + def dump(self): """returns dict of stats""" dump = None with self._lock: dump = { - "call_count" : self._call_count, - "error_count" : self._error_count, - "last_success" : str(self._last_success), - "last_error" : str(self._last_error), - "longest_timer_millisecs" : self._longest_timer, - "ave_timer_millisecs" : (float(self._total_timer)/self._call_count \ - if self._call_count else 0) + "total" : { + "call_count" : self._call_count, + "ave_timer_millisecs" : (float(self._total_timer)/self._call_count + if self._call_count else 0) + }, + "success" : { + "success_count" : (self._call_count - self._error_count), + "last_success" : str(self._last_success), + "last_success_request_id" : self._last_success_request_id + }, + "error" : { + "error_count" : self._error_count, + "last_error" : str(self._last_error), + "last_error_request_id" : self._last_error_request_id + }, + "active" : { + "active_count" : self._active_count, + "last_start" : str(self._last_start), + "last_started_request_id" : self._last_started_request_id + }, + "longest" : { + "longest_timer_millisecs" : self._longest_timer, + "longest_request_id" : self._longest_request_id, + "longest_end" : str(self._longest_end_ts) + } } return dump - def success(self, timer): + + def start(self, request_id=None): + """records the start of active execution""" + with self._lock: + self._active_count += 1 + self._last_start = datetime.utcnow() + self._last_started_request_id = request_id + + + def success(self, timer, request_id=None): """records the successful execution""" with self._lock: + self._active_count -= 1 self._call_count += 1 - self._last_success = datetime.now() + self._last_success = datetime.utcnow() + self._last_success_request_id = request_id self._total_timer += timer if not self._longest_timer or self._longest_timer < timer: self._longest_timer = timer + self._longest_request_id = request_id + self._longest_end_ts = self._last_success + - def error(self, timer): + def error(self, timer, request_id=None): """records the errored execution""" with self._lock: + self._active_count -= 1 self._call_count += 1 self._error_count += 1 - self._last_error = datetime.now() + self._last_error = datetime.utcnow() + self._last_error_request_id = request_id self._total_timer += timer if not self._longest_timer or self._longest_timer < timer: self._longest_timer = timer + self._longest_request_id = request_id + self._longest_end_ts = self._last_error + class Health(object): """Health stats for multiple requests""" @@ -76,28 +125,36 @@ class Health(object): self._all_stats = {} self._lock = Lock() + def _add_or_get_stats(self, stats_name): """add to or get from the ever growing dict of HealthStats""" - stats = None with self._lock: stats = self._all_stats.get(stats_name) if not stats: self._all_stats[stats_name] = stats = HealthStats(stats_name) - return stats + return stats + + + def start(self, stats_name, request_id=None): + """records the start of execution on stats_name""" + stats = self._add_or_get_stats(stats_name) + stats.start(request_id) + - def success(self, stats_name, timer): + def success(self, stats_name, timer, request_id=None): """records the successful execution on stats_name""" stats = self._add_or_get_stats(stats_name) - stats.success(timer) + stats.success(timer, request_id) + - def error(self, stats_name, timer): + def error(self, stats_name, timer, request_id=None): """records the error execution on stats_name""" stats = self._add_or_get_stats(stats_name) - stats.error(timer) + stats.error(timer, request_id) + def dump(self): """returns dict of stats""" with self._lock: stats = dict((k, v.dump()) for (k, v) in self._all_stats.iteritems()) - return stats diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py index 751bea8..dd9eea6 100644 --- a/policyhandler/policy_receiver.py +++ b/policyhandler/policy_receiver.py @@ -35,7 +35,7 @@ from threading import Lock, Thread import websocket from .config import Config -from .onap.audit import Audit +from .onap.audit import Audit, AuditHttpCode, AuditResponseCode from .policy_updater import PolicyUpdater LOADED_POLICIES = 'loadedPolicies' @@ -115,32 +115,41 @@ class _PolicyReceiver(Thread): def _on_pdp_message(self, _, message): """received the notification from PDP""" - _PolicyReceiver._logger.info("Received notification message: %s", message) - if not message: - return - message = json.loads(message) - - if not message: - return - - policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) - for policy in message.get(LOADED_POLICIES, []) - if self._policy_scopes.match(policy.get(POLICY_NAME))] - policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) - for policy in message.get(REMOVED_POLICIES, []) - if self._policy_scopes.match(policy.get(POLICY_NAME))] - - if not policies_updated and not policies_removed: - _PolicyReceiver._logger.info( - "no policy updated or removed for scopes %s", self._policy_scopes.pattern - ) - return + try: + _PolicyReceiver._logger.info("Received notification message: %s", message) + if not message: + return + message = json.loads(message) + + if not message or not isinstance(message, dict): + _PolicyReceiver._logger.warn("unexpected message from PDP: %s", json.dumps(message)) + return + + policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) + for policy in message.get(LOADED_POLICIES, []) + if self._policy_scopes.match(policy.get(POLICY_NAME))] + policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) + for policy in message.get(REMOVED_POLICIES, []) + if self._policy_scopes.match(policy.get(POLICY_NAME))] + + if not policies_updated and not policies_removed: + _PolicyReceiver._logger.info("no policy updated or removed for scopes %s", + self._policy_scopes.pattern) + return + + audit = Audit(job_name="policy_update", + req_message="policy-notification - updated[{0}], removed[{1}]" + .format(len(policies_updated), len(policies_removed)), + retry_get_config=True) + self._policy_updater.enqueue(audit, policies_updated, policies_removed) + except Exception as ex: + error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex), + "on_pdp_message", json.dumps(message)) + + _PolicyReceiver._logger.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) - audit = Audit(job_name="policy_update", - req_message="policy-notification - updated[{0}], removed[{1}]" - .format(len(policies_updated), len(policies_removed))) - audit.retry_get_config = True - self._policy_updater.enqueue(audit, policies_updated, policies_removed) def _on_ws_error(self, _, error): """report an error""" @@ -184,12 +193,7 @@ class PolicyReceiver(object): @staticmethod def run(audit): """Using policy-engine client to talk to policy engine""" - sub_aud = Audit(aud_parent=audit) - sub_aud.metrics_start("start policy receiver") - PolicyReceiver._policy_receiver = _PolicyReceiver() PolicyReceiver._policy_receiver.start() - sub_aud.metrics("started policy receiver") - PolicyReceiver.catch_up(audit) diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py index 0c8920a..977a9a1 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/policy_rest.py @@ -27,11 +27,11 @@ from multiprocessing.dummy import Pool as ThreadPool import requests from .config import Config -from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode, - AuditResponseCode) -from .policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, POLICY_BODY, - POLICY_CONFIG, POLICY_FILTER, POLICY_ID, - POLICY_NAME, SCOPE_PREFIXES, LATEST_POLICIES) +from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, + AuditResponseCode, Metrics) +from .policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, LATEST_POLICIES, + POLICY_BODY, POLICY_CONFIG, POLICY_FILTER, + POLICY_ID, POLICY_NAME, SCOPE_PREFIXES) from .policy_utils import PolicyUtils @@ -95,43 +95,47 @@ class PolicyRest(object): PolicyRest._policy_retry_count = Config.config.get("policy_retry_count", 1) or 1 PolicyRest._policy_retry_sleep = Config.config.get("policy_retry_sleep", 0) - PolicyRest._logger.info("PolicyClient url(%s) headers(%s) scope-prefixes(%s)", \ - PolicyRest._url_get_config, Audit.log_json_dumps(PolicyRest._headers), \ + PolicyRest._logger.info( + "PolicyClient url(%s) headers(%s) scope-prefixes(%s)", + PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers), json.dumps(PolicyRest._scope_prefixes)) @staticmethod def _pdp_get_config(audit, json_body): """Communication with the policy-engine""" - sub_aud = Audit(aud_parent=audit, targetEntity=PolicyRest._target_entity, \ - targetServiceName=PolicyRest._url_get_config) + metrics = Metrics(aud_parent=audit, targetEntity=PolicyRest._target_entity, + targetServiceName=PolicyRest._url_get_config) msg = json.dumps(json_body) headers = copy.copy(PolicyRest._headers) - headers[REQUEST_X_ECOMP_REQUESTID] = sub_aud.request_id - headers_str = Audit.log_json_dumps(headers) + headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id + headers_str = Metrics.log_json_dumps(headers) log_line = "post to PDP {0} msg={1} headers={2}".format( PolicyRest._url_get_config, msg, headers_str) - sub_aud.metrics_start(log_line) + metrics.metrics_start(log_line) PolicyRest._logger.info(log_line) res = None try: res = PolicyRest._requests_session.post( PolicyRest._url_get_config, json=json_body, headers=headers) - except requests.exceptions.RequestException as ex: - error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value - error_msg = "failed to post to PDP {0} {1} msg={2} headers={3}" \ - .format(PolicyRest._url_get_config, str(ex), msg, headers_str) + except Exception as ex: + error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + if isinstance(ex, requests.exceptions.RequestException) + else AuditHttpCode.SERVER_INTERNAL_ERROR.value) + error_msg = ( + "failed to post to PDP {0} {1}: {2} msg={3} headers={4}" + .format(PolicyRest._url_get_config, type(ex).__name__, str(ex), msg, headers_str)) PolicyRest._logger.exception(error_msg) - sub_aud.set_http_status_code(error_code) + metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) - sub_aud.metrics(error_msg) + metrics.metrics(error_msg) return (error_code, None) log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format( PolicyRest._url_get_config, res.status_code, msg, res.text, - Audit.log_json_dumps(dict(res.request.headers.items()))) + Metrics.log_json_dumps(dict(res.request.headers.items()))) res_data = None if res.status_code == requests.codes.ok: @@ -146,9 +150,9 @@ class PolicyRest(object): error_msg = "unexpected {0}".format(log_line) PolicyRest._logger.error(error_msg) - sub_aud.set_http_status_code(error_code) + metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) - sub_aud.metrics(error_msg) + metrics.metrics(error_msg) return (error_code, None) elif res.status_code == PolicyRest.POLICY_ENGINE_STATUS_CODE_ERROR: @@ -163,14 +167,14 @@ class PolicyRest(object): info_msg = "not found {0}".format(log_line) PolicyRest._logger.info(info_msg) - sub_aud.set_http_status_code(status_code) - sub_aud.metrics(info_msg) + metrics.set_http_status_code(status_code) + metrics.metrics(info_msg) return (status_code, None) except ValueError: pass - sub_aud.set_http_status_code(res.status_code) - sub_aud.metrics(log_line) + metrics.set_http_status_code(res.status_code) + metrics.metrics(log_line) PolicyRest._logger.info(log_line) return res.status_code, res_data @@ -191,165 +195,187 @@ class PolicyRest(object): @staticmethod def get_latest_policy(aud_policy_id): """Get the latest policy for the policy_id from the policy-engine""" - PolicyRest._lazy_init() audit, policy_id, min_version_expected, ignore_policy_names = aud_policy_id + str_metrics = "policy_id({0}), min_version_expected({1}) ignore_policy_names({2})".format( + policy_id, min_version_expected, json.dumps(ignore_policy_names)) - status_code = 0 - policy_configs = None - latest_policy = None - expect_policy_removed = (ignore_policy_names and not min_version_expected) - - for retry in xrange(1, PolicyRest._policy_retry_count + 1): - PolicyRest._logger.debug("%s", policy_id) - - status_code, policy_configs = PolicyRest._pdp_get_config( - audit, {POLICY_NAME:policy_id} - ) - - PolicyRest._logger.debug("%s %s policy_configs: %s", - status_code, policy_id, json.dumps(policy_configs or [])) - - latest_policy = PolicyUtils.select_latest_policy( - policy_configs, min_version_expected, ignore_policy_names - ) - - if not latest_policy and not expect_policy_removed: - audit.error("received unexpected policy data from PDP for policy_id={0}: {1}" - .format(policy_id, json.dumps(policy_configs or [])), - errorCode=AuditResponseCode.DATA_ERROR.value, - errorDescription=AuditResponseCode.get_human_text( - AuditResponseCode.DATA_ERROR)) - - if latest_policy or not audit.retry_get_config \ - or (expect_policy_removed and not policy_configs) \ - or not PolicyRest._policy_retry_sleep \ - or audit.is_serious_error(status_code): - break - - if retry == PolicyRest._policy_retry_count: - audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}" - .format(PolicyRest._url_get_config, retry, policy_id), - errorCode=AuditResponseCode.DATA_ERROR.value, - errorDescription=AuditResponseCode.get_human_text( - AuditResponseCode.DATA_ERROR)) - break - - audit.warn( - "retry #{0} {1} from PDP in {2} secs for policy_id={3}".format( - retry, PolicyRest._url_get_config, PolicyRest._policy_retry_sleep, policy_id), - errorCode=AuditResponseCode.DATA_ERROR.value, - errorDescription=AuditResponseCode.get_human_text( - AuditResponseCode.DATA_ERROR)) - time.sleep(PolicyRest._policy_retry_sleep) - - if expect_policy_removed and not latest_policy \ - and AuditHttpCode.RESPONSE_ERROR.value == status_code: - audit.set_http_status_code(AuditHttpCode.HTTP_OK.value) - return None + try: + PolicyRest._lazy_init() + status_code = 0 + retry_get_config = audit.kwargs.get("retry_get_config") + policy_configs = None + latest_policy = None + expect_policy_removed = (ignore_policy_names and not min_version_expected) + + for retry in xrange(1, PolicyRest._policy_retry_count + 1): + PolicyRest._logger.debug(str_metrics) + + status_code, policy_configs = PolicyRest._pdp_get_config( + audit, {POLICY_NAME:policy_id} + ) + + PolicyRest._logger.debug("%s %s policy_configs: %s", + status_code, policy_id, json.dumps(policy_configs or [])) + + latest_policy = PolicyUtils.select_latest_policy( + policy_configs, min_version_expected, ignore_policy_names + ) - audit.set_http_status_code(status_code) - if not PolicyRest._validate_policy(latest_policy): - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - audit.error( - "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)), - errorCode=AuditResponseCode.DATA_ERROR.value, - errorDescription=AuditResponseCode.get_human_text(AuditResponseCode.DATA_ERROR) - ) + if not latest_policy and not expect_policy_removed: + audit.error("received unexpected policy data from PDP for policy_id={0}: {1}" + .format(policy_id, json.dumps(policy_configs or [])), + error_code=AuditResponseCode.DATA_ERROR) + + if (latest_policy + or not retry_get_config + or (expect_policy_removed and not policy_configs) + or not PolicyRest._policy_retry_sleep + or audit.is_serious_error(status_code)): + break + + if retry == PolicyRest._policy_retry_count: + audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}" + .format(PolicyRest._url_get_config, retry, policy_id), + error_code=AuditResponseCode.DATA_ERROR) + break + + audit.warn("retry #{0} {1} from PDP in {2} secs for policy_id={3}".format( + retry, PolicyRest._url_get_config, + PolicyRest._policy_retry_sleep, policy_id), + error_code=AuditResponseCode.DATA_ERROR) + time.sleep(PolicyRest._policy_retry_sleep) + + if (expect_policy_removed and not latest_policy + and AuditHttpCode.RESPONSE_ERROR.value == status_code): + audit.set_http_status_code(AuditHttpCode.HTTP_OK.value) + return None + + audit.set_http_status_code(status_code) + if not PolicyRest._validate_policy(latest_policy): + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.error( + "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)), + error_code=AuditResponseCode.DATA_ERROR) + + return latest_policy + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(audit.request_id, type(ex).__name__, str(ex), + "get_latest_policy", str_metrics)) + + PolicyRest._logger.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None - return latest_policy @staticmethod def get_latest_updated_policies(aud_policy_updates): """Get the latest policies of the list of policy_names from the policy-engine""" - PolicyRest._lazy_init() audit, policies_updated, policies_removed = aud_policy_updates if not policies_updated and not policies_removed: - return + return None, None str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format( len(policies_updated), json.dumps(policies_updated), len(policies_removed), json.dumps(policies_removed)) - audit.metrics_start("get_latest_updated_policies {0}".format(str_metrics)) - PolicyRest._logger.debug(str_metrics) - - policies_to_find = {} - for (policy_name, policy_version) in policies_updated: - policy_id = PolicyUtils.extract_policy_id(policy_name) - if not policy_id or not policy_version.isdigit(): - continue - policy = policies_to_find.get(policy_id) - if not policy: - policies_to_find[policy_id] = { - POLICY_ID: policy_id, - PolicyRest.MIN_VERSION_EXPECTED: int(policy_version), - PolicyRest.IGNORE_POLICY_NAMES: {} - } - continue - if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version): - policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version) - - for (policy_name, _) in policies_removed: - policy_id = PolicyUtils.extract_policy_id(policy_name) - if not policy_id: - continue - policy = policies_to_find.get(policy_id) - if not policy: - policies_to_find[policy_id] = { - POLICY_ID: policy_id, - PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True} - } - continue - policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True - - apns = [(audit, policy_id, - policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED), - policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)) - for (policy_id, policy_to_find) in policies_to_find.iteritems()] - - policies = None - apns_length = len(apns) - if apns_length == 1: - policies = [PolicyRest.get_latest_policy(apns[0])] - else: - pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length)) - policies = pool.map(PolicyRest.get_latest_policy, apns) - pool.close() - pool.join() - - audit.metrics("result get_latest_updated_policies {0}: {1} {2}" - .format(str_metrics, len(policies), json.dumps(policies)), - targetEntity=PolicyRest._target_entity, - targetServiceName=PolicyRest._url_get_config) - - updated_policies = dict((policy[POLICY_ID], policy) - for policy in policies - if policy and policy.get(POLICY_ID)) - - removed_policies = dict((policy_id, True) - for (policy_id, policy_to_find) in policies_to_find.iteritems() - if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED) - and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES) - and policy_id not in updated_policies) - - errored_policies = dict((policy_id, policy_to_find) - for (policy_id, policy_to_find) in policies_to_find.iteritems() - if policy_id not in updated_policies - and policy_id not in removed_policies) - - PolicyRest._logger.debug( - "result updated_policies %s, removed_policies %s, errored_policies %s", - json.dumps(updated_policies), json.dumps(removed_policies), - json.dumps(errored_policies)) - - if errored_policies: - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - audit.error( - "errored_policies in PDP: {0}".format(json.dumps(errored_policies)), - errorCode=AuditResponseCode.DATA_ERROR.value, - errorDescription=AuditResponseCode.get_human_text(AuditResponseCode.DATA_ERROR) - ) - - return updated_policies, removed_policies + + target_entity = "{0} total get_latest_updated_policies".format(PolicyRest._target_entity) + try: + PolicyRest._lazy_init() + metrics = Metrics(aud_parent=audit, + targetEntity=target_entity, + targetServiceName=PolicyRest._url_get_config) + + metrics.metrics_start("get_latest_updated_policies {0}".format(str_metrics)) + PolicyRest._logger.debug(str_metrics) + + policies_to_find = {} + for (policy_name, policy_version) in policies_updated: + policy_id = PolicyUtils.extract_policy_id(policy_name) + if not policy_id or not policy_version.isdigit(): + continue + policy = policies_to_find.get(policy_id) + if not policy: + policies_to_find[policy_id] = { + POLICY_ID: policy_id, + PolicyRest.MIN_VERSION_EXPECTED: int(policy_version), + PolicyRest.IGNORE_POLICY_NAMES: {} + } + continue + if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version): + policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version) + + for (policy_name, _) in policies_removed: + policy_id = PolicyUtils.extract_policy_id(policy_name) + if not policy_id: + continue + policy = policies_to_find.get(policy_id) + if not policy: + policies_to_find[policy_id] = { + POLICY_ID: policy_id, + PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True} + } + continue + policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True + + apns = [(audit, policy_id, + policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED), + policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)) + for (policy_id, policy_to_find) in policies_to_find.iteritems()] + + policies = None + apns_length = len(apns) + if apns_length == 1: + policies = [PolicyRest.get_latest_policy(apns[0])] + else: + pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length)) + policies = pool.map(PolicyRest.get_latest_policy, apns) + pool.close() + pool.join() + + metrics.metrics("result get_latest_updated_policies {0}: {1} {2}" + .format(str_metrics, len(policies), json.dumps(policies))) + + updated_policies = dict((policy[POLICY_ID], policy) + for policy in policies + if policy and policy.get(POLICY_ID)) + + removed_policies = dict((policy_id, True) + for (policy_id, policy_to_find) in policies_to_find.iteritems() + if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED) + and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES) + and policy_id not in updated_policies) + + errored_policies = dict((policy_id, policy_to_find) + for (policy_id, policy_to_find) in policies_to_find.iteritems() + if policy_id not in updated_policies + and policy_id not in removed_policies) + + PolicyRest._logger.debug( + "result updated_policies %s, removed_policies %s, errored_policies %s", + json.dumps(updated_policies), json.dumps(removed_policies), + json.dumps(errored_policies)) + + if errored_policies: + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.error( + "errored_policies in PDP: {0}".format(json.dumps(errored_policies)), + error_code=AuditResponseCode.DATA_ERROR) + + return updated_policies, removed_policies + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(audit.request_id, type(ex).__name__, str(ex), + "get_latest_updated_policies", str_metrics)) + + PolicyRest._logger.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None, None + @staticmethod def _get_latest_policies(aud_policy_filter): @@ -358,104 +384,120 @@ class PolicyRest(object): or all the latest policies of the same scope from the policy-engine """ audit, policy_filter, scope_prefix = aud_policy_filter - str_policy_filter = json.dumps(policy_filter) - PolicyRest._logger.debug("%s", str_policy_filter) - - status_code, policy_configs = PolicyRest._pdp_get_config(audit, policy_filter) - - PolicyRest._logger.debug("%s policy_configs: %s %s", status_code, - str_policy_filter, json.dumps(policy_configs or [])) + try: + str_policy_filter = json.dumps(policy_filter) + PolicyRest._logger.debug("%s", str_policy_filter) + + status_code, policy_configs = PolicyRest._pdp_get_config(audit, policy_filter) + + PolicyRest._logger.debug("%s policy_configs: %s %s", status_code, + str_policy_filter, json.dumps(policy_configs or [])) + + latest_policies = PolicyUtils.select_latest_policies(policy_configs) + + if (scope_prefix and not policy_configs + and status_code != AuditHttpCode.DATA_NOT_FOUND_ERROR.value): + audit.warn("PDP error {0} on scope_prefix {1}".format(status_code, scope_prefix), + error_code=AuditResponseCode.DATA_ERROR) + return None, latest_policies, scope_prefix + + if not latest_policies: + if not scope_prefix: + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.warn( + "received no policies from PDP for policy_filter {0}: {1}" + .format(str_policy_filter, json.dumps(policy_configs or [])), + error_code=AuditResponseCode.DATA_ERROR) + return None, latest_policies, None + + audit.set_http_status_code(status_code) + valid_policies = {} + errored_policies = {} + for (policy_id, policy) in latest_policies.iteritems(): + if PolicyRest._validate_policy(policy): + valid_policies[policy_id] = policy + else: + errored_policies[policy_id] = policy + return valid_policies, errored_policies, None + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: policy_filter({4}), scope_prefix({5})" + .format(audit.request_id, type(ex).__name__, str(ex), + "_get_latest_policies", json.dumps(policy_filter), scope_prefix)) - latest_policies = PolicyUtils.select_latest_policies(policy_configs) + PolicyRest._logger.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None, None, scope_prefix - if scope_prefix and not policy_configs \ - and status_code != AuditHttpCode.DATA_NOT_FOUND_ERROR.value: - audit.warn("PDP error {0} on scope_prefix {1}".format(status_code, scope_prefix), - errorCode=AuditResponseCode.DATA_ERROR.value, - errorDescription=AuditResponseCode.get_human_text( - AuditResponseCode.DATA_ERROR) - ) - return None, latest_policies, scope_prefix - - if not latest_policies: - if not scope_prefix: - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - audit.warn( - "received no policies from PDP for policy_filter {0}: {1}" - .format(str_policy_filter, json.dumps(policy_configs or [])), - errorCode=AuditResponseCode.DATA_ERROR.value, - errorDescription=AuditResponseCode.get_human_text( - AuditResponseCode.DATA_ERROR) - ) - return None, latest_policies, None - - audit.set_http_status_code(status_code) - valid_policies = {} - errored_policies = {} - for (policy_id, policy) in latest_policies.iteritems(): - if PolicyRest._validate_policy(policy): - valid_policies[policy_id] = policy - else: - errored_policies[policy_id] = policy - return valid_policies, errored_policies, None @staticmethod def get_latest_policies(audit, policy_filter=None): """Get the latest policies of the same scope from the policy-engine""" - PolicyRest._lazy_init() - result = {} aud_policy_filters = None str_policy_filters = None str_metrics = None target_entity = None - if policy_filter is not None: - aud_policy_filters = [(audit, policy_filter, None)] - str_policy_filters = json.dumps(policy_filter) - str_metrics = "get_latest_policies for policy_filter {0}".format( - str_policy_filters) - target_entity = ("{0} total get_latest_policies by policy_filter" - .format(PolicyRest._target_entity)) - result[POLICY_FILTER] = copy.deepcopy(policy_filter) - else: - aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, scope_prefix) - for scope_prefix in PolicyRest._scope_prefixes] - str_policy_filters = json.dumps(PolicyRest._scope_prefixes) - str_metrics = "get_latest_policies for scopes {0} {1}".format( \ - len(PolicyRest._scope_prefixes), str_policy_filters) - target_entity = ("{0} total get_latest_policies by scope_prefixes" - .format(PolicyRest._target_entity)) - result[SCOPE_PREFIXES] = copy.deepcopy(PolicyRest._scope_prefixes) - - PolicyRest._logger.debug("%s", str_policy_filters) - audit.metrics_start(str_metrics) - - latest_policies = None - apfs_length = len(aud_policy_filters) - if apfs_length == 1: - latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])] - else: - pool = ThreadPool(min(PolicyRest._scope_thread_pool_size, apfs_length)) - latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters) - pool.close() - pool.join() - - audit.metrics( - "total result {0}: {1} {2}".format( - str_metrics, len(latest_policies), json.dumps(latest_policies)), - targetEntity=target_entity, targetServiceName=PolicyRest._url_get_config) - - # latest_policies == [(valid_policies, errored_policies, errored_scope_prefix), ...] - result[LATEST_POLICIES] = dict( - pair for (vps, _, _) in latest_policies if vps for pair in vps.iteritems()) - - result[ERRORED_POLICIES] = dict( - pair for (_, eps, _) in latest_policies if eps for pair in eps.iteritems()) - - result[ERRORED_SCOPES] = sorted([esp for (_, _, esp) in latest_policies if esp]) - - PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s", - str_policy_filters, json.dumps(result)) - - return result + try: + PolicyRest._lazy_init() + if policy_filter is not None: + aud_policy_filters = [(audit, policy_filter, None)] + str_policy_filters = json.dumps(policy_filter) + str_metrics = "get_latest_policies for policy_filter {0}".format( + str_policy_filters) + target_entity = ("{0} total get_latest_policies by policy_filter" + .format(PolicyRest._target_entity)) + result[POLICY_FILTER] = copy.deepcopy(policy_filter) + else: + aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, scope_prefix) + for scope_prefix in PolicyRest._scope_prefixes] + str_policy_filters = json.dumps(PolicyRest._scope_prefixes) + str_metrics = "get_latest_policies for scopes {0} {1}".format( \ + len(PolicyRest._scope_prefixes), str_policy_filters) + target_entity = ("{0} total get_latest_policies by scope_prefixes" + .format(PolicyRest._target_entity)) + result[SCOPE_PREFIXES] = copy.deepcopy(PolicyRest._scope_prefixes) + + PolicyRest._logger.debug("%s", str_policy_filters) + metrics = Metrics(aud_parent=audit, targetEntity=target_entity, + targetServiceName=PolicyRest._url_get_config) + + metrics.metrics_start(str_metrics) + + latest_policies = None + apfs_length = len(aud_policy_filters) + if apfs_length == 1: + latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])] + else: + pool = ThreadPool(min(PolicyRest._scope_thread_pool_size, apfs_length)) + latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters) + pool.close() + pool.join() + + metrics.metrics("total result {0}: {1} {2}".format( + str_metrics, len(latest_policies), json.dumps(latest_policies))) + + # latest_policies == [(valid_policies, errored_policies, errored_scope_prefix), ...] + result[LATEST_POLICIES] = dict( + pair for (vps, _, _) in latest_policies if vps for pair in vps.iteritems()) + + result[ERRORED_POLICIES] = dict( + pair for (_, eps, _) in latest_policies if eps for pair in eps.iteritems()) + + result[ERRORED_SCOPES] = sorted([esp for (_, _, esp) in latest_policies if esp]) + + PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s", + str_policy_filters, json.dumps(result)) + return result + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(audit.request_id, type(ex).__name__, str(ex), + "get_latest_policies", str_metrics)) + + PolicyRest._logger.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 70823fa..38ce93a 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -26,7 +26,7 @@ from threading import Lock, Thread from .config import Config from .deploy_handler import DeployHandler -from .onap.audit import Audit +from .onap.audit import Audit, AuditHttpCode, AuditResponseCode from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, LATEST_POLICIES, REMOVED_POLICIES) from .policy_rest import PolicyRest @@ -56,6 +56,7 @@ class PolicyUpdater(Thread): self._lock = Lock() self._queue = Queue() + def enqueue(self, audit=None, policies_updated=None, policies_removed=None): """enqueue the policy-updates""" policies_updated = policies_updated or [] @@ -65,7 +66,23 @@ class PolicyUpdater(Thread): "enqueue request_id %s policies_updated %s policies_removed %s", ((audit and audit.request_id) or "none"), json.dumps(policies_updated), json.dumps(policies_removed)) - self._queue.put((audit, policies_updated, policies_removed)) + + with self._lock: + self._queue.put((audit, policies_updated, policies_removed)) + + + def catch_up(self, audit=None): + """need to bring the latest policies to DCAE-Controller""" + with self._lock: + if not self._aud_catch_up: + self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP) + PolicyUpdater._logger.info( + "catch_up %s request_id %s", + self._aud_catch_up.req_message, self._aud_catch_up.request_id + ) + + self.enqueue() + def run(self): """wait and run the policy-update in thread""" @@ -78,32 +95,15 @@ class PolicyUpdater(Thread): json.dumps(policies_updated), json.dumps(policies_removed)) if not self._keep_running(): - self._queue.task_done() break if self._on_catch_up(): self._reset_queue() continue elif not queued_audit: - self._queue.task_done() continue - updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( - (queued_audit, policies_updated, policies_removed)) - - message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies} - - deployment_handler_changed = DeployHandler.policy_update(queued_audit, message) - - self._queue.task_done() - queued_audit.audit_done() - - if deployment_handler_changed: - self._catch_up_prev_message = None - self._pause_catch_up_timer() - self.catch_up() - elif not queued_audit.is_success(): - self._catch_up_prev_message = None + self._on_policies_update(queued_audit, policies_updated, policies_removed) PolicyUpdater._logger.info("exit policy-updater") @@ -116,17 +116,6 @@ class PolicyUpdater(Thread): self._aud_shutdown.audit_done() return keep_running - def catch_up(self, audit=None): - """need to bring the latest policies to DCAE-Controller""" - with self._lock: - self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP) - PolicyUpdater._logger.info( - "catch_up %s request_id %s", - self._aud_catch_up.req_message, self._aud_catch_up.request_id - ) - - self.enqueue() - def _run_catch_up_timer(self): """create and start the catch_up timer""" if not self._catch_up_interval: @@ -190,9 +179,9 @@ class PolicyUpdater(Thread): def _reset_queue(self): """clear up the queue""" with self._lock: - self._aud_catch_up = None - self._queue.task_done() - self._queue = Queue() + if not self._aud_catch_up and not self._aud_shutdown: + with self._queue.mutex: + self._queue.queue.clear() def _on_catch_up(self): """bring all the latest policies to DCAE-Controller""" @@ -207,37 +196,94 @@ class PolicyUpdater(Thread): log_line = "catch_up {0} request_id {1}".format( aud_catch_up.req_message, aud_catch_up.request_id ) + try: + PolicyUpdater._logger.info(log_line) + self._pause_catch_up_timer() - PolicyUpdater._logger.info(log_line) - self._pause_catch_up_timer() + catch_up_message = PolicyRest.get_latest_policies(aud_catch_up) + catch_up_message[CATCH_UP] = True - catch_up_message = PolicyRest.get_latest_policies(aud_catch_up) - catch_up_message[CATCH_UP] = True - - catch_up_result = "" - if not aud_catch_up.is_success(): - catch_up_result = "- not sending catch-up to deployment-handler due to errors" - PolicyUpdater._logger.warn(catch_up_result) - elif self._need_to_send_catch_up(aud_catch_up, catch_up_message): - DeployHandler.policy_update(aud_catch_up, catch_up_message) - if aud_catch_up.is_success(): - catch_up_result = "- sent catch-up to deployment-handler" - else: - catch_up_result = "- failed to send catch-up to deployment-handler" + catch_up_result = "" + if not aud_catch_up.is_success(): + catch_up_result = "- not sending catch-up to deployment-handler due to errors" PolicyUpdater._logger.warn(catch_up_result) - else: - catch_up_result = "- skipped sending the same policies" - success, _, _ = aud_catch_up.audit_done(result=catch_up_result) - PolicyUpdater._logger.info(log_line + " " + catch_up_result) - PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health())) + elif not self._need_to_send_catch_up(aud_catch_up, catch_up_message): + catch_up_result = "- skipped sending the same policies" + else: + DeployHandler.policy_update(aud_catch_up, catch_up_message, rediscover=True) + if not aud_catch_up.is_success(): + catch_up_result = "- failed to send catch-up to deployment-handler" + PolicyUpdater._logger.warn(catch_up_result) + else: + catch_up_result = "- sent catch-up to deployment-handler" + success, _, _ = aud_catch_up.audit_done(result=catch_up_result) + PolicyUpdater._logger.info(log_line + " " + catch_up_result) + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(aud_catch_up.request_id, type(ex).__name__, str(ex), + "on_catch_up", log_line + " " + catch_up_result)) + + PolicyUpdater._logger.exception(error_msg) + aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + success = False if not success: self._catch_up_prev_message = None self._run_catch_up_timer() + PolicyUpdater._logger.info("policy_handler health: %s", + json.dumps(aud_catch_up.health(full=True))) return success + + def _on_policies_update(self, queued_audit, policies_updated, policies_removed): + """handle the event of policy-updates from the queue""" + deployment_handler_changed = None + result = "" + + log_line = "request_id: {} policies_updated: {} policies_removed: {}".format( + ((queued_audit and queued_audit.request_id) or "none"), + json.dumps(policies_updated), json.dumps(policies_removed)) + + try: + updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( + (queued_audit, policies_updated, policies_removed)) + + if not queued_audit.is_success(): + result = "- not sending policy-updates to deployment-handler due to errors" + PolicyUpdater._logger.warn(result) + else: + message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies} + deployment_handler_changed = DeployHandler.policy_update(queued_audit, message) + if not queued_audit.is_success(): + result = "- failed to send policy-updates to deployment-handler" + PolicyUpdater._logger.warn(result) + else: + result = "- sent policy-updates to deployment-handler" + + success, _, _ = queued_audit.audit_done(result=result) + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(queued_audit.request_id, type(ex).__name__, str(ex), + "on_policies_update", log_line + " " + result)) + + PolicyUpdater._logger.exception(error_msg) + queued_audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + queued_audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + success = False + + if deployment_handler_changed: + self._catch_up_prev_message = None + self._pause_catch_up_timer() + self.catch_up() + elif not success: + self._catch_up_prev_message = None + + def shutdown(self, audit): """Shutdown the policy-updater""" PolicyUpdater._logger.info("shutdown policy-updater") diff --git a/policyhandler/step_timer.py b/policyhandler/step_timer.py index 3936107..2a13dd5 100644 --- a/policyhandler/step_timer.py +++ b/policyhandler/step_timer.py @@ -18,6 +18,7 @@ """periodically callback""" +import json from datetime import datetime from threading import Event, RLock, Thread @@ -51,11 +52,11 @@ class StepTimer(Thread): self._request = StepTimer.INIT self._req_count = 0 self._req_time = 0 - self._req_ts = datetime.now() + self._req_ts = datetime.utcnow() self._substep = None self._substep_time = 0 - self._substep_ts = datetime.now() + self._substep_ts = datetime.utcnow() def get_timer_status(self): """returns timer status""" @@ -104,9 +105,9 @@ class StepTimer(Thread): prev_req = self._request self._request = request - now = datetime.now() - self._req_time = (now - self._req_ts).total_seconds() - self._req_ts = now + utcnow = datetime.utcnow() + self._req_time = (utcnow - self._req_ts).total_seconds() + self._req_ts = utcnow self._logger.info("{0}[{1}] {2}->{3}".format( self.name, self._req_time, prev_req, self.get_timer_status())) @@ -114,9 +115,9 @@ class StepTimer(Thread): """log exe step""" with self._lock: self._substep = substep - now = datetime.now() - self._substep_time = (now - self._substep_ts).total_seconds() - self._substep_ts = now + utcnow = datetime.utcnow() + self._substep_time = (utcnow - self._substep_ts).total_seconds() + self._substep_ts = utcnow self._logger.info("[{0}] {1}".format(self._substep_time, self.get_timer_status())) def run(self): @@ -147,8 +148,14 @@ class StepTimer(Thread): if self._paused: self._timer_substep("paused - skip on_time event") else: - self._timer_substep("on_time event") - self._on_time(*self._args, **self._kwargs) + try: + self._timer_substep("on_time event") + self._on_time(*self._args, **self._kwargs) + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: args({4}), kwargs({5})" + .format(self.name, type(ex).__name__, str(ex), "_on_time", + json.dumps(self._args), json.dumps(self._kwargs))) + self._logger.exception(error_msg) self._timer_substep("waiting for next...") self._next.wait() diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py index e9cc9cc..5314791 100644 --- a/policyhandler/web_server.py +++ b/policyhandler/web_server.py @@ -169,7 +169,7 @@ class _PolicyWeb(object): @cherrypy.tools.json_out() def catch_up(self): """catch up with all DCAE policies""" - started = str(datetime.now()) + started = str(datetime.utcnow()) req_info = _PolicyWeb._get_request_info(cherrypy.request) audit = Audit(job_name="catch_up", req_message=req_info, headers=cherrypy.request.headers) @@ -193,11 +193,10 @@ class _PolicyWeb(object): PolicyReceiver.shutdown(audit) - health = json.dumps(Audit.health()) - audit.info("policy_handler health: {0}".format(health)) - PolicyWeb.logger.info("policy_handler health: %s", health) + PolicyWeb.logger.info("policy_handler health: {0}" + .format(json.dumps(audit.health(full=True)))) PolicyWeb.logger.info("%s: --------- the end -----------", req_info) - res = str(datetime.now()) + res = str(datetime.utcnow()) audit.info_requested(res) return "goodbye! shutdown requested {0}".format(res) @@ -211,7 +210,7 @@ class _PolicyWeb(object): PolicyWeb.logger.info("%s", req_info) - res = Audit.health() + res = audit.health() PolicyWeb.logger.info("healthcheck %s: res=%s", req_info, json.dumps(res)) |