author     Alex Shatov <alexs@att.com>  2018-05-10 09:23:16 -0400
committer  Alex Shatov <alexs@att.com>  2018-05-10 09:23:16 -0400
commit     f2d7bef13705812c1bf147c2fb65162fbf385c6b (patch)
tree       e6efb8e25287576a48952942aacdb3cf84a825ff /policyhandler/onap
parent     50bed534083c96cbf1f8fa4e220cb2b00dff9621 (diff)
2.4.3 policy-handler - try-catch top Exceptions
- added try-except for top level Exception into all threads of policy-handler
  to avoid losing the thread and to track the unexpected crashes
- rediscover the deployment-handler if not found before and after each catchup
- refactored audit - separated metrics from audit
- added more stats and runtime info to healthcheck
  = gc counts and garbage info if any detected
  = memory usage - to detect the potential memory leaks
  = request_id to all stats
  = stats of active requests
- avoid reallocating the whole Queue of policy-updates after catchup
  = clear of the internal queue under proper lock
  (a sketch of the thread and queue pattern is shown below)

Change-Id: I3fabcaac70419a68bd070ff7d591a75942f37663
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-483
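The first and last bullets describe a defensive pattern applied to the worker threads rather than code visible in this diff. A minimal sketch of that pattern follows; the PolicyUpdater name, the _run loop, and the logging call are illustrative assumptions, not taken from this change:

    import logging
    import threading
    from queue import Queue  # Python 3 stdlib naming; the Python 2 code base would import from Queue


    class PolicyUpdater(threading.Thread):
        """illustrative worker thread: top-level try-except plus lock-guarded queue clearing"""

        def __init__(self):
            super(PolicyUpdater, self).__init__(name="policy_updater")
            self._queue = Queue()

        def run(self):
            try:
                self._run()  # the real work loop
            except Exception:
                # top-level catch: log the crash instead of silently losing the thread
                logging.exception("unexpected crash in thread %s", self.name)

        def clear_queue(self):
            """drop queued policy-updates after a catchup without reallocating the Queue"""
            with self._queue.mutex:  # the Queue's own internal lock
                self._queue.queue.clear()

        def _run(self):
            while True:
                policy_update = self._queue.get()
                # ... process the policy_update ...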
Diffstat (limited to 'policyhandler/onap')
-rw-r--r--  policyhandler/onap/audit.py    443
-rw-r--r--  policyhandler/onap/health.py    93

2 files changed, 358 insertions, 178 deletions
diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py
index 08dcd37..48988fe 100644
--- a/policyhandler/onap/audit.py
+++ b/policyhandler/onap/audit.py
@@ -26,16 +26,19 @@
"""
import copy
+import gc
import json
import os
import re
import subprocess
import sys
+import threading
import time
import uuid
from datetime import datetime
from enum import Enum
-from threading import Lock
+
+import psutil
from .CommonLogger import CommonLogger
from .health import Health
@@ -51,10 +54,15 @@ AUDIT_SERVER = 'server'
AUDIT_TARGET_ENTITY = 'targetEntity'
AUDIT_METRICS = 'metrics'
AUDIT_TOTAL_STATS = 'audit_total_stats'
+METRICS_TOTAL_STATS = 'metrics_total_stats'
HEADER_CLIENTAUTH = "clientauth"
HEADER_AUTHORIZATION = "authorization"
+ERROR_CODE = "errorCode"
+ERROR_DESCRIPTION = "errorDescription"
+
+
class AuditHttpCode(Enum):
"""audit http codes"""
HTTP_OK = 200
@@ -67,6 +75,7 @@ class AuditHttpCode(Enum):
DATA_ERROR = 1030
SCHEMA_ERROR = 1040
+
class AuditResponseCode(Enum):
"""audit response codes"""
SUCCESS = 0
@@ -107,7 +116,60 @@ class AuditResponseCode(Enum):
return "unknown"
return response_code.name.lower().replace("_", " ")
-class Audit(object):
+
+class ProcessInfo(object):
+ """static class to calculate process info"""
+ _KILO_SYMBOLS = ('KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB')
+ _KILO_POWERS = {}
+
+ @staticmethod
+ def init():
+ """init static constants"""
+ for i, kilo_symbol in enumerate(ProcessInfo._KILO_SYMBOLS):
+ ProcessInfo._KILO_POWERS[kilo_symbol] = 1 << (i + 1) * 10
+ ProcessInfo._KILO_SYMBOLS = list(reversed(ProcessInfo._KILO_SYMBOLS))
+
+ @staticmethod
+ def bytes_to_human(byte_count):
+ """converts byte count to human value in kilo-powers"""
+ for kilo_symbol in ProcessInfo._KILO_SYMBOLS:
+ kilo_power = ProcessInfo._KILO_POWERS[kilo_symbol]
+ if byte_count >= kilo_power:
+ value = float(byte_count) / kilo_power
+ return '%.1f%s' % (value, kilo_symbol)
+ return "%sB" % byte_count
+
+ @staticmethod
+ def mem_info():
+ """calculates the memory usage of the current process"""
+ process = psutil.Process()
+ with process.oneshot():
+ mem = process.memory_full_info()
+ return {
+ "uss" : ProcessInfo.bytes_to_human(mem.uss),
+ "rss" : ProcessInfo.bytes_to_human(mem.rss),
+ "swap" : ProcessInfo.bytes_to_human(getattr(mem, "swap", 0)),
+ "pss" : ProcessInfo.bytes_to_human(getattr(mem, "pss", 0))
+ }
+
+
+ @staticmethod
+ def gc_info(full=False):
+ """gets info from garbage collector"""
+ gc_info = {
+ "gc_count" : str(gc.get_count()),
+ "gc_threshold" : str(gc.get_threshold())
+ }
+ try:
+ if gc.garbage:
+ gc_info["gc_garbage"] = ([repr(stuck) for stuck in gc.garbage]
+ if full else len(gc.garbage))
+ except Exception:
+ pass
+ return gc_info
+
+
+class _Audit(object):
"""put the audit object on stack per each initiating request in the system
:request_id: is the X-ECOMP-RequestID for tracing
@@ -121,7 +183,7 @@ class Audit(object):
_service_name = ""
_service_version = ""
_service_instance_uuid = str(uuid.uuid4())
- _started = datetime.now()
+ _started = datetime.utcnow()
_key_format = re.compile(r"\W")
_logger_debug = None
_logger_error = None
@@ -129,95 +191,69 @@ class Audit(object):
_logger_audit = None
_health = Health()
_py_ver = sys.version.replace("\n", "")
- try:
- _packages = filter(None, subprocess.check_output(["pip", "freeze"]).splitlines())
- except subprocess.CalledProcessError:
- _packages = []
+ _packages = []
@staticmethod
def init(service_name, service_version, config_file_path):
"""init static invariants and loggers"""
- Audit._service_name = service_name
- Audit._service_version = service_version
- Audit._logger_debug = CommonLogger(config_file_path, "debug", \
- instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
- Audit._logger_error = CommonLogger(config_file_path, "error", \
- instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
- Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \
- instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
- Audit._logger_audit = CommonLogger(config_file_path, "audit", \
- instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
-
- @staticmethod
- def health():
+ _Audit._service_name = service_name
+ _Audit._service_version = service_version
+ _Audit._logger_debug = CommonLogger(config_file_path, "debug", \
+ instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+ _Audit._logger_error = CommonLogger(config_file_path, "error", \
+ instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+ _Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \
+ instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+ _Audit._logger_audit = CommonLogger(config_file_path, "audit", \
+ instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+ ProcessInfo.init()
+ try:
+ _Audit._packages = filter(None, subprocess.check_output(["pip", "freeze"]).splitlines())
+ except subprocess.CalledProcessError:
+ pass
+
+
+ def health(self, full=False):
"""returns json for health check"""
- now = datetime.now()
- return {
- "service_name" : Audit._service_name,
- "service_version" : Audit._service_version,
- "service_instance_uuid" : Audit._service_instance_uuid,
- "python" : Audit._py_ver,
- "started" : str(Audit._started),
- "now" : str(now),
- "uptime" : str(now - Audit._started),
- "stats" : Audit._health.dump(),
- "packages" : Audit._packages
+ utcnow = datetime.utcnow()
+ health = {
+ "server" : {
+ "service_name" : _Audit._service_name,
+ "service_version" : _Audit._service_version,
+ "service_instance_uuid" : _Audit._service_instance_uuid
+ },
+ "runtime" : {
+ "started" : str(_Audit._started),
+ "utcnow" : str(utcnow),
+ "uptime" : str(utcnow - _Audit._started),
+ "active_threads" : sorted([thr.name for thr in threading.enumerate()]),
+ "gc" : ProcessInfo.gc_info(full),
+ "mem_info" : ProcessInfo.mem_info()
+ },
+ "stats" : _Audit._health.dump(),
+ "soft" : {"python" : _Audit._py_ver, "packages" : _Audit._packages}
}
+ health_txt = "{} health: {}".format(_Audit._service_name, json.dumps(health))
+ self.info(health_txt)
+ return health
+
- def __init__(self, job_name=None, request_id=None, req_message=None, aud_parent=None, **kwargs):
+ def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs):
"""create audit object per each request in the system
:job_name: is the name of the audit job for health stats
:request_id: is the X-ECOMP-RequestID for tracing
:req_message: is the request message string for logging
- :aud_parent: is the parent Audit - used for sub-query metrics to other systems
:kwargs: - put any request related params into kwargs
"""
- self.job_name = Audit._key_format.sub('_', job_name or req_message or Audit._service_name)
+ self.job_name = _Audit._key_format.sub('_', job_name or req_message or _Audit._service_name)
self.request_id = request_id
self.req_message = req_message or ""
- self.aud_parent = aud_parent
self.kwargs = kwargs or {}
- self.retry_get_config = False
self.max_http_status_code = 0
- self._lock = Lock()
-
- if self.aud_parent:
- self.job_name = Audit._key_format.sub(
- '_', job_name or self.aud_parent.job_name or Audit._service_name)
- if not self.request_id:
- self.request_id = self.aud_parent.request_id
- if not self.req_message:
- self.req_message = self.aud_parent.req_message
- self.kwargs = self.aud_parent.merge_all_kwargs(**self.kwargs)
- else:
- headers = self.kwargs.get("headers", {})
- if headers:
- if not self.request_id:
- self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID)
- if AUDIT_IPADDRESS not in self.kwargs:
- self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR)
- if AUDIT_SERVER not in self.kwargs:
- self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST)
-
- if AUDIT_SERVER not in self.kwargs:
- self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME)
-
- created_req = ""
- if not self.request_id:
- created_req = " with new"
- self.request_id = str(uuid.uuid4())
-
- self.kwargs[AUDIT_REQUESTID] = self.request_id
+ self._lock = threading.Lock()
- self._started = time.time()
- self._start_event = Audit._logger_audit.getStartRecordEvent()
- self.metrics_start()
-
- if not self.aud_parent:
- self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\
- .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs)))
def merge_all_kwargs(self, **kwargs):
"""returns the merge of copy of self.kwargs with the param kwargs"""
@@ -228,16 +264,14 @@ class Audit(object):
def set_http_status_code(self, http_status_code):
"""accumulate the highest(worst) http status code"""
- self._lock.acquire()
- if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value:
- self.max_http_status_code = max(http_status_code, self.max_http_status_code)
- self._lock.release()
+ with self._lock:
+ if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value:
+ self.max_http_status_code = max(http_status_code, self.max_http_status_code)
def get_max_http_status_code(self):
"""returns the highest(worst) http status code"""
- self._lock.acquire()
- max_http_status_code = self.max_http_status_code
- self._lock.release()
+ with self._lock:
+ max_http_status_code = self.max_http_status_code
return max_http_status_code
@staticmethod
@@ -247,33 +281,11 @@ class Audit(object):
return 'COMPLETE'
return 'ERROR'
- @staticmethod
- def hide_secrets(obj):
- """hides the known secret field values of the dictionary"""
- if not isinstance(obj, dict):
- return obj
-
- for key in obj:
- if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]:
- obj[key] = "*"
- elif isinstance(obj[key], dict):
- obj[key] = Audit.hide_secrets(obj[key])
-
- return obj
-
- @staticmethod
- def log_json_dumps(obj, **kwargs):
- """hide the known secret field values of the dictionary and return json.dumps"""
- if not isinstance(obj, dict):
- return json.dumps(obj, **kwargs)
-
- return json.dumps(Audit.hide_secrets(copy.deepcopy(obj)), **kwargs)
-
def is_serious_error(self, status_code):
"""returns whether the response_code is success and a human text for response code"""
- return AuditResponseCode.PERMISSION_ERROR.value \
- == AuditResponseCode.get_response_code(status_code).value \
- or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value
+ return (AuditResponseCode.PERMISSION_ERROR.value
+ == AuditResponseCode.get_response_code(status_code).value
+ or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value)
def _get_response_status(self):
"""calculates the response status fields from max_http_status_code"""
@@ -290,104 +302,215 @@ class Audit(object):
def debug(self, log_line, **kwargs):
"""debug - the debug=lowest level of logging"""
- Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs))
+ _Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs))
def info(self, log_line, **kwargs):
"""debug - the info level of logging"""
- Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs))
+ _Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs))
def info_requested(self, result=None, **kwargs):
"""info "requested ..." - the info level of logging"""
self.info("requested {0} {1}".format(self.req_message, result or ""), \
**self.merge_all_kwargs(**kwargs))
- def warn(self, log_line, **kwargs):
+ def warn(self, log_line, error_code=None, **kwargs):
"""debug+error - the warn level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
- Audit._logger_debug.warn(log_line, **all_kwargs)
- Audit._logger_error.warn(log_line, **all_kwargs)
- def error(self, log_line, **kwargs):
+ if error_code and isinstance(error_code, AuditResponseCode):
+ all_kwargs[ERROR_CODE] = error_code.value
+ all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code)
+
+ _Audit._logger_debug.warn(log_line, **all_kwargs)
+ _Audit._logger_error.warn(log_line, **all_kwargs)
+
+ def error(self, log_line, error_code=None, **kwargs):
"""debug+error - the error level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
- Audit._logger_debug.error(log_line, **all_kwargs)
- Audit._logger_error.error(log_line, **all_kwargs)
- def fatal(self, log_line, **kwargs):
+ if error_code and isinstance(error_code, AuditResponseCode):
+ all_kwargs[ERROR_CODE] = error_code.value
+ all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code)
+
+ _Audit._logger_debug.error(log_line, **all_kwargs)
+ _Audit._logger_error.error(log_line, **all_kwargs)
+
+ def fatal(self, log_line, error_code=None, **kwargs):
"""debug+error - the fatal level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
- Audit._logger_debug.fatal(log_line, **all_kwargs)
- Audit._logger_error.fatal(log_line, **all_kwargs)
+
+ if error_code and isinstance(error_code, AuditResponseCode):
+ all_kwargs[ERROR_CODE] = error_code.value
+ all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code)
+
+ _Audit._logger_debug.fatal(log_line, **all_kwargs)
+ _Audit._logger_error.fatal(log_line, **all_kwargs)
+
+ @staticmethod
+ def hide_secrets(obj):
+ """hides the known secret field values of the dictionary"""
+ if not isinstance(obj, dict):
+ return obj
+
+ for key in obj:
+ if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]:
+ obj[key] = "*"
+ elif isinstance(obj[key], dict):
+ obj[key] = _Audit.hide_secrets(obj[key])
+
+ return obj
+
+ @staticmethod
+ def log_json_dumps(obj, **kwargs):
+ """hide the known secret field values of the dictionary and return json.dumps"""
+ if not isinstance(obj, dict):
+ return json.dumps(obj, **kwargs)
+
+ return json.dumps(_Audit.hide_secrets(copy.deepcopy(obj)), **kwargs)
@staticmethod
def get_elapsed_time(started):
"""returns the elapsed time since started in milliseconds"""
- return int(round(1000 * (time.time() - started)))
+ return int(round(1000 * (time.time() - (started or 0))))
- def metrics_start(self, log_line=None, **kwargs):
- """reset metrics timing"""
- self._metrics_started = time.time()
- self._metrics_start_event = Audit._logger_metrics.getStartRecordEvent()
- if log_line:
- self.info(log_line, **self.merge_all_kwargs(**kwargs))
- def metrics(self, log_line, **kwargs):
- """debug+metrics - the metrics=sub-audit level of logging"""
+class Audit(_Audit):
+ """Audit class to track the high level operations"""
+
+ def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs):
+ """create audit object per each request in the system
+
+ :job_name: is the name of the audit job for health stats
+ :request_id: is the X-ECOMP-RequestID for tracing
+ :req_message: is the request message string for logging
+ :aud_parent: is the parent Audit - used for sub-query metrics to other systems
+ :kwargs: - put any request related params into kwargs
+ """
+ super(Audit, self).__init__(job_name=job_name,
+ request_id=request_id,
+ req_message=req_message,
+ **kwargs)
+
+ headers = self.kwargs.get("headers", {})
+ if headers:
+ if not self.request_id:
+ self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID)
+ if AUDIT_IPADDRESS not in self.kwargs:
+ self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR)
+ if AUDIT_SERVER not in self.kwargs:
+ self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST)
+
+ created_req = ""
+ if not self.request_id:
+ created_req = " with new"
+ self.request_id = str(uuid.uuid4())
+
+ if AUDIT_SERVER not in self.kwargs:
+ self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME)
+
+ self.kwargs[AUDIT_REQUESTID] = self.request_id
+
+ _Audit._health.start(self.job_name, self.request_id)
+ _Audit._health.start(AUDIT_TOTAL_STATS, self.request_id)
+
+ self._started = time.time()
+ self._start_event = Audit._logger_audit.getStartRecordEvent()
+
+ self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\
+ .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs)))
+
+
+ def audit_done(self, result=None, **kwargs):
+ """debug+audit - the audit=top level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
success, max_http_status_code, response_code, response_description = \
self._get_response_status()
- metrics_func = None
- timer = Audit.get_elapsed_time(self._metrics_started)
- metrics_job = Audit._key_format.sub(
- '_', all_kwargs.get(AUDIT_TARGET_ENTITY, AUDIT_METRICS + "_" + self.job_name))
+ log_line = "{0} {1}".format(self.req_message, result or "").strip()
+ audit_func = None
+ timer = _Audit.get_elapsed_time(self._started)
if success:
log_line = "done: {0}".format(log_line)
self.info(log_line, **all_kwargs)
- metrics_func = Audit._logger_metrics.info
- Audit._health.success(metrics_job, timer)
+ audit_func = _Audit._logger_audit.info
+ _Audit._health.success(self.job_name, timer, self.request_id)
+ _Audit._health.success(AUDIT_TOTAL_STATS, timer, self.request_id)
else:
log_line = "failed: {0}".format(log_line)
- self.error(log_line, errorCode=response_code.value, \
- errorDescription=response_description, **all_kwargs)
- metrics_func = Audit._logger_metrics.error
- Audit._health.error(metrics_job, timer)
-
- metrics_func(log_line, begTime=self._metrics_start_event, timer=timer,
- statusCode=Audit.get_status_code(success), responseCode=response_code.value,
- responseDescription=response_description,
- **all_kwargs
- )
-
- self.metrics_start()
+ self.error(log_line, errorCode=response_code.value,
+ errorDescription=response_description, **all_kwargs)
+ audit_func = _Audit._logger_audit.error
+ _Audit._health.error(self.job_name, timer, self.request_id)
+ _Audit._health.error(AUDIT_TOTAL_STATS, timer, self.request_id)
+
+ audit_func(log_line, begTime=self._start_event, timer=timer,
+ statusCode=_Audit.get_status_code(success),
+ responseCode=response_code.value,
+ responseDescription=response_description,
+ **all_kwargs)
+
return (success, max_http_status_code, response_description)
- def audit_done(self, result=None, **kwargs):
- """debug+audit - the audit=top level of logging"""
+
+class Metrics(_Audit):
+ """Metrics class to track the calls to outside systems"""
+
+ def __init__(self, aud_parent, **kwargs):
+ """create audit object per each request in the system
+
+ :aud_parent: is the parent Audit - used for sub-query metrics to other systems
+ :kwargs: - put any request related params into kwargs
+ """
+ super(Metrics, self).__init__(job_name=aud_parent.job_name,
+ request_id=aud_parent.request_id,
+ req_message=aud_parent.req_message,
+ **aud_parent.merge_all_kwargs(**kwargs))
+ self.aud_parent = aud_parent
+ self._metrics_name = _Audit._key_format.sub(
+ '_', AUDIT_METRICS + "_" + self.kwargs.get(AUDIT_TARGET_ENTITY, self.job_name))
+
+ self._metrics_started = None
+ self._metrics_start_event = None
+
+
+ def metrics_start(self, log_line=None, **kwargs):
+ """reset metrics timing"""
+ self.merge_all_kwargs(**kwargs)
+ self._metrics_started = time.time()
+ self._metrics_start_event = _Audit._logger_metrics.getStartRecordEvent()
+ if log_line:
+ self.info(log_line, **self.merge_all_kwargs(**kwargs))
+ _Audit._health.start(self._metrics_name, self.request_id)
+ _Audit._health.start(METRICS_TOTAL_STATS, self.request_id)
+
+
+ def metrics(self, log_line, **kwargs):
+ """debug+metrics - the metrics=sub-audit level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
success, max_http_status_code, response_code, response_description = \
self._get_response_status()
- log_line = "{0} {1}".format(self.req_message, result or "").strip()
- audit_func = None
- timer = Audit.get_elapsed_time(self._started)
+ metrics_func = None
+ timer = _Audit.get_elapsed_time(self._metrics_started)
if success:
log_line = "done: {0}".format(log_line)
self.info(log_line, **all_kwargs)
- audit_func = Audit._logger_audit.info
- Audit._health.success(self.job_name, timer)
- Audit._health.success(AUDIT_TOTAL_STATS, timer)
+ metrics_func = _Audit._logger_metrics.info
+ _Audit._health.success(self._metrics_name, timer, self.request_id)
+ _Audit._health.success(METRICS_TOTAL_STATS, timer, self.request_id)
else:
log_line = "failed: {0}".format(log_line)
self.error(log_line, errorCode=response_code.value,
errorDescription=response_description, **all_kwargs)
- audit_func = Audit._logger_audit.error
- Audit._health.error(self.job_name, timer)
- Audit._health.error(AUDIT_TOTAL_STATS, timer)
-
- audit_func(log_line, begTime=self._start_event, timer=timer,
- statusCode=Audit.get_status_code(success),
- responseCode=response_code.value,
- responseDescription=response_description,
- **all_kwargs
- )
+ metrics_func = _Audit._logger_metrics.error
+ _Audit._health.error(self._metrics_name, timer, self.request_id)
+ _Audit._health.error(METRICS_TOTAL_STATS, timer, self.request_id)
+
+ metrics_func(
+ log_line,
+ begTime=(self._metrics_start_event or _Audit._logger_metrics.getStartRecordEvent()),
+ timer=timer,
+ statusCode=_Audit.get_status_code(success),
+ responseCode=response_code.value,
+ responseDescription=response_description,
+ **all_kwargs)
return (success, max_http_status_code, response_description)
diff --git a/policyhandler/onap/health.py b/policyhandler/onap/health.py
index 485f422..e6a6f69 100644
--- a/policyhandler/onap/health.py
+++ b/policyhandler/onap/health.py
@@ -28,46 +28,95 @@ class HealthStats(object):
"""keep track of stats for metrics calls"""
self._name = name or "stats_" + str(uuid.uuid4())
self._lock = Lock()
+
self._call_count = 0
self._error_count = 0
+ self._active_count = 0
+
self._longest_timer = 0
self._total_timer = 0
+
self._last_success = None
self._last_error = None
+ self._last_start = None
+ self._longest_end_ts = None
+
+ self._last_success_request_id = None
+ self._last_error_request_id = None
+ self._last_started_request_id = None
+ self._longest_request_id = None
+
def dump(self):
"""returns dict of stats"""
dump = None
with self._lock:
dump = {
- "call_count" : self._call_count,
- "error_count" : self._error_count,
- "last_success" : str(self._last_success),
- "last_error" : str(self._last_error),
- "longest_timer_millisecs" : self._longest_timer,
- "ave_timer_millisecs" : (float(self._total_timer)/self._call_count \
- if self._call_count else 0)
+ "total" : {
+ "call_count" : self._call_count,
+ "ave_timer_millisecs" : (float(self._total_timer)/self._call_count
+ if self._call_count else 0)
+ },
+ "success" : {
+ "success_count" : (self._call_count - self._error_count),
+ "last_success" : str(self._last_success),
+ "last_success_request_id" : self._last_success_request_id
+ },
+ "error" : {
+ "error_count" : self._error_count,
+ "last_error" : str(self._last_error),
+ "last_error_request_id" : self._last_error_request_id
+ },
+ "active" : {
+ "active_count" : self._active_count,
+ "last_start" : str(self._last_start),
+ "last_started_request_id" : self._last_started_request_id
+ },
+ "longest" : {
+ "longest_timer_millisecs" : self._longest_timer,
+ "longest_request_id" : self._longest_request_id,
+ "longest_end" : str(self._longest_end_ts)
+ }
}
return dump
- def success(self, timer):
+
+ def start(self, request_id=None):
+ """records the start of active execution"""
+ with self._lock:
+ self._active_count += 1
+ self._last_start = datetime.utcnow()
+ self._last_started_request_id = request_id
+
+
+ def success(self, timer, request_id=None):
"""records the successful execution"""
with self._lock:
+ self._active_count -= 1
self._call_count += 1
- self._last_success = datetime.now()
+ self._last_success = datetime.utcnow()
+ self._last_success_request_id = request_id
self._total_timer += timer
if not self._longest_timer or self._longest_timer < timer:
self._longest_timer = timer
+ self._longest_request_id = request_id
+ self._longest_end_ts = self._last_success
+
- def error(self, timer):
+ def error(self, timer, request_id=None):
"""records the errored execution"""
with self._lock:
+ self._active_count -= 1
self._call_count += 1
self._error_count += 1
- self._last_error = datetime.now()
+ self._last_error = datetime.utcnow()
+ self._last_error_request_id = request_id
self._total_timer += timer
if not self._longest_timer or self._longest_timer < timer:
self._longest_timer = timer
+ self._longest_request_id = request_id
+ self._longest_end_ts = self._last_error
+
class Health(object):
"""Health stats for multiple requests"""
@@ -76,28 +125,36 @@ class Health(object):
self._all_stats = {}
self._lock = Lock()
+
def _add_or_get_stats(self, stats_name):
"""add to or get from the ever growing dict of HealthStats"""
- stats = None
with self._lock:
stats = self._all_stats.get(stats_name)
if not stats:
self._all_stats[stats_name] = stats = HealthStats(stats_name)
- return stats
+ return stats
+
+
+ def start(self, stats_name, request_id=None):
+ """records the start of execution on stats_name"""
+ stats = self._add_or_get_stats(stats_name)
+ stats.start(request_id)
+
- def success(self, stats_name, timer):
+ def success(self, stats_name, timer, request_id=None):
"""records the successful execution on stats_name"""
stats = self._add_or_get_stats(stats_name)
- stats.success(timer)
+ stats.success(timer, request_id)
+
- def error(self, stats_name, timer):
+ def error(self, stats_name, timer, request_id=None):
"""records the error execution on stats_name"""
stats = self._add_or_get_stats(stats_name)
- stats.error(timer)
+ stats.error(timer, request_id)
+
def dump(self):
"""returns dict of stats"""
with self._lock:
stats = dict((k, v.dump()) for (k, v) in self._all_stats.iteritems())
-
return stats
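Taken together, the refactoring splits the request-level Audit from the per-call Metrics and threads the request_id through the health stats. A minimal usage sketch follows, assuming the package is importable as policyhandler.onap.audit; the service version, config path, and targetEntity value are illustrative, not taken from this change:

    from policyhandler.onap.audit import Audit, Metrics

    Audit.init("policy_handler", "2.4.3", "etc/common_logger.config")  # config path is an assumption

    audit = Audit(req_message="catch_up")                     # one Audit per incoming request
    metrics = Metrics(aud_parent=audit, targetEntity="policy_engine")

    metrics.metrics_start("getting policies")                 # marks the sub-call as active in health stats
    # ... call the external system here ...
    metrics.set_http_status_code(200)
    metrics.metrics("got policies")                           # logs metrics, updates success/error stats

    audit.set_http_status_code(metrics.get_max_http_status_code())
    success, http_status_code, response_description = audit.audit_done("catch_up done")

    print(audit.health(full=True))                            # now includes gc, memory and active-request info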