diff options
Diffstat (limited to 'policyhandler/onap/audit.py')
-rw-r--r-- | policyhandler/onap/audit.py | 469 |
1 files changed, 311 insertions, 158 deletions
diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py index a1df861..a007a26 100644 --- a/policyhandler/onap/audit.py +++ b/policyhandler/onap/audit.py @@ -1,15 +1,5 @@ -"""generic class to keep track of request handling - from receiving it through reponse and log all the activities - - call Audit.init("component-name", "path/to/config_file") to init the loggers before any requests - - start each outside request with creation of the Audit object - audit = Audit(request_id=None, headers=None, msg=None) -""" - -# org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -26,15 +16,30 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -import os +"""generic class to keep track of request handling + from receiving it through reponse and log all the activities + + call Audit.init("component-name", "path/to/config_file") to init the loggers before any requests + + start each outside request with creation of the Audit object + audit = Audit(request_id=None, headers=None, msg=None) +""" + +import copy import json -import uuid +import os +import re +import subprocess +import sys +import threading import time -import copy -from threading import Lock +import uuid +from datetime import datetime from enum import Enum from .CommonLogger import CommonLogger +from .health import Health +from .process_info import ProcessInfo REQUEST_X_ECOMP_REQUESTID = "X-ECOMP-RequestID" REQUEST_REMOTE_ADDR = "Remote-Addr" @@ -44,21 +49,31 @@ HOSTNAME = "HOSTNAME" AUDIT_REQUESTID = 'requestID' AUDIT_IPADDRESS = 'IPAddress' AUDIT_SERVER = 'server' +AUDIT_TARGET_ENTITY = 'targetEntity' +AUDIT_METRICS = 'metrics' +AUDIT_TOTAL_STATS = 'audit_total_stats' +METRICS_TOTAL_STATS = 'metrics_total_stats' HEADER_CLIENTAUTH = "clientauth" HEADER_AUTHORIZATION = "authorization" +ERROR_CODE = "errorCode" +ERROR_DESCRIPTION = "errorDescription" + + class AuditHttpCode(Enum): """audit http codes""" HTTP_OK = 200 - DATA_NOT_FOUND_ERROR = 400 PERMISSION_UNAUTHORIZED_ERROR = 401 PERMISSION_FORBIDDEN_ERROR = 403 + RESPONSE_ERROR = 400 + DATA_NOT_FOUND_ERROR = 404 SERVER_INTERNAL_ERROR = 500 SERVICE_UNAVAILABLE_ERROR = 503 DATA_ERROR = 1030 SCHEMA_ERROR = 1040 + class AuditResponseCode(Enum): """audit response codes""" SUCCESS = 0 @@ -72,23 +87,25 @@ class AuditResponseCode(Enum): @staticmethod def get_response_code(http_status_code): """calculates the response_code from max_http_status_code""" + response_code = AuditResponseCode.UNKNOWN_ERROR if http_status_code <= AuditHttpCode.HTTP_OK.value: - return AuditResponseCode.SUCCESS - - if http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value, \ - AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]: - return AuditResponseCode.PERMISSION_ERROR - if http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value: - return AuditResponseCode.AVAILABILITY_ERROR - if http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value: - return AuditResponseCode.BUSINESS_PROCESS_ERROR - if http_status_code in [AuditHttpCode.DATA_ERROR.value, \ - AuditHttpCode.DATA_NOT_FOUND_ERROR.value]: - return AuditResponseCode.DATA_ERROR - if http_status_code == AuditHttpCode.SCHEMA_ERROR.value: - return AuditResponseCode.SCHEMA_ERROR - - return AuditResponseCode.UNKNOWN_ERROR + response_code = AuditResponseCode.SUCCESS + + elif http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value, + AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]: + response_code = AuditResponseCode.PERMISSION_ERROR + elif http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value: + response_code = AuditResponseCode.AVAILABILITY_ERROR + elif http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value: + response_code = AuditResponseCode.BUSINESS_PROCESS_ERROR + elif http_status_code in [AuditHttpCode.DATA_ERROR.value, + AuditHttpCode.RESPONSE_ERROR.value, + AuditHttpCode.DATA_NOT_FOUND_ERROR.value]: + response_code = AuditResponseCode.DATA_ERROR + elif http_status_code == AuditHttpCode.SCHEMA_ERROR.value: + response_code = AuditResponseCode.SCHEMA_ERROR + + return response_code @staticmethod def get_human_text(response_code): @@ -97,7 +114,8 @@ class AuditResponseCode(Enum): return "unknown" return response_code.name.lower().replace("_", " ") -class Audit(object): + +class _Audit(object): """put the audit object on stack per each initiating request in the system :request_id: is the X-ECOMP-RequestID for tracing @@ -109,75 +127,90 @@ class Audit(object): :kwargs: - put any request related params into kwargs """ _service_name = "" - _service_instance_UUID = str(uuid.uuid4()) + _service_version = "" + _service_instance_uuid = str(uuid.uuid4()) + _started = datetime.utcnow() + _key_format = re.compile(r"\W") _logger_debug = None _logger_error = None _logger_metrics = None _logger_audit = None + _health = Health() + _py_ver = sys.version.replace("\n", "") + _packages = [] @staticmethod def init(service_name, config_file_path): """init static invariants and loggers""" - Audit._service_name = service_name - Audit._logger_debug = CommonLogger(config_file_path, "debug", \ - instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name) - Audit._logger_error = CommonLogger(config_file_path, "error", \ - instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name) - Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \ - instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name) - Audit._logger_audit = CommonLogger(config_file_path, "audit", \ - instanceUUID=Audit._service_instance_UUID, serviceName=Audit._service_name) - - def __init__(self, request_id=None, req_message=None, aud_parent=None, **kwargs): + _Audit._service_name = service_name + _Audit._logger_debug = CommonLogger(config_file_path, "debug", \ + instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + _Audit._logger_error = CommonLogger(config_file_path, "error", \ + instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + _Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \ + instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + _Audit._logger_audit = CommonLogger(config_file_path, "audit", \ + instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + ProcessInfo.init() + try: + _Audit._service_version = subprocess.check_output( + ["python", "setup.py", "--version"], universal_newlines=True).strip() + except subprocess.CalledProcessError: + pass + try: + _Audit._packages = list( + filter(None, subprocess.check_output(["pip", "freeze"], + universal_newlines=True).splitlines())) + except subprocess.CalledProcessError: + pass + + + def health(self, full=False): + """returns json for health check""" + utcnow = datetime.utcnow() + health = { + "server" : { + "service_name" : _Audit._service_name, + "service_version" : _Audit._service_version, + "service_instance_uuid" : _Audit._service_instance_uuid + }, + "runtime" : { + "started" : str(_Audit._started), + "utcnow" : str(utcnow), + "uptime" : str(utcnow - _Audit._started), + "active_threads" : ProcessInfo.active_threads(), + "gc" : ProcessInfo.gc_info(full), + "virtual_memory" : ProcessInfo.virtual_memory(), + "process_memory" : ProcessInfo.process_memory() + }, + "stats" : _Audit._health.dump(), + "soft" : {"python" : _Audit._py_ver, "packages" : _Audit._packages} + } + self.info("{} health: {}".format(_Audit._service_name, json.dumps(health))) + return health + + def process_info(self): + """get the debug info on all the threads and memory""" + process_info = ProcessInfo.get_all() + self.info("{} process_info: {}".format(_Audit._service_name, json.dumps(process_info))) + return process_info + + def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs): """create audit object per each request in the system + :job_name: is the name of the audit job for health stats :request_id: is the X-ECOMP-RequestID for tracing :req_message: is the request message string for logging - :aud_parent: is the parent Audit - used for sub-query metrics to other systems :kwargs: - put any request related params into kwargs """ + self.job_name = _Audit._key_format.sub('_', job_name or req_message or _Audit._service_name) self.request_id = request_id self.req_message = req_message or "" - self.aud_parent = aud_parent self.kwargs = kwargs or {} - self.retry_get_config = False self.max_http_status_code = 0 - self._lock = Lock() - - if self.aud_parent: - if not self.request_id: - self.request_id = self.aud_parent.request_id - if not self.req_message: - self.req_message = self.aud_parent.req_message - self.kwargs = self.aud_parent.merge_all_kwargs(**self.kwargs) - else: - headers = self.kwargs.get("headers", {}) - if headers: - if not self.request_id: - self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID) - if AUDIT_IPADDRESS not in self.kwargs: - self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR) - if AUDIT_SERVER not in self.kwargs: - self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST) - - if AUDIT_SERVER not in self.kwargs: - self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME) - - created_req = "" - if not self.request_id: - created_req = " with new" - self.request_id = str(uuid.uuid4()) - - self.kwargs[AUDIT_REQUESTID] = self.request_id - - self._started = time.time() - self._start_event = Audit._logger_audit.getStartRecordEvent() - self.metrics_start() + self._lock = threading.Lock() - if not self.aud_parent: - self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\ - .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs))) def merge_all_kwargs(self, **kwargs): """returns the merge of copy of self.kwargs with the param kwargs""" @@ -188,10 +221,15 @@ class Audit(object): def set_http_status_code(self, http_status_code): """accumulate the highest(worst) http status code""" - self._lock.acquire() - if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value: - self.max_http_status_code = max(http_status_code, self.max_http_status_code) - self._lock.release() + with self._lock: + if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value: + self.max_http_status_code = max(http_status_code, self.max_http_status_code) + + def get_max_http_status_code(self): + """returns the highest(worst) http status code""" + with self._lock: + max_http_status_code = self.max_http_status_code + return max_http_status_code @staticmethod def get_status_code(success): @@ -200,6 +238,71 @@ class Audit(object): return 'COMPLETE' return 'ERROR' + def is_serious_error(self, status_code): + """returns whether the response_code is success and a human text for response code""" + return (AuditResponseCode.PERMISSION_ERROR.value + == AuditResponseCode.get_response_code(status_code).value + or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value) + + def _get_response_status(self): + """calculates the response status fields from max_http_status_code""" + max_http_status_code = self.get_max_http_status_code() + response_code = AuditResponseCode.get_response_code(max_http_status_code) + success = (response_code.value == AuditResponseCode.SUCCESS.value) + response_description = AuditResponseCode.get_human_text(response_code) + return success, max_http_status_code, response_code, response_description + + def is_success(self): + """returns whether the response_code is success and a human text for response code""" + success, _, _, _ = self._get_response_status() + return success + + def debug(self, log_line, **kwargs): + """debug - the debug=lowest level of logging""" + _Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs)) + + def info(self, log_line, **kwargs): + """debug - the info level of logging""" + _Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs)) + + def info_requested(self, result=None, **kwargs): + """info "requested ..." - the info level of logging""" + self.info("requested {0} {1}".format(self.req_message, result or ""), \ + **self.merge_all_kwargs(**kwargs)) + + def warn(self, log_line, error_code=None, **kwargs): + """debug+error - the warn level of logging""" + all_kwargs = self.merge_all_kwargs(**kwargs) + + if error_code and isinstance(error_code, AuditResponseCode): + all_kwargs[ERROR_CODE] = error_code.value + all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code) + + _Audit._logger_debug.warn(log_line, **all_kwargs) + _Audit._logger_error.warn(log_line, **all_kwargs) + + def error(self, log_line, error_code=None, **kwargs): + """debug+error - the error level of logging""" + all_kwargs = self.merge_all_kwargs(**kwargs) + + if error_code and isinstance(error_code, AuditResponseCode): + all_kwargs[ERROR_CODE] = error_code.value + all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code) + + _Audit._logger_debug.error(log_line, **all_kwargs) + _Audit._logger_error.error(log_line, **all_kwargs) + + def fatal(self, log_line, error_code=None, **kwargs): + """debug+error - the fatal level of logging""" + all_kwargs = self.merge_all_kwargs(**kwargs) + + if error_code and isinstance(error_code, AuditResponseCode): + all_kwargs[ERROR_CODE] = error_code.value + all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code) + + _Audit._logger_debug.fatal(log_line, **all_kwargs) + _Audit._logger_error.fatal(log_line, **all_kwargs) + @staticmethod def hide_secrets(obj): """hides the known secret field values of the dictionary""" @@ -210,7 +313,7 @@ class Audit(object): if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]: obj[key] = "*" elif isinstance(obj[key], dict): - obj[key] = Audit.hide_secrets(obj[key]) + obj[key] = _Audit.hide_secrets(obj[key]) return obj @@ -220,101 +323,151 @@ class Audit(object): if not isinstance(obj, dict): return json.dumps(obj, **kwargs) - return json.dumps(Audit.hide_secrets(copy.deepcopy(obj)), **kwargs) + return json.dumps(_Audit.hide_secrets(copy.deepcopy(obj)), **kwargs) - def get_response_code(self): - """calculates the response_code from max_http_status_code""" - self._lock.acquire() - max_http_status_code = self.max_http_status_code - self._lock.release() - return AuditResponseCode.get_response_code(max_http_status_code) + @staticmethod + def get_elapsed_time(started): + """returns the elapsed time since started in milliseconds""" + return int(round(1000 * (time.time() - (started or 0)))) - def debug(self, log_line, **kwargs): - """debug - the debug=lowest level of logging""" - Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs)) - def info(self, log_line, **kwargs): - """debug - the info level of logging""" - Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs)) +class Audit(_Audit): + """Audit class to track the high level operations""" - def info_requested(self, result=None, **kwargs): - """info "requested ..." - the info level of logging""" - self.info("requested {0} {1}".format(self.req_message, result or ""), \ - **self.merge_all_kwargs(**kwargs)) + def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs): + """create audit object per each request in the system - def warn(self, log_line, **kwargs): - """debug+error - the warn level of logging""" - all_kwargs = self.merge_all_kwargs(**kwargs) - Audit._logger_debug.warn(log_line, **all_kwargs) - Audit._logger_error.warn(log_line, **all_kwargs) + :job_name: is the name of the audit job for health stats + :request_id: is the X-ECOMP-RequestID for tracing + :req_message: is the request message string for logging + :aud_parent: is the parent Audit - used for sub-query metrics to other systems + :kwargs: - put any request related params into kwargs + """ + super().__init__(job_name=job_name, + request_id=request_id, + req_message=req_message, + **kwargs) - def error(self, log_line, **kwargs): - """debug+error - the error level of logging""" - all_kwargs = self.merge_all_kwargs(**kwargs) - Audit._logger_debug.error(log_line, **all_kwargs) - Audit._logger_error.error(log_line, **all_kwargs) + headers = self.kwargs.get("headers", {}) + if headers: + if not self.request_id: + self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID) + if AUDIT_IPADDRESS not in self.kwargs: + self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR) + if AUDIT_SERVER not in self.kwargs: + self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST) - def fatal(self, log_line, **kwargs): - """debug+error - the fatal level of logging""" + created_req = "" + if not self.request_id: + created_req = " with new" + self.request_id = str(uuid.uuid4()) + + if AUDIT_SERVER not in self.kwargs: + self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME) + + self.kwargs[AUDIT_REQUESTID] = self.request_id + + _Audit._health.start(self.job_name, self.request_id) + _Audit._health.start(AUDIT_TOTAL_STATS, self.request_id) + + self._started = time.time() + self._start_event = Audit._logger_audit.getStartRecordEvent() + + self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\ + .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs))) + + + def audit_done(self, result=None, **kwargs): + """debug+audit - the audit=top level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - Audit._logger_debug.fatal(log_line, **all_kwargs) - Audit._logger_error.fatal(log_line, **all_kwargs) + success, max_http_status_code, response_code, response_description = \ + self._get_response_status() + log_line = "{0} {1}".format(self.req_message, result or "").strip() + audit_func = None + timer = _Audit.get_elapsed_time(self._started) + if success: + log_line = "done: {0}".format(log_line) + self.info(log_line, **all_kwargs) + audit_func = _Audit._logger_audit.info + _Audit._health.success(self.job_name, timer, self.request_id) + _Audit._health.success(AUDIT_TOTAL_STATS, timer, self.request_id) + else: + log_line = "failed: {0}".format(log_line) + self.error(log_line, errorCode=response_code.value, + errorDescription=response_description, **all_kwargs) + audit_func = _Audit._logger_audit.error + _Audit._health.error(self.job_name, timer, self.request_id) + _Audit._health.error(AUDIT_TOTAL_STATS, timer, self.request_id) + + audit_func(log_line, begTime=self._start_event, timer=timer, + statusCode=_Audit.get_status_code(success), + responseCode=response_code.value, + responseDescription=response_description, + **all_kwargs) + + return (success, max_http_status_code, response_description) + + +class Metrics(_Audit): + """Metrics class to track the calls to outside systems""" + + def __init__(self, aud_parent, **kwargs): + """create audit object per each request in the system + + :aud_parent: is the parent Audit - used for sub-query metrics to other systems + :kwargs: - put any request related params into kwargs + """ + super().__init__(job_name=aud_parent.job_name, + request_id=aud_parent.request_id, + req_message=aud_parent.req_message, + **aud_parent.merge_all_kwargs(**kwargs)) + self.aud_parent = aud_parent + self._metrics_name = _Audit._key_format.sub( + '_', AUDIT_METRICS + "_" + self.kwargs.get(AUDIT_TARGET_ENTITY, self.job_name)) + + self._metrics_started = None + self._metrics_start_event = None - @staticmethod - def get_elapsed_time(started): - """returns the elapsed time since started in milliseconds""" - return int(round(1000 * (time.time() - started))) def metrics_start(self, log_line=None, **kwargs): """reset metrics timing""" + self.merge_all_kwargs(**kwargs) self._metrics_started = time.time() - self._metrics_start_event = Audit._logger_metrics.getStartRecordEvent() + self._metrics_start_event = _Audit._logger_metrics.getStartRecordEvent() if log_line: self.info(log_line, **self.merge_all_kwargs(**kwargs)) + _Audit._health.start(self._metrics_name, self.request_id) + _Audit._health.start(METRICS_TOTAL_STATS, self.request_id) + def metrics(self, log_line, **kwargs): """debug+metrics - the metrics=sub-audit level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - response_code = self.get_response_code() - success = (response_code.value == AuditResponseCode.SUCCESS.value) + success, max_http_status_code, response_code, response_description = \ + self._get_response_status() metrics_func = None + timer = _Audit.get_elapsed_time(self._metrics_started) if success: log_line = "done: {0}".format(log_line) self.info(log_line, **all_kwargs) - metrics_func = Audit._logger_metrics.info + metrics_func = _Audit._logger_metrics.info + _Audit._health.success(self._metrics_name, timer, self.request_id) + _Audit._health.success(METRICS_TOTAL_STATS, timer, self.request_id) else: log_line = "failed: {0}".format(log_line) - self.error(log_line, errorCode=response_code.value, \ - errorDescription=AuditResponseCode.get_human_text(response_code), **all_kwargs) - metrics_func = Audit._logger_metrics.error - - metrics_func(log_line, begTime=self._metrics_start_event, \ - timer=Audit.get_elapsed_time(self._metrics_started), \ - statusCode=Audit.get_status_code(success), responseCode=response_code.value, \ - responseDescription=AuditResponseCode.get_human_text(response_code), \ + self.error(log_line, errorCode=response_code.value, + errorDescription=response_description, **all_kwargs) + metrics_func = _Audit._logger_metrics.error + _Audit._health.error(self._metrics_name, timer, self.request_id) + _Audit._health.error(METRICS_TOTAL_STATS, timer, self.request_id) + + metrics_func( + log_line, + begTime=(self._metrics_start_event or _Audit._logger_metrics.getStartRecordEvent()), + timer=timer, + statusCode=_Audit.get_status_code(success), + responseCode=response_code.value, + responseDescription=response_description, **all_kwargs) - self.metrics_start() - - def audit_done(self, result=None, **kwargs): - """debug+audit - the audit=top level of logging""" - all_kwargs = self.merge_all_kwargs(**kwargs) - response_code = self.get_response_code() - success = (response_code.value == AuditResponseCode.SUCCESS.value) - log_line = "{0} {1}".format(self.req_message, result or "").strip() - audit_func = None - if success: - log_line = "done: {0}".format(log_line) - self.info(log_line, **all_kwargs) - audit_func = Audit._logger_audit.info - else: - log_line = "failed: {0}".format(log_line) - self.error(log_line, errorCode=response_code.value, \ - errorDescription=AuditResponseCode.get_human_text(response_code), **all_kwargs) - audit_func = Audit._logger_audit.error - - audit_func(log_line, begTime=self._start_event, \ - timer=Audit.get_elapsed_time(self._started), \ - statusCode=Audit.get_status_code(success), responseCode=response_code.value, \ - responseDescription=AuditResponseCode.get_human_text(response_code), \ - **all_kwargs) + return (success, max_http_status_code, response_description) |