diff options
Diffstat (limited to 'policyhandler')
-rw-r--r-- | policyhandler/onap/audit.py | 69 | ||||
-rw-r--r-- | policyhandler/onap/process_info.py | 152 | ||||
-rw-r--r-- | policyhandler/policy_updater.py | 1 | ||||
-rw-r--r-- | policyhandler/policy_utils.py | 3 | ||||
-rw-r--r-- | policyhandler/web_server.py | 1 |
5 files changed, 166 insertions, 60 deletions
diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py index 48988fe..0aa1c50 100644 --- a/policyhandler/onap/audit.py +++ b/policyhandler/onap/audit.py @@ -26,7 +26,6 @@ """ import copy -import gc import json import os import re @@ -38,10 +37,9 @@ import uuid from datetime import datetime from enum import Enum -import psutil - from .CommonLogger import CommonLogger from .health import Health +from .process_info import ProcessInfo REQUEST_X_ECOMP_REQUESTID = "X-ECOMP-RequestID" REQUEST_REMOTE_ADDR = "Remote-Addr" @@ -117,58 +115,6 @@ class AuditResponseCode(Enum): return response_code.name.lower().replace("_", " ") -class ProcessInfo(object): - """static class to calculate process info""" - _KILO_SYMBOLS = ('KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB') - _KILO_POWERS = {} - - @staticmethod - def init(): - """init static constants""" - for i, kilo_symbol in enumerate(ProcessInfo._KILO_SYMBOLS): - ProcessInfo._KILO_POWERS[kilo_symbol] = 1 << (i + 1) * 10 - ProcessInfo._KILO_SYMBOLS = list(reversed(ProcessInfo._KILO_SYMBOLS)) - - @staticmethod - def bytes_to_human(byte_count): - """converts byte count to human value in kilo-powers""" - for kilo_symbol in ProcessInfo._KILO_SYMBOLS: - kilo_power = ProcessInfo._KILO_POWERS[kilo_symbol] - if byte_count >= kilo_power: - value = float(byte_count) / kilo_power - return '%.1f%s' % (value, kilo_symbol) - return "%sB" % byte_count - - @staticmethod - def mem_info(): - """calculates the memory usage of the current process""" - process = psutil.Process() - with process.oneshot(): - mem = process.memory_full_info() - return { - "uss" : ProcessInfo.bytes_to_human(mem.uss), - "rss" : ProcessInfo.bytes_to_human(mem.rss), - "swap" : ProcessInfo.bytes_to_human(getattr(mem, "swap", 0)), - "pss" : ProcessInfo.bytes_to_human(getattr(mem, "pss", 0)) - } - - - @staticmethod - def gc_info(full=False): - """gets info from garbage collector""" - gc_info = { - "gc_count" : str(gc.get_count()), - "gc_threshold" : str(gc.get_threshold()) - } - try: - if gc.garbage: - gc_info["gc_garbage"] = ([repr(stuck) for stuck in gc.garbage] - if full else len(gc.garbage)) - except Exception: - pass - return gc_info - - class _Audit(object): """put the audit object on stack per each initiating request in the system @@ -226,17 +172,22 @@ class _Audit(object): "started" : str(_Audit._started), "utcnow" : str(utcnow), "uptime" : str(utcnow - _Audit._started), - "active_threads" : sorted([thr.name for thr in threading.enumerate()]), + "active_threads" : ProcessInfo.active_threads(), "gc" : ProcessInfo.gc_info(full), - "mem_info" : ProcessInfo.mem_info() + "virtual_memory" : ProcessInfo.virtual_memory(), + "process_memory" : ProcessInfo.process_memory() }, "stats" : _Audit._health.dump(), "soft" : {"python" : _Audit._py_ver, "packages" : _Audit._packages} } - health_txt = "{} health: {}".format(_Audit._service_name, json.dumps(health)) - self.info(health_txt) + self.info("{} health: {}".format(_Audit._service_name, json.dumps(health))) return health + def process_info(self): + """get the debug info on all the threads and memory""" + process_info = ProcessInfo.get_all() + self.info("{} process_info: {}".format(_Audit._service_name, json.dumps(process_info))) + return process_info def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs): """create audit object per each request in the system diff --git a/policyhandler/onap/process_info.py b/policyhandler/onap/process_info.py new file mode 100644 index 0000000..9fb6334 --- /dev/null +++ b/policyhandler/onap/process_info.py @@ -0,0 +1,152 @@ +# ================================================================================ +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +"""generic class to keep get real time info about the current process""" + +import gc +import sys +import threading +import traceback +from functools import wraps + +import psutil + + +def safe_operation(func): + """safequard the function against any exception""" + if not func: + return + + @wraps(func) + def wrapper(*args, **kwargs): + """wrapper around the function""" + try: + return func(*args, **kwargs) + except Exception as ex: + return "%s: %s" % (type(ex).__name__, str(ex)) + return wrapper + + +class ProcessInfo(object): + """static class to calculate process info""" + _BIBYTES_SYMBOLS = ('KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB') + _BIBYTES_VALS = {} + _inited = False + _lock = threading.Lock() + + @staticmethod + def init(): + """init static constants""" + if ProcessInfo._inited: + return + with ProcessInfo._lock: + if ProcessInfo._inited: + return + + for i, bibytes_symbol in enumerate(ProcessInfo._BIBYTES_SYMBOLS): + ProcessInfo._BIBYTES_VALS[bibytes_symbol] = 1 << (i + 1) * 10 + ProcessInfo._BIBYTES_SYMBOLS = list(reversed(ProcessInfo._BIBYTES_SYMBOLS)) + ProcessInfo._inited = True + + @staticmethod + def bytes_to_bibytes(byte_count): + """converts byte count to human value in kibi-mebi-gibi-...-bytes""" + if byte_count is None: + return "unknown" + if not byte_count or not isinstance(byte_count, int): + return byte_count + ProcessInfo.init() + + for bibytes_symbol in ProcessInfo._BIBYTES_SYMBOLS: + bibytes_value = ProcessInfo._BIBYTES_VALS[bibytes_symbol] + if byte_count >= bibytes_value: + value = float(byte_count) / bibytes_value + return '%.2f %s' % (value, bibytes_symbol) + return "%s B" % byte_count + + @staticmethod + @safe_operation + def process_memory(): + """calculates the memory usage of the current process""" + process = psutil.Process() + with process.oneshot(): + return dict((k, ProcessInfo.bytes_to_bibytes(v)) + for k, v in process.memory_full_info()._asdict().iteritems()) + + + @staticmethod + @safe_operation + def virtual_memory(): + """calculates the virtual memory usage of the whole vm""" + return dict((k, ProcessInfo.bytes_to_bibytes(v)) + for k, v in psutil.virtual_memory()._asdict().iteritems()) + + + @staticmethod + @safe_operation + def active_threads(): + """list of active threads""" + return sorted([thr.name + "(" + str(thr.ident) + ")" for thr in threading.enumerate()]) + + + @staticmethod + @safe_operation + def thread_stacks(): + """returns the current threads with their stack""" + thread_names = dict((thr.ident, thr.name) for thr in threading.enumerate()) + return [ + { + "thread_id" : thread_id, + "thread_name" : thread_names.get(thread_id), + "thread_stack" : [ + { + "filename" : filename, + "lineno" : lineno, + "function" : function_name, + "line" : line.strip() if line else None + } + for filename, lineno, function_name, line in traceback.extract_stack(stack) + ] + } + for thread_id, stack in sys._current_frames().items() + ] + + + @staticmethod + @safe_operation + def gc_info(full=False): + """gets info from garbage collector""" + gc_info = { + "gc_count" : str(gc.get_count()), + "gc_threshold" : str(gc.get_threshold()) + } + if gc.garbage: + gc_info["gc_garbage"] = ([repr(stuck) for stuck in gc.garbage] + if full else len(gc.garbage)) + return gc_info + + @staticmethod + def get_all(): + """all info""" + return { + "active_threads" : ProcessInfo.active_threads(), + "gc" : ProcessInfo.gc_info(full=True), + "process_memory" : ProcessInfo.process_memory(), + "virtual_memory" : ProcessInfo.virtual_memory(), + "thread_stacks" : ProcessInfo.thread_stacks() + } diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 38ce93a..9f24d3d 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -236,6 +236,7 @@ class PolicyUpdater(Thread): PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(aud_catch_up.health(full=True))) + PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_catch_up.process_info())) return success diff --git a/policyhandler/policy_utils.py b/policyhandler/policy_utils.py index 69978b6..c96d4f6 100644 --- a/policyhandler/policy_utils.py +++ b/policyhandler/policy_utils.py @@ -134,7 +134,8 @@ class Utils(object): try: return json.loads(json_str) except (ValueError, TypeError) as err: - Utils._logger.warn("unexpected json %s: %s", str(json_str), str(err)) + Utils._logger.warn("unexpected json error(%s): len(%s) str[:100]: (%s)", + str(err), len(json_str), str(json_str)[:100]) return json_str @staticmethod diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py index 5314791..041a442 100644 --- a/policyhandler/web_server.py +++ b/policyhandler/web_server.py @@ -198,6 +198,7 @@ class _PolicyWeb(object): PolicyWeb.logger.info("%s: --------- the end -----------", req_info) res = str(datetime.utcnow()) audit.info_requested(res) + PolicyWeb.logger.info("process_info: %s", json.dumps(audit.process_info())) return "goodbye! shutdown requested {0}".format(res) @cherrypy.expose |