summaryrefslogtreecommitdiffstats
path: root/policyhandler
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2018-08-24 13:15:04 -0400
committerAlex Shatov <alexs@att.com>2018-08-24 13:15:04 -0400
commit1d693376205c66af93283d04e8e9740c947a7d02 (patch)
tree9188af307614661c1afbe50cdaa2fa8a2cdc691c /policyhandler
parent1cddbc70e4799970dc606014ef79e025d6a8e722 (diff)
4.2.0 policy-handler - periodic reconfigure
- reconfigure == periodically retrieve the policy-handler config from consul-kv and compare to previous config and subconfigs. If changed, reconfigure the subunits - selectively change one or any settings for the following = catch_up timer interval = reconfigure timer interval = deployment-handler url and params (thread-safe) = policy-engine url and params (thread-safe) = web-socket url to policy-engine (through a callback) - each subunit has its own Settings that keep track of changes - try-catch and metrics around discovery - consul API - hidden the secrets from logs - froze the web-socket version to 0.49.0 because 0.50.0 and 0.51.0 are broken - looking around for stable alternatives - fixed-adapted the callbacks passed to the web-socket lib that changed its API in 0.49.0 and later - log the stack on the exception occurring in the web-socket lib - unit test refactoring Change-Id: Id53bad59660a197f59d9aeb7c05ab761d1060cd0 Signed-off-by: Alex Shatov <alexs@att.com> Issue-ID: DCAEGEN2-470
Diffstat (limited to 'policyhandler')
-rw-r--r--policyhandler/__main__.py12
-rw-r--r--policyhandler/config.py174
-rw-r--r--policyhandler/deploy_handler.py106
-rw-r--r--policyhandler/discovery.py119
-rw-r--r--policyhandler/onap/audit.py3
-rw-r--r--policyhandler/policy_consts.py1
-rw-r--r--policyhandler/policy_receiver.py66
-rw-r--r--policyhandler/policy_rest.py111
-rw-r--r--policyhandler/policy_updater.py144
-rw-r--r--policyhandler/policy_utils.py12
-rw-r--r--policyhandler/step_timer.py4
11 files changed, 556 insertions, 196 deletions
diff --git a/policyhandler/__main__.py b/policyhandler/__main__.py
index 04ca657..63dc5da 100644
--- a/policyhandler/__main__.py
+++ b/policyhandler/__main__.py
@@ -35,20 +35,20 @@ from policyhandler.web_server import PolicyWeb
def run_policy_handler():
"""main run function for policy-handler"""
- Config.load_from_file()
- Config.discover()
+ Config.init_config()
logger = logging.getLogger("policy_handler")
sys.stdout = LogWriter(logger.info)
sys.stderr = LogWriter(logger.error)
logger.info("========== run_policy_handler ========== %s", __package__)
- Audit.init(Config.get_system_name(), Config.LOGGER_CONFIG_FILE_PATH)
-
- logger.info("starting policy_handler with config:")
- logger.info(Audit.log_json_dumps(Config.settings))
+ Audit.init(Config.system_name, Config.LOGGER_CONFIG_FILE_PATH)
audit = Audit(req_message="start policy handler")
+
+ Config.discover(audit)
+ logger.info("starting policy_handler with config: %s", Config.discovered_config)
+
PolicyReceiver.run(audit)
PolicyWeb.run_forever(audit)
diff --git a/policyhandler/config.py b/policyhandler/config.py
index 8e6edf9..3d68235 100644
--- a/policyhandler/config.py
+++ b/policyhandler/config.py
@@ -25,64 +25,131 @@ import logging.config
import os
from .discovery import DiscoveryClient
+from .onap.audit import Audit
+from .policy_utils import Utils
+
+LOGS_DIR = 'logs'
+
+try:
+ os.makedirs(LOGS_DIR, mode=0o770, exist_ok=True)
+except Exception:
+ pass
logging.basicConfig(
- filename='logs/policy_handler.log', \
- format='%(asctime)s.%(msecs)03d %(levelname)+8s ' + \
- '%(threadName)s %(name)s.%(funcName)s: %(message)s', \
+ filename=os.path.join(LOGS_DIR, 'policy_handler.log'),
+ format=('%(asctime)s.%(msecs)03d %(levelname)+8s ' +
+ '%(threadName)s %(name)s.%(funcName)s: %(message)s'),
datefmt='%Y%m%d_%H%M%S', level=logging.DEBUG)
+class Settings(object):
+ """settings of module or an application
+ that is the config filtered by the collection of config-keys.
+
+ keeps track of changes versus the previous set_config unless committed
+ """
+ def __init__(self, *config_keys):
+ """provide the collection of top level keys in config to limit the config"""
+ self._config_keys = config_keys
+ self._changed = False
+ self._config = None
+ self._prev_config = None
+
+ def __str__(self):
+ """get str of the config"""
+ return Audit.json_dumps({
+ "config_keys": self._config_keys,
+ "changed": self._changed,
+ "config": self._config,
+ "prev_config": self._prev_config
+ })
+
+ def is_loaded(self):
+ """whether loaded already"""
+ return bool(self._config)
+
+ def commit_change(self):
+ """set the prev config to the latest config"""
+ self._prev_config = copy.deepcopy(self._config)
+ self._changed = False
+
+ def _set_changed(self):
+ """determine whether the config changed"""
+ self._changed = not (self._prev_config
+ and Utils.are_the_same(self._prev_config, self._config,
+ Audit.json_dumps))
+
+ def set_config(self, config, auto_commit=False):
+ """update the config"""
+ self.commit_change()
+
+ if isinstance(config, Settings):
+ config = config._config
+
+ if not isinstance(config, dict):
+ config = {}
+
+ self._config = copy.deepcopy(dict((k, v) for (k, v) in config.items()
+ if not self._config_keys or k in self._config_keys))
+
+ if auto_commit:
+ self.commit_change()
+ else:
+ self._set_changed()
+
+ def is_changed(self):
+ """whether the config has changed"""
+ return self._changed
+
+ def get_by_key(self, config_key, default=None):
+ """get the latest sub config by config_key and whether it has changed"""
+ if not config_key or not isinstance(config_key, str):
+ return False, default
+
+ value = copy.deepcopy(self._config.get(config_key, default))
+ if not self._prev_config:
+ return True, value
+ prev_value = self._prev_config.get(config_key, default)
+ return self._changed and not Utils.are_the_same(prev_value, value, Audit.json_dumps), value
+
+ def update(self, config_key, value=None):
+ """set the latest sub config by config_key and determine whether the config has changed"""
+ if not config_key:
+ return
+
+ self._config[config_key] = copy.deepcopy(value)
+ self._set_changed()
+
+
class Config(object):
"""main config of the application"""
+ _logger = logging.getLogger("policy_handler.config")
CONFIG_FILE_PATH = "etc/config.json"
LOGGER_CONFIG_FILE_PATH = "etc/common_logger.config"
SERVICE_NAME_POLICY_HANDLER = "policy_handler"
+
FIELD_SYSTEM = "system"
FIELD_WSERVICE_PORT = "wservice_port"
FIELD_POLICY_ENGINE = "policy_engine"
+ POOL_CONNECTIONS = "pool_connections"
+ DEPLOY_HANDLER = "deploy_handler"
+ THREAD_POOL_SIZE = "thread_pool_size"
+ POLICY_RETRY_COUNT = "policy_retry_count"
+ POLICY_RETRY_SLEEP = "policy_retry_sleep"
+ RECONFIGURE = "reconfigure"
+ TIMER_INTERVAL = "interval"
+
+ system_name = SERVICE_NAME_POLICY_HANDLER
wservice_port = 25577
- _logger = logging.getLogger("policy_handler.config")
- settings = None
+ _local_config = Settings()
+ discovered_config = Settings()
@staticmethod
- def merge(new_config):
- """merge the new_config into current config - override the values"""
- if not new_config:
- return
-
- if not Config.settings:
- Config.settings = new_config
- return
-
- new_config = copy.deepcopy(new_config)
- Config.settings.update(new_config)
-
- @staticmethod
- def get_system_name():
- """find the name of the policy-handler system
- to be used as the key in consul-kv for config of policy-handler
- """
- return (Config.settings or {}).get(Config.FIELD_SYSTEM, Config.SERVICE_NAME_POLICY_HANDLER)
-
- @staticmethod
- def discover():
- """bring and merge the config settings from the discovery service"""
- discovery_key = Config.get_system_name()
- new_config = DiscoveryClient.get_value(discovery_key)
-
- if not new_config or not isinstance(new_config, dict):
- Config._logger.warning("unexpected config from discovery: %s", new_config)
+ def init_config(file_path=None):
+ """read and store the config from config file"""
+ if Config._local_config.is_loaded():
+ Config._logger.info("config already inited: %s", Config._local_config)
return
- Config._logger.debug("loaded config from discovery(%s): %s", \
- discovery_key, json.dumps(new_config))
- Config._logger.debug("config before merge from discovery: %s", json.dumps(Config.settings))
- Config.merge(new_config.get(Config.SERVICE_NAME_POLICY_HANDLER))
- Config._logger.info("merged config from discovery: %s", json.dumps(Config.settings))
-
- @staticmethod
- def load_from_file(file_path=None):
- """read and store the config from config file"""
if not file_path:
file_path = Config.CONFIG_FILE_PATH
@@ -101,6 +168,25 @@ class Config(object):
logging.config.dictConfig(logging_config)
Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port)
- Config.merge(loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER))
- Config._logger.info("config loaded from file: %s", json.dumps(Config.settings))
- return True
+
+ local_config = loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER)
+ Config.system_name = local_config.get(Config.FIELD_SYSTEM, Config.system_name)
+
+ Config._local_config.set_config(local_config, auto_commit=True)
+ Config._logger.info("config loaded from file(%s): %s", file_path, Config._local_config)
+
+ @staticmethod
+ def discover(audit):
+ """bring and merge the config settings from the discovery service"""
+ discovery_key = Config.system_name
+ new_config = DiscoveryClient.get_value(audit, discovery_key)
+
+ if not new_config or not isinstance(new_config, dict):
+ Config._logger.warning("unexpected config from discovery: %s", new_config)
+ return
+
+ Config._logger.debug("loaded config from discovery(%s): %s",
+ discovery_key, Audit.json_dumps(new_config))
+
+ Config.discovered_config.set_config(new_config.get(Config.SERVICE_NAME_POLICY_HANDLER))
+ Config._logger.info("config from discovery: %s", Config.discovered_config)
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
index 6d258f2..e308c1a 100644
--- a/policyhandler/deploy_handler.py
+++ b/policyhandler/deploy_handler.py
@@ -21,17 +21,18 @@
import json
import logging
from copy import copy, deepcopy
+from threading import Lock
import requests
-from .config import Config
+from .config import Config, Settings
from .customize import CustomizerUser
from .discovery import DiscoveryClient
from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
AuditResponseCode, Metrics)
from .policy_consts import (CATCH_UP, LATEST_POLICIES, POLICIES,
POLICY_FILTER_MATCHES, POLICY_FILTERS,
- REMOVED_POLICIES)
+ REMOVED_POLICIES, TARGET_ENTITY)
class PolicyUpdateMessage(object):
@@ -144,7 +145,11 @@ class PolicyUpdateMessage(object):
class DeployHandler(object):
"""calling the deployment-handler web apis"""
_logger = logging.getLogger("policy_handler.deploy_handler")
+ DEFAULT_TARGET_ENTITY = "deployment_handler"
+
_lazy_inited = False
+ _lock = Lock()
+ _settings = Settings(Config.POOL_CONNECTIONS, Config.DEPLOY_HANDLER)
_requests_session = None
_url = None
@@ -157,11 +162,8 @@ class DeployHandler(object):
server_instance_changed = False
@staticmethod
- def _lazy_init(audit, rediscover=False):
- """ set static properties """
- if DeployHandler._lazy_inited and not rediscover:
- return
-
+ def _init(audit):
+ """set config"""
DeployHandler._custom_kwargs = (CustomizerUser.get_customizer()
.get_deploy_handler_kwargs(audit))
if (not DeployHandler._custom_kwargs
@@ -169,18 +171,18 @@ class DeployHandler(object):
DeployHandler._custom_kwargs = {}
if not DeployHandler._requests_session:
- pool_size = Config.settings.get("pool_connections", 20)
DeployHandler._requests_session = requests.Session()
+
+ changed, pool_size = DeployHandler._settings.get_by_key(Config.POOL_CONNECTIONS, 10)
+ if changed:
DeployHandler._requests_session.mount(
- 'https://',
- requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
- )
+ 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
+ pool_maxsize=pool_size))
DeployHandler._requests_session.mount(
- 'http://',
- requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
- )
+ 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
+ pool_maxsize=pool_size))
- config_dh = Config.settings.get("deploy_handler")
+ _, config_dh = DeployHandler._settings.get_by_key(Config.DEPLOY_HANDLER)
if config_dh and isinstance(config_dh, dict):
# dns based routing to deployment-handler
# config for policy-handler >= 2.4.0
@@ -192,7 +194,8 @@ class DeployHandler(object):
# "cfy_tenant_name" : "default_tenant"
# }
# }
- DeployHandler._target_entity = config_dh.get("target_entity", "deployment_handler")
+ DeployHandler._target_entity = config_dh.get(TARGET_ENTITY,
+ DeployHandler.DEFAULT_TARGET_ENTITY)
DeployHandler._url = config_dh.get("url")
DeployHandler._max_msg_length_mb = config_dh.get("max_msg_length_mb",
DeployHandler._max_msg_length_mb)
@@ -205,19 +208,45 @@ class DeployHandler(object):
if not isinstance(config_dh, dict):
# config for policy-handler <= 2.3.1
# "deploy_handler" : "deployment_handler"
- DeployHandler._target_entity = str(config_dh or "deployment_handler")
+ DeployHandler._target_entity = str(config_dh or DeployHandler.DEFAULT_TARGET_ENTITY)
DeployHandler._url = DiscoveryClient.get_service_url(audit,
DeployHandler._target_entity)
DeployHandler._url_policy = str(DeployHandler._url or "") + '/policy'
- DeployHandler._logger.info(
- "got %s policy url(%s)", DeployHandler._target_entity, DeployHandler._url_policy)
+ DeployHandler._logger.info("got %s policy url(%s): %s", DeployHandler._target_entity,
+ DeployHandler._url_policy, DeployHandler._settings)
+ DeployHandler._settings.commit_change()
DeployHandler._lazy_inited = bool(DeployHandler._url)
+ @staticmethod
+ def reconfigure(audit):
+ """reconfigure"""
+ with DeployHandler._lock:
+ DeployHandler._settings.set_config(Config.discovered_config)
+ if not DeployHandler._settings.is_changed():
+ DeployHandler._settings.commit_change()
+ return False
+
+ DeployHandler._lazy_inited = False
+ DeployHandler._init(audit)
+ return True
@staticmethod
- def policy_update(audit, policy_update_message, rediscover=False):
+ def _lazy_init(audit):
+ """set config"""
+ if DeployHandler._lazy_inited:
+ return
+
+ with DeployHandler._lock:
+ if DeployHandler._lazy_inited:
+ return
+
+ DeployHandler._settings.set_config(Config.discovered_config)
+ DeployHandler._init(audit)
+
+ @staticmethod
+ def policy_update(audit, policy_update_message):
"""
segments the big policy_update_message limited by size
and sequatially sends each segment as put to deployment-handler at /policy.
@@ -227,7 +256,7 @@ class DeployHandler(object):
if not policy_update_message or policy_update_message.empty():
return
- DeployHandler._lazy_init(audit, rediscover)
+ DeployHandler._lazy_init(audit)
str_metrics = "policy_update {0}".format(str(policy_update_message))
@@ -277,11 +306,14 @@ class DeployHandler(object):
res = None
try:
- res = DeployHandler._requests_session.put(
- DeployHandler._url_policy, json=message,
- headers=headers, params=DeployHandler._query,
- **DeployHandler._custom_kwargs
- )
+ with DeployHandler._lock:
+ session = DeployHandler._requests_session
+ url = DeployHandler._url_policy
+ params = deepcopy(DeployHandler._query)
+ custom_kwargs = deepcopy(DeployHandler._custom_kwargs)
+
+ res = session.put(url, json=message,
+ headers=headers, params=params, **custom_kwargs)
except Exception as ex:
error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
if isinstance(ex, requests.exceptions.RequestException)
@@ -311,12 +343,12 @@ class DeployHandler(object):
@staticmethod
- def get_deployed_policies(audit, rediscover=False):
+ def get_deployed_policies(audit):
"""
Retrieves policies and policy-filters from components
that were deployed by deployment-handler
"""
- DeployHandler._lazy_init(audit, rediscover)
+ DeployHandler._lazy_init(audit)
metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity,
targetServiceName=DeployHandler._url_policy)
headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
@@ -338,10 +370,13 @@ class DeployHandler(object):
res = None
try:
- res = DeployHandler._requests_session.get(
- DeployHandler._url_policy, headers=headers, params=DeployHandler._query,
- **DeployHandler._custom_kwargs
- )
+ with DeployHandler._lock:
+ session = DeployHandler._requests_session
+ url = DeployHandler._url_policy
+ params = deepcopy(DeployHandler._query)
+ custom_kwargs = deepcopy(DeployHandler._custom_kwargs)
+
+ res = session.get(url, headers=headers, params=params, **custom_kwargs)
except Exception as ex:
error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
if isinstance(ex, requests.exceptions.RequestException)
@@ -390,7 +425,6 @@ class DeployHandler(object):
and prev_server_instance_uuid != DeployHandler._server_instance_uuid):
DeployHandler.server_instance_changed = True
- log_line = ("deployment_handler_changed: {1} != {0}"
- .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid))
- metrics.info(log_line)
- DeployHandler._logger.info(log_line)
+ DeployHandler._logger.info(metrics.info(
+ "deployment_handler_changed: {1} != {0}"
+ .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid)))
diff --git a/policyhandler/discovery.py b/policyhandler/discovery.py
index ce24c3d..4e6bc3d 100644
--- a/policyhandler/discovery.py
+++ b/policyhandler/discovery.py
@@ -25,6 +25,8 @@ import logging
import requests
from .customize import CustomizerUser
+from .onap.audit import AuditHttpCode, Metrics
+
class DiscoveryClient(object):
"""talking to consul at http://consul:8500
@@ -40,56 +42,103 @@ class DiscoveryClient(object):
-p <outport>:<innerport>
${APPNAME}:latest
"""
+ CONSUL_ENTITY = "consul"
CONSUL_SERVICE_MASK = "http://consul:8500/v1/catalog/service/{0}"
CONSUL_KV_MASK = "http://consul:8500/v1/kv/{0}"
_logger = logging.getLogger("policy_handler.discovery")
@staticmethod
- def get_service_url(audit, service_name):
+ def _discover_service(audit, service_name, service_path):
"""find the service record in consul"""
- service_path = DiscoveryClient.CONSUL_SERVICE_MASK.format(service_name)
- log_line = "discover {0}".format(service_path)
- DiscoveryClient._logger.info(log_line)
- audit.info(log_line)
response = requests.get(service_path)
-
- log_line = "response {0} for {1}: {2}".format(
- response.status_code, service_path, response.text)
- DiscoveryClient._logger.info(log_line)
- audit.info(log_line)
+ DiscoveryClient._logger.info(audit.info("response {} from {}: {}".format(
+ response.status_code, service_path, response.text)))
response.raise_for_status()
+ status_code = response.status_code
+ service = response.json()[0]
+ return (status_code,
+ CustomizerUser.get_customizer().get_service_url(audit, service_name, service))
+
+ @staticmethod
+ def get_service_url(audit, service_name):
+ """find the service record in consul"""
+ service_path = DiscoveryClient.CONSUL_SERVICE_MASK.format(service_name)
+ metrics = Metrics(aud_parent=audit, targetEntity=DiscoveryClient.CONSUL_ENTITY,
+ targetServiceName=service_path)
+
+ log_line = "get from {} at {}".format(DiscoveryClient.CONSUL_ENTITY, service_path)
- service = response.json()
- if not service:
- log_line = "failed discover {0}".format(service_path)
- DiscoveryClient._logger.error(log_line)
- audit.error(log_line)
- return
- service = service[0]
+ DiscoveryClient._logger.info(metrics.metrics_start(log_line))
+ status_code = None
+ try:
+ (status_code,
+ service_url) = DiscoveryClient._discover_service(audit, service_name, service_path)
+ except Exception as ex:
+ error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+ if isinstance(ex, requests.exceptions.RequestException)
+ else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ error_msg = ("failed {}/{} to {} {}: {}".format(status_code, error_code, log_line,
+ type(ex).__name__, str(ex)))
+ DiscoveryClient._logger.exception(error_msg)
+ metrics.set_http_status_code(error_code)
+ audit.set_http_status_code(error_code)
+ metrics.metrics(error_msg)
+ return None
- service_url = CustomizerUser.get_customizer().get_service_url(audit, service_name, service)
if not service_url:
- log_line = "failed to get service_url for {0}".format(service_name)
- DiscoveryClient._logger.error(log_line)
- audit.error(log_line)
- return
-
- log_line = "got service_url: {0} for {1}".format(service_url, service_name)
- DiscoveryClient._logger.info(log_line)
- audit.info(log_line)
+ error_code = AuditHttpCode.DATA_ERROR.value
+ error_msg = "failed {}/{} to {}".format(status_code, error_code, log_line)
+ DiscoveryClient._logger.error(audit.error(error_msg))
+ metrics.set_http_status_code(error_code)
+ audit.set_http_status_code(error_code)
+ metrics.metrics(error_msg)
+ return None
+
+ log_line = "response {} {}".format(status_code, log_line)
+ DiscoveryClient._logger.info(audit.info("got service_url: {} after {}"
+ .format(service_url, log_line)))
+
+ metrics.set_http_status_code(status_code)
+ audit.set_http_status_code(status_code)
+ metrics.metrics(log_line)
return service_url
@staticmethod
- def get_value(key):
- """get the value for the key from consul-kv"""
- response = requests.get(DiscoveryClient.CONSUL_KV_MASK.format(key))
+ def _get_value_from_kv(url):
+ """get the value from consul-kv at discovery url"""
+ response = requests.get(url)
response.raise_for_status()
data = response.json()
- if not data:
- DiscoveryClient._logger.error("failed get_value %s", key)
- return
value = base64.b64decode(data[0]["Value"]).decode("utf-8")
- DiscoveryClient._logger.info("consul-kv key=%s value(%s) data=%s",
- key, value, json.dumps(data))
- return json.loads(value)
+ return response.status_code, json.loads(value)
+
+ @staticmethod
+ def get_value(audit, key):
+ """get the value for the key from consul-kv"""
+ discovery_url = DiscoveryClient.CONSUL_KV_MASK.format(key)
+ metrics = Metrics(aud_parent=audit, targetEntity=DiscoveryClient.CONSUL_ENTITY,
+ targetServiceName=discovery_url)
+
+ log_line = "get from {} at {}".format(DiscoveryClient.CONSUL_ENTITY, discovery_url)
+
+ DiscoveryClient._logger.info(metrics.metrics_start(log_line))
+ try:
+ status_code, value = DiscoveryClient._get_value_from_kv(discovery_url)
+ except Exception as ex:
+ error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+ if isinstance(ex, requests.exceptions.RequestException)
+ else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ error_msg = ("failed {}/{} to {} {}: {}".format(status_code, error_code, log_line,
+ type(ex).__name__, str(ex)))
+ DiscoveryClient._logger.exception(error_msg)
+ metrics.set_http_status_code(error_code)
+ audit.set_http_status_code(error_code)
+ metrics.metrics(error_msg)
+ return None
+
+ log_line = "response {} {}".format(status_code, log_line)
+ metrics.set_http_status_code(status_code)
+ audit.set_http_status_code(status_code)
+ metrics.metrics(log_line)
+ return value
diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py
index db85c18..1bee4a7 100644
--- a/policyhandler/onap/audit.py
+++ b/policyhandler/onap/audit.py
@@ -339,7 +339,7 @@ class _Audit(object):
return obj
@staticmethod
- def log_json_dumps(obj, **kwargs):
+ def json_dumps(obj, **kwargs):
"""hide the known secret field values of the dictionary and return json.dumps"""
if not isinstance(obj, dict):
return json.dumps(obj, **kwargs)
@@ -455,6 +455,7 @@ class Metrics(_Audit):
self.info(log_line, **self.merge_all_kwargs(**kwargs))
_Audit._health.start(self._metrics_name, self.request_id)
_Audit._health.start(METRICS_TOTAL_STATS, self.request_id)
+ return log_line
def metrics(self, log_line, **kwargs):
diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py
index 51ac173..cde4551 100644
--- a/policyhandler/policy_consts.py
+++ b/policyhandler/policy_consts.py
@@ -36,3 +36,4 @@ POLICY_VERSIONS = "policy_versions"
MATCHING_CONDITIONS = "matchingConditions"
POLICY_NAMES = "policy_names"
POLICY_FILTER_MATCHES = "policy_filter_matches"
+TARGET_ENTITY = "target_entity"
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py
index bb33cd5..1edb24d 100644
--- a/policyhandler/policy_receiver.py
+++ b/policyhandler/policy_receiver.py
@@ -32,7 +32,7 @@ from threading import Lock, Thread
import websocket
-from .config import Config
+from .config import Config, Settings
from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION
from .policy_updater import PolicyUpdater
@@ -47,25 +47,50 @@ class _PolicyReceiver(Thread):
def __init__(self):
"""web-socket inside the thread to receive policy notifications from PolicyEngine"""
- Thread.__init__(self, name="policy_receiver")
- self.daemon = True
+ Thread.__init__(self, name="policy_receiver", daemon=True)
self._lock = Lock()
self._keep_running = True
+ self._settings = Settings(Config.FIELD_POLICY_ENGINE)
- config = Config.settings[Config.FIELD_POLICY_ENGINE]
- self.web_socket_url = resturl = config["url"] + config["path_pdp"]
-
- if resturl.startswith("https:"):
- self.web_socket_url = resturl.replace("https:", "wss:") + "notifications"
- else:
- self.web_socket_url = resturl.replace("http:", "ws:") + "notifications"
-
+ self._web_socket_url = None
self._web_socket = None
+ self.reconfigure()
- self._policy_updater = PolicyUpdater()
+ self._policy_updater = PolicyUpdater(self.reconfigure)
self._policy_updater.start()
+ def reconfigure(self):
+ """configure and reconfigure the web-socket"""
+ with self._lock:
+ self._settings.set_config(Config.discovered_config)
+ changed, config = self._settings.get_by_key(Config.FIELD_POLICY_ENGINE)
+
+ if not changed:
+ self._settings.commit_change()
+ return False
+
+ prev_web_socket_url = self._web_socket_url
+ resturl = config.get("url", "") + config.get("path_pdp", "")
+
+ if resturl.startswith("https:"):
+ self._web_socket_url = resturl.replace("https:", "wss:") + "notifications"
+ else:
+ self._web_socket_url = resturl.replace("http:", "ws:") + "notifications"
+
+ if self._web_socket_url == prev_web_socket_url:
+ _PolicyReceiver._logger.info("not changed web_socket_url(%s): %s",
+ self._web_socket_url, self._settings)
+ self._settings.commit_change()
+ return False
+
+ _PolicyReceiver._logger.info("changed web_socket_url(%s): %s",
+ self._web_socket_url, self._settings)
+ self._settings.commit_change()
+
+ self._stop_notifications()
+ return True
+
def run(self):
"""listen on web-socket and pass the policy notifications to policy-updater"""
websocket.enableTrace(True)
@@ -80,9 +105,9 @@ class _PolicyReceiver(Thread):
time.sleep(5)
_PolicyReceiver._logger.info(
- "connecting to policy-notifications at: %s", self.web_socket_url)
+ "connecting to policy-notifications at: %s", self._web_socket_url)
self._web_socket = websocket.WebSocketApp(
- self.web_socket_url,
+ self._web_socket_url,
on_message=self._on_pdp_message,
on_close=self._on_ws_close,
on_error=self._on_ws_error
@@ -101,15 +126,16 @@ class _PolicyReceiver(Thread):
return keep_running
def _stop_notifications(self):
- """Shuts down the AutoNotification service if running."""
+ """close the web-socket == stops the notification service if running."""
with self._lock:
if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected:
self._web_socket.close()
_PolicyReceiver._logger.info("Stopped receiving notifications from PDP")
- def _on_pdp_message(self, _, message):
+ def _on_pdp_message(self, *args):
"""received the notification from PDP"""
try:
+ message = args and args[-1]
_PolicyReceiver._logger.info("Received notification message: %s", message)
if not message:
return
@@ -144,13 +170,13 @@ class _PolicyReceiver(Thread):
_PolicyReceiver._logger.exception(error_msg)
- def _on_ws_error(self, _, error):
+ def _on_ws_error(self, error):
"""report an error"""
- _PolicyReceiver._logger.error("policy-notification error: %s", error)
+ _PolicyReceiver._logger.exception("policy-notification error: %s", str(error))
- def _on_ws_close(self, _):
+ def _on_ws_close(self, code, reason):
"""restart web-socket on close"""
- _PolicyReceiver._logger.info("lost connection to PDP - restarting...")
+ _PolicyReceiver._logger.info("lost connection(%s, %s) to PDP - restarting...", code, reason)
def shutdown(self, audit):
"""Shutdown the policy-receiver"""
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py
index 6ec982a..d10f4bf 100644
--- a/policyhandler/policy_rest.py
+++ b/policyhandler/policy_rest.py
@@ -23,10 +23,11 @@ import json
import logging
import time
from multiprocessing.dummy import Pool as ThreadPool
+from threading import Lock
import requests
-from .config import Config
+from .config import Config, Settings
from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
AuditResponseCode, Metrics)
from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY,
@@ -51,6 +52,11 @@ class PolicyRest(object):
EXPECTED_VERSIONS = "expected_versions"
IGNORE_POLICY_NAMES = "ignore_policy_names"
+ _lock = Lock()
+ _settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS,
+ Config.THREAD_POOL_SIZE,
+ Config.POLICY_RETRY_COUNT, Config.POLICY_RETRY_SLEEP)
+
_requests_session = None
_url_get_config = None
_headers = None
@@ -60,39 +66,69 @@ class PolicyRest(object):
_policy_retry_sleep = 0
@staticmethod
- def _lazy_init():
+ def _init():
"""init static config"""
- if PolicyRest._lazy_inited:
- return
- PolicyRest._lazy_inited = True
+ _, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE)
- config = Config.settings[Config.FIELD_POLICY_ENGINE]
+ if not PolicyRest._requests_session:
+ PolicyRest._requests_session = requests.Session()
- pool_size = Config.settings.get("pool_connections", 20)
- PolicyRest._requests_session = requests.Session()
- PolicyRest._requests_session.mount(
- 'https://',
- requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
- )
- PolicyRest._requests_session.mount(
- 'http://',
- requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
- )
+ changed, pool_size = PolicyRest._settings.get_by_key(Config.POOL_CONNECTIONS, 20)
+ if changed:
+ PolicyRest._requests_session.mount(
+ 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
+ pool_maxsize=pool_size))
+ PolicyRest._requests_session.mount(
+ 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
+ pool_maxsize=pool_size))
PolicyRest._url_get_config = (config["url"] + config["path_api"]
+ PolicyRest.POLICY_GET_CONFIG)
PolicyRest._headers = config["headers"]
PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
- PolicyRest._thread_pool_size = Config.settings.get("thread_pool_size", 4)
+ _, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key(
+ Config.THREAD_POOL_SIZE, 4)
if PolicyRest._thread_pool_size < 2:
PolicyRest._thread_pool_size = 2
- PolicyRest._policy_retry_count = Config.settings.get("policy_retry_count", 1) or 1
- PolicyRest._policy_retry_sleep = Config.settings.get("policy_retry_sleep", 0)
+ _, PolicyRest._policy_retry_count = PolicyRest._settings.get_by_key(
+ Config.POLICY_RETRY_COUNT, 1)
+ _, PolicyRest._policy_retry_sleep = PolicyRest._settings.get_by_key(
+ Config.POLICY_RETRY_SLEEP, 0)
- PolicyRest._logger.info(
- "PolicyClient url(%s) headers(%s)",
- PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers))
+ PolicyRest._logger.info("PDP url(%s) headers(%s): %s",
+ PolicyRest._url_get_config,
+ Metrics.json_dumps(PolicyRest._headers),
+ PolicyRest._settings)
+
+ PolicyRest._settings.commit_change()
+ PolicyRest._lazy_inited = True
+
+ @staticmethod
+ def reconfigure():
+ """reconfigure"""
+ with PolicyRest._lock:
+ PolicyRest._settings.set_config(Config.discovered_config)
+ if not PolicyRest._settings.is_changed():
+ PolicyRest._settings.commit_change()
+ return False
+
+ PolicyRest._lazy_inited = False
+ PolicyRest._init()
+ return True
+
+ @staticmethod
+ def _lazy_init():
+ """init static config"""
+ if PolicyRest._lazy_inited:
+ return
+
+ with PolicyRest._lock:
+ if PolicyRest._lazy_inited:
+ return
+
+ PolicyRest._settings.set_config(Config.discovered_config)
+ PolicyRest._init()
@staticmethod
def _pdp_get_config(audit, json_body):
@@ -100,26 +136,27 @@ class PolicyRest(object):
metrics = Metrics(aud_parent=audit, targetEntity=PolicyRest._target_entity,
targetServiceName=PolicyRest._url_get_config)
- msg = json.dumps(json_body)
- headers = copy.copy(PolicyRest._headers)
+ with PolicyRest._lock:
+ session = PolicyRest._requests_session
+ url = PolicyRest._url_get_config
+ headers = copy.deepcopy(PolicyRest._headers)
+
headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id
- headers_str = Metrics.log_json_dumps(headers)
+ headers_str = Metrics.json_dumps(headers)
+
+ msg = json.dumps(json_body)
+ log_line = "post to PDP {} msg={} headers={}".format(url, msg, headers_str)
+
+ PolicyRest._logger.info(metrics.metrics_start(log_line))
- log_line = "post to PDP {0} msg={1} headers={2}".format(
- PolicyRest._url_get_config, msg, headers_str)
- metrics.metrics_start(log_line)
- PolicyRest._logger.info(log_line)
res = None
try:
- res = PolicyRest._requests_session.post(
- PolicyRest._url_get_config, json=json_body, headers=headers)
+ res = session.post(url, json=json_body, headers=headers)
except Exception as ex:
error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
if isinstance(ex, requests.exceptions.RequestException)
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
- error_msg = (
- "failed to post to PDP {0} {1}: {2} msg={3} headers={4}"
- .format(PolicyRest._url_get_config, type(ex).__name__, str(ex), msg, headers_str))
+ error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line))
PolicyRest._logger.exception(error_msg)
metrics.set_http_status_code(error_code)
@@ -127,9 +164,9 @@ class PolicyRest(object):
metrics.metrics(error_msg)
return (error_code, None)
- log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format(
- PolicyRest._url_get_config, res.status_code, msg, res.text,
- Metrics.log_json_dumps(dict(res.request.headers.items())))
+ log_line = "response {} from post to PDP {}: msg={} text={} headers={}".format(
+ res.status_code, url, msg, res.text,
+ Metrics.json_dumps(dict(res.request.headers.items())))
status_code, res_data = PolicyRest._extract_pdp_res_data(audit, metrics, log_line, res)
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index 7733146..8909cc7 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -22,7 +22,7 @@ import json
import logging
from threading import Event, Lock, Thread
-from .config import Config
+from .config import Config, Settings
from .deploy_handler import DeployHandler, PolicyUpdateMessage
from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, POLICY_BODY, POLICY_ID,
@@ -115,21 +115,45 @@ class PolicyUpdater(Thread):
"""sequentially handle the policy-updates and catch-ups in its own policy_updater thread"""
_logger = logging.getLogger("policy_handler.policy_updater")
- def __init__(self):
+ def __init__(self, on_reconfigure_receiver):
"""init static config of PolicyUpdater."""
- Thread.__init__(self, name="policy_updater")
- self.daemon = True
+ Thread.__init__(self, name="policy_updater", daemon=True)
+ self._reconfigure_receiver = on_reconfigure_receiver
self._lock = Lock()
self._run = Event()
+ self._settings = Settings(CATCH_UP, Config.RECONFIGURE)
self._catch_up_timer = None
+ self._reconfigure_timer = None
+
self._aud_shutdown = None
self._aud_catch_up = None
+ self._aud_reconfigure = None
self._policy_update = _PolicyUpdate()
- catch_up_config = Config.settings.get(CATCH_UP, {})
- self._catch_up_interval = catch_up_config.get("interval") or 15*60
+ self._catch_up_interval = None
+ self._reconfigure_interval = None
+ self._set_timer_intervals()
+
+ def _set_timer_intervals(self):
+ """set intervals on timers"""
+ self._settings.set_config(Config.discovered_config)
+ if not self._settings.is_changed():
+ self._settings.commit_change()
+ return False
+
+ _, catch_up = self._settings.get_by_key(CATCH_UP, {})
+ self._catch_up_interval = catch_up.get(Config.TIMER_INTERVAL) or 15*60
+
+ _, reconfigure = self._settings.get_by_key(Config.RECONFIGURE, {})
+ self._reconfigure_interval = reconfigure.get(Config.TIMER_INTERVAL) or 10*60
+
+ PolicyUpdater._logger.info(
+ "intervals: catch_up(%s) reconfigure(%s): %s",
+ self._catch_up_interval, self._reconfigure_interval, self._settings)
+ self._settings.commit_change()
+ return True
def policy_update(self, policies_updated, policies_removed):
"""enqueue the policy-updates"""
@@ -148,8 +172,20 @@ class PolicyUpdater(Thread):
)
self._run.set()
+ def _reconfigure(self):
+ """job to check for and bring in the updated config for policy-handler"""
+ with self._lock:
+ if not self._aud_reconfigure:
+ self._aud_reconfigure = Audit(req_message=Config.RECONFIGURE)
+ PolicyUpdater._logger.info(
+ "reconfigure %s request_id %s",
+ self._aud_reconfigure.req_message, self._aud_reconfigure.request_id
+ )
+ self._run.set()
+
def run(self):
"""wait and run the policy-update in thread"""
+ self._run_reconfigure_timer()
while True:
PolicyUpdater._logger.info("waiting for policy-updates...")
self._run.wait()
@@ -160,6 +196,11 @@ class PolicyUpdater(Thread):
if not self._keep_running():
break
+ self._on_reconfigure()
+
+ if not self._keep_running():
+ break
+
if self._on_catch_up():
continue
@@ -183,7 +224,7 @@ class PolicyUpdater(Thread):
if self._catch_up_timer:
self._logger.info("next step catch_up_timer in %s", self._catch_up_interval)
- self._catch_up_timer.next()
+ self._catch_up_timer.next(self._catch_up_interval)
return
self._catch_up_timer = StepTimer(
@@ -196,14 +237,34 @@ class PolicyUpdater(Thread):
self._logger.info("started catch_up_timer in %s", self._catch_up_interval)
self._catch_up_timer.start()
+ def _run_reconfigure_timer(self):
+ """create and start the reconfigure timer"""
+ if not self._reconfigure_interval:
+ return
+
+ if self._reconfigure_timer:
+ self._logger.info("next step reconfigure_timer in %s", self._reconfigure_interval)
+ self._reconfigure_timer.next(self._reconfigure_interval)
+ return
+
+ self._reconfigure_timer = StepTimer(
+ "reconfigure_timer",
+ self._reconfigure_interval,
+ PolicyUpdater._reconfigure,
+ PolicyUpdater._logger,
+ self
+ )
+ self._logger.info("started reconfigure_timer in %s", self._reconfigure_interval)
+ self._reconfigure_timer.start()
+
def _pause_catch_up_timer(self):
"""pause catch_up_timer"""
if self._catch_up_timer:
self._logger.info("pause catch_up_timer")
self._catch_up_timer.pause()
- def _stop_catch_up_timer(self):
- """stop and destroy the catch_up_timer"""
+ def _stop_timers(self):
+ """stop and destroy the catch_up and reconfigure timers"""
if self._catch_up_timer:
self._logger.info("stopping catch_up_timer")
self._catch_up_timer.stop()
@@ -211,6 +272,66 @@ class PolicyUpdater(Thread):
self._catch_up_timer = None
self._logger.info("stopped catch_up_timer")
+ if self._reconfigure_timer:
+ self._logger.info("stopping reconfigure_timer")
+ self._reconfigure_timer.stop()
+ self._reconfigure_timer.join()
+ self._reconfigure_timer = None
+ self._logger.info("stopped reconfigure_timer")
+
+ def _on_reconfigure(self):
+ """bring the latest config and reconfigure"""
+ with self._lock:
+ aud_reconfigure = self._aud_reconfigure
+ if self._aud_reconfigure:
+ self._aud_reconfigure = None
+
+ if not aud_reconfigure:
+ return
+
+ log_line = "{}({})".format(aud_reconfigure.req_message, aud_reconfigure.request_id)
+ reconfigure_result = ""
+ try:
+ PolicyUpdater._logger.info(log_line)
+ Config.discover(aud_reconfigure)
+ if not Config.discovered_config.is_changed():
+ reconfigure_result = " -- config not changed"
+ else:
+ reconfigure_result = " -- config changed for:"
+ if self._set_timer_intervals():
+ reconfigure_result += " timer_intervals"
+
+ if PolicyRest.reconfigure():
+ reconfigure_result += " " + Config.FIELD_POLICY_ENGINE
+
+ if DeployHandler.reconfigure(aud_reconfigure):
+ reconfigure_result += " " + Config.DEPLOY_HANDLER
+
+ if self._reconfigure_receiver():
+ reconfigure_result += " web-socket"
+
+ reconfigure_result += " -- change: {}".format(Config.discovered_config)
+
+ Config.discovered_config.commit_change()
+ aud_reconfigure.audit_done(result=reconfigure_result)
+ PolicyUpdater._logger.info(log_line + reconfigure_result)
+
+ except Exception as ex:
+ error_msg = "crash {} {}{}: {}: {}".format(
+ "_on_reconfigure", log_line, reconfigure_result, type(ex).__name__, str(ex))
+
+ PolicyUpdater._logger.exception(error_msg)
+ aud_reconfigure.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ aud_reconfigure.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ aud_reconfigure.audit_done(result=error_msg)
+
+ self._run_reconfigure_timer()
+
+ PolicyUpdater._logger.info("policy_handler health: %s",
+ json.dumps(aud_reconfigure.health(full=True)))
+ PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_reconfigure.process_info()))
+
+
def _on_catch_up(self):
"""bring all the latest policies to DCAE-Controller"""
with self._lock:
@@ -239,7 +360,7 @@ class PolicyUpdater(Thread):
catch_up_result = "- not sending empty catch-up to deployment-handler"
else:
aud_catch_up.reset_http_status_not_found()
- DeployHandler.policy_update(aud_catch_up, catch_up_message, rediscover=True)
+ DeployHandler.policy_update(aud_catch_up, catch_up_message)
if not aud_catch_up.is_success():
catch_up_result = "- failed to send catch-up to deployment-handler"
PolicyUpdater._logger.warning(catch_up_result)
@@ -256,6 +377,7 @@ class PolicyUpdater(Thread):
PolicyUpdater._logger.exception(error_msg)
aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ aud_catch_up.audit_done(result=error_msg)
success = False
self._run_catch_up_timer()
@@ -342,7 +464,7 @@ class PolicyUpdater(Thread):
self._aud_shutdown = audit
self._run.set()
- self._stop_catch_up_timer()
+ self._stop_timers()
if self.is_alive():
self.join()
diff --git a/policyhandler/policy_utils.py b/policyhandler/policy_utils.py
index da83935..08d26f0 100644
--- a/policyhandler/policy_utils.py
+++ b/policyhandler/policy_utils.py
@@ -141,8 +141,10 @@ class Utils(object):
return json_str
@staticmethod
- def are_the_same(body_1, body_2):
+ def are_the_same(body_1, body_2, json_dumps=None):
"""check whether both objects are the same"""
+ if not json_dumps:
+ json_dumps = json.dumps
if (body_1 and not body_2) or (not body_1 and body_2):
Utils._logger.debug("only one is empty %s != %s", body_1, body_2)
return False
@@ -152,21 +154,21 @@ class Utils(object):
if isinstance(body_1, list) and isinstance(body_2, list):
if len(body_1) != len(body_2):
- Utils._logger.debug("len %s != %s", json.dumps(body_1), json.dumps(body_2))
+ Utils._logger.debug("len %s != %s", json_dumps(body_1), json_dumps(body_2))
return False
for val_1, val_2 in zip(body_1, body_2):
- if not Utils.are_the_same(val_1, val_2):
+ if not Utils.are_the_same(val_1, val_2, json_dumps):
return False
return True
if isinstance(body_1, dict) and isinstance(body_2, dict):
if body_1.keys() ^ body_2.keys():
- Utils._logger.debug("keys %s != %s", json.dumps(body_1), json.dumps(body_2))
+ Utils._logger.debug("keys %s != %s", json_dumps(body_1), json_dumps(body_2))
return False
for key, val_1 in body_1.items():
- if not Utils.are_the_same(val_1, body_2[key]):
+ if not Utils.are_the_same(val_1, body_2[key], json_dumps):
return False
return True
diff --git a/policyhandler/step_timer.py b/policyhandler/step_timer.py
index 768b400..0f4f8e4 100644
--- a/policyhandler/step_timer.py
+++ b/policyhandler/step_timer.py
@@ -71,9 +71,11 @@ class StepTimer(Thread):
self._finished,
)
- def next(self):
+ def next(self, interval=None):
"""continue with the next timeout"""
with self._lock:
+ if interval:
+ self._interval = interval
self._paused = False
if self._waiting_for_timeout:
self._next.set()