aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2018-05-10 09:23:16 -0400
committerAlex Shatov <alexs@att.com>2018-05-10 09:23:16 -0400
commitf2d7bef13705812c1bf147c2fb65162fbf385c6b (patch)
treee6efb8e25287576a48952942aacdb3cf84a825ff /policyhandler
parent50bed534083c96cbf1f8fa4e220cb2b00dff9621 (diff)
2.4.3 policy-handler - try-catch top Exceptions
- added try-except for top level Exception into all threads of policy-handler to avoid losing the thread and tracking the unexpected crashes - rediscover the deployment-handler if not found before and after each catchup - refactored audit - separated metrics from audit - added more stats and runtime info to healthcheck = gc counts and garbage info if any detected = memory usage - to detect the potential memory leaks = request_id to all stats = stats of active requests - avoid reallocating the whole Queue of policy-updates after catchup = clear of the internal queue under proper lock Change-Id: I3fabcaac70419a68bd070ff7d591a75942f37663 Signed-off-by: Alex Shatov <alexs@att.com> Issue-ID: DCAEGEN2-483
Diffstat (limited to 'policyhandler')
-rw-r--r--policyhandler/deploy_handler.py85
-rw-r--r--policyhandler/onap/audit.py443
-rw-r--r--policyhandler/onap/health.py93
-rw-r--r--policyhandler/policy_receiver.py66
-rw-r--r--policyhandler/policy_rest.py560
-rw-r--r--policyhandler/policy_updater.py154
-rw-r--r--policyhandler/step_timer.py27
-rw-r--r--policyhandler/web_server.py11
8 files changed, 863 insertions, 576 deletions
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
index 139e660..4ea5ad1 100644
--- a/policyhandler/deploy_handler.py
+++ b/policyhandler/deploy_handler.py
@@ -26,7 +26,7 @@ import requests
from .config import Config
from .customize import CustomizerUser
from .discovery import DiscoveryClient
-from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode
+from .onap.audit import REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, Metrics
POOL_SIZE = 1
@@ -44,27 +44,27 @@ class DeployHandler(object):
_server_instance_uuid = None
@staticmethod
- def _lazy_init(audit):
+ def _lazy_init(audit, rediscover=False):
""" set static properties """
- if DeployHandler._lazy_inited:
+ if DeployHandler._lazy_inited and not rediscover:
return
- DeployHandler._lazy_inited = True
- DeployHandler._custom_kwargs = CustomizerUser.get_customizer() \
- .get_deploy_handler_kwargs(audit)
- if not DeployHandler._custom_kwargs \
- or not isinstance(DeployHandler._custom_kwargs, dict):
+ DeployHandler._custom_kwargs = (CustomizerUser.get_customizer()
+ .get_deploy_handler_kwargs(audit))
+ if (not DeployHandler._custom_kwargs
+ or not isinstance(DeployHandler._custom_kwargs, dict)):
DeployHandler._custom_kwargs = {}
- DeployHandler._requests_session = requests.Session()
- DeployHandler._requests_session.mount(
- 'https://',
- requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE)
- )
- DeployHandler._requests_session.mount(
- 'http://',
- requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE)
- )
+ if not DeployHandler._requests_session:
+ DeployHandler._requests_session = requests.Session()
+ DeployHandler._requests_session.mount(
+ 'https://',
+ requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE)
+ )
+ DeployHandler._requests_session.mount(
+ 'http://',
+ requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE)
+ )
config_dh = Config.config.get("deploy_handler")
if config_dh and isinstance(config_dh, dict):
@@ -77,7 +77,7 @@ class DeployHandler(object):
DeployHandler._target_entity = config_dh.get("target_entity", "deployment_handler")
DeployHandler._url = config_dh.get("url")
DeployHandler._logger.info("dns based routing to %s: url(%s)",
- DeployHandler._target_entity, DeployHandler._url)
+ DeployHandler._target_entity, DeployHandler._url)
if not DeployHandler._url:
# discover routing to deployment-handler at consul-services
@@ -85,14 +85,18 @@ class DeployHandler(object):
# config for policy-handler <= 2.3.1
# "deploy_handler" : "deployment_handler"
DeployHandler._target_entity = str(config_dh or "deployment_handler")
- DeployHandler._url = DiscoveryClient.get_service_url(audit, DeployHandler._target_entity)
+ DeployHandler._url = DiscoveryClient.get_service_url(audit,
+ DeployHandler._target_entity)
DeployHandler._url_policy = str(DeployHandler._url or "") + '/policy'
DeployHandler._logger.info(
"got %s policy url(%s)", DeployHandler._target_entity, DeployHandler._url_policy)
+ DeployHandler._lazy_inited = bool(DeployHandler._url)
+
+
@staticmethod
- def policy_update(audit, message):
+ def policy_update(audit, message, rediscover=False):
"""
post policy_updated message to deploy-handler
@@ -101,10 +105,10 @@ class DeployHandler(object):
if not message:
return
- DeployHandler._lazy_init(audit)
- sub_aud = Audit(aud_parent=audit, targetEntity=DeployHandler._target_entity,
- targetServiceName=DeployHandler._url_policy)
- headers = {REQUEST_X_ECOMP_REQUESTID : sub_aud.request_id}
+ DeployHandler._lazy_init(audit, rediscover)
+ metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity,
+ targetServiceName=DeployHandler._url_policy)
+ headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
msg_str = json.dumps(message)
headers_str = json.dumps(headers)
@@ -114,14 +118,14 @@ class DeployHandler(object):
log_data = " msg={0} headers={1}".format(msg_str, headers_str)
log_line = log_action + log_data
DeployHandler._logger.info(log_line)
- sub_aud.metrics_start(log_line)
+ metrics.metrics_start(log_line)
if not DeployHandler._url:
error_msg = "no url found to {0}".format(log_line)
DeployHandler._logger.error(error_msg)
- sub_aud.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
+ metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
- sub_aud.metrics(error_msg)
+ metrics.metrics(error_msg)
return
res = None
@@ -130,25 +134,30 @@ class DeployHandler(object):
DeployHandler._url_policy, json=message, headers=headers,
**DeployHandler._custom_kwargs
)
- except requests.exceptions.RequestException as ex:
- error_msg = "failed to {0}: {1}{2}".format(log_action, str(ex), log_data)
+ except Exception as ex:
+ error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+ if isinstance(ex, requests.exceptions.RequestException)
+ else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ error_msg = ("failed to {0} {1}: {2}{3}"
+ .format(log_action, type(ex).__name__, str(ex), log_data))
DeployHandler._logger.exception(error_msg)
- sub_aud.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
- audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
- sub_aud.metrics(error_msg)
+ metrics.set_http_status_code(error_code)
+ audit.set_http_status_code(error_code)
+ metrics.metrics(error_msg)
return
- sub_aud.set_http_status_code(res.status_code)
+ metrics.set_http_status_code(res.status_code)
audit.set_http_status_code(res.status_code)
log_line = "response {0} from {1}: text={2}{3}" \
.format(res.status_code, log_action, res.text, log_data)
- sub_aud.metrics(log_line)
- DeployHandler._logger.info(log_line)
+ metrics.metrics(log_line)
if res.status_code != requests.codes.ok:
+ DeployHandler._logger.error(log_line)
return
+ DeployHandler._logger.info(log_line)
result = res.json() or {}
prev_server_instance_uuid = DeployHandler._server_instance_uuid
DeployHandler._server_instance_uuid = result.get("server_instance_uuid")
@@ -156,9 +165,9 @@ class DeployHandler(object):
deployment_handler_changed = (prev_server_instance_uuid
and prev_server_instance_uuid != DeployHandler._server_instance_uuid)
if deployment_handler_changed:
- log_line = "deployment_handler_changed: {1} != {0}" \
- .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid)
- sub_aud.info(log_line)
+ log_line = ("deployment_handler_changed: {1} != {0}"
+ .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid))
+ metrics.info(log_line)
DeployHandler._logger.info(log_line)
return deployment_handler_changed
diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py
index 08dcd37..48988fe 100644
--- a/policyhandler/onap/audit.py
+++ b/policyhandler/onap/audit.py
@@ -26,16 +26,19 @@
"""
import copy
+import gc
import json
import os
import re
import subprocess
import sys
+import threading
import time
import uuid
from datetime import datetime
from enum import Enum
-from threading import Lock
+
+import psutil
from .CommonLogger import CommonLogger
from .health import Health
@@ -51,10 +54,15 @@ AUDIT_SERVER = 'server'
AUDIT_TARGET_ENTITY = 'targetEntity'
AUDIT_METRICS = 'metrics'
AUDIT_TOTAL_STATS = 'audit_total_stats'
+METRICS_TOTAL_STATS = 'metrics_total_stats'
HEADER_CLIENTAUTH = "clientauth"
HEADER_AUTHORIZATION = "authorization"
+ERROR_CODE = "errorCode"
+ERROR_DESCRIPTION = "errorDescription"
+
+
class AuditHttpCode(Enum):
"""audit http codes"""
HTTP_OK = 200
@@ -67,6 +75,7 @@ class AuditHttpCode(Enum):
DATA_ERROR = 1030
SCHEMA_ERROR = 1040
+
class AuditResponseCode(Enum):
"""audit response codes"""
SUCCESS = 0
@@ -107,7 +116,60 @@ class AuditResponseCode(Enum):
return "unknown"
return response_code.name.lower().replace("_", " ")
-class Audit(object):
+
+class ProcessInfo(object):
+ """static class to calculate process info"""
+ _KILO_SYMBOLS = ('KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB')
+ _KILO_POWERS = {}
+
+ @staticmethod
+ def init():
+ """init static constants"""
+ for i, kilo_symbol in enumerate(ProcessInfo._KILO_SYMBOLS):
+ ProcessInfo._KILO_POWERS[kilo_symbol] = 1 << (i + 1) * 10
+ ProcessInfo._KILO_SYMBOLS = list(reversed(ProcessInfo._KILO_SYMBOLS))
+
+ @staticmethod
+ def bytes_to_human(byte_count):
+ """converts byte count to human value in kilo-powers"""
+ for kilo_symbol in ProcessInfo._KILO_SYMBOLS:
+ kilo_power = ProcessInfo._KILO_POWERS[kilo_symbol]
+ if byte_count >= kilo_power:
+ value = float(byte_count) / kilo_power
+ return '%.1f%s' % (value, kilo_symbol)
+ return "%sB" % byte_count
+
+ @staticmethod
+ def mem_info():
+ """calculates the memory usage of the current process"""
+ process = psutil.Process()
+ with process.oneshot():
+ mem = process.memory_full_info()
+ return {
+ "uss" : ProcessInfo.bytes_to_human(mem.uss),
+ "rss" : ProcessInfo.bytes_to_human(mem.rss),
+ "swap" : ProcessInfo.bytes_to_human(getattr(mem, "swap", 0)),
+ "pss" : ProcessInfo.bytes_to_human(getattr(mem, "pss", 0))
+ }
+
+
+ @staticmethod
+ def gc_info(full=False):
+ """gets info from garbage collector"""
+ gc_info = {
+ "gc_count" : str(gc.get_count()),
+ "gc_threshold" : str(gc.get_threshold())
+ }
+ try:
+ if gc.garbage:
+ gc_info["gc_garbage"] = ([repr(stuck) for stuck in gc.garbage]
+ if full else len(gc.garbage))
+ except Exception:
+ pass
+ return gc_info
+
+
+class _Audit(object):
"""put the audit object on stack per each initiating request in the system
:request_id: is the X-ECOMP-RequestID for tracing
@@ -121,7 +183,7 @@ class Audit(object):
_service_name = ""
_service_version = ""
_service_instance_uuid = str(uuid.uuid4())
- _started = datetime.now()
+ _started = datetime.utcnow()
_key_format = re.compile(r"\W")
_logger_debug = None
_logger_error = None
@@ -129,95 +191,69 @@ class Audit(object):
_logger_audit = None
_health = Health()
_py_ver = sys.version.replace("\n", "")
- try:
- _packages = filter(None, subprocess.check_output(["pip", "freeze"]).splitlines())
- except subprocess.CalledProcessError:
- _packages = []
+ _packages = []
@staticmethod
def init(service_name, service_version, config_file_path):
"""init static invariants and loggers"""
- Audit._service_name = service_name
- Audit._service_version = service_version
- Audit._logger_debug = CommonLogger(config_file_path, "debug", \
- instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
- Audit._logger_error = CommonLogger(config_file_path, "error", \
- instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
- Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \
- instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
- Audit._logger_audit = CommonLogger(config_file_path, "audit", \
- instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
-
- @staticmethod
- def health():
+ _Audit._service_name = service_name
+ _Audit._service_version = service_version
+ _Audit._logger_debug = CommonLogger(config_file_path, "debug", \
+ instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+ _Audit._logger_error = CommonLogger(config_file_path, "error", \
+ instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+ _Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \
+ instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+ _Audit._logger_audit = CommonLogger(config_file_path, "audit", \
+ instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+ ProcessInfo.init()
+ try:
+ _Audit._packages = filter(None, subprocess.check_output(["pip", "freeze"]).splitlines())
+ except subprocess.CalledProcessError:
+ pass
+
+
+ def health(self, full=False):
"""returns json for health check"""
- now = datetime.now()
- return {
- "service_name" : Audit._service_name,
- "service_version" : Audit._service_version,
- "service_instance_uuid" : Audit._service_instance_uuid,
- "python" : Audit._py_ver,
- "started" : str(Audit._started),
- "now" : str(now),
- "uptime" : str(now - Audit._started),
- "stats" : Audit._health.dump(),
- "packages" : Audit._packages
+ utcnow = datetime.utcnow()
+ health = {
+ "server" : {
+ "service_name" : _Audit._service_name,
+ "service_version" : _Audit._service_version,
+ "service_instance_uuid" : _Audit._service_instance_uuid
+ },
+ "runtime" : {
+ "started" : str(_Audit._started),
+ "utcnow" : str(utcnow),
+ "uptime" : str(utcnow - _Audit._started),
+ "active_threads" : sorted([thr.name for thr in threading.enumerate()]),
+ "gc" : ProcessInfo.gc_info(full),
+ "mem_info" : ProcessInfo.mem_info()
+ },
+ "stats" : _Audit._health.dump(),
+ "soft" : {"python" : _Audit._py_ver, "packages" : _Audit._packages}
}
+ health_txt = "{} health: {}".format(_Audit._service_name, json.dumps(health))
+ self.info(health_txt)
+ return health
+
- def __init__(self, job_name=None, request_id=None, req_message=None, aud_parent=None, **kwargs):
+ def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs):
"""create audit object per each request in the system
:job_name: is the name of the audit job for health stats
:request_id: is the X-ECOMP-RequestID for tracing
:req_message: is the request message string for logging
- :aud_parent: is the parent Audit - used for sub-query metrics to other systems
:kwargs: - put any request related params into kwargs
"""
- self.job_name = Audit._key_format.sub('_', job_name or req_message or Audit._service_name)
+ self.job_name = _Audit._key_format.sub('_', job_name or req_message or _Audit._service_name)
self.request_id = request_id
self.req_message = req_message or ""
- self.aud_parent = aud_parent
self.kwargs = kwargs or {}
- self.retry_get_config = False
self.max_http_status_code = 0
- self._lock = Lock()
-
- if self.aud_parent:
- self.job_name = Audit._key_format.sub(
- '_', job_name or self.aud_parent.job_name or Audit._service_name)
- if not self.request_id:
- self.request_id = self.aud_parent.request_id
- if not self.req_message:
- self.req_message = self.aud_parent.req_message
- self.kwargs = self.aud_parent.merge_all_kwargs(**self.kwargs)
- else:
- headers = self.kwargs.get("headers", {})
- if headers:
- if not self.request_id:
- self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID)
- if AUDIT_IPADDRESS not in self.kwargs:
- self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR)
- if AUDIT_SERVER not in self.kwargs:
- self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST)
-
- if AUDIT_SERVER not in self.kwargs:
- self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME)
-
- created_req = ""
- if not self.request_id:
- created_req = " with new"
- self.request_id = str(uuid.uuid4())
-
- self.kwargs[AUDIT_REQUESTID] = self.request_id
+ self._lock = threading.Lock()
- self._started = time.time()
- self._start_event = Audit._logger_audit.getStartRecordEvent()
- self.metrics_start()
-
- if not self.aud_parent:
- self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\
- .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs)))
def merge_all_kwargs(self, **kwargs):
"""returns the merge of copy of self.kwargs with the param kwargs"""
@@ -228,16 +264,14 @@ class Audit(object):
def set_http_status_code(self, http_status_code):
"""accumulate the highest(worst) http status code"""
- self._lock.acquire()
- if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value:
- self.max_http_status_code = max(http_status_code, self.max_http_status_code)
- self._lock.release()
+ with self._lock:
+ if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value:
+ self.max_http_status_code = max(http_status_code, self.max_http_status_code)
def get_max_http_status_code(self):
"""returns the highest(worst) http status code"""
- self._lock.acquire()
- max_http_status_code = self.max_http_status_code
- self._lock.release()
+ with self._lock:
+ max_http_status_code = self.max_http_status_code
return max_http_status_code
@staticmethod
@@ -247,33 +281,11 @@ class Audit(object):
return 'COMPLETE'
return 'ERROR'
- @staticmethod
- def hide_secrets(obj):
- """hides the known secret field values of the dictionary"""
- if not isinstance(obj, dict):
- return obj
-
- for key in obj:
- if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]:
- obj[key] = "*"
- elif isinstance(obj[key], dict):
- obj[key] = Audit.hide_secrets(obj[key])
-
- return obj
-
- @staticmethod
- def log_json_dumps(obj, **kwargs):
- """hide the known secret field values of the dictionary and return json.dumps"""
- if not isinstance(obj, dict):
- return json.dumps(obj, **kwargs)
-
- return json.dumps(Audit.hide_secrets(copy.deepcopy(obj)), **kwargs)
-
def is_serious_error(self, status_code):
"""returns whether the response_code is success and a human text for response code"""
- return AuditResponseCode.PERMISSION_ERROR.value \
- == AuditResponseCode.get_response_code(status_code).value \
- or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value
+ return (AuditResponseCode.PERMISSION_ERROR.value
+ == AuditResponseCode.get_response_code(status_code).value
+ or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value)
def _get_response_status(self):
"""calculates the response status fields from max_http_status_code"""
@@ -290,104 +302,215 @@ class Audit(object):
def debug(self, log_line, **kwargs):
"""debug - the debug=lowest level of logging"""
- Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs))
+ _Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs))
def info(self, log_line, **kwargs):
"""debug - the info level of logging"""
- Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs))
+ _Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs))
def info_requested(self, result=None, **kwargs):
"""info "requested ..." - the info level of logging"""
self.info("requested {0} {1}".format(self.req_message, result or ""), \
**self.merge_all_kwargs(**kwargs))
- def warn(self, log_line, **kwargs):
+ def warn(self, log_line, error_code=None, **kwargs):
"""debug+error - the warn level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
- Audit._logger_debug.warn(log_line, **all_kwargs)
- Audit._logger_error.warn(log_line, **all_kwargs)
- def error(self, log_line, **kwargs):
+ if error_code and isinstance(error_code, AuditResponseCode):
+ all_kwargs[ERROR_CODE] = error_code.value
+ all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code)
+
+ _Audit._logger_debug.warn(log_line, **all_kwargs)
+ _Audit._logger_error.warn(log_line, **all_kwargs)
+
+ def error(self, log_line, error_code=None, **kwargs):
"""debug+error - the error level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
- Audit._logger_debug.error(log_line, **all_kwargs)
- Audit._logger_error.error(log_line, **all_kwargs)
- def fatal(self, log_line, **kwargs):
+ if error_code and isinstance(error_code, AuditResponseCode):
+ all_kwargs[ERROR_CODE] = error_code.value
+ all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code)
+
+ _Audit._logger_debug.error(log_line, **all_kwargs)
+ _Audit._logger_error.error(log_line, **all_kwargs)
+
+ def fatal(self, log_line, error_code=None, **kwargs):
"""debug+error - the fatal level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
- Audit._logger_debug.fatal(log_line, **all_kwargs)
- Audit._logger_error.fatal(log_line, **all_kwargs)
+
+ if error_code and isinstance(error_code, AuditResponseCode):
+ all_kwargs[ERROR_CODE] = error_code.value
+ all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code)
+
+ _Audit._logger_debug.fatal(log_line, **all_kwargs)
+ _Audit._logger_error.fatal(log_line, **all_kwargs)
+
+ @staticmethod
+ def hide_secrets(obj):
+ """hides the known secret field values of the dictionary"""
+ if not isinstance(obj, dict):
+ return obj
+
+ for key in obj:
+ if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]:
+ obj[key] = "*"
+ elif isinstance(obj[key], dict):
+ obj[key] = _Audit.hide_secrets(obj[key])
+
+ return obj
+
+ @staticmethod
+ def log_json_dumps(obj, **kwargs):
+ """hide the known secret field values of the dictionary and return json.dumps"""
+ if not isinstance(obj, dict):
+ return json.dumps(obj, **kwargs)
+
+ return json.dumps(_Audit.hide_secrets(copy.deepcopy(obj)), **kwargs)
@staticmethod
def get_elapsed_time(started):
"""returns the elapsed time since started in milliseconds"""
- return int(round(1000 * (time.time() - started)))
+ return int(round(1000 * (time.time() - (started or 0))))
- def metrics_start(self, log_line=None, **kwargs):
- """reset metrics timing"""
- self._metrics_started = time.time()
- self._metrics_start_event = Audit._logger_metrics.getStartRecordEvent()
- if log_line:
- self.info(log_line, **self.merge_all_kwargs(**kwargs))
- def metrics(self, log_line, **kwargs):
- """debug+metrics - the metrics=sub-audit level of logging"""
+class Audit(_Audit):
+ """Audit class to track the high level operations"""
+
+ def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs):
+ """create audit object per each request in the system
+
+ :job_name: is the name of the audit job for health stats
+ :request_id: is the X-ECOMP-RequestID for tracing
+ :req_message: is the request message string for logging
+ :aud_parent: is the parent Audit - used for sub-query metrics to other systems
+ :kwargs: - put any request related params into kwargs
+ """
+ super(Audit, self).__init__(job_name=job_name,
+ request_id=request_id,
+ req_message=req_message,
+ **kwargs)
+
+ headers = self.kwargs.get("headers", {})
+ if headers:
+ if not self.request_id:
+ self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID)
+ if AUDIT_IPADDRESS not in self.kwargs:
+ self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR)
+ if AUDIT_SERVER not in self.kwargs:
+ self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST)
+
+ created_req = ""
+ if not self.request_id:
+ created_req = " with new"
+ self.request_id = str(uuid.uuid4())
+
+ if AUDIT_SERVER not in self.kwargs:
+ self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME)
+
+ self.kwargs[AUDIT_REQUESTID] = self.request_id
+
+ _Audit._health.start(self.job_name, self.request_id)
+ _Audit._health.start(AUDIT_TOTAL_STATS, self.request_id)
+
+ self._started = time.time()
+ self._start_event = Audit._logger_audit.getStartRecordEvent()
+
+ self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\
+ .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs)))
+
+
+ def audit_done(self, result=None, **kwargs):
+ """debug+audit - the audit=top level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
success, max_http_status_code, response_code, response_description = \
self._get_response_status()
- metrics_func = None
- timer = Audit.get_elapsed_time(self._metrics_started)
- metrics_job = Audit._key_format.sub(
- '_', all_kwargs.get(AUDIT_TARGET_ENTITY, AUDIT_METRICS + "_" + self.job_name))
+ log_line = "{0} {1}".format(self.req_message, result or "").strip()
+ audit_func = None
+ timer = _Audit.get_elapsed_time(self._started)
if success:
log_line = "done: {0}".format(log_line)
self.info(log_line, **all_kwargs)
- metrics_func = Audit._logger_metrics.info
- Audit._health.success(metrics_job, timer)
+ audit_func = _Audit._logger_audit.info
+ _Audit._health.success(self.job_name, timer, self.request_id)
+ _Audit._health.success(AUDIT_TOTAL_STATS, timer, self.request_id)
else:
log_line = "failed: {0}".format(log_line)
- self.error(log_line, errorCode=response_code.value, \
- errorDescription=response_description, **all_kwargs)
- metrics_func = Audit._logger_metrics.error
- Audit._health.error(metrics_job, timer)
-
- metrics_func(log_line, begTime=self._metrics_start_event, timer=timer,
- statusCode=Audit.get_status_code(success), responseCode=response_code.value,
- responseDescription=response_description,
- **all_kwargs
- )
-
- self.metrics_start()
+ self.error(log_line, errorCode=response_code.value,
+ errorDescription=response_description, **all_kwargs)
+ audit_func = _Audit._logger_audit.error
+ _Audit._health.error(self.job_name, timer, self.request_id)
+ _Audit._health.error(AUDIT_TOTAL_STATS, timer, self.request_id)
+
+ audit_func(log_line, begTime=self._start_event, timer=timer,
+ statusCode=_Audit.get_status_code(success),
+ responseCode=response_code.value,
+ responseDescription=response_description,
+ **all_kwargs)
+
return (success, max_http_status_code, response_description)
- def audit_done(self, result=None, **kwargs):
- """debug+audit - the audit=top level of logging"""
+
+class Metrics(_Audit):
+ """Metrics class to track the calls to outside systems"""
+
+ def __init__(self, aud_parent, **kwargs):
+ """create audit object per each request in the system
+
+ :aud_parent: is the parent Audit - used for sub-query metrics to other systems
+ :kwargs: - put any request related params into kwargs
+ """
+ super(Metrics, self).__init__(job_name=aud_parent.job_name,
+ request_id=aud_parent.request_id,
+ req_message=aud_parent.req_message,
+ **aud_parent.merge_all_kwargs(**kwargs))
+ self.aud_parent = aud_parent
+ self._metrics_name = _Audit._key_format.sub(
+ '_', AUDIT_METRICS + "_" + self.kwargs.get(AUDIT_TARGET_ENTITY, self.job_name))
+
+ self._metrics_started = None
+ self._metrics_start_event = None
+
+
+ def metrics_start(self, log_line=None, **kwargs):
+ """reset metrics timing"""
+ self.merge_all_kwargs(**kwargs)
+ self._metrics_started = time.time()
+ self._metrics_start_event = _Audit._logger_metrics.getStartRecordEvent()
+ if log_line:
+ self.info(log_line, **self.merge_all_kwargs(**kwargs))
+ _Audit._health.start(self._metrics_name, self.request_id)
+ _Audit._health.start(METRICS_TOTAL_STATS, self.request_id)
+
+
+ def metrics(self, log_line, **kwargs):
+ """debug+metrics - the metrics=sub-audit level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
success, max_http_status_code, response_code, response_description = \
self._get_response_status()
- log_line = "{0} {1}".format(self.req_message, result or "").strip()
- audit_func = None
- timer = Audit.get_elapsed_time(self._started)
+ metrics_func = None
+ timer = _Audit.get_elapsed_time(self._metrics_started)
if success:
log_line = "done: {0}".format(log_line)
self.info(log_line, **all_kwargs)
- audit_func = Audit._logger_audit.info
- Audit._health.success(self.job_name, timer)
- Audit._health.success(AUDIT_TOTAL_STATS, timer)
+ metrics_func = _Audit._logger_metrics.info
+ _Audit._health.success(self._metrics_name, timer, self.request_id)
+ _Audit._health.success(METRICS_TOTAL_STATS, timer, self.request_id)
else:
log_line = "failed: {0}".format(log_line)
self.error(log_line, errorCode=response_code.value,
errorDescription=response_description, **all_kwargs)
- audit_func = Audit._logger_audit.error
- Audit._health.error(self.job_name, timer)
- Audit._health.error(AUDIT_TOTAL_STATS, timer)
-
- audit_func(log_line, begTime=self._start_event, timer=timer,
- statusCode=Audit.get_status_code(success),
- responseCode=response_code.value,
- responseDescription=response_description,
- **all_kwargs
- )
+ metrics_func = _Audit._logger_metrics.error
+ _Audit._health.error(self._metrics_name, timer, self.request_id)
+ _Audit._health.error(METRICS_TOTAL_STATS, timer, self.request_id)
+
+ metrics_func(
+ log_line,
+ begTime=(self._metrics_start_event or _Audit._logger_metrics.getStartRecordEvent()),
+ timer=timer,
+ statusCode=_Audit.get_status_code(success),
+ responseCode=response_code.value,
+ responseDescription=response_description,
+ **all_kwargs)
return (success, max_http_status_code, response_description)
diff --git a/policyhandler/onap/health.py b/policyhandler/onap/health.py
index 485f422..e6a6f69 100644
--- a/policyhandler/onap/health.py
+++ b/policyhandler/onap/health.py
@@ -28,46 +28,95 @@ class HealthStats(object):
"""keep track of stats for metrics calls"""
self._name = name or "stats_" + str(uuid.uuid4())
self._lock = Lock()
+
self._call_count = 0
self._error_count = 0
+ self._active_count = 0
+
self._longest_timer = 0
self._total_timer = 0
+
self._last_success = None
self._last_error = None
+ self._last_start = None
+ self._longest_end_ts = None
+
+ self._last_success_request_id = None
+ self._last_error_request_id = None
+ self._last_started_request_id = None
+ self._longest_request_id = None
+
def dump(self):
"""returns dict of stats"""
dump = None
with self._lock:
dump = {
- "call_count" : self._call_count,
- "error_count" : self._error_count,
- "last_success" : str(self._last_success),
- "last_error" : str(self._last_error),
- "longest_timer_millisecs" : self._longest_timer,
- "ave_timer_millisecs" : (float(self._total_timer)/self._call_count \
- if self._call_count else 0)
+ "total" : {
+ "call_count" : self._call_count,
+ "ave_timer_millisecs" : (float(self._total_timer)/self._call_count
+ if self._call_count else 0)
+ },
+ "success" : {
+ "success_count" : (self._call_count - self._error_count),
+ "last_success" : str(self._last_success),
+ "last_success_request_id" : self._last_success_request_id
+ },
+ "error" : {
+ "error_count" : self._error_count,
+ "last_error" : str(self._last_error),
+ "last_error_request_id" : self._last_error_request_id
+ },
+ "active" : {
+ "active_count" : self._active_count,
+ "last_start" : str(self._last_start),
+ "last_started_request_id" : self._last_started_request_id
+ },
+ "longest" : {
+ "longest_timer_millisecs" : self._longest_timer,
+ "longest_request_id" : self._longest_request_id,
+ "longest_end" : str(self._longest_end_ts)
+ }
}
return dump
- def success(self, timer):
+
+ def start(self, request_id=None):
+ """records the start of active execution"""
+ with self._lock:
+ self._active_count += 1
+ self._last_start = datetime.utcnow()
+ self._last_started_request_id = request_id
+
+
+ def success(self, timer, request_id=None):
"""records the successful execution"""
with self._lock:
+ self._active_count -= 1
self._call_count += 1
- self._last_success = datetime.now()
+ self._last_success = datetime.utcnow()
+ self._last_success_request_id = request_id
self._total_timer += timer
if not self._longest_timer or self._longest_timer < timer:
self._longest_timer = timer
+ self._longest_request_id = request_id
+ self._longest_end_ts = self._last_success
+
- def error(self, timer):
+ def error(self, timer, request_id=None):
"""records the errored execution"""
with self._lock:
+ self._active_count -= 1
self._call_count += 1
self._error_count += 1
- self._last_error = datetime.now()
+ self._last_error = datetime.utcnow()
+ self._last_error_request_id = request_id
self._total_timer += timer
if not self._longest_timer or self._longest_timer < timer:
self._longest_timer = timer
+ self._longest_request_id = request_id
+ self._longest_end_ts = self._last_error
+
class Health(object):
"""Health stats for multiple requests"""
@@ -76,28 +125,36 @@ class Health(object):
self._all_stats = {}
self._lock = Lock()
+
def _add_or_get_stats(self, stats_name):
"""add to or get from the ever growing dict of HealthStats"""
- stats = None
with self._lock:
stats = self._all_stats.get(stats_name)
if not stats:
self._all_stats[stats_name] = stats = HealthStats(stats_name)
- return stats
+ return stats
+
+
+ def start(self, stats_name, request_id=None):
+ """records the start of execution on stats_name"""
+ stats = self._add_or_get_stats(stats_name)
+ stats.start(request_id)
+
- def success(self, stats_name, timer):
+ def success(self, stats_name, timer, request_id=None):
"""records the successful execution on stats_name"""
stats = self._add_or_get_stats(stats_name)
- stats.success(timer)
+ stats.success(timer, request_id)
+
- def error(self, stats_name, timer):
+ def error(self, stats_name, timer, request_id=None):
"""records the error execution on stats_name"""
stats = self._add_or_get_stats(stats_name)
- stats.error(timer)
+ stats.error(timer, request_id)
+
def dump(self):
"""returns dict of stats"""
with self._lock:
stats = dict((k, v.dump()) for (k, v) in self._all_stats.iteritems())
-
return stats
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py
index 751bea8..dd9eea6 100644
--- a/policyhandler/policy_receiver.py
+++ b/policyhandler/policy_receiver.py
@@ -35,7 +35,7 @@ from threading import Lock, Thread
import websocket
from .config import Config
-from .onap.audit import Audit
+from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
from .policy_updater import PolicyUpdater
LOADED_POLICIES = 'loadedPolicies'
@@ -115,32 +115,41 @@ class _PolicyReceiver(Thread):
def _on_pdp_message(self, _, message):
"""received the notification from PDP"""
- _PolicyReceiver._logger.info("Received notification message: %s", message)
- if not message:
- return
- message = json.loads(message)
-
- if not message:
- return
-
- policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
- for policy in message.get(LOADED_POLICIES, [])
- if self._policy_scopes.match(policy.get(POLICY_NAME))]
- policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
- for policy in message.get(REMOVED_POLICIES, [])
- if self._policy_scopes.match(policy.get(POLICY_NAME))]
-
- if not policies_updated and not policies_removed:
- _PolicyReceiver._logger.info(
- "no policy updated or removed for scopes %s", self._policy_scopes.pattern
- )
- return
+ try:
+ _PolicyReceiver._logger.info("Received notification message: %s", message)
+ if not message:
+ return
+ message = json.loads(message)
+
+ if not message or not isinstance(message, dict):
+ _PolicyReceiver._logger.warn("unexpected message from PDP: %s", json.dumps(message))
+ return
+
+ policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
+ for policy in message.get(LOADED_POLICIES, [])
+ if self._policy_scopes.match(policy.get(POLICY_NAME))]
+ policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
+ for policy in message.get(REMOVED_POLICIES, [])
+ if self._policy_scopes.match(policy.get(POLICY_NAME))]
+
+ if not policies_updated and not policies_removed:
+ _PolicyReceiver._logger.info("no policy updated or removed for scopes %s",
+ self._policy_scopes.pattern)
+ return
+
+ audit = Audit(job_name="policy_update",
+ req_message="policy-notification - updated[{0}], removed[{1}]"
+ .format(len(policies_updated), len(policies_removed)),
+ retry_get_config=True)
+ self._policy_updater.enqueue(audit, policies_updated, policies_removed)
+ except Exception as ex:
+ error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex),
+ "on_pdp_message", json.dumps(message))
+
+ _PolicyReceiver._logger.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
- audit = Audit(job_name="policy_update",
- req_message="policy-notification - updated[{0}], removed[{1}]"
- .format(len(policies_updated), len(policies_removed)))
- audit.retry_get_config = True
- self._policy_updater.enqueue(audit, policies_updated, policies_removed)
def _on_ws_error(self, _, error):
"""report an error"""
@@ -184,12 +193,7 @@ class PolicyReceiver(object):
@staticmethod
def run(audit):
"""Using policy-engine client to talk to policy engine"""
- sub_aud = Audit(aud_parent=audit)
- sub_aud.metrics_start("start policy receiver")
-
PolicyReceiver._policy_receiver = _PolicyReceiver()
PolicyReceiver._policy_receiver.start()
- sub_aud.metrics("started policy receiver")
-
PolicyReceiver.catch_up(audit)
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py
index 0c8920a..977a9a1 100644
--- a/policyhandler/policy_rest.py
+++ b/policyhandler/policy_rest.py
@@ -27,11 +27,11 @@ from multiprocessing.dummy import Pool as ThreadPool
import requests
from .config import Config
-from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode,
- AuditResponseCode)
-from .policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, POLICY_BODY,
- POLICY_CONFIG, POLICY_FILTER, POLICY_ID,
- POLICY_NAME, SCOPE_PREFIXES, LATEST_POLICIES)
+from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
+ AuditResponseCode, Metrics)
+from .policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, LATEST_POLICIES,
+ POLICY_BODY, POLICY_CONFIG, POLICY_FILTER,
+ POLICY_ID, POLICY_NAME, SCOPE_PREFIXES)
from .policy_utils import PolicyUtils
@@ -95,43 +95,47 @@ class PolicyRest(object):
PolicyRest._policy_retry_count = Config.config.get("policy_retry_count", 1) or 1
PolicyRest._policy_retry_sleep = Config.config.get("policy_retry_sleep", 0)
- PolicyRest._logger.info("PolicyClient url(%s) headers(%s) scope-prefixes(%s)", \
- PolicyRest._url_get_config, Audit.log_json_dumps(PolicyRest._headers), \
+ PolicyRest._logger.info(
+ "PolicyClient url(%s) headers(%s) scope-prefixes(%s)",
+ PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers),
json.dumps(PolicyRest._scope_prefixes))
@staticmethod
def _pdp_get_config(audit, json_body):
"""Communication with the policy-engine"""
- sub_aud = Audit(aud_parent=audit, targetEntity=PolicyRest._target_entity, \
- targetServiceName=PolicyRest._url_get_config)
+ metrics = Metrics(aud_parent=audit, targetEntity=PolicyRest._target_entity,
+ targetServiceName=PolicyRest._url_get_config)
msg = json.dumps(json_body)
headers = copy.copy(PolicyRest._headers)
- headers[REQUEST_X_ECOMP_REQUESTID] = sub_aud.request_id
- headers_str = Audit.log_json_dumps(headers)
+ headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id
+ headers_str = Metrics.log_json_dumps(headers)
log_line = "post to PDP {0} msg={1} headers={2}".format(
PolicyRest._url_get_config, msg, headers_str)
- sub_aud.metrics_start(log_line)
+ metrics.metrics_start(log_line)
PolicyRest._logger.info(log_line)
res = None
try:
res = PolicyRest._requests_session.post(
PolicyRest._url_get_config, json=json_body, headers=headers)
- except requests.exceptions.RequestException as ex:
- error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
- error_msg = "failed to post to PDP {0} {1} msg={2} headers={3}" \
- .format(PolicyRest._url_get_config, str(ex), msg, headers_str)
+ except Exception as ex:
+ error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+ if isinstance(ex, requests.exceptions.RequestException)
+ else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ error_msg = (
+ "failed to post to PDP {0} {1}: {2} msg={3} headers={4}"
+ .format(PolicyRest._url_get_config, type(ex).__name__, str(ex), msg, headers_str))
PolicyRest._logger.exception(error_msg)
- sub_aud.set_http_status_code(error_code)
+ metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
- sub_aud.metrics(error_msg)
+ metrics.metrics(error_msg)
return (error_code, None)
log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format(
PolicyRest._url_get_config, res.status_code, msg, res.text,
- Audit.log_json_dumps(dict(res.request.headers.items())))
+ Metrics.log_json_dumps(dict(res.request.headers.items())))
res_data = None
if res.status_code == requests.codes.ok:
@@ -146,9 +150,9 @@ class PolicyRest(object):
error_msg = "unexpected {0}".format(log_line)
PolicyRest._logger.error(error_msg)
- sub_aud.set_http_status_code(error_code)
+ metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
- sub_aud.metrics(error_msg)
+ metrics.metrics(error_msg)
return (error_code, None)
elif res.status_code == PolicyRest.POLICY_ENGINE_STATUS_CODE_ERROR:
@@ -163,14 +167,14 @@ class PolicyRest(object):
info_msg = "not found {0}".format(log_line)
PolicyRest._logger.info(info_msg)
- sub_aud.set_http_status_code(status_code)
- sub_aud.metrics(info_msg)
+ metrics.set_http_status_code(status_code)
+ metrics.metrics(info_msg)
return (status_code, None)
except ValueError:
pass
- sub_aud.set_http_status_code(res.status_code)
- sub_aud.metrics(log_line)
+ metrics.set_http_status_code(res.status_code)
+ metrics.metrics(log_line)
PolicyRest._logger.info(log_line)
return res.status_code, res_data
@@ -191,165 +195,187 @@ class PolicyRest(object):
@staticmethod
def get_latest_policy(aud_policy_id):
"""Get the latest policy for the policy_id from the policy-engine"""
- PolicyRest._lazy_init()
audit, policy_id, min_version_expected, ignore_policy_names = aud_policy_id
+ str_metrics = "policy_id({0}), min_version_expected({1}) ignore_policy_names({2})".format(
+ policy_id, min_version_expected, json.dumps(ignore_policy_names))
- status_code = 0
- policy_configs = None
- latest_policy = None
- expect_policy_removed = (ignore_policy_names and not min_version_expected)
-
- for retry in xrange(1, PolicyRest._policy_retry_count + 1):
- PolicyRest._logger.debug("%s", policy_id)
-
- status_code, policy_configs = PolicyRest._pdp_get_config(
- audit, {POLICY_NAME:policy_id}
- )
-
- PolicyRest._logger.debug("%s %s policy_configs: %s",
- status_code, policy_id, json.dumps(policy_configs or []))
-
- latest_policy = PolicyUtils.select_latest_policy(
- policy_configs, min_version_expected, ignore_policy_names
- )
-
- if not latest_policy and not expect_policy_removed:
- audit.error("received unexpected policy data from PDP for policy_id={0}: {1}"
- .format(policy_id, json.dumps(policy_configs or [])),
- errorCode=AuditResponseCode.DATA_ERROR.value,
- errorDescription=AuditResponseCode.get_human_text(
- AuditResponseCode.DATA_ERROR))
-
- if latest_policy or not audit.retry_get_config \
- or (expect_policy_removed and not policy_configs) \
- or not PolicyRest._policy_retry_sleep \
- or audit.is_serious_error(status_code):
- break
-
- if retry == PolicyRest._policy_retry_count:
- audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}"
- .format(PolicyRest._url_get_config, retry, policy_id),
- errorCode=AuditResponseCode.DATA_ERROR.value,
- errorDescription=AuditResponseCode.get_human_text(
- AuditResponseCode.DATA_ERROR))
- break
-
- audit.warn(
- "retry #{0} {1} from PDP in {2} secs for policy_id={3}".format(
- retry, PolicyRest._url_get_config, PolicyRest._policy_retry_sleep, policy_id),
- errorCode=AuditResponseCode.DATA_ERROR.value,
- errorDescription=AuditResponseCode.get_human_text(
- AuditResponseCode.DATA_ERROR))
- time.sleep(PolicyRest._policy_retry_sleep)
-
- if expect_policy_removed and not latest_policy \
- and AuditHttpCode.RESPONSE_ERROR.value == status_code:
- audit.set_http_status_code(AuditHttpCode.HTTP_OK.value)
- return None
+ try:
+ PolicyRest._lazy_init()
+ status_code = 0
+ retry_get_config = audit.kwargs.get("retry_get_config")
+ policy_configs = None
+ latest_policy = None
+ expect_policy_removed = (ignore_policy_names and not min_version_expected)
+
+ for retry in xrange(1, PolicyRest._policy_retry_count + 1):
+ PolicyRest._logger.debug(str_metrics)
+
+ status_code, policy_configs = PolicyRest._pdp_get_config(
+ audit, {POLICY_NAME:policy_id}
+ )
+
+ PolicyRest._logger.debug("%s %s policy_configs: %s",
+ status_code, policy_id, json.dumps(policy_configs or []))
+
+ latest_policy = PolicyUtils.select_latest_policy(
+ policy_configs, min_version_expected, ignore_policy_names
+ )
- audit.set_http_status_code(status_code)
- if not PolicyRest._validate_policy(latest_policy):
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- audit.error(
- "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)),
- errorCode=AuditResponseCode.DATA_ERROR.value,
- errorDescription=AuditResponseCode.get_human_text(AuditResponseCode.DATA_ERROR)
- )
+ if not latest_policy and not expect_policy_removed:
+ audit.error("received unexpected policy data from PDP for policy_id={0}: {1}"
+ .format(policy_id, json.dumps(policy_configs or [])),
+ error_code=AuditResponseCode.DATA_ERROR)
+
+ if (latest_policy
+ or not retry_get_config
+ or (expect_policy_removed and not policy_configs)
+ or not PolicyRest._policy_retry_sleep
+ or audit.is_serious_error(status_code)):
+ break
+
+ if retry == PolicyRest._policy_retry_count:
+ audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}"
+ .format(PolicyRest._url_get_config, retry, policy_id),
+ error_code=AuditResponseCode.DATA_ERROR)
+ break
+
+ audit.warn("retry #{0} {1} from PDP in {2} secs for policy_id={3}".format(
+ retry, PolicyRest._url_get_config,
+ PolicyRest._policy_retry_sleep, policy_id),
+ error_code=AuditResponseCode.DATA_ERROR)
+ time.sleep(PolicyRest._policy_retry_sleep)
+
+ if (expect_policy_removed and not latest_policy
+ and AuditHttpCode.RESPONSE_ERROR.value == status_code):
+ audit.set_http_status_code(AuditHttpCode.HTTP_OK.value)
+ return None
+
+ audit.set_http_status_code(status_code)
+ if not PolicyRest._validate_policy(latest_policy):
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.error(
+ "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)),
+ error_code=AuditResponseCode.DATA_ERROR)
+
+ return latest_policy
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(audit.request_id, type(ex).__name__, str(ex),
+ "get_latest_policy", str_metrics))
+
+ PolicyRest._logger.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None
- return latest_policy
@staticmethod
def get_latest_updated_policies(aud_policy_updates):
"""Get the latest policies of the list of policy_names from the policy-engine"""
- PolicyRest._lazy_init()
audit, policies_updated, policies_removed = aud_policy_updates
if not policies_updated and not policies_removed:
- return
+ return None, None
str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format(
len(policies_updated), json.dumps(policies_updated),
len(policies_removed), json.dumps(policies_removed))
- audit.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
- PolicyRest._logger.debug(str_metrics)
-
- policies_to_find = {}
- for (policy_name, policy_version) in policies_updated:
- policy_id = PolicyUtils.extract_policy_id(policy_name)
- if not policy_id or not policy_version.isdigit():
- continue
- policy = policies_to_find.get(policy_id)
- if not policy:
- policies_to_find[policy_id] = {
- POLICY_ID: policy_id,
- PolicyRest.MIN_VERSION_EXPECTED: int(policy_version),
- PolicyRest.IGNORE_POLICY_NAMES: {}
- }
- continue
- if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version):
- policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version)
-
- for (policy_name, _) in policies_removed:
- policy_id = PolicyUtils.extract_policy_id(policy_name)
- if not policy_id:
- continue
- policy = policies_to_find.get(policy_id)
- if not policy:
- policies_to_find[policy_id] = {
- POLICY_ID: policy_id,
- PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True}
- }
- continue
- policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True
-
- apns = [(audit, policy_id,
- policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED),
- policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES))
- for (policy_id, policy_to_find) in policies_to_find.iteritems()]
-
- policies = None
- apns_length = len(apns)
- if apns_length == 1:
- policies = [PolicyRest.get_latest_policy(apns[0])]
- else:
- pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length))
- policies = pool.map(PolicyRest.get_latest_policy, apns)
- pool.close()
- pool.join()
-
- audit.metrics("result get_latest_updated_policies {0}: {1} {2}"
- .format(str_metrics, len(policies), json.dumps(policies)),
- targetEntity=PolicyRest._target_entity,
- targetServiceName=PolicyRest._url_get_config)
-
- updated_policies = dict((policy[POLICY_ID], policy)
- for policy in policies
- if policy and policy.get(POLICY_ID))
-
- removed_policies = dict((policy_id, True)
- for (policy_id, policy_to_find) in policies_to_find.iteritems()
- if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED)
- and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)
- and policy_id not in updated_policies)
-
- errored_policies = dict((policy_id, policy_to_find)
- for (policy_id, policy_to_find) in policies_to_find.iteritems()
- if policy_id not in updated_policies
- and policy_id not in removed_policies)
-
- PolicyRest._logger.debug(
- "result updated_policies %s, removed_policies %s, errored_policies %s",
- json.dumps(updated_policies), json.dumps(removed_policies),
- json.dumps(errored_policies))
-
- if errored_policies:
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- audit.error(
- "errored_policies in PDP: {0}".format(json.dumps(errored_policies)),
- errorCode=AuditResponseCode.DATA_ERROR.value,
- errorDescription=AuditResponseCode.get_human_text(AuditResponseCode.DATA_ERROR)
- )
-
- return updated_policies, removed_policies
+
+ target_entity = "{0} total get_latest_updated_policies".format(PolicyRest._target_entity)
+ try:
+ PolicyRest._lazy_init()
+ metrics = Metrics(aud_parent=audit,
+ targetEntity=target_entity,
+ targetServiceName=PolicyRest._url_get_config)
+
+ metrics.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
+ PolicyRest._logger.debug(str_metrics)
+
+ policies_to_find = {}
+ for (policy_name, policy_version) in policies_updated:
+ policy_id = PolicyUtils.extract_policy_id(policy_name)
+ if not policy_id or not policy_version.isdigit():
+ continue
+ policy = policies_to_find.get(policy_id)
+ if not policy:
+ policies_to_find[policy_id] = {
+ POLICY_ID: policy_id,
+ PolicyRest.MIN_VERSION_EXPECTED: int(policy_version),
+ PolicyRest.IGNORE_POLICY_NAMES: {}
+ }
+ continue
+ if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version):
+ policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version)
+
+ for (policy_name, _) in policies_removed:
+ policy_id = PolicyUtils.extract_policy_id(policy_name)
+ if not policy_id:
+ continue
+ policy = policies_to_find.get(policy_id)
+ if not policy:
+ policies_to_find[policy_id] = {
+ POLICY_ID: policy_id,
+ PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True}
+ }
+ continue
+ policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True
+
+ apns = [(audit, policy_id,
+ policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED),
+ policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES))
+ for (policy_id, policy_to_find) in policies_to_find.iteritems()]
+
+ policies = None
+ apns_length = len(apns)
+ if apns_length == 1:
+ policies = [PolicyRest.get_latest_policy(apns[0])]
+ else:
+ pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length))
+ policies = pool.map(PolicyRest.get_latest_policy, apns)
+ pool.close()
+ pool.join()
+
+ metrics.metrics("result get_latest_updated_policies {0}: {1} {2}"
+ .format(str_metrics, len(policies), json.dumps(policies)))
+
+ updated_policies = dict((policy[POLICY_ID], policy)
+ for policy in policies
+ if policy and policy.get(POLICY_ID))
+
+ removed_policies = dict((policy_id, True)
+ for (policy_id, policy_to_find) in policies_to_find.iteritems()
+ if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED)
+ and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)
+ and policy_id not in updated_policies)
+
+ errored_policies = dict((policy_id, policy_to_find)
+ for (policy_id, policy_to_find) in policies_to_find.iteritems()
+ if policy_id not in updated_policies
+ and policy_id not in removed_policies)
+
+ PolicyRest._logger.debug(
+ "result updated_policies %s, removed_policies %s, errored_policies %s",
+ json.dumps(updated_policies), json.dumps(removed_policies),
+ json.dumps(errored_policies))
+
+ if errored_policies:
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.error(
+ "errored_policies in PDP: {0}".format(json.dumps(errored_policies)),
+ error_code=AuditResponseCode.DATA_ERROR)
+
+ return updated_policies, removed_policies
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(audit.request_id, type(ex).__name__, str(ex),
+ "get_latest_updated_policies", str_metrics))
+
+ PolicyRest._logger.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None, None
+
@staticmethod
def _get_latest_policies(aud_policy_filter):
@@ -358,104 +384,120 @@ class PolicyRest(object):
or all the latest policies of the same scope from the policy-engine
"""
audit, policy_filter, scope_prefix = aud_policy_filter
- str_policy_filter = json.dumps(policy_filter)
- PolicyRest._logger.debug("%s", str_policy_filter)
-
- status_code, policy_configs = PolicyRest._pdp_get_config(audit, policy_filter)
-
- PolicyRest._logger.debug("%s policy_configs: %s %s", status_code,
- str_policy_filter, json.dumps(policy_configs or []))
+ try:
+ str_policy_filter = json.dumps(policy_filter)
+ PolicyRest._logger.debug("%s", str_policy_filter)
+
+ status_code, policy_configs = PolicyRest._pdp_get_config(audit, policy_filter)
+
+ PolicyRest._logger.debug("%s policy_configs: %s %s", status_code,
+ str_policy_filter, json.dumps(policy_configs or []))
+
+ latest_policies = PolicyUtils.select_latest_policies(policy_configs)
+
+ if (scope_prefix and not policy_configs
+ and status_code != AuditHttpCode.DATA_NOT_FOUND_ERROR.value):
+ audit.warn("PDP error {0} on scope_prefix {1}".format(status_code, scope_prefix),
+ error_code=AuditResponseCode.DATA_ERROR)
+ return None, latest_policies, scope_prefix
+
+ if not latest_policies:
+ if not scope_prefix:
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.warn(
+ "received no policies from PDP for policy_filter {0}: {1}"
+ .format(str_policy_filter, json.dumps(policy_configs or [])),
+ error_code=AuditResponseCode.DATA_ERROR)
+ return None, latest_policies, None
+
+ audit.set_http_status_code(status_code)
+ valid_policies = {}
+ errored_policies = {}
+ for (policy_id, policy) in latest_policies.iteritems():
+ if PolicyRest._validate_policy(policy):
+ valid_policies[policy_id] = policy
+ else:
+ errored_policies[policy_id] = policy
+ return valid_policies, errored_policies, None
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: policy_filter({4}), scope_prefix({5})"
+ .format(audit.request_id, type(ex).__name__, str(ex),
+ "_get_latest_policies", json.dumps(policy_filter), scope_prefix))
- latest_policies = PolicyUtils.select_latest_policies(policy_configs)
+ PolicyRest._logger.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None, None, scope_prefix
- if scope_prefix and not policy_configs \
- and status_code != AuditHttpCode.DATA_NOT_FOUND_ERROR.value:
- audit.warn("PDP error {0} on scope_prefix {1}".format(status_code, scope_prefix),
- errorCode=AuditResponseCode.DATA_ERROR.value,
- errorDescription=AuditResponseCode.get_human_text(
- AuditResponseCode.DATA_ERROR)
- )
- return None, latest_policies, scope_prefix
-
- if not latest_policies:
- if not scope_prefix:
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- audit.warn(
- "received no policies from PDP for policy_filter {0}: {1}"
- .format(str_policy_filter, json.dumps(policy_configs or [])),
- errorCode=AuditResponseCode.DATA_ERROR.value,
- errorDescription=AuditResponseCode.get_human_text(
- AuditResponseCode.DATA_ERROR)
- )
- return None, latest_policies, None
-
- audit.set_http_status_code(status_code)
- valid_policies = {}
- errored_policies = {}
- for (policy_id, policy) in latest_policies.iteritems():
- if PolicyRest._validate_policy(policy):
- valid_policies[policy_id] = policy
- else:
- errored_policies[policy_id] = policy
- return valid_policies, errored_policies, None
@staticmethod
def get_latest_policies(audit, policy_filter=None):
"""Get the latest policies of the same scope from the policy-engine"""
- PolicyRest._lazy_init()
-
result = {}
aud_policy_filters = None
str_policy_filters = None
str_metrics = None
target_entity = None
- if policy_filter is not None:
- aud_policy_filters = [(audit, policy_filter, None)]
- str_policy_filters = json.dumps(policy_filter)
- str_metrics = "get_latest_policies for policy_filter {0}".format(
- str_policy_filters)
- target_entity = ("{0} total get_latest_policies by policy_filter"
- .format(PolicyRest._target_entity))
- result[POLICY_FILTER] = copy.deepcopy(policy_filter)
- else:
- aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, scope_prefix)
- for scope_prefix in PolicyRest._scope_prefixes]
- str_policy_filters = json.dumps(PolicyRest._scope_prefixes)
- str_metrics = "get_latest_policies for scopes {0} {1}".format( \
- len(PolicyRest._scope_prefixes), str_policy_filters)
- target_entity = ("{0} total get_latest_policies by scope_prefixes"
- .format(PolicyRest._target_entity))
- result[SCOPE_PREFIXES] = copy.deepcopy(PolicyRest._scope_prefixes)
-
- PolicyRest._logger.debug("%s", str_policy_filters)
- audit.metrics_start(str_metrics)
-
- latest_policies = None
- apfs_length = len(aud_policy_filters)
- if apfs_length == 1:
- latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])]
- else:
- pool = ThreadPool(min(PolicyRest._scope_thread_pool_size, apfs_length))
- latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters)
- pool.close()
- pool.join()
-
- audit.metrics(
- "total result {0}: {1} {2}".format(
- str_metrics, len(latest_policies), json.dumps(latest_policies)),
- targetEntity=target_entity, targetServiceName=PolicyRest._url_get_config)
-
- # latest_policies == [(valid_policies, errored_policies, errored_scope_prefix), ...]
- result[LATEST_POLICIES] = dict(
- pair for (vps, _, _) in latest_policies if vps for pair in vps.iteritems())
-
- result[ERRORED_POLICIES] = dict(
- pair for (_, eps, _) in latest_policies if eps for pair in eps.iteritems())
-
- result[ERRORED_SCOPES] = sorted([esp for (_, _, esp) in latest_policies if esp])
-
- PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s",
- str_policy_filters, json.dumps(result))
-
- return result
+ try:
+ PolicyRest._lazy_init()
+ if policy_filter is not None:
+ aud_policy_filters = [(audit, policy_filter, None)]
+ str_policy_filters = json.dumps(policy_filter)
+ str_metrics = "get_latest_policies for policy_filter {0}".format(
+ str_policy_filters)
+ target_entity = ("{0} total get_latest_policies by policy_filter"
+ .format(PolicyRest._target_entity))
+ result[POLICY_FILTER] = copy.deepcopy(policy_filter)
+ else:
+ aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, scope_prefix)
+ for scope_prefix in PolicyRest._scope_prefixes]
+ str_policy_filters = json.dumps(PolicyRest._scope_prefixes)
+ str_metrics = "get_latest_policies for scopes {0} {1}".format( \
+ len(PolicyRest._scope_prefixes), str_policy_filters)
+ target_entity = ("{0} total get_latest_policies by scope_prefixes"
+ .format(PolicyRest._target_entity))
+ result[SCOPE_PREFIXES] = copy.deepcopy(PolicyRest._scope_prefixes)
+
+ PolicyRest._logger.debug("%s", str_policy_filters)
+ metrics = Metrics(aud_parent=audit, targetEntity=target_entity,
+ targetServiceName=PolicyRest._url_get_config)
+
+ metrics.metrics_start(str_metrics)
+
+ latest_policies = None
+ apfs_length = len(aud_policy_filters)
+ if apfs_length == 1:
+ latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])]
+ else:
+ pool = ThreadPool(min(PolicyRest._scope_thread_pool_size, apfs_length))
+ latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters)
+ pool.close()
+ pool.join()
+
+ metrics.metrics("total result {0}: {1} {2}".format(
+ str_metrics, len(latest_policies), json.dumps(latest_policies)))
+
+ # latest_policies == [(valid_policies, errored_policies, errored_scope_prefix), ...]
+ result[LATEST_POLICIES] = dict(
+ pair for (vps, _, _) in latest_policies if vps for pair in vps.iteritems())
+
+ result[ERRORED_POLICIES] = dict(
+ pair for (_, eps, _) in latest_policies if eps for pair in eps.iteritems())
+
+ result[ERRORED_SCOPES] = sorted([esp for (_, _, esp) in latest_policies if esp])
+
+ PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s",
+ str_policy_filters, json.dumps(result))
+ return result
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(audit.request_id, type(ex).__name__, str(ex),
+ "get_latest_policies", str_metrics))
+
+ PolicyRest._logger.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index 70823fa..38ce93a 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -26,7 +26,7 @@ from threading import Lock, Thread
from .config import Config
from .deploy_handler import DeployHandler
-from .onap.audit import Audit
+from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, LATEST_POLICIES,
REMOVED_POLICIES)
from .policy_rest import PolicyRest
@@ -56,6 +56,7 @@ class PolicyUpdater(Thread):
self._lock = Lock()
self._queue = Queue()
+
def enqueue(self, audit=None, policies_updated=None, policies_removed=None):
"""enqueue the policy-updates"""
policies_updated = policies_updated or []
@@ -65,7 +66,23 @@ class PolicyUpdater(Thread):
"enqueue request_id %s policies_updated %s policies_removed %s",
((audit and audit.request_id) or "none"),
json.dumps(policies_updated), json.dumps(policies_removed))
- self._queue.put((audit, policies_updated, policies_removed))
+
+ with self._lock:
+ self._queue.put((audit, policies_updated, policies_removed))
+
+
+ def catch_up(self, audit=None):
+ """need to bring the latest policies to DCAE-Controller"""
+ with self._lock:
+ if not self._aud_catch_up:
+ self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP)
+ PolicyUpdater._logger.info(
+ "catch_up %s request_id %s",
+ self._aud_catch_up.req_message, self._aud_catch_up.request_id
+ )
+
+ self.enqueue()
+
def run(self):
"""wait and run the policy-update in thread"""
@@ -78,32 +95,15 @@ class PolicyUpdater(Thread):
json.dumps(policies_updated), json.dumps(policies_removed))
if not self._keep_running():
- self._queue.task_done()
break
if self._on_catch_up():
self._reset_queue()
continue
elif not queued_audit:
- self._queue.task_done()
continue
- updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
- (queued_audit, policies_updated, policies_removed))
-
- message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies}
-
- deployment_handler_changed = DeployHandler.policy_update(queued_audit, message)
-
- self._queue.task_done()
- queued_audit.audit_done()
-
- if deployment_handler_changed:
- self._catch_up_prev_message = None
- self._pause_catch_up_timer()
- self.catch_up()
- elif not queued_audit.is_success():
- self._catch_up_prev_message = None
+ self._on_policies_update(queued_audit, policies_updated, policies_removed)
PolicyUpdater._logger.info("exit policy-updater")
@@ -116,17 +116,6 @@ class PolicyUpdater(Thread):
self._aud_shutdown.audit_done()
return keep_running
- def catch_up(self, audit=None):
- """need to bring the latest policies to DCAE-Controller"""
- with self._lock:
- self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP)
- PolicyUpdater._logger.info(
- "catch_up %s request_id %s",
- self._aud_catch_up.req_message, self._aud_catch_up.request_id
- )
-
- self.enqueue()
-
def _run_catch_up_timer(self):
"""create and start the catch_up timer"""
if not self._catch_up_interval:
@@ -190,9 +179,9 @@ class PolicyUpdater(Thread):
def _reset_queue(self):
"""clear up the queue"""
with self._lock:
- self._aud_catch_up = None
- self._queue.task_done()
- self._queue = Queue()
+ if not self._aud_catch_up and not self._aud_shutdown:
+ with self._queue.mutex:
+ self._queue.queue.clear()
def _on_catch_up(self):
"""bring all the latest policies to DCAE-Controller"""
@@ -207,37 +196,94 @@ class PolicyUpdater(Thread):
log_line = "catch_up {0} request_id {1}".format(
aud_catch_up.req_message, aud_catch_up.request_id
)
+ try:
+ PolicyUpdater._logger.info(log_line)
+ self._pause_catch_up_timer()
- PolicyUpdater._logger.info(log_line)
- self._pause_catch_up_timer()
+ catch_up_message = PolicyRest.get_latest_policies(aud_catch_up)
+ catch_up_message[CATCH_UP] = True
- catch_up_message = PolicyRest.get_latest_policies(aud_catch_up)
- catch_up_message[CATCH_UP] = True
-
- catch_up_result = ""
- if not aud_catch_up.is_success():
- catch_up_result = "- not sending catch-up to deployment-handler due to errors"
- PolicyUpdater._logger.warn(catch_up_result)
- elif self._need_to_send_catch_up(aud_catch_up, catch_up_message):
- DeployHandler.policy_update(aud_catch_up, catch_up_message)
- if aud_catch_up.is_success():
- catch_up_result = "- sent catch-up to deployment-handler"
- else:
- catch_up_result = "- failed to send catch-up to deployment-handler"
+ catch_up_result = ""
+ if not aud_catch_up.is_success():
+ catch_up_result = "- not sending catch-up to deployment-handler due to errors"
PolicyUpdater._logger.warn(catch_up_result)
- else:
- catch_up_result = "- skipped sending the same policies"
- success, _, _ = aud_catch_up.audit_done(result=catch_up_result)
- PolicyUpdater._logger.info(log_line + " " + catch_up_result)
- PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health()))
+ elif not self._need_to_send_catch_up(aud_catch_up, catch_up_message):
+ catch_up_result = "- skipped sending the same policies"
+ else:
+ DeployHandler.policy_update(aud_catch_up, catch_up_message, rediscover=True)
+ if not aud_catch_up.is_success():
+ catch_up_result = "- failed to send catch-up to deployment-handler"
+ PolicyUpdater._logger.warn(catch_up_result)
+ else:
+ catch_up_result = "- sent catch-up to deployment-handler"
+ success, _, _ = aud_catch_up.audit_done(result=catch_up_result)
+ PolicyUpdater._logger.info(log_line + " " + catch_up_result)
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(aud_catch_up.request_id, type(ex).__name__, str(ex),
+ "on_catch_up", log_line + " " + catch_up_result))
+
+ PolicyUpdater._logger.exception(error_msg)
+ aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ success = False
if not success:
self._catch_up_prev_message = None
self._run_catch_up_timer()
+ PolicyUpdater._logger.info("policy_handler health: %s",
+ json.dumps(aud_catch_up.health(full=True)))
return success
+
+ def _on_policies_update(self, queued_audit, policies_updated, policies_removed):
+ """handle the event of policy-updates from the queue"""
+ deployment_handler_changed = None
+ result = ""
+
+ log_line = "request_id: {} policies_updated: {} policies_removed: {}".format(
+ ((queued_audit and queued_audit.request_id) or "none"),
+ json.dumps(policies_updated), json.dumps(policies_removed))
+
+ try:
+ updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
+ (queued_audit, policies_updated, policies_removed))
+
+ if not queued_audit.is_success():
+ result = "- not sending policy-updates to deployment-handler due to errors"
+ PolicyUpdater._logger.warn(result)
+ else:
+ message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies}
+ deployment_handler_changed = DeployHandler.policy_update(queued_audit, message)
+ if not queued_audit.is_success():
+ result = "- failed to send policy-updates to deployment-handler"
+ PolicyUpdater._logger.warn(result)
+ else:
+ result = "- sent policy-updates to deployment-handler"
+
+ success, _, _ = queued_audit.audit_done(result=result)
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(queued_audit.request_id, type(ex).__name__, str(ex),
+ "on_policies_update", log_line + " " + result))
+
+ PolicyUpdater._logger.exception(error_msg)
+ queued_audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ queued_audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ success = False
+
+ if deployment_handler_changed:
+ self._catch_up_prev_message = None
+ self._pause_catch_up_timer()
+ self.catch_up()
+ elif not success:
+ self._catch_up_prev_message = None
+
+
def shutdown(self, audit):
"""Shutdown the policy-updater"""
PolicyUpdater._logger.info("shutdown policy-updater")
diff --git a/policyhandler/step_timer.py b/policyhandler/step_timer.py
index 3936107..2a13dd5 100644
--- a/policyhandler/step_timer.py
+++ b/policyhandler/step_timer.py
@@ -18,6 +18,7 @@
"""periodically callback"""
+import json
from datetime import datetime
from threading import Event, RLock, Thread
@@ -51,11 +52,11 @@ class StepTimer(Thread):
self._request = StepTimer.INIT
self._req_count = 0
self._req_time = 0
- self._req_ts = datetime.now()
+ self._req_ts = datetime.utcnow()
self._substep = None
self._substep_time = 0
- self._substep_ts = datetime.now()
+ self._substep_ts = datetime.utcnow()
def get_timer_status(self):
"""returns timer status"""
@@ -104,9 +105,9 @@ class StepTimer(Thread):
prev_req = self._request
self._request = request
- now = datetime.now()
- self._req_time = (now - self._req_ts).total_seconds()
- self._req_ts = now
+ utcnow = datetime.utcnow()
+ self._req_time = (utcnow - self._req_ts).total_seconds()
+ self._req_ts = utcnow
self._logger.info("{0}[{1}] {2}->{3}".format(
self.name, self._req_time, prev_req, self.get_timer_status()))
@@ -114,9 +115,9 @@ class StepTimer(Thread):
"""log exe step"""
with self._lock:
self._substep = substep
- now = datetime.now()
- self._substep_time = (now - self._substep_ts).total_seconds()
- self._substep_ts = now
+ utcnow = datetime.utcnow()
+ self._substep_time = (utcnow - self._substep_ts).total_seconds()
+ self._substep_ts = utcnow
self._logger.info("[{0}] {1}".format(self._substep_time, self.get_timer_status()))
def run(self):
@@ -147,8 +148,14 @@ class StepTimer(Thread):
if self._paused:
self._timer_substep("paused - skip on_time event")
else:
- self._timer_substep("on_time event")
- self._on_time(*self._args, **self._kwargs)
+ try:
+ self._timer_substep("on_time event")
+ self._on_time(*self._args, **self._kwargs)
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: args({4}), kwargs({5})"
+ .format(self.name, type(ex).__name__, str(ex), "_on_time",
+ json.dumps(self._args), json.dumps(self._kwargs)))
+ self._logger.exception(error_msg)
self._timer_substep("waiting for next...")
self._next.wait()
diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py
index e9cc9cc..5314791 100644
--- a/policyhandler/web_server.py
+++ b/policyhandler/web_server.py
@@ -169,7 +169,7 @@ class _PolicyWeb(object):
@cherrypy.tools.json_out()
def catch_up(self):
"""catch up with all DCAE policies"""
- started = str(datetime.now())
+ started = str(datetime.utcnow())
req_info = _PolicyWeb._get_request_info(cherrypy.request)
audit = Audit(job_name="catch_up", req_message=req_info, headers=cherrypy.request.headers)
@@ -193,11 +193,10 @@ class _PolicyWeb(object):
PolicyReceiver.shutdown(audit)
- health = json.dumps(Audit.health())
- audit.info("policy_handler health: {0}".format(health))
- PolicyWeb.logger.info("policy_handler health: %s", health)
+ PolicyWeb.logger.info("policy_handler health: {0}"
+ .format(json.dumps(audit.health(full=True))))
PolicyWeb.logger.info("%s: --------- the end -----------", req_info)
- res = str(datetime.now())
+ res = str(datetime.utcnow())
audit.info_requested(res)
return "goodbye! shutdown requested {0}".format(res)
@@ -211,7 +210,7 @@ class _PolicyWeb(object):
PolicyWeb.logger.info("%s", req_info)
- res = Audit.health()
+ res = audit.health()
PolicyWeb.logger.info("healthcheck %s: res=%s", req_info, json.dumps(res))