aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2018-08-07 12:11:35 -0400
committerAlex Shatov <alexs@att.com>2018-08-07 12:11:35 -0400
commitd7f34d4b71ec4d86547628cda351d20bff4d017f (patch)
tree101c7669fb5508a103894e262964da0d0c8319bc /policyhandler
parenta29f70823b18f492417629f56c86f61f94b96af8 (diff)
4.0.0 new dataflow on policy-update and catchup
- changed API and functionality - new dataflow - new dataflow between policy-handler and deployment-handler on policy-update and catchup = GETting policy_ids+versions and policy-filters from deployment-handler = PUTting policy-update and catchup in the new message format = data segmenting the policy-update/catchup messages to deployment-handler to avoid 413 on deployment-handler side = matching policies from policy-engine to policies and policy-filters from deployment-handler = coarsening the policyName filter received from deployment-handler to reduce the number messages passed to policy-engine on catchup = consolidating sequential policy-updates into a single request when the policy-update is busy - removed policy scope-prefixes from config and logic - it is not needed anymore because = the policy matching happens directly to policies and policy-filters received from deployment-handler = on catchup - the policy scope-prefix equivalents are calculated based on the data received from deployment-handler - API - GET /policies_latest now returns the info on deployed policy_ids+versions and policy-filters, rather than policies of the scope-prefixes previously found in config (obsolete) - not sending an empty catch_up message to deployment-handler when nothing changed - send policy-removed to deployment-handler when getting 404-not found from PDP on removal of policy - config change: removed catch_up.max_skips - obsolete - brought the latest CommonLogger.py - minor refactoring - improved naming of variables Change-Id: I36b3412eefd439088cb693703a6e5f18f4238b00 Signed-off-by: Alex Shatov <alexs@att.com> Issue-ID: DCAEGEN2-492
Diffstat (limited to 'policyhandler')
-rw-r--r--policyhandler/customize/customizer_base.py4
-rw-r--r--policyhandler/deploy_handler.py266
-rw-r--r--policyhandler/onap/CommonLogger.py1916
-rw-r--r--policyhandler/onap/audit.py29
-rw-r--r--policyhandler/policy_consts.py8
-rw-r--r--policyhandler/policy_matcher.py236
-rw-r--r--policyhandler/policy_receiver.py42
-rw-r--r--policyhandler/policy_rest.py146
-rw-r--r--policyhandler/policy_updater.py256
-rw-r--r--policyhandler/policy_utils.py260
-rw-r--r--policyhandler/web_server.py33
11 files changed, 1960 insertions, 1236 deletions
diff --git a/policyhandler/customize/customizer_base.py b/policyhandler/customize/customizer_base.py
index c98a9eb..561891f 100644
--- a/policyhandler/customize/customizer_base.py
+++ b/policyhandler/customize/customizer_base.py
@@ -55,8 +55,8 @@ class CustomizerBase(object):
return service_url
def get_deploy_handler_kwargs(self, audit):
- """returns the optional dict-kwargs for requests.post to deploy_handler"""
- info = "no optional kwargs for requests.post to deploy_handler"
+ """returns the optional dict-kwargs for requests.put to deploy_handler"""
+ info = "no optional kwargs for requests.put to deploy_handler"
self._logger.info(info)
audit.info(info)
kwargs = {}
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
index ea703f4..6b7788c 100644
--- a/policyhandler/deploy_handler.py
+++ b/policyhandler/deploy_handler.py
@@ -16,32 +16,144 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-""" send notification to deploy-handler"""
+"""send policy-update notification to deployment-handler"""
import json
import logging
+from copy import copy, deepcopy
import requests
from .config import Config
from .customize import CustomizerUser
from .discovery import DiscoveryClient
-from .onap.audit import REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, Metrics
+from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
+ AuditResponseCode, Metrics)
+from .policy_consts import (CATCH_UP, LATEST_POLICIES, POLICIES,
+ POLICY_FILTER_MATCHES, POLICY_FILTERS,
+ REMOVED_POLICIES)
+
+
+class PolicyUpdateMessage(object):
+ """class for messages to deployment-handler on policy-update"""
+ BYTES_IN_MB = 1 << 2 * 10
+
+ def __init__(self, latest_policies=None,
+ removed_policies=None, policy_filter_matches=None, catch_up=True):
+ """init"""
+ self._catch_up = catch_up
+ self._latest_policies = deepcopy(latest_policies or {})
+ self._removed_policies = copy(removed_policies or {})
+ self._policy_filter_matches = deepcopy(policy_filter_matches or {})
+
+ self._message = {
+ CATCH_UP: self._catch_up,
+ LATEST_POLICIES: self._latest_policies,
+ REMOVED_POLICIES: self._removed_policies,
+ POLICY_FILTER_MATCHES: self._policy_filter_matches
+ }
+ self.msg_length = 0
+ self._calc_stats()
+
+ def _calc_stats(self):
+ """generate the message and calc stats"""
+ self.msg_length = len(json.dumps(self._message))
+
+ def empty(self):
+ """checks whether have any data"""
+ return (not self._latest_policies
+ and not self._removed_policies
+ and not self._policy_filter_matches)
+
+ def add(self, policy_id, latest_policy=None, policy_filter_ids=None, removed_policy=None):
+ """add the parts from the other message to the current message"""
+ if not policy_id or not (latest_policy or policy_filter_ids or removed_policy):
+ return
+
+ if latest_policy:
+ self._latest_policies[policy_id] = deepcopy(latest_policy)
+
+ if policy_filter_ids:
+ if policy_id not in self._policy_filter_matches:
+ self._policy_filter_matches[policy_id] = {}
+ self._policy_filter_matches[policy_id].update(policy_filter_ids)
+
+ if removed_policy is not None:
+ self._removed_policies[policy_id] = removed_policy
+
+ self._calc_stats()
+
+ def get_message(self):
+ """expose the copy of the message"""
+ return deepcopy(self._message)
+
+ def __str__(self):
+ """to string"""
+ return json.dumps(self._message)
+
+ def _iter_over_removed_policies(self):
+ """generator of iterator over removed_policies"""
+ for (policy_id, value) in self._removed_policies.items():
+ yield (policy_id, value)
+
+ def _iter_over_latest_policies(self):
+ """generator of iterator over latest_policies and policy_filter_matches"""
+ for (policy_id, policy) in self._latest_policies.items():
+ yield (policy_id, policy, self._policy_filter_matches.get(policy_id))
+
+ def gen_segmented_messages(self, max_msg_length_mb):
+ """
+ Break the policy-update message into a list of segmented messages.
+
+ Each segmented message should not exceed the max_msg_length_mb from config.
+ """
+ max_msg_length_mb = (max_msg_length_mb or 10) * PolicyUpdateMessage.BYTES_IN_MB
+
+ messages = []
+ curr_message = PolicyUpdateMessage(catch_up=self._catch_up)
+
+ for (policy_id, value) in self._iter_over_removed_policies():
+ if (not curr_message.empty()
+ and (len(policy_id) + len(str(value)) + curr_message.msg_length
+ > max_msg_length_mb)):
+ messages.append(curr_message.get_message())
+ curr_message = PolicyUpdateMessage(catch_up=self._catch_up)
+ curr_message.add(policy_id, removed_policy=value)
+
+ for (policy_id, policy, policy_filter_ids) in self._iter_over_latest_policies():
+ if (not curr_message.empty()
+ and (2 * len(policy_id) + len(json.dumps(policy))
+ + len(json.dumps(policy_filter_ids))
+ + curr_message.msg_length > max_msg_length_mb)):
+ messages.append(curr_message.get_message())
+ curr_message = PolicyUpdateMessage(catch_up=self._catch_up)
+ curr_message.add(policy_id, latest_policy=policy, policy_filter_ids=policy_filter_ids)
+
+ if not curr_message.empty():
+ messages.append(curr_message.get_message())
+
+ msg_count = len(messages)
+ if msg_count > 1:
+ msg_count = "/" + str(msg_count)
+ for idx, msg in enumerate(messages):
+ msg["data_segment"] = str((idx+1)) + msg_count
+
+ return messages
-POOL_SIZE = 1
class DeployHandler(object):
- """ deploy-handler """
+ """calling the deployment-handler web apis"""
_logger = logging.getLogger("policy_handler.deploy_handler")
_lazy_inited = False
_requests_session = None
- _config = None
_url = None
_url_policy = None
+ _max_msg_length_mb = 10
_target_entity = None
_custom_kwargs = None
_server_instance_uuid = None
+ server_instance_changed = False
@staticmethod
def _lazy_init(audit, rediscover=False):
@@ -56,14 +168,15 @@ class DeployHandler(object):
DeployHandler._custom_kwargs = {}
if not DeployHandler._requests_session:
+ pool_size = Config.settings.get("pool_connections", 20)
DeployHandler._requests_session = requests.Session()
DeployHandler._requests_session.mount(
'https://',
- requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE)
+ requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
)
DeployHandler._requests_session.mount(
'http://',
- requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE)
+ requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
)
config_dh = Config.settings.get("deploy_handler")
@@ -72,10 +185,13 @@ class DeployHandler(object):
# config for policy-handler >= 2.4.0
# "deploy_handler" : {
# "target_entity" : "deployment_handler",
- # "url" : "http://deployment_handler:8188"
+ # "url" : "http://deployment_handler:8188",
+ # "max_msg_length_mb" : 100
# }
DeployHandler._target_entity = config_dh.get("target_entity", "deployment_handler")
DeployHandler._url = config_dh.get("url")
+ DeployHandler._max_msg_length_mb = config_dh.get("max_msg_length_mb",
+ DeployHandler._max_msg_length_mb)
DeployHandler._logger.info("dns based routing to %s: url(%s)",
DeployHandler._target_entity, DeployHandler._url)
@@ -96,26 +212,52 @@ class DeployHandler(object):
@staticmethod
- def policy_update(audit, message, rediscover=False):
+ def policy_update(audit, policy_update_message, rediscover=False):
"""
- post policy_updated message to deploy-handler
+ segments the big policy_update_message limited by size
+ and sequatially sends each segment as put to deployment-handler at /policy.
- returns condition whether it needs to catch_up
+ param policy_update_message is of PolicyUpdateMessage type
"""
- if not message:
+ if not policy_update_message or policy_update_message.empty():
return
DeployHandler._lazy_init(audit, rediscover)
+
+ str_metrics = "policy_update {0}".format(str(policy_update_message))
+
+ metrics_total = Metrics(
+ aud_parent=audit,
+ targetEntity="{0} total policy_update".format(DeployHandler._target_entity),
+ targetServiceName=DeployHandler._url_policy)
+
+ metrics_total.metrics_start("started {}".format(str_metrics))
+ messages = policy_update_message.gen_segmented_messages(DeployHandler._max_msg_length_mb)
+ for message in messages:
+ DeployHandler._policy_update(audit, message)
+ if not audit.is_success():
+ break
+ metrics_total.metrics("done {}".format(str_metrics))
+
+ @staticmethod
+ def _policy_update(audit, message):
+ """
+ sends the put message to deployment-handler at /policy
+
+ detects whether server_instance_changed condition on deployment-handler
+ that is the cause to catch_up
+ """
+ if not message:
+ return
+
metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity,
targetServiceName=DeployHandler._url_policy)
headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
- msg_str = json.dumps(message)
- headers_str = json.dumps(headers)
-
- log_action = "post to {0} at {1}".format(
+ log_action = "put to {0} at {1}".format(
DeployHandler._target_entity, DeployHandler._url_policy)
- log_data = " msg={0} headers={1}".format(msg_str, headers_str)
+ log_data = " msg={0} headers={1}".format(json.dumps(message),
+ json.dumps(headers))
log_line = log_action + log_data
DeployHandler._logger.info(log_line)
metrics.metrics_start(log_line)
@@ -130,7 +272,7 @@ class DeployHandler(object):
res = None
try:
- res = DeployHandler._requests_session.post(
+ res = DeployHandler._requests_session.put(
DeployHandler._url_policy, json=message, headers=headers,
**DeployHandler._custom_kwargs
)
@@ -149,8 +291,8 @@ class DeployHandler(object):
metrics.set_http_status_code(res.status_code)
audit.set_http_status_code(res.status_code)
- log_line = "response {0} from {1}: text={2}{3}" \
- .format(res.status_code, log_action, res.text, log_data)
+ log_line = "response {0} from {1}: text={2}{3}".format(res.status_code, log_action,
+ res.text, log_data)
metrics.metrics(log_line)
if res.status_code != requests.codes.ok:
@@ -159,15 +301,89 @@ class DeployHandler(object):
DeployHandler._logger.info(log_line)
result = res.json() or {}
+ DeployHandler._server_instance_changed(result, metrics)
+
+
+ @staticmethod
+ def get_deployed_policies(audit, rediscover=False):
+ """
+ Retrieves policies and policy-filters from components
+ that were deployed by deployment-handler
+ """
+ DeployHandler._lazy_init(audit, rediscover)
+ metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity,
+ targetServiceName=DeployHandler._url_policy)
+ headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
+
+ log_action = "get {0}: {1}".format(DeployHandler._target_entity, DeployHandler._url_policy)
+ log_data = " headers={}".format(json.dumps(headers))
+ log_line = log_action + log_data
+ DeployHandler._logger.info(log_line)
+ metrics.metrics_start(log_line)
+
+ if not DeployHandler._url:
+ error_msg = "no url found to {0}".format(log_line)
+ DeployHandler._logger.error(error_msg)
+ metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
+ audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
+ metrics.metrics(error_msg)
+ return None, None
+
+ res = None
+ try:
+ res = DeployHandler._requests_session.get(
+ DeployHandler._url_policy, headers=headers,
+ **DeployHandler._custom_kwargs
+ )
+ except Exception as ex:
+ error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+ if isinstance(ex, requests.exceptions.RequestException)
+ else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ error_msg = ("failed to {0} {1}: {2}{3}"
+ .format(log_action, type(ex).__name__, str(ex), log_data))
+ DeployHandler._logger.exception(error_msg)
+ metrics.set_http_status_code(error_code)
+ audit.set_http_status_code(error_code)
+ metrics.metrics(error_msg)
+ return None, None
+
+ metrics.set_http_status_code(res.status_code)
+ audit.set_http_status_code(res.status_code)
+
+ log_line = ("response {0} from {1}: text={2}{3}"
+ .format(res.status_code, log_action, res.text, log_data))
+ metrics.metrics(log_line)
+
+ if res.status_code != requests.codes.ok:
+ DeployHandler._logger.error(log_line)
+ return None, None
+
+ result = res.json() or {}
+ DeployHandler._server_instance_changed(result, metrics)
+
+ policies = result.get(POLICIES, {})
+ policy_filters = result.get(POLICY_FILTERS, {})
+ if not policies and not policy_filters:
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ DeployHandler._logger.warning(audit.warn(
+ "found no deployed policies or policy-filters: {}".format(log_line),
+ error_code=AuditResponseCode.DATA_ERROR))
+ return policies, policy_filters
+
+ DeployHandler._logger.info(log_line)
+ return policies, policy_filters
+
+ @staticmethod
+ def _server_instance_changed(result, metrics):
+ """Checks whether the deployment-handler instance changed since last call."""
prev_server_instance_uuid = DeployHandler._server_instance_uuid
DeployHandler._server_instance_uuid = result.get("server_instance_uuid")
- deployment_handler_changed = (prev_server_instance_uuid
- and prev_server_instance_uuid != DeployHandler._server_instance_uuid)
- if deployment_handler_changed:
+ if (prev_server_instance_uuid
+ and prev_server_instance_uuid != DeployHandler._server_instance_uuid):
+ DeployHandler.server_instance_changed = True
+
log_line = ("deployment_handler_changed: {1} != {0}"
.format(prev_server_instance_uuid, DeployHandler._server_instance_uuid))
metrics.info(log_line)
DeployHandler._logger.info(log_line)
-
- return deployment_handler_changed
diff --git a/policyhandler/onap/CommonLogger.py b/policyhandler/onap/CommonLogger.py
index 957b3aa..25b093f 100644
--- a/policyhandler/onap/CommonLogger.py
+++ b/policyhandler/onap/CommonLogger.py
@@ -1,953 +1,963 @@
-#!/usr/bin/python
-# -*- indent-tabs-mode: nil -*- vi: set expandtab:
-
-# ================================================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-
-"""ECOMP Common Logging library in Python.
-
-CommonLogger.py
-
-Original Written by: Terry Schmalzried
-Date written: October 1, 2015
-Last updated: December 1, 2016
-
-version 0.8
-"""
-
-from __future__ import print_function
-import os, sys, getopt, logging, logging.handlers, time, re, uuid, socket, threading, collections
-
-class CommonLogger:
- """ECOMP Common Logging object.
-
- Public methods:
- __init__
- setFields
- debug
- info
- warn
- error
- fatal
- """
-
- UnknownFile = -1
- ErrorFile = 0
- DebugFile = 1
- AuditFile = 2
- MetricsFile = 3
- DateFmt = '%Y-%m-%dT%H:%M:%S'
- verbose = False
-
- def __init__(self, configFile, logKey, **kwargs):
- """Construct a Common Logger for one Log File.
-
- Arguments:
- configFile -- configuration filename.
- logKey -- the keyword in configFile that identifies the log filename.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages,
- one of CommonLogger.ErrorFile, CommonLogger.DebugFile,
- CommonLogger.AuditFile and CommonLogger.MetricsFile, or
- one of the strings "error", "debug", "audit" or "metrics".
- May also be set in the config file using a field named
- <logKey>Style (where <logKey> is the value of the logKey
- parameter). The keyword value overrides the value in the
- config file.
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- self._monitorFlag = False
-
- # Get configuration parameters
- self._logKey = str(logKey)
- self._configFile = str(configFile)
- self._rotateMethod = 'time'
- self._timeRotateIntervalType = 'midnight'
- self._timeRotateInterval = 1
- self._sizeMaxBytes = 0
- self._sizeRotateMode = 'a'
- self._socketHost = None
- self._socketPort = 0
- self._typeLogger = 'filelogger'
- self._backupCount = 6
- self._logLevelThreshold = self._intLogLevel('')
- self._logFile = None
- self._begTime = None
- self._begMsec = 0
- self._fields = {}
- self._fields["style"] = CommonLogger.UnknownFile
- try:
- self._configFileModified = os.path.getmtime(self._configFile)
- for line in open(self._configFile):
- line = line.split('#',1)[0] # remove comments
- if '=' in line:
- key, value = [x.strip() for x in line.split('=',1)]
- if key == 'rotateMethod' and value.lower() in ['time', 'size', 'none']:
- self._rotateMethod = value.lower()
- elif key == 'timeRotateIntervalType' and value in ['S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight']:
- self._timeRotateIntervalType = value
- elif key == 'timeRotateInterval' and int( value ) > 0:
- self._timeRotateInterval = int( value )
- elif key == 'sizeMaxBytes' and int( value ) >= 0:
- self._sizeMaxBytes = int( value )
- elif key == 'sizeRotateMode' and value in ['a']:
- self._sizeRotateMode = value
- elif key == 'backupCount' and int( value ) >= 0:
- self._backupCount = int( value )
- elif key == self._logKey + 'SocketHost':
- self._socketHost = value
- elif key == self._logKey + 'SocketPort' and int( value ) == 0:
- self._socketPort = int( value )
- elif key == self._logKey + 'LogType' and value.lower() in ['filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger']:
- self._typeLogger = value.lower()
- elif key == self._logKey + 'LogLevel':
- self._logLevelThreshold = self._intLogLevel(value.upper())
- elif key == self._logKey + 'Style':
- self._fields["style"] = value
- elif key == self._logKey:
- self._logFile = value
- except Exception as x:
- print("exception reading '%s' configuration file: %s" %(self._configFile, str(x)), file=sys.stderr)
- sys.exit(2)
- except:
- print("exception reading '%s' configuration file" %(self._configFile), file=sys.stderr)
- sys.exit(2)
-
- if self._logFile is None:
- print('configuration file %s is missing definition %s for log file' %(self._configFile, self._logKey), file=sys.stderr)
- sys.exit(2)
-
-
- # initialize default log fields
- # timestamp will automatically be generated
- for key in ['style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', \
- 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer', \
- 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode', \
- 'responseDescription', 'processKey', 'targetVirtualEntity', 'customField1', \
- 'customField2', 'customField3', 'customField4', 'errorCategory', 'errorCode', \
- 'errorDescription' ]:
- if key in kwargs and kwargs[key] != None:
- self._fields[key] = kwargs[key]
-
- self._resetStyleField()
-
- # Set up logger
- self._logLock = threading.Lock()
- with self._logLock:
- self._logger = logging.getLogger(self._logKey)
- self._logger.propagate = False
- self._createLogger()
-
- self._defaultServerInfo()
-
- # spawn a thread to monitor configFile for logLevel and logFile changes
- self._monitorFlag = True
- self._monitorThread = threading.Thread(target=self._monitorConfigFile, args=())
- self._monitorThread.daemon = True
- self._monitorThread.start()
-
-
- def _createLogger(self):
- if self._typeLogger == 'filelogger':
- self._mkdir_p(self._logFile)
- if self._rotateMethod == 'time':
- self._logHandler = logging.handlers.TimedRotatingFileHandler(self._logFile, \
- when=self._timeRotateIntervalType, interval=self._timeRotateInterval, \
- backupCount=self._backupCount, encoding=None, delay=False, utc=True)
- elif self._rotateMethod == 'size':
- self._logHandler = logging.handlers.RotatingFileHandler(self._logFile, \
- mode=self._sizeRotateMode, maxBytes=self._sizeMaxBytes, \
- backupCount=self._backupCount, encoding=None, delay=False)
-
- else:
- self._logHandler = logging.handlers.WatchedFileHandler(self._logFile, \
- mode=self._sizeRotateMode, \
- encoding=None, delay=False)
- elif self._typeLogger == 'stderrlogger':
- self._logHandler = logging.handlers.StreamHandler(sys.stderr)
- elif self._typeLogger == 'stdoutlogger':
- self._logHandler = logging.handlers.StreamHandler(sys.stdout)
- elif self._typeLogger == 'socketlogger':
- self._logHandler = logging.handlers.SocketHandler(self._socketHost, self._socketPort)
- elif self._typeLogger == 'nulllogger':
- self._logHandler = logging.handlers.NullHandler()
-
- if self._fields["style"] == CommonLogger.AuditFile or self._fields["style"] == CommonLogger.MetricsFile:
- self._logFormatter = logging.Formatter(fmt='%(begtime)s,%(begmsecs)03d+00:00|%(endtime)s,%(endmsecs)03d+00:00|%(message)s', datefmt=CommonLogger.DateFmt)
- else:
- self._logFormatter = logging.Formatter(fmt='%(asctime)s,%(msecs)03d+00:00|%(message)s', datefmt='%Y-%m-%dT%H:%M:%S')
- self._logFormatter.converter = time.gmtime
- self._logHandler.setFormatter(self._logFormatter)
- self._logger.addHandler(self._logHandler)
-
- def _resetStyleField(self):
- styleFields = ["error", "debug", "audit", "metrics"]
- if self._fields['style'] in styleFields:
- self._fields['style'] = styleFields.index(self._fields['style'])
-
- def __del__(self):
- if self._monitorFlag == False:
- return
-
- self._monitorFlag = False
-
- if self._monitorThread is not None and self._monitorThread.is_alive():
- self._monitorThread.join()
-
- self._monitorThread = None
-
-
- def _defaultServerInfo(self):
-
- # If not set or purposely set = None, then set default
- if self._fields.get('server') is None:
- try:
- self._fields['server'] = socket.getfqdn()
- except Exception as err:
- try:
- self._fields['server'] = socket.gethostname()
- except Exception as err:
- self._fields['server'] = ""
-
- # If not set or purposely set = None, then set default
- if self._fields.get('serverIPAddress') is None:
- try:
- self._fields['serverIPAddress'] = socket.gethostbyname(self._fields['server'])
- except Exception as err:
- self._fields['serverIPAddress'] = ""
-
-
- def _monitorConfigFile(self):
- while self._monitorFlag:
- try:
- fileTime = os.path.getmtime(self._configFile)
- if fileTime > self._configFileModified:
- self._configFileModified = fileTime
- ReopenLogFile = False
- logFile = self._logFile
- with open(self._configFile) as fp:
- for line in fp:
- line = line.split('#',1)[0] # remove comments
- if '=' in line:
- key, value = [x.strip() for x in line.split('=',1)]
- if key == 'rotateMethod' and value.lower() in ['time', 'size', 'none'] and self._rotateMethod != value:
- self._rotateMethod = value.lower()
- ReopenLogFile = True
- elif key == 'timeRotateIntervalType' and value in ['S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight']:
- self._timeRotateIntervalType = value
- ReopenLogFile = True
- elif key == 'timeRotateInterval' and int( value ) > 0:
- self._timeRotateInterval = int( value )
- ReopenLogFile = True
- elif key == 'sizeMaxBytes' and int( value ) >= 0:
- self._sizeMaxBytes = int( value )
- ReopenLogFile = True
- elif key == 'sizeRotateMode' and value in ['a']:
- self._sizeRotateMode = value
- ReopenLogFile = True
- elif key == 'backupCount' and int( value ) >= 0:
- self._backupCount = int( value )
- ReopenLogFile = True
- elif key == self._logKey + 'SocketHost' and self._socketHost != value:
- self._socketHost = value
- ReopenLogFile = True
- elif key == self._logKey + 'SocketPort' and self._socketPort > 0 and self._socketPort != int( value ):
- self._socketPort = int( value )
- ReopenLogFile = True
- elif key == self._logKey + 'LogLevel' and self._logLevelThreshold != self._intLogLevel( value.upper() ):
- self._logLevelThreshold = self._intLogLevel(value.upper())
- elif key == self._logKey + 'LogType' and self._typeLogger != value and value.lower() in ['filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger']:
- self._typeLogger = value.lower()
- ReopenLogFile = True
- elif key == self._logKey + 'Style':
- self._fields["style"] = value
- self._resetStyleField()
- elif key == self._logKey and self._logFile != value:
- logFile = value
- ReopenLogFile = True
- if ReopenLogFile:
- with self._logLock:
- self._logger.removeHandler(self._logHandler)
- self._logFile = logFile
- self._createLogger()
- except Exception as err:
- pass
-
- time.sleep(5)
-
-
- def setFields(self, **kwargs):
- """Set default values for log fields.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- for key in ['style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', \
- 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer', \
- 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode', \
- 'responseDescription', 'processKey', 'targetVirtualEntity', 'customField1', \
- 'customField2', 'customField3', 'customField4', 'errorCategory', 'errorCode', \
- 'errorDescription' ]:
- if key in kwargs:
- if kwargs[key] != None:
- self._fields[key] = kwargs[key]
- elif key in self._fields:
- del self._fields[key]
-
- self._defaultServerInfo()
-
-
- def debug(self, message, **kwargs):
- """Write a DEBUG level message to the log file.
-
- Arguments:
- message -- value for the last log record field.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
- begTime (am) -- optional starting time for this audit/metrics log record.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- self._log('DEBUG', message, errorCategory = 'DEBUG', **kwargs)
-
- def info(self, message, **kwargs):
- """Write an INFO level message to the log file.
-
- Arguments:
- message -- value for the last log record field.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
- begTime (am) -- optional starting time for this audit/metrics log record.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- self._log('INFO', message, errorCategory = 'INFO', **kwargs)
-
- def warn(self, message, **kwargs):
- """Write a WARN level message to the log file.
-
- Arguments:
- message -- value for the last log record field.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
- begTime (am) -- optional starting time for this audit/metrics log record.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- self._log('WARN', message, errorCategory = 'WARN', **kwargs)
-
- def error(self, message, **kwargs):
- """Write an ERROR level message to the log file.
-
- Arguments:
- message -- value for the last log record field.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
- begTime (am) -- optional starting time for this audit/metrics log record.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- self._log('ERROR', message, errorCategory = 'ERROR', **kwargs)
-
- def fatal(self, message, **kwargs):
- """Write a FATAL level message to the log file.
-
- Arguments:
- message -- value for the last log record field.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
- begTime (am) -- optional starting time for this audit/metrics log record.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- self._log('FATAL', message, errorCategory = 'FATAL', **kwargs)
-
- def _log(self, logLevel, message, **kwargs):
- """Write a message to the log file.
-
- Arguments:
- logLevel -- value ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL', ...) for the log record.
- message -- value for the last log record field.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
- begTime (am) -- optional starting time for this audit/metrics log record.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- # timestamp will automatically be inserted
- style = int(self._getVal('style', '', **kwargs))
- requestID = self._getVal('requestID', '', **kwargs)
- serviceInstanceID = self._getVal('serviceInstanceID', '', **kwargs)
- threadID = self._getVal('threadID', threading.currentThread().getName(), **kwargs)
- serverName = self._getVal('serverName', '', **kwargs)
- serviceName = self._getVal('serviceName', '', **kwargs)
- instanceUUID = self._getVal('instanceUUID', '', **kwargs)
- upperLogLevel = self._noSep(logLevel.upper())
- severity = self._getVal('severity', '', **kwargs)
- serverIPAddress = self._getVal('serverIPAddress', '', **kwargs)
- server = self._getVal('server', '', **kwargs)
- IPAddress = self._getVal('IPAddress', '', **kwargs)
- className = self._getVal('className', '', **kwargs)
- timer = self._getVal('timer', '', **kwargs)
- partnerName = self._getVal('partnerName', '', **kwargs)
- targetEntity = self._getVal('targetEntity', '', **kwargs)
- targetServiceName = self._getVal('targetServiceName', '', **kwargs)
- statusCode = self._getVal('statusCode', '', **kwargs)
- responseCode = self._getVal('responseCode', '', **kwargs)
- responseDescription = self._noSep(self._getVal('responseDescription', '', **kwargs))
- processKey = self._getVal('processKey', '', **kwargs)
- targetVirtualEntity = self._getVal('targetVirtualEntity', '', **kwargs)
- customField1 = self._getVal('customField1', '', **kwargs)
- customField2 = self._getVal('customField2', '', **kwargs)
- customField3 = self._getVal('customField3', '', **kwargs)
- customField4 = self._getVal('customField4', '', **kwargs)
- errorCategory = self._getVal('errorCategory', '', **kwargs)
- errorCode = self._getVal('errorCode', '', **kwargs)
- errorDescription = self._noSep(self._getVal('errorDescription', '', **kwargs))
- nbegTime = self._getArg('begTime', {}, **kwargs)
-
- detailMessage = self._noSep(message)
- if bool(re.match(r" *$", detailMessage)):
- return # don't log empty messages
-
- useLevel = self._intLogLevel(upperLogLevel)
- if CommonLogger.verbose: print("logger STYLE=%s" % style)
- if useLevel < self._logLevelThreshold:
- if CommonLogger.verbose: print("skipping because of level")
- pass
- else:
- with self._logLock:
- if style == CommonLogger.ErrorFile:
- if CommonLogger.verbose: print("using CommonLogger.ErrorFile")
- self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
- %(requestID, threadID, serviceName, partnerName, targetEntity, targetServiceName,
- errorCategory, errorCode, errorDescription, detailMessage))
- elif style == CommonLogger.DebugFile:
- if CommonLogger.verbose: print("using CommonLogger.DebugFile")
- self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
- %(requestID, threadID, serverName, serviceName, instanceUUID, upperLogLevel,
- severity, serverIPAddress, server, IPAddress, className, timer, detailMessage))
- elif style == CommonLogger.AuditFile:
- if CommonLogger.verbose: print("using CommonLogger.AuditFile")
- endAuditTime, endAuditMsec = self._getTime()
- if type(nbegTime) is dict and 'begTime' in nbegTime and 'begMsec' in nbegTime:
- d = { 'begtime': self._noSep(nbegTime['begTime']), 'begmsecs': float(self._noSep(nbegTime['begMsec'])), 'endtime': endAuditTime, 'endmsecs': endAuditMsec }
- elif self._begTime is not None:
- d = { 'begtime': self._begTime, 'begmsecs': self._begMsec, 'endtime': endAuditTime, 'endmsecs': endAuditMsec }
- else:
- d = { 'begtime': endAuditTime, 'begmsecs': endAuditMsec, 'endtime': endAuditTime, 'endmsecs': endAuditMsec }
- self._begTime = None
- unused = ""
- self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
- %(requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName,
- statusCode, responseCode, responseDescription, instanceUUID, upperLogLevel,
- severity, serverIPAddress, timer, server, IPAddress, className, unused,
- processKey, customField1, customField2, customField3, customField4, detailMessage), extra=d)
- elif style == CommonLogger.MetricsFile:
- if CommonLogger.verbose: print("using CommonLogger.MetricsFile")
- endMetricsTime, endMetricsMsec = self._getTime()
- if type(nbegTime) is dict and 'begTime' in nbegTime and 'begMsec' in nbegTime:
- d = { 'begtime': self._noSep(nbegTime['begTime']), 'begmsecs': float(self._noSep(nbegTime['begMsec'])), 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec }
- elif self._begTime is not None:
- d = { 'begtime': self._begTime, 'begmsecs': self._begMsec, 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec }
- else:
- d = { 'begtime': endMetricsTime, 'begmsecs': endMetricsMsec, 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec }
- self._begTime = None
- unused = ""
- self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
- %(requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName,
- targetEntity, targetServiceName, statusCode, responseCode, responseDescription,
- instanceUUID, upperLogLevel, severity, serverIPAddress, timer, server, IPAddress,
- className, unused, processKey, targetVirtualEntity, customField1, customField2,
- customField3, customField4, detailMessage), extra=d)
- else:
- print("!!!!!!!!!!!!!!!! style not set: %s" % self._fields["style"])
-
- def _getTime(self):
- ct = time.time()
- lt = time.localtime(ct)
- return (time.strftime(CommonLogger.DateFmt, lt), (ct - int(ct)) * 1000)
-
- def setStartRecordEvent(self):
- """
- Set the start time to be saved for both audit and metrics records
- """
- self._begTime, self._begMsec = self._getTime()
-
- def getStartRecordEvent(self):
- """
- Retrieve the start time to be used for either audit and metrics records
- """
- begTime, begMsec = self._getTime()
- return {'begTime':begTime, 'begMsec':begMsec}
-
- def _getVal(self, key, default, **kwargs):
- val = self._fields.get(key)
- if key in kwargs: val = kwargs[key]
- if val is None: val = default
- return self._noSep(val)
-
- def _getArg(self, key, default, **kwargs):
- val = None
- if key in kwargs: val = kwargs[key]
- if val is None: val = default
- return val
-
- def _noSep(self, message):
- if message is None: return ''
- return re.sub(r'[\|\n]', ' ', str(message))
-
- def _intLogLevel(self, logLevel):
- if logLevel == 'FATAL': useLevel = 50
- elif logLevel == 'ERROR': useLevel = 40
- elif logLevel == 'WARN': useLevel = 30
- elif logLevel == 'INFO': useLevel = 20
- elif logLevel == 'DEBUG': useLevel = 10
- else: useLevel = 0
- return useLevel
-
- def _mkdir_p(self, filename):
- """Create missing directories from a full filename path like mkdir -p"""
-
- if filename is None:
- return
-
- folder=os.path.dirname(filename)
-
- if folder == "":
- return
-
- if not os.path.exists(folder):
- try:
- os.makedirs(folder)
- except OSError as err:
- print("error number %d creating %s directory to hold %s logfile: %s" %(err.errno, err.filename, filename, err.strerror), file=sys.stderr)
- sys.exit(2)
- except Exception as err:
- print("error creating %s directory to hold %s logfile: %s" %(folder, filename, str(err)), file=sys.stderr)
- sys.exit(2)
-
-if __name__ == "__main__":
-
- def __checkOneTime(line):
- format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}[+]00:00[|]'
- m = re.match(format, line)
- if not m:
- print("ERROR: time string did not match proper time format, %s" %line)
- print("\t: format=%s" % format)
- return 1
- return 0
-
- def __checkTwoTimes(line, different):
- format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|][0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|]'
- m = re.match(format, line)
- if not m:
- print("ERROR: time strings did not match proper time format, %s" %line)
- print("\t: format=%s" % format)
- return 1
- second1 = int(m.group(1))
- msec1 = int(m.group(2))
- second2 = int(m.group(3))
- msec2 = int(m.group(4))
- if second1 > second2: second2 += 60
- t1 = second1 * 1000 + msec1
- t2 = second2 * 1000 + msec2
- diff = t2 - t1
- # print("t1=%d (%d,%d) t2=%d (%d,%d), diff = %d" % (t1, second1, msec1, t2, second2, msec2, diff))
- if different:
- if diff < 500:
- print("ERROR: times did not differ enough: %s" % line)
- return 1
- else:
- if diff > 10:
- print("ERROR: times were too far apart: %s" % line)
- return 1
- return 0
-
- def __checkBegTime(line):
- format = "begTime should be ([-0-9T:]+)"
- # print("checkBegTime(%s)" % line)
- strt = 'begTime should be '
- i = line.index(strt)
- rest = line[i+len(strt):].rstrip()
- if not line.startswith(rest + ","):
- print("ERROR: line %s should start with %s" % (line,rest))
- return 1
- return 0
-
- def __checkLog(logfile, numLines, numFields):
- lineCount = 0
- errorCount = 0
- with open(logfile, "r") as fp:
- for line in fp:
- # print("saw line %s" % line)
- lineCount += 1
- c = line.count('|')
- if c != numFields:
- print("ERROR: wrong number of fields. Expected %d, got %d: %s" % (numFields, c, line))
- errorCount += 1
- if re.search("should not appear", line):
- print("ERROR: a line appeared that should not have appeared, %s" % line)
- errorCount += 1
- elif re.search("single time", line):
- errorCount += __checkOneTime(line)
- elif re.search("time should be the same", line):
- errorCount += __checkTwoTimes(line, different=False)
- elif re.search("time should be ", line):
- errorCount += __checkTwoTimes(line, different=True)
- elif re.search("begTime should be ", line):
- errorCount += __checkBegTime(line)
- else:
- print("ERROR: an unknown message appeared, %s" % line)
- errorCount += 1
-
- if lineCount != numLines:
- print("ERROR: expected %d lines, but got %d lines" % (numLines, lineCount))
- errorCount += 1
- return errorCount
-
- import os, argparse
- parser = argparse.ArgumentParser(description="test the CommonLogger functions")
- parser.add_argument("-k", "--keeplogs", help="Keep the log files after finishing the tests", action="store_true")
- parser.add_argument("-v", "--verbose", help="Print debugging messages", action="store_true")
- args = parser.parse_args()
-
- spid = str(os.getpid())
- if args.keeplogs:
- spid = ""
- logcfg = "/tmp/cl.log" + spid + ".cfg"
- errorLog = "/tmp/cl.error" + spid + ".log"
- metricsLog = "/tmp/cl.metrics" + spid + ".log"
- auditLog = "/tmp/cl.audit" + spid + ".log"
- debugLog = "/tmp/cl.debug" + spid + ".log"
- if args.verbose: CommonLogger.verbose = True
-
- import atexit
- def cleanupTmps():
- for f in [ logcfg, errorLog, metricsLog, auditLog, debugLog ]:
- try:
- os.remove(f)
- except:
- pass
- if not args.keeplogs:
- atexit.register(cleanupTmps)
-
- with open(logcfg, "w") as o:
- o.write("error = " + errorLog + "\n" +
- "errorLogLevel = WARN\n" +
- "metrics = " + metricsLog + "\n" +
- "metricsLogLevel = INFO\n" +
- "audit = " + auditLog + "\n" +
- "auditLogLevel = INFO\n" +
- "debug = " + debugLog + "\n" +
- "debugLogLevel = DEBUG\n")
-
- import uuid
- instanceUUID = uuid.uuid1()
- serviceName = "testharness"
- errorLogger = CommonLogger(logcfg, "error", style=CommonLogger.ErrorFile, instanceUUID=instanceUUID, serviceName=serviceName)
- debugLogger = CommonLogger(logcfg, "debug", style=CommonLogger.DebugFile, instanceUUID=instanceUUID, serviceName=serviceName)
- auditLogger = CommonLogger(logcfg, "audit", style=CommonLogger.AuditFile, instanceUUID=instanceUUID, serviceName=serviceName)
- metricsLogger = CommonLogger(logcfg, "metrics", style=CommonLogger.MetricsFile, instanceUUID=instanceUUID, serviceName=serviceName)
-
- testsRun = 0
- errorCount = 0
- errorLogger.debug("error calling debug (should not appear)")
- errorLogger.info("error calling info (should not appear)")
- errorLogger.warn("error calling warn (single time)")
- errorLogger.error("error calling error (single time)")
- errorLogger.setStartRecordEvent()
- time.sleep(1)
- errorLogger.fatal("error calling fatal, after setStartRecordEvent and sleep (start should be ignored, single time)")
- testsRun += 6
- errorCount += __checkLog(errorLog, 3, 10)
-
- auditLogger.debug("audit calling debug (should not appear)")
- auditLogger.info("audit calling info (time should be the same)")
- auditLogger.warn("audit calling warn (time should be the same)")
- auditLogger.error("audit calling error (time should be the same)")
- bt = auditLogger.getStartRecordEvent()
- # print("bt=%s" % bt)
- time.sleep(1)
- auditLogger.setStartRecordEvent()
- time.sleep(1)
- auditLogger.fatal("audit calling fatal, after setStartRecordEvent and sleep, time should be different)")
- time.sleep(1)
- auditLogger.fatal("audit calling fatal, begTime should be %s" % bt['begTime'], begTime=bt)
- testsRun += 7
- errorCount += __checkLog(auditLog, 5, 25)
-
- debugLogger.debug("debug calling debug (single time)")
- debugLogger.info("debug calling info (single time)")
- debugLogger.warn("debug calling warn (single time)")
- debugLogger.setStartRecordEvent()
- time.sleep(1)
- debugLogger.error("debug calling error, after SetStartRecordEvent and sleep (start should be ignored, single time)")
- debugLogger.fatal("debug calling fatal (single time)")
- errorCount += __checkLog(debugLog, 5, 13)
- testsRun += 6
-
- metricsLogger.debug("metrics calling debug (should not appear)")
- metricsLogger.info("metrics calling info (time should be the same)")
- metricsLogger.warn("metrics calling warn (time should be the same)")
- bt = metricsLogger.getStartRecordEvent()
- time.sleep(1)
- metricsLogger.setStartRecordEvent()
- time.sleep(1)
- metricsLogger.error("metrics calling error, after SetStartRecordEvent and sleep, time should be different")
- metricsLogger.fatal("metrics calling fatal (time should be the same)")
- time.sleep(1)
- metricsLogger.fatal("metrics calling fatal, begTime should be %s" % bt['begTime'], begTime=bt)
- testsRun += 6
- errorCount += __checkLog(metricsLog, 5, 28)
-
- print("%d tests run, %d errors found" % (testsRun, errorCount))
+#!/usr/bin/python
+# -*- indent-tabs-mode: nil -*- vi: set expandtab:
+
+# ================================================================================
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+"""ECOMP Common Logging library in Python.
+
+CommonLogger.py
+
+Original Written by: Terry Schmalzried
+Date written: October 1, 2015
+Last updated: December 1, 2016
+
+version 0.8
+"""
+
+from __future__ import print_function
+import os, sys, getopt, logging, logging.handlers, time, re, uuid, socket, threading, collections
+
+class CommonLogger:
+ """ECOMP Common Logging object.
+
+ Public methods:
+ __init__
+ setFields
+ debug
+ info
+ warn
+ error
+ fatal
+ """
+
+ UnknownFile = -1
+ ErrorFile = 0
+ DebugFile = 1
+ AuditFile = 2
+ MetricsFile = 3
+ DateFmt = '%Y-%m-%dT%H:%M:%S'
+ verbose = False
+
+ def __init__(self, configFile, logKey, **kwargs):
+ """Construct a Common Logger for one Log File.
+
+ Arguments:
+ configFile -- configuration filename.
+ logKey -- the keyword in configFile that identifies the log filename.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages,
+ one of CommonLogger.ErrorFile, CommonLogger.DebugFile,
+ CommonLogger.AuditFile and CommonLogger.MetricsFile, or
+ one of the strings "error", "debug", "audit" or "metrics".
+ May also be set in the config file using a field named
+ <logKey>Style (where <logKey> is the value of the logKey
+ parameter). The keyword value overrides the value in the
+ config file.
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ self._monitorFlag = False
+
+ # Get configuration parameters
+ self._logKey = str(logKey)
+ self._configFile = str(configFile)
+ self._rotateMethod = 'time'
+ self._timeRotateIntervalType = 'midnight'
+ self._timeRotateInterval = 1
+ self._sizeMaxBytes = 0
+ self._sizeRotateMode = 'a'
+ self._socketHost = None
+ self._socketPort = 0
+ self._typeLogger = 'filelogger'
+ self._backupCount = 6
+ self._logLevelThreshold = self._intLogLevel('')
+ self._logFile = None
+ self._begTime = None
+ self._begMsec = 0
+ self._fields = {}
+ self._fields["style"] = CommonLogger.UnknownFile
+ try:
+ self._configFileModified = os.path.getmtime(self._configFile)
+ for line in open(self._configFile):
+ line = line.split('#',1)[0] # remove comments
+ if '=' in line:
+ key, value = [x.strip() for x in line.split('=',1)]
+ if key == 'rotateMethod' and value.lower() in ['time', 'size', 'none']:
+ self._rotateMethod = value.lower()
+ elif key == 'timeRotateIntervalType' and value in ['S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight']:
+ self._timeRotateIntervalType = value
+ elif key == 'timeRotateInterval' and int( value ) > 0:
+ self._timeRotateInterval = int( value )
+ elif key == 'sizeMaxBytes' and int( value ) >= 0:
+ self._sizeMaxBytes = int( value )
+ elif key == 'sizeRotateMode' and value in ['a']:
+ self._sizeRotateMode = value
+ elif key == 'backupCount' and int( value ) >= 0:
+ self._backupCount = int( value )
+ elif key == self._logKey + 'SocketHost':
+ self._socketHost = value
+ elif key == self._logKey + 'SocketPort' and int( value ) == 0:
+ self._socketPort = int( value )
+ elif key == self._logKey + 'LogType' and value.lower() in ['filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger']:
+ self._typeLogger = value.lower()
+ elif key == self._logKey + 'LogLevel':
+ self._logLevelThreshold = self._intLogLevel(value.upper())
+ elif key == self._logKey + 'Style':
+ self._fields["style"] = value
+ elif key == self._logKey:
+ self._logFile = value
+ except Exception as x:
+ print("exception reading '%s' configuration file: %s" %(self._configFile, str(x)), file=sys.stderr)
+ sys.exit(2)
+ except:
+ print("exception reading '%s' configuration file" %(self._configFile), file=sys.stderr)
+ sys.exit(2)
+
+ if self._logFile is None:
+ print('configuration file %s is missing definition %s for log file' %(self._configFile, self._logKey), file=sys.stderr)
+ sys.exit(2)
+
+
+ # initialize default log fields
+ # timestamp will automatically be generated
+ for key in ['style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', \
+ 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer', \
+ 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode', \
+ 'responseDescription', 'processKey', 'targetVirtualEntity', 'customField1', \
+ 'customField2', 'customField3', 'customField4', 'errorCategory', 'errorCode', \
+ 'errorDescription' ]:
+ if key in kwargs and kwargs[key] != None:
+ self._fields[key] = kwargs[key]
+
+ self._resetStyleField()
+
+ # Set up logger
+ self._logLock = threading.Lock()
+ with self._logLock:
+ self._logger = logging.getLogger(self._logKey)
+ self._logger.propagate = False
+ self._createLogger()
+
+ self._defaultServerInfo()
+
+ # spawn a thread to monitor configFile for logLevel and logFile changes
+ self._monitorFlag = True
+ self._monitorThread = threading.Thread(target=self._monitorConfigFile, args=())
+ self._monitorThread.daemon = True
+ self._monitorThread.start()
+
+
+ def _createLogger(self):
+ if self._typeLogger == 'filelogger':
+ self._mkdir_p(self._logFile)
+ if self._rotateMethod == 'time':
+ self._logHandler = logging.handlers.TimedRotatingFileHandler(self._logFile, \
+ when=self._timeRotateIntervalType, interval=self._timeRotateInterval, \
+ backupCount=self._backupCount, encoding=None, delay=False, utc=True)
+ elif self._rotateMethod == 'size':
+ self._logHandler = logging.handlers.RotatingFileHandler(self._logFile, \
+ mode=self._sizeRotateMode, maxBytes=self._sizeMaxBytes, \
+ backupCount=self._backupCount, encoding=None, delay=False)
+
+ else:
+ self._logHandler = logging.handlers.WatchedFileHandler(self._logFile, \
+ mode=self._sizeRotateMode, \
+ encoding=None, delay=False)
+ elif self._typeLogger == 'stderrlogger':
+ self._logHandler = logging.handlers.StreamHandler(sys.stderr)
+ elif self._typeLogger == 'stdoutlogger':
+ self._logHandler = logging.handlers.StreamHandler(sys.stdout)
+ elif self._typeLogger == 'socketlogger':
+ self._logHandler = logging.handlers.SocketHandler(self._socketHost, self._socketPort)
+ elif self._typeLogger == 'nulllogger':
+ self._logHandler = logging.handlers.NullHandler()
+
+ if self._fields["style"] == CommonLogger.AuditFile or self._fields["style"] == CommonLogger.MetricsFile:
+ self._logFormatter = logging.Formatter(fmt='%(begtime)s,%(begmsecs)03d+00:00|%(endtime)s,%(endmsecs)03d+00:00|%(message)s', datefmt=CommonLogger.DateFmt)
+ else:
+ self._logFormatter = logging.Formatter(fmt='%(asctime)s,%(msecs)03d+00:00|%(message)s', datefmt='%Y-%m-%dT%H:%M:%S')
+ self._logFormatter.converter = time.gmtime
+ self._logHandler.setFormatter(self._logFormatter)
+ self._logger.addHandler(self._logHandler)
+
+ def _resetStyleField(self):
+ styleFields = ["error", "debug", "audit", "metrics"]
+ if self._fields['style'] in styleFields:
+ self._fields['style'] = styleFields.index(self._fields['style'])
+
+ def __del__(self):
+ if self._monitorFlag == False:
+ return
+
+ self._monitorFlag = False
+
+ if self._monitorThread is not None and self._monitorThread.is_alive():
+ self._monitorThread.join()
+
+ self._monitorThread = None
+
+
+ def _defaultServerInfo(self):
+
+ # If not set or purposely set = None, then set default
+ if self._fields.get('server') is None:
+ try:
+ self._fields['server'] = socket.getfqdn()
+ except Exception as err:
+ try:
+ self._fields['server'] = socket.gethostname()
+ except Exception as err:
+ self._fields['server'] = ""
+
+ # If not set or purposely set = None, then set default
+ if self._fields.get('serverIPAddress') is None:
+ try:
+ self._fields['serverIPAddress'] = socket.gethostbyname(self._fields['server'])
+ except Exception as err:
+ self._fields['serverIPAddress'] = ""
+
+
+ def _monitorConfigFile(self):
+ while self._monitorFlag:
+ try:
+ fileTime = os.path.getmtime(self._configFile)
+ if fileTime > self._configFileModified:
+ self._configFileModified = fileTime
+ ReopenLogFile = False
+ logFile = self._logFile
+ with open(self._configFile) as fp:
+ for line in fp:
+ line = line.split('#',1)[0] # remove comments
+ if '=' in line:
+ key, value = [x.strip() for x in line.split('=',1)]
+ if key == 'rotateMethod' and value.lower() in ['time', 'size', 'none'] and self._rotateMethod != value:
+ self._rotateMethod = value.lower()
+ ReopenLogFile = True
+ elif key == 'timeRotateIntervalType' and value in ['S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight']:
+ self._timeRotateIntervalType = value
+ ReopenLogFile = True
+ elif key == 'timeRotateInterval' and int( value ) > 0:
+ self._timeRotateInterval = int( value )
+ ReopenLogFile = True
+ elif key == 'sizeMaxBytes' and int( value ) >= 0:
+ self._sizeMaxBytes = int( value )
+ ReopenLogFile = True
+ elif key == 'sizeRotateMode' and value in ['a']:
+ self._sizeRotateMode = value
+ ReopenLogFile = True
+ elif key == 'backupCount' and int( value ) >= 0:
+ self._backupCount = int( value )
+ ReopenLogFile = True
+ elif key == self._logKey + 'SocketHost' and self._socketHost != value:
+ self._socketHost = value
+ ReopenLogFile = True
+ elif key == self._logKey + 'SocketPort' and self._socketPort > 0 and self._socketPort != int( value ):
+ self._socketPort = int( value )
+ ReopenLogFile = True
+ elif key == self._logKey + 'LogLevel' and self._logLevelThreshold != self._intLogLevel( value.upper() ):
+ self._logLevelThreshold = self._intLogLevel(value.upper())
+ elif key == self._logKey + 'LogType' and self._typeLogger != value and value.lower() in ['filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger']:
+ self._typeLogger = value.lower()
+ ReopenLogFile = True
+ elif key == self._logKey + 'Style':
+ self._fields["style"] = value
+ self._resetStyleField()
+ elif key == self._logKey and self._logFile != value:
+ logFile = value
+ ReopenLogFile = True
+ if ReopenLogFile:
+ with self._logLock:
+ self._logger.removeHandler(self._logHandler)
+ self._logFile = logFile
+ self._createLogger()
+ except Exception as err:
+ pass
+
+ time.sleep(5)
+
+
+ def setFields(self, **kwargs):
+ """Set default values for log fields.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ for key in ['style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', \
+ 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer', \
+ 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode', \
+ 'responseDescription', 'processKey', 'targetVirtualEntity', 'customField1', \
+ 'customField2', 'customField3', 'customField4', 'errorCategory', 'errorCode', \
+ 'errorDescription' ]:
+ if key in kwargs:
+ if kwargs[key] != None:
+ self._fields[key] = kwargs[key]
+ elif key in self._fields:
+ del self._fields[key]
+
+ self._defaultServerInfo()
+
+
+ def debug(self, message, **kwargs):
+ """Write a DEBUG level message to the log file.
+
+ Arguments:
+ message -- value for the last log record field.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+ begTime (am) -- optional starting time for this audit/metrics log record.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ self._log('DEBUG', message, errorCategory = 'DEBUG', **kwargs)
+
+ def info(self, message, **kwargs):
+ """Write an INFO level message to the log file.
+
+ Arguments:
+ message -- value for the last log record field.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+ begTime (am) -- optional starting time for this audit/metrics log record.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ self._log('INFO', message, errorCategory = 'INFO', **kwargs)
+
+ def warn(self, message, **kwargs):
+ """Write a WARN level message to the log file.
+
+ Arguments:
+ message -- value for the last log record field.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+ begTime (am) -- optional starting time for this audit/metrics log record.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ self._log('WARN', message, errorCategory = 'WARN', **kwargs)
+
+ def error(self, message, **kwargs):
+ """Write an ERROR level message to the log file.
+
+ Arguments:
+ message -- value for the last log record field.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+ begTime (am) -- optional starting time for this audit/metrics log record.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ self._log('ERROR', message, errorCategory = 'ERROR', **kwargs)
+
+ def fatal(self, message, **kwargs):
+ """Write a FATAL level message to the log file.
+
+ Arguments:
+ message -- value for the last log record field.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+ begTime (am) -- optional starting time for this audit/metrics log record.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ self._log('FATAL', message, errorCategory = 'FATAL', **kwargs)
+
+ def _log(self, logLevel, message, **kwargs):
+ """Write a message to the log file.
+
+ Arguments:
+ logLevel -- value ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL', ...) for the log record.
+ message -- value for the last log record field.
+
+ Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
+ style -- the log file format (style) to use when writing log messages
+ requestID (dame) -- optional default value for this log record field.
+ serviceInstanceID (am) -- optional default value for this log record field.
+ threadID (am) -- optional default value for this log record field.
+ serverName (am) -- optional default value for this log record field.
+ serviceName (am) -- optional default value for this log record field.
+ instanceUUID (am) -- optional default value for this log record field.
+ severity (am) -- optional default value for this log record field.
+ serverIPAddress (am) -- optional default value for this log record field.
+ server (am) -- optional default value for this log record field.
+ IPAddress (am) -- optional default value for this log record field.
+ className (am) -- optional default value for this log record field.
+ timer (am) -- (ElapsedTime) optional default value for this log record field.
+ partnerName (ame) -- optional default value for this log record field.
+ targetEntity (me) -- optional default value for this log record field.
+ targetServiceName (me) -- optional default value for this log record field.
+ statusCode (am) -- optional default value for this log record field.
+ responseCode (am) -- optional default value for this log record field.
+ responseDescription (am) -- optional default value for this log record field.
+ processKey (am) -- optional default value for this log record field.
+ targetVirtualEntity (m) -- optional default value for this log record field.
+ customField1 (am) -- optional default value for this log record field.
+ customField2 (am) -- optional default value for this log record field.
+ customField3 (am) -- optional default value for this log record field.
+ customField4 (am) -- optional default value for this log record field.
+ errorCategory (e) -- optional default value for this log record field.
+ errorCode (e) -- optional default value for this log record field.
+ errorDescription (e) -- optional default value for this log record field.
+ begTime (am) -- optional starting time for this audit/metrics log record.
+
+ Note: the pipe '|' character is not allowed in any log record field.
+ """
+
+ # timestamp will automatically be inserted
+ style = int(self._getVal('style', '', **kwargs))
+ requestID = self._getVal('requestID', '', **kwargs)
+ serviceInstanceID = self._getVal('serviceInstanceID', '', **kwargs)
+ threadID = self._getVal('threadID', threading.currentThread().getName(), **kwargs)
+ serverName = self._getVal('serverName', '', **kwargs)
+ serviceName = self._getVal('serviceName', '', **kwargs)
+ instanceUUID = self._getVal('instanceUUID', '', **kwargs)
+ upperLogLevel = self._noSep(logLevel.upper())
+ severity = self._getVal('severity', '', **kwargs)
+ serverIPAddress = self._getVal('serverIPAddress', '', **kwargs)
+ server = self._getVal('server', '', **kwargs)
+ IPAddress = self._getVal('IPAddress', '', **kwargs)
+ className = self._getVal('className', '', **kwargs)
+ timer = self._getVal('timer', '', **kwargs)
+ partnerName = self._getVal('partnerName', '', **kwargs)
+ targetEntity = self._getVal('targetEntity', '', **kwargs)
+ targetServiceName = self._getVal('targetServiceName', '', **kwargs)
+ statusCode = self._getVal('statusCode', '', **kwargs)
+ responseCode = self._getVal('responseCode', '', **kwargs)
+ responseDescription = self._noSep(self._getVal('responseDescription', '', **kwargs))
+ processKey = self._getVal('processKey', '', **kwargs)
+ targetVirtualEntity = self._getVal('targetVirtualEntity', '', **kwargs)
+ customField1 = self._getVal('customField1', '', **kwargs)
+ customField2 = self._getVal('customField2', '', **kwargs)
+ customField3 = self._getVal('customField3', '', **kwargs)
+ customField4 = self._getVal('customField4', '', **kwargs)
+ errorCategory = self._getVal('errorCategory', '', **kwargs)
+ errorCode = self._getVal('errorCode', '', **kwargs)
+ errorDescription = self._noSep(self._getVal('errorDescription', '', **kwargs))
+ nbegTime = self._getArg('begTime', {}, **kwargs)
+
+ detailMessage = self._noSep(message)
+ if bool(re.match(r" *$", detailMessage)):
+ return # don't log empty messages
+
+ useLevel = self._intLogLevel(upperLogLevel)
+ if CommonLogger.verbose: print("logger STYLE=%s" % style)
+ if useLevel < self._logLevelThreshold:
+ if CommonLogger.verbose: print("skipping because of level")
+ pass
+ else:
+ with self._logLock:
+ if style == CommonLogger.ErrorFile:
+ if CommonLogger.verbose: print("using CommonLogger.ErrorFile")
+ self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
+ %(requestID, threadID, serviceName, partnerName, targetEntity, targetServiceName,
+ errorCategory, errorCode, errorDescription, detailMessage))
+ elif style == CommonLogger.DebugFile:
+ if CommonLogger.verbose: print("using CommonLogger.DebugFile")
+ self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
+ %(requestID, threadID, serverName, serviceName, instanceUUID, upperLogLevel,
+ severity, serverIPAddress, server, IPAddress, className, timer, detailMessage))
+ elif style == CommonLogger.AuditFile:
+ if CommonLogger.verbose: print("using CommonLogger.AuditFile")
+ endAuditTime, endAuditMsec, endSeconds = self._getTime()
+ if type(nbegTime) is dict and 'begTime' in nbegTime and 'begMsec' in nbegTime:
+ d = { 'begtime': self._noSep(nbegTime['begTime']), 'begmsecs': float(self._noSep(nbegTime['begMsec'])), 'begseconds': nbegTime['begSeconds'],
+ 'endtime': endAuditTime, 'endmsecs': endAuditMsec, 'endseconds': endSeconds }
+ elif self._begTime is not None:
+ d = { 'begtime': self._begTime, 'begmsecs': self._begMsec, 'begseconds': self._begSeconds,
+ 'endtime': endAuditTime, 'endmsecs': endAuditMsec, 'endseconds': endSeconds }
+ else:
+ d = { 'begtime': endAuditTime, 'begmsecs': endAuditMsec, 'begseconds': endSeconds,
+ 'endtime': endAuditTime, 'endmsecs': endAuditMsec, 'endseconds': endSeconds }
+ if not timer:
+ timer = int(d["endseconds"] * 1000.0 + d["endmsecs"] - d["begseconds"] * 1000.0 - d["begmsecs"])
+ self._begTime = self._begMsec = None
+ unused = ""
+ self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
+ %(requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName,
+ statusCode, responseCode, responseDescription, instanceUUID, upperLogLevel,
+ severity, serverIPAddress, timer, server, IPAddress, className, unused,
+ processKey, customField1, customField2, customField3, customField4, detailMessage), extra=d)
+ elif style == CommonLogger.MetricsFile:
+ if CommonLogger.verbose: print("using CommonLogger.MetricsFile")
+ endMetricsTime, endMetricsMsec, endSeconds = self._getTime()
+ if type(nbegTime) is dict and 'begTime' in nbegTime and 'begMsec' in nbegTime:
+ d = { 'begtime': self._noSep(nbegTime['begTime']), 'begmsecs': float(self._noSep(nbegTime['begMsec'])), 'begseconds': nbegTime['begSeconds'],
+ 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec, 'endseconds': endSeconds }
+ elif self._begTime is not None:
+ d = { 'begtime': self._begTime, 'begmsecs': self._begMsec, 'begseconds': self._begSeconds,
+ 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec, 'endseconds': endSeconds }
+ else:
+ d = { 'begtime': endMetricsTime, 'begmsecs': endMetricsMsec, 'begseconds': endSeconds,
+ 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec, 'endseconds': endSeconds }
+ if not timer:
+ timer = int(d["endseconds"] * 1000.0 + d["endmsecs"] - d["begseconds"] * 1000.0 - d["begmsecs"])
+ self._begTime = self._begMsec = None
+ unused = ""
+ self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
+ %(requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName,
+ targetEntity, targetServiceName, statusCode, responseCode, responseDescription,
+ instanceUUID, upperLogLevel, severity, serverIPAddress, timer, server, IPAddress,
+ className, unused, processKey, targetVirtualEntity, customField1, customField2,
+ customField3, customField4, detailMessage), extra=d)
+ else:
+ print("!!!!!!!!!!!!!!!! style not set: %s" % self._fields["style"])
+
+ def _getTime(self):
+ ct = time.time()
+ lt = time.localtime(ct)
+ return (time.strftime(CommonLogger.DateFmt, lt), (ct - int(ct)) * 1000, ct)
+
+ def setStartRecordEvent(self):
+ """
+ Set the start time to be saved for both audit and metrics records
+ """
+ self._begTime, self._begMsec, self._begSeconds = self._getTime()
+
+ def getStartRecordEvent(self):
+ """
+ Retrieve the start time to be used for either audit and metrics records
+ """
+ begTime, begMsec, begSeconds = self._getTime()
+ return {'begTime':begTime, 'begMsec':begMsec, 'begSeconds':begSeconds}
+
+ def _getVal(self, key, default, **kwargs):
+ val = self._fields.get(key)
+ if key in kwargs: val = kwargs[key]
+ if val is None: val = default
+ return self._noSep(val)
+
+ def _getArg(self, key, default, **kwargs):
+ val = None
+ if key in kwargs: val = kwargs[key]
+ if val is None: val = default
+ return val
+
+ def _noSep(self, message):
+ if message is None: return ''
+ return re.sub(r'[\|\n]', ' ', str(message))
+
+ def _intLogLevel(self, logLevel):
+ if logLevel == 'FATAL': useLevel = 50
+ elif logLevel == 'ERROR': useLevel = 40
+ elif logLevel == 'WARN': useLevel = 30
+ elif logLevel == 'INFO': useLevel = 20
+ elif logLevel == 'DEBUG': useLevel = 10
+ else: useLevel = 0
+ return useLevel
+
+ def _mkdir_p(self, filename):
+ """Create missing directories from a full filename path like mkdir -p"""
+
+ if filename is None:
+ return
+
+ folder=os.path.dirname(filename)
+
+ if folder == "":
+ return
+
+ if not os.path.exists(folder):
+ try:
+ os.makedirs(folder)
+ except OSError as err:
+ print("error number %d creating %s directory to hold %s logfile: %s" %(err.errno, err.filename, filename, err.strerror), file=sys.stderr)
+ sys.exit(2)
+ except Exception as err:
+ print("error creating %s directory to hold %s logfile: %s" %(folder, filename, str(err)), file=sys.stderr)
+ sys.exit(2)
+
+if __name__ == "__main__":
+
+ def __checkOneTime(line):
+ format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}[+]00:00[|]'
+ m = re.match(format, line)
+ if not m:
+ print("ERROR: time string did not match proper time format, %s" %line)
+ print("\t: format=%s" % format)
+ return 1
+ return 0
+
+ def __checkTwoTimes(line, different):
+ format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|][0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|]'
+ m = re.match(format, line)
+ if not m:
+ print("ERROR: time strings did not match proper time format, %s" %line)
+ print("\t: format=%s" % format)
+ return 1
+ second1 = int(m.group(1))
+ msec1 = int(m.group(2))
+ second2 = int(m.group(3))
+ msec2 = int(m.group(4))
+ if second1 > second2: second2 += 60
+ t1 = second1 * 1000 + msec1
+ t2 = second2 * 1000 + msec2
+ diff = t2 - t1
+ # print("t1=%d (%d,%d) t2=%d (%d,%d), diff = %d" % (t1, second1, msec1, t2, second2, msec2, diff))
+ if different:
+ if diff < 500:
+ print("ERROR: times did not differ enough: %s" % line)
+ return 1
+ else:
+ if diff > 10:
+ print("ERROR: times were too far apart: %s" % line)
+ return 1
+ return 0
+
+ def __checkBegTime(line):
+ format = "begTime should be ([-0-9T:]+)"
+ # print("checkBegTime(%s)" % line)
+ strt = 'begTime should be '
+ i = line.index(strt)
+ rest = line[i+len(strt):].rstrip()
+ if not line.startswith(rest + ","):
+ print("ERROR: line %s should start with %s" % (line,rest))
+ return 1
+ return 0
+
+ def __checkLog(logfile, numLines, numFields):
+ lineCount = 0
+ errorCount = 0
+ with open(logfile, "r") as fp:
+ for line in fp:
+ # print("saw line %s" % line)
+ lineCount += 1
+ c = line.count('|')
+ if c != numFields:
+ print("ERROR: wrong number of fields. Expected %d, got %d: %s" % (numFields, c, line))
+ errorCount += 1
+ if re.search("should not appear", line):
+ print("ERROR: a line appeared that should not have appeared, %s" % line)
+ errorCount += 1
+ elif re.search("single time", line):
+ errorCount += __checkOneTime(line)
+ elif re.search("time should be the same", line):
+ errorCount += __checkTwoTimes(line, different=False)
+ elif re.search("time should be ", line):
+ errorCount += __checkTwoTimes(line, different=True)
+ elif re.search("begTime should be ", line):
+ errorCount += __checkBegTime(line)
+ else:
+ print("ERROR: an unknown message appeared, %s" % line)
+ errorCount += 1
+
+ if lineCount != numLines:
+ print("ERROR: expected %d lines, but got %d lines" % (numLines, lineCount))
+ errorCount += 1
+ return errorCount
+
+ import os, argparse
+ parser = argparse.ArgumentParser(description="test the CommonLogger functions")
+ parser.add_argument("-k", "--keeplogs", help="Keep the log files after finishing the tests", action="store_true")
+ parser.add_argument("-v", "--verbose", help="Print debugging messages", action="store_true")
+ args = parser.parse_args()
+
+ spid = str(os.getpid())
+ if args.keeplogs:
+ spid = ""
+ logcfg = "/tmp/cl.log" + spid + ".cfg"
+ errorLog = "/tmp/cl.error" + spid + ".log"
+ metricsLog = "/tmp/cl.metrics" + spid + ".log"
+ auditLog = "/tmp/cl.audit" + spid + ".log"
+ debugLog = "/tmp/cl.debug" + spid + ".log"
+ if args.verbose: CommonLogger.verbose = True
+
+ import atexit
+ def cleanupTmps():
+ for f in [ logcfg, errorLog, metricsLog, auditLog, debugLog ]:
+ try:
+ os.remove(f)
+ except:
+ pass
+ if not args.keeplogs:
+ atexit.register(cleanupTmps)
+
+ with open(logcfg, "w") as o:
+ o.write("error = " + errorLog + "\n" +
+ "errorLogLevel = WARN\n" +
+ "metrics = " + metricsLog + "\n" +
+ "metricsLogLevel = INFO\n" +
+ "audit = " + auditLog + "\n" +
+ "auditLogLevel = INFO\n" +
+ "debug = " + debugLog + "\n" +
+ "debugLogLevel = DEBUG\n")
+
+ import uuid
+ instanceUUID = uuid.uuid1()
+ serviceName = "testharness"
+ errorLogger = CommonLogger(logcfg, "error", style=CommonLogger.ErrorFile, instanceUUID=instanceUUID, serviceName=serviceName)
+ debugLogger = CommonLogger(logcfg, "debug", style=CommonLogger.DebugFile, instanceUUID=instanceUUID, serviceName=serviceName)
+ auditLogger = CommonLogger(logcfg, "audit", style=CommonLogger.AuditFile, instanceUUID=instanceUUID, serviceName=serviceName)
+ metricsLogger = CommonLogger(logcfg, "metrics", style=CommonLogger.MetricsFile, instanceUUID=instanceUUID, serviceName=serviceName)
+
+ testsRun = 0
+ errorCount = 0
+ errorLogger.debug("error calling debug (should not appear)")
+ errorLogger.info("error calling info (should not appear)")
+ errorLogger.warn("error calling warn (single time)")
+ errorLogger.error("error calling error (single time)")
+ errorLogger.setStartRecordEvent()
+ time.sleep(1)
+ errorLogger.fatal("error calling fatal, after setStartRecordEvent and sleep (start should be ignored, single time)")
+ testsRun += 6
+ errorCount += __checkLog(errorLog, 3, 10)
+
+ auditLogger.debug("audit calling debug (should not appear)")
+ auditLogger.info("audit calling info (time should be the same)")
+ auditLogger.warn("audit calling warn (time should be the same)")
+ auditLogger.error("audit calling error (time should be the same)")
+ bt = auditLogger.getStartRecordEvent()
+ # print("bt=%s" % bt)
+ time.sleep(1)
+ auditLogger.setStartRecordEvent()
+ time.sleep(1)
+ auditLogger.fatal("audit calling fatal, after setStartRecordEvent and sleep, time should be different)")
+ time.sleep(1)
+ auditLogger.fatal("audit calling fatal, begTime should be %s" % bt['begTime'], begTime=bt)
+ testsRun += 7
+ errorCount += __checkLog(auditLog, 5, 25)
+
+ debugLogger.debug("debug calling debug (single time)")
+ debugLogger.info("debug calling info (single time)")
+ debugLogger.warn("debug calling warn (single time)")
+ debugLogger.setStartRecordEvent()
+ time.sleep(1)
+ debugLogger.error("debug calling error, after SetStartRecordEvent and sleep (start should be ignored, single time)")
+ debugLogger.fatal("debug calling fatal (single time)")
+ errorCount += __checkLog(debugLog, 5, 13)
+ testsRun += 6
+
+ metricsLogger.debug("metrics calling debug (should not appear)")
+ metricsLogger.info("metrics calling info (time should be the same)")
+ metricsLogger.warn("metrics calling warn (time should be the same)")
+ bt = metricsLogger.getStartRecordEvent()
+ time.sleep(1)
+ metricsLogger.setStartRecordEvent()
+ time.sleep(1)
+ metricsLogger.error("metrics calling error, after SetStartRecordEvent and sleep, time should be different")
+ metricsLogger.fatal("metrics calling fatal (time should be the same)")
+ time.sleep(1)
+ metricsLogger.fatal("metrics calling fatal, begTime should be %s" % bt['begTime'], begTime=bt)
+ testsRun += 6
+ errorCount += __checkLog(metricsLog, 5, 28)
+
+ print("%d tests run, %d errors found" % (testsRun, errorCount))
diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py
index a007a26..69ddb86 100644
--- a/policyhandler/onap/audit.py
+++ b/policyhandler/onap/audit.py
@@ -225,6 +225,12 @@ class _Audit(object):
if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value:
self.max_http_status_code = max(http_status_code, self.max_http_status_code)
+ def reset_http_status_not_found(self):
+ """resets the highest(worst) http status code if data not found"""
+ with self._lock:
+ if self.max_http_status_code == AuditHttpCode.DATA_NOT_FOUND_ERROR.value:
+ self.max_http_status_code = 0
+
def get_max_http_status_code(self):
"""returns the highest(worst) http status code"""
with self._lock:
@@ -244,31 +250,41 @@ class _Audit(object):
== AuditResponseCode.get_response_code(status_code).value
or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value)
- def _get_response_status(self):
+ def _get_response_status(self, not_found_ok=None):
"""calculates the response status fields from max_http_status_code"""
max_http_status_code = self.get_max_http_status_code()
response_code = AuditResponseCode.get_response_code(max_http_status_code)
- success = (response_code.value == AuditResponseCode.SUCCESS.value)
+ success = ((response_code.value == AuditResponseCode.SUCCESS.value)
+ or (not_found_ok
+ and max_http_status_code == AuditHttpCode.DATA_NOT_FOUND_ERROR.value))
response_description = AuditResponseCode.get_human_text(response_code)
return success, max_http_status_code, response_code, response_description
def is_success(self):
- """returns whether the response_code is success and a human text for response code"""
+ """returns whether the response_code is success"""
success, _, _, _ = self._get_response_status()
return success
+ def is_success_or_not_found(self):
+ """returns whether the response_code is success or 404 - not found"""
+ success, _, _, _ = self._get_response_status(not_found_ok=True)
+ return success
+
def debug(self, log_line, **kwargs):
"""debug - the debug=lowest level of logging"""
_Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs))
+ return log_line
def info(self, log_line, **kwargs):
"""debug - the info level of logging"""
_Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs))
+ return log_line
def info_requested(self, result=None, **kwargs):
"""info "requested ..." - the info level of logging"""
- self.info("requested {0} {1}".format(self.req_message, result or ""), \
- **self.merge_all_kwargs(**kwargs))
+ log_line = "requested {0} {1}".format(self.req_message, result or "")
+ self.info(log_line, **self.merge_all_kwargs(**kwargs))
+ return log_line
def warn(self, log_line, error_code=None, **kwargs):
"""debug+error - the warn level of logging"""
@@ -280,6 +296,7 @@ class _Audit(object):
_Audit._logger_debug.warn(log_line, **all_kwargs)
_Audit._logger_error.warn(log_line, **all_kwargs)
+ return log_line
def error(self, log_line, error_code=None, **kwargs):
"""debug+error - the error level of logging"""
@@ -291,6 +308,7 @@ class _Audit(object):
_Audit._logger_debug.error(log_line, **all_kwargs)
_Audit._logger_error.error(log_line, **all_kwargs)
+ return log_line
def fatal(self, log_line, error_code=None, **kwargs):
"""debug+error - the fatal level of logging"""
@@ -302,6 +320,7 @@ class _Audit(object):
_Audit._logger_debug.fatal(log_line, **all_kwargs)
_Audit._logger_error.fatal(log_line, **all_kwargs)
+ return log_line
@staticmethod
def hide_secrets(obj):
diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py
index 90ede47..51ac173 100644
--- a/policyhandler/policy_consts.py
+++ b/policyhandler/policy_consts.py
@@ -29,6 +29,10 @@ AUTO_CATCH_UP = "auto catch_up"
LATEST_POLICIES = "latest_policies"
REMOVED_POLICIES = "removed_policies"
ERRORED_POLICIES = "errored_policies"
-ERRORED_SCOPES = "errored_scopes"
-SCOPE_PREFIXES = "scope_prefixes"
POLICY_FILTER = "policy_filter"
+POLICY_FILTERS = "policy_filters"
+POLICIES = "policies"
+POLICY_VERSIONS = "policy_versions"
+MATCHING_CONDITIONS = "matchingConditions"
+POLICY_NAMES = "policy_names"
+POLICY_FILTER_MATCHES = "policy_filter_matches"
diff --git a/policyhandler/policy_matcher.py b/policyhandler/policy_matcher.py
new file mode 100644
index 0000000..8406f14
--- /dev/null
+++ b/policyhandler/policy_matcher.py
@@ -0,0 +1,236 @@
+# ================================================================================
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+"""policy-matcher matches the policies from deployment-handler to policies from policy-engine"""
+
+import json
+import logging
+import re
+
+from .deploy_handler import DeployHandler, PolicyUpdateMessage
+from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES,
+ MATCHING_CONDITIONS, POLICY_BODY, POLICY_FILTER,
+ POLICY_NAME, POLICY_VERSION, POLICY_VERSIONS)
+from .policy_rest import PolicyRest
+from .policy_utils import RegexCoarser
+
+
+class PolicyMatcher(object):
+ """policy-matcher - static class"""
+ _logger = logging.getLogger("policy_handler.policy_matcher")
+ PENDING_UPDATE = "pending_update"
+
+ @staticmethod
+ def get_latest_policies(audit):
+ """
+ find the latest policies from policy-engine for the deployed policies and policy-filters
+ """
+ deployed_policies, deployed_policy_filters = DeployHandler.get_deployed_policies(audit)
+
+ if not audit.is_success() or (not deployed_policies and not deployed_policy_filters):
+ error_txt = "failed to retrieve policies from deployment-handler"
+ PolicyMatcher._logger.error(error_txt)
+ return {"error": error_txt}, None
+
+ coarse_regex_patterns = PolicyMatcher.calc_coarse_patterns(
+ audit, deployed_policies, deployed_policy_filters)
+
+ if not coarse_regex_patterns:
+ error_txt = ("failed to construct the coarse_regex_patterns from " +
+ "deployed_policies: {} and deployed_policy_filters: {}"
+ .format(deployed_policies, deployed_policy_filters))
+ PolicyMatcher._logger.error(error_txt)
+ return {"error": error_txt}, None
+
+ pdp_response = PolicyRest.get_latest_policies(
+ audit, policy_filters=[{POLICY_NAME: policy_name_pattern}
+ for policy_name_pattern in coarse_regex_patterns]
+ )
+
+ if not audit.is_success_or_not_found():
+ error_txt = "failed to retrieve policies from policy-engine"
+ PolicyMatcher._logger.error(error_txt)
+ return {"error": error_txt}, None
+
+ latest_policies = pdp_response.get(LATEST_POLICIES, {})
+ errored_policies = pdp_response.get(ERRORED_POLICIES, {})
+
+ removed_policies = dict(
+ (policy_id, True)
+ for (policy_id, deployed_policy) in deployed_policies.items()
+ if deployed_policy.get(POLICY_VERSIONS)
+ and policy_id not in latest_policies
+ and policy_id not in errored_policies
+ )
+
+ latest_policies, changed_policies, policy_filter_matches = PolicyMatcher.match_policies(
+ audit, latest_policies, deployed_policies, deployed_policy_filters)
+ errored_policies, _, _ = PolicyMatcher.match_policies(
+ audit, errored_policies, deployed_policies, deployed_policy_filters)
+
+ return ({LATEST_POLICIES: latest_policies, ERRORED_POLICIES: errored_policies},
+ PolicyUpdateMessage(changed_policies,
+ removed_policies,
+ policy_filter_matches))
+
+ @staticmethod
+ def calc_coarse_patterns(audit, deployed_policies, deployed_policy_filters):
+ """calculate the coarsed patterns on policy-names in policies and policy-filters"""
+ coarse_regex = RegexCoarser()
+ for policy_id in deployed_policies or {}:
+ coarse_regex.add_regex_pattern(policy_id)
+
+ for policy_filter in (deployed_policy_filters or {}).values():
+ policy_name_pattern = policy_filter.get(POLICY_FILTER, {}).get(POLICY_NAME)
+ coarse_regex.add_regex_pattern(policy_name_pattern)
+
+ coarse_regex_patterns = coarse_regex.get_coarse_regex_patterns()
+ PolicyMatcher._logger.debug(
+ audit.debug("coarse_regex_patterns({}) combined_regex_pattern({}) for patterns({})"
+ .format(coarse_regex_patterns,
+ coarse_regex.get_combined_regex_pattern(),
+ coarse_regex.patterns)))
+ return coarse_regex_patterns
+
+ @staticmethod
+ def match_to_deployed_policies(audit, policies_updated, policies_removed):
+ """match the policies_updated, policies_removed versus deployed policies"""
+ deployed_policies, deployed_policy_filters = DeployHandler.get_deployed_policies(
+ audit)
+ if not audit.is_success():
+ return {}, {}, {}
+
+ _, changed_policies, policy_filter_matches = PolicyMatcher.match_policies(
+ audit, policies_updated, deployed_policies, deployed_policy_filters)
+ policies_removed, _, _ = PolicyMatcher.match_policies(
+ audit, policies_removed, deployed_policies, deployed_policy_filters)
+
+ return changed_policies, policies_removed, policy_filter_matches
+
+ @staticmethod
+ def match_policies(audit, policies, deployed_policies, deployed_policy_filters):
+ """
+ Match policies to deployed policies either by policy_id or the policy-filters.
+
+ Also calculates the policies that changed in comparison to deployed policies
+ """
+ matching_policies = {}
+ changed_policies = {}
+ policy_filter_matches = {}
+
+ policies = policies or {}
+ deployed_policies = deployed_policies or {}
+ deployed_policy_filters = deployed_policy_filters or {}
+
+ for (policy_id, policy) in policies.items():
+ new_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION)
+ deployed_policy = deployed_policies.get(policy_id)
+
+ if deployed_policy:
+ matching_policies[policy_id] = policy
+
+ policy_changed = (deployed_policy and new_version
+ and (deployed_policy.get(PolicyMatcher.PENDING_UPDATE)
+ or {new_version} ^
+ deployed_policy.get(POLICY_VERSIONS, {}).keys()))
+ if policy_changed:
+ changed_policies[policy_id] = policy
+
+ in_filters = False
+ for (policy_filter_id, policy_filter) in deployed_policy_filters.items():
+ if not PolicyMatcher.match_policy_to_filter(
+ audit, policy_id, policy,
+ policy_filter_id, policy_filter.get(POLICY_FILTER)):
+ continue
+
+ if policy_changed or not deployed_policy:
+ in_filters = True
+ if policy_id not in policy_filter_matches:
+ policy_filter_matches[policy_id] = {}
+ policy_filter_matches[policy_id][policy_filter_id] = True
+
+ if not deployed_policy and in_filters:
+ matching_policies[policy_id] = policy
+ changed_policies[policy_id] = policy
+
+ return matching_policies, changed_policies, policy_filter_matches
+
+ @staticmethod
+ def match_policy_to_filter(audit, policy_id, policy, policy_filter_id, policy_filter):
+ """Match the policy to the policy-filter"""
+ if not policy_id or not policy or not policy_filter or not policy_filter_id:
+ return False
+
+ filter_policy_name = policy_filter.get(POLICY_NAME)
+ if not filter_policy_name:
+ return False
+
+ policy_body = policy.get(POLICY_BODY)
+ if not policy_body:
+ return False
+
+ policy_name = policy_body.get(POLICY_NAME)
+ if not policy_name:
+ return False
+
+ log_line = "policy {} to filter id {}: {}".format(json.dumps(policy_body),
+ policy_filter_id,
+ json.dumps(policy_filter))
+ # PolicyMatcher._logger.debug(audit.debug("matching {}...".format(log_line)))
+
+ if (filter_policy_name != policy_id and filter_policy_name != policy_name
+ and not re.match(filter_policy_name, policy_name)):
+ PolicyMatcher._logger.debug(
+ audit.debug("not match by policyName: {} != {}: {}"
+ .format(policy_name, filter_policy_name, log_line)))
+ return False
+
+ matching_conditions = policy_body.get(MATCHING_CONDITIONS, {})
+ if not isinstance(matching_conditions, dict):
+ return False
+
+ filter_onap_name = policy_filter.get("onapName")
+ policy_onap_name = matching_conditions.get("ONAPName")
+ if filter_onap_name and filter_onap_name != policy_onap_name:
+ PolicyMatcher._logger.debug(
+ audit.debug("not match by ONAPName: {} != {}: {}"
+ .format(policy_onap_name, filter_onap_name, log_line)))
+ return False
+
+ filter_config_name = policy_filter.get("configName")
+ policy_config_name = matching_conditions.get("ConfigName")
+ if filter_onap_name and filter_onap_name != policy_onap_name:
+ PolicyMatcher._logger.debug(
+ audit.debug("not match by configName: {} != {}: {}"
+ .format(policy_config_name, filter_config_name, log_line)))
+ return False
+
+ filter_config_attributes = policy_filter.get("configAttributes")
+ if filter_config_attributes and isinstance(filter_config_attributes, dict):
+ for filter_key, filter_config_attribute in filter_config_attributes.items():
+ if (filter_key not in matching_conditions
+ or filter_config_attribute != matching_conditions.get(filter_key)):
+ PolicyMatcher._logger.debug(
+ audit.debug("not match by configAttributes: {} != {}: {}"
+ .format(json.dumps(matching_conditions),
+ json.dumps(filter_config_attributes),
+ log_line)))
+ return False
+
+ PolicyMatcher._logger.debug(audit.debug("matched {}".format(log_line)))
+ return True
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py
index e1584a3..bb33cd5 100644
--- a/policyhandler/policy_receiver.py
+++ b/policyhandler/policy_receiver.py
@@ -22,26 +22,24 @@ thru web-socket to receive push notifications
on updates and removal of policies.
on receiving the policy-notifications, the policy-receiver
-filters them out by the policy scope(s) provided in policy-handler config
-and passes the notifications to policy-updater
+passes the notifications to policy-updater
"""
import json
import logging
-import re
import time
from threading import Lock, Thread
import websocket
from .config import Config
-from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
+from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION
from .policy_updater import PolicyUpdater
LOADED_POLICIES = 'loadedPolicies'
REMOVED_POLICIES = 'removedPolicies'
-POLICY_NAME = 'policyName'
POLICY_VER = 'versionNo'
+POLICY_MATCHES = 'matches'
class _PolicyReceiver(Thread):
"""web-socket to PolicyEngine"""
@@ -65,10 +63,6 @@ class _PolicyReceiver(Thread):
self._web_socket = None
- scope_prefixes = [scope_prefix.replace(".", "[.]")
- for scope_prefix in Config.settings["scope_prefixes"]]
- self._policy_scopes = re.compile("(" + "|".join(scope_prefixes) + ")")
- _PolicyReceiver._logger.info("_policy_scopes %s", self._policy_scopes.pattern)
self._policy_updater = PolicyUpdater()
self._policy_updater.start()
@@ -126,31 +120,29 @@ class _PolicyReceiver(Thread):
json.dumps(message))
return
- policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
- for policy in message.get(LOADED_POLICIES, [])
- if self._policy_scopes.match(policy.get(POLICY_NAME))]
- policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
- for policy in message.get(REMOVED_POLICIES, [])
- if self._policy_scopes.match(policy.get(POLICY_NAME))]
+ policies_updated = [
+ {POLICY_NAME: policy.get(POLICY_NAME),
+ POLICY_VERSION: policy.get(POLICY_VER),
+ MATCHING_CONDITIONS: policy.get(POLICY_MATCHES, {})}
+ for policy in message.get(LOADED_POLICIES, [])
+ ]
+
+ policies_removed = [
+ {POLICY_NAME: removed_policy.get(POLICY_NAME),
+ POLICY_VERSION: removed_policy.get(POLICY_VER)}
+ for removed_policy in message.get(REMOVED_POLICIES, [])
+ ]
if not policies_updated and not policies_removed:
- _PolicyReceiver._logger.info("no policy updated or removed for scopes %s",
- self._policy_scopes.pattern)
+ _PolicyReceiver._logger.info("no policy updated or removed")
return
- audit = Audit(job_name="policy_update",
- req_message="policy-notification - updated[{0}], removed[{1}]"
- .format(len(policies_updated), len(policies_removed)),
- retry_get_config=True)
- self._policy_updater.enqueue(audit, policies_updated, policies_removed)
+ self._policy_updater.policy_update(policies_updated, policies_removed)
except Exception as ex:
error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex),
"on_pdp_message", json.dumps(message))
_PolicyReceiver._logger.exception(error_msg)
- audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
- audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
-
def _on_ws_error(self, _, error):
"""report an error"""
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py
index c8018f6..39c26a5 100644
--- a/policyhandler/policy_rest.py
+++ b/policyhandler/policy_rest.py
@@ -29,9 +29,9 @@ import requests
from .config import Config
from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
AuditResponseCode, Metrics)
-from .policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, LATEST_POLICIES,
- POLICY_BODY, POLICY_CONFIG, POLICY_FILTER,
- POLICY_ID, POLICY_NAME, SCOPE_PREFIXES)
+from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY,
+ POLICY_CONFIG, POLICY_FILTER, POLICY_FILTERS,
+ POLICY_ID, POLICY_NAME)
from .policy_utils import PolicyUtils
@@ -56,8 +56,6 @@ class PolicyRest(object):
_headers = None
_target_entity = None
_thread_pool_size = 4
- _scope_prefixes = None
- _scope_thread_pool_size = 4
_policy_retry_count = 1
_policy_retry_sleep = 0
@@ -70,7 +68,7 @@ class PolicyRest(object):
config = Config.settings[Config.FIELD_POLICY_ENGINE]
- pool_size = config.get("pool_connections", 20)
+ pool_size = Config.settings.get("pool_connections", 20)
PolicyRest._requests_session = requests.Session()
PolicyRest._requests_session.mount(
'https://',
@@ -81,24 +79,20 @@ class PolicyRest(object):
requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
)
- PolicyRest._url_get_config = config["url"] \
- + config["path_api"] + PolicyRest.POLICY_GET_CONFIG
+ PolicyRest._url_get_config = (config["url"] + config["path_api"]
+ + PolicyRest.POLICY_GET_CONFIG)
PolicyRest._headers = config["headers"]
PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
PolicyRest._thread_pool_size = Config.settings.get("thread_pool_size", 4)
if PolicyRest._thread_pool_size < 2:
PolicyRest._thread_pool_size = 2
- PolicyRest._scope_prefixes = Config.settings["scope_prefixes"]
- PolicyRest._scope_thread_pool_size = min(PolicyRest._thread_pool_size, \
- len(PolicyRest._scope_prefixes))
PolicyRest._policy_retry_count = Config.settings.get("policy_retry_count", 1) or 1
PolicyRest._policy_retry_sleep = Config.settings.get("policy_retry_sleep", 0)
PolicyRest._logger.info(
- "PolicyClient url(%s) headers(%s) scope-prefixes(%s)",
- PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers),
- json.dumps(PolicyRest._scope_prefixes))
+ "PolicyClient url(%s) headers(%s)",
+ PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers))
@staticmethod
def _pdp_get_config(audit, json_body):
@@ -180,8 +174,8 @@ class PolicyRest(object):
rslt = res_data[0]
if (rslt and not rslt.get(POLICY_NAME)
- and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND
- and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND):
+ and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND
+ and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND):
status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value
info_msg = "not found {0}".format(log_line)
@@ -281,22 +275,22 @@ class PolicyRest(object):
expect_policy_removed):
"""single attempt to get the latest policy for the policy_id from the policy-engine"""
- status_code, policy_configs = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id})
+ status_code, policy_bodies = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id})
- PolicyRest._logger.debug("%s %s policy_configs: %s",
- status_code, policy_id, json.dumps(policy_configs or []))
+ PolicyRest._logger.debug("%s %s policy_bodies: %s",
+ status_code, policy_id, json.dumps(policy_bodies or []))
latest_policy = PolicyUtils.select_latest_policy(
- policy_configs, min_version_expected, ignore_policy_names
+ policy_bodies, min_version_expected, ignore_policy_names
)
if not latest_policy and not expect_policy_removed:
audit.error("received unexpected policy data from PDP for policy_id={0}: {1}"
- .format(policy_id, json.dumps(policy_configs or [])),
+ .format(policy_id, json.dumps(policy_bodies or [])),
error_code=AuditResponseCode.DATA_ERROR)
done = bool(latest_policy
- or (expect_policy_removed and not policy_configs)
+ or (expect_policy_removed and not policy_bodies)
or audit.is_serious_error(status_code))
return done, latest_policy, status_code
@@ -330,18 +324,17 @@ class PolicyRest(object):
def _get_latest_updated_policies(audit, str_metrics, policies_updated, policies_removed):
"""Get the latest policies of the list of policy_names from the policy-engine"""
PolicyRest._lazy_init()
- metrics = Metrics(
+ metrics_total = Metrics(
aud_parent=audit,
targetEntity="{0} total get_latest_updated_policies".format(PolicyRest._target_entity),
targetServiceName=PolicyRest._url_get_config)
- metrics.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
+ metrics_total.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
PolicyRest._logger.debug(str_metrics)
policies_to_find = {}
- for (policy_name, policy_version) in policies_updated:
- policy_id = PolicyUtils.extract_policy_id(policy_name)
- if not policy_id or not policy_version.isdigit():
+ for (policy_id, policy_version) in policies_updated:
+ if not policy_id or not policy_version or not policy_version.isdigit():
continue
policy = policies_to_find.get(policy_id)
if not policy:
@@ -354,18 +347,17 @@ class PolicyRest(object):
if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version):
policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version)
- for (policy_name, _) in policies_removed:
- policy_id = PolicyUtils.extract_policy_id(policy_name)
+ for (policy_id, policy_names) in policies_removed:
if not policy_id:
continue
policy = policies_to_find.get(policy_id)
if not policy:
policies_to_find[policy_id] = {
POLICY_ID: policy_id,
- PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True}
+ PolicyRest.IGNORE_POLICY_NAMES: policy_names
}
continue
- policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True
+ policy[PolicyRest.IGNORE_POLICY_NAMES].update(policy_names)
apns = [(audit, policy_id,
policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED),
@@ -382,8 +374,8 @@ class PolicyRest(object):
pool.close()
pool.join()
- metrics.metrics("result get_latest_updated_policies {0}: {1} {2}"
- .format(str_metrics, len(policies), json.dumps(policies)))
+ metrics_total.metrics("result get_latest_updated_policies {0}: {1} {2}"
+ .format(str_metrics, len(policies), json.dumps(policies)))
updated_policies = dict((policy[POLICY_ID], policy)
for policy in policies
@@ -416,36 +408,26 @@ class PolicyRest(object):
@staticmethod
def _get_latest_policies(aud_policy_filter):
- """
- get the latest policies by policy_filter
- or all the latest policies of the same scope from the policy-engine
- """
- audit, policy_filter, scope_prefix = aud_policy_filter
+ """get the latest policies by policy_filter from the policy-engine"""
+ audit, policy_filter = aud_policy_filter
try:
str_policy_filter = json.dumps(policy_filter)
PolicyRest._logger.debug("%s", str_policy_filter)
- status_code, policy_configs = PolicyRest._pdp_get_config(audit, policy_filter)
+ status_code, policy_bodies = PolicyRest._pdp_get_config(audit, policy_filter)
- PolicyRest._logger.debug("%s policy_configs: %s %s", status_code,
- str_policy_filter, json.dumps(policy_configs or []))
+ PolicyRest._logger.debug("%s policy_bodies: %s %s", status_code,
+ str_policy_filter, json.dumps(policy_bodies or []))
- latest_policies = PolicyUtils.select_latest_policies(policy_configs)
-
- if (scope_prefix and not policy_configs
- and status_code != AuditHttpCode.DATA_NOT_FOUND_ERROR.value):
- audit.warn("PDP error {0} on scope_prefix {1}".format(status_code, scope_prefix),
- error_code=AuditResponseCode.DATA_ERROR)
- return None, latest_policies, scope_prefix
+ latest_policies = PolicyUtils.select_latest_policies(policy_bodies)
if not latest_policies:
- if not scope_prefix:
- audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
- audit.warn(
- "received no policies from PDP for policy_filter {0}: {1}"
- .format(str_policy_filter, json.dumps(policy_configs or [])),
- error_code=AuditResponseCode.DATA_ERROR)
- return None, latest_policies, None
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.warn(
+ "received no policies from PDP for policy_filter {0}: {1}"
+ .format(str_policy_filter, json.dumps(policy_bodies or [])),
+ error_code=AuditResponseCode.DATA_ERROR)
+ return None, latest_policies
audit.set_http_status_code(status_code)
valid_policies = {}
@@ -455,22 +437,22 @@ class PolicyRest(object):
valid_policies[policy_id] = policy
else:
errored_policies[policy_id] = policy
- return valid_policies, errored_policies, None
+ return valid_policies, errored_policies
except Exception as ex:
- error_msg = ("{0}: crash {1} {2} at {3}: policy_filter({4}), scope_prefix({5})"
+ error_msg = ("{0}: crash {1} {2} at {3}: policy_filter({4})"
.format(audit.request_id, type(ex).__name__, str(ex),
- "_get_latest_policies", json.dumps(policy_filter), scope_prefix))
+ "_get_latest_policies", json.dumps(policy_filter)))
PolicyRest._logger.exception(error_msg)
audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
- return None, None, scope_prefix
+ return None, None
@staticmethod
- def get_latest_policies(audit, policy_filter=None):
- """Get the latest policies of the same scope from the policy-engine"""
+ def get_latest_policies(audit, policy_filter=None, policy_filters=None):
+ """Get the latest policies by policy-filter(s) from the policy-engine"""
result = {}
aud_policy_filters = None
str_policy_filters = None
@@ -479,51 +461,53 @@ class PolicyRest(object):
try:
PolicyRest._lazy_init()
- if policy_filter is not None:
- aud_policy_filters = [(audit, policy_filter, None)]
+ if policy_filter:
+ aud_policy_filters = [(audit, policy_filter)]
str_policy_filters = json.dumps(policy_filter)
str_metrics = "get_latest_policies for policy_filter {0}".format(
str_policy_filters)
target_entity = ("{0} total get_latest_policies by policy_filter"
.format(PolicyRest._target_entity))
result[POLICY_FILTER] = copy.deepcopy(policy_filter)
- else:
- aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, scope_prefix)
- for scope_prefix in PolicyRest._scope_prefixes]
- str_policy_filters = json.dumps(PolicyRest._scope_prefixes)
- str_metrics = "get_latest_policies for scopes {0} {1}".format( \
- len(PolicyRest._scope_prefixes), str_policy_filters)
- target_entity = ("{0} total get_latest_policies by scope_prefixes"
+ elif policy_filters:
+ aud_policy_filters = [
+ (audit, policy_filter)
+ for policy_filter in policy_filters
+ ]
+ str_policy_filters = json.dumps(policy_filters)
+ str_metrics = "get_latest_policies for policy_filters {0}".format(
+ str_policy_filters)
+ target_entity = ("{0} total get_latest_policies by policy_filters"
.format(PolicyRest._target_entity))
- result[SCOPE_PREFIXES] = copy.deepcopy(PolicyRest._scope_prefixes)
+ result[POLICY_FILTERS] = copy.deepcopy(policy_filters)
+ else:
+ return result
PolicyRest._logger.debug("%s", str_policy_filters)
- metrics = Metrics(aud_parent=audit, targetEntity=target_entity,
- targetServiceName=PolicyRest._url_get_config)
+ metrics_total = Metrics(aud_parent=audit, targetEntity=target_entity,
+ targetServiceName=PolicyRest._url_get_config)
- metrics.metrics_start(str_metrics)
+ metrics_total.metrics_start(str_metrics)
latest_policies = None
apfs_length = len(aud_policy_filters)
if apfs_length == 1:
latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])]
else:
- pool = ThreadPool(min(PolicyRest._scope_thread_pool_size, apfs_length))
+ pool = ThreadPool(min(PolicyRest._thread_pool_size, apfs_length))
latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters)
pool.close()
pool.join()
- metrics.metrics("total result {0}: {1} {2}".format(
+ metrics_total.metrics("total result {0}: {1} {2}".format(
str_metrics, len(latest_policies), json.dumps(latest_policies)))
- # latest_policies == [(valid_policies, errored_policies, errored_scope_prefix), ...]
+ # latest_policies == [(valid_policies, errored_policies), ...]
result[LATEST_POLICIES] = dict(
- pair for (vps, _, _) in latest_policies if vps for pair in vps.items())
+ pair for (vps, _) in latest_policies if vps for pair in vps.items())
result[ERRORED_POLICIES] = dict(
- pair for (_, eps, _) in latest_policies if eps for pair in eps.items())
-
- result[ERRORED_SCOPES] = sorted([esp for (_, _, esp) in latest_policies if esp])
+ pair for (_, eps) in latest_policies if eps for pair in eps.items())
PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s",
str_policy_filters, json.dumps(result))
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index 5ba7c29..3ae8199 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -18,24 +18,101 @@
"""policy-updater thread"""
-import copy
import json
import logging
-from queue import Queue
-from threading import Lock, Thread
+from threading import Event, Lock, Thread
from .config import Config
-from .deploy_handler import DeployHandler
+from .deploy_handler import DeployHandler, PolicyUpdateMessage
from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
-from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, LATEST_POLICIES,
- REMOVED_POLICIES)
+from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, POLICY_BODY, POLICY_ID,
+ POLICY_NAME, POLICY_NAMES, POLICY_VERSION)
+from .policy_matcher import PolicyMatcher
from .policy_rest import PolicyRest
-from .policy_utils import Utils
+from .policy_utils import PolicyUtils
from .step_timer import StepTimer
+class _PolicyUpdate(object):
+ """Keep and consolidate the policy-updates (audit, policies_updated, policies_removed)"""
+ _logger = logging.getLogger("policy_handler.policy_update")
+
+ def __init__(self):
+ """init and reset"""
+ self._audit = None
+ self._policies_updated = {}
+ self._policies_removed = {}
+
+ def reset(self):
+ """resets the state"""
+ self.__init__()
+
+ def pop_policy_update(self):
+ """
+ Returns the consolidated (audit, policies_updated, policies_removed)
+ and resets the state
+ """
+ if not self._audit:
+ return None, None, None
+
+ audit = self._audit
+ policies_updated = self._policies_updated
+ policies_removed = self._policies_removed
+
+ self.reset()
+
+ return audit, policies_updated, policies_removed
+
+
+ def push_policy_update(self, policies_updated, policies_removed):
+ """consolidate the new policies_updated, policies_removed to existing ones"""
+ for policy_body in policies_updated:
+ policy_name = policy_body.get(POLICY_NAME)
+ policy = PolicyUtils.convert_to_policy(policy_body)
+ if not policy:
+ continue
+ policy_id = policy.get(POLICY_ID)
+
+ self._policies_updated[policy_id] = policy
+
+ rm_policy_names = self._policies_removed.get(policy_id, {}).get(POLICY_NAMES)
+ if rm_policy_names and policy_name in rm_policy_names:
+ del rm_policy_names[policy_name]
+
+ for policy_body in policies_removed:
+ policy_name = policy_body.get(POLICY_NAME)
+ policy = PolicyUtils.convert_to_policy(policy_body)
+ if not policy:
+ continue
+ policy_id = policy.get(POLICY_ID)
+
+ if policy_id in self._policies_removed:
+ policy = self._policies_removed[policy_id]
+
+ if POLICY_NAMES not in policy:
+ policy[POLICY_NAMES] = {}
+ policy[POLICY_NAMES][policy_name] = True
+ self._policies_removed[policy_id] = policy
+
+ req_message = ("policy-update notification - updated[{0}], removed[{1}]"
+ .format(len(self._policies_updated),
+ len(self._policies_removed)))
+
+ if not self._audit:
+ self._audit = Audit(job_name="policy_update",
+ req_message=req_message,
+ retry_get_config=True)
+ else:
+ self._audit.req_message = req_message
+
+ self._logger.info(
+ "pending request_id %s for %s policies_updated %s policies_removed %s",
+ self._audit.request_id, req_message,
+ json.dumps(policies_updated), json.dumps(policies_removed))
+
+
class PolicyUpdater(Thread):
- """queue and handle the policy-updates in a separate thread"""
+ """sequentially handle the policy-updates and catch-ups in its own policy_updater thread"""
_logger = logging.getLogger("policy_handler.policy_updater")
def __init__(self):
@@ -43,33 +120,22 @@ class PolicyUpdater(Thread):
Thread.__init__(self, name="policy_updater")
self.daemon = True
+ self._lock = Lock()
+ self._run = Event()
+
self._catch_up_timer = None
self._aud_shutdown = None
self._aud_catch_up = None
+ self._policy_update = _PolicyUpdate()
catch_up_config = Config.settings.get(CATCH_UP, {})
self._catch_up_interval = catch_up_config.get("interval") or 15*60
- self._catch_up_max_skips = catch_up_config.get("max_skips") or 3
- self._catch_up_skips = 0
- self._catch_up_prev_message = None
-
- self._lock = Lock()
- self._queue = Queue()
-
- def enqueue(self, audit=None, policies_updated=None, policies_removed=None):
+ def policy_update(self, policies_updated, policies_removed):
"""enqueue the policy-updates"""
- policies_updated = policies_updated or []
- policies_removed = policies_removed or []
-
- PolicyUpdater._logger.info(
- "enqueue request_id %s policies_updated %s policies_removed %s",
- ((audit and audit.request_id) or "none"),
- json.dumps(policies_updated), json.dumps(policies_removed))
-
with self._lock:
- self._queue.put((audit, policies_updated, policies_removed))
-
+ self._policy_update.push_policy_update(policies_updated, policies_removed)
+ self._run.set()
def catch_up(self, audit=None):
"""need to bring the latest policies to DCAE-Controller"""
@@ -80,30 +146,24 @@ class PolicyUpdater(Thread):
"catch_up %s request_id %s",
self._aud_catch_up.req_message, self._aud_catch_up.request_id
)
-
- self.enqueue()
-
+ self._run.set()
def run(self):
"""wait and run the policy-update in thread"""
while True:
PolicyUpdater._logger.info("waiting for policy-updates...")
- queued_audit, policies_updated, policies_removed = self._queue.get()
- PolicyUpdater._logger.info(
- "got request_id %s policies_updated %s policies_removed %s",
- ((queued_audit and queued_audit.request_id) or "none"),
- json.dumps(policies_updated), json.dumps(policies_removed))
+ self._run.wait()
+
+ with self._lock:
+ self._run.clear()
if not self._keep_running():
break
if self._on_catch_up():
- self._reset_queue()
- continue
- elif not queued_audit:
continue
- self._on_policies_update(queued_audit, policies_updated, policies_removed)
+ self._on_policy_update()
PolicyUpdater._logger.info("exit policy-updater")
@@ -151,44 +211,13 @@ class PolicyUpdater(Thread):
self._catch_up_timer = None
self._logger.info("stopped catch_up_timer")
- def _need_to_send_catch_up(self, aud_catch_up, catch_up_message):
- """try not to send the duplicate messages on auto catchup unless hitting the max count"""
- if aud_catch_up.req_message != AUTO_CATCH_UP \
- or self._catch_up_skips >= self._catch_up_max_skips \
- or not Utils.are_the_same(catch_up_message, self._catch_up_prev_message):
- self._catch_up_skips = 0
- self._catch_up_prev_message = copy.deepcopy(catch_up_message)
- log_line = "going to send the catch_up {0}: {1}".format(
- aud_catch_up.req_message,
- json.dumps(self._catch_up_prev_message)
- )
- self._logger.info(log_line)
- aud_catch_up.info(log_line)
- return True
-
- self._catch_up_skips += 1
- self._catch_up_prev_message = copy.deepcopy(catch_up_message)
- log_line = "skip {0}/{1} sending the same catch_up {2}: {3}".format(
- self._catch_up_skips, self._catch_up_max_skips,
- aud_catch_up.req_message, json.dumps(self._catch_up_prev_message)
- )
- self._logger.info(log_line)
- aud_catch_up.info(log_line)
- return False
-
- def _reset_queue(self):
- """clear up the queue"""
- with self._lock:
- if not self._aud_catch_up and not self._aud_shutdown:
- with self._queue.mutex:
- self._queue.queue.clear()
-
def _on_catch_up(self):
"""bring all the latest policies to DCAE-Controller"""
with self._lock:
aud_catch_up = self._aud_catch_up
if self._aud_catch_up:
self._aud_catch_up = None
+ self._policy_update.reset()
if not aud_catch_up:
return False
@@ -196,20 +225,20 @@ class PolicyUpdater(Thread):
log_line = "catch_up {0} request_id {1}".format(
aud_catch_up.req_message, aud_catch_up.request_id
)
+ catch_up_result = ""
try:
PolicyUpdater._logger.info(log_line)
self._pause_catch_up_timer()
- catch_up_message = PolicyRest.get_latest_policies(aud_catch_up)
- catch_up_message[CATCH_UP] = True
+ _, catch_up_message = PolicyMatcher.get_latest_policies(aud_catch_up)
- catch_up_result = ""
- if not aud_catch_up.is_success():
+ if not catch_up_message or not aud_catch_up.is_success_or_not_found():
catch_up_result = "- not sending catch-up to deployment-handler due to errors"
PolicyUpdater._logger.warning(catch_up_result)
- elif not self._need_to_send_catch_up(aud_catch_up, catch_up_message):
- catch_up_result = "- skipped sending the same policies"
+ elif catch_up_message.empty():
+ catch_up_result = "- not sending empty catch-up to deployment-handler"
else:
+ aud_catch_up.reset_http_status_not_found()
DeployHandler.policy_update(aud_catch_up, catch_up_message, rediscover=True)
if not aud_catch_up.is_success():
catch_up_result = "- failed to send catch-up to deployment-handler"
@@ -229,9 +258,6 @@ class PolicyUpdater(Thread):
aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
success = False
- if not success:
- self._catch_up_prev_message = None
-
self._run_catch_up_timer()
PolicyUpdater._logger.info("policy_handler health: %s",
@@ -240,49 +266,73 @@ class PolicyUpdater(Thread):
return success
- def _on_policies_update(self, queued_audit, policies_updated, policies_removed):
- """handle the event of policy-updates from the queue"""
- deployment_handler_changed = None
+ def _on_policy_update(self):
+ """handle the event of policy-updates"""
result = ""
+ with self._lock:
+ audit, policies_updated, policies_removed = self._policy_update.pop_policy_update()
+
+ if not audit:
+ return
log_line = "request_id: {} policies_updated: {} policies_removed: {}".format(
- ((queued_audit and queued_audit.request_id) or "none"),
- json.dumps(policies_updated), json.dumps(policies_removed))
+ audit.request_id, json.dumps(policies_updated), json.dumps(policies_removed))
+ PolicyUpdater._logger.info(log_line)
try:
- updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
- (queued_audit, policies_updated, policies_removed))
-
- if not queued_audit.is_success():
+ (updated_policies, removed_policies,
+ policy_filter_matches) = PolicyMatcher.match_to_deployed_policies(
+ audit, policies_updated, policies_removed)
+
+ if updated_policies or removed_policies:
+ updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
+ (audit,
+ [(policy_id, policy.get(POLICY_BODY, {}).get(POLICY_VERSION))
+ for policy_id, policy in updated_policies.items()],
+ [(policy_id, policy.get(POLICY_NAMES, {}))
+ for policy_id, policy in removed_policies.items()]
+ ))
+
+ if not audit.is_success_or_not_found():
result = "- not sending policy-updates to deployment-handler due to errors"
PolicyUpdater._logger.warning(result)
+ elif not updated_policies and not removed_policies:
+ result = "- not sending empty policy-updates to deployment-handler"
+ PolicyUpdater._logger.warning(result)
else:
- message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies}
- deployment_handler_changed = DeployHandler.policy_update(queued_audit, message)
- if not queued_audit.is_success():
- result = "- failed to send policy-updates to deployment-handler"
+ message = PolicyUpdateMessage(updated_policies, removed_policies,
+ policy_filter_matches, False)
+ log_updates = ("policies-updated[{0}], removed[{1}]"
+ .format(len(updated_policies), len(removed_policies)))
+
+ audit.reset_http_status_not_found()
+ DeployHandler.policy_update(audit, message)
+
+ if not audit.is_success():
+ result = "- failed to send to deployment-handler {}".format(log_updates)
PolicyUpdater._logger.warning(result)
else:
- result = "- sent policy-updates to deployment-handler"
+ result = "- sent to deployment-handler {}".format(log_updates)
+ log_line = "request_id: {} updated_policies: {} removed_policies: {}".format(
+ audit.request_id,
+ json.dumps(updated_policies), json.dumps(removed_policies))
- success, _, _ = queued_audit.audit_done(result=result)
+ audit.audit_done(result=result)
+ PolicyUpdater._logger.info(log_line + " " + result)
except Exception as ex:
error_msg = ("{0}: crash {1} {2} at {3}: {4}"
- .format(queued_audit.request_id, type(ex).__name__, str(ex),
+ .format(audit.request_id, type(ex).__name__, str(ex),
"on_policies_update", log_line + " " + result))
PolicyUpdater._logger.exception(error_msg)
- queued_audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
- queued_audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
- success = False
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
- if deployment_handler_changed:
- self._catch_up_prev_message = None
+ if DeployHandler.server_instance_changed:
+ DeployHandler.server_instance_changed = False
self._pause_catch_up_timer()
self.catch_up()
- elif not success:
- self._catch_up_prev_message = None
def shutdown(self, audit):
@@ -290,7 +340,7 @@ class PolicyUpdater(Thread):
PolicyUpdater._logger.info("shutdown policy-updater")
with self._lock:
self._aud_shutdown = audit
- self.enqueue()
+ self._run.set()
self._stop_catch_up_timer()
diff --git a/policyhandler/policy_utils.py b/policyhandler/policy_utils.py
index c2a8b07..c63f382 100644
--- a/policyhandler/policy_utils.py
+++ b/policyhandler/policy_utils.py
@@ -16,15 +16,18 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-"""policy-client communicates with policy-engine thru REST API"""
+"""utils for policy usage and conversions"""
import json
import logging
import re
+from copy import deepcopy
+from typing import Pattern
from .policy_consts import (POLICY_BODY, POLICY_CONFIG, POLICY_ID, POLICY_NAME,
POLICY_VERSION)
+
class PolicyUtils(object):
"""policy-client utils"""
_logger = logging.getLogger("policy_handler.policy_utils")
@@ -59,30 +62,30 @@ class PolicyUtils(object):
return policy
@staticmethod
- def convert_to_policy(policy_config):
- """wrap policy_config received from policy-engine with policy_id."""
- if not policy_config:
- return
- policy_name = policy_config.get(POLICY_NAME)
- policy_version = policy_config.get(POLICY_VERSION)
+ def convert_to_policy(policy_body):
+ """wrap policy_body received from policy-engine with policy_id."""
+ if not policy_body:
+ return None
+ policy_name = policy_body.get(POLICY_NAME)
+ policy_version = policy_body.get(POLICY_VERSION)
if not policy_name or not policy_version:
- return
+ return None
policy_id = PolicyUtils.extract_policy_id(policy_name)
if not policy_id:
- return
- return {POLICY_ID:policy_id, POLICY_BODY:policy_config}
+ return None
+ return {POLICY_ID:policy_id, POLICY_BODY:policy_body}
@staticmethod
- def select_latest_policy(policy_configs, min_version_expected=None, ignore_policy_names=None):
- """For some reason, the policy-engine returns all version of the policy_configs.
+ def select_latest_policy(policy_bodies, min_version_expected=None, ignore_policy_names=None):
+ """For some reason, the policy-engine returns all version of the policy_bodies.
DCAE-Controller is only interested in the latest version
"""
- if not policy_configs:
+ if not policy_bodies:
return
- latest_policy_config = {}
- for policy_config in policy_configs:
- policy_name = policy_config.get(POLICY_NAME)
- policy_version = policy_config.get(POLICY_VERSION)
+ latest_policy_body = {}
+ for policy_body in policy_bodies:
+ policy_name = policy_body.get(POLICY_NAME)
+ policy_version = policy_body.get(POLICY_VERSION)
if not policy_name or not policy_version or not policy_version.isdigit():
continue
policy_version = int(policy_version)
@@ -91,30 +94,30 @@ class PolicyUtils(object):
if ignore_policy_names and policy_name in ignore_policy_names:
continue
- if not latest_policy_config \
- or int(latest_policy_config[POLICY_VERSION]) < policy_version:
- latest_policy_config = policy_config
+ if (not latest_policy_body
+ or int(latest_policy_body[POLICY_VERSION]) < policy_version):
+ latest_policy_body = policy_body
- return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_config))
+ return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_body))
@staticmethod
- def select_latest_policies(policy_configs):
- """For some reason, the policy-engine returns all version of the policy_configs.
+ def select_latest_policies(policy_bodies):
+ """For some reason, the policy-engine returns all version of the policy_bodies.
DCAE-Controller is only interested in the latest versions
"""
- if not policy_configs:
+ if not policy_bodies:
return {}
policies = {}
- for policy_config in policy_configs:
- policy = PolicyUtils.convert_to_policy(policy_config)
+ for policy_body in policy_bodies:
+ policy = PolicyUtils.convert_to_policy(policy_body)
if not policy:
continue
policy_id = policy.get(POLICY_ID)
policy_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION)
if not policy_id or not policy_version or not policy_version.isdigit():
continue
- if policy_id not in policies \
- or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION]):
+ if (policy_id not in policies
+ or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION])):
policies[policy_id] = policy
for policy_id in policies:
@@ -173,3 +176,204 @@ class Utils(object):
if not the_same_values:
Utils._logger.debug("values %s != %s", body_1, body_2)
return the_same_values
+
+class RegexCoarser(object):
+ """
+ utility to combine or coarse the collection of regex patterns
+ into a single regex that is at least not narrower (wider or the same)
+ than the collection regexes
+
+ inspired by https://github.com/spadgos/regex-combiner in js
+ """
+ ENDER = '***'
+ GROUPERS = {'{': '}', '[': ']', '(': ')'}
+ MODIFIERS = '*?+'
+ CHOICE_STARTER = '('
+ HIDDEN_CHOICE_STARTER = '(?:'
+ ANY_CHARS = '.*'
+ LINE_START = '^'
+
+ def __init__(self, regex_patterns=None):
+ """regex coarser"""
+ self.trie = {}
+ self.patterns = []
+ self.add_regex_patterns(regex_patterns)
+
+ def get_combined_regex_pattern(self):
+ """gets the pattern for the combined regex"""
+ trie = deepcopy(self.trie)
+ RegexCoarser._compress(trie)
+ return RegexCoarser._trie_to_pattern(trie)
+
+ def get_coarse_regex_patterns(self, max_length=100):
+ """gets the patterns for the coarse regex"""
+ trie = deepcopy(self.trie)
+ RegexCoarser._compress(trie)
+ patterns = RegexCoarser._trie_to_pattern(trie, True)
+
+ root_patterns = []
+ for pattern in patterns:
+ left, _, choice = pattern.partition(RegexCoarser.CHOICE_STARTER)
+ if choice and left and left.strip() != RegexCoarser.LINE_START and not left.isspace():
+ pattern = left + RegexCoarser.ANY_CHARS
+ root_patterns.append(pattern)
+ root_patterns = RegexCoarser._join_patterns(root_patterns, max_length)
+
+ if not root_patterns or root_patterns == ['']:
+ return []
+ return root_patterns
+
+
+ def add_regex_patterns(self, new_regex_patterns):
+ """adds the new_regex patterns to RegexPatternCoarser"""
+ if not new_regex_patterns or not isinstance(new_regex_patterns, list):
+ return
+ for new_regex_pattern in new_regex_patterns:
+ self.add_regex_pattern(new_regex_pattern)
+
+ def add_regex_pattern(self, new_regex_pattern):
+ """adds the new_regex to RegexPatternCoarser"""
+ new_regex_pattern = RegexCoarser._regex_pattern_to_string(new_regex_pattern)
+ if not new_regex_pattern:
+ return
+
+ self.patterns.append(new_regex_pattern)
+
+ tokens = RegexCoarser._tokenize(new_regex_pattern)
+ last_token_idx = len(tokens) - 1
+ trie_node = self.trie
+ for idx, token in enumerate(tokens):
+ if token not in trie_node:
+ trie_node[token] = {}
+ if idx == last_token_idx:
+ trie_node[token][RegexCoarser.ENDER] = {}
+ trie_node = trie_node[token]
+
+ @staticmethod
+ def _regex_pattern_to_string(regex_pattern):
+ """convert regex pattern to string"""
+ if not regex_pattern:
+ return ''
+
+ if isinstance(regex_pattern, str):
+ return regex_pattern
+
+ if isinstance(regex_pattern, Pattern):
+ return regex_pattern.pattern
+ return None
+
+ @staticmethod
+ def _tokenize(regex_pattern):
+ """tokenize the regex pattern for trie assignment"""
+ tokens = []
+ token = ''
+ group_ender = None
+ use_next = False
+
+ for char in regex_pattern:
+ if use_next:
+ use_next = False
+ token += char
+ char = None
+
+ if char == '\\':
+ use_next = True
+ token += char
+ continue
+
+ if not group_ender and char in RegexCoarser.GROUPERS:
+ group_ender = RegexCoarser.GROUPERS[char]
+ token = char
+ char = None
+
+ if char is None:
+ pass
+ elif char == group_ender:
+ token += char
+ group_ender = None
+ if char == '}': # this group is a modifier
+ tokens[len(tokens) - 1] += token
+ token = ''
+ continue
+ elif char in RegexCoarser.MODIFIERS:
+ if group_ender:
+ token += char
+ else:
+ tokens[len(tokens) - 1] += char
+ continue
+ else:
+ token += char
+
+ if not group_ender:
+ tokens.append(token)
+ token = ''
+
+ if token:
+ tokens.append(token)
+ return tokens
+
+ @staticmethod
+ def _compress(trie):
+ """compress trie into shortest leaves"""
+ for key, subtrie in trie.items():
+ RegexCoarser._compress(subtrie)
+ subkeys = list(subtrie.keys())
+ if len(subkeys) == 1:
+ trie[key + subkeys[0]] = subtrie[subkeys[0]]
+ del trie[key]
+
+ @staticmethod
+ def _trie_to_pattern(trie, top_keep=False):
+ """convert trie to the regex pattern"""
+ patterns = [
+ key.replace(RegexCoarser.ENDER, '') + RegexCoarser._trie_to_pattern(subtrie)
+ for key, subtrie in trie.items()
+ ]
+
+ if top_keep:
+ return patterns
+
+ return RegexCoarser._join_patterns(patterns)[0]
+
+ @staticmethod
+ def _join_patterns(patterns, max_length=0):
+ """convert list of patterns to the segmented list of dense regex patterns"""
+ if not patterns:
+ return ['']
+
+ if len(patterns) == 1:
+ return patterns
+
+ if not max_length:
+ return [RegexCoarser.HIDDEN_CHOICE_STARTER + '|'.join(patterns) + ')']
+
+ long_patterns = []
+ join_patterns = []
+ for pattern in patterns:
+ len_pattern = len(pattern)
+ if not len_pattern:
+ continue
+ if len_pattern >= max_length:
+ long_patterns.append(pattern)
+ continue
+
+ for idx, patterns_to_join in enumerate(join_patterns):
+ patterns_to_join, len_patterns_to_join = patterns_to_join
+ if len_pattern + len_patterns_to_join < max_length:
+ patterns_to_join.append(pattern)
+ len_patterns_to_join += len_pattern
+ join_patterns[idx] = (patterns_to_join, len_patterns_to_join)
+ len_pattern = 0
+ break
+ if len_pattern:
+ join_patterns.append(([pattern], len_pattern))
+ join_patterns.sort(key=lambda x: x[1])
+
+ if join_patterns:
+ # pattern, _, choice = pattern.endswith(RegexCoarser.ANY_CHARS)
+ join_patterns = [
+ RegexCoarser.HIDDEN_CHOICE_STARTER + '|'.join(patterns_to_join) + ')'
+ for patterns_to_join, _ in join_patterns
+ ]
+
+ return join_patterns + long_patterns
diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py
index c49536f..24db468 100644
--- a/policyhandler/web_server.py
+++ b/policyhandler/web_server.py
@@ -16,31 +16,39 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-"""web-service for policy_handler"""
+"""web-server for policy_handler"""
-import logging
import json
+import logging
from datetime import datetime
+
import cherrypy
from .config import Config
+from .deploy_handler import PolicyUpdateMessage
from .onap.audit import Audit
-from .policy_rest import PolicyRest
+from .policy_matcher import PolicyMatcher
from .policy_receiver import PolicyReceiver
+from .policy_rest import PolicyRest
+
class PolicyWeb(object):
- """run REST API of policy-handler"""
- SERVER_HOST = "0.0.0.0"
+ """run http API of policy-handler on 0.0.0.0:wservice_port - any incoming address"""
+ HOST_INADDR_ANY = ".".join("0"*4)
logger = logging.getLogger("policy_handler.policy_web")
@staticmethod
def run_forever(audit):
"""run the web-server of the policy-handler forever"""
- PolicyWeb.logger.info("policy_handler web-service at port(%d)...", Config.wservice_port)
- cherrypy.config.update({"server.socket_host": PolicyWeb.SERVER_HOST,
- 'server.socket_port': Config.wservice_port})
+ PolicyWeb.logger.info("policy_handler web-server on port(%d)...", Config.wservice_port)
+ cherrypy.config.update({"server.socket_host": PolicyWeb.HOST_INADDR_ANY,
+ "server.socket_port": Config.wservice_port})
cherrypy.tree.mount(_PolicyWeb(), '/')
- audit.info("running policy_handler web-service at port({0})".format(Config.wservice_port))
+ audit.info("running policy_handler web-server as {0}:{1}".format(
+ cherrypy.server.socket_host, cherrypy.server.socket_port))
+ PolicyWeb.logger.info("running policy_handler web-server as %s:%d with config: %s",
+ cherrypy.server.socket_host, cherrypy.server.socket_port,
+ json.dumps(cherrypy.config))
cherrypy.engine.start()
class _PolicyWeb(object):
@@ -81,7 +89,9 @@ class _PolicyWeb(object):
PolicyWeb.logger.info("%s", req_info)
- result = PolicyRest.get_latest_policies(audit)
+ result, policy_update = PolicyMatcher.get_latest_policies(audit)
+ if policy_update and isinstance(policy_update, PolicyUpdateMessage):
+ result["policy_update"] = policy_update.get_message()
PolicyWeb.logger.info("result %s: %s", req_info, json.dumps(result))
@@ -96,8 +106,7 @@ class _PolicyWeb(object):
@cherrypy.tools.json_in()
def policies_latest(self):
"""
- on :GET: retrieves all the latest policies from policy-engine that are
- in the scope of the policy-handler.
+ on :GET: retrieves all the latest policies from policy-engine that are deployed
on :POST: expects to receive the params that mimic the /getConfig of policy-engine
and retrieves the matching policies from policy-engine and picks the latest on each policy.