diff options
author | Alex Shatov <alexs@att.com> | 2018-05-10 09:23:16 -0400 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2018-05-10 09:23:16 -0400 |
commit | f2d7bef13705812c1bf147c2fb65162fbf385c6b (patch) | |
tree | e6efb8e25287576a48952942aacdb3cf84a825ff /policyhandler/onap/audit.py | |
parent | 50bed534083c96cbf1f8fa4e220cb2b00dff9621 (diff) |
2.4.3 policy-handler - try-catch top Exceptions
- added try-except for top level Exception into all threads
of policy-handler to avoid losing the thread and tracking
the unexpected crashes
- rediscover the deployment-handler if not found before
and after each catchup
- refactored audit - separated metrics from audit
- added more stats and runtime info to healthcheck
= gc counts and garbage info if any detected
= memory usage - to detect the potential memory leaks
= request_id to all stats
= stats of active requests
- avoid reallocating the whole Queue of policy-updates after catchup
= clear of the internal queue under proper lock
Change-Id: I3fabcaac70419a68bd070ff7d591a75942f37663
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-483
Diffstat (limited to 'policyhandler/onap/audit.py')
-rw-r--r-- | policyhandler/onap/audit.py | 443 |
1 files changed, 283 insertions, 160 deletions
diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py index 08dcd37..48988fe 100644 --- a/policyhandler/onap/audit.py +++ b/policyhandler/onap/audit.py @@ -26,16 +26,19 @@ """ import copy +import gc import json import os import re import subprocess import sys +import threading import time import uuid from datetime import datetime from enum import Enum -from threading import Lock + +import psutil from .CommonLogger import CommonLogger from .health import Health @@ -51,10 +54,15 @@ AUDIT_SERVER = 'server' AUDIT_TARGET_ENTITY = 'targetEntity' AUDIT_METRICS = 'metrics' AUDIT_TOTAL_STATS = 'audit_total_stats' +METRICS_TOTAL_STATS = 'metrics_total_stats' HEADER_CLIENTAUTH = "clientauth" HEADER_AUTHORIZATION = "authorization" +ERROR_CODE = "errorCode" +ERROR_DESCRIPTION = "errorDescription" + + class AuditHttpCode(Enum): """audit http codes""" HTTP_OK = 200 @@ -67,6 +75,7 @@ class AuditHttpCode(Enum): DATA_ERROR = 1030 SCHEMA_ERROR = 1040 + class AuditResponseCode(Enum): """audit response codes""" SUCCESS = 0 @@ -107,7 +116,60 @@ class AuditResponseCode(Enum): return "unknown" return response_code.name.lower().replace("_", " ") -class Audit(object): + +class ProcessInfo(object): + """static class to calculate process info""" + _KILO_SYMBOLS = ('KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB') + _KILO_POWERS = {} + + @staticmethod + def init(): + """init static constants""" + for i, kilo_symbol in enumerate(ProcessInfo._KILO_SYMBOLS): + ProcessInfo._KILO_POWERS[kilo_symbol] = 1 << (i + 1) * 10 + ProcessInfo._KILO_SYMBOLS = list(reversed(ProcessInfo._KILO_SYMBOLS)) + + @staticmethod + def bytes_to_human(byte_count): + """converts byte count to human value in kilo-powers""" + for kilo_symbol in ProcessInfo._KILO_SYMBOLS: + kilo_power = ProcessInfo._KILO_POWERS[kilo_symbol] + if byte_count >= kilo_power: + value = float(byte_count) / kilo_power + return '%.1f%s' % (value, kilo_symbol) + return "%sB" % byte_count + + @staticmethod + def mem_info(): + """calculates the memory usage of the current process""" + process = psutil.Process() + with process.oneshot(): + mem = process.memory_full_info() + return { + "uss" : ProcessInfo.bytes_to_human(mem.uss), + "rss" : ProcessInfo.bytes_to_human(mem.rss), + "swap" : ProcessInfo.bytes_to_human(getattr(mem, "swap", 0)), + "pss" : ProcessInfo.bytes_to_human(getattr(mem, "pss", 0)) + } + + + @staticmethod + def gc_info(full=False): + """gets info from garbage collector""" + gc_info = { + "gc_count" : str(gc.get_count()), + "gc_threshold" : str(gc.get_threshold()) + } + try: + if gc.garbage: + gc_info["gc_garbage"] = ([repr(stuck) for stuck in gc.garbage] + if full else len(gc.garbage)) + except Exception: + pass + return gc_info + + +class _Audit(object): """put the audit object on stack per each initiating request in the system :request_id: is the X-ECOMP-RequestID for tracing @@ -121,7 +183,7 @@ class Audit(object): _service_name = "" _service_version = "" _service_instance_uuid = str(uuid.uuid4()) - _started = datetime.now() + _started = datetime.utcnow() _key_format = re.compile(r"\W") _logger_debug = None _logger_error = None @@ -129,95 +191,69 @@ class Audit(object): _logger_audit = None _health = Health() _py_ver = sys.version.replace("\n", "") - try: - _packages = filter(None, subprocess.check_output(["pip", "freeze"]).splitlines()) - except subprocess.CalledProcessError: - _packages = [] + _packages = [] @staticmethod def init(service_name, service_version, config_file_path): """init static invariants and loggers""" - Audit._service_name = service_name - Audit._service_version = service_version - Audit._logger_debug = CommonLogger(config_file_path, "debug", \ - instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) - Audit._logger_error = CommonLogger(config_file_path, "error", \ - instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) - Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \ - instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) - Audit._logger_audit = CommonLogger(config_file_path, "audit", \ - instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name) - - @staticmethod - def health(): + _Audit._service_name = service_name + _Audit._service_version = service_version + _Audit._logger_debug = CommonLogger(config_file_path, "debug", \ + instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + _Audit._logger_error = CommonLogger(config_file_path, "error", \ + instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + _Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \ + instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + _Audit._logger_audit = CommonLogger(config_file_path, "audit", \ + instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + ProcessInfo.init() + try: + _Audit._packages = filter(None, subprocess.check_output(["pip", "freeze"]).splitlines()) + except subprocess.CalledProcessError: + pass + + + def health(self, full=False): """returns json for health check""" - now = datetime.now() - return { - "service_name" : Audit._service_name, - "service_version" : Audit._service_version, - "service_instance_uuid" : Audit._service_instance_uuid, - "python" : Audit._py_ver, - "started" : str(Audit._started), - "now" : str(now), - "uptime" : str(now - Audit._started), - "stats" : Audit._health.dump(), - "packages" : Audit._packages + utcnow = datetime.utcnow() + health = { + "server" : { + "service_name" : _Audit._service_name, + "service_version" : _Audit._service_version, + "service_instance_uuid" : _Audit._service_instance_uuid + }, + "runtime" : { + "started" : str(_Audit._started), + "utcnow" : str(utcnow), + "uptime" : str(utcnow - _Audit._started), + "active_threads" : sorted([thr.name for thr in threading.enumerate()]), + "gc" : ProcessInfo.gc_info(full), + "mem_info" : ProcessInfo.mem_info() + }, + "stats" : _Audit._health.dump(), + "soft" : {"python" : _Audit._py_ver, "packages" : _Audit._packages} } + health_txt = "{} health: {}".format(_Audit._service_name, json.dumps(health)) + self.info(health_txt) + return health + - def __init__(self, job_name=None, request_id=None, req_message=None, aud_parent=None, **kwargs): + def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs): """create audit object per each request in the system :job_name: is the name of the audit job for health stats :request_id: is the X-ECOMP-RequestID for tracing :req_message: is the request message string for logging - :aud_parent: is the parent Audit - used for sub-query metrics to other systems :kwargs: - put any request related params into kwargs """ - self.job_name = Audit._key_format.sub('_', job_name or req_message or Audit._service_name) + self.job_name = _Audit._key_format.sub('_', job_name or req_message or _Audit._service_name) self.request_id = request_id self.req_message = req_message or "" - self.aud_parent = aud_parent self.kwargs = kwargs or {} - self.retry_get_config = False self.max_http_status_code = 0 - self._lock = Lock() - - if self.aud_parent: - self.job_name = Audit._key_format.sub( - '_', job_name or self.aud_parent.job_name or Audit._service_name) - if not self.request_id: - self.request_id = self.aud_parent.request_id - if not self.req_message: - self.req_message = self.aud_parent.req_message - self.kwargs = self.aud_parent.merge_all_kwargs(**self.kwargs) - else: - headers = self.kwargs.get("headers", {}) - if headers: - if not self.request_id: - self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID) - if AUDIT_IPADDRESS not in self.kwargs: - self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR) - if AUDIT_SERVER not in self.kwargs: - self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST) - - if AUDIT_SERVER not in self.kwargs: - self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME) - - created_req = "" - if not self.request_id: - created_req = " with new" - self.request_id = str(uuid.uuid4()) - - self.kwargs[AUDIT_REQUESTID] = self.request_id + self._lock = threading.Lock() - self._started = time.time() - self._start_event = Audit._logger_audit.getStartRecordEvent() - self.metrics_start() - - if not self.aud_parent: - self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\ - .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs))) def merge_all_kwargs(self, **kwargs): """returns the merge of copy of self.kwargs with the param kwargs""" @@ -228,16 +264,14 @@ class Audit(object): def set_http_status_code(self, http_status_code): """accumulate the highest(worst) http status code""" - self._lock.acquire() - if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value: - self.max_http_status_code = max(http_status_code, self.max_http_status_code) - self._lock.release() + with self._lock: + if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value: + self.max_http_status_code = max(http_status_code, self.max_http_status_code) def get_max_http_status_code(self): """returns the highest(worst) http status code""" - self._lock.acquire() - max_http_status_code = self.max_http_status_code - self._lock.release() + with self._lock: + max_http_status_code = self.max_http_status_code return max_http_status_code @staticmethod @@ -247,33 +281,11 @@ class Audit(object): return 'COMPLETE' return 'ERROR' - @staticmethod - def hide_secrets(obj): - """hides the known secret field values of the dictionary""" - if not isinstance(obj, dict): - return obj - - for key in obj: - if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]: - obj[key] = "*" - elif isinstance(obj[key], dict): - obj[key] = Audit.hide_secrets(obj[key]) - - return obj - - @staticmethod - def log_json_dumps(obj, **kwargs): - """hide the known secret field values of the dictionary and return json.dumps""" - if not isinstance(obj, dict): - return json.dumps(obj, **kwargs) - - return json.dumps(Audit.hide_secrets(copy.deepcopy(obj)), **kwargs) - def is_serious_error(self, status_code): """returns whether the response_code is success and a human text for response code""" - return AuditResponseCode.PERMISSION_ERROR.value \ - == AuditResponseCode.get_response_code(status_code).value \ - or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value + return (AuditResponseCode.PERMISSION_ERROR.value + == AuditResponseCode.get_response_code(status_code).value + or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value) def _get_response_status(self): """calculates the response status fields from max_http_status_code""" @@ -290,104 +302,215 @@ class Audit(object): def debug(self, log_line, **kwargs): """debug - the debug=lowest level of logging""" - Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs)) + _Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs)) def info(self, log_line, **kwargs): """debug - the info level of logging""" - Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs)) + _Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs)) def info_requested(self, result=None, **kwargs): """info "requested ..." - the info level of logging""" self.info("requested {0} {1}".format(self.req_message, result or ""), \ **self.merge_all_kwargs(**kwargs)) - def warn(self, log_line, **kwargs): + def warn(self, log_line, error_code=None, **kwargs): """debug+error - the warn level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - Audit._logger_debug.warn(log_line, **all_kwargs) - Audit._logger_error.warn(log_line, **all_kwargs) - def error(self, log_line, **kwargs): + if error_code and isinstance(error_code, AuditResponseCode): + all_kwargs[ERROR_CODE] = error_code.value + all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code) + + _Audit._logger_debug.warn(log_line, **all_kwargs) + _Audit._logger_error.warn(log_line, **all_kwargs) + + def error(self, log_line, error_code=None, **kwargs): """debug+error - the error level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - Audit._logger_debug.error(log_line, **all_kwargs) - Audit._logger_error.error(log_line, **all_kwargs) - def fatal(self, log_line, **kwargs): + if error_code and isinstance(error_code, AuditResponseCode): + all_kwargs[ERROR_CODE] = error_code.value + all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code) + + _Audit._logger_debug.error(log_line, **all_kwargs) + _Audit._logger_error.error(log_line, **all_kwargs) + + def fatal(self, log_line, error_code=None, **kwargs): """debug+error - the fatal level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - Audit._logger_debug.fatal(log_line, **all_kwargs) - Audit._logger_error.fatal(log_line, **all_kwargs) + + if error_code and isinstance(error_code, AuditResponseCode): + all_kwargs[ERROR_CODE] = error_code.value + all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code) + + _Audit._logger_debug.fatal(log_line, **all_kwargs) + _Audit._logger_error.fatal(log_line, **all_kwargs) + + @staticmethod + def hide_secrets(obj): + """hides the known secret field values of the dictionary""" + if not isinstance(obj, dict): + return obj + + for key in obj: + if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]: + obj[key] = "*" + elif isinstance(obj[key], dict): + obj[key] = _Audit.hide_secrets(obj[key]) + + return obj + + @staticmethod + def log_json_dumps(obj, **kwargs): + """hide the known secret field values of the dictionary and return json.dumps""" + if not isinstance(obj, dict): + return json.dumps(obj, **kwargs) + + return json.dumps(_Audit.hide_secrets(copy.deepcopy(obj)), **kwargs) @staticmethod def get_elapsed_time(started): """returns the elapsed time since started in milliseconds""" - return int(round(1000 * (time.time() - started))) + return int(round(1000 * (time.time() - (started or 0)))) - def metrics_start(self, log_line=None, **kwargs): - """reset metrics timing""" - self._metrics_started = time.time() - self._metrics_start_event = Audit._logger_metrics.getStartRecordEvent() - if log_line: - self.info(log_line, **self.merge_all_kwargs(**kwargs)) - def metrics(self, log_line, **kwargs): - """debug+metrics - the metrics=sub-audit level of logging""" +class Audit(_Audit): + """Audit class to track the high level operations""" + + def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs): + """create audit object per each request in the system + + :job_name: is the name of the audit job for health stats + :request_id: is the X-ECOMP-RequestID for tracing + :req_message: is the request message string for logging + :aud_parent: is the parent Audit - used for sub-query metrics to other systems + :kwargs: - put any request related params into kwargs + """ + super(Audit, self).__init__(job_name=job_name, + request_id=request_id, + req_message=req_message, + **kwargs) + + headers = self.kwargs.get("headers", {}) + if headers: + if not self.request_id: + self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID) + if AUDIT_IPADDRESS not in self.kwargs: + self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR) + if AUDIT_SERVER not in self.kwargs: + self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST) + + created_req = "" + if not self.request_id: + created_req = " with new" + self.request_id = str(uuid.uuid4()) + + if AUDIT_SERVER not in self.kwargs: + self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME) + + self.kwargs[AUDIT_REQUESTID] = self.request_id + + _Audit._health.start(self.job_name, self.request_id) + _Audit._health.start(AUDIT_TOTAL_STATS, self.request_id) + + self._started = time.time() + self._start_event = Audit._logger_audit.getStartRecordEvent() + + self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\ + .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs))) + + + def audit_done(self, result=None, **kwargs): + """debug+audit - the audit=top level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) success, max_http_status_code, response_code, response_description = \ self._get_response_status() - metrics_func = None - timer = Audit.get_elapsed_time(self._metrics_started) - metrics_job = Audit._key_format.sub( - '_', all_kwargs.get(AUDIT_TARGET_ENTITY, AUDIT_METRICS + "_" + self.job_name)) + log_line = "{0} {1}".format(self.req_message, result or "").strip() + audit_func = None + timer = _Audit.get_elapsed_time(self._started) if success: log_line = "done: {0}".format(log_line) self.info(log_line, **all_kwargs) - metrics_func = Audit._logger_metrics.info - Audit._health.success(metrics_job, timer) + audit_func = _Audit._logger_audit.info + _Audit._health.success(self.job_name, timer, self.request_id) + _Audit._health.success(AUDIT_TOTAL_STATS, timer, self.request_id) else: log_line = "failed: {0}".format(log_line) - self.error(log_line, errorCode=response_code.value, \ - errorDescription=response_description, **all_kwargs) - metrics_func = Audit._logger_metrics.error - Audit._health.error(metrics_job, timer) - - metrics_func(log_line, begTime=self._metrics_start_event, timer=timer, - statusCode=Audit.get_status_code(success), responseCode=response_code.value, - responseDescription=response_description, - **all_kwargs - ) - - self.metrics_start() + self.error(log_line, errorCode=response_code.value, + errorDescription=response_description, **all_kwargs) + audit_func = _Audit._logger_audit.error + _Audit._health.error(self.job_name, timer, self.request_id) + _Audit._health.error(AUDIT_TOTAL_STATS, timer, self.request_id) + + audit_func(log_line, begTime=self._start_event, timer=timer, + statusCode=_Audit.get_status_code(success), + responseCode=response_code.value, + responseDescription=response_description, + **all_kwargs) + return (success, max_http_status_code, response_description) - def audit_done(self, result=None, **kwargs): - """debug+audit - the audit=top level of logging""" + +class Metrics(_Audit): + """Metrics class to track the calls to outside systems""" + + def __init__(self, aud_parent, **kwargs): + """create audit object per each request in the system + + :aud_parent: is the parent Audit - used for sub-query metrics to other systems + :kwargs: - put any request related params into kwargs + """ + super(Metrics, self).__init__(job_name=aud_parent.job_name, + request_id=aud_parent.request_id, + req_message=aud_parent.req_message, + **aud_parent.merge_all_kwargs(**kwargs)) + self.aud_parent = aud_parent + self._metrics_name = _Audit._key_format.sub( + '_', AUDIT_METRICS + "_" + self.kwargs.get(AUDIT_TARGET_ENTITY, self.job_name)) + + self._metrics_started = None + self._metrics_start_event = None + + + def metrics_start(self, log_line=None, **kwargs): + """reset metrics timing""" + self.merge_all_kwargs(**kwargs) + self._metrics_started = time.time() + self._metrics_start_event = _Audit._logger_metrics.getStartRecordEvent() + if log_line: + self.info(log_line, **self.merge_all_kwargs(**kwargs)) + _Audit._health.start(self._metrics_name, self.request_id) + _Audit._health.start(METRICS_TOTAL_STATS, self.request_id) + + + def metrics(self, log_line, **kwargs): + """debug+metrics - the metrics=sub-audit level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) success, max_http_status_code, response_code, response_description = \ self._get_response_status() - log_line = "{0} {1}".format(self.req_message, result or "").strip() - audit_func = None - timer = Audit.get_elapsed_time(self._started) + metrics_func = None + timer = _Audit.get_elapsed_time(self._metrics_started) if success: log_line = "done: {0}".format(log_line) self.info(log_line, **all_kwargs) - audit_func = Audit._logger_audit.info - Audit._health.success(self.job_name, timer) - Audit._health.success(AUDIT_TOTAL_STATS, timer) + metrics_func = _Audit._logger_metrics.info + _Audit._health.success(self._metrics_name, timer, self.request_id) + _Audit._health.success(METRICS_TOTAL_STATS, timer, self.request_id) else: log_line = "failed: {0}".format(log_line) self.error(log_line, errorCode=response_code.value, errorDescription=response_description, **all_kwargs) - audit_func = Audit._logger_audit.error - Audit._health.error(self.job_name, timer) - Audit._health.error(AUDIT_TOTAL_STATS, timer) - - audit_func(log_line, begTime=self._start_event, timer=timer, - statusCode=Audit.get_status_code(success), - responseCode=response_code.value, - responseDescription=response_description, - **all_kwargs - ) + metrics_func = _Audit._logger_metrics.error + _Audit._health.error(self._metrics_name, timer, self.request_id) + _Audit._health.error(METRICS_TOTAL_STATS, timer, self.request_id) + + metrics_func( + log_line, + begTime=(self._metrics_start_event or _Audit._logger_metrics.getStartRecordEvent()), + timer=timer, + statusCode=_Audit.get_status_code(success), + responseCode=response_code.value, + responseDescription=response_description, + **all_kwargs) return (success, max_http_status_code, response_description) |