diff options
author | Alex Shatov <alexs@att.com> | 2018-08-07 12:11:35 -0400 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2018-08-07 12:11:35 -0400 |
commit | d7f34d4b71ec4d86547628cda351d20bff4d017f (patch) | |
tree | 101c7669fb5508a103894e262964da0d0c8319bc | |
parent | a29f70823b18f492417629f56c86f61f94b96af8 (diff) |
4.0.0 new dataflow on policy-update and catchup
- changed API and functionality - new dataflow
- new dataflow between policy-handler and deployment-handler
on policy-update and catchup
= GETting policy_ids+versions and policy-filters from
deployment-handler
= PUTting policy-update and catchup in the new message format
= data segmenting the policy-update/catchup messages to
deployment-handler to avoid 413 on deployment-handler side
= matching policies from policy-engine to policies
and policy-filters from deployment-handler
= coarsening the policyName filter received from deployment-handler
to reduce the number messages passed to policy-engine on catchup
= consolidating sequential policy-updates into a single request
when the policy-update is busy
- removed policy scope-prefixes from config and logic -
it is not needed anymore because
= the policy matching happens directly to policies and
policy-filters received from deployment-handler
= on catchup - the policy scope-prefix equivalents are calculated
based on the data received from deployment-handler
- API - GET /policies_latest now returns the info on deployed
policy_ids+versions and policy-filters, rather than policies
of the scope-prefixes previously found in config (obsolete)
- not sending an empty catch_up message to deployment-handler
when nothing changed
- send policy-removed to deployment-handler when getting
404-not found from PDP on removal of policy
- config change: removed catch_up.max_skips - obsolete
- brought the latest CommonLogger.py
- minor refactoring - improved naming of variables
Change-Id: I36b3412eefd439088cb693703a6e5f18f4238b00
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-492
-rw-r--r-- | README.md | 1 | ||||
-rw-r--r-- | etc_upload/config.json | 15 | ||||
-rw-r--r-- | etc_upload/upload_config_for_ph_in_docker.sh | 40 | ||||
-rw-r--r-- | policyhandler/customize/customizer_base.py | 4 | ||||
-rw-r--r-- | policyhandler/deploy_handler.py | 266 | ||||
-rw-r--r-- | policyhandler/onap/CommonLogger.py | 1916 | ||||
-rw-r--r-- | policyhandler/onap/audit.py | 29 | ||||
-rw-r--r-- | policyhandler/policy_consts.py | 8 | ||||
-rw-r--r-- | policyhandler/policy_matcher.py | 236 | ||||
-rw-r--r-- | policyhandler/policy_receiver.py | 42 | ||||
-rw-r--r-- | policyhandler/policy_rest.py | 146 | ||||
-rw-r--r-- | policyhandler/policy_updater.py | 256 | ||||
-rw-r--r-- | policyhandler/policy_utils.py | 260 | ||||
-rw-r--r-- | policyhandler/web_server.py | 33 | ||||
-rw-r--r-- | pom.xml | 2 | ||||
-rw-r--r-- | setup.py | 2 | ||||
-rw-r--r-- | tests/mock_settings.py | 2 | ||||
-rw-r--r-- | tests/test_policy_utils.py | 186 | ||||
-rw-r--r-- | tests/test_policyhandler.py | 176 | ||||
-rw-r--r-- | version.properties | 4 |
20 files changed, 2260 insertions, 1364 deletions
@@ -42,7 +42,6 @@ in folder `policy_handler`: - `config.json` contains - - `"scope_prefixes" : ["DCAE.Config_"]` - the list of policy-scope-class values - `"policy_engine"` - the http connect info to ONAP **policy-engine** - headers.ClientAuth : base64(<mech-id with namespace>:<password>) - headers.Authorization : base64(<policy-engine server auth>) diff --git a/etc_upload/config.json b/etc_upload/config.json index aae8e84..a38d81c 100644 --- a/etc_upload/config.json +++ b/etc_upload/config.json @@ -3,29 +3,28 @@ "system" : "policy_handler", "thread_pool_size" : 4, "pool_connections" : 20, - "scope_prefixes" : ["DCAE.Config_", "CLAMP"], "policy_retry_count" : 5, "policy_retry_sleep" : 5, "catch_up" : { - "interval" : 1200, - "max_skips" : 5 + "interval" : 1200 }, "policy_engine" : { - "url" : "https://peawiv9nspd01.pedc.sbc.com:8081", - "path_pdp" :"/pdp/", + "url" : "https://policy_engine:8081", + "path_pdp" : "/pdp/", "path_api" : "/pdp/api/", "headers" : { "Accept" : "application/json", "Content-Type" : "application/json", - "ClientAuth" : "Basic bTAzOTQ5OnBvbGljeVIwY2sk", - "Authorization" : "Basic dGVzdHBkcDphbHBoYTEyMw==", + "ClientAuth" : "Basic blah", + "Authorization" : "Basic blah", "Environment" : "TEST" }, "target_entity" : "policy_engine" }, "deploy_handler" : { "target_entity" : "deployment_handler", - "url" : "http://deployment_handler:8188" + "url" : "http://deployment_handler:8188", + "max_msg_length_mb" : 5 } } } diff --git a/etc_upload/upload_config_for_ph_in_docker.sh b/etc_upload/upload_config_for_ph_in_docker.sh deleted file mode 100644 index 0854d21..0000000 --- a/etc_upload/upload_config_for_ph_in_docker.sh +++ /dev/null @@ -1,40 +0,0 @@ -#!/bin/bash - -# ================================================================================ -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. - -echo "to upload the config to consul-kv discovery:" -echo " - place it into etc_upload/config.json" -echo " - and run this script: etc_upload/upload_config_for_ph_in_docker.sh" -echo " from main dir == on the same level as etc_upload/" - -APPNAME=policy_handler - -if [[ -n ${DOCKER_HOST} ]]; then - # DOCKER_HOSTNAME=${DOCKER_HOST//*(tcp:|:*[0-9]|\/)/} - DOCKER_HOSTNAME=${DOCKER_HOST//tcp:/} - DOCKER_HOSTNAME=${DOCKER_HOSTNAME//:*[0-9]/} - DOCKER_HOSTNAME=${DOCKER_HOSTNAME//\//} - CONSUL_HOST=${DOCKER_HOSTNAME} -else - CONSUL_HOST=devcnsl00.dcae.sic.research.att.com -fi - -echo "uploading etc_upload/config.json for ${APPNAME} to CONSUL_HOST=${CONSUL_HOST}" - -curl -X PUT -H 'Content-Type: application/json' --data-binary "$(cat etc_upload/config.json)" http://${CONSUL_HOST}:8500/v1/kv/${APPNAME} diff --git a/policyhandler/customize/customizer_base.py b/policyhandler/customize/customizer_base.py index c98a9eb..561891f 100644 --- a/policyhandler/customize/customizer_base.py +++ b/policyhandler/customize/customizer_base.py @@ -55,8 +55,8 @@ class CustomizerBase(object): return service_url def get_deploy_handler_kwargs(self, audit): - """returns the optional dict-kwargs for requests.post to deploy_handler""" - info = "no optional kwargs for requests.post to deploy_handler" + """returns the optional dict-kwargs for requests.put to deploy_handler""" + info = "no optional kwargs for requests.put to deploy_handler" self._logger.info(info) audit.info(info) kwargs = {} diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index ea703f4..6b7788c 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -16,32 +16,144 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -""" send notification to deploy-handler""" +"""send policy-update notification to deployment-handler""" import json import logging +from copy import copy, deepcopy import requests from .config import Config from .customize import CustomizerUser from .discovery import DiscoveryClient -from .onap.audit import REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, Metrics +from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, + AuditResponseCode, Metrics) +from .policy_consts import (CATCH_UP, LATEST_POLICIES, POLICIES, + POLICY_FILTER_MATCHES, POLICY_FILTERS, + REMOVED_POLICIES) + + +class PolicyUpdateMessage(object): + """class for messages to deployment-handler on policy-update""" + BYTES_IN_MB = 1 << 2 * 10 + + def __init__(self, latest_policies=None, + removed_policies=None, policy_filter_matches=None, catch_up=True): + """init""" + self._catch_up = catch_up + self._latest_policies = deepcopy(latest_policies or {}) + self._removed_policies = copy(removed_policies or {}) + self._policy_filter_matches = deepcopy(policy_filter_matches or {}) + + self._message = { + CATCH_UP: self._catch_up, + LATEST_POLICIES: self._latest_policies, + REMOVED_POLICIES: self._removed_policies, + POLICY_FILTER_MATCHES: self._policy_filter_matches + } + self.msg_length = 0 + self._calc_stats() + + def _calc_stats(self): + """generate the message and calc stats""" + self.msg_length = len(json.dumps(self._message)) + + def empty(self): + """checks whether have any data""" + return (not self._latest_policies + and not self._removed_policies + and not self._policy_filter_matches) + + def add(self, policy_id, latest_policy=None, policy_filter_ids=None, removed_policy=None): + """add the parts from the other message to the current message""" + if not policy_id or not (latest_policy or policy_filter_ids or removed_policy): + return + + if latest_policy: + self._latest_policies[policy_id] = deepcopy(latest_policy) + + if policy_filter_ids: + if policy_id not in self._policy_filter_matches: + self._policy_filter_matches[policy_id] = {} + self._policy_filter_matches[policy_id].update(policy_filter_ids) + + if removed_policy is not None: + self._removed_policies[policy_id] = removed_policy + + self._calc_stats() + + def get_message(self): + """expose the copy of the message""" + return deepcopy(self._message) + + def __str__(self): + """to string""" + return json.dumps(self._message) + + def _iter_over_removed_policies(self): + """generator of iterator over removed_policies""" + for (policy_id, value) in self._removed_policies.items(): + yield (policy_id, value) + + def _iter_over_latest_policies(self): + """generator of iterator over latest_policies and policy_filter_matches""" + for (policy_id, policy) in self._latest_policies.items(): + yield (policy_id, policy, self._policy_filter_matches.get(policy_id)) + + def gen_segmented_messages(self, max_msg_length_mb): + """ + Break the policy-update message into a list of segmented messages. + + Each segmented message should not exceed the max_msg_length_mb from config. + """ + max_msg_length_mb = (max_msg_length_mb or 10) * PolicyUpdateMessage.BYTES_IN_MB + + messages = [] + curr_message = PolicyUpdateMessage(catch_up=self._catch_up) + + for (policy_id, value) in self._iter_over_removed_policies(): + if (not curr_message.empty() + and (len(policy_id) + len(str(value)) + curr_message.msg_length + > max_msg_length_mb)): + messages.append(curr_message.get_message()) + curr_message = PolicyUpdateMessage(catch_up=self._catch_up) + curr_message.add(policy_id, removed_policy=value) + + for (policy_id, policy, policy_filter_ids) in self._iter_over_latest_policies(): + if (not curr_message.empty() + and (2 * len(policy_id) + len(json.dumps(policy)) + + len(json.dumps(policy_filter_ids)) + + curr_message.msg_length > max_msg_length_mb)): + messages.append(curr_message.get_message()) + curr_message = PolicyUpdateMessage(catch_up=self._catch_up) + curr_message.add(policy_id, latest_policy=policy, policy_filter_ids=policy_filter_ids) + + if not curr_message.empty(): + messages.append(curr_message.get_message()) + + msg_count = len(messages) + if msg_count > 1: + msg_count = "/" + str(msg_count) + for idx, msg in enumerate(messages): + msg["data_segment"] = str((idx+1)) + msg_count + + return messages -POOL_SIZE = 1 class DeployHandler(object): - """ deploy-handler """ + """calling the deployment-handler web apis""" _logger = logging.getLogger("policy_handler.deploy_handler") _lazy_inited = False _requests_session = None - _config = None _url = None _url_policy = None + _max_msg_length_mb = 10 _target_entity = None _custom_kwargs = None _server_instance_uuid = None + server_instance_changed = False @staticmethod def _lazy_init(audit, rediscover=False): @@ -56,14 +168,15 @@ class DeployHandler(object): DeployHandler._custom_kwargs = {} if not DeployHandler._requests_session: + pool_size = Config.settings.get("pool_connections", 20) DeployHandler._requests_session = requests.Session() DeployHandler._requests_session.mount( 'https://', - requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE) + requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) ) DeployHandler._requests_session.mount( 'http://', - requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE) + requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) ) config_dh = Config.settings.get("deploy_handler") @@ -72,10 +185,13 @@ class DeployHandler(object): # config for policy-handler >= 2.4.0 # "deploy_handler" : { # "target_entity" : "deployment_handler", - # "url" : "http://deployment_handler:8188" + # "url" : "http://deployment_handler:8188", + # "max_msg_length_mb" : 100 # } DeployHandler._target_entity = config_dh.get("target_entity", "deployment_handler") DeployHandler._url = config_dh.get("url") + DeployHandler._max_msg_length_mb = config_dh.get("max_msg_length_mb", + DeployHandler._max_msg_length_mb) DeployHandler._logger.info("dns based routing to %s: url(%s)", DeployHandler._target_entity, DeployHandler._url) @@ -96,26 +212,52 @@ class DeployHandler(object): @staticmethod - def policy_update(audit, message, rediscover=False): + def policy_update(audit, policy_update_message, rediscover=False): """ - post policy_updated message to deploy-handler + segments the big policy_update_message limited by size + and sequatially sends each segment as put to deployment-handler at /policy. - returns condition whether it needs to catch_up + param policy_update_message is of PolicyUpdateMessage type """ - if not message: + if not policy_update_message or policy_update_message.empty(): return DeployHandler._lazy_init(audit, rediscover) + + str_metrics = "policy_update {0}".format(str(policy_update_message)) + + metrics_total = Metrics( + aud_parent=audit, + targetEntity="{0} total policy_update".format(DeployHandler._target_entity), + targetServiceName=DeployHandler._url_policy) + + metrics_total.metrics_start("started {}".format(str_metrics)) + messages = policy_update_message.gen_segmented_messages(DeployHandler._max_msg_length_mb) + for message in messages: + DeployHandler._policy_update(audit, message) + if not audit.is_success(): + break + metrics_total.metrics("done {}".format(str_metrics)) + + @staticmethod + def _policy_update(audit, message): + """ + sends the put message to deployment-handler at /policy + + detects whether server_instance_changed condition on deployment-handler + that is the cause to catch_up + """ + if not message: + return + metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity, targetServiceName=DeployHandler._url_policy) headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id} - msg_str = json.dumps(message) - headers_str = json.dumps(headers) - - log_action = "post to {0} at {1}".format( + log_action = "put to {0} at {1}".format( DeployHandler._target_entity, DeployHandler._url_policy) - log_data = " msg={0} headers={1}".format(msg_str, headers_str) + log_data = " msg={0} headers={1}".format(json.dumps(message), + json.dumps(headers)) log_line = log_action + log_data DeployHandler._logger.info(log_line) metrics.metrics_start(log_line) @@ -130,7 +272,7 @@ class DeployHandler(object): res = None try: - res = DeployHandler._requests_session.post( + res = DeployHandler._requests_session.put( DeployHandler._url_policy, json=message, headers=headers, **DeployHandler._custom_kwargs ) @@ -149,8 +291,8 @@ class DeployHandler(object): metrics.set_http_status_code(res.status_code) audit.set_http_status_code(res.status_code) - log_line = "response {0} from {1}: text={2}{3}" \ - .format(res.status_code, log_action, res.text, log_data) + log_line = "response {0} from {1}: text={2}{3}".format(res.status_code, log_action, + res.text, log_data) metrics.metrics(log_line) if res.status_code != requests.codes.ok: @@ -159,15 +301,89 @@ class DeployHandler(object): DeployHandler._logger.info(log_line) result = res.json() or {} + DeployHandler._server_instance_changed(result, metrics) + + + @staticmethod + def get_deployed_policies(audit, rediscover=False): + """ + Retrieves policies and policy-filters from components + that were deployed by deployment-handler + """ + DeployHandler._lazy_init(audit, rediscover) + metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity, + targetServiceName=DeployHandler._url_policy) + headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id} + + log_action = "get {0}: {1}".format(DeployHandler._target_entity, DeployHandler._url_policy) + log_data = " headers={}".format(json.dumps(headers)) + log_line = log_action + log_data + DeployHandler._logger.info(log_line) + metrics.metrics_start(log_line) + + if not DeployHandler._url: + error_msg = "no url found to {0}".format(log_line) + DeployHandler._logger.error(error_msg) + metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) + audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) + metrics.metrics(error_msg) + return None, None + + res = None + try: + res = DeployHandler._requests_session.get( + DeployHandler._url_policy, headers=headers, + **DeployHandler._custom_kwargs + ) + except Exception as ex: + error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + if isinstance(ex, requests.exceptions.RequestException) + else AuditHttpCode.SERVER_INTERNAL_ERROR.value) + error_msg = ("failed to {0} {1}: {2}{3}" + .format(log_action, type(ex).__name__, str(ex), log_data)) + DeployHandler._logger.exception(error_msg) + metrics.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + metrics.metrics(error_msg) + return None, None + + metrics.set_http_status_code(res.status_code) + audit.set_http_status_code(res.status_code) + + log_line = ("response {0} from {1}: text={2}{3}" + .format(res.status_code, log_action, res.text, log_data)) + metrics.metrics(log_line) + + if res.status_code != requests.codes.ok: + DeployHandler._logger.error(log_line) + return None, None + + result = res.json() or {} + DeployHandler._server_instance_changed(result, metrics) + + policies = result.get(POLICIES, {}) + policy_filters = result.get(POLICY_FILTERS, {}) + if not policies and not policy_filters: + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + DeployHandler._logger.warning(audit.warn( + "found no deployed policies or policy-filters: {}".format(log_line), + error_code=AuditResponseCode.DATA_ERROR)) + return policies, policy_filters + + DeployHandler._logger.info(log_line) + return policies, policy_filters + + @staticmethod + def _server_instance_changed(result, metrics): + """Checks whether the deployment-handler instance changed since last call.""" prev_server_instance_uuid = DeployHandler._server_instance_uuid DeployHandler._server_instance_uuid = result.get("server_instance_uuid") - deployment_handler_changed = (prev_server_instance_uuid - and prev_server_instance_uuid != DeployHandler._server_instance_uuid) - if deployment_handler_changed: + if (prev_server_instance_uuid + and prev_server_instance_uuid != DeployHandler._server_instance_uuid): + DeployHandler.server_instance_changed = True + log_line = ("deployment_handler_changed: {1} != {0}" .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid)) metrics.info(log_line) DeployHandler._logger.info(log_line) - - return deployment_handler_changed diff --git a/policyhandler/onap/CommonLogger.py b/policyhandler/onap/CommonLogger.py index 957b3aa..25b093f 100644 --- a/policyhandler/onap/CommonLogger.py +++ b/policyhandler/onap/CommonLogger.py @@ -1,953 +1,963 @@ -#!/usr/bin/python
-# -*- indent-tabs-mode: nil -*- vi: set expandtab:
-
-# ================================================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-
-"""ECOMP Common Logging library in Python.
-
-CommonLogger.py
-
-Original Written by: Terry Schmalzried
-Date written: October 1, 2015
-Last updated: December 1, 2016
-
-version 0.8
-"""
-
-from __future__ import print_function
-import os, sys, getopt, logging, logging.handlers, time, re, uuid, socket, threading, collections
-
-class CommonLogger:
- """ECOMP Common Logging object.
-
- Public methods:
- __init__
- setFields
- debug
- info
- warn
- error
- fatal
- """
-
- UnknownFile = -1
- ErrorFile = 0
- DebugFile = 1
- AuditFile = 2
- MetricsFile = 3
- DateFmt = '%Y-%m-%dT%H:%M:%S'
- verbose = False
-
- def __init__(self, configFile, logKey, **kwargs):
- """Construct a Common Logger for one Log File.
-
- Arguments:
- configFile -- configuration filename.
- logKey -- the keyword in configFile that identifies the log filename.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages,
- one of CommonLogger.ErrorFile, CommonLogger.DebugFile,
- CommonLogger.AuditFile and CommonLogger.MetricsFile, or
- one of the strings "error", "debug", "audit" or "metrics".
- May also be set in the config file using a field named
- <logKey>Style (where <logKey> is the value of the logKey
- parameter). The keyword value overrides the value in the
- config file.
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- self._monitorFlag = False
-
- # Get configuration parameters
- self._logKey = str(logKey)
- self._configFile = str(configFile)
- self._rotateMethod = 'time'
- self._timeRotateIntervalType = 'midnight'
- self._timeRotateInterval = 1
- self._sizeMaxBytes = 0
- self._sizeRotateMode = 'a'
- self._socketHost = None
- self._socketPort = 0
- self._typeLogger = 'filelogger'
- self._backupCount = 6
- self._logLevelThreshold = self._intLogLevel('')
- self._logFile = None
- self._begTime = None
- self._begMsec = 0
- self._fields = {}
- self._fields["style"] = CommonLogger.UnknownFile
- try:
- self._configFileModified = os.path.getmtime(self._configFile)
- for line in open(self._configFile):
- line = line.split('#',1)[0] # remove comments
- if '=' in line:
- key, value = [x.strip() for x in line.split('=',1)]
- if key == 'rotateMethod' and value.lower() in ['time', 'size', 'none']:
- self._rotateMethod = value.lower()
- elif key == 'timeRotateIntervalType' and value in ['S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight']:
- self._timeRotateIntervalType = value
- elif key == 'timeRotateInterval' and int( value ) > 0:
- self._timeRotateInterval = int( value )
- elif key == 'sizeMaxBytes' and int( value ) >= 0:
- self._sizeMaxBytes = int( value )
- elif key == 'sizeRotateMode' and value in ['a']:
- self._sizeRotateMode = value
- elif key == 'backupCount' and int( value ) >= 0:
- self._backupCount = int( value )
- elif key == self._logKey + 'SocketHost':
- self._socketHost = value
- elif key == self._logKey + 'SocketPort' and int( value ) == 0:
- self._socketPort = int( value )
- elif key == self._logKey + 'LogType' and value.lower() in ['filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger']:
- self._typeLogger = value.lower()
- elif key == self._logKey + 'LogLevel':
- self._logLevelThreshold = self._intLogLevel(value.upper())
- elif key == self._logKey + 'Style':
- self._fields["style"] = value
- elif key == self._logKey:
- self._logFile = value
- except Exception as x:
- print("exception reading '%s' configuration file: %s" %(self._configFile, str(x)), file=sys.stderr)
- sys.exit(2)
- except:
- print("exception reading '%s' configuration file" %(self._configFile), file=sys.stderr)
- sys.exit(2)
-
- if self._logFile is None:
- print('configuration file %s is missing definition %s for log file' %(self._configFile, self._logKey), file=sys.stderr)
- sys.exit(2)
-
-
- # initialize default log fields
- # timestamp will automatically be generated
- for key in ['style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', \
- 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer', \
- 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode', \
- 'responseDescription', 'processKey', 'targetVirtualEntity', 'customField1', \
- 'customField2', 'customField3', 'customField4', 'errorCategory', 'errorCode', \
- 'errorDescription' ]:
- if key in kwargs and kwargs[key] != None:
- self._fields[key] = kwargs[key]
-
- self._resetStyleField()
-
- # Set up logger
- self._logLock = threading.Lock()
- with self._logLock:
- self._logger = logging.getLogger(self._logKey)
- self._logger.propagate = False
- self._createLogger()
-
- self._defaultServerInfo()
-
- # spawn a thread to monitor configFile for logLevel and logFile changes
- self._monitorFlag = True
- self._monitorThread = threading.Thread(target=self._monitorConfigFile, args=())
- self._monitorThread.daemon = True
- self._monitorThread.start()
-
-
- def _createLogger(self):
- if self._typeLogger == 'filelogger':
- self._mkdir_p(self._logFile)
- if self._rotateMethod == 'time':
- self._logHandler = logging.handlers.TimedRotatingFileHandler(self._logFile, \
- when=self._timeRotateIntervalType, interval=self._timeRotateInterval, \
- backupCount=self._backupCount, encoding=None, delay=False, utc=True)
- elif self._rotateMethod == 'size':
- self._logHandler = logging.handlers.RotatingFileHandler(self._logFile, \
- mode=self._sizeRotateMode, maxBytes=self._sizeMaxBytes, \
- backupCount=self._backupCount, encoding=None, delay=False)
-
- else:
- self._logHandler = logging.handlers.WatchedFileHandler(self._logFile, \
- mode=self._sizeRotateMode, \
- encoding=None, delay=False)
- elif self._typeLogger == 'stderrlogger':
- self._logHandler = logging.handlers.StreamHandler(sys.stderr)
- elif self._typeLogger == 'stdoutlogger':
- self._logHandler = logging.handlers.StreamHandler(sys.stdout)
- elif self._typeLogger == 'socketlogger':
- self._logHandler = logging.handlers.SocketHandler(self._socketHost, self._socketPort)
- elif self._typeLogger == 'nulllogger':
- self._logHandler = logging.handlers.NullHandler()
-
- if self._fields["style"] == CommonLogger.AuditFile or self._fields["style"] == CommonLogger.MetricsFile:
- self._logFormatter = logging.Formatter(fmt='%(begtime)s,%(begmsecs)03d+00:00|%(endtime)s,%(endmsecs)03d+00:00|%(message)s', datefmt=CommonLogger.DateFmt)
- else:
- self._logFormatter = logging.Formatter(fmt='%(asctime)s,%(msecs)03d+00:00|%(message)s', datefmt='%Y-%m-%dT%H:%M:%S')
- self._logFormatter.converter = time.gmtime
- self._logHandler.setFormatter(self._logFormatter)
- self._logger.addHandler(self._logHandler)
-
- def _resetStyleField(self):
- styleFields = ["error", "debug", "audit", "metrics"]
- if self._fields['style'] in styleFields:
- self._fields['style'] = styleFields.index(self._fields['style'])
-
- def __del__(self):
- if self._monitorFlag == False:
- return
-
- self._monitorFlag = False
-
- if self._monitorThread is not None and self._monitorThread.is_alive():
- self._monitorThread.join()
-
- self._monitorThread = None
-
-
- def _defaultServerInfo(self):
-
- # If not set or purposely set = None, then set default
- if self._fields.get('server') is None:
- try:
- self._fields['server'] = socket.getfqdn()
- except Exception as err:
- try:
- self._fields['server'] = socket.gethostname()
- except Exception as err:
- self._fields['server'] = ""
-
- # If not set or purposely set = None, then set default
- if self._fields.get('serverIPAddress') is None:
- try:
- self._fields['serverIPAddress'] = socket.gethostbyname(self._fields['server'])
- except Exception as err:
- self._fields['serverIPAddress'] = ""
-
-
- def _monitorConfigFile(self):
- while self._monitorFlag:
- try:
- fileTime = os.path.getmtime(self._configFile)
- if fileTime > self._configFileModified:
- self._configFileModified = fileTime
- ReopenLogFile = False
- logFile = self._logFile
- with open(self._configFile) as fp:
- for line in fp:
- line = line.split('#',1)[0] # remove comments
- if '=' in line:
- key, value = [x.strip() for x in line.split('=',1)]
- if key == 'rotateMethod' and value.lower() in ['time', 'size', 'none'] and self._rotateMethod != value:
- self._rotateMethod = value.lower()
- ReopenLogFile = True
- elif key == 'timeRotateIntervalType' and value in ['S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight']:
- self._timeRotateIntervalType = value
- ReopenLogFile = True
- elif key == 'timeRotateInterval' and int( value ) > 0:
- self._timeRotateInterval = int( value )
- ReopenLogFile = True
- elif key == 'sizeMaxBytes' and int( value ) >= 0:
- self._sizeMaxBytes = int( value )
- ReopenLogFile = True
- elif key == 'sizeRotateMode' and value in ['a']:
- self._sizeRotateMode = value
- ReopenLogFile = True
- elif key == 'backupCount' and int( value ) >= 0:
- self._backupCount = int( value )
- ReopenLogFile = True
- elif key == self._logKey + 'SocketHost' and self._socketHost != value:
- self._socketHost = value
- ReopenLogFile = True
- elif key == self._logKey + 'SocketPort' and self._socketPort > 0 and self._socketPort != int( value ):
- self._socketPort = int( value )
- ReopenLogFile = True
- elif key == self._logKey + 'LogLevel' and self._logLevelThreshold != self._intLogLevel( value.upper() ):
- self._logLevelThreshold = self._intLogLevel(value.upper())
- elif key == self._logKey + 'LogType' and self._typeLogger != value and value.lower() in ['filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger']:
- self._typeLogger = value.lower()
- ReopenLogFile = True
- elif key == self._logKey + 'Style':
- self._fields["style"] = value
- self._resetStyleField()
- elif key == self._logKey and self._logFile != value:
- logFile = value
- ReopenLogFile = True
- if ReopenLogFile:
- with self._logLock:
- self._logger.removeHandler(self._logHandler)
- self._logFile = logFile
- self._createLogger()
- except Exception as err:
- pass
-
- time.sleep(5)
-
-
- def setFields(self, **kwargs):
- """Set default values for log fields.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- for key in ['style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', \
- 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer', \
- 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode', \
- 'responseDescription', 'processKey', 'targetVirtualEntity', 'customField1', \
- 'customField2', 'customField3', 'customField4', 'errorCategory', 'errorCode', \
- 'errorDescription' ]:
- if key in kwargs:
- if kwargs[key] != None:
- self._fields[key] = kwargs[key]
- elif key in self._fields:
- del self._fields[key]
-
- self._defaultServerInfo()
-
-
- def debug(self, message, **kwargs):
- """Write a DEBUG level message to the log file.
-
- Arguments:
- message -- value for the last log record field.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
- begTime (am) -- optional starting time for this audit/metrics log record.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- self._log('DEBUG', message, errorCategory = 'DEBUG', **kwargs)
-
- def info(self, message, **kwargs):
- """Write an INFO level message to the log file.
-
- Arguments:
- message -- value for the last log record field.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
- begTime (am) -- optional starting time for this audit/metrics log record.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- self._log('INFO', message, errorCategory = 'INFO', **kwargs)
-
- def warn(self, message, **kwargs):
- """Write a WARN level message to the log file.
-
- Arguments:
- message -- value for the last log record field.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
- begTime (am) -- optional starting time for this audit/metrics log record.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- self._log('WARN', message, errorCategory = 'WARN', **kwargs)
-
- def error(self, message, **kwargs):
- """Write an ERROR level message to the log file.
-
- Arguments:
- message -- value for the last log record field.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
- begTime (am) -- optional starting time for this audit/metrics log record.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- self._log('ERROR', message, errorCategory = 'ERROR', **kwargs)
-
- def fatal(self, message, **kwargs):
- """Write a FATAL level message to the log file.
-
- Arguments:
- message -- value for the last log record field.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
- begTime (am) -- optional starting time for this audit/metrics log record.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- self._log('FATAL', message, errorCategory = 'FATAL', **kwargs)
-
- def _log(self, logLevel, message, **kwargs):
- """Write a message to the log file.
-
- Arguments:
- logLevel -- value ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL', ...) for the log record.
- message -- value for the last log record field.
-
- Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error
- style -- the log file format (style) to use when writing log messages
- requestID (dame) -- optional default value for this log record field.
- serviceInstanceID (am) -- optional default value for this log record field.
- threadID (am) -- optional default value for this log record field.
- serverName (am) -- optional default value for this log record field.
- serviceName (am) -- optional default value for this log record field.
- instanceUUID (am) -- optional default value for this log record field.
- severity (am) -- optional default value for this log record field.
- serverIPAddress (am) -- optional default value for this log record field.
- server (am) -- optional default value for this log record field.
- IPAddress (am) -- optional default value for this log record field.
- className (am) -- optional default value for this log record field.
- timer (am) -- (ElapsedTime) optional default value for this log record field.
- partnerName (ame) -- optional default value for this log record field.
- targetEntity (me) -- optional default value for this log record field.
- targetServiceName (me) -- optional default value for this log record field.
- statusCode (am) -- optional default value for this log record field.
- responseCode (am) -- optional default value for this log record field.
- responseDescription (am) -- optional default value for this log record field.
- processKey (am) -- optional default value for this log record field.
- targetVirtualEntity (m) -- optional default value for this log record field.
- customField1 (am) -- optional default value for this log record field.
- customField2 (am) -- optional default value for this log record field.
- customField3 (am) -- optional default value for this log record field.
- customField4 (am) -- optional default value for this log record field.
- errorCategory (e) -- optional default value for this log record field.
- errorCode (e) -- optional default value for this log record field.
- errorDescription (e) -- optional default value for this log record field.
- begTime (am) -- optional starting time for this audit/metrics log record.
-
- Note: the pipe '|' character is not allowed in any log record field.
- """
-
- # timestamp will automatically be inserted
- style = int(self._getVal('style', '', **kwargs))
- requestID = self._getVal('requestID', '', **kwargs)
- serviceInstanceID = self._getVal('serviceInstanceID', '', **kwargs)
- threadID = self._getVal('threadID', threading.currentThread().getName(), **kwargs)
- serverName = self._getVal('serverName', '', **kwargs)
- serviceName = self._getVal('serviceName', '', **kwargs)
- instanceUUID = self._getVal('instanceUUID', '', **kwargs)
- upperLogLevel = self._noSep(logLevel.upper())
- severity = self._getVal('severity', '', **kwargs)
- serverIPAddress = self._getVal('serverIPAddress', '', **kwargs)
- server = self._getVal('server', '', **kwargs)
- IPAddress = self._getVal('IPAddress', '', **kwargs)
- className = self._getVal('className', '', **kwargs)
- timer = self._getVal('timer', '', **kwargs)
- partnerName = self._getVal('partnerName', '', **kwargs)
- targetEntity = self._getVal('targetEntity', '', **kwargs)
- targetServiceName = self._getVal('targetServiceName', '', **kwargs)
- statusCode = self._getVal('statusCode', '', **kwargs)
- responseCode = self._getVal('responseCode', '', **kwargs)
- responseDescription = self._noSep(self._getVal('responseDescription', '', **kwargs))
- processKey = self._getVal('processKey', '', **kwargs)
- targetVirtualEntity = self._getVal('targetVirtualEntity', '', **kwargs)
- customField1 = self._getVal('customField1', '', **kwargs)
- customField2 = self._getVal('customField2', '', **kwargs)
- customField3 = self._getVal('customField3', '', **kwargs)
- customField4 = self._getVal('customField4', '', **kwargs)
- errorCategory = self._getVal('errorCategory', '', **kwargs)
- errorCode = self._getVal('errorCode', '', **kwargs)
- errorDescription = self._noSep(self._getVal('errorDescription', '', **kwargs))
- nbegTime = self._getArg('begTime', {}, **kwargs)
-
- detailMessage = self._noSep(message)
- if bool(re.match(r" *$", detailMessage)):
- return # don't log empty messages
-
- useLevel = self._intLogLevel(upperLogLevel)
- if CommonLogger.verbose: print("logger STYLE=%s" % style)
- if useLevel < self._logLevelThreshold:
- if CommonLogger.verbose: print("skipping because of level")
- pass
- else:
- with self._logLock:
- if style == CommonLogger.ErrorFile:
- if CommonLogger.verbose: print("using CommonLogger.ErrorFile")
- self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
- %(requestID, threadID, serviceName, partnerName, targetEntity, targetServiceName,
- errorCategory, errorCode, errorDescription, detailMessage))
- elif style == CommonLogger.DebugFile:
- if CommonLogger.verbose: print("using CommonLogger.DebugFile")
- self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
- %(requestID, threadID, serverName, serviceName, instanceUUID, upperLogLevel,
- severity, serverIPAddress, server, IPAddress, className, timer, detailMessage))
- elif style == CommonLogger.AuditFile:
- if CommonLogger.verbose: print("using CommonLogger.AuditFile")
- endAuditTime, endAuditMsec = self._getTime()
- if type(nbegTime) is dict and 'begTime' in nbegTime and 'begMsec' in nbegTime:
- d = { 'begtime': self._noSep(nbegTime['begTime']), 'begmsecs': float(self._noSep(nbegTime['begMsec'])), 'endtime': endAuditTime, 'endmsecs': endAuditMsec }
- elif self._begTime is not None:
- d = { 'begtime': self._begTime, 'begmsecs': self._begMsec, 'endtime': endAuditTime, 'endmsecs': endAuditMsec }
- else:
- d = { 'begtime': endAuditTime, 'begmsecs': endAuditMsec, 'endtime': endAuditTime, 'endmsecs': endAuditMsec }
- self._begTime = None
- unused = ""
- self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
- %(requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName,
- statusCode, responseCode, responseDescription, instanceUUID, upperLogLevel,
- severity, serverIPAddress, timer, server, IPAddress, className, unused,
- processKey, customField1, customField2, customField3, customField4, detailMessage), extra=d)
- elif style == CommonLogger.MetricsFile:
- if CommonLogger.verbose: print("using CommonLogger.MetricsFile")
- endMetricsTime, endMetricsMsec = self._getTime()
- if type(nbegTime) is dict and 'begTime' in nbegTime and 'begMsec' in nbegTime:
- d = { 'begtime': self._noSep(nbegTime['begTime']), 'begmsecs': float(self._noSep(nbegTime['begMsec'])), 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec }
- elif self._begTime is not None:
- d = { 'begtime': self._begTime, 'begmsecs': self._begMsec, 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec }
- else:
- d = { 'begtime': endMetricsTime, 'begmsecs': endMetricsMsec, 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec }
- self._begTime = None
- unused = ""
- self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
- %(requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName,
- targetEntity, targetServiceName, statusCode, responseCode, responseDescription,
- instanceUUID, upperLogLevel, severity, serverIPAddress, timer, server, IPAddress,
- className, unused, processKey, targetVirtualEntity, customField1, customField2,
- customField3, customField4, detailMessage), extra=d)
- else:
- print("!!!!!!!!!!!!!!!! style not set: %s" % self._fields["style"])
-
- def _getTime(self):
- ct = time.time()
- lt = time.localtime(ct)
- return (time.strftime(CommonLogger.DateFmt, lt), (ct - int(ct)) * 1000)
-
- def setStartRecordEvent(self):
- """
- Set the start time to be saved for both audit and metrics records
- """
- self._begTime, self._begMsec = self._getTime()
-
- def getStartRecordEvent(self):
- """
- Retrieve the start time to be used for either audit and metrics records
- """
- begTime, begMsec = self._getTime()
- return {'begTime':begTime, 'begMsec':begMsec}
-
- def _getVal(self, key, default, **kwargs):
- val = self._fields.get(key)
- if key in kwargs: val = kwargs[key]
- if val is None: val = default
- return self._noSep(val)
-
- def _getArg(self, key, default, **kwargs):
- val = None
- if key in kwargs: val = kwargs[key]
- if val is None: val = default
- return val
-
- def _noSep(self, message):
- if message is None: return ''
- return re.sub(r'[\|\n]', ' ', str(message))
-
- def _intLogLevel(self, logLevel):
- if logLevel == 'FATAL': useLevel = 50
- elif logLevel == 'ERROR': useLevel = 40
- elif logLevel == 'WARN': useLevel = 30
- elif logLevel == 'INFO': useLevel = 20
- elif logLevel == 'DEBUG': useLevel = 10
- else: useLevel = 0
- return useLevel
-
- def _mkdir_p(self, filename):
- """Create missing directories from a full filename path like mkdir -p"""
-
- if filename is None:
- return
-
- folder=os.path.dirname(filename)
-
- if folder == "":
- return
-
- if not os.path.exists(folder):
- try:
- os.makedirs(folder)
- except OSError as err:
- print("error number %d creating %s directory to hold %s logfile: %s" %(err.errno, err.filename, filename, err.strerror), file=sys.stderr)
- sys.exit(2)
- except Exception as err:
- print("error creating %s directory to hold %s logfile: %s" %(folder, filename, str(err)), file=sys.stderr)
- sys.exit(2)
-
-if __name__ == "__main__":
-
- def __checkOneTime(line):
- format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}[+]00:00[|]'
- m = re.match(format, line)
- if not m:
- print("ERROR: time string did not match proper time format, %s" %line)
- print("\t: format=%s" % format)
- return 1
- return 0
-
- def __checkTwoTimes(line, different):
- format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|][0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|]'
- m = re.match(format, line)
- if not m:
- print("ERROR: time strings did not match proper time format, %s" %line)
- print("\t: format=%s" % format)
- return 1
- second1 = int(m.group(1))
- msec1 = int(m.group(2))
- second2 = int(m.group(3))
- msec2 = int(m.group(4))
- if second1 > second2: second2 += 60
- t1 = second1 * 1000 + msec1
- t2 = second2 * 1000 + msec2
- diff = t2 - t1
- # print("t1=%d (%d,%d) t2=%d (%d,%d), diff = %d" % (t1, second1, msec1, t2, second2, msec2, diff))
- if different:
- if diff < 500:
- print("ERROR: times did not differ enough: %s" % line)
- return 1
- else:
- if diff > 10:
- print("ERROR: times were too far apart: %s" % line)
- return 1
- return 0
-
- def __checkBegTime(line):
- format = "begTime should be ([-0-9T:]+)"
- # print("checkBegTime(%s)" % line)
- strt = 'begTime should be '
- i = line.index(strt)
- rest = line[i+len(strt):].rstrip()
- if not line.startswith(rest + ","):
- print("ERROR: line %s should start with %s" % (line,rest))
- return 1
- return 0
-
- def __checkLog(logfile, numLines, numFields):
- lineCount = 0
- errorCount = 0
- with open(logfile, "r") as fp:
- for line in fp:
- # print("saw line %s" % line)
- lineCount += 1
- c = line.count('|')
- if c != numFields:
- print("ERROR: wrong number of fields. Expected %d, got %d: %s" % (numFields, c, line))
- errorCount += 1
- if re.search("should not appear", line):
- print("ERROR: a line appeared that should not have appeared, %s" % line)
- errorCount += 1
- elif re.search("single time", line):
- errorCount += __checkOneTime(line)
- elif re.search("time should be the same", line):
- errorCount += __checkTwoTimes(line, different=False)
- elif re.search("time should be ", line):
- errorCount += __checkTwoTimes(line, different=True)
- elif re.search("begTime should be ", line):
- errorCount += __checkBegTime(line)
- else:
- print("ERROR: an unknown message appeared, %s" % line)
- errorCount += 1
-
- if lineCount != numLines:
- print("ERROR: expected %d lines, but got %d lines" % (numLines, lineCount))
- errorCount += 1
- return errorCount
-
- import os, argparse
- parser = argparse.ArgumentParser(description="test the CommonLogger functions")
- parser.add_argument("-k", "--keeplogs", help="Keep the log files after finishing the tests", action="store_true")
- parser.add_argument("-v", "--verbose", help="Print debugging messages", action="store_true")
- args = parser.parse_args()
-
- spid = str(os.getpid())
- if args.keeplogs:
- spid = ""
- logcfg = "/tmp/cl.log" + spid + ".cfg"
- errorLog = "/tmp/cl.error" + spid + ".log"
- metricsLog = "/tmp/cl.metrics" + spid + ".log"
- auditLog = "/tmp/cl.audit" + spid + ".log"
- debugLog = "/tmp/cl.debug" + spid + ".log"
- if args.verbose: CommonLogger.verbose = True
-
- import atexit
- def cleanupTmps():
- for f in [ logcfg, errorLog, metricsLog, auditLog, debugLog ]:
- try:
- os.remove(f)
- except:
- pass
- if not args.keeplogs:
- atexit.register(cleanupTmps)
-
- with open(logcfg, "w") as o:
- o.write("error = " + errorLog + "\n" +
- "errorLogLevel = WARN\n" +
- "metrics = " + metricsLog + "\n" +
- "metricsLogLevel = INFO\n" +
- "audit = " + auditLog + "\n" +
- "auditLogLevel = INFO\n" +
- "debug = " + debugLog + "\n" +
- "debugLogLevel = DEBUG\n")
-
- import uuid
- instanceUUID = uuid.uuid1()
- serviceName = "testharness"
- errorLogger = CommonLogger(logcfg, "error", style=CommonLogger.ErrorFile, instanceUUID=instanceUUID, serviceName=serviceName)
- debugLogger = CommonLogger(logcfg, "debug", style=CommonLogger.DebugFile, instanceUUID=instanceUUID, serviceName=serviceName)
- auditLogger = CommonLogger(logcfg, "audit", style=CommonLogger.AuditFile, instanceUUID=instanceUUID, serviceName=serviceName)
- metricsLogger = CommonLogger(logcfg, "metrics", style=CommonLogger.MetricsFile, instanceUUID=instanceUUID, serviceName=serviceName)
-
- testsRun = 0
- errorCount = 0
- errorLogger.debug("error calling debug (should not appear)")
- errorLogger.info("error calling info (should not appear)")
- errorLogger.warn("error calling warn (single time)")
- errorLogger.error("error calling error (single time)")
- errorLogger.setStartRecordEvent()
- time.sleep(1)
- errorLogger.fatal("error calling fatal, after setStartRecordEvent and sleep (start should be ignored, single time)")
- testsRun += 6
- errorCount += __checkLog(errorLog, 3, 10)
-
- auditLogger.debug("audit calling debug (should not appear)")
- auditLogger.info("audit calling info (time should be the same)")
- auditLogger.warn("audit calling warn (time should be the same)")
- auditLogger.error("audit calling error (time should be the same)")
- bt = auditLogger.getStartRecordEvent()
- # print("bt=%s" % bt)
- time.sleep(1)
- auditLogger.setStartRecordEvent()
- time.sleep(1)
- auditLogger.fatal("audit calling fatal, after setStartRecordEvent and sleep, time should be different)")
- time.sleep(1)
- auditLogger.fatal("audit calling fatal, begTime should be %s" % bt['begTime'], begTime=bt)
- testsRun += 7
- errorCount += __checkLog(auditLog, 5, 25)
-
- debugLogger.debug("debug calling debug (single time)")
- debugLogger.info("debug calling info (single time)")
- debugLogger.warn("debug calling warn (single time)")
- debugLogger.setStartRecordEvent()
- time.sleep(1)
- debugLogger.error("debug calling error, after SetStartRecordEvent and sleep (start should be ignored, single time)")
- debugLogger.fatal("debug calling fatal (single time)")
- errorCount += __checkLog(debugLog, 5, 13)
- testsRun += 6
-
- metricsLogger.debug("metrics calling debug (should not appear)")
- metricsLogger.info("metrics calling info (time should be the same)")
- metricsLogger.warn("metrics calling warn (time should be the same)")
- bt = metricsLogger.getStartRecordEvent()
- time.sleep(1)
- metricsLogger.setStartRecordEvent()
- time.sleep(1)
- metricsLogger.error("metrics calling error, after SetStartRecordEvent and sleep, time should be different")
- metricsLogger.fatal("metrics calling fatal (time should be the same)")
- time.sleep(1)
- metricsLogger.fatal("metrics calling fatal, begTime should be %s" % bt['begTime'], begTime=bt)
- testsRun += 6
- errorCount += __checkLog(metricsLog, 5, 28)
-
- print("%d tests run, %d errors found" % (testsRun, errorCount))
+#!/usr/bin/python +# -*- indent-tabs-mode: nil -*- vi: set expandtab: + +# ================================================================================ +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +"""ECOMP Common Logging library in Python. + +CommonLogger.py + +Original Written by: Terry Schmalzried +Date written: October 1, 2015 +Last updated: December 1, 2016 + +version 0.8 +""" + +from __future__ import print_function +import os, sys, getopt, logging, logging.handlers, time, re, uuid, socket, threading, collections + +class CommonLogger: + """ECOMP Common Logging object. + + Public methods: + __init__ + setFields + debug + info + warn + error + fatal + """ + + UnknownFile = -1 + ErrorFile = 0 + DebugFile = 1 + AuditFile = 2 + MetricsFile = 3 + DateFmt = '%Y-%m-%dT%H:%M:%S' + verbose = False + + def __init__(self, configFile, logKey, **kwargs): + """Construct a Common Logger for one Log File. + + Arguments: + configFile -- configuration filename. + logKey -- the keyword in configFile that identifies the log filename. + + Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error + style -- the log file format (style) to use when writing log messages, + one of CommonLogger.ErrorFile, CommonLogger.DebugFile, + CommonLogger.AuditFile and CommonLogger.MetricsFile, or + one of the strings "error", "debug", "audit" or "metrics". + May also be set in the config file using a field named + <logKey>Style (where <logKey> is the value of the logKey + parameter). The keyword value overrides the value in the + config file. + requestID (dame) -- optional default value for this log record field. + serviceInstanceID (am) -- optional default value for this log record field. + threadID (am) -- optional default value for this log record field. + serverName (am) -- optional default value for this log record field. + serviceName (am) -- optional default value for this log record field. + instanceUUID (am) -- optional default value for this log record field. + severity (am) -- optional default value for this log record field. + serverIPAddress (am) -- optional default value for this log record field. + server (am) -- optional default value for this log record field. + IPAddress (am) -- optional default value for this log record field. + className (am) -- optional default value for this log record field. + timer (am) -- (ElapsedTime) optional default value for this log record field. + partnerName (ame) -- optional default value for this log record field. + targetEntity (me) -- optional default value for this log record field. + targetServiceName (me) -- optional default value for this log record field. + statusCode (am) -- optional default value for this log record field. + responseCode (am) -- optional default value for this log record field. + responseDescription (am) -- optional default value for this log record field. + processKey (am) -- optional default value for this log record field. + targetVirtualEntity (m) -- optional default value for this log record field. + customField1 (am) -- optional default value for this log record field. + customField2 (am) -- optional default value for this log record field. + customField3 (am) -- optional default value for this log record field. + customField4 (am) -- optional default value for this log record field. + errorCategory (e) -- optional default value for this log record field. + errorCode (e) -- optional default value for this log record field. + errorDescription (e) -- optional default value for this log record field. + + Note: the pipe '|' character is not allowed in any log record field. + """ + + self._monitorFlag = False + + # Get configuration parameters + self._logKey = str(logKey) + self._configFile = str(configFile) + self._rotateMethod = 'time' + self._timeRotateIntervalType = 'midnight' + self._timeRotateInterval = 1 + self._sizeMaxBytes = 0 + self._sizeRotateMode = 'a' + self._socketHost = None + self._socketPort = 0 + self._typeLogger = 'filelogger' + self._backupCount = 6 + self._logLevelThreshold = self._intLogLevel('') + self._logFile = None + self._begTime = None + self._begMsec = 0 + self._fields = {} + self._fields["style"] = CommonLogger.UnknownFile + try: + self._configFileModified = os.path.getmtime(self._configFile) + for line in open(self._configFile): + line = line.split('#',1)[0] # remove comments + if '=' in line: + key, value = [x.strip() for x in line.split('=',1)] + if key == 'rotateMethod' and value.lower() in ['time', 'size', 'none']: + self._rotateMethod = value.lower() + elif key == 'timeRotateIntervalType' and value in ['S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight']: + self._timeRotateIntervalType = value + elif key == 'timeRotateInterval' and int( value ) > 0: + self._timeRotateInterval = int( value ) + elif key == 'sizeMaxBytes' and int( value ) >= 0: + self._sizeMaxBytes = int( value ) + elif key == 'sizeRotateMode' and value in ['a']: + self._sizeRotateMode = value + elif key == 'backupCount' and int( value ) >= 0: + self._backupCount = int( value ) + elif key == self._logKey + 'SocketHost': + self._socketHost = value + elif key == self._logKey + 'SocketPort' and int( value ) == 0: + self._socketPort = int( value ) + elif key == self._logKey + 'LogType' and value.lower() in ['filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger']: + self._typeLogger = value.lower() + elif key == self._logKey + 'LogLevel': + self._logLevelThreshold = self._intLogLevel(value.upper()) + elif key == self._logKey + 'Style': + self._fields["style"] = value + elif key == self._logKey: + self._logFile = value + except Exception as x: + print("exception reading '%s' configuration file: %s" %(self._configFile, str(x)), file=sys.stderr) + sys.exit(2) + except: + print("exception reading '%s' configuration file" %(self._configFile), file=sys.stderr) + sys.exit(2) + + if self._logFile is None: + print('configuration file %s is missing definition %s for log file' %(self._configFile, self._logKey), file=sys.stderr) + sys.exit(2) + + + # initialize default log fields + # timestamp will automatically be generated + for key in ['style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', \ + 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer', \ + 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode', \ + 'responseDescription', 'processKey', 'targetVirtualEntity', 'customField1', \ + 'customField2', 'customField3', 'customField4', 'errorCategory', 'errorCode', \ + 'errorDescription' ]: + if key in kwargs and kwargs[key] != None: + self._fields[key] = kwargs[key] + + self._resetStyleField() + + # Set up logger + self._logLock = threading.Lock() + with self._logLock: + self._logger = logging.getLogger(self._logKey) + self._logger.propagate = False + self._createLogger() + + self._defaultServerInfo() + + # spawn a thread to monitor configFile for logLevel and logFile changes + self._monitorFlag = True + self._monitorThread = threading.Thread(target=self._monitorConfigFile, args=()) + self._monitorThread.daemon = True + self._monitorThread.start() + + + def _createLogger(self): + if self._typeLogger == 'filelogger': + self._mkdir_p(self._logFile) + if self._rotateMethod == 'time': + self._logHandler = logging.handlers.TimedRotatingFileHandler(self._logFile, \ + when=self._timeRotateIntervalType, interval=self._timeRotateInterval, \ + backupCount=self._backupCount, encoding=None, delay=False, utc=True) + elif self._rotateMethod == 'size': + self._logHandler = logging.handlers.RotatingFileHandler(self._logFile, \ + mode=self._sizeRotateMode, maxBytes=self._sizeMaxBytes, \ + backupCount=self._backupCount, encoding=None, delay=False) + + else: + self._logHandler = logging.handlers.WatchedFileHandler(self._logFile, \ + mode=self._sizeRotateMode, \ + encoding=None, delay=False) + elif self._typeLogger == 'stderrlogger': + self._logHandler = logging.handlers.StreamHandler(sys.stderr) + elif self._typeLogger == 'stdoutlogger': + self._logHandler = logging.handlers.StreamHandler(sys.stdout) + elif self._typeLogger == 'socketlogger': + self._logHandler = logging.handlers.SocketHandler(self._socketHost, self._socketPort) + elif self._typeLogger == 'nulllogger': + self._logHandler = logging.handlers.NullHandler() + + if self._fields["style"] == CommonLogger.AuditFile or self._fields["style"] == CommonLogger.MetricsFile: + self._logFormatter = logging.Formatter(fmt='%(begtime)s,%(begmsecs)03d+00:00|%(endtime)s,%(endmsecs)03d+00:00|%(message)s', datefmt=CommonLogger.DateFmt) + else: + self._logFormatter = logging.Formatter(fmt='%(asctime)s,%(msecs)03d+00:00|%(message)s', datefmt='%Y-%m-%dT%H:%M:%S') + self._logFormatter.converter = time.gmtime + self._logHandler.setFormatter(self._logFormatter) + self._logger.addHandler(self._logHandler) + + def _resetStyleField(self): + styleFields = ["error", "debug", "audit", "metrics"] + if self._fields['style'] in styleFields: + self._fields['style'] = styleFields.index(self._fields['style']) + + def __del__(self): + if self._monitorFlag == False: + return + + self._monitorFlag = False + + if self._monitorThread is not None and self._monitorThread.is_alive(): + self._monitorThread.join() + + self._monitorThread = None + + + def _defaultServerInfo(self): + + # If not set or purposely set = None, then set default + if self._fields.get('server') is None: + try: + self._fields['server'] = socket.getfqdn() + except Exception as err: + try: + self._fields['server'] = socket.gethostname() + except Exception as err: + self._fields['server'] = "" + + # If not set or purposely set = None, then set default + if self._fields.get('serverIPAddress') is None: + try: + self._fields['serverIPAddress'] = socket.gethostbyname(self._fields['server']) + except Exception as err: + self._fields['serverIPAddress'] = "" + + + def _monitorConfigFile(self): + while self._monitorFlag: + try: + fileTime = os.path.getmtime(self._configFile) + if fileTime > self._configFileModified: + self._configFileModified = fileTime + ReopenLogFile = False + logFile = self._logFile + with open(self._configFile) as fp: + for line in fp: + line = line.split('#',1)[0] # remove comments + if '=' in line: + key, value = [x.strip() for x in line.split('=',1)] + if key == 'rotateMethod' and value.lower() in ['time', 'size', 'none'] and self._rotateMethod != value: + self._rotateMethod = value.lower() + ReopenLogFile = True + elif key == 'timeRotateIntervalType' and value in ['S', 'M', 'H', 'D', 'W0', 'W1', 'W2', 'W3', 'W4', 'W5', 'W6', 'midnight']: + self._timeRotateIntervalType = value + ReopenLogFile = True + elif key == 'timeRotateInterval' and int( value ) > 0: + self._timeRotateInterval = int( value ) + ReopenLogFile = True + elif key == 'sizeMaxBytes' and int( value ) >= 0: + self._sizeMaxBytes = int( value ) + ReopenLogFile = True + elif key == 'sizeRotateMode' and value in ['a']: + self._sizeRotateMode = value + ReopenLogFile = True + elif key == 'backupCount' and int( value ) >= 0: + self._backupCount = int( value ) + ReopenLogFile = True + elif key == self._logKey + 'SocketHost' and self._socketHost != value: + self._socketHost = value + ReopenLogFile = True + elif key == self._logKey + 'SocketPort' and self._socketPort > 0 and self._socketPort != int( value ): + self._socketPort = int( value ) + ReopenLogFile = True + elif key == self._logKey + 'LogLevel' and self._logLevelThreshold != self._intLogLevel( value.upper() ): + self._logLevelThreshold = self._intLogLevel(value.upper()) + elif key == self._logKey + 'LogType' and self._typeLogger != value and value.lower() in ['filelogger', 'stdoutlogger', 'stderrlogger', 'socketlogger', 'nulllogger']: + self._typeLogger = value.lower() + ReopenLogFile = True + elif key == self._logKey + 'Style': + self._fields["style"] = value + self._resetStyleField() + elif key == self._logKey and self._logFile != value: + logFile = value + ReopenLogFile = True + if ReopenLogFile: + with self._logLock: + self._logger.removeHandler(self._logHandler) + self._logFile = logFile + self._createLogger() + except Exception as err: + pass + + time.sleep(5) + + + def setFields(self, **kwargs): + """Set default values for log fields. + + Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error + style -- the log file format (style) to use when writing log messages + requestID (dame) -- optional default value for this log record field. + serviceInstanceID (am) -- optional default value for this log record field. + threadID (am) -- optional default value for this log record field. + serverName (am) -- optional default value for this log record field. + serviceName (am) -- optional default value for this log record field. + instanceUUID (am) -- optional default value for this log record field. + severity (am) -- optional default value for this log record field. + serverIPAddress (am) -- optional default value for this log record field. + server (am) -- optional default value for this log record field. + IPAddress (am) -- optional default value for this log record field. + className (am) -- optional default value for this log record field. + timer (am) -- (ElapsedTime) optional default value for this log record field. + partnerName (ame) -- optional default value for this log record field. + targetEntity (me) -- optional default value for this log record field. + targetServiceName (me) -- optional default value for this log record field. + statusCode (am) -- optional default value for this log record field. + responseCode (am) -- optional default value for this log record field. + responseDescription (am) -- optional default value for this log record field. + processKey (am) -- optional default value for this log record field. + targetVirtualEntity (m) -- optional default value for this log record field. + customField1 (am) -- optional default value for this log record field. + customField2 (am) -- optional default value for this log record field. + customField3 (am) -- optional default value for this log record field. + customField4 (am) -- optional default value for this log record field. + errorCategory (e) -- optional default value for this log record field. + errorCode (e) -- optional default value for this log record field. + errorDescription (e) -- optional default value for this log record field. + + Note: the pipe '|' character is not allowed in any log record field. + """ + + for key in ['style', 'requestID', 'serviceInstanceID', 'threadID', 'serverName', 'serviceName', 'instanceUUID', \ + 'severity', 'serverIPAddress', 'server', 'IPAddress', 'className', 'timer', \ + 'partnerName', 'targetEntity', 'targetServiceName', 'statusCode', 'responseCode', \ + 'responseDescription', 'processKey', 'targetVirtualEntity', 'customField1', \ + 'customField2', 'customField3', 'customField4', 'errorCategory', 'errorCode', \ + 'errorDescription' ]: + if key in kwargs: + if kwargs[key] != None: + self._fields[key] = kwargs[key] + elif key in self._fields: + del self._fields[key] + + self._defaultServerInfo() + + + def debug(self, message, **kwargs): + """Write a DEBUG level message to the log file. + + Arguments: + message -- value for the last log record field. + + Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error + style -- the log file format (style) to use when writing log messages + requestID (dame) -- optional default value for this log record field. + serviceInstanceID (am) -- optional default value for this log record field. + threadID (am) -- optional default value for this log record field. + serverName (am) -- optional default value for this log record field. + serviceName (am) -- optional default value for this log record field. + instanceUUID (am) -- optional default value for this log record field. + severity (am) -- optional default value for this log record field. + serverIPAddress (am) -- optional default value for this log record field. + server (am) -- optional default value for this log record field. + IPAddress (am) -- optional default value for this log record field. + className (am) -- optional default value for this log record field. + timer (am) -- (ElapsedTime) optional default value for this log record field. + partnerName (ame) -- optional default value for this log record field. + targetEntity (me) -- optional default value for this log record field. + targetServiceName (me) -- optional default value for this log record field. + statusCode (am) -- optional default value for this log record field. + responseCode (am) -- optional default value for this log record field. + responseDescription (am) -- optional default value for this log record field. + processKey (am) -- optional default value for this log record field. + targetVirtualEntity (m) -- optional default value for this log record field. + customField1 (am) -- optional default value for this log record field. + customField2 (am) -- optional default value for this log record field. + customField3 (am) -- optional default value for this log record field. + customField4 (am) -- optional default value for this log record field. + errorCategory (e) -- optional default value for this log record field. + errorCode (e) -- optional default value for this log record field. + errorDescription (e) -- optional default value for this log record field. + begTime (am) -- optional starting time for this audit/metrics log record. + + Note: the pipe '|' character is not allowed in any log record field. + """ + + self._log('DEBUG', message, errorCategory = 'DEBUG', **kwargs) + + def info(self, message, **kwargs): + """Write an INFO level message to the log file. + + Arguments: + message -- value for the last log record field. + + Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error + style -- the log file format (style) to use when writing log messages + requestID (dame) -- optional default value for this log record field. + serviceInstanceID (am) -- optional default value for this log record field. + threadID (am) -- optional default value for this log record field. + serverName (am) -- optional default value for this log record field. + serviceName (am) -- optional default value for this log record field. + instanceUUID (am) -- optional default value for this log record field. + severity (am) -- optional default value for this log record field. + serverIPAddress (am) -- optional default value for this log record field. + server (am) -- optional default value for this log record field. + IPAddress (am) -- optional default value for this log record field. + className (am) -- optional default value for this log record field. + timer (am) -- (ElapsedTime) optional default value for this log record field. + partnerName (ame) -- optional default value for this log record field. + targetEntity (me) -- optional default value for this log record field. + targetServiceName (me) -- optional default value for this log record field. + statusCode (am) -- optional default value for this log record field. + responseCode (am) -- optional default value for this log record field. + responseDescription (am) -- optional default value for this log record field. + processKey (am) -- optional default value for this log record field. + targetVirtualEntity (m) -- optional default value for this log record field. + customField1 (am) -- optional default value for this log record field. + customField2 (am) -- optional default value for this log record field. + customField3 (am) -- optional default value for this log record field. + customField4 (am) -- optional default value for this log record field. + errorCategory (e) -- optional default value for this log record field. + errorCode (e) -- optional default value for this log record field. + errorDescription (e) -- optional default value for this log record field. + begTime (am) -- optional starting time for this audit/metrics log record. + + Note: the pipe '|' character is not allowed in any log record field. + """ + + self._log('INFO', message, errorCategory = 'INFO', **kwargs) + + def warn(self, message, **kwargs): + """Write a WARN level message to the log file. + + Arguments: + message -- value for the last log record field. + + Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error + style -- the log file format (style) to use when writing log messages + requestID (dame) -- optional default value for this log record field. + serviceInstanceID (am) -- optional default value for this log record field. + threadID (am) -- optional default value for this log record field. + serverName (am) -- optional default value for this log record field. + serviceName (am) -- optional default value for this log record field. + instanceUUID (am) -- optional default value for this log record field. + severity (am) -- optional default value for this log record field. + serverIPAddress (am) -- optional default value for this log record field. + server (am) -- optional default value for this log record field. + IPAddress (am) -- optional default value for this log record field. + className (am) -- optional default value for this log record field. + timer (am) -- (ElapsedTime) optional default value for this log record field. + partnerName (ame) -- optional default value for this log record field. + targetEntity (me) -- optional default value for this log record field. + targetServiceName (me) -- optional default value for this log record field. + statusCode (am) -- optional default value for this log record field. + responseCode (am) -- optional default value for this log record field. + responseDescription (am) -- optional default value for this log record field. + processKey (am) -- optional default value for this log record field. + targetVirtualEntity (m) -- optional default value for this log record field. + customField1 (am) -- optional default value for this log record field. + customField2 (am) -- optional default value for this log record field. + customField3 (am) -- optional default value for this log record field. + customField4 (am) -- optional default value for this log record field. + errorCategory (e) -- optional default value for this log record field. + errorCode (e) -- optional default value for this log record field. + errorDescription (e) -- optional default value for this log record field. + begTime (am) -- optional starting time for this audit/metrics log record. + + Note: the pipe '|' character is not allowed in any log record field. + """ + + self._log('WARN', message, errorCategory = 'WARN', **kwargs) + + def error(self, message, **kwargs): + """Write an ERROR level message to the log file. + + Arguments: + message -- value for the last log record field. + + Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error + style -- the log file format (style) to use when writing log messages + requestID (dame) -- optional default value for this log record field. + serviceInstanceID (am) -- optional default value for this log record field. + threadID (am) -- optional default value for this log record field. + serverName (am) -- optional default value for this log record field. + serviceName (am) -- optional default value for this log record field. + instanceUUID (am) -- optional default value for this log record field. + severity (am) -- optional default value for this log record field. + serverIPAddress (am) -- optional default value for this log record field. + server (am) -- optional default value for this log record field. + IPAddress (am) -- optional default value for this log record field. + className (am) -- optional default value for this log record field. + timer (am) -- (ElapsedTime) optional default value for this log record field. + partnerName (ame) -- optional default value for this log record field. + targetEntity (me) -- optional default value for this log record field. + targetServiceName (me) -- optional default value for this log record field. + statusCode (am) -- optional default value for this log record field. + responseCode (am) -- optional default value for this log record field. + responseDescription (am) -- optional default value for this log record field. + processKey (am) -- optional default value for this log record field. + targetVirtualEntity (m) -- optional default value for this log record field. + customField1 (am) -- optional default value for this log record field. + customField2 (am) -- optional default value for this log record field. + customField3 (am) -- optional default value for this log record field. + customField4 (am) -- optional default value for this log record field. + errorCategory (e) -- optional default value for this log record field. + errorCode (e) -- optional default value for this log record field. + errorDescription (e) -- optional default value for this log record field. + begTime (am) -- optional starting time for this audit/metrics log record. + + Note: the pipe '|' character is not allowed in any log record field. + """ + + self._log('ERROR', message, errorCategory = 'ERROR', **kwargs) + + def fatal(self, message, **kwargs): + """Write a FATAL level message to the log file. + + Arguments: + message -- value for the last log record field. + + Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error + style -- the log file format (style) to use when writing log messages + requestID (dame) -- optional default value for this log record field. + serviceInstanceID (am) -- optional default value for this log record field. + threadID (am) -- optional default value for this log record field. + serverName (am) -- optional default value for this log record field. + serviceName (am) -- optional default value for this log record field. + instanceUUID (am) -- optional default value for this log record field. + severity (am) -- optional default value for this log record field. + serverIPAddress (am) -- optional default value for this log record field. + server (am) -- optional default value for this log record field. + IPAddress (am) -- optional default value for this log record field. + className (am) -- optional default value for this log record field. + timer (am) -- (ElapsedTime) optional default value for this log record field. + partnerName (ame) -- optional default value for this log record field. + targetEntity (me) -- optional default value for this log record field. + targetServiceName (me) -- optional default value for this log record field. + statusCode (am) -- optional default value for this log record field. + responseCode (am) -- optional default value for this log record field. + responseDescription (am) -- optional default value for this log record field. + processKey (am) -- optional default value for this log record field. + targetVirtualEntity (m) -- optional default value for this log record field. + customField1 (am) -- optional default value for this log record field. + customField2 (am) -- optional default value for this log record field. + customField3 (am) -- optional default value for this log record field. + customField4 (am) -- optional default value for this log record field. + errorCategory (e) -- optional default value for this log record field. + errorCode (e) -- optional default value for this log record field. + errorDescription (e) -- optional default value for this log record field. + begTime (am) -- optional starting time for this audit/metrics log record. + + Note: the pipe '|' character is not allowed in any log record field. + """ + + self._log('FATAL', message, errorCategory = 'FATAL', **kwargs) + + def _log(self, logLevel, message, **kwargs): + """Write a message to the log file. + + Arguments: + logLevel -- value ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL', ...) for the log record. + message -- value for the last log record field. + + Keyword arguments: Annotations are d:debug, a=audit, m=metrics, e=error + style -- the log file format (style) to use when writing log messages + requestID (dame) -- optional default value for this log record field. + serviceInstanceID (am) -- optional default value for this log record field. + threadID (am) -- optional default value for this log record field. + serverName (am) -- optional default value for this log record field. + serviceName (am) -- optional default value for this log record field. + instanceUUID (am) -- optional default value for this log record field. + severity (am) -- optional default value for this log record field. + serverIPAddress (am) -- optional default value for this log record field. + server (am) -- optional default value for this log record field. + IPAddress (am) -- optional default value for this log record field. + className (am) -- optional default value for this log record field. + timer (am) -- (ElapsedTime) optional default value for this log record field. + partnerName (ame) -- optional default value for this log record field. + targetEntity (me) -- optional default value for this log record field. + targetServiceName (me) -- optional default value for this log record field. + statusCode (am) -- optional default value for this log record field. + responseCode (am) -- optional default value for this log record field. + responseDescription (am) -- optional default value for this log record field. + processKey (am) -- optional default value for this log record field. + targetVirtualEntity (m) -- optional default value for this log record field. + customField1 (am) -- optional default value for this log record field. + customField2 (am) -- optional default value for this log record field. + customField3 (am) -- optional default value for this log record field. + customField4 (am) -- optional default value for this log record field. + errorCategory (e) -- optional default value for this log record field. + errorCode (e) -- optional default value for this log record field. + errorDescription (e) -- optional default value for this log record field. + begTime (am) -- optional starting time for this audit/metrics log record. + + Note: the pipe '|' character is not allowed in any log record field. + """ + + # timestamp will automatically be inserted + style = int(self._getVal('style', '', **kwargs)) + requestID = self._getVal('requestID', '', **kwargs) + serviceInstanceID = self._getVal('serviceInstanceID', '', **kwargs) + threadID = self._getVal('threadID', threading.currentThread().getName(), **kwargs) + serverName = self._getVal('serverName', '', **kwargs) + serviceName = self._getVal('serviceName', '', **kwargs) + instanceUUID = self._getVal('instanceUUID', '', **kwargs) + upperLogLevel = self._noSep(logLevel.upper()) + severity = self._getVal('severity', '', **kwargs) + serverIPAddress = self._getVal('serverIPAddress', '', **kwargs) + server = self._getVal('server', '', **kwargs) + IPAddress = self._getVal('IPAddress', '', **kwargs) + className = self._getVal('className', '', **kwargs) + timer = self._getVal('timer', '', **kwargs) + partnerName = self._getVal('partnerName', '', **kwargs) + targetEntity = self._getVal('targetEntity', '', **kwargs) + targetServiceName = self._getVal('targetServiceName', '', **kwargs) + statusCode = self._getVal('statusCode', '', **kwargs) + responseCode = self._getVal('responseCode', '', **kwargs) + responseDescription = self._noSep(self._getVal('responseDescription', '', **kwargs)) + processKey = self._getVal('processKey', '', **kwargs) + targetVirtualEntity = self._getVal('targetVirtualEntity', '', **kwargs) + customField1 = self._getVal('customField1', '', **kwargs) + customField2 = self._getVal('customField2', '', **kwargs) + customField3 = self._getVal('customField3', '', **kwargs) + customField4 = self._getVal('customField4', '', **kwargs) + errorCategory = self._getVal('errorCategory', '', **kwargs) + errorCode = self._getVal('errorCode', '', **kwargs) + errorDescription = self._noSep(self._getVal('errorDescription', '', **kwargs)) + nbegTime = self._getArg('begTime', {}, **kwargs) + + detailMessage = self._noSep(message) + if bool(re.match(r" *$", detailMessage)): + return # don't log empty messages + + useLevel = self._intLogLevel(upperLogLevel) + if CommonLogger.verbose: print("logger STYLE=%s" % style) + if useLevel < self._logLevelThreshold: + if CommonLogger.verbose: print("skipping because of level") + pass + else: + with self._logLock: + if style == CommonLogger.ErrorFile: + if CommonLogger.verbose: print("using CommonLogger.ErrorFile") + self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ + %(requestID, threadID, serviceName, partnerName, targetEntity, targetServiceName, + errorCategory, errorCode, errorDescription, detailMessage)) + elif style == CommonLogger.DebugFile: + if CommonLogger.verbose: print("using CommonLogger.DebugFile") + self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ + %(requestID, threadID, serverName, serviceName, instanceUUID, upperLogLevel, + severity, serverIPAddress, server, IPAddress, className, timer, detailMessage)) + elif style == CommonLogger.AuditFile: + if CommonLogger.verbose: print("using CommonLogger.AuditFile") + endAuditTime, endAuditMsec, endSeconds = self._getTime() + if type(nbegTime) is dict and 'begTime' in nbegTime and 'begMsec' in nbegTime: + d = { 'begtime': self._noSep(nbegTime['begTime']), 'begmsecs': float(self._noSep(nbegTime['begMsec'])), 'begseconds': nbegTime['begSeconds'], + 'endtime': endAuditTime, 'endmsecs': endAuditMsec, 'endseconds': endSeconds } + elif self._begTime is not None: + d = { 'begtime': self._begTime, 'begmsecs': self._begMsec, 'begseconds': self._begSeconds, + 'endtime': endAuditTime, 'endmsecs': endAuditMsec, 'endseconds': endSeconds } + else: + d = { 'begtime': endAuditTime, 'begmsecs': endAuditMsec, 'begseconds': endSeconds, + 'endtime': endAuditTime, 'endmsecs': endAuditMsec, 'endseconds': endSeconds } + if not timer: + timer = int(d["endseconds"] * 1000.0 + d["endmsecs"] - d["begseconds"] * 1000.0 - d["begmsecs"]) + self._begTime = self._begMsec = None + unused = "" + self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ + %(requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName, + statusCode, responseCode, responseDescription, instanceUUID, upperLogLevel, + severity, serverIPAddress, timer, server, IPAddress, className, unused, + processKey, customField1, customField2, customField3, customField4, detailMessage), extra=d) + elif style == CommonLogger.MetricsFile: + if CommonLogger.verbose: print("using CommonLogger.MetricsFile") + endMetricsTime, endMetricsMsec, endSeconds = self._getTime() + if type(nbegTime) is dict and 'begTime' in nbegTime and 'begMsec' in nbegTime: + d = { 'begtime': self._noSep(nbegTime['begTime']), 'begmsecs': float(self._noSep(nbegTime['begMsec'])), 'begseconds': nbegTime['begSeconds'], + 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec, 'endseconds': endSeconds } + elif self._begTime is not None: + d = { 'begtime': self._begTime, 'begmsecs': self._begMsec, 'begseconds': self._begSeconds, + 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec, 'endseconds': endSeconds } + else: + d = { 'begtime': endMetricsTime, 'begmsecs': endMetricsMsec, 'begseconds': endSeconds, + 'endtime': endMetricsTime, 'endmsecs': endMetricsMsec, 'endseconds': endSeconds } + if not timer: + timer = int(d["endseconds"] * 1000.0 + d["endmsecs"] - d["begseconds"] * 1000.0 - d["begmsecs"]) + self._begTime = self._begMsec = None + unused = "" + self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ + %(requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName, + targetEntity, targetServiceName, statusCode, responseCode, responseDescription, + instanceUUID, upperLogLevel, severity, serverIPAddress, timer, server, IPAddress, + className, unused, processKey, targetVirtualEntity, customField1, customField2, + customField3, customField4, detailMessage), extra=d) + else: + print("!!!!!!!!!!!!!!!! style not set: %s" % self._fields["style"]) + + def _getTime(self): + ct = time.time() + lt = time.localtime(ct) + return (time.strftime(CommonLogger.DateFmt, lt), (ct - int(ct)) * 1000, ct) + + def setStartRecordEvent(self): + """ + Set the start time to be saved for both audit and metrics records + """ + self._begTime, self._begMsec, self._begSeconds = self._getTime() + + def getStartRecordEvent(self): + """ + Retrieve the start time to be used for either audit and metrics records + """ + begTime, begMsec, begSeconds = self._getTime() + return {'begTime':begTime, 'begMsec':begMsec, 'begSeconds':begSeconds} + + def _getVal(self, key, default, **kwargs): + val = self._fields.get(key) + if key in kwargs: val = kwargs[key] + if val is None: val = default + return self._noSep(val) + + def _getArg(self, key, default, **kwargs): + val = None + if key in kwargs: val = kwargs[key] + if val is None: val = default + return val + + def _noSep(self, message): + if message is None: return '' + return re.sub(r'[\|\n]', ' ', str(message)) + + def _intLogLevel(self, logLevel): + if logLevel == 'FATAL': useLevel = 50 + elif logLevel == 'ERROR': useLevel = 40 + elif logLevel == 'WARN': useLevel = 30 + elif logLevel == 'INFO': useLevel = 20 + elif logLevel == 'DEBUG': useLevel = 10 + else: useLevel = 0 + return useLevel + + def _mkdir_p(self, filename): + """Create missing directories from a full filename path like mkdir -p""" + + if filename is None: + return + + folder=os.path.dirname(filename) + + if folder == "": + return + + if not os.path.exists(folder): + try: + os.makedirs(folder) + except OSError as err: + print("error number %d creating %s directory to hold %s logfile: %s" %(err.errno, err.filename, filename, err.strerror), file=sys.stderr) + sys.exit(2) + except Exception as err: + print("error creating %s directory to hold %s logfile: %s" %(folder, filename, str(err)), file=sys.stderr) + sys.exit(2) + +if __name__ == "__main__": + + def __checkOneTime(line): + format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}[+]00:00[|]' + m = re.match(format, line) + if not m: + print("ERROR: time string did not match proper time format, %s" %line) + print("\t: format=%s" % format) + return 1 + return 0 + + def __checkTwoTimes(line, different): + format = r'[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|][0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:([0-9]{2}),([0-9]{3})[+]00:00[|]' + m = re.match(format, line) + if not m: + print("ERROR: time strings did not match proper time format, %s" %line) + print("\t: format=%s" % format) + return 1 + second1 = int(m.group(1)) + msec1 = int(m.group(2)) + second2 = int(m.group(3)) + msec2 = int(m.group(4)) + if second1 > second2: second2 += 60 + t1 = second1 * 1000 + msec1 + t2 = second2 * 1000 + msec2 + diff = t2 - t1 + # print("t1=%d (%d,%d) t2=%d (%d,%d), diff = %d" % (t1, second1, msec1, t2, second2, msec2, diff)) + if different: + if diff < 500: + print("ERROR: times did not differ enough: %s" % line) + return 1 + else: + if diff > 10: + print("ERROR: times were too far apart: %s" % line) + return 1 + return 0 + + def __checkBegTime(line): + format = "begTime should be ([-0-9T:]+)" + # print("checkBegTime(%s)" % line) + strt = 'begTime should be ' + i = line.index(strt) + rest = line[i+len(strt):].rstrip() + if not line.startswith(rest + ","): + print("ERROR: line %s should start with %s" % (line,rest)) + return 1 + return 0 + + def __checkLog(logfile, numLines, numFields): + lineCount = 0 + errorCount = 0 + with open(logfile, "r") as fp: + for line in fp: + # print("saw line %s" % line) + lineCount += 1 + c = line.count('|') + if c != numFields: + print("ERROR: wrong number of fields. Expected %d, got %d: %s" % (numFields, c, line)) + errorCount += 1 + if re.search("should not appear", line): + print("ERROR: a line appeared that should not have appeared, %s" % line) + errorCount += 1 + elif re.search("single time", line): + errorCount += __checkOneTime(line) + elif re.search("time should be the same", line): + errorCount += __checkTwoTimes(line, different=False) + elif re.search("time should be ", line): + errorCount += __checkTwoTimes(line, different=True) + elif re.search("begTime should be ", line): + errorCount += __checkBegTime(line) + else: + print("ERROR: an unknown message appeared, %s" % line) + errorCount += 1 + + if lineCount != numLines: + print("ERROR: expected %d lines, but got %d lines" % (numLines, lineCount)) + errorCount += 1 + return errorCount + + import os, argparse + parser = argparse.ArgumentParser(description="test the CommonLogger functions") + parser.add_argument("-k", "--keeplogs", help="Keep the log files after finishing the tests", action="store_true") + parser.add_argument("-v", "--verbose", help="Print debugging messages", action="store_true") + args = parser.parse_args() + + spid = str(os.getpid()) + if args.keeplogs: + spid = "" + logcfg = "/tmp/cl.log" + spid + ".cfg" + errorLog = "/tmp/cl.error" + spid + ".log" + metricsLog = "/tmp/cl.metrics" + spid + ".log" + auditLog = "/tmp/cl.audit" + spid + ".log" + debugLog = "/tmp/cl.debug" + spid + ".log" + if args.verbose: CommonLogger.verbose = True + + import atexit + def cleanupTmps(): + for f in [ logcfg, errorLog, metricsLog, auditLog, debugLog ]: + try: + os.remove(f) + except: + pass + if not args.keeplogs: + atexit.register(cleanupTmps) + + with open(logcfg, "w") as o: + o.write("error = " + errorLog + "\n" + + "errorLogLevel = WARN\n" + + "metrics = " + metricsLog + "\n" + + "metricsLogLevel = INFO\n" + + "audit = " + auditLog + "\n" + + "auditLogLevel = INFO\n" + + "debug = " + debugLog + "\n" + + "debugLogLevel = DEBUG\n") + + import uuid + instanceUUID = uuid.uuid1() + serviceName = "testharness" + errorLogger = CommonLogger(logcfg, "error", style=CommonLogger.ErrorFile, instanceUUID=instanceUUID, serviceName=serviceName) + debugLogger = CommonLogger(logcfg, "debug", style=CommonLogger.DebugFile, instanceUUID=instanceUUID, serviceName=serviceName) + auditLogger = CommonLogger(logcfg, "audit", style=CommonLogger.AuditFile, instanceUUID=instanceUUID, serviceName=serviceName) + metricsLogger = CommonLogger(logcfg, "metrics", style=CommonLogger.MetricsFile, instanceUUID=instanceUUID, serviceName=serviceName) + + testsRun = 0 + errorCount = 0 + errorLogger.debug("error calling debug (should not appear)") + errorLogger.info("error calling info (should not appear)") + errorLogger.warn("error calling warn (single time)") + errorLogger.error("error calling error (single time)") + errorLogger.setStartRecordEvent() + time.sleep(1) + errorLogger.fatal("error calling fatal, after setStartRecordEvent and sleep (start should be ignored, single time)") + testsRun += 6 + errorCount += __checkLog(errorLog, 3, 10) + + auditLogger.debug("audit calling debug (should not appear)") + auditLogger.info("audit calling info (time should be the same)") + auditLogger.warn("audit calling warn (time should be the same)") + auditLogger.error("audit calling error (time should be the same)") + bt = auditLogger.getStartRecordEvent() + # print("bt=%s" % bt) + time.sleep(1) + auditLogger.setStartRecordEvent() + time.sleep(1) + auditLogger.fatal("audit calling fatal, after setStartRecordEvent and sleep, time should be different)") + time.sleep(1) + auditLogger.fatal("audit calling fatal, begTime should be %s" % bt['begTime'], begTime=bt) + testsRun += 7 + errorCount += __checkLog(auditLog, 5, 25) + + debugLogger.debug("debug calling debug (single time)") + debugLogger.info("debug calling info (single time)") + debugLogger.warn("debug calling warn (single time)") + debugLogger.setStartRecordEvent() + time.sleep(1) + debugLogger.error("debug calling error, after SetStartRecordEvent and sleep (start should be ignored, single time)") + debugLogger.fatal("debug calling fatal (single time)") + errorCount += __checkLog(debugLog, 5, 13) + testsRun += 6 + + metricsLogger.debug("metrics calling debug (should not appear)") + metricsLogger.info("metrics calling info (time should be the same)") + metricsLogger.warn("metrics calling warn (time should be the same)") + bt = metricsLogger.getStartRecordEvent() + time.sleep(1) + metricsLogger.setStartRecordEvent() + time.sleep(1) + metricsLogger.error("metrics calling error, after SetStartRecordEvent and sleep, time should be different") + metricsLogger.fatal("metrics calling fatal (time should be the same)") + time.sleep(1) + metricsLogger.fatal("metrics calling fatal, begTime should be %s" % bt['begTime'], begTime=bt) + testsRun += 6 + errorCount += __checkLog(metricsLog, 5, 28) + + print("%d tests run, %d errors found" % (testsRun, errorCount)) diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py index a007a26..69ddb86 100644 --- a/policyhandler/onap/audit.py +++ b/policyhandler/onap/audit.py @@ -225,6 +225,12 @@ class _Audit(object): if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value: self.max_http_status_code = max(http_status_code, self.max_http_status_code) + def reset_http_status_not_found(self): + """resets the highest(worst) http status code if data not found""" + with self._lock: + if self.max_http_status_code == AuditHttpCode.DATA_NOT_FOUND_ERROR.value: + self.max_http_status_code = 0 + def get_max_http_status_code(self): """returns the highest(worst) http status code""" with self._lock: @@ -244,31 +250,41 @@ class _Audit(object): == AuditResponseCode.get_response_code(status_code).value or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value) - def _get_response_status(self): + def _get_response_status(self, not_found_ok=None): """calculates the response status fields from max_http_status_code""" max_http_status_code = self.get_max_http_status_code() response_code = AuditResponseCode.get_response_code(max_http_status_code) - success = (response_code.value == AuditResponseCode.SUCCESS.value) + success = ((response_code.value == AuditResponseCode.SUCCESS.value) + or (not_found_ok + and max_http_status_code == AuditHttpCode.DATA_NOT_FOUND_ERROR.value)) response_description = AuditResponseCode.get_human_text(response_code) return success, max_http_status_code, response_code, response_description def is_success(self): - """returns whether the response_code is success and a human text for response code""" + """returns whether the response_code is success""" success, _, _, _ = self._get_response_status() return success + def is_success_or_not_found(self): + """returns whether the response_code is success or 404 - not found""" + success, _, _, _ = self._get_response_status(not_found_ok=True) + return success + def debug(self, log_line, **kwargs): """debug - the debug=lowest level of logging""" _Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs)) + return log_line def info(self, log_line, **kwargs): """debug - the info level of logging""" _Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs)) + return log_line def info_requested(self, result=None, **kwargs): """info "requested ..." - the info level of logging""" - self.info("requested {0} {1}".format(self.req_message, result or ""), \ - **self.merge_all_kwargs(**kwargs)) + log_line = "requested {0} {1}".format(self.req_message, result or "") + self.info(log_line, **self.merge_all_kwargs(**kwargs)) + return log_line def warn(self, log_line, error_code=None, **kwargs): """debug+error - the warn level of logging""" @@ -280,6 +296,7 @@ class _Audit(object): _Audit._logger_debug.warn(log_line, **all_kwargs) _Audit._logger_error.warn(log_line, **all_kwargs) + return log_line def error(self, log_line, error_code=None, **kwargs): """debug+error - the error level of logging""" @@ -291,6 +308,7 @@ class _Audit(object): _Audit._logger_debug.error(log_line, **all_kwargs) _Audit._logger_error.error(log_line, **all_kwargs) + return log_line def fatal(self, log_line, error_code=None, **kwargs): """debug+error - the fatal level of logging""" @@ -302,6 +320,7 @@ class _Audit(object): _Audit._logger_debug.fatal(log_line, **all_kwargs) _Audit._logger_error.fatal(log_line, **all_kwargs) + return log_line @staticmethod def hide_secrets(obj): diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py index 90ede47..51ac173 100644 --- a/policyhandler/policy_consts.py +++ b/policyhandler/policy_consts.py @@ -29,6 +29,10 @@ AUTO_CATCH_UP = "auto catch_up" LATEST_POLICIES = "latest_policies" REMOVED_POLICIES = "removed_policies" ERRORED_POLICIES = "errored_policies" -ERRORED_SCOPES = "errored_scopes" -SCOPE_PREFIXES = "scope_prefixes" POLICY_FILTER = "policy_filter" +POLICY_FILTERS = "policy_filters" +POLICIES = "policies" +POLICY_VERSIONS = "policy_versions" +MATCHING_CONDITIONS = "matchingConditions" +POLICY_NAMES = "policy_names" +POLICY_FILTER_MATCHES = "policy_filter_matches" diff --git a/policyhandler/policy_matcher.py b/policyhandler/policy_matcher.py new file mode 100644 index 0000000..8406f14 --- /dev/null +++ b/policyhandler/policy_matcher.py @@ -0,0 +1,236 @@ +# ================================================================================ +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +"""policy-matcher matches the policies from deployment-handler to policies from policy-engine""" + +import json +import logging +import re + +from .deploy_handler import DeployHandler, PolicyUpdateMessage +from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, + MATCHING_CONDITIONS, POLICY_BODY, POLICY_FILTER, + POLICY_NAME, POLICY_VERSION, POLICY_VERSIONS) +from .policy_rest import PolicyRest +from .policy_utils import RegexCoarser + + +class PolicyMatcher(object): + """policy-matcher - static class""" + _logger = logging.getLogger("policy_handler.policy_matcher") + PENDING_UPDATE = "pending_update" + + @staticmethod + def get_latest_policies(audit): + """ + find the latest policies from policy-engine for the deployed policies and policy-filters + """ + deployed_policies, deployed_policy_filters = DeployHandler.get_deployed_policies(audit) + + if not audit.is_success() or (not deployed_policies and not deployed_policy_filters): + error_txt = "failed to retrieve policies from deployment-handler" + PolicyMatcher._logger.error(error_txt) + return {"error": error_txt}, None + + coarse_regex_patterns = PolicyMatcher.calc_coarse_patterns( + audit, deployed_policies, deployed_policy_filters) + + if not coarse_regex_patterns: + error_txt = ("failed to construct the coarse_regex_patterns from " + + "deployed_policies: {} and deployed_policy_filters: {}" + .format(deployed_policies, deployed_policy_filters)) + PolicyMatcher._logger.error(error_txt) + return {"error": error_txt}, None + + pdp_response = PolicyRest.get_latest_policies( + audit, policy_filters=[{POLICY_NAME: policy_name_pattern} + for policy_name_pattern in coarse_regex_patterns] + ) + + if not audit.is_success_or_not_found(): + error_txt = "failed to retrieve policies from policy-engine" + PolicyMatcher._logger.error(error_txt) + return {"error": error_txt}, None + + latest_policies = pdp_response.get(LATEST_POLICIES, {}) + errored_policies = pdp_response.get(ERRORED_POLICIES, {}) + + removed_policies = dict( + (policy_id, True) + for (policy_id, deployed_policy) in deployed_policies.items() + if deployed_policy.get(POLICY_VERSIONS) + and policy_id not in latest_policies + and policy_id not in errored_policies + ) + + latest_policies, changed_policies, policy_filter_matches = PolicyMatcher.match_policies( + audit, latest_policies, deployed_policies, deployed_policy_filters) + errored_policies, _, _ = PolicyMatcher.match_policies( + audit, errored_policies, deployed_policies, deployed_policy_filters) + + return ({LATEST_POLICIES: latest_policies, ERRORED_POLICIES: errored_policies}, + PolicyUpdateMessage(changed_policies, + removed_policies, + policy_filter_matches)) + + @staticmethod + def calc_coarse_patterns(audit, deployed_policies, deployed_policy_filters): + """calculate the coarsed patterns on policy-names in policies and policy-filters""" + coarse_regex = RegexCoarser() + for policy_id in deployed_policies or {}: + coarse_regex.add_regex_pattern(policy_id) + + for policy_filter in (deployed_policy_filters or {}).values(): + policy_name_pattern = policy_filter.get(POLICY_FILTER, {}).get(POLICY_NAME) + coarse_regex.add_regex_pattern(policy_name_pattern) + + coarse_regex_patterns = coarse_regex.get_coarse_regex_patterns() + PolicyMatcher._logger.debug( + audit.debug("coarse_regex_patterns({}) combined_regex_pattern({}) for patterns({})" + .format(coarse_regex_patterns, + coarse_regex.get_combined_regex_pattern(), + coarse_regex.patterns))) + return coarse_regex_patterns + + @staticmethod + def match_to_deployed_policies(audit, policies_updated, policies_removed): + """match the policies_updated, policies_removed versus deployed policies""" + deployed_policies, deployed_policy_filters = DeployHandler.get_deployed_policies( + audit) + if not audit.is_success(): + return {}, {}, {} + + _, changed_policies, policy_filter_matches = PolicyMatcher.match_policies( + audit, policies_updated, deployed_policies, deployed_policy_filters) + policies_removed, _, _ = PolicyMatcher.match_policies( + audit, policies_removed, deployed_policies, deployed_policy_filters) + + return changed_policies, policies_removed, policy_filter_matches + + @staticmethod + def match_policies(audit, policies, deployed_policies, deployed_policy_filters): + """ + Match policies to deployed policies either by policy_id or the policy-filters. + + Also calculates the policies that changed in comparison to deployed policies + """ + matching_policies = {} + changed_policies = {} + policy_filter_matches = {} + + policies = policies or {} + deployed_policies = deployed_policies or {} + deployed_policy_filters = deployed_policy_filters or {} + + for (policy_id, policy) in policies.items(): + new_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION) + deployed_policy = deployed_policies.get(policy_id) + + if deployed_policy: + matching_policies[policy_id] = policy + + policy_changed = (deployed_policy and new_version + and (deployed_policy.get(PolicyMatcher.PENDING_UPDATE) + or {new_version} ^ + deployed_policy.get(POLICY_VERSIONS, {}).keys())) + if policy_changed: + changed_policies[policy_id] = policy + + in_filters = False + for (policy_filter_id, policy_filter) in deployed_policy_filters.items(): + if not PolicyMatcher.match_policy_to_filter( + audit, policy_id, policy, + policy_filter_id, policy_filter.get(POLICY_FILTER)): + continue + + if policy_changed or not deployed_policy: + in_filters = True + if policy_id not in policy_filter_matches: + policy_filter_matches[policy_id] = {} + policy_filter_matches[policy_id][policy_filter_id] = True + + if not deployed_policy and in_filters: + matching_policies[policy_id] = policy + changed_policies[policy_id] = policy + + return matching_policies, changed_policies, policy_filter_matches + + @staticmethod + def match_policy_to_filter(audit, policy_id, policy, policy_filter_id, policy_filter): + """Match the policy to the policy-filter""" + if not policy_id or not policy or not policy_filter or not policy_filter_id: + return False + + filter_policy_name = policy_filter.get(POLICY_NAME) + if not filter_policy_name: + return False + + policy_body = policy.get(POLICY_BODY) + if not policy_body: + return False + + policy_name = policy_body.get(POLICY_NAME) + if not policy_name: + return False + + log_line = "policy {} to filter id {}: {}".format(json.dumps(policy_body), + policy_filter_id, + json.dumps(policy_filter)) + # PolicyMatcher._logger.debug(audit.debug("matching {}...".format(log_line))) + + if (filter_policy_name != policy_id and filter_policy_name != policy_name + and not re.match(filter_policy_name, policy_name)): + PolicyMatcher._logger.debug( + audit.debug("not match by policyName: {} != {}: {}" + .format(policy_name, filter_policy_name, log_line))) + return False + + matching_conditions = policy_body.get(MATCHING_CONDITIONS, {}) + if not isinstance(matching_conditions, dict): + return False + + filter_onap_name = policy_filter.get("onapName") + policy_onap_name = matching_conditions.get("ONAPName") + if filter_onap_name and filter_onap_name != policy_onap_name: + PolicyMatcher._logger.debug( + audit.debug("not match by ONAPName: {} != {}: {}" + .format(policy_onap_name, filter_onap_name, log_line))) + return False + + filter_config_name = policy_filter.get("configName") + policy_config_name = matching_conditions.get("ConfigName") + if filter_onap_name and filter_onap_name != policy_onap_name: + PolicyMatcher._logger.debug( + audit.debug("not match by configName: {} != {}: {}" + .format(policy_config_name, filter_config_name, log_line))) + return False + + filter_config_attributes = policy_filter.get("configAttributes") + if filter_config_attributes and isinstance(filter_config_attributes, dict): + for filter_key, filter_config_attribute in filter_config_attributes.items(): + if (filter_key not in matching_conditions + or filter_config_attribute != matching_conditions.get(filter_key)): + PolicyMatcher._logger.debug( + audit.debug("not match by configAttributes: {} != {}: {}" + .format(json.dumps(matching_conditions), + json.dumps(filter_config_attributes), + log_line))) + return False + + PolicyMatcher._logger.debug(audit.debug("matched {}".format(log_line))) + return True diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py index e1584a3..bb33cd5 100644 --- a/policyhandler/policy_receiver.py +++ b/policyhandler/policy_receiver.py @@ -22,26 +22,24 @@ thru web-socket to receive push notifications on updates and removal of policies. on receiving the policy-notifications, the policy-receiver -filters them out by the policy scope(s) provided in policy-handler config -and passes the notifications to policy-updater +passes the notifications to policy-updater """ import json import logging -import re import time from threading import Lock, Thread import websocket from .config import Config -from .onap.audit import Audit, AuditHttpCode, AuditResponseCode +from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION from .policy_updater import PolicyUpdater LOADED_POLICIES = 'loadedPolicies' REMOVED_POLICIES = 'removedPolicies' -POLICY_NAME = 'policyName' POLICY_VER = 'versionNo' +POLICY_MATCHES = 'matches' class _PolicyReceiver(Thread): """web-socket to PolicyEngine""" @@ -65,10 +63,6 @@ class _PolicyReceiver(Thread): self._web_socket = None - scope_prefixes = [scope_prefix.replace(".", "[.]") - for scope_prefix in Config.settings["scope_prefixes"]] - self._policy_scopes = re.compile("(" + "|".join(scope_prefixes) + ")") - _PolicyReceiver._logger.info("_policy_scopes %s", self._policy_scopes.pattern) self._policy_updater = PolicyUpdater() self._policy_updater.start() @@ -126,31 +120,29 @@ class _PolicyReceiver(Thread): json.dumps(message)) return - policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) - for policy in message.get(LOADED_POLICIES, []) - if self._policy_scopes.match(policy.get(POLICY_NAME))] - policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER)) - for policy in message.get(REMOVED_POLICIES, []) - if self._policy_scopes.match(policy.get(POLICY_NAME))] + policies_updated = [ + {POLICY_NAME: policy.get(POLICY_NAME), + POLICY_VERSION: policy.get(POLICY_VER), + MATCHING_CONDITIONS: policy.get(POLICY_MATCHES, {})} + for policy in message.get(LOADED_POLICIES, []) + ] + + policies_removed = [ + {POLICY_NAME: removed_policy.get(POLICY_NAME), + POLICY_VERSION: removed_policy.get(POLICY_VER)} + for removed_policy in message.get(REMOVED_POLICIES, []) + ] if not policies_updated and not policies_removed: - _PolicyReceiver._logger.info("no policy updated or removed for scopes %s", - self._policy_scopes.pattern) + _PolicyReceiver._logger.info("no policy updated or removed") return - audit = Audit(job_name="policy_update", - req_message="policy-notification - updated[{0}], removed[{1}]" - .format(len(policies_updated), len(policies_removed)), - retry_get_config=True) - self._policy_updater.enqueue(audit, policies_updated, policies_removed) + self._policy_updater.policy_update(policies_updated, policies_removed) except Exception as ex: error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex), "on_pdp_message", json.dumps(message)) _PolicyReceiver._logger.exception(error_msg) - audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) - audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) - def _on_ws_error(self, _, error): """report an error""" diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py index c8018f6..39c26a5 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/policy_rest.py @@ -29,9 +29,9 @@ import requests from .config import Config from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, AuditResponseCode, Metrics) -from .policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, LATEST_POLICIES, - POLICY_BODY, POLICY_CONFIG, POLICY_FILTER, - POLICY_ID, POLICY_NAME, SCOPE_PREFIXES) +from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY, + POLICY_CONFIG, POLICY_FILTER, POLICY_FILTERS, + POLICY_ID, POLICY_NAME) from .policy_utils import PolicyUtils @@ -56,8 +56,6 @@ class PolicyRest(object): _headers = None _target_entity = None _thread_pool_size = 4 - _scope_prefixes = None - _scope_thread_pool_size = 4 _policy_retry_count = 1 _policy_retry_sleep = 0 @@ -70,7 +68,7 @@ class PolicyRest(object): config = Config.settings[Config.FIELD_POLICY_ENGINE] - pool_size = config.get("pool_connections", 20) + pool_size = Config.settings.get("pool_connections", 20) PolicyRest._requests_session = requests.Session() PolicyRest._requests_session.mount( 'https://', @@ -81,24 +79,20 @@ class PolicyRest(object): requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) ) - PolicyRest._url_get_config = config["url"] \ - + config["path_api"] + PolicyRest.POLICY_GET_CONFIG + PolicyRest._url_get_config = (config["url"] + config["path_api"] + + PolicyRest.POLICY_GET_CONFIG) PolicyRest._headers = config["headers"] PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) PolicyRest._thread_pool_size = Config.settings.get("thread_pool_size", 4) if PolicyRest._thread_pool_size < 2: PolicyRest._thread_pool_size = 2 - PolicyRest._scope_prefixes = Config.settings["scope_prefixes"] - PolicyRest._scope_thread_pool_size = min(PolicyRest._thread_pool_size, \ - len(PolicyRest._scope_prefixes)) PolicyRest._policy_retry_count = Config.settings.get("policy_retry_count", 1) or 1 PolicyRest._policy_retry_sleep = Config.settings.get("policy_retry_sleep", 0) PolicyRest._logger.info( - "PolicyClient url(%s) headers(%s) scope-prefixes(%s)", - PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers), - json.dumps(PolicyRest._scope_prefixes)) + "PolicyClient url(%s) headers(%s)", + PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers)) @staticmethod def _pdp_get_config(audit, json_body): @@ -180,8 +174,8 @@ class PolicyRest(object): rslt = res_data[0] if (rslt and not rslt.get(POLICY_NAME) - and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND - and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND): + and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND + and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND): status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value info_msg = "not found {0}".format(log_line) @@ -281,22 +275,22 @@ class PolicyRest(object): expect_policy_removed): """single attempt to get the latest policy for the policy_id from the policy-engine""" - status_code, policy_configs = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id}) + status_code, policy_bodies = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id}) - PolicyRest._logger.debug("%s %s policy_configs: %s", - status_code, policy_id, json.dumps(policy_configs or [])) + PolicyRest._logger.debug("%s %s policy_bodies: %s", + status_code, policy_id, json.dumps(policy_bodies or [])) latest_policy = PolicyUtils.select_latest_policy( - policy_configs, min_version_expected, ignore_policy_names + policy_bodies, min_version_expected, ignore_policy_names ) if not latest_policy and not expect_policy_removed: audit.error("received unexpected policy data from PDP for policy_id={0}: {1}" - .format(policy_id, json.dumps(policy_configs or [])), + .format(policy_id, json.dumps(policy_bodies or [])), error_code=AuditResponseCode.DATA_ERROR) done = bool(latest_policy - or (expect_policy_removed and not policy_configs) + or (expect_policy_removed and not policy_bodies) or audit.is_serious_error(status_code)) return done, latest_policy, status_code @@ -330,18 +324,17 @@ class PolicyRest(object): def _get_latest_updated_policies(audit, str_metrics, policies_updated, policies_removed): """Get the latest policies of the list of policy_names from the policy-engine""" PolicyRest._lazy_init() - metrics = Metrics( + metrics_total = Metrics( aud_parent=audit, targetEntity="{0} total get_latest_updated_policies".format(PolicyRest._target_entity), targetServiceName=PolicyRest._url_get_config) - metrics.metrics_start("get_latest_updated_policies {0}".format(str_metrics)) + metrics_total.metrics_start("get_latest_updated_policies {0}".format(str_metrics)) PolicyRest._logger.debug(str_metrics) policies_to_find = {} - for (policy_name, policy_version) in policies_updated: - policy_id = PolicyUtils.extract_policy_id(policy_name) - if not policy_id or not policy_version.isdigit(): + for (policy_id, policy_version) in policies_updated: + if not policy_id or not policy_version or not policy_version.isdigit(): continue policy = policies_to_find.get(policy_id) if not policy: @@ -354,18 +347,17 @@ class PolicyRest(object): if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version): policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version) - for (policy_name, _) in policies_removed: - policy_id = PolicyUtils.extract_policy_id(policy_name) + for (policy_id, policy_names) in policies_removed: if not policy_id: continue policy = policies_to_find.get(policy_id) if not policy: policies_to_find[policy_id] = { POLICY_ID: policy_id, - PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True} + PolicyRest.IGNORE_POLICY_NAMES: policy_names } continue - policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True + policy[PolicyRest.IGNORE_POLICY_NAMES].update(policy_names) apns = [(audit, policy_id, policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED), @@ -382,8 +374,8 @@ class PolicyRest(object): pool.close() pool.join() - metrics.metrics("result get_latest_updated_policies {0}: {1} {2}" - .format(str_metrics, len(policies), json.dumps(policies))) + metrics_total.metrics("result get_latest_updated_policies {0}: {1} {2}" + .format(str_metrics, len(policies), json.dumps(policies))) updated_policies = dict((policy[POLICY_ID], policy) for policy in policies @@ -416,36 +408,26 @@ class PolicyRest(object): @staticmethod def _get_latest_policies(aud_policy_filter): - """ - get the latest policies by policy_filter - or all the latest policies of the same scope from the policy-engine - """ - audit, policy_filter, scope_prefix = aud_policy_filter + """get the latest policies by policy_filter from the policy-engine""" + audit, policy_filter = aud_policy_filter try: str_policy_filter = json.dumps(policy_filter) PolicyRest._logger.debug("%s", str_policy_filter) - status_code, policy_configs = PolicyRest._pdp_get_config(audit, policy_filter) + status_code, policy_bodies = PolicyRest._pdp_get_config(audit, policy_filter) - PolicyRest._logger.debug("%s policy_configs: %s %s", status_code, - str_policy_filter, json.dumps(policy_configs or [])) + PolicyRest._logger.debug("%s policy_bodies: %s %s", status_code, + str_policy_filter, json.dumps(policy_bodies or [])) - latest_policies = PolicyUtils.select_latest_policies(policy_configs) - - if (scope_prefix and not policy_configs - and status_code != AuditHttpCode.DATA_NOT_FOUND_ERROR.value): - audit.warn("PDP error {0} on scope_prefix {1}".format(status_code, scope_prefix), - error_code=AuditResponseCode.DATA_ERROR) - return None, latest_policies, scope_prefix + latest_policies = PolicyUtils.select_latest_policies(policy_bodies) if not latest_policies: - if not scope_prefix: - audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) - audit.warn( - "received no policies from PDP for policy_filter {0}: {1}" - .format(str_policy_filter, json.dumps(policy_configs or [])), - error_code=AuditResponseCode.DATA_ERROR) - return None, latest_policies, None + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.warn( + "received no policies from PDP for policy_filter {0}: {1}" + .format(str_policy_filter, json.dumps(policy_bodies or [])), + error_code=AuditResponseCode.DATA_ERROR) + return None, latest_policies audit.set_http_status_code(status_code) valid_policies = {} @@ -455,22 +437,22 @@ class PolicyRest(object): valid_policies[policy_id] = policy else: errored_policies[policy_id] = policy - return valid_policies, errored_policies, None + return valid_policies, errored_policies except Exception as ex: - error_msg = ("{0}: crash {1} {2} at {3}: policy_filter({4}), scope_prefix({5})" + error_msg = ("{0}: crash {1} {2} at {3}: policy_filter({4})" .format(audit.request_id, type(ex).__name__, str(ex), - "_get_latest_policies", json.dumps(policy_filter), scope_prefix)) + "_get_latest_policies", json.dumps(policy_filter))) PolicyRest._logger.exception(error_msg) audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) - return None, None, scope_prefix + return None, None @staticmethod - def get_latest_policies(audit, policy_filter=None): - """Get the latest policies of the same scope from the policy-engine""" + def get_latest_policies(audit, policy_filter=None, policy_filters=None): + """Get the latest policies by policy-filter(s) from the policy-engine""" result = {} aud_policy_filters = None str_policy_filters = None @@ -479,51 +461,53 @@ class PolicyRest(object): try: PolicyRest._lazy_init() - if policy_filter is not None: - aud_policy_filters = [(audit, policy_filter, None)] + if policy_filter: + aud_policy_filters = [(audit, policy_filter)] str_policy_filters = json.dumps(policy_filter) str_metrics = "get_latest_policies for policy_filter {0}".format( str_policy_filters) target_entity = ("{0} total get_latest_policies by policy_filter" .format(PolicyRest._target_entity)) result[POLICY_FILTER] = copy.deepcopy(policy_filter) - else: - aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, scope_prefix) - for scope_prefix in PolicyRest._scope_prefixes] - str_policy_filters = json.dumps(PolicyRest._scope_prefixes) - str_metrics = "get_latest_policies for scopes {0} {1}".format( \ - len(PolicyRest._scope_prefixes), str_policy_filters) - target_entity = ("{0} total get_latest_policies by scope_prefixes" + elif policy_filters: + aud_policy_filters = [ + (audit, policy_filter) + for policy_filter in policy_filters + ] + str_policy_filters = json.dumps(policy_filters) + str_metrics = "get_latest_policies for policy_filters {0}".format( + str_policy_filters) + target_entity = ("{0} total get_latest_policies by policy_filters" .format(PolicyRest._target_entity)) - result[SCOPE_PREFIXES] = copy.deepcopy(PolicyRest._scope_prefixes) + result[POLICY_FILTERS] = copy.deepcopy(policy_filters) + else: + return result PolicyRest._logger.debug("%s", str_policy_filters) - metrics = Metrics(aud_parent=audit, targetEntity=target_entity, - targetServiceName=PolicyRest._url_get_config) + metrics_total = Metrics(aud_parent=audit, targetEntity=target_entity, + targetServiceName=PolicyRest._url_get_config) - metrics.metrics_start(str_metrics) + metrics_total.metrics_start(str_metrics) latest_policies = None apfs_length = len(aud_policy_filters) if apfs_length == 1: latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])] else: - pool = ThreadPool(min(PolicyRest._scope_thread_pool_size, apfs_length)) + pool = ThreadPool(min(PolicyRest._thread_pool_size, apfs_length)) latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters) pool.close() pool.join() - metrics.metrics("total result {0}: {1} {2}".format( + metrics_total.metrics("total result {0}: {1} {2}".format( str_metrics, len(latest_policies), json.dumps(latest_policies))) - # latest_policies == [(valid_policies, errored_policies, errored_scope_prefix), ...] + # latest_policies == [(valid_policies, errored_policies), ...] result[LATEST_POLICIES] = dict( - pair for (vps, _, _) in latest_policies if vps for pair in vps.items()) + pair for (vps, _) in latest_policies if vps for pair in vps.items()) result[ERRORED_POLICIES] = dict( - pair for (_, eps, _) in latest_policies if eps for pair in eps.items()) - - result[ERRORED_SCOPES] = sorted([esp for (_, _, esp) in latest_policies if esp]) + pair for (_, eps) in latest_policies if eps for pair in eps.items()) PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s", str_policy_filters, json.dumps(result)) diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 5ba7c29..3ae8199 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -18,24 +18,101 @@ """policy-updater thread""" -import copy import json import logging -from queue import Queue -from threading import Lock, Thread +from threading import Event, Lock, Thread from .config import Config -from .deploy_handler import DeployHandler +from .deploy_handler import DeployHandler, PolicyUpdateMessage from .onap.audit import Audit, AuditHttpCode, AuditResponseCode -from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, LATEST_POLICIES, - REMOVED_POLICIES) +from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, POLICY_BODY, POLICY_ID, + POLICY_NAME, POLICY_NAMES, POLICY_VERSION) +from .policy_matcher import PolicyMatcher from .policy_rest import PolicyRest -from .policy_utils import Utils +from .policy_utils import PolicyUtils from .step_timer import StepTimer +class _PolicyUpdate(object): + """Keep and consolidate the policy-updates (audit, policies_updated, policies_removed)""" + _logger = logging.getLogger("policy_handler.policy_update") + + def __init__(self): + """init and reset""" + self._audit = None + self._policies_updated = {} + self._policies_removed = {} + + def reset(self): + """resets the state""" + self.__init__() + + def pop_policy_update(self): + """ + Returns the consolidated (audit, policies_updated, policies_removed) + and resets the state + """ + if not self._audit: + return None, None, None + + audit = self._audit + policies_updated = self._policies_updated + policies_removed = self._policies_removed + + self.reset() + + return audit, policies_updated, policies_removed + + + def push_policy_update(self, policies_updated, policies_removed): + """consolidate the new policies_updated, policies_removed to existing ones""" + for policy_body in policies_updated: + policy_name = policy_body.get(POLICY_NAME) + policy = PolicyUtils.convert_to_policy(policy_body) + if not policy: + continue + policy_id = policy.get(POLICY_ID) + + self._policies_updated[policy_id] = policy + + rm_policy_names = self._policies_removed.get(policy_id, {}).get(POLICY_NAMES) + if rm_policy_names and policy_name in rm_policy_names: + del rm_policy_names[policy_name] + + for policy_body in policies_removed: + policy_name = policy_body.get(POLICY_NAME) + policy = PolicyUtils.convert_to_policy(policy_body) + if not policy: + continue + policy_id = policy.get(POLICY_ID) + + if policy_id in self._policies_removed: + policy = self._policies_removed[policy_id] + + if POLICY_NAMES not in policy: + policy[POLICY_NAMES] = {} + policy[POLICY_NAMES][policy_name] = True + self._policies_removed[policy_id] = policy + + req_message = ("policy-update notification - updated[{0}], removed[{1}]" + .format(len(self._policies_updated), + len(self._policies_removed))) + + if not self._audit: + self._audit = Audit(job_name="policy_update", + req_message=req_message, + retry_get_config=True) + else: + self._audit.req_message = req_message + + self._logger.info( + "pending request_id %s for %s policies_updated %s policies_removed %s", + self._audit.request_id, req_message, + json.dumps(policies_updated), json.dumps(policies_removed)) + + class PolicyUpdater(Thread): - """queue and handle the policy-updates in a separate thread""" + """sequentially handle the policy-updates and catch-ups in its own policy_updater thread""" _logger = logging.getLogger("policy_handler.policy_updater") def __init__(self): @@ -43,33 +120,22 @@ class PolicyUpdater(Thread): Thread.__init__(self, name="policy_updater") self.daemon = True + self._lock = Lock() + self._run = Event() + self._catch_up_timer = None self._aud_shutdown = None self._aud_catch_up = None + self._policy_update = _PolicyUpdate() catch_up_config = Config.settings.get(CATCH_UP, {}) self._catch_up_interval = catch_up_config.get("interval") or 15*60 - self._catch_up_max_skips = catch_up_config.get("max_skips") or 3 - self._catch_up_skips = 0 - self._catch_up_prev_message = None - - self._lock = Lock() - self._queue = Queue() - - def enqueue(self, audit=None, policies_updated=None, policies_removed=None): + def policy_update(self, policies_updated, policies_removed): """enqueue the policy-updates""" - policies_updated = policies_updated or [] - policies_removed = policies_removed or [] - - PolicyUpdater._logger.info( - "enqueue request_id %s policies_updated %s policies_removed %s", - ((audit and audit.request_id) or "none"), - json.dumps(policies_updated), json.dumps(policies_removed)) - with self._lock: - self._queue.put((audit, policies_updated, policies_removed)) - + self._policy_update.push_policy_update(policies_updated, policies_removed) + self._run.set() def catch_up(self, audit=None): """need to bring the latest policies to DCAE-Controller""" @@ -80,30 +146,24 @@ class PolicyUpdater(Thread): "catch_up %s request_id %s", self._aud_catch_up.req_message, self._aud_catch_up.request_id ) - - self.enqueue() - + self._run.set() def run(self): """wait and run the policy-update in thread""" while True: PolicyUpdater._logger.info("waiting for policy-updates...") - queued_audit, policies_updated, policies_removed = self._queue.get() - PolicyUpdater._logger.info( - "got request_id %s policies_updated %s policies_removed %s", - ((queued_audit and queued_audit.request_id) or "none"), - json.dumps(policies_updated), json.dumps(policies_removed)) + self._run.wait() + + with self._lock: + self._run.clear() if not self._keep_running(): break if self._on_catch_up(): - self._reset_queue() - continue - elif not queued_audit: continue - self._on_policies_update(queued_audit, policies_updated, policies_removed) + self._on_policy_update() PolicyUpdater._logger.info("exit policy-updater") @@ -151,44 +211,13 @@ class PolicyUpdater(Thread): self._catch_up_timer = None self._logger.info("stopped catch_up_timer") - def _need_to_send_catch_up(self, aud_catch_up, catch_up_message): - """try not to send the duplicate messages on auto catchup unless hitting the max count""" - if aud_catch_up.req_message != AUTO_CATCH_UP \ - or self._catch_up_skips >= self._catch_up_max_skips \ - or not Utils.are_the_same(catch_up_message, self._catch_up_prev_message): - self._catch_up_skips = 0 - self._catch_up_prev_message = copy.deepcopy(catch_up_message) - log_line = "going to send the catch_up {0}: {1}".format( - aud_catch_up.req_message, - json.dumps(self._catch_up_prev_message) - ) - self._logger.info(log_line) - aud_catch_up.info(log_line) - return True - - self._catch_up_skips += 1 - self._catch_up_prev_message = copy.deepcopy(catch_up_message) - log_line = "skip {0}/{1} sending the same catch_up {2}: {3}".format( - self._catch_up_skips, self._catch_up_max_skips, - aud_catch_up.req_message, json.dumps(self._catch_up_prev_message) - ) - self._logger.info(log_line) - aud_catch_up.info(log_line) - return False - - def _reset_queue(self): - """clear up the queue""" - with self._lock: - if not self._aud_catch_up and not self._aud_shutdown: - with self._queue.mutex: - self._queue.queue.clear() - def _on_catch_up(self): """bring all the latest policies to DCAE-Controller""" with self._lock: aud_catch_up = self._aud_catch_up if self._aud_catch_up: self._aud_catch_up = None + self._policy_update.reset() if not aud_catch_up: return False @@ -196,20 +225,20 @@ class PolicyUpdater(Thread): log_line = "catch_up {0} request_id {1}".format( aud_catch_up.req_message, aud_catch_up.request_id ) + catch_up_result = "" try: PolicyUpdater._logger.info(log_line) self._pause_catch_up_timer() - catch_up_message = PolicyRest.get_latest_policies(aud_catch_up) - catch_up_message[CATCH_UP] = True + _, catch_up_message = PolicyMatcher.get_latest_policies(aud_catch_up) - catch_up_result = "" - if not aud_catch_up.is_success(): + if not catch_up_message or not aud_catch_up.is_success_or_not_found(): catch_up_result = "- not sending catch-up to deployment-handler due to errors" PolicyUpdater._logger.warning(catch_up_result) - elif not self._need_to_send_catch_up(aud_catch_up, catch_up_message): - catch_up_result = "- skipped sending the same policies" + elif catch_up_message.empty(): + catch_up_result = "- not sending empty catch-up to deployment-handler" else: + aud_catch_up.reset_http_status_not_found() DeployHandler.policy_update(aud_catch_up, catch_up_message, rediscover=True) if not aud_catch_up.is_success(): catch_up_result = "- failed to send catch-up to deployment-handler" @@ -229,9 +258,6 @@ class PolicyUpdater(Thread): aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) success = False - if not success: - self._catch_up_prev_message = None - self._run_catch_up_timer() PolicyUpdater._logger.info("policy_handler health: %s", @@ -240,49 +266,73 @@ class PolicyUpdater(Thread): return success - def _on_policies_update(self, queued_audit, policies_updated, policies_removed): - """handle the event of policy-updates from the queue""" - deployment_handler_changed = None + def _on_policy_update(self): + """handle the event of policy-updates""" result = "" + with self._lock: + audit, policies_updated, policies_removed = self._policy_update.pop_policy_update() + + if not audit: + return log_line = "request_id: {} policies_updated: {} policies_removed: {}".format( - ((queued_audit and queued_audit.request_id) or "none"), - json.dumps(policies_updated), json.dumps(policies_removed)) + audit.request_id, json.dumps(policies_updated), json.dumps(policies_removed)) + PolicyUpdater._logger.info(log_line) try: - updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( - (queued_audit, policies_updated, policies_removed)) - - if not queued_audit.is_success(): + (updated_policies, removed_policies, + policy_filter_matches) = PolicyMatcher.match_to_deployed_policies( + audit, policies_updated, policies_removed) + + if updated_policies or removed_policies: + updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( + (audit, + [(policy_id, policy.get(POLICY_BODY, {}).get(POLICY_VERSION)) + for policy_id, policy in updated_policies.items()], + [(policy_id, policy.get(POLICY_NAMES, {})) + for policy_id, policy in removed_policies.items()] + )) + + if not audit.is_success_or_not_found(): result = "- not sending policy-updates to deployment-handler due to errors" PolicyUpdater._logger.warning(result) + elif not updated_policies and not removed_policies: + result = "- not sending empty policy-updates to deployment-handler" + PolicyUpdater._logger.warning(result) else: - message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies} - deployment_handler_changed = DeployHandler.policy_update(queued_audit, message) - if not queued_audit.is_success(): - result = "- failed to send policy-updates to deployment-handler" + message = PolicyUpdateMessage(updated_policies, removed_policies, + policy_filter_matches, False) + log_updates = ("policies-updated[{0}], removed[{1}]" + .format(len(updated_policies), len(removed_policies))) + + audit.reset_http_status_not_found() + DeployHandler.policy_update(audit, message) + + if not audit.is_success(): + result = "- failed to send to deployment-handler {}".format(log_updates) PolicyUpdater._logger.warning(result) else: - result = "- sent policy-updates to deployment-handler" + result = "- sent to deployment-handler {}".format(log_updates) + log_line = "request_id: {} updated_policies: {} removed_policies: {}".format( + audit.request_id, + json.dumps(updated_policies), json.dumps(removed_policies)) - success, _, _ = queued_audit.audit_done(result=result) + audit.audit_done(result=result) + PolicyUpdater._logger.info(log_line + " " + result) except Exception as ex: error_msg = ("{0}: crash {1} {2} at {3}: {4}" - .format(queued_audit.request_id, type(ex).__name__, str(ex), + .format(audit.request_id, type(ex).__name__, str(ex), "on_policies_update", log_line + " " + result)) PolicyUpdater._logger.exception(error_msg) - queued_audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) - queued_audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) - success = False + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) - if deployment_handler_changed: - self._catch_up_prev_message = None + if DeployHandler.server_instance_changed: + DeployHandler.server_instance_changed = False self._pause_catch_up_timer() self.catch_up() - elif not success: - self._catch_up_prev_message = None def shutdown(self, audit): @@ -290,7 +340,7 @@ class PolicyUpdater(Thread): PolicyUpdater._logger.info("shutdown policy-updater") with self._lock: self._aud_shutdown = audit - self.enqueue() + self._run.set() self._stop_catch_up_timer() diff --git a/policyhandler/policy_utils.py b/policyhandler/policy_utils.py index c2a8b07..c63f382 100644 --- a/policyhandler/policy_utils.py +++ b/policyhandler/policy_utils.py @@ -16,15 +16,18 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -"""policy-client communicates with policy-engine thru REST API""" +"""utils for policy usage and conversions""" import json import logging import re +from copy import deepcopy +from typing import Pattern from .policy_consts import (POLICY_BODY, POLICY_CONFIG, POLICY_ID, POLICY_NAME, POLICY_VERSION) + class PolicyUtils(object): """policy-client utils""" _logger = logging.getLogger("policy_handler.policy_utils") @@ -59,30 +62,30 @@ class PolicyUtils(object): return policy @staticmethod - def convert_to_policy(policy_config): - """wrap policy_config received from policy-engine with policy_id.""" - if not policy_config: - return - policy_name = policy_config.get(POLICY_NAME) - policy_version = policy_config.get(POLICY_VERSION) + def convert_to_policy(policy_body): + """wrap policy_body received from policy-engine with policy_id.""" + if not policy_body: + return None + policy_name = policy_body.get(POLICY_NAME) + policy_version = policy_body.get(POLICY_VERSION) if not policy_name or not policy_version: - return + return None policy_id = PolicyUtils.extract_policy_id(policy_name) if not policy_id: - return - return {POLICY_ID:policy_id, POLICY_BODY:policy_config} + return None + return {POLICY_ID:policy_id, POLICY_BODY:policy_body} @staticmethod - def select_latest_policy(policy_configs, min_version_expected=None, ignore_policy_names=None): - """For some reason, the policy-engine returns all version of the policy_configs. + def select_latest_policy(policy_bodies, min_version_expected=None, ignore_policy_names=None): + """For some reason, the policy-engine returns all version of the policy_bodies. DCAE-Controller is only interested in the latest version """ - if not policy_configs: + if not policy_bodies: return - latest_policy_config = {} - for policy_config in policy_configs: - policy_name = policy_config.get(POLICY_NAME) - policy_version = policy_config.get(POLICY_VERSION) + latest_policy_body = {} + for policy_body in policy_bodies: + policy_name = policy_body.get(POLICY_NAME) + policy_version = policy_body.get(POLICY_VERSION) if not policy_name or not policy_version or not policy_version.isdigit(): continue policy_version = int(policy_version) @@ -91,30 +94,30 @@ class PolicyUtils(object): if ignore_policy_names and policy_name in ignore_policy_names: continue - if not latest_policy_config \ - or int(latest_policy_config[POLICY_VERSION]) < policy_version: - latest_policy_config = policy_config + if (not latest_policy_body + or int(latest_policy_body[POLICY_VERSION]) < policy_version): + latest_policy_body = policy_body - return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_config)) + return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_body)) @staticmethod - def select_latest_policies(policy_configs): - """For some reason, the policy-engine returns all version of the policy_configs. + def select_latest_policies(policy_bodies): + """For some reason, the policy-engine returns all version of the policy_bodies. DCAE-Controller is only interested in the latest versions """ - if not policy_configs: + if not policy_bodies: return {} policies = {} - for policy_config in policy_configs: - policy = PolicyUtils.convert_to_policy(policy_config) + for policy_body in policy_bodies: + policy = PolicyUtils.convert_to_policy(policy_body) if not policy: continue policy_id = policy.get(POLICY_ID) policy_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION) if not policy_id or not policy_version or not policy_version.isdigit(): continue - if policy_id not in policies \ - or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION]): + if (policy_id not in policies + or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION])): policies[policy_id] = policy for policy_id in policies: @@ -173,3 +176,204 @@ class Utils(object): if not the_same_values: Utils._logger.debug("values %s != %s", body_1, body_2) return the_same_values + +class RegexCoarser(object): + """ + utility to combine or coarse the collection of regex patterns + into a single regex that is at least not narrower (wider or the same) + than the collection regexes + + inspired by https://github.com/spadgos/regex-combiner in js + """ + ENDER = '***' + GROUPERS = {'{': '}', '[': ']', '(': ')'} + MODIFIERS = '*?+' + CHOICE_STARTER = '(' + HIDDEN_CHOICE_STARTER = '(?:' + ANY_CHARS = '.*' + LINE_START = '^' + + def __init__(self, regex_patterns=None): + """regex coarser""" + self.trie = {} + self.patterns = [] + self.add_regex_patterns(regex_patterns) + + def get_combined_regex_pattern(self): + """gets the pattern for the combined regex""" + trie = deepcopy(self.trie) + RegexCoarser._compress(trie) + return RegexCoarser._trie_to_pattern(trie) + + def get_coarse_regex_patterns(self, max_length=100): + """gets the patterns for the coarse regex""" + trie = deepcopy(self.trie) + RegexCoarser._compress(trie) + patterns = RegexCoarser._trie_to_pattern(trie, True) + + root_patterns = [] + for pattern in patterns: + left, _, choice = pattern.partition(RegexCoarser.CHOICE_STARTER) + if choice and left and left.strip() != RegexCoarser.LINE_START and not left.isspace(): + pattern = left + RegexCoarser.ANY_CHARS + root_patterns.append(pattern) + root_patterns = RegexCoarser._join_patterns(root_patterns, max_length) + + if not root_patterns or root_patterns == ['']: + return [] + return root_patterns + + + def add_regex_patterns(self, new_regex_patterns): + """adds the new_regex patterns to RegexPatternCoarser""" + if not new_regex_patterns or not isinstance(new_regex_patterns, list): + return + for new_regex_pattern in new_regex_patterns: + self.add_regex_pattern(new_regex_pattern) + + def add_regex_pattern(self, new_regex_pattern): + """adds the new_regex to RegexPatternCoarser""" + new_regex_pattern = RegexCoarser._regex_pattern_to_string(new_regex_pattern) + if not new_regex_pattern: + return + + self.patterns.append(new_regex_pattern) + + tokens = RegexCoarser._tokenize(new_regex_pattern) + last_token_idx = len(tokens) - 1 + trie_node = self.trie + for idx, token in enumerate(tokens): + if token not in trie_node: + trie_node[token] = {} + if idx == last_token_idx: + trie_node[token][RegexCoarser.ENDER] = {} + trie_node = trie_node[token] + + @staticmethod + def _regex_pattern_to_string(regex_pattern): + """convert regex pattern to string""" + if not regex_pattern: + return '' + + if isinstance(regex_pattern, str): + return regex_pattern + + if isinstance(regex_pattern, Pattern): + return regex_pattern.pattern + return None + + @staticmethod + def _tokenize(regex_pattern): + """tokenize the regex pattern for trie assignment""" + tokens = [] + token = '' + group_ender = None + use_next = False + + for char in regex_pattern: + if use_next: + use_next = False + token += char + char = None + + if char == '\\': + use_next = True + token += char + continue + + if not group_ender and char in RegexCoarser.GROUPERS: + group_ender = RegexCoarser.GROUPERS[char] + token = char + char = None + + if char is None: + pass + elif char == group_ender: + token += char + group_ender = None + if char == '}': # this group is a modifier + tokens[len(tokens) - 1] += token + token = '' + continue + elif char in RegexCoarser.MODIFIERS: + if group_ender: + token += char + else: + tokens[len(tokens) - 1] += char + continue + else: + token += char + + if not group_ender: + tokens.append(token) + token = '' + + if token: + tokens.append(token) + return tokens + + @staticmethod + def _compress(trie): + """compress trie into shortest leaves""" + for key, subtrie in trie.items(): + RegexCoarser._compress(subtrie) + subkeys = list(subtrie.keys()) + if len(subkeys) == 1: + trie[key + subkeys[0]] = subtrie[subkeys[0]] + del trie[key] + + @staticmethod + def _trie_to_pattern(trie, top_keep=False): + """convert trie to the regex pattern""" + patterns = [ + key.replace(RegexCoarser.ENDER, '') + RegexCoarser._trie_to_pattern(subtrie) + for key, subtrie in trie.items() + ] + + if top_keep: + return patterns + + return RegexCoarser._join_patterns(patterns)[0] + + @staticmethod + def _join_patterns(patterns, max_length=0): + """convert list of patterns to the segmented list of dense regex patterns""" + if not patterns: + return [''] + + if len(patterns) == 1: + return patterns + + if not max_length: + return [RegexCoarser.HIDDEN_CHOICE_STARTER + '|'.join(patterns) + ')'] + + long_patterns = [] + join_patterns = [] + for pattern in patterns: + len_pattern = len(pattern) + if not len_pattern: + continue + if len_pattern >= max_length: + long_patterns.append(pattern) + continue + + for idx, patterns_to_join in enumerate(join_patterns): + patterns_to_join, len_patterns_to_join = patterns_to_join + if len_pattern + len_patterns_to_join < max_length: + patterns_to_join.append(pattern) + len_patterns_to_join += len_pattern + join_patterns[idx] = (patterns_to_join, len_patterns_to_join) + len_pattern = 0 + break + if len_pattern: + join_patterns.append(([pattern], len_pattern)) + join_patterns.sort(key=lambda x: x[1]) + + if join_patterns: + # pattern, _, choice = pattern.endswith(RegexCoarser.ANY_CHARS) + join_patterns = [ + RegexCoarser.HIDDEN_CHOICE_STARTER + '|'.join(patterns_to_join) + ')' + for patterns_to_join, _ in join_patterns + ] + + return join_patterns + long_patterns diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py index c49536f..24db468 100644 --- a/policyhandler/web_server.py +++ b/policyhandler/web_server.py @@ -16,31 +16,39 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -"""web-service for policy_handler""" +"""web-server for policy_handler""" -import logging import json +import logging from datetime import datetime + import cherrypy from .config import Config +from .deploy_handler import PolicyUpdateMessage from .onap.audit import Audit -from .policy_rest import PolicyRest +from .policy_matcher import PolicyMatcher from .policy_receiver import PolicyReceiver +from .policy_rest import PolicyRest + class PolicyWeb(object): - """run REST API of policy-handler""" - SERVER_HOST = "0.0.0.0" + """run http API of policy-handler on 0.0.0.0:wservice_port - any incoming address""" + HOST_INADDR_ANY = ".".join("0"*4) logger = logging.getLogger("policy_handler.policy_web") @staticmethod def run_forever(audit): """run the web-server of the policy-handler forever""" - PolicyWeb.logger.info("policy_handler web-service at port(%d)...", Config.wservice_port) - cherrypy.config.update({"server.socket_host": PolicyWeb.SERVER_HOST, - 'server.socket_port': Config.wservice_port}) + PolicyWeb.logger.info("policy_handler web-server on port(%d)...", Config.wservice_port) + cherrypy.config.update({"server.socket_host": PolicyWeb.HOST_INADDR_ANY, + "server.socket_port": Config.wservice_port}) cherrypy.tree.mount(_PolicyWeb(), '/') - audit.info("running policy_handler web-service at port({0})".format(Config.wservice_port)) + audit.info("running policy_handler web-server as {0}:{1}".format( + cherrypy.server.socket_host, cherrypy.server.socket_port)) + PolicyWeb.logger.info("running policy_handler web-server as %s:%d with config: %s", + cherrypy.server.socket_host, cherrypy.server.socket_port, + json.dumps(cherrypy.config)) cherrypy.engine.start() class _PolicyWeb(object): @@ -81,7 +89,9 @@ class _PolicyWeb(object): PolicyWeb.logger.info("%s", req_info) - result = PolicyRest.get_latest_policies(audit) + result, policy_update = PolicyMatcher.get_latest_policies(audit) + if policy_update and isinstance(policy_update, PolicyUpdateMessage): + result["policy_update"] = policy_update.get_message() PolicyWeb.logger.info("result %s: %s", req_info, json.dumps(result)) @@ -96,8 +106,7 @@ class _PolicyWeb(object): @cherrypy.tools.json_in() def policies_latest(self): """ - on :GET: retrieves all the latest policies from policy-engine that are - in the scope of the policy-handler. + on :GET: retrieves all the latest policies from policy-engine that are deployed on :POST: expects to receive the params that mimic the /getConfig of policy-engine and retrieves the matching policies from policy-engine and picks the latest on each policy. @@ -30,7 +30,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property. <groupId>org.onap.dcaegen2.platform</groupId> <artifactId>policy-handler</artifactId> <name>dcaegen2-platform-policy-handler</name> - <version>3.0.1-SNAPSHOT</version> + <version>4.0.0-SNAPSHOT</version> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> @@ -23,7 +23,7 @@ from setuptools import setup setup(
name='policyhandler',
description='DCAE-Controller policy-handler to communicate with policy-engine',
- version="3.0.1",
+ version="4.0.0",
author='Alex Shatov',
packages=['policyhandler'],
zip_safe=False,
diff --git a/tests/mock_settings.py b/tests/mock_settings.py index 28bc6cc..017ad7e 100644 --- a/tests/mock_settings.py +++ b/tests/mock_settings.py @@ -45,7 +45,7 @@ class Settings(object): Config.load_from_file("etc_upload/config.json") - Config.settings["catch_up"] = {"interval": 10, "max_skips": 2} + Config.settings["catch_up"] = {"interval": 10} Settings.logger = logging.getLogger("policy_handler.unit_test") sys.stdout = LogWriter(Settings.logger.info) diff --git a/tests/test_policy_utils.py b/tests/test_policy_utils.py new file mode 100644 index 0000000..b88f1ea --- /dev/null +++ b/tests/test_policy_utils.py @@ -0,0 +1,186 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +"""test of the policy_utils""" + +import json +import logging +import re + +from policyhandler.config import Config +from policyhandler.policy_utils import RegexCoarser + +Config.load_from_file() +LOGGER = logging.getLogger("policy_handler.unit_test_policy_utils") + + +def check_coarse_regex(test_name, patterns, matching_strings=None, expected_subpatterns=None): + """generic test""" + regex_coarser = RegexCoarser(patterns) + coarse_patterns = regex_coarser.get_coarse_regex_patterns(max_length=20) + LOGGER.info("check_coarse_regex %s (%s) for [%s]", + test_name, coarse_patterns, json.dumps(regex_coarser.patterns)) + coarse_regexes = [re.compile(coarse_pattern) for coarse_pattern in coarse_patterns] + coarse_patterns_str = json.dumps(coarse_patterns) + if matching_strings: + for test_str in matching_strings: + LOGGER.info(" match '%s' to %s (%s)", test_str, test_name, coarse_patterns_str) + assert bool(list(filter(None, [ + coarse_regex.match(test_str) for coarse_regex in coarse_regexes + ]))) + + if expected_subpatterns: + for subpattern in expected_subpatterns: + LOGGER.info(" subpattern '%s' in %s", subpattern, coarse_patterns_str) + assert subpattern in coarse_patterns_str + +def check_combined_regex(test_name, patterns, matching_strings=None, unmatching_strings=None): + """generic test""" + regex_coarser = RegexCoarser(patterns) + combined_pattern = regex_coarser.get_combined_regex_pattern() + LOGGER.info("check_combined_regex %s (%s) for [%s]", + test_name, combined_pattern, json.dumps(regex_coarser.patterns)) + coarse_regex = re.compile(combined_pattern) + if matching_strings: + for test_str in matching_strings: + LOGGER.info(" match '%s' to %s (%s)", test_str, test_name, combined_pattern) + assert coarse_regex.match(test_str) + + if unmatching_strings: + for test_str in unmatching_strings: + LOGGER.info(" not match '%s' to %s (%s)", test_str, test_name, combined_pattern) + assert not coarse_regex.match(test_str) + +def test_regex_coarser(): + """test variety of regex combinations""" + + test_data = [ + ( + "simple", + [ + "plain text", "plain pick", + "aaa (((a|b)|c)|d)", + "aaa (((a|b)|c)|d zzz", + "nested (expr[aeiou]ss(ions)?)", + "nested (expr[aeiou]ss(?:ions|ion)?)", + "^ (any expr|more|less|some|who cares)", + " (any expr|more|less|some|who cares)", + "(any expr|more|less|some|who cares)" + ], + [ + 'plain text', + 'nested exprussions', + 'nested expross', + 'aaa c', + 'aaa d', + 'who cares', + ' who cares' + ], + None, + [ + "nested .*", + "plain .*", + "aaa .*", + "(any expr|more|less|some|who cares)", + "^ (any expr|more|less|some|who cares)", + " (any expr|more|less|some|who cares)"] + ), + ( + "combination", + [ + "plain text", + "^with* modifiers?", + "cha[ra][ra]cter classes", + "^with (groups)", + "^with (groups|more groups)", + r"^with (mod+ifiers\s*|in groups{2,3}\s*)+", + "sub", + "substrings", + "su.*bstrings", + "char count{1,3}s", + "nested (expr[aeiou]ss(ions)?)", + r"escaped (\)) chars", + r"escaped ([\)\]]) chars", + r"escaped ([\)\]]){3} chars" + ], + [ + 'plain text', + 'withhh modifier', + 'character classes', + 'with groups', + 'with more groups', + 'with modddifiers in groupss modifiers in groupsss', + 'sub', + 'substrings', + 'char counttts', + 'nested exprassions', + 'nested express', + 'escaped ) chars', + 'escaped ] chars', + 'escaped ]]] chars' + ], + [ + 'plain', + 'text', + 'something with modifiers', + 'su', + 'char counttttts', + 'escaped ]] chars' + ], + [ + "nested .*", + "escaped .*", + "^wit.*", + "plain text", + "cha.*", + "su.*" + ] + ), + ( + "combined", + [ + 'foo+', + 'food', + 'football', + "ba[rh]", + "bard" + ], + [ + 'foo', + 'fooooo', + 'football', + 'food', + 'bar', + 'bah', + 'bard' + ], + [ + 'fo', + 'bat' + ], + [ + "fo.*", + "ba.*" + ] + ) + ] + + for (test_name, patterns, + matching_strings, unmatching_strings, expected_subpatterns) in test_data: + check_combined_regex(test_name, patterns, matching_strings, unmatching_strings) + check_coarse_regex(test_name, patterns, matching_strings, expected_subpatterns) diff --git a/tests/test_policyhandler.py b/tests/test_policyhandler.py index ec0b030..eda1981 100644 --- a/tests/test_policyhandler.py +++ b/tests/test_policyhandler.py @@ -25,7 +25,6 @@ import time import uuid import pytest - import cherrypy from cherrypy.test.helper import CPWebCase @@ -34,10 +33,9 @@ from policyhandler.deploy_handler import DeployHandler from policyhandler.discovery import DiscoveryClient from policyhandler.onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode) -from policyhandler.policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, - LATEST_POLICIES, POLICY_BODY, +from policyhandler.policy_consts import (LATEST_POLICIES, POLICY_BODY, POLICY_CONFIG, POLICY_ID, POLICY_NAME, - POLICY_VERSION, SCOPE_PREFIXES) + POLICY_VERSION, POLICY_VERSIONS) from policyhandler.policy_receiver import (LOADED_POLICIES, POLICY_VER, REMOVED_POLICIES, PolicyReceiver) from policyhandler.policy_rest import PolicyRest @@ -125,9 +123,9 @@ class MonkeyPolicyBody(object): } -class MonkeyPolicyEngine(object): +class MockPolicyEngine(object): """pretend this is the policy-engine""" - _scope_prefix = Config.settings["scope_prefixes"][0] + scope_prefix = "test_scope_prefix.Config_" LOREM_IPSUM = """Lorem ipsum dolor sit amet consectetur ametist""".split() LONG_TEXT = "0123456789" * 100 _policies = [] @@ -135,13 +133,13 @@ class MonkeyPolicyEngine(object): @staticmethod def init(): """init static vars""" - MonkeyPolicyEngine._policies = [ + MockPolicyEngine._policies = [ MonkeyPolicyBody.create_policy_body( - MonkeyPolicyEngine._scope_prefix + policy_id, policy_index + 1) - for policy_id in MonkeyPolicyEngine.LOREM_IPSUM - for policy_index in range(1 + MonkeyPolicyEngine.LOREM_IPSUM.index(policy_id))] - Settings.logger.info("MonkeyPolicyEngine._policies: %s", - json.dumps(MonkeyPolicyEngine._policies)) + MockPolicyEngine.scope_prefix + policy_id, policy_index + 1) + for policy_id in MockPolicyEngine.LOREM_IPSUM + for policy_index in range(1 + MockPolicyEngine.LOREM_IPSUM.index(policy_id))] + Settings.logger.info("MockPolicyEngine._policies: %s", + json.dumps(MockPolicyEngine._policies)) @staticmethod def get_config(policy_name): @@ -149,61 +147,75 @@ class MonkeyPolicyEngine(object): if not policy_name: return [] return [copy.deepcopy(policy) - for policy in MonkeyPolicyEngine._policies + for policy in MockPolicyEngine._policies if re.match(policy_name, policy[POLICY_NAME])] @staticmethod def get_configs_all(): """get all policies the way the policy-engine finds""" policies = [copy.deepcopy(policy) - for policy in MonkeyPolicyEngine._policies] + for policy in MockPolicyEngine._policies] for policy in policies: - policy["config"] = MonkeyPolicyEngine.LONG_TEXT + policy["config"] = MockPolicyEngine.LONG_TEXT return policies @staticmethod def get_policy_id(policy_index): """get the policy_id by index""" - return (MonkeyPolicyEngine._scope_prefix - + MonkeyPolicyEngine.LOREM_IPSUM[ - policy_index % len(MonkeyPolicyEngine.LOREM_IPSUM)]) + return (MockPolicyEngine.scope_prefix + + MockPolicyEngine.LOREM_IPSUM[ + policy_index % len(MockPolicyEngine.LOREM_IPSUM)]) @staticmethod - def gen_policy_latest(policy_index): + def gen_policy_latest(policy_index, version_offset=0): """generate the policy response by policy_index = version - 1""" - policy_id = MonkeyPolicyEngine.get_policy_id(policy_index) - expected_policy = { + policy_id = MockPolicyEngine.get_policy_id(policy_index) + policy = { POLICY_ID: policy_id, - POLICY_BODY: MonkeyPolicyBody.create_policy_body(policy_id, policy_index + 1) + POLICY_BODY: MonkeyPolicyBody.create_policy_body( + policy_id, policy_index + 1 - version_offset) } - return policy_id, PolicyUtils.parse_policy_config(expected_policy) + return policy_id, PolicyUtils.parse_policy_config(policy) @staticmethod - def gen_all_policies_latest(): + def gen_all_policies_latest(version_offset=0): """generate all latest policies""" - return { - LATEST_POLICIES: dict(MonkeyPolicyEngine.gen_policy_latest(policy_index) - for policy_index in range(len(MonkeyPolicyEngine.LOREM_IPSUM))), - ERRORED_SCOPES: ["DCAE.Config_*"], - SCOPE_PREFIXES: ["DCAE.Config_*"], - ERRORED_POLICIES: {} - } + return dict(MockPolicyEngine.gen_policy_latest(policy_index, version_offset=version_offset) + for policy_index in range(len(MockPolicyEngine.LOREM_IPSUM))) @staticmethod def gen_policies_latest(match_to_policy_name): """generate all latest policies""" - return { - LATEST_POLICIES: - dict((k, v) - for k, v in MonkeyPolicyEngine.gen_all_policies_latest() - [LATEST_POLICIES].items() - if re.match(match_to_policy_name, k)), - ERRORED_SCOPES: [], - ERRORED_POLICIES: {} - } + return dict((k, v) + for k, v in MockPolicyEngine.gen_all_policies_latest().items() + if re.match(match_to_policy_name, k)) + +MockPolicyEngine.init() -MonkeyPolicyEngine.init() + +class MockDeploymentHandler(object): + """pretend this is the deployment-handler""" + + @staticmethod + def default_response(): + """generate the deployed policies message""" + return {"server_instance_uuid": Settings.deploy_handler_instance_uuid} + + @staticmethod + def get_deployed_policies(): + """generate the deployed policies message""" + response = MockDeploymentHandler.default_response() + policies = dict( + (policy_id, { + POLICY_ID: policy_id, + POLICY_VERSIONS: {policy.get(POLICY_BODY, {}).get(POLICY_VERSION, "999"): True}, + "pending_update": False}) + for policy_id, policy in (MockPolicyEngine.gen_all_policies_latest(version_offset=1) + .items())) + response["policies"] = policies + + return response @pytest.fixture() @@ -211,7 +223,7 @@ def fix_pdp_post(monkeypatch): """monkeyed request /getConfig to PDP""" def monkeyed_policy_rest_post(full_path, json=None, headers=None): """monkeypatch for the POST to policy-engine""" - res_json = MonkeyPolicyEngine.get_config(json.get(POLICY_NAME)) + res_json = MockPolicyEngine.get_config(json.get(POLICY_NAME)) return MonkeyedResponse(full_path, res_json, json, headers) Settings.logger.info("setup fix_pdp_post") @@ -227,7 +239,7 @@ def fix_pdp_post_big(monkeypatch): """monkeyed request /getConfig to PDP""" def monkeyed_policy_rest_post(full_path, json=None, headers=None): """monkeypatch for the POST to policy-engine""" - res_json = MonkeyPolicyEngine.get_configs_all() + res_json = MockPolicyEngine.get_configs_all() return MonkeyedResponse(full_path, res_json, json, headers) Settings.logger.info("setup fix_pdp_post_big") @@ -280,20 +292,26 @@ def fix_select_latest_policies_boom(monkeypatch): @pytest.fixture() def fix_deploy_handler(monkeypatch, fix_discovery): - """monkeyed discovery request.get""" - def monkeyed_deploy_handler(full_path, json=None, headers=None): - """monkeypatch for deploy_handler""" - return MonkeyedResponse( - full_path, - {"server_instance_uuid": Settings.deploy_handler_instance_uuid}, - json, headers - ) + """monkeyed requests to deployment-handler""" + def monkeyed_deploy_handler_put(full_path, json=None, headers=None): + """monkeypatch for policy-update request.put to deploy_handler""" + return MonkeyedResponse(full_path, MockDeploymentHandler.default_response(), + json, headers) + + def monkeyed_deploy_handler_get(full_path, headers=None): + """monkeypatch policy-update request.get to deploy_handler""" + return MonkeyedResponse(full_path, MockDeploymentHandler.get_deployed_policies(), + None, headers) Settings.logger.info("setup fix_deploy_handler") audit = Audit(req_message="fix_deploy_handler") DeployHandler._lazy_init(audit) - monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.post', - monkeyed_deploy_handler) + + monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.put', + monkeyed_deploy_handler_put) + monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.get', + monkeyed_deploy_handler_get) + yield fix_deploy_handler # provide the fixture value Settings.logger.info("teardown fix_deploy_handler") @@ -301,7 +319,7 @@ def fix_deploy_handler(monkeypatch, fix_discovery): @pytest.fixture() def fix_deploy_handler_fail(monkeypatch, fix_discovery): """monkeyed failed discovery request.get""" - def monkeyed_deploy_handler(full_path, json=None, headers=None): + def monkeyed_deploy_handler_put(full_path, json=None, headers=None): """monkeypatch for deploy_handler""" res = MonkeyedResponse( full_path, @@ -311,6 +329,11 @@ def fix_deploy_handler_fail(monkeypatch, fix_discovery): res.status_code = 413 return res + def monkeyed_deploy_handler_get(full_path, headers=None): + """monkeypatch policy-update request.get to deploy_handler""" + return MonkeyedResponse(full_path, MockDeploymentHandler.default_response(), + None, headers) + @staticmethod def monkeyed_deploy_handler_init(audit_ignore, rediscover=False): """monkeypatch for deploy_handler init""" @@ -318,28 +341,32 @@ def fix_deploy_handler_fail(monkeypatch, fix_discovery): Settings.logger.info("setup fix_deploy_handler_fail") config_catch_up = Config.settings["catch_up"] - Config.settings["catch_up"] = {"interval": 1, "max_skips": 0} + Config.settings["catch_up"] = {"interval": 1} audit = Audit(req_message="fix_deploy_handler_fail") DeployHandler._lazy_init(audit, rediscover=True) - monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.post', - monkeyed_deploy_handler) + monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._lazy_init', monkeyed_deploy_handler_init) + monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.put', + monkeyed_deploy_handler_put) + monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.get', + monkeyed_deploy_handler_get) + yield fix_deploy_handler_fail Settings.logger.info("teardown fix_deploy_handler_fail") Config.settings["catch_up"] = config_catch_up -def monkeyed_cherrypy_engine_exit(): - """monkeypatch for deploy_handler""" - Settings.logger.info("cherrypy_engine_exit()") - - @pytest.fixture() def fix_cherrypy_engine_exit(monkeypatch): """monkeyed cherrypy.engine.exit()""" Settings.logger.info("setup fix_cherrypy_engine_exit") + + def monkeyed_cherrypy_engine_exit(): + """monkeypatch for deploy_handler""" + Settings.logger.info("cherrypy_engine_exit()") + monkeypatch.setattr('policyhandler.web_server.cherrypy.engine.exit', monkeyed_cherrypy_engine_exit) yield fix_cherrypy_engine_exit # provide the fixture value @@ -358,7 +385,7 @@ class MonkeyedWebSocket(object): message = { LOADED_POLICIES: [ {POLICY_NAME: "{0}.{1}.xml".format( - MonkeyPolicyEngine.get_policy_id(policy_index), policy_index + 1), + MockPolicyEngine.get_policy_id(policy_index), policy_index + 1), POLICY_VER: str(policy_index + 1)} for policy_index in updated_indexes or [] ], @@ -413,7 +440,7 @@ def fix_policy_receiver_websocket(monkeypatch): def test_get_policy_latest(fix_pdp_post): """test /policy_latest/<policy-id>""" - policy_id, expected_policy = MonkeyPolicyEngine.gen_policy_latest(3) + policy_id, expected_policy = MockPolicyEngine.gen_policy_latest(3) audit = Audit(job_name="test_get_policy_latest", req_message="get /policy_latest/{0}".format(policy_id or "")) @@ -444,7 +471,7 @@ class WebServerTest(CPWebCase): def test_web_policy_latest(self): """test /policy_latest/<policy-id>""" - policy_id, expected_policy = MonkeyPolicyEngine.gen_policy_latest(3) + policy_id, expected_policy = MockPolicyEngine.gen_policy_latest(3) self.getPage("/policy_latest/{0}".format(policy_id or "")) self.assertStatus('200 OK') @@ -458,10 +485,10 @@ class WebServerTest(CPWebCase): result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) + @pytest.mark.usefixtures("fix_deploy_handler") def test_web_all_policies_latest(self): """test GET /policies_latest""" - expected_policies = MonkeyPolicyEngine.gen_all_policies_latest() - expected_policies = expected_policies[LATEST_POLICIES] + expected_policies = MockPolicyEngine.gen_all_policies_latest() result = self.getPage("/policies_latest") Settings.logger.info("result: %s", result) @@ -481,9 +508,8 @@ class WebServerTest(CPWebCase): def test_web_policies_latest(self): """test POST /policies_latest with policyName""" - match_to_policy_name = Config.settings["scope_prefixes"][0] + "amet.*" - expected_policies = MonkeyPolicyEngine.gen_policies_latest(match_to_policy_name) - expected_policies = expected_policies[LATEST_POLICIES] + match_to_policy_name = MockPolicyEngine.scope_prefix + "amet.*" + expected_policies = MockPolicyEngine.gen_policies_latest(match_to_policy_name) body = json.dumps({POLICY_NAME: match_to_policy_name}) result = self.getPage("/policies_latest", method='POST', @@ -629,7 +655,7 @@ class WebServerPDPBoomTest(CPWebCase): def test_web_policy_latest(self): """test /policy_latest/<policy-id>""" - policy_id, _ = MonkeyPolicyEngine.gen_policy_latest(3) + policy_id, _ = MockPolicyEngine.gen_policy_latest(3) self.getPage("/policy_latest/{0}".format(policy_id or "")) self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value) @@ -637,6 +663,7 @@ class WebServerPDPBoomTest(CPWebCase): result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) + @pytest.mark.usefixtures("fix_deploy_handler") def test_web_all_policies_latest(self): """test GET /policies_latest""" result = self.getPage("/policies_latest") @@ -649,7 +676,7 @@ class WebServerPDPBoomTest(CPWebCase): def test_web_policies_latest(self): """test POST /policies_latest with policyName""" - match_to_policy_name = Config.settings["scope_prefixes"][0] + "amet.*" + match_to_policy_name = MockPolicyEngine.scope_prefix + "amet.*" body = json.dumps({POLICY_NAME: match_to_policy_name}) result = self.getPage("/policies_latest", method='POST', @@ -789,7 +816,7 @@ class WebServerInternalBoomTest(CPWebCase): def test_web_policy_latest(self): """test /policy_latest/<policy-id>""" - policy_id, _ = MonkeyPolicyEngine.gen_policy_latest(3) + policy_id, _ = MockPolicyEngine.gen_policy_latest(3) self.getPage("/policy_latest/{0}".format(policy_id or "")) self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value) @@ -797,6 +824,7 @@ class WebServerInternalBoomTest(CPWebCase): result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) + @pytest.mark.usefixtures("fix_deploy_handler") def test_web_all_policies_latest(self): """test GET /policies_latest""" result = self.getPage("/policies_latest") @@ -809,7 +837,7 @@ class WebServerInternalBoomTest(CPWebCase): def test_web_policies_latest(self): """test POST /policies_latest with policyName""" - match_to_policy_name = Config.settings["scope_prefixes"][0] + "amet.*" + match_to_policy_name = MockPolicyEngine.scope_prefix + "amet.*" body = json.dumps({POLICY_NAME: match_to_policy_name}) result = self.getPage("/policies_latest", method='POST', diff --git a/version.properties b/version.properties index 77d3d8c..ddff91c 100644 --- a/version.properties +++ b/version.properties @@ -1,6 +1,6 @@ -major=3
+major=4
minor=0
-patch=1
+patch=0
base_version=${major}.${minor}.${patch}
release_version=${base_version}
snapshot_version=${base_version}-SNAPSHOT
|